# Gist: Makeshift/6b0e4606f640d9c026ba6448490fa9b7 (created April 2, 2023 00:06)
# A script that deletes all noncurrent versions of files and delete markers in an S3 bucket.
# GitHub page notice: this file was flagged as containing hidden or bidirectional Unicode
# text that may be interpreted or compiled differently than what appears below. To review,
# open the file in an editor that reveals hidden Unicode characters.
#!/usr/bin/env python3
# This script is adapted from https://wasabi-support.zendesk.com/hc/en-us/articles/360058028992-How-do-I-mass-delete-non-current-versions-inside-a-bucket-
# It deletes non-current versions of objects in a bucket. It will not delete the current version of an object.
# Refreshable sessions are implemented as described in https://stackoverflow.com/a/69226170
"""
This script's first argument must be an S3 URI (s3://bucket-name/<optional path>) from which to begin searching for noncurrent versions.
It will enumerate all objects under that prefix and delete all noncurrent versions.
Credentials are gathered automatically from the environment.
Wrap this script with aws-vault if that's how you get your credentials:
    aws-vault exec profile -- ./delete_non-current-objects.py s3://bucket-name/path/to/folder
"""
import sys
from datetime import datetime, timezone
from time import time
from uuid import uuid4

from boto3 import Session
from boto3 import client
from botocore.credentials import RefreshableCredentials
from botocore.exceptions import ClientError, NoCredentialsError
from botocore.session import get_session
class RefreshableBotoSession:
    """
    Boto helper class which lets us create a refreshable session, so that we
    can cache the client or resource.

    Usage
    -----
    session = RefreshableBotoSession().refreshable_session()
    client = session.client("s3")  # we now can cache this client object without worrying about expiring credentials
    """

    def __init__(
        self,
        region_name: str = None,
        profile_name: str = None,
        sts_arn: str = None,
        session_name: str = None,
        session_ttl: int = 3000,
    ):
        """
        Initialize `RefreshableBotoSession`.

        Parameters
        ----------
        region_name : str (optional)
            Default region when creating new connection.
        profile_name : str (optional)
            The name of a profile to use.
        sts_arn : str (optional)
            The role arn to sts before creating session.
        session_name : str (optional)
            An identifier for the assumed role session. A random hex string
            is generated when it is not supplied.
        session_ttl : int (optional)
            TTL in seconds for each session. Beyond this, the token is renewed.
            50 minutes by default which is before the default role expiration of 1 hour.
        """
        self.region_name = region_name
        self.profile_name = profile_name
        self.sts_arn = sts_arn
        self.session_name = session_name or uuid4().hex
        self.session_ttl = session_ttl

    @staticmethod
    def _expiry_from_ttl(session_ttl, now=None):
        """
        Return the ISO-8601 UTC timestamp that lies `session_ttl` seconds after `now`.

        :param session_ttl: lifetime in seconds.
        :param now: POSIX timestamp to count from; the current time when omitted.
        :return: timezone-aware ISO-8601 string in UTC.
        """
        start = time() if now is None else now
        # Build the datetime directly in UTC. Converting to naive local time and
        # then stamping it as UTC would shift the expiry by the local UTC offset.
        return datetime.fromtimestamp(start + session_ttl, tz=timezone.utc).isoformat()

    def __get_session_credentials(self):
        """
        Get session credentials in the metadata format expected by
        `RefreshableCredentials.create_from_metadata`.

        :return: dict with `access_key`, `secret_key`, `token` and `expiry_time`.
        :raises NoCredentialsError: when no role is assumed and the session
            cannot locate any credentials.
        """
        session = Session(region_name=self.region_name,
                          profile_name=self.profile_name)

        # if sts_arn is given, get credential by assuming given role
        if self.sts_arn:
            sts_client = session.client(
                service_name="sts", region_name=self.region_name)
            response = sts_client.assume_role(
                RoleArn=self.sts_arn,
                RoleSessionName=self.session_name,
                DurationSeconds=self.session_ttl,
            ).get("Credentials")
            return {
                "access_key": response.get("AccessKeyId"),
                "secret_key": response.get("SecretAccessKey"),
                "token": response.get("SessionToken"),
                "expiry_time": response.get("Expiration").isoformat(),
            }

        session_credentials = session.get_credentials()
        if session_credentials is None:
            # Fail with a clear error instead of an AttributeError on None.
            raise NoCredentialsError()
        # Use the public accessor rather than reading the object's __dict__,
        # which depends on how the credentials class stores its fields.
        frozen = session_credentials.get_frozen_credentials()
        return {
            "access_key": frozen.access_key,
            "secret_key": frozen.secret_key,
            "token": frozen.token,
            "expiry_time": self._expiry_from_ttl(self.session_ttl),
        }

    def refreshable_session(self) -> "Session":
        """
        Get refreshable boto3 session.

        :return: a boto3 `Session` backed by credentials that are re-fetched
            through `__get_session_credentials` when they near expiry.
        """
        # get refreshable credentials
        refreshable_credentials = RefreshableCredentials.create_from_metadata(
            metadata=self.__get_session_credentials(),
            refresh_using=self.__get_session_credentials,
            method="sts-assume-role",
        )

        # attach refreshable credentials to a fresh botocore session
        session = get_session()
        session._credentials = refreshable_credentials
        session.set_config_variable("region", self.region_name)
        return Session(botocore_session=session)
def calculate_size(size, _size_table):
    """
    Dynamically pick the right base unit symbol for the size of an object.

    The size is divided by 1024 until it drops below 1024 or the largest
    unit in the table is reached, so very large sizes are expressed in the
    largest unit instead of failing with a missing table entry.

    :param size: size in bytes (integer) to be converted.
    :param _size_table: mapping of unit index (0 = bytes, 1 = kilobytes, ...) to unit symbol.
    :return: string of the converted size rounded to 2 decimals, followed by the unit symbol.
    """
    last_unit = len(_size_table) - 1
    unit = 0
    while size // 1024 > 0 and unit < last_unit:
        size = size / 1024
        unit += 1
    return str(round(size, 2)) + ' ' + _size_table[unit]
def create_connection_and_test(_bucket):
    """
    Checks if the credentials are valid and if the bucket exists.

    NOTE: creating the connection is not enough to test. We need to make a
    method call to check for its working status.

    :param _bucket: bucket name string
    :return: reference to the connection client, or None when the credential
        check fails (a message is printed in that case).
    :raises Exception: when the bucket does not exist or is not accessible.
    """
    try:
        session = RefreshableBotoSession().refreshable_session()
        _s3_client = session.client('s3')
        # Test credentials are working
        _s3_client.list_buckets()
    except ClientError:
        print("Invalid Access and Secret keys")
        return None

    try:
        # Check the bucket that was passed in, not a module-level global.
        _s3_client.head_bucket(Bucket=_bucket)
    except ClientError as err:
        # The bucket does not exist or you have no access.
        raise Exception(
            "$ bucket does not exist in the account please re-check the name and try again: "
            + str(_bucket)) from err
    return _s3_client
def parse_s3_uri(s3_uri):
    """
    Split an S3 URI of the form s3://<bucket-name>/<prefix> into its parts.

    :param s3_uri: the URI string; surrounding whitespace is ignored.
    :return: tuple of (bucket name, prefix); the prefix is '' when absent.
    :raises ValueError: when the URI does not start with s3:// or has no bucket name.
    """
    scheme = 's3://'
    uri = s3_uri.strip()
    if uri[:len(scheme)].lower() != scheme:
        raise ValueError("expected an S3 URI like s3://bucket-name/<optional path>, got: " + uri)
    bucket_name, _, key_prefix = uri[len(scheme):].partition('/')
    if not bucket_name:
        raise ValueError("no bucket name found in S3 URI: " + uri)
    return bucket_name, key_prefix


def delete_in_batches(_s3_client, _bucket, objects, batch_size=1000):
    """
    Delete the given object versions with `delete_objects`, one batch at a time.

    :param _s3_client: S3 client used to issue the deletes.
    :param _bucket: bucket name string.
    :param objects: list of {'Key': ..., 'VersionId': ...} dicts to delete.
    :param batch_size: number of objects per request (the script uses 1000).
    :return: number of per-object errors reported in the responses.
    """
    failed = 0
    for start in range(0, len(objects), batch_size):
        response = _s3_client.delete_objects(
            Bucket=_bucket,
            Delete={
                'Objects': objects[start:start + batch_size],
                'Quiet': True
            }
        )
        print(response)
        # Tally any per-object failures listed under 'Errors' so they can be
        # summarised once all batches are done.
        failed += len(response.get('Errors', []))
    return failed


if __name__ == '__main__':
    # Generate a table for SI units symbol table.
    size_table = {0: 'Bs', 1: 'KBs', 2: 'MBs',
                  3: 'GBs', 4: 'TBs', 5: 'PBs', 6: 'EBs'}

    print("\n")
    print("\n")
    print("$ starting script...")

    # The input must be an s3 URI following the format s3://<bucket-name>/<prefix>
    if len(sys.argv) < 2:
        sys.exit("usage: " + sys.argv[0] + " s3://bucket-name/<optional path>")
    try:
        bucket, prefix = parse_s3_uri(sys.argv[1])
    except ValueError as err:
        sys.exit("$ " + str(err))

    # test the connection and access keys. Also checks if the bucket is valid.
    s3_client = create_connection_and_test(bucket)
    if s3_client is None:
        # The credential check already printed why; stop instead of crashing below.
        sys.exit(1)

    # create a paginator with default settings.
    object_response_paginator = s3_client.get_paginator('list_object_versions')
    operation_parameters = {'Bucket': bucket}
    if prefix:
        operation_parameters['Prefix'] = prefix

    # initialize basic variables for in memory storage.
    delete_marker_count = 0
    versioned_object_count = 0
    versioned_object_size = 0
    current_object_count = 0
    current_object_size = 0
    delete_marker_list = []
    version_list = []

    print("$ Calculating, please wait... this may take a while")
    for object_response_itr in object_response_paginator.paginate(**operation_parameters):
        for delete_marker in object_response_itr.get('DeleteMarkers', []):
            delete_marker_list.append(
                {'Key': delete_marker['Key'], 'VersionId': delete_marker['VersionId']})
            delete_marker_count += 1
        for version in object_response_itr.get('Versions', []):
            if version['IsLatest']:
                current_object_count += 1
                current_object_size += version['Size']
            else:
                versioned_object_count += 1
                versioned_object_size += version['Size']
                version_list.append(
                    {'Key': version['Key'], 'VersionId': version['VersionId']})
        # running total after each page, as a progress indicator
        total_count = delete_marker_count + versioned_object_count + current_object_count
        print(f'{total_count}\t', end='', flush=True)

    print("\n")
    print("-" * 10)
    print("$ Total Delete markers: " + str(delete_marker_count))
    print("$ Number of Current objects: " + str(current_object_count))
    print("$ Current Objects size: ", calculate_size(
        current_object_size, size_table))
    print("$ Number of Non-current objects: " + str(versioned_object_count))
    print("$ Non-current Objects size: ",
          calculate_size(versioned_object_size, size_table))
    print("$ Total size of current + non current objects: ",
          calculate_size(versioned_object_size + current_object_size, size_table))
    print("-" * 10)
    print("\n")

    # Ask once: 'y' deletes, any other answer leaves the bucket untouched.
    choice = input(
        "$ Do you wish to delete the delete markers and non-current objects? [y/n] ")
    if choice.strip().lower() == 'y':
        print("$ starting deletes now...")
        print("$ removing delete markers 1000 at a time")
        failed = delete_in_batches(s3_client, bucket, delete_marker_list)
        print("$ removing old versioned objects 1000 at a time")
        failed += delete_in_batches(s3_client, bucket, version_list)
        if failed:
            print("$ " + str(failed) + " object(s) could not be deleted; see the responses above.")
    else:
        print("$ aight then.")

    print("$ All done.")
    print("\n")
    print("\n")
# (GitHub page footer: "Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment")