Skip to content

Instantly share code, notes, and snippets.

@siddhpant
Forked from Archmonger/async_fileio_test.py
Last active May 28, 2025 14:08
Show Gist options
  • Save siddhpant/2825884cdaf32c0eb6cdc90ec873f2c0 to your computer and use it in GitHub Desktop.
Save siddhpant/2825884cdaf32c0eb6cdc90ec873f2c0 to your computer and use it in GitHub Desktop.
import asyncio
import contextlib
import sys
from shutil import rmtree
import time
from pathlib import Path
from uuid import uuid4
from aiofile import AIOFile
from aiofiles import open as aio_open
from anyio import open_file as anyio_open
from tabulate import tabulate
DATA_5MB = b"x" * 1024 * 1024 * 5
WORKERS = 500
ITERATIONS = 10
SORT_RESULTS = True
RESULTS = []
TMP_DIR = Path("async_fileio_test.d")
if TMP_DIR.exists():
rmtree(TMP_DIR)
TMP_DIR_WRITE = TMP_DIR / "write"
TMP_DIR_WRITE.mkdir(parents=True)
READFILE_NAME = "readfile.txt"
READFILE_PATH = str(TMP_DIR / "readfile.txt")
USE_UVLOOP = False
def make_read_file():
with open(READFILE_PATH, "wb") as fp:
fp.write(DATA_5MB)
fp.flush()
def get_unique_name() -> Path:
return TMP_DIR_WRITE / uuid4().hex
async def aiofile_read():
async with AIOFile(READFILE_PATH, "rb") as afp:
return await afp.read()
async def aiofile_write():
async with AIOFile(get_unique_name(), "wb") as afp:
await afp.write(DATA_5MB)
async def aiofile_write_flush():
async with AIOFile(get_unique_name(), "wb") as afp:
await afp.write(DATA_5MB)
await afp.fsync()
async def aiofiles_read():
async with aio_open(READFILE_PATH, "rb") as afp:
return await afp.read()
async def aiofiles_write():
async with aio_open(get_unique_name(), "wb") as afp:
await afp.write(DATA_5MB)
async def aiofiles_write_flush():
async with aio_open(get_unique_name(), "wb") as afp:
await afp.write(DATA_5MB)
await afp.flush()
async def anyio_read():
async with await anyio_open(READFILE_PATH, "rb") as afp:
return await afp.read()
async def anyio_write():
async with await anyio_open(get_unique_name(), "wb") as afp:
await afp.write(DATA_5MB)
async def anyio_write_flush():
async with await anyio_open(get_unique_name(), "wb") as afp:
await afp.write(DATA_5MB)
await afp.flush()
def stdlib_read():
with open(READFILE_PATH, "rb") as fp:
return fp.read()
def stdlib_write():
with open(get_unique_name(), "wb") as fp:
fp.write(DATA_5MB)
def stdlib_write_flush():
with open(get_unique_name(), "wb") as fp:
fp.write(DATA_5MB)
fp.flush()
def stdlib_read_unbuffered():
with open(READFILE_PATH, "rb", buffering=0) as fp:
return fp.read()
def stdlib_write_unbuffered():
with open(get_unique_name(), "wb", buffering=0) as fp:
fp.write(DATA_5MB)
# No need to flush in unbuffered mode.
def start_run(name):
if ITERATIONS > 1:
print(
f"Benchmarking '{name}' with {WORKERS} workers and {ITERATIONS} iterations."
)
def finish_run(name, start_time):
average_time = (time.time() - start_time) / ITERATIONS
print(f"Finished {name} with average {average_time:.2f} seconds.")
RESULTS.append((name, average_time))
def finish_iteration(name, run_num, start_time):
if ITERATIONS > 1:
print(f"Finished run #{run_num + 1} in {time.time() - start_time:.2f} seconds.")
def select_event_loop_policy():
"""Ask the user what event loop policy to use."""
if sys.platform == "win32":
policies = [
asyncio.WindowsProactorEventLoopPolicy,
asyncio.WindowsSelectorEventLoopPolicy,
]
else:
policies = [asyncio.DefaultEventLoopPolicy]
with contextlib.suppress(ImportError):
import uvloop
policies.append(uvloop)
print("Select an event loop policy:")
for i, policy in enumerate(policies):
print(f"{i}: {policy.__name__}", end="")
if i == 0:
print(" (default)")
else:
print()
choice = int(input("Enter a number: "))
if policies[choice].__name__ == "uvloop":
global USE_UVLOOP
USE_UVLOOP = True
else:
asyncio.set_event_loop_policy(policies[choice]())
if __name__ == "__main__":
async def run_benchmark():
loop_name = "uvloop"
if not USE_UVLOOP:
loop_name = asyncio.get_event_loop_policy().__class__.__name__
print(f"Starting tests with {loop_name}.")
make_read_file()
# Test async file io frameworks
for func in [
aiofile_read,
aiofile_write,
aiofile_write_flush,
aiofiles_read,
aiofiles_write,
aiofiles_write_flush,
anyio_read,
anyio_write,
anyio_write_flush,
]:
name = func.__name__
start_run(name)
start = time.time()
for x in range(ITERATIONS):
current_start = time.time()
await asyncio.gather(*[func() for _ in range(WORKERS)])
finish_iteration(name, x, current_start)
finish_run(name, start)
if func.__name__.endswith("write"):
rmtree(TMP_DIR_WRITE)
TMP_DIR_WRITE.mkdir()
# Test stdlib
for func in [
stdlib_read,
stdlib_write,
stdlib_write_flush,
stdlib_read_unbuffered,
stdlib_write_unbuffered,
]:
name = func.__name__
start_run(name)
start = time.time()
for x in range(ITERATIONS):
current_start = time.time()
for _ in range(WORKERS):
func()
finish_iteration(name, x, current_start)
finish_run(name, start)
if func.__name__.endswith("write"):
rmtree(TMP_DIR_WRITE)
TMP_DIR_WRITE.mkdir()
# Test stdlib with asyncio.to_thread
for func in [
stdlib_read,
stdlib_write,
stdlib_write_flush,
stdlib_read_unbuffered,
stdlib_write_unbuffered,
]:
name = f"asyncio.to_thread({func.__name__})"
start_run(name)
start = time.time()
for x in range(ITERATIONS):
current_start = time.time()
await asyncio.gather(*[asyncio.to_thread(func)
for _ in range(WORKERS)])
finish_iteration(name, x, current_start)
finish_run(name, start)
if func.__name__.endswith("write"):
rmtree(TMP_DIR_WRITE)
TMP_DIR_WRITE.mkdir()
# Sort results by time
if SORT_RESULTS:
RESULTS.sort(key=lambda x: x[1])
print(f"Finished running with {loop_name}.")
print(tabulate(RESULTS, headers=["Function", "Time (s)"],
tablefmt="github"))
select_event_loop_policy()
if not USE_UVLOOP:
asyncio.run(run_benchmark())
else:
import uvloop
uvloop.run(run_benchmark())
@siddhpant
Copy link
Author

_UnixDefaultEventLoopPolicy

Function Time (s)
stdlib_read_unbuffered 0.200348
stdlib_read 0.242624
aiofiles_read 0.415127
asyncio.to_thread(stdlib_read) 0.437746
asyncio.to_thread(stdlib_read_unbuffered) 0.490132
anyio_read 0.548908
aiofile_read 0.87412
asyncio.to_thread(stdlib_write_unbuffered) 0.875594
anyio_write_flush 0.876996
aiofiles_write_flush 0.890379
asyncio.to_thread(stdlib_write) 0.892612
asyncio.to_thread(stdlib_write_flush) 0.897756
anyio_write 0.910071
aiofiles_write 1.01746
stdlib_write 1.03949
stdlib_write_flush 1.0398
stdlib_write_unbuffered 1.03991
aiofile_write 1.19374
aiofile_write_flush 1.33136

uvloop

Function Time (s)
stdlib_read_unbuffered 0.239012
stdlib_read 0.279507
asyncio.to_thread(stdlib_read) 0.381207
aiofiles_read 0.389197
asyncio.to_thread(stdlib_read_unbuffered) 0.442786
anyio_read 0.547405
asyncio.to_thread(stdlib_write) 0.811238
asyncio.to_thread(stdlib_write_unbuffered) 0.811762
asyncio.to_thread(stdlib_write_flush) 0.814089
aiofiles_write_flush 0.857403
anyio_write_flush 0.878293
anyio_write 0.909213
aiofiles_write 1.0003
stdlib_write_unbuffered 1.03255
stdlib_write_flush 1.04873
stdlib_write 1.07907
aiofile_read 1.16294
aiofile_write 1.23341
aiofile_write_flush 1.29143

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment