Last active
December 23, 2021 06:58
-
-
Save alukach/20b17d4deff4f8b17d2d1cf7490fe1c0 to your computer and use it in GitHub Desktop.
AWS S3 Batch Operation boilerplate
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib | |
import boto3 | |
from botocore.exceptions import ClientError | |
# Result codes reported back to S3 Batch Operations for each task.
SUCCESS = "Succeeded"
TMP_FAILURE = "TemporaryFailure"
FAILURE = "PermanentFailure"

# Shared S3 resource handle, created once when the module is imported.
s3 = boto3.resource("s3")
def process_object(src_object):
    """Run the per-object processing task and return its result message.

    This is a placeholder: ``src_object`` is currently ignored and a fixed
    TODO string is returned.
    """
    placeholder_message = "TODO: Populate with processing task..."
    return placeholder_message
def get_s3_object(event):
    """Build the ``s3.Object`` addressed by the first task of a batch event.

    Args:
        event: Lambda invocation payload. Only ``event["tasks"][0]`` is read.

    Returns:
        ``s3.Object(bucket, key)`` created from the module-level ``s3``
        resource. The task's ``s3VersionId`` is not used.

    Raises:
        KeyError, IndexError: If the event has no task, or the task lacks
            the key or bucket fields.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` has been loaded.
    from urllib.parse import unquote

    task = event["tasks"][0]

    # The object key arrives URL-encoded in the task payload.
    key = unquote(task["s3Key"])

    # Invocation schema 1.0 supplies the bucket as an ARN ("s3BucketArn",
    # e.g. "arn:aws:s3:::my-bucket"); schema 2.0 supplies the plain bucket
    # name under "s3Bucket" (per the S3 Batch Operations Lambda docs).
    bucket_arn = task.get("s3BucketArn")
    if bucket_arn is not None:
        bucket = bucket_arn.split(":::")[-1]
    else:
        bucket = task["s3Bucket"]

    return s3.Object(bucket, key)
def build_result(status: str, msg: str):
    """Return the result fields for one task.

    Args:
        status: Value stored under ``"resultCode"``.
        msg: Value stored under ``"resultString"``.
    """
    return {"resultCode": status, "resultString": msg}
def handler(event, context):
    """Lambda entry point: process the first task of a batch invocation.

    Args:
        event: Invocation payload. Must contain ``invocationId``,
            ``invocationSchemaVersion`` and a non-empty ``tasks`` list whose
            first entry has a ``taskId``.
        context: Lambda context object (unused).

    Returns:
        A dict echoing ``invocationId`` and ``invocationSchemaVersion``,
        plus ``treatMissingKeysAs`` and a one-element ``results`` list
        holding the task's result code, result string and task id.

    Raises:
        KeyError, IndexError: If the event lacks the invocation fields or
            the first task's ``taskId``; without those no result can be
            reported.
    """
    task_id = event["tasks"][0]["taskId"]
    job_params = {
        "invocationId": event["invocationId"],
        "invocationSchemaVersion": event["invocationSchemaVersion"],
    }

    try:
        # Resolve the object inside the try block so that a malformed task
        # is reported as a failure for this task instead of crashing the
        # whole invocation.
        s3_object = get_s3_object(event)
        output = process_object(s3_object)
        # Mark as succeeded
        result = build_result(SUCCESS, output)
    except ClientError as e:
        # If the request timed out, mark it as a temporary failure so that
        # Amazon S3 Batch Operations marks the task for retry. Any other
        # client error is a permanent failure.
        # Use .get() so an incomplete error payload cannot raise a second
        # exception from inside this handler.
        error = e.response.get("Error", {})
        error_code = error.get("Code", "Unknown")
        error_message = error.get("Message", "")
        if error_code == "RequestTimeout":
            result = build_result(
                TMP_FAILURE, "Retry request to Amazon S3 due to timeout."
            )
        else:
            result = build_result(FAILURE, f"{error_code}: {error_message}")
    except Exception as e:
        # Catch all other exceptions to permanently fail the task
        result = build_result(FAILURE, f"Exception: {e}")

    return {
        **job_params,
        "treatMissingKeysAs": FAILURE,
        "results": [{**result, "taskId": task_id}],
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment