#!/usr/bin/env python
# This script fetches AWS logs from S3 and stores them locally, by default in
# /var/log/cloudtrail/cloudtrail.log. It fetches AND parses the following AWS
# logs and stores them in files:
#   a) CloudTrail logs (gzipped JSON)
#   b) S3 access logs
#   c) ELB access logs
# It can incrementally fetch only the newly modified logs (of any of the
# above) and append them to the existing files.
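#
# Incremental state is kept in a small JSON checkpoint file (the "sincedb",
# written by save_store() below). An illustrative entry, with made-up values,
# might look like:
#
#   {"Key": "AWSLogs/999999999999/CloudTrail/us-west-1/2017/06/17/x.json.gz",
#    "LastModified": 1497672000, "year": 2017, "month": 6, "day": 17}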
import boto3
import os
import json
import datetime
import pytz
import zlib
class CloudTrail:
    def __init__(self, *args, **kwargs):
        self.sincedb_path = '/var/tmp/cloudtrail_sincedb'
        self.logfile = "/var/log/cloudtrail/cloudtrail.log"
        self.profile = None
        # Default the static credentials so connect_s3() can safely test them.
        self.access_key_id = None
        self.secret_access_key = None
        self.datewise = True
        dt = datetime.date.today()
        self.store = {'Key': None, 'LastModified': 0, 'year': dt.year, 'month': dt.month, 'day': dt.day}
        for a in ['prefix', 'bucket', 'access_key_id', 'secret_access_key', 'sincedb_path', 'region', 'profile', 'logfile', 'datewise']:
            if a in kwargs:
                self.__dict__[a] = kwargs[a]
    def save_store(self):
        # Persist the checkpoint so the next run resumes where this one stopped.
        with open(self.sincedb_path, "w") as f:
            f.write(json.dumps(self.store))

    def read_store(self):
        if os.path.isfile(self.sincedb_path):
            try:
                with open(self.sincedb_path) as f:
                    self.store = json.loads(f.read())
            except (IOError, OSError, ValueError):
                # An unreadable or corrupt sincedb just means we start from today.
                pass
    def incr_day(self):
        # Advance the checkpointed date by one day, for walking datewise prefixes.
        dt = datetime.datetime(self.store['year'], self.store['month'], self.store['day'], 0, 0, 0).replace(tzinfo=pytz.utc)
        dt += datetime.timedelta(days=1)
        return {"year": dt.year, "month": dt.month, "day": dt.day}

    def get_prefix(self, year=None, month=None, day=None):
        year = year if year else self.store['year']
        month = month if month else self.store['month']
        day = day if day else self.store['day']
        if self.datewise:
            prefix = '%s/%s/%4d/%02d/%02d' % (self.prefix, self.region, year, month, day)
        else:
            prefix = '%s/' % (self.prefix,)
        return prefix
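    # For the CloudTrail example in __main__, a datewise prefix comes out
    # roughly as 'AWSLogs/999999999999/CloudTrail/us-west-1/2017/06/17'
    # (illustrative date), matching the year/month/day layout of these logs.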
    def connect_s3(self):
        if self.profile:
            session = boto3.Session(profile_name=self.profile)
        elif self.access_key_id and self.secret_access_key:
            session = boto3.Session(aws_access_key_id=self.access_key_id, aws_secret_access_key=self.secret_access_key)
        else:
            # Fall back to the default credential chain (env vars, instance role, ...).
            session = boto3.Session()
        self.client = session.client('s3')
    def get_list_of_objects(self, prefix=None):
        prefix = prefix if prefix else self.get_prefix()
        paginator = self.client.get_paginator('list_objects')
        page_iterator = paginator.paginate(Bucket=self.bucket, Prefix=prefix)
        list_objs = [contents for page in page_iterator if 'Contents' in page for contents in page['Contents']]
        # Oldest first, so the checkpoint only ever moves forward.
        list_objs.sort(key=lambda x: x['LastModified'])
        print("Read list of objects for %s, count=%d" % (prefix, len(list_objs)))
        return list_objs
    def get_objects(self, list_objs, limit=1):
        dt = datetime.datetime.utcfromtimestamp(self.store['LastModified']).replace(tzinfo=pytz.utc)
        utczero = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=pytz.utc)
        # Skip objects older than the checkpoint, and the last object already fetched.
        objs_toget = [o for o in list_objs if o['LastModified'] >= dt and o['Key'] != self.store['Key']]
        print("Objects to get => %d" % len(objs_toget))
        objs = [self.client.get_object(Bucket=self.bucket, Key=o['Key']) for o in objs_toget[:limit]]
        print("Got objs => %d" % len(objs))

        def _extract_data(obj):
            data = obj['Body'].read()
            # Check the encoding first so a gzipped body is never split as text.
            if obj.get('ContentEncoding') == 'gzip':
                # 16 + MAX_WBITS tells zlib to expect a gzip header (CloudTrail).
                return [zlib.decompress(data, 16 + zlib.MAX_WBITS).decode('utf-8')]
            text = data.decode('utf-8')
            if obj['ContentType'] == '':
                return text.split("\n")
            if obj['ContentType'] == 'text/plain':
                return [text.strip()]
            return [text]

        body = [_extract_data(o) for o in objs]
        lines = [line for o in body for line in o if len(line)]
        with open(self.logfile, "a") as outfile:
            for line in lines:
                outfile.write(line + "\n")
        print("Wrote lines => %d" % len(lines))
        if objs_toget:
            # Checkpoint the newest object actually fetched in this batch.
            last = objs_toget[min(len(objs_toget), limit) - 1]
            self.store['LastModified'] = int((last['LastModified'] - utczero).total_seconds())
            self.store['Key'] = last['Key']
            self.store['year'] = last['LastModified'].year
            self.store['month'] = last['LastModified'].month
            self.store['day'] = last['LastModified'].day
            self.save_store()
        return len(objs)
    def run(self):
        self.read_store()
        self.connect_s3()
        list_objs = self.get_list_of_objects()
        while list_objs:
            # Drain the current prefix in batches of 10 objects.
            objs = self.get_objects(list_objs, 10)
            while objs:
                objs = self.get_objects(list_objs, 10)
            if not self.datewise:
                break
            # Move on to the next day's prefix.
            newdt = self.incr_day()
            list_objs = self.get_list_of_objects(prefix=self.get_prefix(**newdt))
if __name__ == "__main__":
    # Uncomment and edit one of the examples below; the bucket, account id and
    # file names are placeholders. 'ct' must be defined before ct.run().
    # ct = CloudTrail(bucket='bucket', region='us-west-1', prefix='AWSLogs/999999999999/CloudTrail', logfile='ccc', sincedb_path='ccc.1')
    # ct = CloudTrail(bucket='bucket', region='us-west-1', prefix='AWSLogs/999999999999/elasticloadbalancing', logfile='ddd', sincedb_path='ddd.1')
    # ct = CloudTrail(bucket='bucket', region='us-west-1', prefix='logs', logfile='eee', sincedb_path='eee.1', datewise=False)
    ct.run()
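
# A minimal sketch of running this incrementally (the schedule and install
# path are assumptions, not part of the original script): invoke it
# periodically from cron, e.g. every 10 minutes:
#
#   */10 * * * * /usr/local/bin/cloudtrail_fetch.py
#
# Each run then appends only the objects modified since the last sincedb
# checkpoint.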