Last active
September 21, 2018 18:44
-
-
Save pestilence669/76543dc317c1dc0824386f9172eb657e to your computer and use it in GitHub Desktop.
Naïve Python AWS ELB log file regex parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# vim: set ts=4 sw=4 et fileencoding=utf-8:
# The named tuple and JSON output are just examples / placeholders.
import json
import re
import sys
from collections import namedtuple
from typing import Optional
# Compiled pattern for one access-log line.
#
# * Compiled with re.VERBOSE, so whitespace and `#` comments inside the
#   pattern are ignored; each `\s` is the single separator between fields.
# * Anchored at the start only (no `$`), so any extra trailing fields after
#   `redirect_url` are tolerated and simply not captured.
# * Every group captures a string; groups written as "[^"]*" keep their
#   surrounding double quotes in the captured value.
# * NOTE(review): per the AWS access-log format the `client_port` and
#   `target_port` fields appear to carry `ip:port` values, not bare ports --
#   confirm against the AWS documentation before relying on the names.
AWS_ELB_LOG_RX = re.compile(r'''^
    (?P<type>\S+)\s # http, https, h2, ws, wss
    (?P<timestamp>\S+)\s # ISO 8601
    (?P<elb>\S+)\s # The resource ID of the load balancer
    (?P<client_port>\S+)\s
    (?P<target_port>\S+)\s
    (?P<request_processing_time>\S+)\s # seconds
    (?P<target_processing_time>\S+)\s # seconds
    (?P<response_processing_time>\S+)\s
    (?P<elb_status_code>\S+)\s
    (?P<target_status_code>\S+)\s
    (?P<received_bytes>\S+)\s
    (?P<sent_bytes>\S+)\s
    (?P<request>"[^"]*")\s
    (?P<user_agent>"[^"]*")\s
    (?P<ssl_cipher>\S+)\s
    (?P<ssl_protocol>\S+)\s
    (?P<target_group_arn>\S+)\s
    (?P<trace_id>"[^"]*")\s
    (?P<domain_name>"[^"]*")\s
    (?P<chosen_cert_arn>"[^"]*")\s
    (?P<matched_rule_priority>\S+)\s
    (?P<request_creation_time>\S+)\s # ISO 8601
    (?P<actions_executed>"[^"]*")\s
    (?P<redirect_url>"[^"]*")
''', re.VERBOSE)
# Immutable record holding one parsed log line. The field names mirror the
# named groups of AWS_ELB_LOG_RX one-to-one and in the same order, so a
# match's groupdict() can be splatted straight into the constructor.
# All values are kept as the raw strings captured by the pattern.
Request = namedtuple('Request', [
    'type',
    'timestamp',
    'elb',
    'client_port',
    'target_port',
    'request_processing_time',
    'target_processing_time',
    'response_processing_time',
    'elb_status_code',
    'target_status_code',
    'received_bytes',
    'sent_bytes',
    'request',
    'user_agent',
    'ssl_cipher',
    'ssl_protocol',
    'target_group_arn',
    'trace_id',
    'domain_name',
    'chosen_cert_arn',
    'matched_rule_priority',
    'request_creation_time',
    'actions_executed',
    'redirect_url',
])
def parse_line(line: str) -> Optional[Request]:
    """Parse a single access-log line into a ``Request``.

    The line is matched against ``AWS_ELB_LOG_RX`` from its first character;
    anything following the last captured field is ignored.

    Args:
        line: One raw log line (a trailing newline is harmless).

    Returns:
        A ``Request`` whose fields are the raw captured strings (quoted
        fields keep their surrounding double quotes), or ``None`` when the
        line does not match the expected layout.
    """
    match = AWS_ELB_LOG_RX.match(line)
    if match is None:
        return None
    # Group names and Request field names are identical, so the dict of
    # named groups maps directly onto the constructor's keyword arguments.
    return Request(**match.groupdict())
###############################################################################
if __name__ == '__main__':
    # Read log lines from stdin and emit one JSON object per parsed line on
    # stdout; lines that do not match the pattern are silently skipped.
    #
    # The skip is an explicit `is not None` test rather than
    # filter(None.__ne__, ...): None.__ne__(obj) returns NotImplemented for
    # any non-None obj, and truth-testing NotImplemented is deprecated since
    # Python 3.9 and raises TypeError on Python 3.14+.
    for raw_line in sys.stdin:
        request = parse_line(raw_line)
        if request is not None:
            print(json.dumps(request._asdict()))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment