Okay, let's break down the concurrency and parallelism mechanisms available in the Python standard library, based strictly on the provided documentation snippets, ordered from highest to lowest level of abstraction.
Core Concepts:
- Concurrency: Managing multiple tasks seemingly happening at the same time. Tasks might be interleaved on a single CPU core (cooperative multitasking like asyncio) or run truly in parallel. Often used for I/O-bound work.
- Parallelism: Executing multiple tasks simultaneously, typically on different CPU cores. Requires multiple processes or threads (though CPython threads are limited by the GIL for CPU-bound tasks). Best for CPU-bound work.
- Multithreading: Using multiple threads within a single process. Shares memory but is affected by the Global Interpreter Lock (GIL) in CPython, limiting true CPU parallelism. Good for I/O-bound concurrency.
- Multiprocessing: Using multiple independent processes. Each has its own memory space and interpreter, bypassing the GIL for true CPU parallelism. Requires inter-process communication (IPC) if data needs sharing.
- Asynchronous I/O (Asyncio): A specific concurrency model using an event loop and async/await syntax, primarily for efficient I/O-bound tasks. It's single-threaded cooperative multitasking.
Here are the ways to achieve these, based on the provided docs:
1. High-Level: asyncio.run()
- Level: Highest-level entry point for asyncio.
- Purpose: Runs a top-level async def function (coroutine), automatically managing the event loop lifecycle (creation, running the task, shutdown, closing). Ideal for simple asyncio programs.
- Best for: I/O-bound concurrency.
- Mechanism: Asynchronous cooperative multitasking via event loop.
- Documentation: asyncio-runner.rst, asyncio.rst
# From asyncio.rst sidebar and asyncio-runner.rst example
import asyncio
import time

async def main():
    print(f"[{time.strftime('%X')}] Hello ...")
    await asyncio.sleep(1)
    print(f"[{time.strftime('%X')}] ... World!")

# asyncio.run() manages the event loop
asyncio.run(main())
2. High-Level: asyncio.Runner
- Level: High-level context manager for asyncio.
- Purpose: Simplifies running multiple top-level async functions within the same event loop context, managing the loop lifecycle via a with statement.
- Best for: I/O-bound concurrency when multiple entry points are needed.
- Mechanism: Asynchronous cooperative multitasking via event loop.
- Documentation: asyncio-runner.rst
# Based on asyncio-runner.rst example
import asyncio
import time

async def task_one():
    print(f"[{time.strftime('%X')}] Task One starting...")
    await asyncio.sleep(1)
    print(f"[{time.strftime('%X')}] Task One finished.")
    return "One Complete"

async def task_two():
    print(f"[{time.strftime('%X')}] Task Two starting...")
    await asyncio.sleep(0.5)
    print(f"[{time.strftime('%X')}] Task Two finished.")
    return "Two Complete"

# Using the Runner context manager; both calls share one event loop
with asyncio.Runner() as runner:
    print("Running Task One...")
    result1 = runner.run(task_one())
    print(f"Task One returned: {result1}")
    print("\nRunning Task Two...")
    result2 = runner.run(task_two())
    print(f"Task Two returned: {result2}")
print("\nRunner context finished.")
3. High-Level: concurrent.futures Executors
- Level: High-level interface for thread and process pools.
- Purpose: Execute callables asynchronously using threads or processes, abstracting away the direct management of threads/processes.
- Best for: I/O-bound concurrency (ThreadPoolExecutor) or CPU-bound parallelism (ProcessPoolExecutor).
- Mechanism: Multithreading or Multiprocessing.
- Documentation: concurrent.futures.rst
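The executor interface described above can be sketched roughly as follows. This is a minimal illustration, not taken from the documentation snippets: the fetch function and its sleep-based "work" are hypothetical stand-ins for a real I/O-bound call, and the pool size of 4 is an arbitrary assumption.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch(item_id):
    """Hypothetical I/O-bound task; the sleep stands in for a network call."""
    time.sleep(0.1)
    return item_id * 2

def run_with_map(ids):
    # Executor.map returns results in the same order as the inputs
    with ThreadPoolExecutor(max_workers=4) as executor:
        return list(executor.map(fetch, ids))

def run_with_submit(ids):
    # submit() returns one Future per call; as_completed() yields each
    # Future as soon as it finishes, regardless of submission order
    results = {}
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(fetch, i): i for i in ids}
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results

if __name__ == "__main__":
    print(run_with_map(range(5)))
    print(run_with_submit(range(5)))
```

For CPU-bound work, ProcessPoolExecutor exposes the same submit/map interface, so it can usually be swapped in, provided the callable and its arguments are picklable and the entry point is protected by an if __name__ == "__main__" guard.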
4. Mid-Level: asyncio Core Constructs
- Level: Core building blocks for asyncio applications.
- Purpose: Define asynchronous operations and run them concurrently.
- Best for: I/O-bound concurrency.
- Mechanism: Asynchronous cooperative multitasking via event loop.
- Documentation: asyncio-task.rst
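A small sketch of these building blocks, assuming nothing beyond the standard asyncio API: the fetch coroutine, its names, and the delays are hypothetical, with asyncio.sleep standing in for real I/O.

```python
import asyncio

async def fetch(name, delay):
    """Hypothetical coroutine; asyncio.sleep stands in for real I/O."""
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # create_task() schedules the coroutine to run concurrently with main()
    task = asyncio.create_task(fetch("task", 0.02))
    # gather() awaits several awaitables and returns results in argument order
    gathered = await asyncio.gather(fetch("a", 0.03), fetch("b", 0.01))
    # wait_for() applies a timeout to a single awaitable
    try:
        await asyncio.wait_for(fetch("slow", 1.0), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return await task, gathered, timed_out

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that gather() preserves argument order even though "b" finishes before "a"; the ordering of completion only matters if you iterate with something like asyncio.as_completed().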
5. Mid-Level: threading Module
- Level: Direct thread creation and management.
- Purpose: Running code in separate OS threads within the same process.
- Best for: I/O-bound concurrency. Limited benefit for CPU-bound tasks in CPython due to GIL.
- Mechanism: Preemptive Multithreading (OS handles scheduling). Shared memory requires explicit synchronization.
- Documentation: threading.rst
- a) threading.Thread
  - Create and manage threads directly.
# Simple threading example based on threading.rst concepts
import threading
import time

def worker(num):
    """Thread worker function"""
    print(f'Worker {num}: Starting (OS Thread ID: {threading.get_native_id()})')
    time.sleep(1)
    print(f'Worker {num}: Exiting')

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()  # Start the thread's activity

print(f"Main thread ({threading.get_native_id()}) waiting for workers...")
for t in threads:
    t.join()  # Wait for the thread to terminate
print("Main thread: All workers finished.")
- b) threading Synchronization Primitives (Lock, RLock, Event, Condition, Semaphore, Barrier)
  - Used to coordinate threads and protect shared resources.
# Example using threading.Lock based on threading.rst concepts
import threading
import time

shared_resource = 0
lock = threading.Lock()

def critical_section_worker(num):
    global shared_resource
    print(f"Worker {num}: Trying to acquire lock...")
    with lock:  # Acquire lock using context manager
        print(f"Worker {num}: Lock acquired. Accessing resource.")
        local_copy = shared_resource
        # Simulate work
        time.sleep(0.1)
        shared_resource = local_copy + 1
        print(f"Worker {num}: Resource updated to {shared_resource}. Releasing lock.")
    # Lock is released automatically by 'with' statement exit

threads = []
for i in range(5):
    t = threading.Thread(target=critical_section_worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"\nFinal shared_resource value: {shared_resource}")
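The Lock example covers mutual exclusion; two of the other primitives listed above, Event and Semaphore, might be combined as in the following sketch. The run_demo helper, the worker count, and the concurrency limit are hypothetical choices made for illustration.

```python
import threading
import time

def run_demo(num_workers=4, max_concurrent=2):
    start = threading.Event()                    # Gate that all workers wait on
    slots = threading.Semaphore(max_concurrent)  # At most max_concurrent holders
    state_lock = threading.Lock()                # Protects the counters below
    active = 0
    peak = 0
    finished = []

    def worker(num):
        nonlocal active, peak
        start.wait()              # Block until the main thread calls set()
        with slots:               # Acquire a slot; released on block exit
            with state_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.05)      # Simulate work while holding the slot
            with state_lock:
                active -= 1
                finished.append(num)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    start.set()                   # Release all waiting workers at once
    for t in threads:
        t.join()
    return peak, sorted(finished)

if __name__ == "__main__":
    peak, finished = run_demo()
    print(f"Peak concurrent workers: {peak}, finished: {finished}")
```

The peak value never exceeds max_concurrent, because the semaphore only admits that many workers into the guarded block at a time.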
6. Mid-Level: multiprocessing Module
- Level: Direct process creation and management, plus tools for communication and sharing.
- Purpose: Running code in separate OS processes, bypassing the GIL.
- Best for: CPU-bound parallelism.
- Mechanism: Multiprocessing. Processes have separate memory; requires explicit IPC (Queues, Pipes) or shared memory mechanisms. Needs if __name__ == '__main__' guard.
- Documentation: multiprocessing.rst
- a) multiprocessing.Process
  - Create and manage processes directly.
# Based on multiprocessing.rst Process example
from multiprocessing import Process
import os
import time

def info(title):
    print(title)
    print('module name:', __name__)
    print('process id:', os.getpid())       # PID of the current process
    print('parent process:', os.getppid())  # PID of the process that started it

def f(name):
    info('function f')
    print('hello', name)
    time.sleep(1)
    print('function f finishing')

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    print("Main process continues...")
    p.join()  # Wait for process p to finish
    print("Main process finished.")
- b) multiprocessing Communication (Queue, Pipe)
  - Mechanisms for passing picklable objects between processes.
# Based on multiprocessing.rst Queue example
from multiprocessing import Process, Queue
import time

def worker(q, worker_id):
    item = q.get()  # Blocks until an item is available
    print(f"Worker {worker_id} got: {item}")
    time.sleep(0.5)  # Simulate work
    print(f"Worker {worker_id} finished.")

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=worker, args=(q, 1))
    p2 = Process(target=worker, args=(q, 2))
    p1.start()
    p2.start()
    print("Main process putting items...")
    q.put("Item A")
    q.put("Item B")
    # Wait for workers to finish
    p1.join()
    p2.join()
    print("Main process finished.")
- c) multiprocessing Shared State (Value, Array, Manager)
  - Ways to share data between processes (use with caution). Value/Array use shared memory; Manager uses a server process.
# Based on multiprocessing.rst Value/Array example
from multiprocessing import Process, Value, Array, Lock
import time

def modify(n, a, l):
    with l:  # Use lock for synchronized access
        n.value += 1.0
        for i in range(len(a)):
            a[i] *= 2
        print(f"Child modified: n={n.value}, a={list(a)}")

if __name__ == '__main__':
    lock = Lock()
    num = Value('d', 0.0, lock=False)  # Create raw value, use external lock
    arr = Array('i', range(5), lock=False)  # Create raw array
    print(f"Initial: n={num.value}, a={list(arr)}")
    p1 = Process(target=modify, args=(num, arr, lock))
    p2 = Process(target=modify, args=(num, arr, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"Final: n={num.value}, a={list(arr)}")
- Expected Output (Order of 'Child modified' lines may vary):
Initial: n=0.0, a=[0, 1, 2, 3, 4]
Child modified: n=1.0, a=[0, 2, 4, 6, 8]
Child modified: n=2.0, a=[0, 4, 8, 12, 16]
Final: n=2.0, a=[0, 4, 8, 12, 16]
- d) multiprocessing.Pool
  - Manages a pool of worker processes for executing tasks in parallel.
  - Best for: Data parallelism (applying a function to many inputs).
# Based on multiprocessing.rst Pool example
from multiprocessing import Pool
import time

def f(x):
    # Simulate some CPU work
    # time.sleep(0.1)
    return x*x

if __name__ == '__main__':
    start_time = time.monotonic()
    # Use context manager for automatic cleanup
    with Pool(processes=4) as pool:
        # map blocks until all results are ready
        results = pool.map(f, range(10))
        print(f"map results: {results}")
        # apply_async is non-blocking, returns AsyncResult
        res_async = pool.apply_async(f, (10,))
        print(f"apply_async result for f(10): {res_async.get(timeout=5)}")
    end_time = time.monotonic()
    print(f"Pool operations took {end_time - start_time:.2f} seconds")
- Expected Output:
map results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
apply_async result for f(10): 100
Pool operations took X.XX seconds
7. Mid-to-Low Level: subprocess Module
- Level: Running external commands as separate processes.
- Purpose: Interact with external programs, potentially in parallel with the main Python script.
- Best for: Offloading work to existing command-line tools, managing external processes. Can be used for both I/O (if interacting via pipes) and CPU-bound (if the external tool is CPU-bound) tasks.
- Mechanism: Multiprocessing (external processes).
- Documentation: subprocess.rst
- a) subprocess.run()
  - Simple interface to run a command and wait for completion.
# Based on subprocess.rst run example
import subprocess

# Run a directory listing command and capture its output
try:
    # POSIX example:
    # result = subprocess.run(["ls", "-l", "/dev/null"], capture_output=True, text=True, check=True)
    # Windows example: 'dir' is a cmd.exe built-in, so it is run via 'cmd /c'.
    # shell=True is not needed because the shell is already invoked explicitly.
    result = subprocess.run(["cmd", "/c", "dir", "C:\\Windows\\System32\\notepad.exe"],
                            capture_output=True, text=True, check=True)
    print("Command executed successfully.")
    print("Args:", result.args)
    print("Return Code:", result.returncode)
    print("Stdout:")
    print(result.stdout)
    print("Stderr:", result.stderr or "<empty>")
except FileNotFoundError:
    print("Error: Command not found.")
except subprocess.CalledProcessError as e:
    print(f"Error: Command failed with exit code {e.returncode}")
    print("Stderr:", e.stderr or "<empty>")
- Expected Output (Windows example, details may vary):
Command executed successfully.
Args: ['cmd', '/c', 'dir', 'C:\\Windows\\System32\\notepad.exe']
Return Code: 0
Stdout:
Volume in drive C has no label.
Volume Serial Number is XXXX-XXXX
...
Directory of C:\Windows\System32
...
MM/DD/YYYY HH:MM PM XXX,XXX notepad.exe
1 File(s) XXX,XXX bytes
0 Dir(s) YYY,YYY,YYY,YYY bytes free
Stderr: <empty>
- b) subprocess.Popen
  - Flexible interface for creating and managing child processes. Popen returns as soon as the child has started, so the parent can interact with it through pipes while it runs.
# Based on subprocess.rst Popen example (adapted for cross-platform)
import subprocess
import sys
import time

# Command to print numbers 0 through 4 with a delay
# Python script used for cross-platform compatibility
cmd_args = [
    sys.executable, '-c',
    "import time; [print(i, flush=True) or time.sleep(0.2) for i in range(5)]"
]

print("Starting subprocess...")
with subprocess.Popen(cmd_args, stdout=subprocess.PIPE, text=True) as proc:
    print(f"Subprocess PID: {proc.pid}")
    # Read output line by line as it arrives. Each read blocks until a full
    # line is available; the loop ends at EOF, when the child closes stdout.
    for output_line in proc.stdout:
        print(f"Received: {output_line.strip()}")
    # Ensure the process has finished and get its return code
    rc = proc.wait()
print(f"Subprocess finished with return code: {rc}")
8. Low-Level: asyncio Event Loop and Primitives
- Level: Direct interaction with the asyncio event loop, transports, protocols, and synchronization primitives.
- Purpose: Building frameworks, libraries, or fine-grained control over async operations.
- Best for: Complex I/O-bound concurrency scenarios.
- Mechanism: Asynchronous cooperative multitasking via event loop.
- Documentation: asyncio-eventloop.rst, asyncio-protocol.rst, asyncio-sync.rst
- a) Event Loop Methods (call_soon, call_later, add_reader, etc.)
  - Scheduling callbacks, interacting with file descriptors directly.
# Based on asyncio-eventloop.rst Hello World with call_soon()
import asyncio

def hello_world(loop, message):
    """A callback to print a message and stop the event loop"""
    print(message)
    loop.stop()

loop = asyncio.new_event_loop()
# Optional here: makes this loop the current one for code that calls asyncio.get_event_loop()
asyncio.set_event_loop(loop)

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop, "Hello Low-Level World!")

print("Starting event loop...")
# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    print("Closing event loop.")
    loop.close()
- b) Transports and Protocols
  - Callback-based API for network protocols, used by asyncio streams internally.
# Based on asyncio-protocol.rst TCP Echo Client
# Requires a running echo server on 127.0.0.1:8888
import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Connection made.')
        self.transport.write(self.message.encode())
        print(f'Data sent: {self.message!r}')

    def data_received(self, data):
        print(f'Data received: {data.decode()!r}')
        # Close connection after receiving data
        print('Closing transport.')
        self.transport.close()

    def connection_lost(self, exc):
        print('Connection lost.')
        if exc:
            print(f'Error: {exc}')
        # Signal the main coroutine that connection is lost
        self.on_con_lost.set_result(True)

async def main():
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()
    message = 'Hello via Protocol!'
    try:
        transport, protocol = await loop.create_connection(
            lambda: EchoClientProtocol(message, on_con_lost),
            '127.0.0.1', 8888)
        print("Waiting for connection to close...")
        # Wait until the protocol signals that the connection is lost
        await on_con_lost
    except ConnectionRefusedError:
        print("Connection refused. Is the server running?")
    except Exception as e:
        print(f"An error occurred: {e}")
    # transport is closed implicitly by protocol or error handling

asyncio.run(main())
- c) asyncio Synchronization Primitives (Lock, Event, etc.)
  - Similar to threading primitives but designed for asyncio tasks.
# Based on asyncio-sync.rst Event example
import asyncio

async def waiter(event, name):
    print(f'{name}: waiting for event ...')
    await event.wait()  # Suspends this task (not the thread) until event.set() is called
    print(f'{name}: ... event received!')

async def main():
    event = asyncio.Event()
    # Spawn Tasks to wait until 'event' is set.
    waiter_task1 = asyncio.create_task(waiter(event, "Waiter 1"))
    waiter_task2 = asyncio.create_task(waiter(event, "Waiter 2"))
    print("Main: Sleeping before setting event...")
    await asyncio.sleep(1)
    print("Main: Setting the event.")
    event.set()  # Wake up all waiting tasks
    # Wait until the waiter tasks are finished.
    await waiter_task1
    await waiter_task2
    print("Main: Waiters finished.")

asyncio.run(main())
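The Event example shows signalling; asyncio.Lock, the other primitive named above, could be sketched like this. The run_counter helper and the deliberately placed sleep are hypothetical, chosen only to force a task switch in the middle of a read-modify-write.

```python
import asyncio

async def run_counter(num_tasks=5):
    counter = 0
    lock = asyncio.Lock()

    async def increment():
        nonlocal counter
        async with lock:               # Only one task at a time past this point
            current = counter
            await asyncio.sleep(0.01)  # Yield to the event loop mid-update
            counter = current + 1

    # Run all increments concurrently; the lock serializes the updates
    await asyncio.gather(*(increment() for _ in range(num_tasks)))
    return counter

if __name__ == "__main__":
    print(asyncio.run(run_counter()))
```

Without the lock, every task would read the same starting value before any of them wrote back, and most updates would be lost, even though only one thread is involved.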
9. Low-Level: selectors Module
- Level: Low-level compared with asyncio, but a higher-level wrapper around the OS I/O multiplexing mechanisms of the select module (select, poll, epoll, kqueue).
- Purpose: Monitor multiple file descriptors for I/O readiness efficiently. Used internally by asyncio.
- Best for: Building custom event loops or network applications needing fine control over non-blocking I/O.
- Mechanism: I/O Multiplexing.
- Documentation: selectors.rst
# Based on selectors.rst example
import selectors
import socket

sel = selectors.DefaultSelector()  # Uses the best implementation for the platform

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('Accepted connection from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)  # Register client socket for reading

def read(conn, mask):
    try:
        data = conn.recv(1000)  # Should be ready
        if data:
            print('Echoing', repr(data), 'to', conn.getpeername())
            conn.send(data)  # Hope it won't block
        else:
            # No data means client closed connection
            print('Closing connection to', conn.getpeername())
            sel.unregister(conn)
            conn.close()
    except ConnectionResetError:
        print('Connection reset by peer', conn.getpeername())
        sel.unregister(conn)
        conn.close()

# Set up listening socket
host = '127.0.0.1'
port = 12345
print(f"Starting server on {host}:{port}")
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # Allow reuse
lsock.bind((host, port))
lsock.listen()
lsock.setblocking(False)  # Essential for selectors
sel.register(lsock, selectors.EVENT_READ, accept)  # Register server socket for reading (accepts)

try:
    while True:
        # Wait for events, timeout=None means block indefinitely
        events = sel.select(timeout=None)
        if not events:
            print("Selector returned no events, looping again.")
            continue
        for key, mask in events:
            callback = key.data  # The function we registered (accept or read)
            callback(key.fileobj, mask)  # Call the function
except KeyboardInterrupt:
    print("Server shutting down.")
finally:
    lsock.close()
    sel.close()
# To test: telnet 127.0.0.1 12345, type something, press Enter.
- Expected Output (Interaction):
Starting server on 127.0.0.1:12345
# (Connect with telnet)
Accepted connection from ('127.0.0.1', XXXXX)
# (Type 'hello' in telnet and press Enter)
Echoing b'hello\r\n' to ('127.0.0.1', XXXXX)
# (Disconnect telnet)
Closing connection to ('127.0.0.1', XXXXX)
# (Ctrl+C to stop server)
Server shutting down.
10. Low-Level: _thread Module
- Level: Lowest-level threading API.
- Purpose: Basic thread creation. The threading module is built on this and is preferred.
- Best for: Situations where the overhead of threading.Thread objects is undesirable (rare).
- Mechanism: Preemptive Multithreading.
- Documentation: _thread.rst
# Based on _thread.rst concepts
import _thread
import time
import threading  # To get main thread ID easily

def low_level_worker(thread_name, delay):
    count = 0
    print(f"Starting {thread_name} (OS ID: {threading.get_native_id()})")
    while count < 3:
        time.sleep(delay)
        count += 1
        print(f"{thread_name}: Count is {count}")
    print(f"{thread_name}: Finished.")

# Note: _thread doesn't have a built-in join mechanism easily accessible here
# We rely on the main thread sleeping long enough

# Start new threads
try:
    _thread.start_new_thread(low_level_worker, ("Thread-1", 0.5))
    _thread.start_new_thread(low_level_worker, ("Thread-2", 0.7))
except Exception as e:
    print(f"Error: unable to start thread - {e}")

print(f"Main thread ({threading.get_native_id()}) waiting...")
# Crude way to wait for threads started with _thread
time.sleep(3)
print("Main thread finished waiting.")
11. Other Related Mechanisms (Mentioned in Docs):
- queue.Queue: Thread-safe queue for communication between threads. (Doc: queue.rst)
- signal: Handling asynchronous OS signals. Python-level signal handlers always run in the main thread. (Doc: signal.rst)
- mmap: Memory-mapped files. Can potentially be used for IPC if mapped as shared, but multiprocessing.shared_memory or Manager is usually preferred for structured sharing. (Doc: mmap.rst)
- contextvars: Manage context-local state correctly across asyncio tasks and threads. Crucial for context propagation in concurrent code. (Doc: contextvars.rst)
- sched: A general-purpose single-threaded event scheduler based on time delays. Not typically used for I/O or CPU concurrency in the same way as the others, more for scheduling future actions within one thread. (Doc: sched.rst)
- select: The low-level OS interface for I/O multiplexing. selectors is the preferred higher-level interface. (Doc: select.rst)
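Of the mechanisms listed above, queue.Queue is probably the one most often paired with threading. A rough producer/consumer sketch follows; the run_pipeline helper, the squaring "work", and the None sentinel convention are illustrative assumptions, not something prescribed by the docs.

```python
import queue
import threading

def run_pipeline(items, num_workers=2):
    tasks = queue.Queue()
    results = queue.Queue()

    def worker():
        while True:
            item = tasks.get()        # Blocks until an item is available
            if item is None:          # Sentinel: no more work for this worker
                tasks.task_done()
                break
            results.put(item * item)  # Hypothetical unit of work
            tasks.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        tasks.put(item)
    for _ in threads:
        tasks.put(None)               # One sentinel per worker
    tasks.join()                      # Wait until every item is marked done
    for t in threads:
        t.join()

    # All workers have exited, so draining without blocking is safe here
    out = []
    while not results.empty():
        out.append(results.get())
    return sorted(out)

if __name__ == "__main__":
    print(run_pipeline([1, 2, 3, 4]))
```

Results are sorted before returning because the order in which workers finish is not deterministic.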
This covers the spectrum from highly abstract (asyncio.run, concurrent.futures) down to low-level primitives (_thread, select), using only the provided documentation snippets.