Last active
July 6, 2025 18:11
-
-
Save pangyuteng/91323f79aa15bff8603cad6d96aaecb3 to your computer and use it in GitHub Desktop.
buffer flushing logic using python asyncio Queue and Event
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
buffer flushing logic using python asyncio Queue and Event | |
""" | |
import numpy as np | |
import asyncio | |
entire_list = [] | |
async def flusher(queue, max_queue_size, interval, flush_event, pool): | |
while True: | |
done, pending = await asyncio.wait( | |
[flush_event.wait(), asyncio.sleep(interval)], | |
return_when=asyncio.FIRST_COMPLETED | |
) | |
mylist = [] | |
while True: | |
try: | |
item = queue.get_nowait() | |
mylist.append(item) | |
except asyncio.QueueEmpty: | |
break | |
print(flush_event.is_set(),len(mylist),len(entire_list)) | |
if len(mylist) > 0: | |
entire_list.extend(mylist) # for assert | |
# TODO | |
# async with pool.connection() as conn: | |
# async with conn.cursor() as cur: | |
# await cur.executemany(...) | |
# await conn.commit() | |
# clear flush event if it was set | |
if flush_event.is_set(): | |
flush_event.clear() | |
async def add_transaction(queue, transaction, max_queue_size, flush_event): | |
await queue.put(transaction) | |
if queue.qsize() >= max_queue_size: | |
flush_event.set() | |
async def main(): | |
total_event_count = 1500 | |
pool = None # TODO | |
queue = asyncio.Queue() | |
max_queue_size = 50 | |
interval = 0.5 # flush interval in seconds | |
flush_event = asyncio.Event() | |
flusher_task = asyncio.create_task(flusher(queue, max_queue_size, interval, flush_event, pool)) | |
# mockup transactionss | |
for i in range(total_event_count): | |
await add_transaction(queue, f"Transaction {i}", max_queue_size, flush_event) | |
num = np.random.randint(1,4) if np.random.rand() < 0.01 else 0.001 | |
await asyncio.sleep(num) | |
# Wait a bit to ensure all transactions are processed | |
await asyncio.sleep(5) | |
# clean up | |
flusher_task.cancel() | |
try: | |
await flusher_task | |
except asyncio.CancelledError: | |
pass | |
#if pool is not None: | |
# await pool.close() | |
assert(len(entire_list)==total_event_count) | |
print("pass") | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment