Created
June 15, 2017 02:40
-
-
Save rhockenbury/74d176691a4d9e2a84f64cb314910fc6 to your computer and use it in GitHub Desktop.
Queue Celery Task in SQS from Lambda
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import json | |
import urllib | |
import boto3 | |
import uuid | |
import base64 | |
print("Loading function") | |
sqs = boto3.client("sqs") | |
QUEUE_URL = "YOUR_QUEUE_URL" | |
""" | |
{ | |
"task_name": "tasks.your_task_name", | |
"task_args": [ | |
"your", "task", "args" | |
] | |
} | |
""" | |
def lambda_handler(event, context): | |
print("Received event: " + json.dumps(event, indent=4)) | |
task_name = event["task_name"] | |
task_args = event["task_args"] | |
task_kwargs = {} | |
# generate celery task message | |
msg_id = str(uuid.uuid4()) | |
msg_envelope = { | |
"content-encoding": "utf-8", | |
"content-type": "application/json", | |
"headers": {}, | |
"properties": { | |
"body_encoding": "base64", | |
"correlation_id": msg_id, | |
"delivery_info": { | |
"exchange": None, | |
"routing_key": None | |
}, | |
"delivery_tag": None | |
} | |
} | |
msg_body = { | |
"task": task_name, | |
"args": task_args, | |
"kwargs": task_kwargs, | |
"id": msg_id, | |
"retries": 0 | |
} | |
# package celery task message | |
msg_envelope["body"] = base64.b64encode(json.dumps(msg_body)) | |
msg = base64.b64encode(json.dumps(msg_envelope)) | |
# send message to sqs | |
response = sqs.send_message( | |
QueueUrl=QUEUE_URL, | |
MessageBody=msg | |
) | |
if response["MessageId"]: | |
print("Task message sent to sqs: " + json.dumps(msg_body, indent=4)) | |
else: | |
raise Exception("Task message unable to send: " + json.dumps(msg_body, indent=4)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import json | |
import urllib | |
import boto3 | |
import uuid | |
import base64 | |
print("Loading function") | |
sqs = boto3.client("sqs") | |
QUEUE_URL = "YOUR_QUEUE_URL" | |
""" | |
{ | |
"Records": [ | |
{ | |
"eventVersion": "2.0", | |
"eventTime": "1970-01-01T00:00:00.000Z", | |
"requestParameters": { | |
"sourceIPAddress": "127.0.0.1" | |
}, | |
"s3": { | |
"configurationId": "testConfigRule", | |
"object": { | |
"eTag": "0123456789abcdef0123456789abcdef", | |
"sequencer": "0A1B2C3D4E5F678901", | |
"key": "HappyFace.jpg", | |
"size": 1024 | |
}, | |
"bucket": { | |
"arn": "arn:aws:s3:::mybucket", | |
"name": "sourcebucket", | |
"ownerIdentity": { | |
"principalId": "EXAMPLE" | |
} | |
}, | |
"s3SchemaVersion": "1.0" | |
}, | |
"responseElements": { | |
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", | |
"x-amz-request-id": "EXAMPLE123456789" | |
}, | |
"awsRegion": "us-east-1", | |
"eventName": "ObjectCreated:Put", | |
"userIdentity": { | |
"principalId": "EXAMPLE" | |
}, | |
"eventSource": "aws:s3" | |
} | |
] | |
} | |
""" | |
def lambda_handler(event, context): | |
print("Received event: " + json.dumps(event, indent=4)) | |
s3_record = event["Records"][0]["s3"] | |
bucket = s3_record["bucket"]["name"] | |
key = urllib.unquote_plus(s3_record["object"]["key"].encode("utf8")) | |
etag = s3_record["object"]["eTag"] | |
# determine task type to queue based on S3 object key | |
if "some_string" in key: | |
task_name = "tasks.some_task" | |
elif "some_other_string" in key: | |
task_name = "tasks.some_other_task" | |
else: | |
task_name = "tasks.some_default_task" | |
task_args = [bucket, key, etag] | |
task_kwargs = {} | |
# generate celery task message | |
msg_id = str(uuid.uuid4()) | |
msg_envelope = { | |
"content-encoding": "utf-8", | |
"content-type": "application/json", | |
"headers": {}, | |
"properties": { | |
"body_encoding": "base64", | |
"correlation_id": msg_id, | |
"delivery_info": { | |
"exchange": None, | |
"routing_key": None | |
}, | |
"delivery_tag": None | |
} | |
} | |
msg_body = { | |
"task": task_name, | |
"args": task_args, | |
"kwargs": task_kwargs, | |
"id": msg_id, | |
"retries": 0 | |
} | |
# package celery task message | |
msg_envelope["body"] = base64.b64encode(json.dumps(msg_body)) | |
msg = base64.b64encode(json.dumps(msg_envelope)) | |
# send message to sqs | |
response = sqs.send_message( | |
QueueUrl=QUEUE_URL, | |
MessageBody=msg | |
) | |
if response["MessageId"]: | |
print("Task message sent to sqs: " + json.dumps(msg_body, indent=4)) | |
else: | |
raise Exception("Task message unable to send: " + json.dumps(msg_body, indent=4)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for the tip ! But on my side, It ends by a crash ...