Last active
October 16, 2024 11:43
-
-
Save steventux/e0ecc9d5f50e8c5a591921660e25ab59 to your computer and use it in GitHub Desktop.
Azure Function in Python triggered by Blob Storage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import hashlib | |
import logging | |
import requests | |
import time | |
import uuid | |
import azure.functions as func | |
import azurefunctions.extensions.bindings.blob as blob | |
# Function app that hosts the blob-triggered function defined below.
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# Message-batch endpoint (sandbox host) and the routing plan sent with every batch.
API_URL = "https://sandbox.api.service.nhs.uk/comms/v1/message-batches"
ROUTING_PLAN_ID = "b838b13c-f98c-4def-93f0-515d4e4f4ee1"

# Both the request body and the expected response are JSON.
_JSON_MEDIA_TYPE = "application/json"
JSON_HEADERS = {"Content-type": _JSON_MEDIA_TYPE, "Accept": _JSON_MEDIA_TYPE}
@app.blob_trigger(
    arg_name="client", path="dir1/dir2/blob", connection="AzureWebJobsStorage"
)
def blob_trigger(client: blob.BlobClient):
    """Send a message batch built from the blob that fired this trigger.

    The blob at ``dir1/dir2/blob`` is downloaded in full, decoded as text and
    handed to ``batch_message`` as CSV lines.

    Args:
        client: Blob client bound to the triggering blob.
    """
    logging.info("Triggering batch message from blob update")
    # download_blob is a method: it must be called to obtain the downloader
    # whose readall() returns the blob payload.
    blob_content = client.download_blob().readall()
    if isinstance(blob_content, (bytes, bytearray)):
        # "utf-8-sig" also strips a leading byte-order mark, which would
        # otherwise end up inside the first NHS number.
        blob_content = blob_content.decode("utf-8-sig")
    # csv readers consume an iterable of text lines; keepends preserves any
    # newlines embedded in quoted fields.
    batch_message(blob_content.splitlines(keepends=True))
def batch_message(raw_data, *, timeout=30):
    """Parse recipient CSV data and POST it to the API as one message batch.

    Each CSV row is read as ``nhs_number, date_of_birth`` (no header row).

    Args:
        raw_data: The CSV content, given as bytes, a single string, or any
            iterable of text lines (for example an open text file).
        timeout: Seconds to wait for the API before giving up; passed
            straight to ``requests.post``.
    """
    # csv.DictReader iterates its input line by line. Raw bytes would yield
    # ints and a plain string would yield single characters, so normalise
    # both into a list of text lines first.
    if isinstance(raw_data, (bytes, bytearray)):
        raw_data = raw_data.decode("utf-8-sig")
    if isinstance(raw_data, str):
        raw_data = raw_data.splitlines(keepends=True)

    fieldnames = ("nhs_number", "date_of_birth")
    reader = csv.DictReader(raw_data, fieldnames)
    api_messages = [api_message(row) for row in reader]
    if not api_messages:
        # Nothing to send; avoid posting a batch with no messages.
        logging.warning("No recipients found in input; skipping batch request")
        return

    body = api_body(api_messages)
    # Without a timeout, requests can block forever on an unresponsive host.
    response = requests.post(
        API_URL, json=body, headers=JSON_HEADERS, timeout=timeout
    )
    if not response.ok:
        logging.error(
            "Message batch request failed with HTTP status %s",
            response.status_code,
        )
    logging.info(response.text)
def api_body(api_messages):
    """Wrap the per-recipient messages in a MessageBatch request body.

    Args:
        api_messages: List of message dicts as produced by ``api_message``.

    Returns:
        The dict to send as the JSON body, carrying a batch reference
        derived from the current time, the routing plan id and the messages.
    """
    # time.time() returns a float; reference_uuid works on text, so the
    # timestamp has to be converted to a string before it is hashed.
    batch_reference = reference_uuid(str(time.time()))
    return {
        "data": {
            "type": "MessageBatch",
            "attributes": {
                "messageBatchReference": batch_reference,
                "routingPlanId": ROUTING_PLAN_ID,
                "messages": api_messages,
            },
        }
    }
def api_message(message_data):
    """Build a single batch message entry from one recipient record.

    Args:
        message_data: Mapping with ``nhs_number`` and ``date_of_birth`` keys.

    Returns:
        A dict holding a message reference derived from the NHS number, the
        recipient details and an empty personalisation mapping.
    """
    nhs_number = message_data["nhs_number"]
    message_reference = reference_uuid(nhs_number)
    recipient = {
        "nhsNumber": nhs_number,
        "dateOfBirth": message_data["date_of_birth"],
    }
    return {
        "messageReference": message_reference,
        "recipient": recipient,
        "personalisation": {},
    }
def reference_uuid(val):
    """Return a deterministic UUID-formatted string derived from ``val``.

    The value is hashed with MD5 and the 32 hex digits are formatted as a
    UUID, so equal inputs always give the same reference. MD5 is used here
    only to derive an identifier, not for security.

    Args:
        val: The value to derive the reference from. Strings are hashed as
            UTF-8, bytes are hashed as-is, and any other value (for example
            a float timestamp) is converted with ``str()`` first.

    Returns:
        The reference as a canonical hyphenated UUID string.
    """
    if isinstance(val, (bytes, bytearray)):
        payload = bytes(val)
    else:
        # str() is a no-op for strings and lets numbers be used directly.
        payload = str(val).encode("utf-8")
    return str(uuid.UUID(hashlib.md5(payload).hexdigest()))
# batch_message(open("test.csv", "r")) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment