Last active
February 14, 2022 16:08
-
-
Save rantoniuk/c3d0dae4e499d0c11f0c756f6c1da73c to your computer and use it in GitHub Desktop.
AWS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### Cross-account (assumed IAM role) CloudFront invalidation from a central pipeline account.
###
### Example CDK usage, invoking this Lambda from a CodePipeline stage:
###
### new codepipelineActions.LambdaInvokeAction({
###   runOrder: 3,
###   actionName: 'InvalidateCloudFront',
###   role: this.devOpsPipelineRole,
###   lambda: Function.fromFunctionArn(this, "test-cloudfrontInvalidator", "arn:aws:lambda:eu-west-1::function:pipelineInvalidateCloudFront"),
###   userParameters: {
###     bucketName: bucket.bucketName,
###     targetPipelineRole: this.testPipelineRole.roleArn
###   }
### })
from __future__ import print_function

import json
import re
import time
import urllib

import boto3
import botocore
from boto3.session import Session
# Clients created once per Lambda container with the function's own
# credentials. The handler builds a separate CloudFront client from an assumed
# role and binds it to a local name, so this module-level CloudFront client is
# not the one used for the invalidation.
cloudfront_client = boto3.client(service_name='cloudfront')
# Used by put_job_success / put_job_failure to report the job result.
codepipeline_client = boto3.client(service_name='codepipeline')
def handler(event, context):
    """Create a CloudFront invalidation for the distribution fronting a bucket.

    Entry point for a CodePipeline Lambda invoke action. The bucket and the
    role to assume are read from the action's UserParameters
    (``bucketName`` and ``targetPipelineRole``). Every code path reports a
    result back to CodePipeline so the action never waits for its timeout.

    Args:
        event: The CodePipeline job event; ``event['CodePipeline.job']``
            provides the job ``id`` and ``data``.
        context: The Lambda context object (unused).

    Returns:
        The string 'Success' once a job result (success, or failure because no
        matching distribution exists) has been reported to CodePipeline.

    Raises:
        Exception: Any error raised while reading the parameters, assuming the
            role, looking up the distribution or creating the invalidation is
            re-raised after the job has been reported as failed.
    """
    # Extract the Job ID first: it is needed to report any later failure.
    job_id = event['CodePipeline.job']['id']
    bucket = None
    invalidation = None

    try:
        params = get_user_params(event['CodePipeline.job']['data'])
        bucket = params['bucketName']
        target_client = get_cloudfront_client(params['targetPipelineRole'])
        cf_distro_id = get_cloudfront_distribution_id(target_client, bucket)

        if cf_distro_id:
            print("Creating invalidation from {} for Cloudfront distribution {}".format(
                bucket, cf_distro_id))
            invalidation = target_client.create_invalidation(
                DistributionId=cf_distro_id,
                InvalidationBatch={
                    'Paths': {
                        'Quantity': 1,
                        'Items': ['/*']
                    },
                    'CallerReference': str(time.time())
                })
    except Exception as e:
        # The raw event is deliberately not logged: CodePipeline job events
        # carry temporary artifact credentials that must not end up in logs.
        print("Error processing job {} for bucket {}: {}".format(
            job_id, bucket, e))
        # Tell CodePipeline right away instead of letting the action time out.
        put_job_failure(job_id, "Function exception: {}".format(e))
        raise

    if invalidation is None:
        print("Bucket {} does not appear to be an origin for a Cloudfront distribution".format(bucket))
        put_job_failure(
            job_id,
            "Bucket is not an origin for any CloudFront distribution"
        )
    else:
        put_job_success(
            job_id,
            "Submitted invalidation ID {}".format(
                invalidation['Invalidation']['Id'])
        )
    return 'Success'
def get_cloudfront_client(role_arn):
    """Return a CloudFront client that acts as the given IAM role.

    Args:
        role_arn: ARN of the role to assume through STS.

    Returns:
        A CloudFront client created from a session that uses the temporary
        credentials of the assumed role.

    Raises:
        Exception: Any exception thrown by the STS ``assume_role`` call or by
            client creation.
    """
    # Ask STS for temporary credentials for the target role.
    sts = boto3.client('sts')
    assume_role_response = sts.assume_role(
        RoleArn=role_arn,
        RoleSessionName="TargetAccountPipelineRoleSession"
    )
    temporary = assume_role_response['Credentials']

    # Build a session bound to those temporary credentials.
    assumed_session = Session(
        aws_access_key_id=temporary['AccessKeyId'],
        aws_secret_access_key=temporary['SecretAccessKey'],
        aws_session_token=temporary['SessionToken'],
    )

    # Hand back a CloudFront client that operates with the assumed role.
    client_config = botocore.client.Config(signature_version='s3v4')
    return assumed_session.client('cloudfront', config=client_config)
def put_job_success(job, message):
    """Report a successful job to CodePipeline.

    Args:
        job: The CodePipeline job ID.
        message: Text printed to the log alongside the status; it is not sent
            to CodePipeline.

    Raises:
        Exception: Any exception thrown by .put_job_success_result()
    """
    for line in ('Putting job success', message):
        print(line)
    codepipeline_client.put_job_success_result(jobId=job)
def put_job_failure(job, message):
    """Report a failed job to CodePipeline.

    Args:
        job: The CodePipeline job ID.
        message: Text printed to the log and sent to CodePipeline as the
            failure message.

    Raises:
        Exception: Any exception thrown by .put_job_failure_result()
    """
    failure_details = {
        'message': message,
        'type': 'JobFailed',
    }
    for line in ('Putting job failure', message):
        print(line)
    codepipeline_client.put_job_failure_result(
        jobId=job,
        failureDetails=failure_details,
    )
# Matches what follows "<bucket>." in an S3 endpoint host name, for example
# "s3.amazonaws.com", "s3.eu-west-1.amazonaws.com" or
# "s3-website-us-east-1.amazonaws.com".
_S3_ENDPOINT_SUFFIX = re.compile(
    r's3(?:[.-](?:website|dualstack))?(?:[.-][a-z0-9-]+)?'
    r'\.amazonaws\.com(?:\.cn)?\Z'
)


def _origin_matches_bucket(domain_name, bucket):
    """Return True if an origin domain name refers to the given bucket."""
    if domain_name == bucket:
        # The caller passed the full origin domain name.
        return True
    prefix = '{}.'.format(bucket)
    if not domain_name.startswith(prefix):
        return False
    # The caller passed a bare bucket name: accept its S3 endpoint host names.
    return _S3_ENDPOINT_SUFFIX.match(domain_name[len(prefix):]) is not None


def get_cloudfront_distribution_id(client, bucket):
    """Find the CloudFront distribution that uses a bucket as an origin.

    Args:
        client: A CloudFront client; its ``list_distributions`` paginator is
            used to walk every distribution.
        bucket: Either the exact origin domain name or a bare S3 bucket name.
            A bare name matches origins whose domain name is an S3 endpoint
            of that bucket (e.g. ``<bucket>.s3.eu-west-1.amazonaws.com``).

    Returns:
        The ID of the matching distribution, or None when there is none. If
        several distributions match, the last one listed is returned.
    """
    cf_distro_id = None
    print("Looking for origin: {}".format(bucket))

    for page in client.get_paginator('list_distributions').paginate():
        # 'Items' is absent when the page holds no distributions.
        for distribution in page['DistributionList'].get('Items') or []:
            for cf_origin in distribution['Origins']['Items']:
                if _origin_matches_bucket(cf_origin['DomainName'], bucket):
                    print("Found it!")
                    cf_distro_id = distribution['Id']
    return cf_distro_id
def get_user_params(job_data):
    """Decode the JSON user parameters and validate the required properties.

    Args:
        job_data: The job data structure containing the UserParameters string,
            which should be a JSON object with the keys ``bucketName`` and
            ``targetPipelineRole``.

    Returns:
        The JSON parameters decoded as a dictionary.

    Raises:
        Exception: The UserParameters are missing, are not a valid JSON
            object, or lack a required property.
    """
    try:
        # Get the user parameters which carry the bucket and role settings
        user_parameters = job_data['actionConfiguration']['configuration']['UserParameters']
        decoded_parameters = json.loads(user_parameters)
    except (KeyError, TypeError, ValueError):
        # We're expecting the user parameters to be encoded as JSON
        # so we can pass multiple values. If the JSON can't be decoded
        # then fail the job with a helpful message.
        raise Exception('UserParameters could not be decoded from JSON')

    if not isinstance(decoded_parameters, dict):
        raise Exception('UserParameters must be a JSON object')

    # The handler reads both of these keys; report a missing one by name
    # instead of letting it surface later as a bare KeyError.
    for required in ('bucketName', 'targetPipelineRole'):
        if required not in decoded_parameters:
            raise Exception(
                'Your UserParameters JSON must include {}'.format(required))

    return decoded_parameters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Updates the DT_TENANT environment variable of every AWS Lambda function that
# defines it, since it is currently not possible to reference the value
# directly from SSM in a way that triggers deployment changes.
import boto3

# New value for DT_TENANT. Left as a placeholder on purpose: set it to the
# real value before running the script.
NEW_DT_TENANT = ...

# Fail before touching any function if the placeholder was not replaced;
# otherwise the first update call would be rejected halfway through the run.
if not isinstance(NEW_DT_TENANT, str):
    raise SystemExit(
        "Set NEW_DT_TENANT to the new DT_TENANT value before running this script")

client = boto3.client('lambda', region_name='eu-west-1')
paginator = client.get_paginator('list_functions')

for page in paginator.paginate():
    for function in page['Functions']:
        # Skip functions with no environment or without a DT_TENANT variable.
        listed_variables = function.get('Environment', {}).get('Variables', {})
        if "DT_TENANT" not in listed_variables:
            continue

        current_function = function['FunctionName']
        print(current_function, end='')

        # Re-read the configuration so all other variables are kept as they
        # currently are: the update call replaces the whole variable map.
        current_env = client.get_function_configuration(
            FunctionName=current_function)
        variables = dict(current_env['Environment']['Variables'])
        variables['DT_TENANT'] = NEW_DT_TENANT

        client.update_function_configuration(
            FunctionName=current_function,
            Environment={'Variables': variables}
        )
        print(" changed.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment