Skip to content

Instantly share code, notes, and snippets.

@mooreniemi
Last active June 9, 2025 14:13
Show Gist options
  • Save mooreniemi/17de9e6c605a4f3873709c700e21bda3 to your computer and use it in GitHub Desktop.
blocking ds
[package]
name = "llp"
version = "0.1.0"
edition = "2021"
[dependencies]
rustc-hash = "2"
memmap2 = "0.9"
zstd = { version = "0.13", features = ["zstdmt", "experimental"] }
serde_json = "1.0"
clap = { version = "4.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"
crossbeam-channel = "0.5"
lz4_flex = "0.11"
rand = "0.8"
[target.'cfg(target_os = "linux")'.dependencies]
io-uring = "0.7"
papi-bindings = "0.5"
2025-06-09T03:21:26.808889Z INFO doc_server: Starting doc server with configured arena sizes decomp_arena_bytes=2048 string_arena_bytes=4096
2025-06-09T03:21:26.808930Z INFO doc_server: Writing sample docs path="docs.bin" document_count=100000
2025-06-09T03:21:30.519118Z INFO doc_server: Sample documents written successfully document_count=100000
2025-06-09T03:21:30.519466Z INFO doc_server: Sample documents written document_count=100000 doc_write_time_us=3710516
2025-06-09T03:21:30.519479Z INFO doc_server: Loading document store path="docs.bin"
2025-06-09T03:21:30.519522Z INFO doc_server: Document store loaded document_count=100000
2025-06-09T03:21:30.607115Z INFO doc_server: Document index built index_entries=100000
2025-06-09T03:21:30.607369Z INFO doc_server: 📊 Metrics reporter started - will report every 5 seconds
2025-06-09T03:21:30.607425Z INFO doc_server: Server running at http://127.0.0.1:4000
2025-06-09T03:21:30.607444Z INFO doc_server: Starting worker pool worker_count=7
2025-06-09T03:21:30.607481Z INFO doc_server: Worker started with thread-local arena worker_id=1
2025-06-09T03:21:30.607496Z INFO doc_server: Worker started with thread-local arena worker_id=0
2025-06-09T03:21:30.607521Z INFO doc_server: Worker started with thread-local arena worker_id=2
2025-06-09T03:21:30.607543Z INFO doc_server: Worker started with thread-local arena worker_id=5
2025-06-09T03:21:30.607558Z INFO doc_server: Worker started with thread-local arena worker_id=3
2025-06-09T03:21:30.607573Z INFO doc_server: Worker started with thread-local arena worker_id=6
2025-06-09T03:21:30.607573Z INFO doc_server: Worker started with thread-local arena worker_id=4
2025-06-09T03:21:47.165708Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048
2025-06-09T03:21:47.165768Z INFO doc_server: Creating arena arena_name=string size_bytes=4096
2025-06-09T03:21:47.167465Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048
2025-06-09T03:21:47.167486Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048
2025-06-09T03:21:47.167494Z INFO doc_server: Creating arena arena_name=string size_bytes=4096
2025-06-09T03:21:47.167502Z INFO doc_server: Creating arena arena_name=string size_bytes=4096
2025-06-09T03:21:47.167502Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048
2025-06-09T03:21:47.167514Z INFO doc_server: Creating arena arena_name=string size_bytes=4096
2025-06-09T03:21:47.167581Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048
2025-06-09T03:21:47.167593Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048
2025-06-09T03:21:47.167595Z INFO doc_server: Creating arena arena_name=string size_bytes=4096
2025-06-09T03:21:47.167604Z INFO doc_server: Creating arena arena_name=string size_bytes=4096
2025-06-09T03:21:47.167608Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048
2025-06-09T03:21:47.167620Z INFO doc_server: Creating arena arena_name=string size_bytes=4096
2025-06-09T03:21:50.626680Z INFO doc_server: 📊 Performance metrics with percentiles rps="41712.5" dps="41713.3" bottleneck="🔥 FETCH" avg_decompression_us=4 avg_response_building_us=0 document_fetch_p50_us=6 document_fetch_p90_us=15 document_fetch_p99_us=31 document_fetch_p100_us=64 decompression_p50_us=3 decompression_p90_us=9 decompression_p99_us=20 decompression_p100_us=55 response_building_p50_us=0 response_building_p90_us=1 response_building_p99_us=10 response_building_p100_us=45 spillovers_per_sec="0.0" spillover_bytes_per_sec="0.0" total_requests=208768 total_documents=208772 total_spillovers=0 total_spillover_bytes=0 decompression_samples=3386 response_samples=3380 fetch_samples=3386
2025-06-09T03:21:55.637187Z INFO doc_server: 📊 Performance metrics with percentiles rps="18568.1" dps="18568.1" bottleneck="🔥 FETCH" avg_decompression_us=4 avg_response_building_us=0 document_fetch_p50_us=8 document_fetch_p90_us=16 document_fetch_p99_us=30 document_fetch_p100_us=64 decompression_p50_us=3 decompression_p90_us=11 decompression_p99_us=21 decompression_p100_us=55 response_building_p50_us=0 response_building_p90_us=4 response_building_p99_us=11 response_building_p100_us=45 spillovers_per_sec="0.0" spillover_bytes_per_sec="0.0" total_requests=301726 total_documents=301730 total_spillovers=0 total_spillover_bytes=0 decompression_samples=4219 response_samples=4214 fetch_samples=4219
2025-06-09T03:21:55.819953Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 13. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.819987Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 14. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.820019Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 15. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.820034Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 16. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.820054Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 17. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.820074Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 18. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.820085Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 19. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.836905Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 13. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.836932Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 14. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.836943Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 15. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.836953Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 16. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.836961Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 17. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.836968Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 18. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.836978Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 19. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.836986Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 20. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.836994Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 21. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837001Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 22. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837012Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 23. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837020Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 24. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837028Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 25 bytes=17
2025-06-09T03:21:55.837039Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 25. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837047Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 26 bytes=17
2025-06-09T03:21:55.837050Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 26. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837058Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 27 bytes=17
2025-06-09T03:21:55.837061Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 27. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837068Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 28 bytes=17
2025-06-09T03:21:55.837071Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 28. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837096Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 29 bytes=17
2025-06-09T03:21:55.837100Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 29. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837119Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 30 bytes=17
2025-06-09T03:21:55.837129Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 30. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837183Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 31 bytes=17
2025-06-09T03:21:55.837195Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 31. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837232Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 32 bytes=17
2025-06-09T03:21:55.837237Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 32. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837256Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 33 bytes=17
2025-06-09T03:21:55.837260Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 33. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837268Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 34 bytes=17
2025-06-09T03:21:55.837271Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 34. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837279Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 35 bytes=17
2025-06-09T03:21:55.837286Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 35. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837309Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 36 bytes=17
2025-06-09T03:21:55.837318Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 36. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837333Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 37 bytes=17
2025-06-09T03:21:55.837337Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 37. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837345Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 38 bytes=17
2025-06-09T03:21:55.837349Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 38. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837367Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 39 bytes=17
2025-06-09T03:21:55.837382Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 39. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837392Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 40 bytes=17
2025-06-09T03:21:55.837395Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 40. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837410Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 41 bytes=17
2025-06-09T03:21:55.837418Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 41. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837431Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 42 bytes=17
2025-06-09T03:21:55.837440Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 42. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837454Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 43 bytes=17
2025-06-09T03:21:55.837458Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 43. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837470Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 44 bytes=17
2025-06-09T03:21:55.837478Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 44. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837487Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 45 bytes=17
2025-06-09T03:21:55.837494Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 45. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837516Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 46 bytes=17
2025-06-09T03:21:55.837520Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 46. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837533Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 47 bytes=17
2025-06-09T03:21:55.837536Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 47. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837544Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 48 bytes=17
2025-06-09T03:21:55.837547Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 48. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:21:55.837554Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 49 bytes=17
2025-06-09T03:21:55.837567Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 49. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283
2025-06-09T03:22:00.649755Z INFO doc_server: 📊 Performance metrics with percentiles rps="1.0" dps="17.2" bottleneck="🔥 FETCH" avg_decompression_us=3 avg_response_building_us=8 document_fetch_p50_us=8 document_fetch_p90_us=16 document_fetch_p99_us=31 document_fetch_p100_us=64 decompression_p50_us=3 decompression_p90_us=11 decompression_p99_us=21 decompression_p100_us=55 response_building_p50_us=0 response_building_p90_us=4 response_building_p99_us=11 response_building_p100_us=45 spillovers_per_sec="13.8" spillover_bytes_per_sec="2569.8" total_requests=301731 total_documents=301816 total_spillovers=69 total_spillover_bytes=12877 decompression_samples=4305 response_samples=4219 fetch_samples=4305
// doc_server.rs
//
// This is an example of using blocking threads (1-cpus) to handle doc store requests.
// LZ4 compression is used to compress and decompress documents.
//
// This file showcases a few things: 1/ using arenas w/spillover to make memory usage more predictable,
// 2/ using a global sampling metrics reporter to track performance metrics at intervals (to avoid
// costly logging in the hot path) and 3/ non-blocking logging (but with no rotation)
//
// Using the `test_client` tool this server benchmarked at 50k TPS on 7 connections for random reads.
use std::io::{BufRead, BufReader};
use std::net::TcpListener;
use std::{
borrow::Cow,
cell::RefCell,
collections::HashMap,
fs::{File, OpenOptions},
io::{Seek, SeekFrom, Write},
path::Path,
sync::{
atomic::{AtomicU64, Ordering},
Arc, OnceLock,
},
thread,
time::Instant,
};
use clap::Parser;
use memmap2::Mmap;
use tracing::{debug, error, info, warn};
/// Command-line configuration, parsed with clap's derive API.
///
/// The arena sizes are deliberately tunable so spillover behavior can be
/// exercised (see the "Arena full, spilling string to heap" warnings).
#[derive(Parser, Debug)]
#[command(name = "doc_server")]
#[command(about = "A document server with configurable arena sizes")]
struct Args {
/// Size of the decompression arena in bytes
// Note: clap `default_value` takes a string; parsed into `usize`.
#[arg(long, default_value = "8192")]
decomp_arena_size: usize,
/// Size of the string arena in bytes
#[arg(long, default_value = "16384")]
string_arena_size: usize,
/// Number of documents to generate
#[arg(long, default_value = "10000")]
document_count: u32,
}
// Latency sample for percentile calculation.
// One measured duration, routed to the metrics thread. Values are in
// microseconds (per the `_us` naming used at every call site).
#[derive(Debug, Clone, Copy)]
enum LatencySample {
// Time spent decompressing one document.
Decompression(u64),
// Time spent building one response.
ResponseBuilding(u64),
// End-to-end time to fetch one document.
DocumentFetch(u64),
}
// Global channel for sending latency samples (lock-free!).
// `OnceLock` guarantees the sender is installed at most once; producers use
// `try_send`, so when the bounded channel is full samples are dropped rather
// than blocking the request hot path.
static LATENCY_SAMPLES: OnceLock<crossbeam_channel::Sender<LatencySample>> = OnceLock::new();
// Global metrics - lock-free atomic counters only.
// All fields are monotonically increasing totals; per-interval rates are
// derived by diffing snapshots (see `MetricsSnapshot`).
struct Metrics {
// Total requests served.
requests_total: AtomicU64,
// Total documents returned (a request may fetch several).
documents_fetched: AtomicU64,
// Cumulative decompression time, microseconds.
decompression_time_us: AtomicU64,
// Cumulative response-building time, microseconds.
response_building_time_us: AtomicU64,
// Number of arena allocations that fell back to the heap.
arena_spillovers: AtomicU64,
// Total bytes that spilled to the heap.
arena_spillover_bytes: AtomicU64,
}
// The single process-wide metrics instance; every counter starts at zero.
// Const-initialized (AtomicU64::new is const), so no lazy init is needed.
static METRICS: Metrics = Metrics {
requests_total: AtomicU64::new(0),
documents_fetched: AtomicU64::new(0),
decompression_time_us: AtomicU64::new(0),
response_building_time_us: AtomicU64::new(0),
arena_spillovers: AtomicU64::new(0),
arena_spillover_bytes: AtomicU64::new(0),
};
impl Metrics {
fn record_request(&self) {
self.requests_total.fetch_add(1, Ordering::Relaxed);
}
fn record_document(&self, decompression_us: u64) {
self.documents_fetched.fetch_add(1, Ordering::Relaxed);
self.decompression_time_us
.fetch_add(decompression_us, Ordering::Relaxed);
// Send sample to metrics thread (lock-free, non-blocking)
if let Some(sender) = LATENCY_SAMPLES.get() {
// Use try_send to avoid blocking if the channel is full
let _ = sender.try_send(LatencySample::Decompression(decompression_us));
}
}
fn record_response_building(&self, time_us: u64) {
self.response_building_time_us
.fetch_add(time_us, Ordering::Relaxed);
// Send sample to metrics thread (lock-free, non-blocking)
if let Some(sender) = LATENCY_SAMPLES.get() {
let _ = sender.try_send(LatencySample::ResponseBuilding(time_us));
}
}
fn record_spillover_bytes(&self, bytes: u64) {
self.arena_spillovers.fetch_add(1, Ordering::Relaxed);
self.arena_spillover_bytes
.fetch_add(bytes, Ordering::Relaxed);
}
fn snapshot(&self) -> MetricsSnapshot {
MetricsSnapshot {
requests_total: self.requests_total.load(Ordering::Relaxed),
documents_fetched: self.documents_fetched.load(Ordering::Relaxed),
decompression_time_us: self.decompression_time_us.load(Ordering::Relaxed),
response_building_time_us: self.response_building_time_us.load(Ordering::Relaxed),
arena_spillovers: self.arena_spillovers.load(Ordering::Relaxed),
arena_spillover_bytes: self.arena_spillover_bytes.load(Ordering::Relaxed),
}
}
}
/// Read (p50, p90, p99, p100) out of an already-sorted slice.
///
/// Uses the floor of `len * p` as the index (clamped to the last element),
/// and returns all zeros for an empty slice.
fn calculate_percentiles_from_slice(sorted_data: &[u64]) -> (u64, u64, u64, u64) {
    let len = sorted_data.len();
    if len == 0 {
        return (0, 0, 0, 0);
    }
    // Shared index helper: floor(len * p), clamped into bounds.
    let at = |p: f64| sorted_data[((len as f64 * p) as usize).min(len - 1)];
    (at(0.50), at(0.90), at(0.99), sorted_data[len - 1])
}
// A plain-u64 copy of the atomic counters in `Metrics`, taken via
// `Metrics::snapshot`. The reporter keeps the previous snapshot
// (`last_snapshot`) so interval rates can be computed by diffing.
#[derive(Debug, Clone)]
struct MetricsSnapshot {
requests_total: u64,
documents_fetched: u64,
decompression_time_us: u64,
response_building_time_us: u64,
arena_spillovers: u64,
arena_spillover_bytes: u64,
}
// Background metrics aggregator: drains latency samples from the global
// channel, keeps bounded sample buffers for percentile math, and remembers
// the previous counter snapshot/time so it can report per-interval rates.
struct MetricsReporter {
// Receiving end of the global LATENCY_SAMPLES channel.
receiver: crossbeam_channel::Receiver<LatencySample>,
// Counter values at the last report, for delta/rate computation.
last_snapshot: MetricsSnapshot,
// Time of the last report.
last_time: Instant,
// Recent samples, capped at 5000 entries each (see collect_samples).
decompression_samples: Vec<u64>,
response_building_samples: Vec<u64>,
document_fetch_samples: Vec<u64>,
}
impl MetricsReporter {
    /// Creates the reporter together with the sender half of the sample
    /// channel.
    ///
    /// The channel is bounded (10k entries) so request paths can use
    /// non-blocking `try_send` and simply drop samples under pressure.
    fn new() -> (Self, crossbeam_channel::Sender<LatencySample>) {
        let (sender, receiver) = crossbeam_channel::bounded::<LatencySample>(10_000);
        let reporter = Self {
            receiver,
            last_snapshot: METRICS.snapshot(),
            last_time: Instant::now(),
            decompression_samples: Vec::with_capacity(5000),
            response_building_samples: Vec::with_capacity(5000),
            document_fetch_samples: Vec::with_capacity(5000),
        };
        (reporter, sender)
    }
    /// Drains every pending sample from the channel into the matching
    /// per-stage buffer, dropping the oldest half of a buffer once it
    /// exceeds 5000 entries.
    fn collect_samples(&mut self) {
        // Append and cap. Buffers are kept strictly in arrival order so
        // draining the front really removes the *oldest* samples.
        fn push_capped(buf: &mut Vec<u64>, us: u64) {
            buf.push(us);
            if buf.len() > 5000 {
                buf.drain(0..2500);
            }
        }
        while let Ok(sample) = self.receiver.try_recv() {
            match sample {
                LatencySample::Decompression(us) => {
                    push_capped(&mut self.decompression_samples, us);
                }
                LatencySample::ResponseBuilding(us) => {
                    push_capped(&mut self.response_building_samples, us);
                }
                LatencySample::DocumentFetch(us) => {
                    push_capped(&mut self.document_fetch_samples, us);
                }
            }
        }
    }
    /// Returns (p50, p90, p99, p100) for decompression, response building,
    /// and document fetch, in that order.
    ///
    /// Percentiles are computed on sorted *copies* of the buffers. The
    /// previous version sorted the buffers in place, which destroyed their
    /// arrival order; the "drop oldest" drain in `collect_samples` then
    /// discarded the smallest samples instead of the oldest ones, biasing
    /// every later report upward.
    fn calculate_percentiles(
        &mut self,
    ) -> (
        (u64, u64, u64, u64),
        (u64, u64, u64, u64),
        (u64, u64, u64, u64),
    ) {
        // Sort a copy so the caller-visible buffer keeps arrival order.
        fn percentiles_of(samples: &[u64]) -> (u64, u64, u64, u64) {
            if samples.is_empty() {
                return (0, 0, 0, 0);
            }
            let mut sorted = samples.to_vec();
            sorted.sort_unstable();
            calculate_percentiles_from_slice(&sorted)
        }
        (
            percentiles_of(&self.decompression_samples),
            percentiles_of(&self.response_building_samples),
            percentiles_of(&self.document_fetch_samples),
        )
    }
    /// Names the stage with the highest p99 latency, or "✅ ALL_FAST" when
    /// every p99 is zero.
    ///
    /// Ties resolve to the *last* maximum in the array, i.e. RESPONSE wins
    /// over DECOMP wins over FETCH (matches `max_by_key` semantics).
    fn identify_bottleneck(&self, decomp_p99: u64, response_p99: u64, fetch_p99: u64) -> String {
        let metrics = [
            ("FETCH", fetch_p99),
            ("DECOMP", decomp_p99),
            ("RESPONSE", response_p99),
        ];
        metrics.iter().max_by_key(|(_, p99)| *p99).map_or_else(
            || "❓ UNKNOWN".to_string(),
            |(name, p99)| {
                if *p99 > 0 {
                    format!("🔥 {name}")
                } else {
                    "✅ ALL_FAST".to_string()
                }
            },
        )
    }
    /// Computes per-interval deltas against the previous snapshot and logs
    /// one structured metrics line. The line is skipped when no requests
    /// arrived in the interval, but the snapshot/timestamp still advance.
    fn report_metrics(&mut self) {
        self.collect_samples();
        let current_snapshot = METRICS.snapshot();
        let current_time = Instant::now();
        let duration = current_time.duration_since(self.last_time);
        // Counters are monotonically increasing, so plain subtraction of the
        // previous snapshot cannot underflow.
        let requests_delta = current_snapshot.requests_total - self.last_snapshot.requests_total;
        let docs_delta = current_snapshot.documents_fetched - self.last_snapshot.documents_fetched;
        let decomp_delta =
            current_snapshot.decompression_time_us - self.last_snapshot.decompression_time_us;
        let response_delta = current_snapshot.response_building_time_us
            - self.last_snapshot.response_building_time_us;
        let spillover_delta =
            current_snapshot.arena_spillovers - self.last_snapshot.arena_spillovers;
        let spillover_bytes_delta =
            current_snapshot.arena_spillover_bytes - self.last_snapshot.arena_spillover_bytes;
        if requests_delta > 0 {
            let rps = requests_delta as f64 / duration.as_secs_f64();
            let dps = docs_delta as f64 / duration.as_secs_f64();
            let avg_decomp = if docs_delta > 0 {
                decomp_delta / docs_delta
            } else {
                0
            };
            // requests_delta > 0 is guaranteed by the enclosing branch.
            let avg_response = response_delta / requests_delta;
            let (decomp_percentiles, response_percentiles, fetch_percentiles) =
                self.calculate_percentiles();
            let bottleneck = self.identify_bottleneck(
                decomp_percentiles.2,
                response_percentiles.2,
                fetch_percentiles.2,
            );
            info!(
                rps = format!("{:.1}", rps),
                dps = format!("{:.1}", dps),
                bottleneck = bottleneck,
                avg_decompression_us = avg_decomp,
                avg_response_building_us = avg_response,
                document_fetch_p50_us = fetch_percentiles.0,
                document_fetch_p90_us = fetch_percentiles.1,
                document_fetch_p99_us = fetch_percentiles.2,
                document_fetch_p100_us = fetch_percentiles.3,
                decompression_p50_us = decomp_percentiles.0,
                decompression_p90_us = decomp_percentiles.1,
                decompression_p99_us = decomp_percentiles.2,
                decompression_p100_us = decomp_percentiles.3,
                response_building_p50_us = response_percentiles.0,
                response_building_p90_us = response_percentiles.1,
                response_building_p99_us = response_percentiles.2,
                response_building_p100_us = response_percentiles.3,
                spillovers_per_sec =
                    format!("{:.1}", spillover_delta as f64 / duration.as_secs_f64()),
                spillover_bytes_per_sec = format!(
                    "{:.1}",
                    spillover_bytes_delta as f64 / duration.as_secs_f64()
                ),
                total_requests = current_snapshot.requests_total,
                total_documents = current_snapshot.documents_fetched,
                total_spillovers = current_snapshot.arena_spillovers,
                total_spillover_bytes = current_snapshot.arena_spillover_bytes,
                decompression_samples = self.decompression_samples.len(),
                response_samples = self.response_building_samples.len(),
                fetch_samples = self.document_fetch_samples.len(),
                "📊 Performance metrics with percentiles"
            );
        }
        self.last_snapshot = current_snapshot;
        self.last_time = current_time;
    }
    /// Reporting loop: wakes every 5 seconds and emits a metrics line.
    /// Runs forever on its dedicated thread.
    fn start(mut self) {
        info!("📊 Metrics reporter started - will report every 5 seconds");
        loop {
            thread::sleep(std::time::Duration::from_secs(5));
            self.report_metrics();
        }
    }
}
/// Spawns the background metrics reporter thread and publishes the global
/// latency-sample sender so request paths can submit samples.
///
/// # Panics
/// Panics if called more than once: the `OnceLock` can only be set once.
fn start_metrics_reporter() {
    let (reporter, tx) = MetricsReporter::new();
    LATENCY_SAMPLES
        .set(tx)
        .expect("Failed to set latency samples channel");
    // The reporter loops forever; the thread handle is intentionally detached.
    thread::spawn(move || reporter.start());
}
/// Arena sizes (in bytes) chosen at startup from the CLI and read by each
/// worker thread when its thread-local `DocReader` is first constructed.
#[derive(Debug, Clone)]
struct ArenaConfig {
    // Capacity of the per-thread decompression arena, in bytes.
    decomp_size: usize,
    // Capacity of the per-thread string arena, in bytes.
    string_size: usize,
}
// Set exactly once in main(), before any worker thread touches THREAD_READER.
static ARENA_CONFIG: OnceLock<ArenaConfig> = OnceLock::new();
/// Returns the global arena configuration.
///
/// # Panics
/// Panics if `ARENA_CONFIG` has not been set yet (i.e. called before main's
/// initialization).
fn get_arena_config() -> &'static ArenaConfig {
    ARENA_CONFIG.get().expect("Arena config not initialized")
}
/// A simple single-threaded bump allocator over a fixed-size buffer.
///
/// Uses `RefCell` for interior mutability, so it is `!Sync` and intended
/// for thread-local use (see `THREAD_READER`). The buffer never grows;
/// allocations that do not fit spill to the heap at the call site.
struct BumpArena {
    // Human-readable name used in log lines ("decomp" / "string").
    name: String,
    // Fixed backing storage; allocated once in `new` and never resized.
    buf: RefCell<Vec<u8>>,
    // Current bump offset: bytes [0, offset) are allocated.
    offset: RefCell<usize>,
    // Lifetime count of allocations that spilled to the heap.
    spilled: RefCell<usize>,
    // Lifetime byte total of heap spills.
    spilled_bytes: RefCell<usize>,
}
impl BumpArena {
    /// Creates an arena named `name` with a fixed `size`-byte zeroed buffer.
    /// The buffer is allocated once and never reallocated, so raw pointers
    /// into it remain stable for the arena's lifetime.
    fn new(name: &str, size: usize) -> Self {
        info!(arena_name = %name, size_bytes = size, "Creating arena");
        Self {
            name: name.to_string(),
            buf: RefCell::new(vec![0; size]),
            offset: RefCell::new(0),
            spilled: RefCell::new(0),
            spilled_bytes: RefCell::new(0),
        }
    }
    /// Bump-allocates `len` bytes, returning `None` when the arena lacks
    /// space (the caller handles the heap fallback and spill accounting).
    ///
    /// NOTE(review): the returned `&mut [u8]` is tied to an `&self` borrow,
    /// so overlapping calls could create aliasing mutable views of the same
    /// buffer. Current callers hold at most one allocation at a time —
    /// confirm before adding new call sites.
    fn alloc_bytes(&self, len: usize) -> Option<&mut [u8]> {
        let mut offset = self.offset.borrow_mut();
        // Grab the raw pointer up front; both RefCell borrows end before the
        // returned slice is handed out.
        let buf_ptr = self.buf.borrow_mut().as_mut_ptr();
        let buf_len = self.buf.borrow().len();
        if *offset + len > buf_len {
            warn!(
                arena_name = %self.name,
                requested_bytes = len,
                used_bytes = *offset,
                capacity_bytes = buf_len,
                "Arena allocation failed - insufficient space"
            );
            return None;
        }
        let start = *offset;
        *offset += len;
        debug!(
            arena_name = %self.name,
            allocated_bytes = len,
            offset = start,
            total_used = *offset,
            capacity = buf_len,
            "Arena allocation successful"
        );
        // SAFETY: start + len <= buf_len was checked above, and the backing
        // Vec is never reallocated after construction, so the pointer range
        // is valid and stays valid for the arena's lifetime.
        unsafe { Some(std::slice::from_raw_parts_mut(buf_ptr.add(start), len)) }
    }
    /// Logs current usage plus lifetime spill counters at debug level.
    fn stats(&self) {
        debug!(
            arena_name = %self.name,
            used_bytes = *self.offset.borrow(),
            capacity_bytes = self.buf.borrow().len(),
            spilled_allocs = *self.spilled.borrow(),
            spilled_bytes = *self.spilled_bytes.borrow(),
            "Arena statistics"
        );
    }
    /// Bytes currently allocated (the bump offset).
    fn used(&self) -> usize {
        *self.offset.borrow()
    }
    /// Total capacity of the backing buffer in bytes.
    fn capacity(&self) -> usize {
        self.buf.borrow().len()
    }
    /// Resets the bump offset to zero, logically freeing every allocation.
    /// Spill counters are lifetime totals and are deliberately not cleared.
    ///
    /// NOTE(review): any slice or `Cow::Borrowed` still pointing into the
    /// arena is logically invalidated by a reset (the bytes are not zeroed,
    /// but they will be overwritten by later allocations).
    fn reset(&self) {
        let used_before = *self.offset.borrow();
        let total_spilled = *self.spilled.borrow();
        let total_spilled_bytes = *self.spilled_bytes.borrow();
        *self.offset.borrow_mut() = 0;
        debug!(
            arena_name = %self.name,
            freed_bytes = used_before,
            lifetime_spilled_allocs = total_spilled,
            lifetime_spilled_bytes = total_spilled_bytes,
            "Arena reset"
        );
    }
    /// Copies `s` into the arena and returns a borrowed `Cow` pointing at
    /// the copy. When the arena is full, records the spill (locally and in
    /// the global metrics) and returns an owned heap `String` instead.
    fn alloc_str(&self, s: &str) -> Cow<str> {
        let mut offset = self.offset.borrow_mut();
        let mut buf = self.buf.borrow_mut();
        let bytes = s.as_bytes();
        let len = bytes.len();
        if *offset + len > buf.len() {
            *self.spilled.borrow_mut() += 1;
            *self.spilled_bytes.borrow_mut() += len;
            // Record spillover in global metrics
            METRICS.record_spillover_bytes(len as u64);
            warn!(
                arena_name = %self.name,
                string_content = %s,
                bytes = len,
                "Arena full, spilling string to heap"
            );
            return Cow::Owned(s.to_string());
        }
        buf[*offset..*offset + len].copy_from_slice(bytes);
        let start = *offset;
        *offset += len;
        // SAFETY: the bytes were just copied from a valid &str (hence valid
        // UTF-8), the range [start, start+len) is within the buffer, and the
        // backing Vec is never reallocated, so the address is stable.
        let result = unsafe {
            std::str::from_utf8_unchecked(std::slice::from_raw_parts(buf.as_ptr().add(start), len))
        };
        debug!(
            arena_name = %self.name,
            string_content = %s,
            bytes = len,
            total_used = *offset,
            capacity = buf.len(),
            "String allocated in arena"
        );
        Cow::Borrowed(result)
    }
}
/// Per-thread pair of bump arenas used while serving a request; one
/// `DocReader` lives in each worker's thread-local storage.
struct DocReader {
    /// where we decompress documents into, used doc by doc (reset per doc)
    decomp_arena: BumpArena,
    /// where we keep the docs requested until request end (reset per request)
    string_arena: BumpArena,
}
impl DocReader {
    /// Builds a reader with independently sized decompression and string
    /// arenas (sizes in bytes).
    fn new(decomp_size: usize, string_size: usize) -> Self {
        let decomp_arena = BumpArena::new("decomp", decomp_size);
        let string_arena = BumpArena::new("string", string_size);
        Self {
            decomp_arena,
            string_arena,
        }
    }
    /// Logs usage statistics for both arenas at debug level.
    fn stats(&self) {
        debug!(
            decomp_arena_used = self.decomp_arena.used(),
            decomp_arena_capacity = self.decomp_arena.capacity(),
            "DocReader decomp arena stats"
        );
        self.string_arena.stats();
    }
}
/// `Doc` could be in the arena or on the heap, thus `Cow`: `Borrowed` when
/// the string fit in the string arena, `Owned` when it spilled to the heap.
#[derive(Debug)]
struct Doc<'a> {
    // Document title, borrowed from the string arena when possible.
    title: Cow<'a, str>,
    // Document body, borrowed from the string arena when possible.
    body: Cow<'a, str>,
}
/// `DocStore` is a simple in-memory doc store that uses a memory-mapped file to store the docs.
/// It's used to load the docs into memory and then use the `DocReader` to fetch the docs.
struct DocStore {
    /// compressed docs (full file image, including header and offset table)
    mmap: Arc<Mmap>,
    /// doc id ("doc_{i}") -> byte offset of the record in `mmap`
    index: HashMap<String, u64>,
}
impl DocStore {
    /// Generates `document_count` synthetic documents and writes them to
    /// `path` in the store's on-disk format:
    ///
    /// `[u32 count] [u64 offset; count]` followed, per document, by
    /// `[u32 decompressed_len] [u32 compressed_len] [LZ4 block]` where the
    /// LZ4 block carries its own 4-byte size prefix (output of
    /// `compress_prepend_size`). The offset table is back-patched once all
    /// documents are written.
    ///
    /// # Errors
    /// Propagates any I/O error from creating or writing the file.
    fn write_sample_docs(path: &Path, document_count: u32) -> std::io::Result<()> {
        info!(path = ?path, document_count = document_count, "Writing sample docs");
        // `truncate(true)` is essential: without it, rewriting over a larger
        // pre-existing docs.bin leaves stale trailing bytes in the file.
        let mut file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(path)?;
        let count = document_count;
        file.write_all(&count.to_le_bytes())?;
        // Remember where the offset table starts, then reserve space for it.
        let offset_pos = file.stream_position()?;
        file.write_all(&vec![0u8; count as usize * 8])?;
        let mut offsets = Vec::new();
        for i in 0..count {
            let offset = file.stream_position()?;
            offsets.push(offset);
            let title = format!("Document Title {i}");
            let body = format!("This is the body content of document number {i}. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.");
            // Serialize as [u32 title_len][title][u32 body_len][body], then
            // compress the whole record in one shot.
            let mut buffer = Vec::new();
            buffer.write_all(&(title.len() as u32).to_le_bytes())?;
            buffer.write_all(title.as_bytes())?;
            buffer.write_all(&(body.len() as u32).to_le_bytes())?;
            buffer.write_all(body.as_bytes())?;
            let decomp_len = buffer.len() as u32;
            let compressed = lz4_flex::compress_prepend_size(&buffer);
            let comp_len = compressed.len() as u32;
            file.write_all(&decomp_len.to_le_bytes())?;
            file.write_all(&comp_len.to_le_bytes())?;
            file.write_all(&compressed)?;
        }
        // Back-patch the offset table now that every offset is known.
        file.seek(SeekFrom::Start(offset_pos))?;
        for offset in offsets {
            file.write_all(&offset.to_le_bytes())?;
        }
        info!(
            document_count = count,
            "Sample documents written successfully"
        );
        Ok(())
    }
    /// Memory-maps the store at `path` and builds the `doc_{i}` -> offset
    /// index from the on-disk offset table.
    ///
    /// # Errors
    /// Propagates I/O errors from opening or mapping the file.
    fn new(path: &Path) -> std::io::Result<Self> {
        info!(path = ?path, "Loading document store");
        let file = File::open(path)?;
        // SAFETY: mapping is unsafe because the file must not be truncated
        // or modified while mapped; this process writes docs.bin before
        // mapping and never mutates it afterwards.
        let mmap = unsafe { Mmap::map(&file)? };
        let doc_count = u32::from_le_bytes(mmap[0..4].try_into().unwrap()) as usize;
        info!(document_count = doc_count, "Document store loaded");
        let mut index = HashMap::new();
        for i in 0..doc_count {
            let off = 4 + i * 8;
            let offset = u64::from_le_bytes(mmap[off..off + 8].try_into().unwrap());
            index.insert(format!("doc_{i}"), offset);
        }
        info!(index_entries = index.len(), "Document index built");
        Ok(Self {
            mmap: Arc::new(mmap),
            index,
        })
    }
    /// Looks up `key`, decompresses its record into the reader's decomp
    /// arena (falling back to the heap when the arena is too small), and
    /// returns a `Doc` whose strings live in the reader's string arena (or
    /// on the heap when that arena spills).
    ///
    /// Returns `None` for unknown keys or malformed/corrupt records.
    #[allow(clippy::too_many_lines)]
    fn get_doc<'a>(&'a self, key: &str, reader: &'a DocReader) -> Option<Doc<'a>> {
        debug!(key = %key, "Fetching document");
        let start_time = Instant::now();
        let offset = *self.index.get(key)? as usize;
        let mmap = &self.mmap;
        // Record header: [u32 decompressed_len][u32 compressed_len][payload].
        let decomp_len = u32::from_le_bytes(mmap[offset..offset + 4].try_into().ok()?) as usize;
        let comp_len = u32::from_le_bytes(mmap[offset + 4..offset + 8].try_into().ok()?) as usize;
        let comp_bytes = &mmap[offset + 8..offset + 8 + comp_len];
        debug!(
            key = %key,
            compressed_bytes = comp_len,
            decompressed_bytes = decomp_len,
            "Document compression info"
        );
        // Reset decomp arena for reuse (one document at a time).
        reader.decomp_arena.reset();
        let Some(out_buf) = reader.decomp_arena.alloc_bytes(decomp_len) else {
            // Heap fallback: decompress into a fresh Vec instead of the arena.
            warn!(
                key = %key,
                arena_name = %reader.decomp_arena.name,
                "Decompression arena too small, spilling to heap"
            );
            METRICS.record_spillover_bytes(decomp_len as u64);
            let heap_buf = lz4_flex::decompress_size_prepended(comp_bytes).ok()?;
            let result = Self::parse_doc_to_string_arena(&heap_buf, reader)?;
            debug!(
                key = %key,
                elapsed_us = start_time.elapsed().as_micros(),
                method = "heap",
                "Document fetched"
            );
            return Some(result);
        };
        let decomp_start = Instant::now();
        // LZ4 data was compressed with compress_prepend_size, so we need to handle the size prefix
        // Skip the first 4 bytes which contain the uncompressed size
        if comp_bytes.len() < 4 {
            error!(
                key = %key,
                compressed_size = comp_bytes.len(),
                "Compressed data too small to contain size prefix"
            );
            return None;
        }
        let actual_compressed_data = &comp_bytes[4..];
        let n = match lz4_flex::decompress_into(actual_compressed_data, out_buf) {
            Ok(bytes) => bytes,
            Err(e) => {
                error!(
                    key = %key,
                    error = ?e,
                    compressed_size = actual_compressed_data.len(),
                    expected_decompressed_size = decomp_len,
                    "LZ4 decompression failed"
                );
                return None;
            }
        };
        let decomp_time = decomp_start.elapsed().as_micros() as u64;
        if n != decomp_len {
            error!(
                key = %key,
                expected_bytes = decomp_len,
                actual_bytes = n,
                "Decompression length mismatch"
            );
            return None;
        }
        METRICS.record_document(decomp_time);
        debug!(key = %key, decompressed_bytes = n, "Document decompressed to arena");
        // Parse from decompressed buffer and copy strings to string arena
        let mut inner_cursor = 0;
        // Debug: Print first few bytes of decompressed data
        debug!(
            key = %key,
            decompressed_size = n,
            first_16_bytes = ?&out_buf[0..16.min(n)],
            "Parsing decompressed document data"
        );
        if inner_cursor + 4 > n {
            error!(key = %key, cursor = inner_cursor, total_size = n, "Not enough data for title length");
            return None;
        }
        let title_len = u32::from_le_bytes({
            match out_buf[inner_cursor..inner_cursor + 4].try_into() {
                Ok(bytes) => bytes,
                Err(e) => {
                    error!(key = %key, error = ?e, "Failed to read title length");
                    return None;
                }
            }
        }) as usize;
        inner_cursor += 4;
        debug!(key = %key, title_len = title_len, cursor = inner_cursor, "Read title length");
        if inner_cursor + title_len > n {
            error!(
                key = %key,
                cursor = inner_cursor,
                title_len = title_len,
                total_size = n,
                "Not enough data for title content"
            );
            return None;
        }
        let title_bytes = &out_buf[inner_cursor..inner_cursor + title_len];
        let title_str = match std::str::from_utf8(title_bytes) {
            Ok(s) => s,
            Err(e) => {
                error!(key = %key, error = ?e, "Title is not valid UTF-8");
                return None;
            }
        };
        let title = reader.string_arena.alloc_str(title_str);
        inner_cursor += title_len;
        debug!(key = %key, title = %title_str, cursor = inner_cursor, "Read title");
        if inner_cursor + 4 > n {
            error!(key = %key, cursor = inner_cursor, total_size = n, "Not enough data for body length");
            return None;
        }
        let body_len = u32::from_le_bytes({
            match out_buf[inner_cursor..inner_cursor + 4].try_into() {
                Ok(bytes) => bytes,
                Err(e) => {
                    error!(key = %key, error = ?e, "Failed to read body length");
                    return None;
                }
            }
        }) as usize;
        inner_cursor += 4;
        debug!(key = %key, body_len = body_len, cursor = inner_cursor, "Read body length");
        if inner_cursor + body_len > n {
            error!(
                key = %key,
                cursor = inner_cursor,
                body_len = body_len,
                total_size = n,
                "Not enough data for body content"
            );
            return None;
        }
        let body_bytes = &out_buf[inner_cursor..inner_cursor + body_len];
        let body_str = match std::str::from_utf8(body_bytes) {
            Ok(s) => s,
            Err(e) => {
                error!(key = %key, error = ?e, "Body is not valid UTF-8");
                return None;
            }
        };
        let body = reader.string_arena.alloc_str(body_str);
        debug!(
            key = %key,
            title = %title_str,
            body_preview = %&body_str[..20.min(body_str.len())],
            elapsed_us = start_time.elapsed().as_micros(),
            method = "arena",
            "Document successfully parsed and fetched"
        );
        // Record total document fetch time for metrics
        let total_fetch_time = start_time.elapsed().as_micros() as u64;
        if let Some(sender) = LATENCY_SAMPLES.get() {
            let _ = sender.try_send(LatencySample::DocumentFetch(total_fetch_time));
        }
        Some(Doc { title, body })
    }
    /// Parses a decompressed record (`[u32 title_len][title][u32 body_len]
    /// [body]`) from `buf`, copying both strings into the reader's string
    /// arena. Returns `None` on truncated or non-UTF-8 input.
    ///
    /// Uses checked `get(..)` slicing so malformed data yields `None`
    /// instead of a panic (the previous version indexed directly).
    fn parse_doc_to_string_arena<'reader>(
        buf: &[u8],
        reader: &'reader DocReader,
    ) -> Option<Doc<'reader>> {
        let mut inner_cursor = 0;
        let title_len =
            u32::from_le_bytes(buf.get(inner_cursor..inner_cursor + 4)?.try_into().ok()?) as usize;
        inner_cursor += 4;
        let title_bytes = buf.get(inner_cursor..inner_cursor + title_len)?;
        let title_str = std::str::from_utf8(title_bytes).ok()?;
        let title = reader.string_arena.alloc_str(title_str);
        inner_cursor += title_len;
        let body_len =
            u32::from_le_bytes(buf.get(inner_cursor..inner_cursor + 4)?.try_into().ok()?) as usize;
        inner_cursor += 4;
        let body_bytes = buf.get(inner_cursor..inner_cursor + body_len)?;
        let body_str = std::str::from_utf8(body_bytes).ok()?;
        let body = reader.string_arena.alloc_str(body_str);
        Some(Doc { title, body })
    }
}
// Thread-local persistent DocReader with dual arenas
thread_local! {
static THREAD_READER: DocReader = {
let config = get_arena_config();
DocReader::new(config.decomp_size, config.string_size)
};
}
/// Binds the TCP listener and dispatches accepted connections to a pool of
/// persistent worker threads over a lock-free multi-consumer channel.
///
/// # Errors
/// Returns any error from binding the listener or accepting a connection.
fn run_server(store: &Arc<DocStore>) -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:4000")?;
    info!("Server running at http://127.0.0.1:4000");
    // crossbeam's unbounded channel supports multiple consumers without a
    // mutex, so each worker pulls connections directly.
    let (conn_tx, conn_rx) = crossbeam_channel::unbounded();
    // Leave one CPU for the acceptor/metrics threads but keep at least one
    // worker; fall back to 4 when parallelism detection is unavailable.
    let num_workers = std::thread::available_parallelism()
        .map(|n| (n.get() - 1).max(1))
        .unwrap_or(4);
    info!(worker_count = num_workers, "Starting worker pool");
    // Spawn the persistent workers; each owns a clone of the receiver.
    for worker_id in 0..num_workers {
        let rx = conn_rx.clone();
        let store = Arc::clone(store);
        thread::spawn(move || {
            info!(worker_id, "Worker started with thread-local arena");
            while let Ok(conn) = rx.recv() {
                handle_connection(conn, &store, worker_id);
            }
            info!(worker_id, "Worker shutting down");
        });
    }
    // Accept loop: hand every incoming connection to the pool.
    for (conn_id, stream) in listener.incoming().enumerate() {
        let stream = stream?;
        debug!(connection_id = conn_id, "New connection accepted");
        if conn_tx.send(stream).is_err() {
            error!("Failed to send connection to workers");
            break;
        }
    }
    Ok(())
}
/// Serves one client connection: each input line is a comma-separated batch
/// of document keys, answered with a single JSON line per batch.
#[allow(clippy::needless_pass_by_value)]
fn handle_connection(stream: std::net::TcpStream, store: &Arc<DocStore>, worker_id: usize) {
    // Minimal JSON string escaping. Backslashes must be escaped *before*
    // quotes; the previous version escaped only '"', which produced invalid
    // JSON whenever a key, title, or body contained a backslash.
    // NOTE(review): control characters (e.g. newlines) are still not
    // escaped; sample data contains none, but arbitrary input could.
    fn json_escape(s: &str) -> String {
        s.replace('\\', r"\\").replace('"', r#"\""#)
    }
    // peer_addr() can fail (e.g. on an already-disconnected socket). The old
    // fallback parsed the string "unknown" as a SocketAddr and unwrapped it,
    // which panicked on exactly the error path it was meant to handle; keep
    // the address as a plain string instead.
    let peer_addr = stream
        .peer_addr()
        .map_or_else(|_| "unknown".to_string(), |addr| addr.to_string());
    debug!(worker_id, peer_addr = %peer_addr, "Worker handling connection");
    let reader = BufReader::new(&stream);
    let mut writer = std::io::BufWriter::new(&stream);
    for (request_id, line) in reader.lines().enumerate() {
        let Ok(line) = line else {
            warn!(worker_id, "Connection broken");
            break;
        };
        let keys: Vec<&str> = line.trim().split(',').collect();
        debug!(
            worker_id,
            request_id,
            key_count = keys.len(),
            keys = ?keys,
            "Processing batch request"
        );
        let request_start = Instant::now();
        METRICS.record_request();
        THREAD_READER.with(|reader| {
            // Pre-allocate response string to avoid multiple allocations
            let mut response_parts = Vec::with_capacity(keys.len() * 2 + 10);
            let batch_start = Instant::now();
            response_parts.push(format!(
                r#"{{"worker_id":{},"request_id":{},"batch_size":{},"results":["#,
                worker_id,
                request_id,
                keys.len()
            ));
            let mut first = true;
            for key in &keys {
                debug!(worker_id, key = %key, "Looking up document");
                if !first {
                    response_parts.push(",".to_string());
                }
                first = false;
                if let Some(doc) = store.get_doc(key, reader) {
                    // Manual JSON construction to avoid serde_json overhead
                    response_parts.push(format!(
                        r#"{{"key":"{}","found":true,"title":"{}","body":"{}"}}"#,
                        json_escape(key),
                        json_escape(doc.title.as_ref()),
                        json_escape(doc.body.as_ref())
                    ));
                    debug!(worker_id, key = %key, "Document found");
                } else {
                    response_parts.push(format!(
                        r#"{{"key":"{}","found":false,"error":"Document not found"}}"#,
                        json_escape(key)
                    ));
                    debug!(worker_id, key = %key, "Document not found");
                }
            }
            let processing_time_us = batch_start.elapsed().as_micros();
            let response_build_start = Instant::now();
            response_parts.push(format!(r#"],"processing_time_us":{processing_time_us}}}"#));
            let response = response_parts.concat();
            METRICS.record_response_building(response_build_start.elapsed().as_micros() as u64);
            debug!(
                worker_id,
                request_id,
                response_bytes = response.len(),
                "Sending response"
            );
            // Write errors mean the client went away; the next read on this
            // connection will observe it, so both results are ignored here.
            let _ = writeln!(writer, "{response}");
            let _ = writer.flush();
            let request_time = request_start.elapsed();
            debug!(
                worker_id,
                request_id,
                elapsed_us = request_time.as_micros(),
                "Request completed"
            );
            debug!(worker_id, "Arena state after request:");
            reader.stats();
            // Reset string arena after each request, keep decomp for next doc
            reader.string_arena.reset();
        });
    }
    debug!(worker_id, peer_addr = %peer_addr, "Connection finished");
}
/// Server entry point: configures non-blocking file logging, publishes the
/// arena sizes, regenerates the sample corpus, loads it, and serves forever.
fn main() -> std::io::Result<()> {
    let args = Args::parse();
    // Initialize async tracing for non-blocking logging
    std::fs::create_dir_all("./logs").ok(); // Create logs directory
    let file_appender = tracing_appender::rolling::hourly("./logs", "doc_server.log");
    // `guard` must outlive the server: dropping it shuts down the background
    // writer thread and flushes buffered log records.
    let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .with_writer(non_blocking)
        .with_ansi(false) // No color codes in log files
        .init();
    // Publish the arena sizes before any worker thread constructs its
    // thread-local DocReader (which reads ARENA_CONFIG).
    let config = ArenaConfig {
        decomp_size: args.decomp_arena_size,
        string_size: args.string_arena_size,
    };
    ARENA_CONFIG
        .set(config.clone())
        .expect("Failed to set arena config");
    info!(
        decomp_arena_bytes = config.decomp_size,
        string_arena_bytes = config.string_size,
        "Starting doc server with configured arena sizes"
    );
    let path = Path::new("docs.bin");
    let doc_write_start = Instant::now();
    // NOTE(review): the corpus is regenerated on every start; reusing an
    // existing docs.bin is not supported here.
    DocStore::write_sample_docs(path, args.document_count)?;
    let doc_write_time = doc_write_start.elapsed();
    info!(
        document_count = args.document_count,
        doc_write_time_us = doc_write_time.as_micros(),
        "Sample documents written"
    );
    let store = Arc::new(DocStore::new(path)?);
    start_metrics_reporter();
    // Keep the guard alive to prevent async logger from shutting down
    let result = run_server(&store);
    drop(guard); // Explicitly drop to flush remaining logs
    result
}
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::time::{Duration, Instant};
use clap::Parser;
use rand::Rng;
// CLI options for the load-testing client.
// NOTE: the `///` doc comments below double as `--help` text via clap's
// derive macro, so they are user-facing and left exactly as written.
#[derive(Parser, Debug)]
#[command(name = "test_client")]
#[command(about = "Document server test client with configurable load testing")]
struct Args {
    /// Number of concurrent connections
    #[arg(short, long, default_value = "10")]
    connections: usize,
    /// Target transactions per second (0 = unlimited)
    #[arg(short, long, default_value = "0")]
    tps: usize,
    /// Number of requests per connection
    #[arg(short, long, default_value = "10")]
    requests: usize,
    /// Test duration in seconds (overrides requests if set)
    #[arg(short, long)]
    duration: Option<u64>,
    /// Skip warmup tests
    #[arg(long)]
    skip_warmup: bool,
    /// Maximum document ID to request (document range: 0 to max_doc_id-1)
    #[arg(long, default_value = "10000")]
    max_doc_id: u32,
}
/// Client entry point: parses CLI flags, optionally runs the warmup pass,
/// then executes the load test.
fn main() -> std::io::Result<()> {
    let args = Args::parse();
    if args.skip_warmup {
        // Warmup explicitly disabled via --skip-warmup.
    } else {
        warmup_tests()?;
    }
    load_test(&args);
    Ok(())
}
/// Connects to the server and runs two quick smoke tests on a single
/// persistent connection: five single-key requests, then one five-key batch.
///
/// # Errors
/// Returns any I/O error from connecting, writing, or reading.
fn warmup_tests() -> std::io::Result<()> {
    println!("🔥 Warmup: Connecting to server...");
    let stream = TcpStream::connect("127.0.0.1:4000")?;
    let mut reader = BufReader::new(stream.try_clone()?);
    let mut writer = stream.try_clone()?;
    println!("Testing persistent connection performance...");
    // Warmup 1: five sequential single-key requests.
    for i in 0..5 {
        let started = Instant::now();
        writeln!(writer, "doc_{i}")?;
        writer.flush()?;
        let mut response = String::new();
        reader.read_line(&mut response)?;
        let elapsed = started.elapsed();
        println!("Request {i}: {elapsed:?}");
    }
    // Warmup 2: one comma-separated batch of five keys.
    println!("\nTesting batch request...");
    let started = Instant::now();
    writeln!(writer, "doc_0,doc_1,doc_2,doc_3,doc_4")?;
    writer.flush()?;
    let mut response = String::new();
    reader.read_line(&mut response)?;
    let elapsed = started.elapsed();
    println!("Batch request: {elapsed:?}");
    println!("Response size: {} bytes", response.len());
    Ok(())
}
/// Drives the load test described by `args` and prints a results summary.
///
/// Spawns one thread per connection; each thread issues random single-key
/// requests (optionally rate-limited toward a global TPS target) until the
/// per-connection request budget or the configured duration is exhausted.
fn load_test(args: &Args) {
    println!("\n🚀 Load test: {} connections", args.connections);
    if args.tps > 0 {
        println!("Target TPS: {} ({}μs interval)", args.tps, 1_000_000 / args.tps);
    } else {
        println!("Target TPS: unlimited");
    }
    println!("📚 Document range: doc_0 to doc_{} ({} total documents)", args.max_doc_id - 1, args.max_doc_id);
    // Spread the global TPS target evenly across connections: each
    // connection sends one request every connections/tps seconds.
    let request_interval = if args.tps > 0 {
        Some(Duration::from_micros(1_000_000 * args.connections as u64 / args.tps as u64))
    } else {
        None
    };
    let test_start = Instant::now();
    let test_duration = args.duration.map(Duration::from_secs);
    let handles: Vec<_> = (0..args.connections).map(|_thread_id| {
        let requests = args.requests;
        let max_doc_id = args.max_doc_id; // Copy the value to avoid borrowing issues
        // Each worker returns (elapsed_us, request_count, response_bytes).
        std::thread::spawn(move || -> std::io::Result<(u128, usize, usize)> {
            let stream = TcpStream::connect("127.0.0.1:4000")?;
            let mut reader = BufReader::new(stream.try_clone()?);
            let mut writer = stream.try_clone()?;
            let mut rng = rand::thread_rng(); // Thread-local RNG for better performance
            let start = Instant::now();
            let mut request_count = 0;
            let mut total_response_size = 0;
            let mut next_request_time = Instant::now();
            loop {
                // Stop on elapsed duration when configured, otherwise on the
                // per-connection request budget.
                if let Some(duration) = test_duration {
                    if start.elapsed() >= duration {
                        break;
                    }
                } else if request_count >= requests {
                    break;
                }
                // Rate limiting: wait until it's time for the next request
                if let Some(interval) = request_interval {
                    let now = Instant::now();
                    if now < next_request_time {
                        std::thread::sleep(next_request_time - now);
                    }
                    next_request_time = Instant::now() + interval;
                }
                // Random document ID from 0 to max_doc_id-1 for realistic cache miss patterns
                let doc_id = rng.gen_range(0..max_doc_id);
                writeln!(writer, "doc_{doc_id}")?;
                writer.flush()?;
                let mut response = String::new();
                reader.read_line(&mut response)?;
                total_response_size += response.len();
                request_count += 1;
            }
            let total_elapsed = start.elapsed().as_micros();
            Ok((total_elapsed, request_count, total_response_size))
        })
    }).collect();
    let mut total_time = 0u128;
    let mut total_requests = 0;
    let mut total_response_size = 0;
    let mut successful_connections = 0;
    for (i, handle) in handles.into_iter().enumerate() {
        match handle.join().unwrap() {
            Ok((elapsed_micros, request_count, response_size)) => {
                total_time += elapsed_micros;
                total_requests += request_count;
                total_response_size += response_size;
                successful_connections += 1;
                let rps = if elapsed_micros > 0 {
                    request_count as u128 * 1_000_000 / elapsed_micros
                } else { 0 };
                println!("Connection {i}: {request_count} requests in {elapsed_micros}μs ({rps} RPS)");
            }
            Err(e) => {
                println!("Connection {i} failed: {e}");
            }
        }
    }
    let total_test_time = test_start.elapsed();
    println!("\n📊 Load Test Results:");
    println!("- Successful connections: {successful_connections}/{}", args.connections);
    println!("- Total requests: {total_requests}");
    println!("- Total test time: {:.2}s", total_test_time.as_secs_f64());
    #[allow(clippy::cast_precision_loss)]
    let response_data_kb = total_response_size as f64 / 1024.0;
    println!("- Total response data: {response_data_kb:.2}KB");
    if successful_connections > 0 && total_requests > 0 {
        // Mean wall time per request: total per-connection elapsed time over
        // total requests. The previous version also divided by the
        // connection count, but `total_requests` already spans all
        // connections, so that under-reported latency by that factor.
        let avg_latency = total_time / total_requests as u128;
        #[allow(clippy::cast_precision_loss)]
        let actual_tps = total_requests as f64 / total_test_time.as_secs_f64();
        println!("- Actual TPS: {actual_tps:.1}");
        println!("- Average latency: {avg_latency}μs per request");
        if args.tps > 0 {
            #[allow(clippy::cast_precision_loss)]
            let tps_efficiency = (actual_tps / args.tps as f64 * 100.0).min(100.0);
            println!("- TPS efficiency: {tps_efficiency:.1}%");
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment