Created
May 4, 2023 05:21
-
-
Save alukach/8c3e6c509115d8743aee1e90b4d33897 to your computer and use it in GitHub Desktop.
Multiprocessing + Asyncio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
An example script that does CPU-bound work (checksum calculation) in a process
pool, followed by IO-bound work (upload to a server) as asyncio coroutines, so
that neither kind of work blocks the other.
Inspiration: https://stackoverflow.com/questions/21159103/what-kind-of-problems-if-any-would-there-be-combining-asyncio-with-multiproces#29147750
""" | |
import asyncio | |
import datetime | |
import hashlib | |
import multiprocessing | |
import random | |
import time | |
import typing | |
from concurrent.futures import ProcessPoolExecutor | |
# ANSI escape sequences used to colorize log output.
clear = "\033[0m"  # reset to the terminal's default color
blue = "\033[96m"  # used for checksum messages
green = "\033[92m"  # used for upload messages
def log(filename, msg, color=clear):
    """
    Print one aligned, colorized status line for ``filename``.

    Stand-in for a real logger. Each line carries a millisecond timestamp and
    the name of the current process, so output coming from different worker
    processes can be told apart. ``msg`` is wrapped in ``color`` and followed
    by the reset sequence.
    """
    now = datetime.datetime.now()
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    stamp = now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    process_name = multiprocessing.current_process().name
    line = f"{stamp:<24} {process_name:<15} {filename:<8} {color}{msg}{clear}"
    print(line)
def generate_checksum(filepath: str) -> str:
    """
    Simulate a blocking checksum computation for ``filepath``.

    The call blocks with ``time.sleep`` for 0.1 to 5.0 seconds, so it is meant
    to run in a worker process rather than on the event loop. The file is
    never read: the returned value is the SHA-256 hex digest of 100 random
    bytes.
    """
    log(filepath, "Starting checksum...", color=blue)
    pause_seconds = random.randint(1, 50) / 10
    time.sleep(pause_seconds)  # stand-in for the expensive hashing work
    log(filepath, f"Completed checksum after {pause_seconds} seconds.", color=blue)
    fake_payload = random.randbytes(100)
    digest = hashlib.sha256(fake_payload)
    return digest.hexdigest()
async def upload(filepath: str, checksum: str) -> None:
    """
    Simulate a non-blocking upload of ``filepath``.

    Acquires the module-level ``max_concurrent_uploads`` semaphore for the
    duration of the upload, then awaits ``asyncio.sleep`` for 1 to 10 seconds.
    ``checksum`` is accepted but not used by this mock.
    """
    async with max_concurrent_uploads:
        log(filepath, "Starting upload...", color=green)
        duration = random.randint(1, 10)
        await asyncio.sleep(duration)  # stand-in for the slow network transfer
        message = f"Completed upload after {duration} seconds."
        log(filepath, message, color=green)
async def process_file(filepath: str) -> str:
    """
    Checksum ``filepath`` in the process pool, then upload it.

    The blocking ``generate_checksum`` call is handed to the module-level
    ``pool`` executor so the event loop stays free while it runs; the upload
    is awaited afterwards as a coroutine.

    Returns:
        The same ``filepath``, so callers collecting results as they complete
        can tell which file finished.
    """
    # Ask asyncio for the loop actually running this coroutine instead of
    # relying on a module-global ``loop`` that only exists in script mode.
    running_loop = asyncio.get_running_loop()
    checksum = await running_loop.run_in_executor(pool, generate_checksum, filepath)
    await upload(filepath, checksum)
    return filepath
async def main(filenames: typing.List[str]) -> None:
    """
    Process every file concurrently and report progress as each one finishes.

    One ``process_file`` coroutine is created per entry in ``filenames``.
    Results are consumed in completion order, a percentage is logged after
    each, and the total elapsed time is printed at the end.
    """
    start = time.time()
    total = len(filenames)
    pending = [process_file(filename) for filename in filenames]
    # Count completions from 1: the first finished file is 1/total done and
    # the last one reports 100%, rather than starting at 0% and ending below 100%.
    for completed, task in enumerate(asyncio.as_completed(pending), start=1):
        # Await the next task to finish, whichever it is.
        filepath = await task
        log(filepath, f"{int(completed * 100 / total)}% done")
    print(f"Complete in {time.time() - start:.1f} seconds")
if __name__ == "__main__":
    # Script entry point: process `filecount` mock files end to end.
    random.seed(2)
    filecount = 100
    cpu_count = multiprocessing.cpu_count()

    # Create the event loop explicitly; calling asyncio.get_event_loop() with
    # no running loop is deprecated. The loop is installed as the current one
    # before the semaphore is built, because older Python versions bind
    # asyncio primitives to the current loop at construction time.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Cap simultaneous uploads at ten per CPU core.
    max_concurrent_uploads = asyncio.Semaphore(cpu_count * 10)

    try:
        # The context manager shuts the worker processes down on exit, even
        # if main() raises.
        with ProcessPoolExecutor(max_workers=cpu_count) as pool:
            loop.run_until_complete(
                main([f"{filename}.txt" for filename in range(filecount)])
            )
    finally:
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment