Created
April 21, 2022 10:34
-
-
Save JavaHutt/5aa5b3921458e15097e35bc19210f038 to your computer and use it in GitHub Desktop.
Querying AWS Athena using Golang SDK v2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"time" | |
"github.com/aws/aws-sdk-go-v2/aws" | |
"github.com/aws/aws-sdk-go-v2/config" | |
"github.com/aws/aws-sdk-go-v2/service/athena" | |
"github.com/aws/aws-sdk-go-v2/service/athena/types" | |
) | |
const ( | |
catalogName = "AwsDataCatalog" | |
dbName = "logs" | |
sleepDuration = 1000 | |
) | |
func main() { | |
ctx := context.Background() | |
cfg, err := config.LoadDefaultConfig(ctx) | |
if err != nil { | |
log.Fatal(err) | |
} | |
client := athena.NewFromConfig(cfg) | |
query := "SELECT id FROM lurl LIMIT 10" | |
queryExecutionId, err := submitAthenaQuery(ctx, client, query) | |
if err != nil { | |
log.Fatal(err) | |
} | |
if err = waitForQueryToComplete(ctx, client, queryExecutionId); err != nil { | |
log.Fatal(err) | |
} | |
processResultRows(ctx, client, queryExecutionId) | |
} | |
func submitAthenaQuery(ctx context.Context, client *athena.Client, query string) (string, error) { | |
params := &athena.StartQueryExecutionInput{ | |
QueryString: aws.String(query), | |
QueryExecutionContext: &types.QueryExecutionContext{ | |
Database: aws.String(dbName), | |
}, | |
} | |
startExecutionResponse, err := client.StartQueryExecution(ctx, params) | |
if err != nil { | |
log.Fatal(err) | |
} | |
return *startExecutionResponse.QueryExecutionId, nil | |
} | |
func waitForQueryToComplete(ctx context.Context, client *athena.Client, queryExecutionId string) error { | |
getQueryExecutionRequest := athena.GetQueryExecutionInput{ | |
QueryExecutionId: &queryExecutionId, | |
} | |
isQueryStillRunning := true | |
for isQueryStillRunning { | |
getQueryExecutionResponse, err := client.GetQueryExecution(ctx, &getQueryExecutionRequest) | |
if err != nil { | |
return err | |
} | |
queryState := getQueryExecutionResponse.QueryExecution.Status.State | |
switch queryState { | |
case types.QueryExecutionStateFailed: | |
return fmt.Errorf("The Amazon Athena query failed to run with error message: %s", | |
*getQueryExecutionResponse.QueryExecution.Status.StateChangeReason) | |
case types.QueryExecutionStateCancelled: | |
return fmt.Errorf("The Amazon Athena query was cancelled") | |
case types.QueryExecutionStateSucceeded: | |
isQueryStillRunning = false | |
default: | |
time.Sleep(sleepDuration * time.Millisecond) | |
} | |
fmt.Printf("The current status is: %s\n", queryState) | |
} | |
return nil | |
} | |
func processResultRows(ctx context.Context, client *athena.Client, queryExecutionId string) error { | |
getQueryResultsRequest := athena.GetQueryResultsInput{ | |
QueryExecutionId: &queryExecutionId, | |
} | |
getQueryResultsResult, err := client.GetQueryResults(ctx, &getQueryResultsRequest) | |
if err != nil { | |
return err | |
} | |
for _, row := range getQueryResultsResult.ResultSet.Rows { | |
allData := row.Data | |
for _, data := range allData { | |
fmt.Printf("The value of the column is %s\n", *data.VarCharValue) | |
} | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment