Created
March 22, 2022 01:30
-
-
Save kylekyle/f2bd3f1606d1fe581161d9fa75581a88 to your computer and use it in GitHub Desktop.
A full example of a producer/consumer using process pools in python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Event | |
from time import sleep | |
from traceback import print_exception | |
from multiprocessing import Pool, Manager | |
from queue import Full | |
from signal import signal, SIGTERM, SIGINT | |
class Consumer1: | |
def __init__(self): | |
print("Initializing Consumer1 ...") | |
def consume(self, batch): | |
sleep(0.001) | |
class Consumer2: | |
def __init__(self): | |
print("Initializing Consumer2 ...") | |
def consume(self, batch): | |
pass | |
def consume(q): | |
consumers = [Consumer1(), Consumer2()] | |
while True: | |
batch = q.get() | |
for consumer in consumers: | |
consumer.consume(batch) | |
if __name__ == '__main__': | |
num_processes = 10 | |
with Pool(num_processes) as pool: | |
batch = 0 | |
stopped = Event() | |
queue = Manager().Queue(num_processes) | |
for n in [SIGINT, SIGTERM]: | |
signal(n, lambda *a: stopped.set()) | |
def error(ex): | |
print(ex) | |
print_exception(type(ex), ex, ex.__traceback__) | |
stopped.set() | |
for i in range(num_processes): | |
pool.apply_async(consume, (queue,), {}, None, error) | |
while not stopped.is_set(): | |
batch += 1 | |
try: | |
queue.put_nowait(batch) | |
except Full: | |
print("skipped batch", batch) | |
print("Terminating pool ...") | |
pool.terminate() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment