Skip to content

Instantly share code, notes, and snippets.

@pablogsal
Created May 30, 2025 13:10
Show Gist options
  • Save pablogsal/5214e702dab0ddc0d94e096970f9b045 to your computer and use it in GitHub Desktop.
Save pablogsal/5214e702dab0ddc0d94e096970f9b045 to your computer and use it in GitHub Desktop.
import argparse
import _remote_debugging
import time
import signal
from collections import defaultdict
class SamplingProfiler:
def __init__(self):
self.function_stats = defaultdict(lambda: {'ncalls': 0, 'tottime': 0.0})
self.total_samples = 0
self.total_sample_time = 0.0
self.total_processing_time = 0.0
self.start_time = None
self.end_time = None
def add_sample(self, stack_trace, sample_time):
"""Add a stack trace sample and aggregate immediately"""
process_start = time.perf_counter()
self.total_samples += 1
self.total_sample_time += sample_time
for thread_id, frames in stack_trace:
if not frames:
continue
# Aggregate by function name (not line number)
for func, file, lineno in frames:
# Extract just filename and function name
file_name = file.split('/')[-1] if '/' in file else file
func_key = f"{file_name}({func})"
self.function_stats[func_key]['ncalls'] += 1
self.function_stats[func_key]['tottime'] += sample_time
process_end = time.perf_counter()
self.total_processing_time += (process_end - process_start)
def print_results(self):
"""Print sampling profiler results"""
if self.total_samples == 0:
print("No samples collected.")
return
wall_time = self.end_time - self.start_time if self.start_time and self.end_time else 0
total_function_calls = sum(stats['ncalls'] for stats in self.function_stats.values())
print(f"\nSampling Profiler Results")
print(f"Total samples: {self.total_samples}")
print(f"Wall time: {wall_time:.3f} seconds")
print(f"Sample rate: {self.total_samples/wall_time:.2f} Hz")
print(f"Sample time: {self.total_sample_time:.3f} seconds")
print(f"Processing time: {self.total_processing_time:.3f} seconds")
print(f"Avg sample+processing: {((self.total_sample_time + self.total_processing_time)/self.total_samples)*1e6:.2f} µs")
print(f"Functions observed: {len(self.function_stats)}")
print(f"\nOrdered by: sample count (time spent)")
print()
print(f"{'samples':<10} {'%time':<8} {'avg_ms':<8} {'sample_hz':<10} function")
print("-" * 70)
# Sort by sample count (ncalls) - this shows where most time is spent
sorted_funcs = sorted(self.function_stats.items(),
key=lambda x: x[1]['ncalls'],
reverse=True)
for func_name, stats in sorted_funcs:
samples = stats['ncalls']
sample_time = stats['tottime']
# Calculate meaningful metrics
percent_time = (samples / total_function_calls) * 100 if total_function_calls > 0 else 0
avg_sample_time_ms = (sample_time / samples * 1000) if samples > 0 else 0
sample_rate = samples / wall_time if wall_time > 0 else 0
print(f"{samples:<10} {percent_time:<7.1f}% {avg_sample_time_ms:<7.3f} "
f"{sample_rate:<9.1f} {func_name}")
print("\nInterpretation:")
print("- 'samples': Number of times function appeared in stack traces")
print("- '%time': Percentage of total execution time spent in this function")
print("- 'avg_ms': Average time to collect a sample when in this function")
print("- 'sample_hz': How often this function was sampled per second")
# Global profiler instance
profiler = SamplingProfiler()
shutdown_requested = False
def signal_handler(sig, frame):
global shutdown_requested
print("\nShutdown requested...")
shutdown_requested = True
def sample(unwinder, interval_us):
global profiler, shutdown_requested
print("Starting profiling sampler... Press Ctrl+C to stop and see results.")
signal.signal(signal.SIGINT, signal_handler)
interval_sec = interval_us / 1e6
all = 0
fail = 0
slow = 0
profiler.start_time = time.perf_counter()
try:
while not shutdown_requested:
all += 1
t0 = time.perf_counter()
try:
stack_trace = unwinder.get_stack_trace()
t1 = time.perf_counter()
sample_time = t1 - t0
if stack_trace:
profiler.add_sample(stack_trace, sample_time)
except (RuntimeError, UnicodeDecodeError):
fail += 1
t1 = time.perf_counter()
elapsed = t1 - t0
if elapsed > interval_sec:
slow += 1
# Print progress stats every 100 samples
if all % 100 == 0:
wall_time = time.perf_counter() - profiler.start_time
success = all - fail
sample_rate = profiler.total_samples / wall_time if wall_time > 0 else 0
avg_sample_processing = ((profiler.total_sample_time + profiler.total_processing_time) / profiler.total_samples * 1e6) if profiler.total_samples > 0 else 0
print(f"Samples: {profiler.total_samples} | "
f"Rate: {sample_rate:.1f}Hz | "
f"Success: {(success/all)*100:.1f}% | "
f"Avg: {avg_sample_processing:.1f}µs | "
f"Slow: {slow}/{all} | "
f"Functions: {len(profiler.function_stats)}")
# if all > 1000 and (slow / all) > 0.90:
# raise RuntimeError("More than 90% of samples exceeded the requested interval")
sleep_time = interval_sec - elapsed
# if sleep_time > 0:
# time.sleep(sleep_time)
except KeyboardInterrupt:
pass
profiler.end_time = time.perf_counter()
# Print sampling statistics
wall_time = profiler.end_time - profiler.start_time
success = all - fail
print(f"\nSampling Summary:")
print(f"Total sample attempts: {all}")
print(f"Failed samples: {fail}")
print(f"Successful samples: {profiler.total_samples}")
print(f"Success rate: {(success / all) * 100:.2f}%")
print(f"Wall time: {wall_time:.3f} seconds")
print(f"Sample rate: {profiler.total_samples/wall_time:.2f} Hz")
print(f"Slow samples: {slow}/{all} ({(slow / all) * 100:.2f}%)")
# Print profiling results
profiler.print_results()
def main():
parser = argparse.ArgumentParser(description="Remote stack sampler")
parser.add_argument("pid", type=int, help="PID of the target process")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--interval", type=float,
help="Sampling interval in microseconds (e.g. 1000 for 1ms)")
args = parser.parse_args()
unwinder = _remote_debugging.RemoteUnwinder(args.pid, all_threads=False)
sample(unwinder, args.interval)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment