-
-
Save siddhpant/2825884cdaf32c0eb6cdc90ec873f2c0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import contextlib
import sys
import time
from pathlib import Path
from shutil import rmtree
from uuid import uuid4

from aiofile import AIOFile
from aiofiles import open as aio_open
from anyio import open_file as anyio_open
from tabulate import tabulate
# Payload written by every *_write benchmark and stored in the shared read file.
DATA_5MB = b"x" * 1024 * 1024 * 5
# Operations per iteration: run concurrently for async/to_thread benchmarks,
# back to back for the plain stdlib ones.
WORKERS = 500
# How many times each benchmark is repeated; the mean per iteration is reported.
ITERATIONS = 10
# When True the final table is ordered fastest-first.
SORT_RESULTS = True
# (function name, average seconds per iteration) rows appended by finish_run().
RESULTS = []

# Scratch directory, wiped at import time so every run starts from a clean slate.
TMP_DIR = Path("async_fileio_test.d")
if TMP_DIR.exists():
    rmtree(TMP_DIR)
TMP_DIR_WRITE = TMP_DIR / "write"
TMP_DIR_WRITE.mkdir(parents=True)

READFILE_NAME = "readfile.txt"
# Derived from READFILE_NAME so the name and the path can never drift apart.
READFILE_PATH = str(TMP_DIR / READFILE_NAME)

# Set to True by select_event_loop_policy() when the user picks uvloop.
USE_UVLOOP = False
def make_read_file():
    """Create the shared input file that the *_read benchmarks open via READFILE_PATH."""
    payload = DATA_5MB
    with open(READFILE_PATH, mode="wb") as out:
        out.write(payload)
        # Explicit flush before the context manager closes the file.
        out.flush()
def get_unique_name() -> Path:
    """Return a fresh random-hex path inside the write scratch directory."""
    token = uuid4().hex
    return TMP_DIR_WRITE / token
async def aiofile_read():
    """Read the whole shared input file through aiofile and return its bytes."""
    async with AIOFile(READFILE_PATH, "rb") as handle:
        content = await handle.read()
    return content


async def aiofile_write():
    """Write the 5 MB payload to a brand-new file through aiofile."""
    target = get_unique_name()
    async with AIOFile(target, "wb") as handle:
        await handle.write(DATA_5MB)


async def aiofile_write_flush():
    """Write the payload through aiofile, then call fsync() before closing.

    NOTE(review): the aiofiles/anyio/stdlib *_write_flush variants call
    flush(), whereas this one calls fsync(), which presumably forces the data
    to storage — so its timing is likely not directly comparable. Confirm intent.
    """
    target = get_unique_name()
    async with AIOFile(target, "wb") as handle:
        await handle.write(DATA_5MB)
        await handle.fsync()
async def aiofiles_read():
    """Read the whole shared input file through aiofiles and return its bytes."""
    async with aio_open(READFILE_PATH, "rb") as handle:
        content = await handle.read()
    return content


async def aiofiles_write():
    """Write the 5 MB payload to a brand-new file through aiofiles."""
    target = get_unique_name()
    async with aio_open(target, "wb") as handle:
        await handle.write(DATA_5MB)


async def aiofiles_write_flush():
    """Like aiofiles_write, but await an explicit flush() before the file closes."""
    target = get_unique_name()
    async with aio_open(target, "wb") as handle:
        await handle.write(DATA_5MB)
        await handle.flush()
async def anyio_read():
    """Read the whole shared input file through anyio and return its bytes."""
    handle = await anyio_open(READFILE_PATH, "rb")
    async with handle as stream:
        content = await stream.read()
    return content


async def anyio_write():
    """Write the 5 MB payload to a brand-new file through anyio."""
    handle = await anyio_open(get_unique_name(), "wb")
    async with handle as stream:
        await stream.write(DATA_5MB)


async def anyio_write_flush():
    """Like anyio_write, but await an explicit flush() before the file closes."""
    handle = await anyio_open(get_unique_name(), "wb")
    async with handle as stream:
        await stream.write(DATA_5MB)
        await stream.flush()
def stdlib_read():
    """Read the whole shared input file with a regular buffered open()."""
    with open(READFILE_PATH, "rb") as fp:
        return fp.read()


def stdlib_write():
    """Write the 5 MB payload to a new file with a regular buffered open()."""
    with open(get_unique_name(), "wb") as fp:
        fp.write(DATA_5MB)


def stdlib_write_flush():
    """Like stdlib_write, but flush Python's buffer explicitly (no fsync) before closing."""
    with open(get_unique_name(), "wb") as fp:
        fp.write(DATA_5MB)
        fp.flush()


def stdlib_read_unbuffered():
    """Read the whole shared input file through a raw (buffering=0) file object."""
    with open(READFILE_PATH, "rb", buffering=0) as fp:
        # A raw read() with no size argument reads until EOF.
        return fp.read()


def _write_all(fp, data):
    """Call fp.write() repeatedly until every byte of *data* has been accepted.

    Raw (buffering=0) file objects may perform short writes and return fewer
    bytes than requested, so a single write() call can silently drop data.
    Assumes *fp* is in blocking mode (write() returns an int, never None).
    """
    remaining = memoryview(data)
    while remaining:
        written = fp.write(remaining)
        remaining = remaining[written:]


def stdlib_write_unbuffered():
    """Write the 5 MB payload to a new file through a raw (buffering=0) file object."""
    with open(get_unique_name(), "wb", buffering=0) as fp:
        # Loop on short writes; no flush needed since there is no Python buffer.
        _write_all(fp, DATA_5MB)
def start_run(name):
    """Announce a benchmark; prints nothing when ITERATIONS is 1 or less."""
    if ITERATIONS <= 1:
        return
    print(
        f"Benchmarking '{name}' with {WORKERS} workers and {ITERATIONS} iterations."
    )


def finish_run(name, start_time):
    """Print and record in RESULTS the mean per-iteration time since *start_time*."""
    elapsed = time.time() - start_time
    average_time = elapsed / ITERATIONS
    print(f"Finished {name} with average {average_time:.2f} seconds.")
    RESULTS.append((name, average_time))


def finish_iteration(name, run_num, start_time):
    """Print the duration of 0-based iteration *run_num* when ITERATIONS > 1."""
    if ITERATIONS <= 1:
        return
    took = time.time() - start_time
    print(f"Finished run #{run_num + 1} in {took:.2f} seconds.")
def _parse_policy_index(raw, count):
    """Convert an answer into a valid index into a list of *count* policies.

    A blank answer selects entry 0, which the menu labels as the default.

    Raises:
        ValueError: if *raw* is not an integer or lies outside 0..count-1.
    """
    text = str(raw).strip()
    if not text:
        return 0
    index = int(text)
    # Reject negatives explicitly: list indexing would otherwise wrap around.
    if not 0 <= index < count:
        raise ValueError(f"choice must be between 0 and {count - 1}, got {index}")
    return index


def select_event_loop_policy(choice=None):
    """Ask the user what event loop policy to use and install it.

    Lists the policies available on this platform, plus the uvloop module when
    it can be imported. Picking uvloop only sets the module-level USE_UVLOOP
    flag; any other pick is installed with asyncio.set_event_loop_policy().

    Args:
        choice: Optional preselected answer (int or str). When None, the user
            is prompted until a valid number is entered; a blank answer picks
            entry 0.

    Raises:
        ValueError: if *choice* is given but is not a valid index.
    """
    global USE_UVLOOP

    if sys.platform == "win32":
        policies = [
            asyncio.WindowsProactorEventLoopPolicy,
            asyncio.WindowsSelectorEventLoopPolicy,
        ]
    else:
        policies = [asyncio.DefaultEventLoopPolicy]
    with contextlib.suppress(ImportError):
        import uvloop

        # The module itself is listed; it is recognised below by __name__.
        policies.append(uvloop)

    print("Select an event loop policy:")
    for i, policy in enumerate(policies):
        suffix = " (default)" if i == 0 else ""
        print(f"{i}: {policy.__name__}{suffix}")

    if choice is not None:
        index = _parse_policy_index(choice, len(policies))
    else:
        while True:
            answer = input("Enter a number: ")
            try:
                index = _parse_policy_index(answer, len(policies))
            except ValueError:
                print(f"Please enter a number from 0 to {len(policies) - 1}.")
            else:
                break

    selected = policies[index]
    if selected.__name__ == "uvloop":
        USE_UVLOOP = True
    else:
        asyncio.set_event_loop_policy(selected())
if __name__ == "__main__":
    # Coroutine functions from the async file I/O libraries.
    ASYNC_BENCHMARKS = [
        aiofile_read,
        aiofile_write,
        aiofile_write_flush,
        aiofiles_read,
        aiofiles_write,
        aiofiles_write_flush,
        anyio_read,
        anyio_write,
        anyio_write_flush,
    ]
    # Blocking stdlib functions, timed both inline and via asyncio.to_thread.
    STDLIB_BENCHMARKS = [
        stdlib_read,
        stdlib_write,
        stdlib_write_flush,
        stdlib_read_unbuffered,
        stdlib_write_unbuffered,
    ]

    def _is_write_benchmark(func):
        """Return True if *func* leaves files in TMP_DIR_WRITE that must be removed."""
        # Matches *_write, *_write_flush and *_write_unbuffered alike; the former
        # endswith("write") test skipped the last two and never deleted their files.
        return "write" in func.__name__

    def _reset_write_dir():
        """Delete every file written so far and recreate the empty write directory."""
        rmtree(TMP_DIR_WRITE)
        TMP_DIR_WRITE.mkdir()

    def _concurrently(func):
        """Return an awaitable running WORKERS calls of coroutine *func* concurrently."""
        return asyncio.gather(*(func() for _ in range(WORKERS)))

    async def _sequentially(func):
        """Call blocking *func* WORKERS times back to back on the event-loop thread."""
        for _ in range(WORKERS):
            func()

    def _in_threads(func):
        """Return an awaitable running WORKERS calls of *func* via asyncio.to_thread."""
        return asyncio.gather(*(asyncio.to_thread(func) for _ in range(WORKERS)))

    async def _benchmark(name, func, make_iteration):
        """Time ITERATIONS rounds of make_iteration(func), record them, then clean up.

        NOTE: written files are only removed after all iterations, so peak disk
        usage is about WORKERS * ITERATIONS * len(DATA_5MB) (~25 GB by default).
        """
        start_run(name)
        start = time.time()
        for run_num in range(ITERATIONS):
            current_start = time.time()
            await make_iteration(func)
            finish_iteration(name, run_num, current_start)
        finish_run(name, start)
        if _is_write_benchmark(func):
            _reset_write_dir()

    async def run_benchmark():
        """Run every benchmark under the active event loop and print a results table."""
        loop_name = "uvloop"
        if not USE_UVLOOP:
            loop_name = asyncio.get_event_loop_policy().__class__.__name__
        print(f"Starting tests with {loop_name}.")
        make_read_file()

        # Test async file io frameworks
        for func in ASYNC_BENCHMARKS:
            await _benchmark(func.__name__, func, _concurrently)
        # Test stdlib (called directly, on the event-loop thread)
        for func in STDLIB_BENCHMARKS:
            await _benchmark(func.__name__, func, _sequentially)
        # Test stdlib with asyncio.to_thread
        for func in STDLIB_BENCHMARKS:
            await _benchmark(f"asyncio.to_thread({func.__name__})", func, _in_threads)

        # Sort results by time
        if SORT_RESULTS:
            RESULTS.sort(key=lambda row: row[1])
        print(f"Finished running with {loop_name}.")
        print(tabulate(RESULTS, headers=["Function", "Time (s)"], tablefmt="github"))

    select_event_loop_policy()
    if not USE_UVLOOP:
        asyncio.run(run_benchmark())
    else:
        import uvloop

        uvloop.run(run_benchmark())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
_UnixDefaultEventLoopPolicy
uvloop