
@eevmanu
Last active March 30, 2025 02:30
extract the concurrency-related parts of the official python docs from the official python repo on github

goal

pass that content to an llm in order to distill the ways to handle concurrency in python using only the standard library

first attempt with gemini-2.5-pro-exp-03-25 in aistudio.google.com


docs extracted from https://github.com/python/cpython

how to call this script

$ GH_PAT=${GH_PAT} bash download_python_concurrency_docs.sh

GH_PAT is your github personal access token; you can generate one at github.com/settings/personal-access-tokens

for more info


TODO

  • a more elaborate solution would use .md instead of .rst; consider tools like r.jina.ai (jina reader)
#!/bin/bash
# Create a new output file
output_file="python_concurrency_docs.md"
echo "# Python Concurrency Documentation" > "$output_file"
echo "Compiled on $(date)" >> "$output_file"
echo "" >> "$output_file"
# Process each URL
while IFS= read -r url; do
    # Skip empty lines
    if [ -z "$url" ]; then
        continue
    fi
    echo "Processing $url"
    # Add the URL as a title
    echo "## $url" >> "$output_file"
    echo "" >> "$output_file"
    # Download content and append to file; --fail keeps HTTP error pages out of the output
    if ! curl -sSfL --header "Authorization: token ${GH_PAT}" "$url" >> "$output_file"; then
        echo "Warning: failed to download $url" >&2
    fi
    # Add separation between documents
    echo "" >> "$output_file"
    echo "---" >> "$output_file"
    echo "" >> "$output_file"
done < <(grep -E '^https://' <<EOF | grep -v '^#'
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-runner.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-task.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-stream.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-exceptions.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-queue.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-subprocess.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-sync.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-eventloop.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-extending.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-future.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-platforms.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-policy.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-protocol.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-graph.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-api-index.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-llapi-index.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/asyncio-dev.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/_thread.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/concurrent.futures.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/contextvars.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/mmap.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/multiprocessing.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/multiprocessing.shared_memory.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/queue.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/sched.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/select.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/selectors.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/signal.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/socket.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/ssl.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/subprocess.rst
https://raw.githubusercontent.com/python/cpython/refs/heads/main/Doc/library/threading.rst
EOF
)
echo "Documentation compilation complete: $output_file"

Okay, let's break down the concurrency and parallelism mechanisms available in the Python standard library, based strictly on the provided documentation snippets, ordered from highest to lowest level of abstraction.

Core Concepts:

  • Concurrency: Managing multiple tasks seemingly happening at the same time. Tasks might be interleaved on a single CPU core (cooperative multitasking like asyncio) or run truly in parallel. Often used for I/O-bound work.
  • Parallelism: Executing multiple tasks simultaneously, typically on different CPU cores. Requires multiple processes or threads (though CPython threads are limited by the GIL for CPU-bound tasks). Best for CPU-bound work.
  • Multithreading: Using multiple threads within a single process. Shares memory but is affected by the Global Interpreter Lock (GIL) in CPython, limiting true CPU parallelism. Good for I/O-bound concurrency.
  • Multiprocessing: Using multiple independent processes. Each has its own memory space and interpreter, bypassing the GIL for true CPU parallelism. Requires inter-process communication (IPC) if data needs sharing.
  • Asynchronous I/O (Asyncio): A specific concurrency model using an event loop and async/await syntax, primarily for efficient I/O-bound tasks. It's single-threaded cooperative multitasking.
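One way to see the difference between asyncio's single-threaded model and thread-based concurrency is to record which OS thread each piece of work runs on. This is a minimal sketch assuming Python 3.9+; `which_thread` and `blocking_which_thread` are hypothetical helpers, and `asyncio.to_thread` is used only to show work leaving the event-loop thread.

```python
import asyncio
import threading


async def which_thread() -> int:
    # Yield to the event loop once, then report the thread we resumed on
    await asyncio.sleep(0)
    return threading.get_ident()


def blocking_which_thread() -> int:
    # Plain blocking function; run via to_thread it executes in a worker thread
    return threading.get_ident()


async def main() -> tuple[list[int], int]:
    # Three coroutines run concurrently, but all on the event loop's thread
    coro_ids = await asyncio.gather(*(which_thread() for _ in range(3)))
    # to_thread hands the blocking call to the loop's default ThreadPoolExecutor
    worker_id = await asyncio.to_thread(blocking_which_thread)
    return list(coro_ids), worker_id


coro_ids, worker_id = asyncio.run(main())
main_id = threading.get_ident()
print("coroutines share the main thread:", set(coro_ids) == {main_id})  # True
print("to_thread used another thread:", worker_id != main_id)           # True
```

The coroutines interleave cooperatively on one thread, while the blocking function is preemptively scheduled on a separate OS thread.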

Here are the ways to achieve these, based on the provided docs:


1. High-Level: asyncio.run()

  • Level: Highest-level entry point for asyncio.
  • Purpose: Runs a top-level async def function (coroutine), automatically managing the event loop lifecycle (creation, running the task, shutdown, closing). Ideal for simple asyncio programs.
  • Best for: I/O-bound concurrency.
  • Mechanism: Asynchronous cooperative multitasking via event loop.
  • Documentation: asyncio-runner.rst, asyncio.rst
# From asyncio.rst sidebar and asyncio-runner.rst example
import asyncio
import time

async def main():
    print(f"[{time.strftime('%X')}] Hello ...")
    await asyncio.sleep(1)
    print(f"[{time.strftime('%X')}] ... World!")

# asyncio.run() manages the event loop
asyncio.run(main())
  • Expected Output:
    [HH:MM:SS] Hello ...
    # (waits for ~1 second)
    [HH:MM:SS] ... World!
    
    (HH:MM:SS will be the actual time)
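Two properties of `asyncio.run()` are worth checking directly: it returns whatever the top-level coroutine returns, and it refuses to start when an event loop is already running in the same thread (asyncio-runner.rst states this restriction). A minimal sketch; `double` and `try_nested_run` are hypothetical names.

```python
import asyncio


async def double(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for some real I/O
    return x * 2


# asyncio.run() returns the top-level coroutine's result
result = asyncio.run(double(21))
print(result)  # 42


async def try_nested_run() -> str:
    coro = double(1)
    try:
        # Not allowed: a loop is already running in this thread
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"


message = asyncio.run(try_nested_run())
print(message)
```

Inside already-running async code, `await` the coroutine (or schedule it as a task) instead of calling `asyncio.run()` again.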

2. High-Level: asyncio.Runner

  • Level: High-level context manager for asyncio.
  • Purpose: Simplifies running multiple top-level async functions within the same event loop context, managing the loop lifecycle via a with statement.
  • Best for: I/O-bound concurrency when multiple entry points are needed.
  • Mechanism: Asynchronous cooperative multitasking via event loop.
  • Documentation: asyncio-runner.rst
# Based on asyncio-runner.rst example
import asyncio
import time

async def task_one():
    print(f"[{time.strftime('%X')}] Task One starting...")
    await asyncio.sleep(1)
    print(f"[{time.strftime('%X')}] Task One finished.")
    return "One Complete"

async def task_two():
    print(f"[{time.strftime('%X')}] Task Two starting...")
    await asyncio.sleep(0.5)
    print(f"[{time.strftime('%X')}] Task Two finished.")
    return "Two Complete"

# Using the Runner context manager
with asyncio.Runner() as runner:
    print("Running Task One...")
    result1 = runner.run(task_one())
    print(f"Task One returned: {result1}")

    print("\nRunning Task Two...")
    result2 = runner.run(task_two())
    print(f"Task Two returned: {result2}")

print("\nRunner context finished.")
  • Expected Output:
    Running Task One...
    [HH:MM:SS] Task One starting...
    # (waits for ~1 second)
    [HH:MM:SS] Task One finished.
    Task One returned: One Complete
    
    Running Task Two...
    [HH:MM:SS] Task Two starting...
    # (waits for ~0.5 seconds)
    [HH:MM:SS] Task Two finished.
    Task Two returned: Two Complete
    
    Runner context finished.
    
    (HH:MM:SS will be the actual time)

3. High-Level: concurrent.futures Executors

  • Level: High-level interface for thread and process pools.

  • Purpose: Execute callables asynchronously using threads or processes, abstracting away the direct management of threads/processes.

  • Mechanism: Multithreading or Multiprocessing.

  • Documentation: concurrent.futures.rst

    • a) ThreadPoolExecutor

      • Best for: I/O-bound tasks (network requests, file I/O). CPU-bound tasks won't see true parallelism due to the GIL.
      • Mechanism: Multithreading.
      # Based on concurrent.futures.rst ThreadPoolExecutor Example
      import concurrent.futures
      import time
      import random
      
      URLS = ['http://example.com', 'http://example.org', 'http://example.net']
      
      # Dummy function to simulate URL fetching
      def load_url(url, timeout):
          print(f"Fetching {url}...")
          # Simulate network delay
          delay = random.uniform(0.5, 1.5)
          time.sleep(delay)
          print(f"Finished fetching {url} after {delay:.2f}s")
          return f"Data from {url}"
      
      # We can use a with statement to ensure threads are cleaned up promptly
      with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
          # Start the load operations and mark each future with its URL
          future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
          for future in concurrent.futures.as_completed(future_to_url):
              url = future_to_url[future]
              try:
                  data = future.result()
                  print(f'{url} page data: {data}')
              except Exception as exc:
                  print(f'{url} generated an exception: {exc}')
      • Expected Output (Order may vary):
        Fetching http://example.com...
        Fetching http://example.org...
        Fetching http://example.net...
        Finished fetching http://example.org after X.XXs
        http://example.org page data: Data from http://example.org
        Finished fetching http://example.com after Y.YYs
        http://example.com page data: Data from http://example.com
        Finished fetching http://example.net after Z.ZZs
        http://example.net page data: Data from http://example.net
        
    • b) ProcessPoolExecutor

      • Best for: CPU-bound tasks, as it bypasses the GIL using separate processes.
      • Mechanism: Multiprocessing. Requires that functions and arguments are picklable. Needs if __name__ == '__main__' guard.
      # Based on concurrent.futures.rst ProcessPoolExecutor Example
      import concurrent.futures
      import math
      import time
      
      PRIMES = [112272535095293, 112582705942171, 112272535095293, 115280095190773]
      
      def is_prime(n):
          if n < 2: return False
          if n == 2: return True
          if n % 2 == 0: return False
          sqrt_n = int(math.floor(math.sqrt(n)))
          for i in range(3, sqrt_n + 1, 2):
              if n % i == 0:
                  return False
          return True
      
      # Required for multiprocessing on some platforms
      if __name__ == '__main__':
          start_time = time.monotonic()
          with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
              results = executor.map(is_prime, PRIMES)
              for number, prime in zip(PRIMES, results):
                  print('%d is prime: %s' % (number, prime))
          end_time = time.monotonic()
          print(f"Took {end_time - start_time:.2f} seconds")
      • Expected Output:
        112272535095293 is prime: True
        112582705942171 is prime: True
        112272535095293 is prime: True
        115280095190773 is prime: True
        Took X.XX seconds
        
        (The time taken will be significantly less than the sequential time if multiple cores are available)
    • c) InterpreterPoolExecutor

      • Best for: CPU-bound tasks, offering true parallelism via sub-interpreters, each with its own GIL.
      • Mechanism: Multi-interpreter parallelism within threads. Requires picklable functions/args. Objects are generally isolated between interpreters.
      • Note: InterpreterPoolExecutor is new in Python 3.14, so its usage patterns are still evolving. The documentation doesn't provide a full example, but its interface mirrors ThreadPoolExecutor.
      # Conceptual example based on InterpreterPoolExecutor description
      import concurrent.futures
      import time
      
      def cpu_intensive_task(n):
          print(f"Starting task {n}...")
          # Simulate CPU work
          result = sum(i*i for i in range(10**7))
          print(f"Finished task {n}.")
          return n, result
      
      # Required for multiprocessing/subinterpreters on some platforms
      if __name__ == '__main__':
          start_time = time.monotonic()
          # InterpreterPoolExecutor is new in Python 3.14; on older versions (or builds
          # without subinterpreter support) the attribute lookup fails with AttributeError
          executor_cls = getattr(concurrent.futures, "InterpreterPoolExecutor", None)
          if executor_cls is None:
              print("InterpreterPoolExecutor not available in this build.")
          else:
              with executor_cls(max_workers=4) as executor:
                  futures = [executor.submit(cpu_intensive_task, i) for i in range(4)]
                  for future in concurrent.futures.as_completed(futures):
                      try:
                          n, result = future.result()
                          print(f"Task {n} result obtained (length: {len(str(result))})")
                      except Exception as exc:
                          print(f'Task generated an exception: {exc}')
      
          end_time = time.monotonic()
          print(f"Took {end_time - start_time:.2f} seconds")
      • Expected Output (Order may vary):
        Starting task 0...
        Starting task 1...
        Starting task 2...
        Starting task 3...
        Finished task X.
        Task X result obtained (length: ...)
        Finished task Y.
        Task Y result obtained (length: ...)
        Finished task Z.
        Task Z result obtained (length: ...)
        Finished task W.
        Task W result obtained (length: ...)
        Took T.TT seconds
        
        (Time should reflect parallel execution if multiple cores/interpreters are used)
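The executors offer two ways to consume results: `as_completed()` yields futures as they finish, while `Executor.map()` yields results in input order. An exception raised inside a worker is stored on its future and re-raised by `result()`. A minimal sketch with a hypothetical `slow_square` function whose sleeps are chosen so that later inputs tend to finish first.

```python
import concurrent.futures
import time


def slow_square(n: int) -> int:
    # Later inputs sleep less, so they tend to finish first
    time.sleep(0.05 * (4 - n))
    if n == 3:
        raise ValueError("bad input: 3")
    return n * n


error_message = ""
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # as_completed yields futures in completion order
    futures = {executor.submit(slow_square, n): n for n in range(3)}
    completion_order = [futures[f] for f in concurrent.futures.as_completed(futures)]

    # map yields results in input order, regardless of completion order
    ordered = list(executor.map(slow_square, range(3)))

    # The worker's exception is re-raised when the result is retrieved
    failing = executor.submit(slow_square, 3)
    try:
        failing.result()
    except ValueError as exc:
        error_message = str(exc)

print(ordered)           # [0, 1, 4]
print(completion_order)  # typically [2, 1, 0], but timing-dependent
print(error_message)     # bad input: 3
```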

4. Mid-Level: asyncio Core Constructs

  • Level: Core building blocks for asyncio applications.

  • Purpose: Define asynchronous operations and run them concurrently.

  • Best for: I/O-bound concurrency.

  • Mechanism: Asynchronous cooperative multitasking via event loop.

  • Documentation: asyncio-task.rst

    • a) Coroutines (async/await)

      • Defines asynchronous functions whose execution can be paused and resumed.
      # Based on asyncio-task.rst Coroutines example
      import asyncio
      import time
      
      async def say_after(delay, what):
          await asyncio.sleep(delay)
          print(what)
      
      async def main():
          print(f"started at {time.strftime('%X')}")
          await say_after(1, 'hello')
          await say_after(2, 'world')
          print(f"finished at {time.strftime('%X')}")
      
      asyncio.run(main())
      • Expected Output:
        started at HH:MM:SS
        # (waits ~1 second)
        hello
        # (waits ~2 seconds)
        world
        finished at HH:MM:SS+3s
        
    • b) asyncio.create_task()

      • Schedules a coroutine to run concurrently as an asyncio.Task.
      # Based on asyncio-task.rst create_task example
      import asyncio
      import time
      
      async def say_after(delay, what):
          await asyncio.sleep(delay)
          print(what)
      
      async def main():
          # Schedule tasks to run concurrently
          task1 = asyncio.create_task(say_after(1, 'hello'))
          task2 = asyncio.create_task(say_after(2, 'world'))
      
          print(f"started at {time.strftime('%X')}")
      
          # Wait until both tasks are completed
          # The await allows other tasks (like sleep) to run
          await task1
          await task2
      
          print(f"finished at {time.strftime('%X')}")
      
      asyncio.run(main())
      • Expected Output:
        started at HH:MM:SS
        # (waits ~1 second)
        hello
        # (waits ~1 more second)
        world
        finished at HH:MM:SS+2s
        
    • c) asyncio.gather()

      • Runs multiple awaitables (coroutines, tasks, futures) concurrently and aggregates results.
      # Based on asyncio-task.rst gather example
      import asyncio
      import time
      
      async def factorial(name, number):
          f = 1
          for i in range(2, number + 1):
              print(f"Task {name}: Compute factorial({number}), currently i={i}...")
              await asyncio.sleep(0.5) # Shorter sleep for demo
              f *= i
          print(f"Task {name}: factorial({number}) = {f}")
          return f
      
      async def main():
          print(f"started at {time.strftime('%X')}")
          # Schedule three calls *concurrently*:
          results = await asyncio.gather(
              factorial("A", 2),
              factorial("B", 3),
              factorial("C", 4),
          )
          print(f"finished at {time.strftime('%X')}")
          print(f"Results: {results}")
      
      asyncio.run(main())
      • Expected Output (Interleaving may vary slightly):
        started at HH:MM:SS
        Task A: Compute factorial(2), currently i=2...
        Task B: Compute factorial(3), currently i=2...
        Task C: Compute factorial(4), currently i=2...
        # (waits ~0.5s)
        Task A: factorial(2) = 2
        Task B: Compute factorial(3), currently i=3...
        Task C: Compute factorial(4), currently i=3...
        # (waits ~0.5s)
        Task B: factorial(3) = 6
        Task C: Compute factorial(4), currently i=4...
        # (waits ~0.5s)
        Task C: factorial(4) = 24
        finished at HH:MM:SS+~1.5s
        Results: [2, 6, 24]
        
    • d) asyncio.TaskGroup

      • Provides structured concurrency; ensures all tasks within the group complete or are cancelled upon exit/error.
      # Based on asyncio-task.rst TaskGroup example
      import asyncio
      import time
      
      async def say_after(delay, what):
          await asyncio.sleep(delay)
          print(what)
          return f"{what} done"
      
      async def main():
          print(f"started at {time.strftime('%X')}")
          async with asyncio.TaskGroup() as tg:
              task1 = tg.create_task(say_after(1, 'hello'))
              task2 = tg.create_task(say_after(2, 'world'))
              print(f"Tasks created at {time.strftime('%X')}")
          # The 'async with' block implicitly awaits task completion
          print(f"finished at {time.strftime('%X')}")
          # Task results can be accessed after the block
          print(f"Task 1 result: {task1.result()}")
          print(f"Task 2 result: {task2.result()}")
      
      
      asyncio.run(main())
      • Expected Output:
        started at HH:MM:SS
        Tasks created at HH:MM:SS
        # (waits ~1 second)
        hello
        # (waits ~1 more second)
        world
        finished at HH:MM:SS+2s
        Task 1 result: hello done
        Task 2 result: world done
        
    • e) asyncio.to_thread()

      • Runs blocking functions (especially I/O-bound) in a separate thread managed by asyncio's default executor, making them non-blocking from the event loop's perspective.
      • Best for: Integrating blocking I/O libraries into asyncio code.
      • Mechanism: Multithreading integrated with asyncio event loop.
      # Based on asyncio-task.rst to_thread example
      import asyncio
      import time
      
      def blocking_io(duration, name):
          print(f"[{time.strftime('%X')}] start blocking_io {name}")
          # Note: time.sleep() simulates any blocking IO-bound operation.
          time.sleep(duration)
          print(f"[{time.strftime('%X')}] blocking_io {name} complete")
          return f"{name} finished"
      
      async def main():
          print(f"[{time.strftime('%X')}] started main")
      
          # Run blocking_io in a separate thread via to_thread
          # Run asyncio.sleep concurrently in the main event loop thread
          results = await asyncio.gather(
              asyncio.to_thread(blocking_io, 1.0, "Task A"),
              asyncio.to_thread(blocking_io, 0.5, "Task B"), # Another blocking task
              asyncio.sleep(1.2) # An async task
          )
      
          print(f"[{time.strftime('%X')}] finished main")
          print(f"Results: {results}") # Note: sleep returns None
      
      asyncio.run(main())
      • Expected Output (Interleaving/timing may vary slightly):
        [HH:MM:SS] started main
        [HH:MM:SS] start blocking_io Task A
        [HH:MM:SS] start blocking_io Task B
        # (waits ~0.5s)
        [HH:MM:SS] blocking_io Task B complete
        # (waits ~0.5s)
        [HH:MM:SS] blocking_io Task A complete
        # (waits ~0.2s until sleep(1.2) finishes)
        [HH:MM:SS] finished main
        Results: ['Task A finished', 'Task B finished', None]
        
        (Total time should be around 1.2 seconds, showing concurrency)
    • f) asyncio Streams (open_connection, start_server)

      • High-level API for network programming using async/await.
      • Best for: Network I/O-bound tasks.
      • Mechanism: Asynchronous I/O via event loop.
      # Based on asyncio-stream.rst TCP echo client example
      # Requires a running echo server on 127.0.0.1:8888
      # You can use the server example from the docs, run separately.
      import asyncio
      
      async def tcp_echo_client(message):
          try:
              reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
              print(f'Send: {message!r}')
              writer.write(message.encode())
              await writer.drain() # Ensure data is sent
      
              data = await reader.read(100)
              print(f'Received: {data.decode()!r}')
      
              print('Close the connection')
              writer.close()
              await writer.wait_closed()
          except ConnectionRefusedError:
              print("Connection refused. Is the server running?")
          except Exception as e:
              print(f"An error occurred: {e}")
      
      # Example usage:
      asyncio.run(tcp_echo_client('Hello Async World!'))
      • Expected Output (if server is running):
        Send: 'Hello Async World!'
        Received: 'Hello Async World!'
        Close the connection
        
      • Expected Output (if server is NOT running):
        Connection refused. Is the server running?
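The client above needs a separately running server. The sketch below, assuming Python 3.8+ and that binding to 127.0.0.1 is permitted, starts a minimal echo server with `asyncio.start_server()` on an OS-assigned port and connects three clients to it concurrently in the same program. `handle_echo` follows the spirit of the docs' server example but is simplified (one read, no logging).

```python
import asyncio


async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Read one chunk, send it straight back, then close this connection
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def tcp_echo_client(port: int, message: str) -> str:
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(message.encode())
    await writer.drain()
    data = await reader.read(100)
    writer.close()
    await writer.wait_closed()
    return data.decode()


async def main() -> list[str]:
    # Port 0 asks the OS for any free port; read the real one back from the socket
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:  # closes the server on exit
        # Several clients talk to the same server concurrently
        replies = await asyncio.gather(
            *(tcp_echo_client(port, f"msg {i}") for i in range(3))
        )
    return list(replies)


replies = asyncio.run(main())
print(replies)  # ['msg 0', 'msg 1', 'msg 2']
```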
        

5. Mid-Level: threading Module

  • Level: Direct thread creation and management.

  • Purpose: Running code in separate OS threads within the same process.

  • Best for: I/O-bound concurrency. Limited benefit for CPU-bound tasks in CPython due to GIL.

  • Mechanism: Preemptive Multithreading (OS handles scheduling). Shared memory requires explicit synchronization.

  • Documentation: threading.rst

    • a) threading.Thread

      • Create and manage threads directly.
      # Simple threading example based on threading.rst concepts
      import threading
      import time
      
      def worker(num):
          """Thread worker function"""
          print(f'Worker {num}: Starting (OS Thread ID: {threading.get_native_id()})')
          time.sleep(1)
          print(f'Worker {num}: Exiting')
      
      threads = []
      for i in range(3):
          t = threading.Thread(target=worker, args=(i,))
          threads.append(t)
          t.start() # Start the thread's activity
      
      print(f"Main thread (OS Thread ID: {threading.get_native_id()}) waiting for workers...")
      for t in threads:
          t.join() # Wait for the thread to terminate
      
      print("Main thread: All workers finished.")
      • Expected Output (Order of starting/exiting messages may vary):
        Worker 0: Starting (OS Thread ID: XXXX)
        Worker 1: Starting (OS Thread ID: YYYY)
        Worker 2: Starting (OS Thread ID: ZZZZ)
        Main thread (OS Thread ID: WWWW) waiting for workers...
        # (waits ~1 second)
        Worker 0: Exiting
        Worker 1: Exiting
        Worker 2: Exiting
        Main thread: All workers finished.
        
    • b) threading Synchronization Primitives (Lock, RLock, Event, Condition, Semaphore, Barrier)

      • Used to coordinate threads and protect shared resources.
      # Example using threading.Lock based on threading.rst concepts
      import threading
      import time
      
      shared_resource = 0
      lock = threading.Lock()
      
      def critical_section_worker(num):
          global shared_resource
          print(f"Worker {num}: Trying to acquire lock...")
          with lock: # Acquire lock using context manager
              print(f"Worker {num}: Lock acquired. Accessing resource.")
              local_copy = shared_resource
              # Simulate work
              time.sleep(0.1)
              shared_resource = local_copy + 1
              print(f"Worker {num}: Resource updated to {shared_resource}. Releasing lock.")
          # Lock is released automatically by 'with' statement exit
      
      threads = []
      for i in range(5):
          t = threading.Thread(target=critical_section_worker, args=(i,))
          threads.append(t)
          t.start()
      
      for t in threads:
          t.join()
      
      print(f"\nFinal shared_resource value: {shared_resource}")
      • Expected Output (Order varies, but critical sections don't overlap):
        Worker 0: Trying to acquire lock...
        Worker 0: Lock acquired. Accessing resource.
        Worker 1: Trying to acquire lock...
        Worker 2: Trying to acquire lock...
        Worker 3: Trying to acquire lock...
        Worker 4: Trying to acquire lock...
        Worker 0: Resource updated to 1. Releasing lock.
        Worker 1: Lock acquired. Accessing resource.
        Worker 1: Resource updated to 2. Releasing lock.
        Worker 2: Lock acquired. Accessing resource.
        Worker 2: Resource updated to 3. Releasing lock.
        Worker 3: Lock acquired. Accessing resource.
        Worker 3: Resource updated to 4. Releasing lock.
        Worker 4: Lock acquired. Accessing resource.
        Worker 4: Resource updated to 5. Releasing lock.
        
        Final shared_resource value: 5
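The Lock example covers only one of the listed primitives. The sketch below combines two others: a `threading.Event` that holds all workers until a "go" signal, and a `threading.BoundedSemaphore` that allows at most two workers into the guarded section at once. The worker function, counters and `MAX_CONCURRENT` are illustrative assumptions, not taken from threading.rst.

```python
import threading
import time

MAX_CONCURRENT = 2
start_signal = threading.Event()                    # workers block until this is set
slots = threading.BoundedSemaphore(MAX_CONCURRENT)  # at most 2 workers inside at once
stats_lock = threading.Lock()                       # protects the counters below
active = 0
peak = 0
finished = []


def worker(num: int) -> None:
    global active, peak
    start_signal.wait()  # wait for the "go" signal
    with slots:          # acquire a slot; released when the block exits
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate I/O while holding a slot
        with stats_lock:
            active -= 1
            finished.append(num)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()

start_signal.set()  # release all waiting workers at once
for t in threads:
    t.join()

print(f"peak concurrency: {peak}")      # never more than MAX_CONCURRENT
print(f"finished: {sorted(finished)}")  # [0, 1, 2, 3, 4]
```

A `BoundedSemaphore` raises `ValueError` if released more times than acquired, which catches bookkeeping bugs that a plain `Semaphore` would silently accept.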
        

6. Mid-Level: multiprocessing Module

  • Level: Direct process creation and management, plus tools for communication and sharing.

  • Purpose: Running code in separate OS processes, bypassing the GIL.

  • Best for: CPU-bound parallelism.

  • Mechanism: Multiprocessing. Processes have separate memory; requires explicit IPC (Queues, Pipes) or shared memory mechanisms. Needs if __name__ == '__main__' guard.

  • Documentation: multiprocessing.rst

    • a) multiprocessing.Process

      • Create and manage processes directly.
      # Based on multiprocessing.rst Process example
      from multiprocessing import Process
      import os
      import time
      
      def info(title):
          print(title)
          print('module name:', __name__)
          print('process id:', os.getpid())       # this process
          print('parent process:', os.getppid())  # the process that started it
      
      
      def f(name):
          info('function f')
          print('hello', name)
          time.sleep(1)
          print('function f finishing')
      
      if __name__ == '__main__':
          info('main line')
          p = Process(target=f, args=('bob',))
          p.start()
          print("Main process continues...")
          p.join() # Wait for process p to finish
          print("Main process finished.")
      • Expected Output:
        main line
        module name: __main__
        process id: XXXX
        parent process: YYYY
        Main process continues...
        function f
        module name: __mp_main__ (with the spawn or forkserver start method; __main__ with fork)
        process id: ZZZZ
        parent process: XXXX
        hello bob
        # (waits ~1 second)
        function f finishing
        Main process finished.
        
    • b) multiprocessing Communication (Queue, Pipe)

      • Mechanisms for passing picklable objects between processes.
      # Based on multiprocessing.rst Queue example
      from multiprocessing import Process, Queue
      import time
      
      def worker(q, worker_id):
          item = q.get() # Blocks until an item is available
          print(f"Worker {worker_id} got: {item}")
          time.sleep(0.5) # Simulate work
          print(f"Worker {worker_id} finished.")
      
      if __name__ == '__main__':
          q = Queue()
      
          p1 = Process(target=worker, args=(q, 1))
          p2 = Process(target=worker, args=(q, 2))
      
          p1.start()
          p2.start()
      
          print("Main process putting items...")
          q.put("Item A")
          q.put("Item B")
      
          # Wait for workers to finish
          p1.join()
          p2.join()
          print("Main process finished.")
      • Expected Output (Order of 'got' messages may vary):
        Main process putting items...
        Worker 1 got: Item A
        Worker 2 got: Item B
        # (waits ~0.5s)
        Worker 1 finished.
        Worker 2 finished.
        Main process finished.
        
    • c) multiprocessing Shared State (Value, Array, Manager)

      • Ways to share data between processes (use with caution). Value/Array use shared memory; Manager uses a server process.
      # Based on multiprocessing.rst Value/Array example
      from multiprocessing import Process, Value, Array, Lock
      import time
      
      def modify(n, a, l):
          with l: # Use lock for synchronized access
              n.value += 1.0
              for i in range(len(a)):
                  a[i] *= 2
          print(f"Child modified: n={n.value}, a={list(a)}")
      
      if __name__ == '__main__':
          lock = Lock()
          num = Value('d', 0.0, lock=False) # Create raw value, use external lock
          arr = Array('i', range(5), lock=False) # Create raw array
      
          print(f"Initial: n={num.value}, a={list(arr)}")
      
          p1 = Process(target=modify, args=(num, arr, lock))
          p2 = Process(target=modify, args=(num, arr, lock))
      
          p1.start()
          p2.start()
      
          p1.join()
          p2.join()
      
          print(f"Final: n={num.value}, a={list(arr)}")
      • Expected Output (Order of 'Child modified' lines may vary):
        Initial: n=0.0, a=[0, 1, 2, 3, 4]
        Child modified: n=1.0, a=[0, 2, 4, 6, 8]
        Child modified: n=2.0, a=[0, 4, 8, 12, 16]
        Final: n=2.0, a=[0, 4, 8, 12, 16]
        
    • d) multiprocessing.Pool

      • Manages a pool of worker processes for executing tasks in parallel.
      • Best for: Data parallelism (applying a function to many inputs).
      # Based on multiprocessing.rst Pool example
      from multiprocessing import Pool
      import time
      
      def f(x):
          # Simulate some CPU work
          # time.sleep(0.1)
          return x*x
      
      if __name__ == '__main__':
          start_time = time.monotonic()
          # Use context manager for automatic cleanup
          with Pool(processes=4) as pool:
              # map blocks until all results are ready
              results = pool.map(f, range(10))
              print(f"map results: {results}")
      
              # apply_async is non-blocking, returns AsyncResult
              res_async = pool.apply_async(f, (10,))
              print(f"apply_async result for f(10): {res_async.get(timeout=5)}")
      
          end_time = time.monotonic()
          print(f"Pool operations took {end_time - start_time:.2f} seconds")
      • Expected Output:
        map results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
        apply_async result for f(10): 100
        Pool operations took X.XX seconds
        

7. Mid-to-Low Level: subprocess Module

  • Level: Running external commands as separate processes.

  • Purpose: Interact with external programs, potentially in parallel with the main Python script.

  • Best for: Offloading work to existing command-line tools, managing external processes. Can be used for both I/O (if interacting via pipes) and CPU-bound (if the external tool is CPU-bound) tasks.

  • Mechanism: Multiprocessing (external processes).

  • Documentation: subprocess.rst

    • a) subprocess.run()

      • Simple interface to run a command and wait for completion.
      # Based on subprocess.rst run example
      import subprocess
      import sys
      
      # Pick a command that exists on the current platform
      if sys.platform == "win32":
          # 'dir' is a cmd.exe builtin, so invoke cmd explicitly; shell=True is not needed
          cmd = ["cmd", "/c", "dir", "C:\\Windows\\System32\\notepad.exe"]
      else:
          cmd = ["ls", "-l", "/dev/null"]
      
      try:
          # Wait for completion, capture stdout/stderr as text, raise on a non-zero exit code
          result = subprocess.run(cmd, capture_output=True, text=True, check=True)
          print("Command executed successfully.")
          print("Args:", result.args)
          print("Return Code:", result.returncode)
          print("Stdout:")
          print(result.stdout)
          print("Stderr:", result.stderr or "<empty>")
      except FileNotFoundError:
          print("Error: Command not found.")
      except subprocess.CalledProcessError as e:
          print(f"Error: Command failed with exit code {e.returncode}")
          print("Stderr:", e.stderr or "<empty>")
      • Expected Output (Windows example, details may vary):
        Command executed successfully.
        Args: ['cmd', '/c', 'dir', 'C:\\Windows\\System32\\notepad.exe']
        Return Code: 0
        Stdout:
         Volume in drive C has no label.
         Volume Serial Number is XXXX-XXXX
        ...
         Directory of C:\Windows\System32
        ...
        MM/DD/YYYY  HH:MM PM           XXX,XXX notepad.exe
                       1 File(s)        XXX,XXX bytes
                       0 Dir(s)  YYY,YYY,YYY,YYY bytes free
        
        Stderr: <empty>
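A portable variant of the same call uses the running interpreter (`sys.executable`) as the child program, so it behaves the same on every OS. This sketch also shows the `timeout` parameter: if the child runs too long, `run()` kills it and raises `subprocess.TimeoutExpired`. The one-liner child programs are illustrative assumptions.

```python
import subprocess
import sys

# Child program: a Python one-liner writing to both stdout and stderr
ok = subprocess.run(
    [sys.executable, "-c", "import sys; print('hello'); print('warn', file=sys.stderr)"],
    capture_output=True, text=True, check=True, timeout=10,
)
print(repr(ok.stdout), repr(ok.stderr), ok.returncode)

# A child that outlives the timeout is killed and TimeoutExpired is raised
try:
    subprocess.run([sys.executable, "-c", "import time; time.sleep(5)"], timeout=0.5)
    timed_out = False
except subprocess.TimeoutExpired as exc:
    timed_out = True
    print(f"Timed out after {exc.timeout} seconds")
```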
        
    • b) subprocess.Popen

      • Flexible interface for creating and managing child processes, allowing non-blocking interaction via pipes.
      # Based on subprocess.rst Popen example (adapted for cross-platform)
      import subprocess
      import sys
      import time
      
      # Command to print numbers 0 through 4 with a delay
      # Python script used for cross-platform compatibility
      cmd_args = [
          sys.executable, '-c',
          "import time; [print(i, flush=True) or time.sleep(0.2) for i in range(5)]"
      ]
      
      print("Starting subprocess...")
      with subprocess.Popen(cmd_args, stdout=subprocess.PIPE, text=True) as proc:
          print(f"Subprocess PID: {proc.pid}")
      
          # Iterating over the pipe yields each line as it arrives; the loop ends at EOF
          for output_line in proc.stdout:
              print(f"Received: {output_line.strip()}")
      
          # stdout reached EOF, so wait for the process to exit and collect its return code
          rc = proc.wait()
          print(f"Subprocess finished with return code: {rc}")
      • Expected Output:
        Starting subprocess...
        Subprocess PID: XXXX
        Received: 0
        # (waits ~0.2s)
        Received: 1
        # (waits ~0.2s)
        Received: 2
        # (waits ~0.2s)
        Received: 3
        # (waits ~0.2s)
        Received: 4
        # (waits ~0.2s)
        Subprocess finished with return code: 0
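Because `Popen` returns as soon as the child starts, several children can run at the same time. This minimal sketch starts three hypothetical Python one-liners that each sleep 0.5 s before printing their index, then collects them with `communicate()`. The total wall time stays close to one child's runtime rather than the sum.

```python
import subprocess
import sys
import time

def child_cmd(i):
    # Hypothetical workload: sleep 0.5 s, then print the index
    return [sys.executable, "-c", f"import time; time.sleep(0.5); print({i})"]

start = time.monotonic()
# Start every child first; Popen does not wait for them to finish
procs = [subprocess.Popen(child_cmd(i), stdout=subprocess.PIPE, text=True) for i in range(3)]

outputs = []
for proc in procs:
    # communicate() reads all remaining output and waits for this child to exit
    out, _ = proc.communicate(timeout=30)
    outputs.append(out.strip())

elapsed = time.monotonic() - start
print(outputs, f"took {elapsed:.2f}s")
```

`communicate()` is preferred over reading `stdout` and then calling `wait()` when you only need the final output, because it avoids deadlocks when pipe buffers fill up.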
        

8. Low-Level: asyncio Event Loop and Primitives

  • Level: Direct interaction with the asyncio event loop, transports, protocols, and synchronization primitives.

  • Purpose: Building frameworks, libraries, or fine-grained control over async operations.

  • Best for: Complex I/O-bound concurrency scenarios.

  • Mechanism: Asynchronous cooperative multitasking via event loop.

  • Documentation: asyncio-eventloop.rst, asyncio-protocol.rst, asyncio-sync.rst

    • a) Event Loop Methods (call_soon, call_later, add_reader, etc.)

      • Scheduling callbacks, interacting with file descriptors directly.
      # Based on asyncio-eventloop.rst Hello World with call_soon()
      import asyncio
      
      def hello_world(loop, message):
          """A callback to print a message and stop the event loop"""
          print(message)
          loop.stop()
      
      loop = asyncio.new_event_loop()
      asyncio.set_event_loop(loop) # Optional here: makes it the current loop for this OS thread
      
      # Schedule a call to hello_world()
      loop.call_soon(hello_world, loop, "Hello Low-Level World!")
      
      print("Starting event loop...")
      # Blocking call interrupted by loop.stop()
      try:
          loop.run_forever()
      finally:
          print("Closing event loop.")
          loop.close()
      • Expected Output:
        Starting event loop...
        Hello Low-Level World!
        Closing event loop.
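The same low-level API also schedules timed callbacks. In this minimal sketch, three callbacks are registered with `call_later`, `call_at` and `call_soon` in that order, and they run in deadline order rather than registration order. `record` and `events` are illustrative names.

```python
import asyncio

events = []

def record(label):
    # Plain (non-async) callback invoked by the event loop
    events.append(label)

loop = asyncio.new_event_loop()
try:
    now = loop.time()                             # the loop's own monotonic clock
    loop.call_later(0.2, record, "later 0.2s")    # relative delay
    loop.call_at(now + 0.1, record, "at +0.1s")   # absolute deadline on loop.time()
    loop.call_soon(record, "soon")                # next loop iteration
    loop.call_later(0.3, loop.stop)               # stop after the others have run
    loop.run_forever()
finally:
    loop.close()

print(events)  # ['soon', 'at +0.1s', 'later 0.2s']
```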
        
    • b) Transports and Protocols

      • Callback-based API for network protocols, used by asyncio streams internally.
      # Based on asyncio-protocol.rst TCP Echo Client
      # Requires a running echo server on 127.0.0.1:8888
      import asyncio
      
      class EchoClientProtocol(asyncio.Protocol):
          def __init__(self, message, on_con_lost):
              self.message = message
              self.on_con_lost = on_con_lost
              self.transport = None
      
          def connection_made(self, transport):
              self.transport = transport
              print('Connection made.')
              self.transport.write(self.message.encode())
              print(f'Data sent: {self.message!r}')
      
          def data_received(self, data):
              print(f'Data received: {data.decode()!r}')
              # Close connection after receiving data
              print('Closing transport.')
              self.transport.close()
      
          def connection_lost(self, exc):
              print('Connection lost.')
              if exc:
                  print(f'Error: {exc}')
              # Signal the main coroutine that connection is lost
              self.on_con_lost.set_result(True)
      
      async def main():
          loop = asyncio.get_running_loop()
          on_con_lost = loop.create_future()
          message = 'Hello via Protocol!'
      
          try:
              transport, protocol = await loop.create_connection(
                  lambda: EchoClientProtocol(message, on_con_lost),
                  '127.0.0.1', 8888)
      
              print("Waiting for connection to close...")
              # Wait until the protocol signals that the connection is lost
              await on_con_lost
      
          except ConnectionRefusedError:
              print("Connection refused. Is the server running?")
          except Exception as e:
              print(f"An error occurred: {e}")
          # transport is closed implicitly by protocol or error handling
      
      asyncio.run(main())
      • Expected Output (if server running):
        Connection made.
        Data sent: 'Hello via Protocol!'
        Waiting for connection to close...
        Data received: 'Hello via Protocol!'
        Closing transport.
        Connection lost.
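The client above needs an echo server. Here is a self-contained sketch modeled on the TCP echo server pattern in asyncio-protocol.rst that runs both sides in one process. Two assumptions: it binds to port 0 so the OS picks a free port instead of 8888, and the class names are illustrative. The client closes once it has received as many bytes as it sent.

```python
import asyncio

class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)  # echo the bytes straight back

class OneShotClient(asyncio.Protocol):
    def __init__(self, message, done):
        self.message = message
        self.done = done            # Future resolved with the echoed bytes
        self.received = b""

    def connection_made(self, transport):
        self.transport = transport
        transport.write(self.message)

    def data_received(self, data):
        self.received += data
        if len(self.received) >= len(self.message):
            self.transport.close()  # full echo received

    def connection_lost(self, exc):
        if not self.done.done():
            self.done.set_result(self.received)

async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoServerProtocol, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]  # OS-assigned free port
    async with server:                         # closes the server on exit
        done = loop.create_future()
        await loop.create_connection(
            lambda: OneShotClient(b"Hello via Protocol!", done), "127.0.0.1", port)
        return await asyncio.wait_for(done, timeout=5)

reply = asyncio.run(main())
print(reply)  # b'Hello via Protocol!'
```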
        
    • c) asyncio Synchronization Primitives (Lock, Event, etc.)

      • Similar to threading primitives but designed for asyncio tasks.
      # Based on asyncio-sync.rst Event example
      import asyncio
      
      async def waiter(event, name):
          print(f'{name}: waiting for event ...')
          await event.wait() # Blocks until event.set() is called
          print(f'{name}: ... event received!')
      
      async def main():
          event = asyncio.Event()
      
          # Spawn Tasks to wait until 'event' is set.
          waiter_task1 = asyncio.create_task(waiter(event, "Waiter 1"))
          waiter_task2 = asyncio.create_task(waiter(event, "Waiter 2"))
      
          print("Main: Sleeping before setting event...")
          await asyncio.sleep(1)
          print("Main: Setting the event.")
          event.set() # Wake up all waiting tasks
      
          # Wait until the waiter tasks are finished.
          await waiter_task1
          await waiter_task2
          print("Main: Waiters finished.")
      
      asyncio.run(main())
      • Expected Output (the waiter tasks only start running once main() first awaits):
        Main: Sleeping before setting event...
        Waiter 1: waiting for event ...
        Waiter 2: waiting for event ...
        # (waits ~1 second)
        Main: Setting the event.
        Waiter 1: ... event received!
        Waiter 2: ... event received!
        Main: Waiters finished.
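The heading also lists Lock and related primitives. Below is a minimal sketch of `asyncio.Semaphore`, which caps how many tasks can be inside a guarded block at once (here 2 out of 5). It assumes a hypothetical `fetch` coroutine that only sleeps as a stand-in for real I/O.

```python
import asyncio

MAX_CONCURRENT = 2
active = 0   # tasks currently inside the guarded block
peak = 0     # highest value 'active' reached

async def fetch(sem, i):
    global active, peak
    async with sem:               # waits while MAX_CONCURRENT tasks hold it
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.1)  # stand-in for real I/O
        active -= 1
    return i * 10

async def main():
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    # gather returns results in the order the coroutines were passed
    return await asyncio.gather(*(fetch(sem, i) for i in range(5)))

results = asyncio.run(main())
print(results, "peak concurrency:", peak)  # [0, 10, 20, 30, 40] peak concurrency: 2
```

An `asyncio.Lock` is the special case of a limit of 1. A semaphore is the usual way to bound concurrent connections or requests.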
        

9. Low-Level: selectors Module

  • Level: High-level wrapper around OS I/O multiplexing mechanisms (select, poll, epoll, kqueue).
  • Purpose: Monitor multiple file descriptors for I/O readiness efficiently. Used internally by asyncio.
  • Best for: Building custom event loops or network applications needing fine control over non-blocking I/O.
  • Mechanism: I/O Multiplexing.
  • Documentation: selectors.rst
# Based on selectors.rst example
import selectors
import socket

sel = selectors.DefaultSelector() # Uses the best implementation for the platform

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('Accepted connection from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read) # Register client socket for reading

def read(conn, mask):
    try:
        data = conn.recv(1000)  # Should be ready
        if data:
            print('Echoing', repr(data), 'to', conn.getpeername())
            conn.send(data)  # Non-blocking: may send only part of data (or raise BlockingIOError) if the buffer is full; not handled here
        else:
            # No data means client closed connection
            print('Closing connection to', conn.getpeername())
            sel.unregister(conn)
            conn.close()
    except ConnectionResetError:
        # The peer address may no longer be available after a reset, so don't query it
        print('Connection reset by peer')
        sel.unregister(conn)
        conn.close()

# Set up listening socket
host = '127.0.0.1'
port = 12345
print(f"Starting server on {host}:{port}")
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Allow reuse
lsock.bind((host, port))
lsock.listen()
lsock.setblocking(False) # Essential for selectors
sel.register(lsock, selectors.EVENT_READ, accept) # Register server socket for reading (accepts)

try:
    while True:
        # Wait for events, timeout=None means block indefinitely
        events = sel.select(timeout=None)
        if not events:
            print("Selector returned no events, looping again.")
            continue
        for key, mask in events:
            callback = key.data # The function we registered (accept or read)
            callback(key.fileobj, mask) # Call the function
except KeyboardInterrupt:
    print("Server shutting down.")
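finally:
    # Close every registered socket (listener and any open clients), then the selector
    for key in list(sel.get_map().values()):
        key.fileobj.close()
    sel.close()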

# To test: telnet 127.0.0.1 12345, type something, press Enter.
  • Expected Output (Interaction):
    Starting server on 127.0.0.1:12345
    # (Connect with telnet)
    Accepted connection from ('127.0.0.1', XXXXX)
    # (Type 'hello' in telnet and press Enter)
    Echoing b'hello\r\n' to ('127.0.0.1', XXXXX)
    # (Disconnect telnet)
    Closing connection to ('127.0.0.1', XXXXX)
    # (Ctrl+C to stop server)
    Server shutting down.
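To try the selector mechanics without telnet or a listening port, this self-contained sketch uses `socket.socketpair()` instead of the TCP server above. The selector reports the reading end as ready only after the other end has written to it. The `data="right-end"` tag is an illustrative choice.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
left, right = socket.socketpair()     # two already-connected sockets
right.setblocking(False)
sel.register(right, selectors.EVENT_READ, data="right-end")

# Nothing written yet: a zero-timeout poll reports no ready sockets
before = sel.select(timeout=0)

left.sendall(b"ping")
after = sel.select(timeout=1)         # now 'right' is readable
received = None
for key, mask in after:
    if mask & selectors.EVENT_READ:
        received = key.fileobj.recv(1024)
        print(f"{key.data} ready, read {received!r}")

sel.unregister(right)
sel.close()
left.close()
right.close()
```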
    

10. Low-Level: _thread Module

  • Level: Lowest-level threading API.
  • Purpose: Basic thread creation. The threading module is built on top of it and is preferred.
  • Best for: Situations where the overhead of threading.Thread objects is undesirable (rare).
  • Mechanism: Preemptive Multithreading.
  • Documentation: _thread.rst
# Based on _thread.rst concepts
import _thread
import time
import threading # To get main thread ID easily

def low_level_worker(thread_name, delay):
    count = 0
    print(f"Starting {thread_name} (OS ID: {threading.get_native_id()})")
    while count < 3:
        time.sleep(delay)
        count += 1
        print(f"{thread_name}: Count is {count}")
    print(f"{thread_name}: Finished.")
    # Note: _thread doesn't have a built-in join mechanism easily accessible here
    # We rely on the main thread sleeping long enough

# Start new threads
try:
    _thread.start_new_thread(low_level_worker, ("Thread-1", 0.5))
    _thread.start_new_thread(low_level_worker, ("Thread-2", 0.7))
except Exception as e:
    print(f"Error: unable to start thread - {e}")

print(f"Main thread (OS ID: {threading.get_native_id()}) waiting...")
# Crude way to wait for threads started with _thread
time.sleep(3)
print("Main thread finished waiting.")
  • Expected Output (startup lines may interleave differently):
    Starting Thread-1 (OS ID: XXXX)
    Starting Thread-2 (OS ID: YYYY)
    Main thread (OS ID: ZZZZ) waiting...
    Thread-1: Count is 1
    Thread-2: Count is 1
    Thread-1: Count is 2
    Thread-2: Count is 2
    Thread-1: Count is 3
    Thread-1: Finished.
    Thread-2: Count is 3
    Thread-2: Finished.
    Main thread finished waiting.
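The main thread does not have to rely on a crude `time.sleep(3)`; it can wait on `_thread` locks instead. This minimal sketch gives each worker its own "done" lock, which is acquired before the thread starts and released by the worker when it finishes. This works because a plain `_thread` lock may be released by a thread other than the one that acquired it. `worker` and `results` are illustrative names.

```python
import _thread
import time

results = []
results_lock = _thread.allocate_lock()   # protects the shared list

def worker(name, delay, done_lock):
    time.sleep(delay)
    with results_lock:
        results.append(name)
    done_lock.release()                  # signal "finished" to the main thread

done_locks = []
for name, delay in (("Thread-1", 0.2), ("Thread-2", 0.1)):
    done = _thread.allocate_lock()
    done.acquire()                       # held until the worker releases it
    done_locks.append(done)
    _thread.start_new_thread(worker, (name, delay, done))

for done in done_locks:
    done.acquire()                       # blocks until that worker calls release()

print(results)
```

This is essentially the bookkeeping that `threading.Thread.join()` does for you, which is one reason the `threading` module is preferred.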
    

11. Other Related Mechanisms (Mentioned in Docs):

  • queue.Queue: Thread-safe queue for communication between threads. (Doc: queue.rst)
  • signal: Handling asynchronous OS signals, typically in the main thread. (Doc: signal.rst)
  • mmap: Memory-mapped files. Can potentially be used for IPC if mapped as shared, but multiprocessing.shared_memory or Manager is usually preferred for structured sharing. (Doc: mmap.rst)
  • contextvars: Manage context-local state correctly across asyncio tasks and threads. Crucial for context propagation in concurrent code. (Doc: contextvars.rst)
  • sched: A general-purpose single-threaded event scheduler based on time delays. Not typically used for I/O or CPU concurrency in the same way as the others, more for scheduling future actions within one thread. (Doc: sched.rst)
  • select: The low-level OS interface for I/O multiplexing. selectors is the preferred higher-level interface. (Doc: select.rst)
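For the `queue.Queue` entry above, here is a minimal producer/consumer sketch with `threading`. It uses a `None` sentinel to tell each consumer to stop; the sentinel is a convention of this sketch, not a queue feature. `task_done()` and `join()` let the producer wait until every item has been processed.

```python
import queue
import threading

tasks = queue.Queue()
results = queue.Queue()
NUM_WORKERS = 3
SENTINEL = None                      # tells a worker to exit

def worker():
    while True:
        item = tasks.get()           # blocks until an item is available
        try:
            if item is SENTINEL:
                return
            results.put(item * item)
        finally:
            tasks.task_done()        # exactly one task_done() per get()

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for n in range(10):
    tasks.put(n)
for _ in threads:
    tasks.put(SENTINEL)              # one sentinel per worker

tasks.join()                         # every queued item has been marked done
for t in threads:
    t.join()

squares = sorted(results.get() for _ in range(results.qsize()))
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```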

This covers the spectrum from highly abstract (asyncio.run, concurrent.futures) down to low-level primitives (_thread, select), using only the provided documentation snippets.
