Skip to content

Instantly share code, notes, and snippets.

@joonathan
Forked from amertkara/aws_utils.py
Last active August 29, 2015 14:19
Show Gist options
  • Save joonathan/2deb43ee6fba2db653af to your computer and use it in GitHub Desktop.
Save joonathan/2deb43ee6fba2db653af to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import json
import re
import urllib2
from base64 import b64decode

from M2Crypto import X509
from M2Crypto.Err import M2CryptoError
# Message types recognized by verify_sns_notification(); they are compared
# against request.META["HTTP_X_AMZ_SNS_MESSAGE_TYPE"].
SNS_MESSAGE_TYPE_SUB_NOTIFICATION = "SubscriptionConfirmation"
SNS_MESSAGE_TYPE_NOTIFICATION = "Notification"
SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION = "UnsubscribeConfirmation"
def canonical_message_builder(content, format):
    """Build the canonical message (the string to sign) to be verified.

    Each field named in ``format`` that is present in ``content`` contributes
    ``"<name>\\n<value>\\n"``; the fields are emitted in sorted order of their
    names, as AWS requires.

    Args:
        content (dict): Parsed body of the request.
        format (list): Names of the fields that go into the message.

    Returns (str):
        The canonical message. On Python 2 a unicode result is encoded to
        UTF-8 so that non-ASCII field values do not raise
        UnicodeEncodeError.

    Raises:
        TypeError: If a field value is not a string.
    """
    parts = []
    for field in sorted(format):
        try:
            value = content[field]
        except KeyError:
            # Build with what you have: fields missing from the payload
            # (e.g. a notification without "Subject") are left out.
            continue
        parts.append(field + "\n" + value + "\n")

    message = "".join(parts)
    if not isinstance(message, str):
        # Python 2 only: json.loads() yields unicode values, and str() on a
        # unicode string with non-ASCII characters raises. The signature is
        # computed over the UTF-8 bytes, so encode explicitly.
        message = message.encode("utf-8")
    return message
# Only certificates served over HTTPS from a regional Amazon SNS endpoint
# ("sns.<region>.amazonaws.com", or ".com.cn" for the China regions) are
# trusted. Anchored on both ends so look-alike hosts such as
# "sns.x.amazonaws.com.evil.test" cannot match.
_SNS_CERT_URL_RE = re.compile(
    r"^https://sns\.[a-zA-Z0-9-]{3,}\.amazonaws\.com(?:\.cn)?/[A-Za-z0-9._/-]*\.pem\Z"
)

# Digest used for each SNS "SignatureVersion" value.
_SNS_SIGNATURE_DIGESTS = {"1": "sha1", "2": "sha256"}


def _is_trusted_cert_url(url):
    """Return True if ``url`` is an HTTPS Amazon SNS certificate location."""
    try:
        return _SNS_CERT_URL_RE.match(url) is not None
    except TypeError:
        # The JSON body carried a non-string value; it cannot be a valid URL.
        return False


def verify_sns_notification(request):
    """Verify the origin of a notification request from Amazon SNS.

    The canonical message is rebuilt from the request body and checked
    against the message's signature using the public key of the certificate
    referenced by ``SigningCertURL``.

    Kudos to Artur Rodrigues for suggesting M2Crypto: http://goo.gl/KAgPPc

    Args:
        request (HTTPRequest): The request object that is passed to the view
            function. Its ``body`` and ``META`` attributes are read.

    Returns (bool):
        True if the message passes the verification, False otherwise. False
        is also returned, without any network access, when ``SigningCertURL``
        is not an HTTPS Amazon SNS certificate location.

    Raises:
        ValueError: If the body of the request couldn't be parsed, or the
            message type or signature version is not recognized
        KeyError: If the body lacks "Signature" or "SigningCertURL"
        M2CryptoError: If an error raises during the verification process
        URLError: If the SigningCertURL couldn't be opened
    """
    content = json.loads(request.body)
    decoded_signature = b64decode(content["Signature"])
    cert_url = content["SigningCertURL"]

    # The body is untrusted: without this check an attacker could point the
    # URL at a certificate of their own and sign the message with its key.
    if not _is_trusted_cert_url(cert_url):
        return False

    # Depending on the message type, canonical message format varies: http://goo.gl/oSrJl8
    message_type = request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None)
    if message_type in (SNS_MESSAGE_TYPE_SUB_NOTIFICATION, SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION):
        canonical_format = ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    elif message_type == SNS_MESSAGE_TYPE_NOTIFICATION:
        canonical_format = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]
    else:
        raise ValueError("Message Type (%s) is not recognized" % message_type)
    canonical_message = canonical_message_builder(content, canonical_format)

    # SignatureVersion "1" is signed with SHA1, "2" with SHA256. A message
    # without the field is treated as version 1.
    signature_version = content.get("SignatureVersion", "1")
    digest = _SNS_SIGNATURE_DIGESTS.get(signature_version)
    if digest is None:
        raise ValueError("Signature Version (%s) is not supported" % signature_version)

    # Load the certificate and extract the public key. The response is closed
    # explicitly because urllib2 responses are not context managers.
    response = urllib2.urlopen(cert_url)
    try:
        pem = str(response.read())
    finally:
        response.close()
    pubkey = X509.load_cert_string(pem).get_pubkey()
    pubkey.reset_context(md=digest)
    pubkey.verify_init()

    # Feed the canonical message so it can be checked against the signature
    pubkey.verify_update(canonical_message)

    # M2Crypto uses EVP_VerifyFinal() from openssl as the underlying verification function.
    # http://goo.gl/Bk2G36: "EVP_VerifyFinal() returns 1 for a correct signature, 0 for failure and -1
    # if some other error occurred."
    verification_result = pubkey.verify_final(decoded_signature)
    if verification_result == 1:
        return True
    if verification_result == 0:
        return False
    raise M2CryptoError("Some error occured while verifying the signature.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment