Skip to content

Instantly share code, notes, and snippets.

@pirate
Created June 13, 2025 22:54
Show Gist options
  • Save pirate/7c077a51317cbd31090526bbc257e058 to your computer and use it in GitHub Desktop.
Save pirate/7c077a51317cbd31090526bbc257e058 to your computer and use it in GitHub Desktop.
Debug script to use in any codebase to print all the active event loops and all their pending and active tasks.
#!/usr/bin/env python
import asyncio
import threading
import logging
import pytest
from typing import Dict, List, Any
import sys
import weakref
# Configure logging to show debug info
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_all_event_loops() -> Dict[str, Any]:
"""
Attempt to find all event loops across all threads.
This uses internal asyncio APIs and may not work in all Python versions.
"""
loops = {}
# Try to get the main thread's event loop
try:
main_loop = asyncio.get_event_loop()
loops[f"main_thread_{threading.main_thread().ident}"] = main_loop
except RuntimeError:
logger.debug("No event loop in main thread")
# Try to get running loop
try:
running_loop = asyncio.get_running_loop()
current_thread = threading.current_thread()
loops[f"running_loop_{current_thread.ident}"] = running_loop
except RuntimeError:
logger.debug("No running event loop in current thread")
# For Python 3.10+, we can try to access the thread-local storage
# This is quite hacky but might reveal loops pytest is managing
if hasattr(asyncio, '_get_running_loop'):
try:
# Access the internal thread-local storage where asyncio keeps loop references
import contextvars
if hasattr(contextvars, 'copy_context'):
ctx = contextvars.copy_context()
for var, value in ctx.items():
if 'loop' in str(var).lower():
logger.debug(f"Found context var with loop: {var} = {value}")
except Exception as e:
logger.debug(f"Error accessing context vars: {e}")
# Try to find loops in all threads (this is very hacky)
for thread_id, frame in sys._current_frames().items():
thread = None
for t in threading.enumerate():
if t.ident == thread_id:
thread = t
break
if thread:
logger.debug(f"Checking thread {thread.name} (ID: {thread_id})")
# Look for asyncio-related attributes in thread locals
if hasattr(thread, '_target') and thread._target:
logger.debug(f"Thread target: {thread._target}")
return loops
def get_pending_tasks(loop: asyncio.AbstractEventLoop) -> List[asyncio.Task]:
"""Get all pending tasks for a given event loop."""
try:
# For Python 3.7+
if hasattr(asyncio, 'all_tasks'):
return [task for task in asyncio.all_tasks(loop) if not task.done()]
else:
# Fallback for older Python versions
return [task for task in asyncio.Task.all_tasks(loop) if not task.done()]
except Exception as e:
logger.error(f"Error getting tasks for loop {loop}: {e}")
return []
def log_task_details(task: asyncio.Task, indent: str = " ") -> None:
"""Log detailed information about a task."""
try:
logger.info(f"{indent}Task: {task}")
logger.info(f"{indent} - Done: {task.done()}")
logger.info(f"{indent} - Cancelled: {task.cancelled()}")
if hasattr(task, '_coro') and task._coro:
logger.info(f"{indent} - Coroutine: {task._coro}")
if hasattr(task._coro, 'cr_frame') and task._coro.cr_frame:
frame = task._coro.cr_frame
logger.info(f"{indent} - Location: {frame.f_code.co_filename}:{frame.f_lineno}")
if hasattr(task, '_callbacks') and task._callbacks:
logger.info(f"{indent} - Callbacks: {len(task._callbacks)}")
for callback in task._callbacks:
logger.info(f"{indent} - {callback}")
# cancel the task
task.cancel()
except Exception as e:
logger.error(f"{indent}Error logging task details: {e}")
@pytest.fixture(scope="function", autouse=False)
def debug_asyncio_state():
"""
Pytest fixture that logs current asyncio state including:
- All event loops across threads
- All pending tasks in each loop
Use this fixture in your test functions to debug asyncio state:
def test_something(debug_asyncio_state):
# Your test code here
pass
"""
def log_asyncio_state(when: str):
logger.info(f"\n{'='*60}")
logger.info(f"ASYNCIO STATE DEBUG - {when}")
logger.info(f"{'='*60}")
# Log current thread info
current_thread = threading.current_thread()
logger.info(f"Current thread: {current_thread.name} (ID: {current_thread.ident})")
logger.info(f"Main thread: {threading.main_thread().name} (ID: {threading.main_thread().ident})")
logger.info(f"Active threads: {threading.active_count()}")
# Log all threads
for thread in threading.enumerate():
logger.info(f" Thread: {thread.name} (ID: {thread.ident}, Alive: {thread.is_alive()})")
# Find and log all event loops
loops = get_all_event_loops()
logger.info(f"\nFound {len(loops)} event loop(s):")
if not loops:
logger.info(" No event loops found")
for loop_name, loop in loops.items():
logger.info(f"\n Loop: {loop_name}")
logger.info(f" Object: {loop}")
logger.info(f" Running: {loop.is_running()}")
logger.info(f" Closed: {loop.is_closed()}")
# Get pending tasks
pending_tasks = get_pending_tasks(loop)
logger.info(f" Pending tasks: {len(pending_tasks)}")
for i, task in enumerate(pending_tasks):
logger.info(f"\n Task #{i+1}:")
log_task_details(task, " ")
# Try to get asyncio debug info
try:
import asyncio.events
if hasattr(asyncio.events, '_get_running_loop'):
logger.info(f"\nAsyncio internal running loop: {asyncio.events._get_running_loop()}")
except Exception as e:
logger.debug(f"Could not access asyncio internals: {e}")
logger.info(f"{'='*60}\n")
# Log state before test
log_asyncio_state("BEFORE TEST")
# Yield control back to the test
yield log_asyncio_state
# Log state after test
log_asyncio_state("AFTER TEST")
# Additional utility fixture for more targeted debugging
@pytest.fixture(scope="function")
def asyncio_loop_info():
"""
A simpler fixture that just returns a function to get current loop info.
Usage:
def test_something(asyncio_loop_info):
info = asyncio_loop_info()
print(f"Current loop: {info['current_loop']}")
print(f"Pending tasks: {len(info['pending_tasks'])}")
"""
def get_info():
try:
loop = asyncio.get_running_loop()
pending_tasks = get_pending_tasks(loop)
return {
'current_loop': loop,
'loop_running': loop.is_running(),
'loop_closed': loop.is_closed(),
'pending_tasks': pending_tasks,
'task_count': len(pending_tasks),
'thread_id': threading.current_thread().ident,
'thread_name': threading.current_thread().name
}
except RuntimeError:
return {
'current_loop': None,
'loop_running': False,
'loop_closed': True,
'pending_tasks': [],
'task_count': 0,
'thread_id': threading.current_thread().ident,
'thread_name': threading.current_thread().name
}
return get_info
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment