Skip to content

Instantly share code, notes, and snippets.

@pestilence669
Last active September 21, 2018 18:44
Show Gist options
  • Save pestilence669/76543dc317c1dc0824386f9172eb657e to your computer and use it in GitHub Desktop.
Save pestilence669/76543dc317c1dc0824386f9172eb657e to your computer and use it in GitHub Desktop.
Naïve Python AWS ELB log file regex parser
# vim: set ts=4 sw=4 et fileencoding=utf-8:
# The named tuple and JSON output are just examples / placeholders.
from collections import namedtuple
from typing import Optional
import json
import re
import sys
# Compiled pattern for a single AWS load balancer access-log line.
#
# Written in re.VERBOSE mode so that each field sits on its own line and can
# carry a trailing ``#`` comment inside the pattern itself.
#   * Every field is captured by a named group; the group names are the same
#     as the field names of the ``Request`` named tuple.
#   * Consecutive fields are separated by exactly one whitespace character.
#   * Quoted fields (request, user_agent, trace_id, domain_name,
#     chosen_cert_arn, actions_executed, redirect_url) are captured WITH
#     their surrounding double quotes and cannot contain an embedded ``"``.
#   * The pattern is anchored at the start only (``^``, no ``$``), so any
#     text after the redirect_url field is ignored when matching.
AWS_ELB_LOG_RX = re.compile(r'''^
(?P<type>\S+)\s # http, https, h2, ws, wss
(?P<timestamp>\S+)\s # ISO 8601
(?P<elb>\S+)\s # The resource ID of the load balancer
(?P<client_port>\S+)\s
(?P<target_port>\S+)\s
(?P<request_processing_time>\S+)\s # seconds
(?P<target_processing_time>\S+)\s # seconds
(?P<response_processing_time>\S+)\s
(?P<elb_status_code>\S+)\s
(?P<target_status_code>\S+)\s
(?P<received_bytes>\S+)\s
(?P<sent_bytes>\S+)\s
(?P<request>"[^"]*")\s
(?P<user_agent>"[^"]*")\s
(?P<ssl_cipher>\S+)\s
(?P<ssl_protocol>\S+)\s
(?P<target_group_arn>\S+)\s
(?P<trace_id>"[^"]*")\s
(?P<domain_name>"[^"]*")\s
(?P<chosen_cert_arn>"[^"]*")\s
(?P<matched_rule_priority>\S+)\s
(?P<request_creation_time>\S+)\s # ISO 8601
(?P<actions_executed>"[^"]*")\s
(?P<redirect_url>"[^"]*")
''', re.VERBOSE)
# Immutable record holding one parsed log line. Fields are listed in the
# order they appear in the log; namedtuple splits the spec on whitespace.
Request = namedtuple(
    'Request',
    '''
    type timestamp elb client_port target_port
    request_processing_time target_processing_time response_processing_time
    elb_status_code target_status_code received_bytes sent_bytes
    request user_agent ssl_cipher ssl_protocol target_group_arn
    trace_id domain_name chosen_cert_arn matched_rule_priority
    request_creation_time actions_executed redirect_url
    ''',
)
def parse_line(line: str) -> Optional[Request]:
    """Parse one access-log line into a ``Request``.

    The line is matched from its start against ``AWS_ELB_LOG_RX``; each named
    group becomes the ``Request`` field of the same name (all values are the
    raw matched strings).

    Returns:
        The populated ``Request``, or ``None`` if the line does not match.
    """
    match = AWS_ELB_LOG_RX.match(line)
    if match is None:
        return None
    return Request(**match.groupdict())
###############################################################################
if __name__ == '__main__':
    # Read log lines from stdin and print each successfully parsed line as
    # one JSON object; lines that do not match the pattern are skipped.
    #
    # The skip test is an explicit ``is not None`` check. The previous
    # ``filter(None.__ne__, ...)`` returned NotImplemented for every non-None
    # item and relied on it being truthy, which is deprecated since
    # Python 3.9 and raises TypeError on Python 3.14+.
    for raw_line in sys.stdin:
        request = parse_line(raw_line)
        if request is not None:
            print(json.dumps(request._asdict()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment