Last active
February 12, 2023 04:08
-
-
Save nimboya/41a5dcf217b80cff676d3c3cedce893d to your computer and use it in GitHub Desktop.
Export AWS SecurityHub Findings to CSV
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import boto3 | |
import botocore | |
import csv | |
securityhub = boto3.client('securityhub') | |
s3 = boto3.resource('s3') | |
_filter = Filters={ | |
'ComplianceStatus': [ | |
{ | |
'Value': 'FAILED', | |
'Comparison': 'EQUALS' | |
} | |
], | |
'RecordState': [ | |
{ | |
'Value': 'ACTIVE', | |
'Comparison': 'EQUALS' | |
} | |
], | |
'GeneratorId': [ | |
{ | |
'Value': 'aws-foundation', | |
'Comparison': 'PREFIX' | |
} | |
], | |
} | |
_sort = SortCriteria=[ | |
{ | |
'Field': 'ComplianceStatus', | |
'SortOrder': 'desc' | |
}, | |
{ | |
'Field': 'SeverityNormalized', | |
'SortOrder': 'desc' | |
} | |
] | |
MAX_ITEMS=100 | |
BUCKET_NAME='yourbucketname' | |
KEY='securityhubfindings.csv' | |
def lambda_handler(event, context): | |
# cannot seems to pass token = None into the get_findings() | |
result = securityhub.get_findings( | |
Filters=_filter, | |
SortCriteria=_sort, | |
MaxResults=MAX_ITEMS | |
) | |
with open("/tmp/data.csv", "w") as file: | |
csv_file = csv.writer(file) | |
keys = [] | |
count = 0 | |
while(result != None): | |
items = [] | |
findings = result['Findings'] | |
for finding in findings: | |
count += 1 | |
item = {} | |
item['standard'] = finding['GeneratorId'] | |
item['status'] = finding['Compliance']['Status'] | |
item['severity'] = finding['Severity']['Label'] | |
item['id'] = finding['ProductFields']['ControlId'] | |
item['title'] = finding['Title'] | |
item['account'] = finding['AwsAccountId'] | |
item['resourceType'] = finding['Resources'][0]['Type'] | |
item['resourceId'] = finding['Resources'][0]['Id'] | |
item['json'] = finding | |
items.append(item) | |
if (len(keys) == 0): | |
keys = list(item.keys()) | |
csv_file.writerow(keys) | |
for d in items: | |
csv_file.writerow(list(d.values())) | |
if "NextToken" in list(result.keys()): | |
token = result['NextToken'] | |
result = securityhub.get_findings(Filters=_filter, SortCriteria=_sort, MaxResults=MAX_ITEMS, NextToken=token) | |
else: | |
result = None | |
csv_binary = open('/tmp/data.csv', 'rb').read() | |
try: | |
obj = s3.Object(BUCKET_NAME, KEY) | |
obj.put(Body=csv_binary) | |
except botocore.exceptions.ClientError as e: | |
if e.response['Error']['Code'] == "404": | |
print("The object does not exist.") | |
else: | |
raise | |
s3client = boto3.client('s3') | |
try: | |
download_url = s3client.generate_presigned_url( | |
'get_object', | |
Params={ | |
'Bucket': BUCKET_NAME, | |
'Key': KEY | |
}, | |
ExpiresIn=3600 | |
) | |
return { | |
"csv_link": download_url, | |
"total": count | |
} | |
except Exception as e: | |
raise utils_exception.ErrorResponse(400, e, Log) | |
return { | |
'message': 'Error found, please check your logs', | |
'total': 0 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment