Created
June 15, 2017 02:40
-
-
Save rhockenbury/74d176691a4d9e2a84f64cb314910fc6 to your computer and use it in GitHub Desktop.
Queue Celery Task in SQS from Lambda
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import json | |
import urllib | |
import boto3 | |
import uuid | |
import base64 | |
# Runs once, when the module is imported, before any handler invocation.
print("Loading function") | |
# Module-level SQS client, created at import time and used by lambda_handler.
sqs = boto3.client("sqs") | |
# Placeholder value: replace with the URL of the target SQS queue.
QUEUE_URL = "YOUR_QUEUE_URL" | |
""" | |
{ | |
"task_name": "tasks.your_task_name", | |
"task_args": [ | |
"your", "task", "args" | |
] | |
} | |
""" | |
def _b64_json(obj):
    """Serialize *obj* to JSON and return it base64-encoded as text.

    ``base64.b64encode`` needs bytes and returns bytes on Python 3, so the
    JSON text is encoded before and the result decoded after; the returned
    value can therefore be embedded in another JSON document or passed as an
    SQS ``MessageBody``.
    """
    return base64.b64encode(json.dumps(obj).encode("utf-8")).decode("ascii")


def _queue_name_from_url(queue_url):
    """Return the last path segment of *queue_url* (the queue name)."""
    return queue_url.rstrip("/").rsplit("/", 1)[-1]


def _encode_celery_message(task_name, task_args, task_kwargs, routing_key):
    """Build one Celery task message.

    Returns:
        A ``(msg_body, payload)`` tuple: the task body as a dict (useful for
        logging) and the base64 text to send as the SQS message body.
    """
    msg_id = str(uuid.uuid4())
    msg_body = {
        "task": task_name,
        "args": task_args,
        "kwargs": task_kwargs,
        "id": msg_id,
        "retries": 0,
    }
    msg_envelope = {
        "body": _b64_json(msg_body),
        "content-encoding": "utf-8",
        "content-type": "application/json",
        "headers": {},
        "properties": {
            "body_encoding": "base64",
            "correlation_id": msg_id,
            "delivery_info": {
                "exchange": None,
                # Must be a string: kombu's SQS transport concatenates its
                # queue-name prefix with this value when acknowledging the
                # message, and a None here crashes the worker with
                # "can only concatenate str (not "NoneType") to str".
                "routing_key": routing_key,
            },
            "delivery_tag": None,
        },
    }
    return msg_body, _b64_json(msg_envelope)


def lambda_handler(event, context):
    """Queue the Celery task described by *event* on the SQS queue.

    Args:
        event: dict with a required ``"task_name"`` and optional
            ``"task_args"`` (list, default ``[]``) and ``"task_kwargs"``
            (dict, default ``{}``).
        context: Lambda context object; not used.

    Raises:
        KeyError: if *event* has no ``"task_name"``.
        Exception: if the SQS response carries no ``MessageId``.
    """
    print("Received event: " + json.dumps(event, indent=4))

    task_name = event["task_name"]
    task_args = event.get("task_args", [])
    task_kwargs = event.get("task_kwargs", {})

    # NOTE(review): assumes the worker consumes this queue under the name at
    # the end of QUEUE_URL (i.e. no queue_name_prefix is configured) --
    # confirm against the worker's broker transport options.
    routing_key = _queue_name_from_url(QUEUE_URL)

    # generate and package celery task message
    msg_body, msg = _encode_celery_message(
        task_name, task_args, task_kwargs, routing_key
    )

    # send message to sqs
    response = sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=msg,
    )

    # .get() so a response without MessageId reaches the intended error
    # below instead of surfacing as a KeyError.
    if response.get("MessageId"):
        print("Task message sent to sqs: " + json.dumps(msg_body, indent=4))
    else:
        raise Exception("Task message unable to send: " + json.dumps(msg_body, indent=4))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import json | |
import urllib | |
import boto3 | |
import uuid | |
import base64 | |
# Runs once, when the module is imported, before any handler invocation.
print("Loading function") | |
# Module-level SQS client, created at import time and used by lambda_handler.
sqs = boto3.client("sqs") | |
# Placeholder value: replace with the URL of the target SQS queue.
QUEUE_URL = "YOUR_QUEUE_URL" | |
""" | |
{ | |
"Records": [ | |
{ | |
"eventVersion": "2.0", | |
"eventTime": "1970-01-01T00:00:00.000Z", | |
"requestParameters": { | |
"sourceIPAddress": "127.0.0.1" | |
}, | |
"s3": { | |
"configurationId": "testConfigRule", | |
"object": { | |
"eTag": "0123456789abcdef0123456789abcdef", | |
"sequencer": "0A1B2C3D4E5F678901", | |
"key": "HappyFace.jpg", | |
"size": 1024 | |
}, | |
"bucket": { | |
"arn": "arn:aws:s3:::mybucket", | |
"name": "sourcebucket", | |
"ownerIdentity": { | |
"principalId": "EXAMPLE" | |
} | |
}, | |
"s3SchemaVersion": "1.0" | |
}, | |
"responseElements": { | |
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH", | |
"x-amz-request-id": "EXAMPLE123456789" | |
}, | |
"awsRegion": "us-east-1", | |
"eventName": "ObjectCreated:Put", | |
"userIdentity": { | |
"principalId": "EXAMPLE" | |
}, | |
"eventSource": "aws:s3" | |
} | |
] | |
} | |
""" | |
def _b64_json(obj):
    """Serialize *obj* to JSON and return it base64-encoded as text.

    ``base64.b64encode`` needs bytes and returns bytes on Python 3, so the
    JSON text is encoded before and the result decoded after; the returned
    value can therefore be embedded in another JSON document or passed as an
    SQS ``MessageBody``.
    """
    return base64.b64encode(json.dumps(obj).encode("utf-8")).decode("ascii")


def _queue_name_from_url(queue_url):
    """Return the last path segment of *queue_url* (the queue name)."""
    return queue_url.rstrip("/").rsplit("/", 1)[-1]


def _decode_s3_key(raw_key):
    """URL-decode an object key as delivered in the event record."""
    try:
        from urllib.parse import unquote_plus  # Python 3
    except ImportError:
        # Python 2: unquote_plus lives in urllib itself and is given the
        # UTF-8 encoded key, as the original code did.
        from urllib import unquote_plus
        return unquote_plus(raw_key.encode("utf8"))
    return unquote_plus(raw_key)


def _select_task_name(key):
    """Pick the Celery task to queue based on the S3 object key."""
    if "some_string" in key:
        return "tasks.some_task"
    if "some_other_string" in key:
        return "tasks.some_other_task"
    return "tasks.some_default_task"


def _encode_celery_message(task_name, task_args, task_kwargs, routing_key):
    """Build one Celery task message.

    Returns:
        A ``(msg_body, payload)`` tuple: the task body as a dict (useful for
        logging) and the base64 text to send as the SQS message body.
    """
    msg_id = str(uuid.uuid4())
    msg_body = {
        "task": task_name,
        "args": task_args,
        "kwargs": task_kwargs,
        "id": msg_id,
        "retries": 0,
    }
    msg_envelope = {
        "body": _b64_json(msg_body),
        "content-encoding": "utf-8",
        "content-type": "application/json",
        "headers": {},
        "properties": {
            "body_encoding": "base64",
            "correlation_id": msg_id,
            "delivery_info": {
                "exchange": None,
                # Must be a string: kombu's SQS transport concatenates its
                # queue-name prefix with this value when acknowledging the
                # message, and a None here crashes the worker with
                # "can only concatenate str (not "NoneType") to str".
                "routing_key": routing_key,
            },
            "delivery_tag": None,
        },
    }
    return msg_body, _b64_json(msg_envelope)


def lambda_handler(event, context):
    """Queue one Celery task per S3 record found in *event*.

    Each task is called with ``[bucket, key, etag]`` and no keyword
    arguments; the task name is chosen from the object key.

    Args:
        event: dict with a ``"Records"`` list whose items each carry an
            ``"s3"`` entry holding ``bucket.name``, ``object.key`` and
            ``object.eTag``.
        context: Lambda context object; not used.

    Raises:
        KeyError: if a record lacks one of the fields listed above.
        Exception: if an SQS response carries no ``MessageId``.
    """
    print("Received event: " + json.dumps(event, indent=4))

    # NOTE(review): assumes the worker consumes this queue under the name at
    # the end of QUEUE_URL (i.e. no queue_name_prefix is configured) --
    # confirm against the worker's broker transport options.
    routing_key = _queue_name_from_url(QUEUE_URL)

    # Handle every record rather than only the first, so none is dropped
    # when an event carries several.
    for record in event["Records"]:
        s3_record = record["s3"]
        bucket = s3_record["bucket"]["name"]
        key = _decode_s3_key(s3_record["object"]["key"])
        etag = s3_record["object"]["eTag"]

        # determine task type to queue based on S3 object key
        task_name = _select_task_name(key)
        task_args = [bucket, key, etag]
        task_kwargs = {}

        # generate and package celery task message
        msg_body, msg = _encode_celery_message(
            task_name, task_args, task_kwargs, routing_key
        )

        # send message to sqs
        response = sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=msg,
        )

        # .get() so a response without MessageId reaches the intended error
        # below instead of surfacing as a KeyError.
        if response.get("MessageId"):
            print("Task message sent to sqs: " + json.dumps(msg_body, indent=4))
        else:
            raise Exception("Task message unable to send: " + json.dumps(msg_body, indent=4))
Thanks for the tip! But on my side, it ends in a crash...
[2024-01-31 12:32:34,352: INFO/ForkPoolWorker-16] Task main.identification_task[2e945089-217d-497c-b53e-9c7dc663f08d] succeeded in 0.00021101800302858464s: 'yomy-queue.fifo'
[2024-01-31 12:32:34,352: CRITICAL/MainProcess] Unrecoverable error: TypeError('can only concatenate str (not "NoneType") to str')
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/celery/worker/worker.py", line 202, in start
self.blueprint.start(self)
File "/usr/local/lib/python3.10/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.10/site-packages/celery/bootsteps.py", line 365, in start
return self.obj.start()
File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 340, in start
blueprint.start(self)
File "/usr/local/lib/python3.10/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 742, in start
c.loop(*c.loop_args())
File "/usr/local/lib/python3.10/site-packages/celery/worker/loops.py", line 97, in asynloop
next(loop)
File "/usr/local/lib/python3.10/site-packages/kombu/asynchronous/hub.py", line 306, in create_loop
item()
File "/usr/local/lib/python3.10/site-packages/vine/promises.py", line 161, in __call__
return self.throw()
File "/usr/local/lib/python3.10/site-packages/vine/promises.py", line 158, in __call__
retval = fun(*final_args, **final_kwargs)
File "/usr/local/lib/python3.10/site-packages/kombu/message.py", line 131, in ack_log_error
self.ack(multiple=multiple)
File "/usr/local/lib/python3.10/site-packages/kombu/message.py", line 126, in ack
self.channel.basic_ack(self.delivery_tag, multiple=multiple)
File "/usr/local/lib/python3.10/site-packages/kombu/transport/SQS.py", line 637, in basic_ack
queue = self.canonical_queue_name(message['routing_key'])
File "/usr/local/lib/python3.10/site-packages/kombu/transport/SQS.py", line 351, in canonical_queue_name
return self.entity_name(self.queue_name_prefix + queue_name)
TypeError: can only concatenate str (not "NoneType") to str
[2024-01-31 12:32:35,488: WARNING/MainProcess] Restoring 1 unacknowledged message(s)
[2024-01-31 12:32:35,488: WARNING/MainProcess] UNABLE TO RESTORE 1 MESSAGES: (TypeError('can only concatenate str (not "NoneType") to str'),)
[2024-01-31 12:32:35,488: WARNING/MainProcess] EMERGENCY DUMP STATE TO FILE -> /tmp/tmp1alk3jko <-
[2024-01-31 12:32:35,488: WARNING/MainProcess] Cannot pickle state: TypeError("cannot pickle 'Message' object: a class that defines __slots__ without defining __getstate__ cannot be pickled with protocol 0"). Fallback to pformat.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for the great resource! Saved me a ton of time.
Just had this issue:
TypeError: the JSON object must be str, bytes or bytearray, not dict
, which I could fix by replacing those lines with these, so that the `body` can be serialized: it is `bytes`, whereas `json.dumps` expects `str`.