Skip to content

Instantly share code, notes, and snippets.

@recalde
Created June 23, 2025 19:48
Show Gist options
  • Save recalde/a8b8d24340799d182d4a198ad62f65f4 to your computer and use it in GitHub Desktop.
Save recalde/a8b8d24340799d182d4a198ad62f65f4 to your computer and use it in GitHub Desktop.
import os
import csv
import tempfile
import uuid
import time
import boto3
from datetime import datetime
class LambdaMetricsLogger:
    """Record per-stage timings and byte counts to a CSV file and upload it to S3.

    Usage is ``start_stage()`` -> optional ``increment_bytes()`` calls ->
    ``end_stage()``, repeated per stage, followed by one ``flush_to_s3()``.

    Configuration is read from environment variables at construction time:

    * ``AWS_LAMBDA_FUNCTION_NAME`` - name used in the S3 key
      (default ``"unknown_lambda"``).
    * ``LML_BUCKET`` - destination bucket (default ``"default-metrics-bucket"``).
    * ``LML_PREFIX`` - ``str.format`` template for the key prefix; it may use
      ``{function_name}``, ``{year}``, ``{month}``, ``{day}`` and ``{hour}``.
    * ``LML_USE_JOB_ID`` - ``"true"`` (case-insensitive, the default) names the
      object ``<job_id>.csv``; any other value names it
      ``<function_name>_<HHMMSS>.csv``.
    """

    # Column order of the CSV rows written by end_stage().
    _CSV_FIELDS = ["timestamp", "stage", "duration_ms", "bytes_transferred"]

    def __init__(self):
        """Read configuration and open the temporary CSV file with its header row."""
        self.function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "unknown_lambda")
        self.job_id = str(uuid.uuid4())

        # Load config from environment variables
        self.bucket = os.getenv("LML_BUCKET", "default-metrics-bucket")
        self.prefix_template = os.getenv(
            "LML_PREFIX",
            "{function_name}/year={year}/month={month}/day={day}/hour={hour}",
        )
        self.use_job_id_as_filename = os.getenv("LML_USE_JOB_ID", "true").lower() == "true"

        # `stages` is kept as a public attribute; no method in this class
        # populates it. `current_stage` holds the in-progress stage dict, or
        # None when no stage is open.
        self.stages = []
        self.current_stage = None

        # delete=False: the file is closed and then re-opened by name in
        # flush_to_s3(), so it must survive close().
        # newline="": required by the csv module so that the writer's own
        # "\r\n" row terminators are not translated a second time.
        self.temp_file = tempfile.NamedTemporaryFile(
            mode="w+", delete=False, suffix=".csv", newline=""
        )
        self.writer = csv.DictWriter(self.temp_file, fieldnames=self._CSV_FIELDS)
        self.writer.writeheader()

    def start_stage(self, stage_name):
        """Begin timing a stage named ``stage_name``.

        Any stage that is still open is replaced without being written out.
        """
        self.current_stage = {
            "name": stage_name,
            "start_time": time.time(),
            "bytes_transferred": 0,
        }

    def end_stage(self, bytes_transferred=None):
        """Finish the open stage and append its row to the CSV file.

        Does nothing when no stage is open.

        Args:
            bytes_transferred: Byte count to record for the stage. When left
                as ``None`` the total accumulated through ``increment_bytes()``
                is used. An explicit ``0`` is recorded as ``0``.
        """
        stage = self.current_stage
        if stage is None:
            return

        elapsed_ms = (time.time() - stage["start_time"]) * 1000

        # Compare against None rather than relying on truthiness, so that an
        # explicit 0 from the caller is not replaced by the accumulated total.
        if bytes_transferred is None:
            bytes_transferred = stage.get("bytes_transferred", 0)

        self.writer.writerow(
            {
                "timestamp": datetime.utcnow().isoformat(),
                "stage": stage["name"],
                "duration_ms": round(elapsed_ms, 2),
                "bytes_transferred": bytes_transferred,
            }
        )
        # Flush after every row so the on-disk file is complete up to the
        # last finished stage.
        self.temp_file.flush()
        self.current_stage = None

    def increment_bytes(self, count):
        """Add ``count`` to the open stage's byte total; no-op if no stage is open."""
        if self.current_stage is not None:
            self.current_stage["bytes_transferred"] += count

    def _build_s3_key(self):
        """Return the S3 object key, built from the prefix template and the current UTC time."""
        now = datetime.utcnow()
        s3_prefix = self.prefix_template.format(
            function_name=self.function_name,
            year=now.year,
            month=f"{now.month:02d}",
            day=f"{now.day:02d}",
            hour=f"{now.hour:02d}",
        )
        if self.use_job_id_as_filename:
            filename = f"{self.job_id}.csv"
        else:
            filename = f"{self.function_name}_{now.strftime('%H%M%S')}.csv"
        return f"{s3_prefix}/{filename}"

    def flush_to_s3(self):
        """Upload the CSV file to S3, delete the local copy, and return its ``s3://`` URI.

        The temporary file is closed first, so no further stages can be
        recorded afterwards. The local file is removed only after the upload
        call returns; if the upload raises, the file is left on disk.
        """
        self.temp_file.close()
        s3 = boto3.client("s3")
        s3_key = self._build_s3_key()
        with open(self.temp_file.name, "rb") as f:
            s3.upload_fileobj(f, self.bucket, s3_key)
        os.unlink(self.temp_file.name)
        return f"s3://{self.bucket}/{s3_key}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment