Created
June 23, 2025 19:48
-
-
Save recalde/a8b8d24340799d182d4a198ad62f65f4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import os
import tempfile
import time
import uuid
from datetime import datetime, timezone

import boto3
class LambdaMetricsLogger:
    """Record per-stage timing and byte counts to a CSV file and upload it to S3.

    Typical use: call ``start_stage`` / ``increment_bytes`` / ``end_stage`` for
    each stage of work, then ``flush_to_s3`` once at the end.

    Configuration is read from environment variables at construction time:

    * ``AWS_LAMBDA_FUNCTION_NAME`` -- function name (default ``unknown_lambda``).
    * ``LML_BUCKET`` -- destination bucket (default ``default-metrics-bucket``).
    * ``LML_PREFIX`` -- ``str.format`` template for the key prefix; may use
      ``{function_name}``, ``{year}``, ``{month}``, ``{day}`` and ``{hour}``.
    * ``LML_USE_JOB_ID`` -- ``"true"`` (case-insensitive, the default) names the
      uploaded file after the generated job id; any other value names it
      ``<function_name>_<HHMMSS>.csv``.
    """

    def __init__(self):
        """Read configuration and create the temporary CSV file with its header row."""
        self.function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "unknown_lambda")
        self.job_id = str(uuid.uuid4())

        # Load config from environment variables
        self.bucket = os.getenv("LML_BUCKET", "default-metrics-bucket")
        self.prefix_template = os.getenv(
            "LML_PREFIX",
            "{function_name}/year={year}/month={month}/day={day}/hour={hour}",
        )
        self.use_job_id_as_filename = os.getenv("LML_USE_JOB_ID", "true").lower() == "true"

        # Track stage timings and data volumes
        self.stages = []
        self.current_stage = None

        # newline="" lets the csv module control line endings itself (per the
        # csv docs); delete=False because the file is reopened by name and
        # removed explicitly in flush_to_s3.
        self.temp_file = tempfile.NamedTemporaryFile(
            mode="w+", delete=False, suffix=".csv", newline="", encoding="utf-8"
        )
        self.writer = csv.DictWriter(
            self.temp_file,
            fieldnames=["timestamp", "stage", "duration_ms", "bytes_transferred"],
        )
        self.writer.writeheader()

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Equivalent to the deprecated ``datetime.utcnow()``; the tzinfo is
        dropped so ``isoformat()`` output carries no UTC offset suffix.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def start_stage(self, stage_name):
        """Begin timing a stage named *stage_name*.

        Any stage that is still open is replaced without being written out.
        """
        self.current_stage = {
            "name": stage_name,
            "start_time": time.time(),
            "bytes_transferred": 0,
        }

    def end_stage(self, bytes_transferred=None):
        """Close the open stage and append its row to the CSV file.

        Args:
            bytes_transferred: Byte count to record. When ``None`` (the
                default) the total accumulated via ``increment_bytes`` is
                used. An explicit ``0`` is recorded as ``0``.

        Does nothing when no stage is open.
        """
        stage = self.current_stage
        if stage is None:
            return

        elapsed_ms = (time.time() - stage["start_time"]) * 1000

        # Compare against None rather than relying on truthiness, so a caller
        # that explicitly reports 0 bytes is not overridden by the counter.
        if bytes_transferred is None:
            bytes_transferred = stage.get("bytes_transferred", 0)

        self.writer.writerow(
            {
                "timestamp": self._utc_now().isoformat(),
                "stage": stage["name"],
                "duration_ms": round(elapsed_ms, 2),
                "bytes_transferred": bytes_transferred,
            }
        )
        # Push the row to disk immediately so it is in the file before upload.
        self.temp_file.flush()
        self.current_stage = None

    def increment_bytes(self, count):
        """Add *count* to the open stage's byte counter; no-op without an open stage."""
        if self.current_stage:
            self.current_stage["bytes_transferred"] += count

    def _build_s3_key(self) -> str:
        """Build the destination object key from the prefix template and the current UTC time.

        Leading and trailing slashes on the formatted prefix are dropped so
        the key never contains an empty path segment; an empty prefix yields
        just the file name.
        """
        now = self._utc_now()
        prefix = self.prefix_template.format(
            function_name=self.function_name,
            year=now.year,
            month=f"{now.month:02d}",
            day=f"{now.day:02d}",
            hour=f"{now.hour:02d}",
        ).strip("/")

        if self.use_job_id_as_filename:
            filename = f"{self.job_id}.csv"
        else:
            filename = f"{self.function_name}_{now.strftime('%H%M%S')}.csv"

        return f"{prefix}/{filename}" if prefix else filename

    def flush_to_s3(self) -> str:
        """Upload the CSV file to S3, delete the local copy, and return its ``s3://`` URI.

        The temporary file is closed first, so no further stages can be
        recorded afterwards. If the upload raises, the exception propagates
        and the local file is left in place.
        """
        self.temp_file.close()

        s3 = boto3.client("s3")
        s3_key = self._build_s3_key()
        with open(self.temp_file.name, "rb") as f:
            s3.upload_fileobj(f, self.bucket, s3_key)

        # Only reached after a successful upload.
        os.unlink(self.temp_file.name)
        return f"s3://{self.bucket}/{s3_key}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment