Last active
July 1, 2019 23:22
-
-
Save JohnPreston/e98e5cfcc62b2236815574a84e58b42b to your computer and use it in GitHub Desktop.
Get password from DynamoDB and decrypt with KMS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import uuid | |
import httplib | |
import urlparse | |
import json | |
import boto3 | |
import string | |
import random | |
def get_password_from_dynamodb(env, stack_name, table_name):
    """
    Look up the base64-encoded password stored for an environment/stack pair.

    The item is fetched from the given DynamoDB table using the key
    attributes ``env`` and ``stackname``.

    Note that lookup problems are NOT raised: they are reported as a plain
    message string in place of the password, so callers have to tell the two
    apart themselves.

    :param env: environment name (value of the ``env`` key attribute)
    :param stack_name: name of the stack (value of the ``stackname`` key attribute)
    :param table_name: name of the table storing all the DB passwords etc.
    :return: the item's ``passwordbase64`` attribute, or
        "No key found" when the table has no matching item, or
        "No such attribute : passwordbase64" when the item lacks that attribute
    """
    dynamodb = boto3.resource('dynamodb')
    lookup_key = {'env': env, 'stackname': stack_name}
    result = dynamodb.Table(table_name).get_item(Key=lookup_key)

    # get_item only includes 'Item' in its answer when a record matched.
    if 'Item' not in result:
        return "No key found"

    item = result['Item']
    if 'passwordbase64' not in item:
        return "No such attribute : passwordbase64"
    return item['passwordbase64']
def decrypt_password_from_b64(password_b64):
    """
    Decrypt a base64-encoded, KMS-encrypted password.

    The input is base64-decoded into the raw ciphertext blob, which is then
    handed to the KMS ``decrypt`` API.

    :param password_b64: b64 string of the encrypted password
    :return: the ``Plaintext`` field of the KMS decrypt response
    """
    ciphertext = base64.b64decode(password_b64)
    kms = boto3.client('kms')
    return kms.decrypt(CiphertextBlob=ciphertext)['Plaintext']
def send_response(request, response, status=None, reason=None):
    """
    Finalise the response object and PUT it to the CloudFormation callback URL.

    ``response`` is updated in place with ``status`` / ``reason`` when they are
    given. If ``request`` carries a non-empty ``ResponseURL``, the response is
    serialised to JSON and sent there with an HTTPS PUT. Delivery is
    best-effort: any failure is printed (so it shows up in the Lambda logs)
    and never raised, so the caller always gets the response object back.

    :param request: CF lambda settings (the event; ``ResponseURL`` is read from it)
    :param response: object containing the response values (mutated in place)
    :param status: to report to CloudFormation
    :param reason: Message to report to CloudFormation to explain the status
    :return: Object with all the response objects (the same ``response`` dict)
    """
    if status is not None:
        response['Status'] = status
    if reason is not None:
        response['Reason'] = reason

    callback_url = request.get('ResponseURL')
    if callback_url:
        connection = None
        try:
            url = urlparse.urlparse(callback_url)
            body = json.dumps(response)
            connection = httplib.HTTPSConnection(url.hostname)
            connection.request('PUT', url.path + '?' + url.query, body)
            # Read the reply so the exchange is completed before we return and
            # so a rejected PUT is visible in the logs instead of silent.
            reply = connection.getresponse()
            if not 200 <= reply.status < 300:
                print("Failed to send the message to CF: HTTP %s %s"
                      % (reply.status, reply.reason))
        except Exception as error:
            # Deliberately best-effort: report the cause but do not crash the
            # handler because the callback could not be delivered.
            print("Failed to send the message to CF: %s" % error)
        finally:
            if connection is not None:
                connection.close()
    return response
def lambda_handler(event, context):
    """
    Core function called when Lambda is invoked

    Handles a CloudFormation custom-resource request. ``Delete`` and
    ``Update`` requests are acknowledged with SUCCESS without doing any work.
    For any other request type the password is read from DynamoDB, decrypted
    with KMS and returned to CloudFormation under ``Data['password']``.

    Every failure after the request type check is reported to CloudFormation
    as FAILED instead of being raised: an unhandled exception would mean no
    response is ever sent to the callback URL.

    :param event: The Lambda event params (``StackId``, ``RequestId``,
        ``LogicalResourceId`` and ``RequestType`` are required;
        ``ResourceProperties`` must provide ``Env``, ``StackName`` and
        ``TableName``)
    :param context: The Lambda context params (unused)
    :return: Return the response to CloudFormation
    """
    response = {
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Status': 'SUCCESS'
    }
    if 'PhysicalResourceId' in event:
        response['PhysicalResourceId'] = event['PhysicalResourceId']
    else:
        response['PhysicalResourceId'] = str(uuid.uuid4())

    # Nothing to clean up or refresh: just acknowledge these request types.
    # NOTE(review): an Update response carries no Data, so templates reading
    # the password attribute after an update may not get a value - confirm.
    if event['RequestType'] in ('Delete', 'Update'):
        return send_response(event, response)

    properties = event.get('ResourceProperties') or {}
    # TableName is required too: it is passed to the DynamoDB lookup below.
    required = ('Env', 'StackName', 'TableName')
    if not all(properties.get(key) for key in required):
        return send_response(
            event, response, status='FAILED',
            reason='The properties Env, StackName and TableName must not be empty'
        )

    # get_password_from_dynamodb reports lookup problems as these message
    # strings instead of raising; they must not be treated as ciphertext.
    lookup_errors = ("No key found", "No such attribute : passwordbase64")
    try:
        password_b64 = get_password_from_dynamodb(properties['Env'],
                                                  properties['StackName'],
                                                  properties['TableName'])
        if password_b64 in lookup_errors:
            return send_response(event, response, status='FAILED',
                                 reason=password_b64)
        password = decrypt_password_from_b64(password_b64)
    except Exception as error:
        # Top-level boundary: turn any AWS/decoding error into a FAILED
        # response so CloudFormation is always answered.
        return send_response(
            event, response, status='FAILED',
            reason='Failed to retrieve the password: %s' % error
        )

    response['Data'] = {'password': password}
    response['Reason'] = 'The value was successfully decrypted'
    return send_response(event, response)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment