Last active
June 9, 2025 14:13
-
-
Save mooreniemi/17de9e6c605a4f3873709c700e21bda3 to your computer and use it in GitHub Desktop.
blocking ds
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "llp"
version = "0.1.0"
edition = "2021"

[dependencies]
rustc-hash = "2"
memmap2 = "0.9"
zstd = { version = "0.13", features = ["zstdmt", "experimental"] }
serde_json = "1.0"
clap = { version = "4.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"
crossbeam-channel = "0.5"
lz4_flex = "0.11"
rand = "0.8"

[target.'cfg(target_os = "linux")'.dependencies]
io-uring = "0.7"
papi-bindings = "0.5"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2025-06-09T03:21:26.808889Z INFO doc_server: Starting doc server with configured arena sizes decomp_arena_bytes=2048 string_arena_bytes=4096 | |
2025-06-09T03:21:26.808930Z INFO doc_server: Writing sample docs path="docs.bin" document_count=100000 | |
2025-06-09T03:21:30.519118Z INFO doc_server: Sample documents written successfully document_count=100000 | |
2025-06-09T03:21:30.519466Z INFO doc_server: Sample documents written document_count=100000 doc_write_time_us=3710516 | |
2025-06-09T03:21:30.519479Z INFO doc_server: Loading document store path="docs.bin" | |
2025-06-09T03:21:30.519522Z INFO doc_server: Document store loaded document_count=100000 | |
2025-06-09T03:21:30.607115Z INFO doc_server: Document index built index_entries=100000 | |
2025-06-09T03:21:30.607369Z INFO doc_server: 📊 Metrics reporter started - will report every 5 seconds | |
2025-06-09T03:21:30.607425Z INFO doc_server: Server running at http://127.0.0.1:4000 | |
2025-06-09T03:21:30.607444Z INFO doc_server: Starting worker pool worker_count=7 | |
2025-06-09T03:21:30.607481Z INFO doc_server: Worker started with thread-local arena worker_id=1 | |
2025-06-09T03:21:30.607496Z INFO doc_server: Worker started with thread-local arena worker_id=0 | |
2025-06-09T03:21:30.607521Z INFO doc_server: Worker started with thread-local arena worker_id=2 | |
2025-06-09T03:21:30.607543Z INFO doc_server: Worker started with thread-local arena worker_id=5 | |
2025-06-09T03:21:30.607558Z INFO doc_server: Worker started with thread-local arena worker_id=3 | |
2025-06-09T03:21:30.607573Z INFO doc_server: Worker started with thread-local arena worker_id=6 | |
2025-06-09T03:21:30.607573Z INFO doc_server: Worker started with thread-local arena worker_id=4 | |
2025-06-09T03:21:47.165708Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048 | |
2025-06-09T03:21:47.165768Z INFO doc_server: Creating arena arena_name=string size_bytes=4096 | |
2025-06-09T03:21:47.167465Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048 | |
2025-06-09T03:21:47.167486Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048 | |
2025-06-09T03:21:47.167494Z INFO doc_server: Creating arena arena_name=string size_bytes=4096 | |
2025-06-09T03:21:47.167502Z INFO doc_server: Creating arena arena_name=string size_bytes=4096 | |
2025-06-09T03:21:47.167502Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048 | |
2025-06-09T03:21:47.167514Z INFO doc_server: Creating arena arena_name=string size_bytes=4096 | |
2025-06-09T03:21:47.167581Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048 | |
2025-06-09T03:21:47.167593Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048 | |
2025-06-09T03:21:47.167595Z INFO doc_server: Creating arena arena_name=string size_bytes=4096 | |
2025-06-09T03:21:47.167604Z INFO doc_server: Creating arena arena_name=string size_bytes=4096 | |
2025-06-09T03:21:47.167608Z INFO doc_server: Creating arena arena_name=decomp size_bytes=2048 | |
2025-06-09T03:21:47.167620Z INFO doc_server: Creating arena arena_name=string size_bytes=4096 | |
2025-06-09T03:21:50.626680Z INFO doc_server: 📊 Performance metrics with percentiles rps="41712.5" dps="41713.3" bottleneck="🔥 FETCH" avg_decompression_us=4 avg_response_building_us=0 document_fetch_p50_us=6 document_fetch_p90_us=15 document_fetch_p99_us=31 document_fetch_p100_us=64 decompression_p50_us=3 decompression_p90_us=9 decompression_p99_us=20 decompression_p100_us=55 response_building_p50_us=0 response_building_p90_us=1 response_building_p99_us=10 response_building_p100_us=45 spillovers_per_sec="0.0" spillover_bytes_per_sec="0.0" total_requests=208768 total_documents=208772 total_spillovers=0 total_spillover_bytes=0 decompression_samples=3386 response_samples=3380 fetch_samples=3386 | |
2025-06-09T03:21:55.637187Z INFO doc_server: 📊 Performance metrics with percentiles rps="18568.1" dps="18568.1" bottleneck="🔥 FETCH" avg_decompression_us=4 avg_response_building_us=0 document_fetch_p50_us=8 document_fetch_p90_us=16 document_fetch_p99_us=30 document_fetch_p100_us=64 decompression_p50_us=3 decompression_p90_us=11 decompression_p99_us=21 decompression_p100_us=55 response_building_p50_us=0 response_building_p90_us=4 response_building_p99_us=11 response_building_p100_us=45 spillovers_per_sec="0.0" spillover_bytes_per_sec="0.0" total_requests=301726 total_documents=301730 total_spillovers=0 total_spillover_bytes=0 decompression_samples=4219 response_samples=4214 fetch_samples=4219 | |
2025-06-09T03:21:55.819953Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 13. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.819987Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 14. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.820019Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 15. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.820034Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 16. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.820054Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 17. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.820074Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 18. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.820085Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 19. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.836905Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 13. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.836932Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 14. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.836943Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 15. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.836953Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 16. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.836961Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 17. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.836968Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 18. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.836978Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 19. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.836986Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 20. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.836994Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 21. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837001Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 22. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837012Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 23. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837020Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 24. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837028Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 25 bytes=17 | |
2025-06-09T03:21:55.837039Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 25. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837047Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 26 bytes=17 | |
2025-06-09T03:21:55.837050Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 26. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837058Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 27 bytes=17 | |
2025-06-09T03:21:55.837061Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 27. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837068Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 28 bytes=17 | |
2025-06-09T03:21:55.837071Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 28. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837096Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 29 bytes=17 | |
2025-06-09T03:21:55.837100Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 29. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837119Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 30 bytes=17 | |
2025-06-09T03:21:55.837129Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 30. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837183Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 31 bytes=17 | |
2025-06-09T03:21:55.837195Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 31. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837232Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 32 bytes=17 | |
2025-06-09T03:21:55.837237Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 32. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837256Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 33 bytes=17 | |
2025-06-09T03:21:55.837260Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 33. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837268Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 34 bytes=17 | |
2025-06-09T03:21:55.837271Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 34. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837279Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 35 bytes=17 | |
2025-06-09T03:21:55.837286Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 35. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837309Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 36 bytes=17 | |
2025-06-09T03:21:55.837318Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 36. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837333Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 37 bytes=17 | |
2025-06-09T03:21:55.837337Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 37. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837345Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 38 bytes=17 | |
2025-06-09T03:21:55.837349Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 38. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837367Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 39 bytes=17 | |
2025-06-09T03:21:55.837382Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 39. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837392Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 40 bytes=17 | |
2025-06-09T03:21:55.837395Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 40. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837410Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 41 bytes=17 | |
2025-06-09T03:21:55.837418Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 41. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837431Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 42 bytes=17 | |
2025-06-09T03:21:55.837440Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 42. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837454Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 43 bytes=17 | |
2025-06-09T03:21:55.837458Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 43. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837470Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 44 bytes=17 | |
2025-06-09T03:21:55.837478Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 44. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837487Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 45 bytes=17 | |
2025-06-09T03:21:55.837494Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 45. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837516Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 46 bytes=17 | |
2025-06-09T03:21:55.837520Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 46. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837533Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 47 bytes=17 | |
2025-06-09T03:21:55.837536Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 47. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837544Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 48 bytes=17 | |
2025-06-09T03:21:55.837547Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 48. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:21:55.837554Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=Document Title 49 bytes=17 | |
2025-06-09T03:21:55.837567Z WARN doc_server: Arena full, spilling string to heap arena_name=string string_content=This is the body content of document number 49. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. bytes=283 | |
2025-06-09T03:22:00.649755Z INFO doc_server: 📊 Performance metrics with percentiles rps="1.0" dps="17.2" bottleneck="🔥 FETCH" avg_decompression_us=3 avg_response_building_us=8 document_fetch_p50_us=8 document_fetch_p90_us=16 document_fetch_p99_us=31 document_fetch_p100_us=64 decompression_p50_us=3 decompression_p90_us=11 decompression_p99_us=21 decompression_p100_us=55 response_building_p50_us=0 response_building_p90_us=4 response_building_p99_us=11 response_building_p100_us=45 spillovers_per_sec="13.8" spillover_bytes_per_sec="2569.8" total_requests=301731 total_documents=301816 total_spillovers=69 total_spillover_bytes=12877 decompression_samples=4305 response_samples=4219 fetch_samples=4305 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// doc_server.rs
//
// An example of using blocking worker threads (CPU count minus one) to serve doc store requests.
// LZ4 compression is used to compress and decompress documents.
//
// This file showcases a few things:
//   1. using arenas with heap spillover to make memory usage more predictable,
//   2. using a global sampling metrics reporter to track performance metrics at intervals
//      (avoiding costly logging in the hot path), and
//   3. non-blocking logging (but with no log rotation).
//
// Benchmarked with the `test_client` tool, this server reached ~50k TPS over 7 connections
// doing random reads.
use std::io::{BufRead, BufReader}; | |
use std::net::TcpListener; | |
use std::{ | |
borrow::Cow, | |
cell::RefCell, | |
collections::HashMap, | |
fs::{File, OpenOptions}, | |
io::{Seek, SeekFrom, Write}, | |
path::Path, | |
sync::{ | |
atomic::{AtomicU64, Ordering}, | |
Arc, OnceLock, | |
}, | |
thread, | |
time::Instant, | |
}; | |
use clap::Parser; | |
use memmap2::Mmap; | |
use tracing::{debug, error, info, warn}; | |
#[derive(Parser, Debug)] | |
#[command(name = "doc_server")] | |
#[command(about = "A document server with configurable arena sizes")] | |
struct Args { | |
/// Size of the decompression arena in bytes | |
#[arg(long, default_value = "8192")] | |
decomp_arena_size: usize, | |
/// Size of the string arena in bytes | |
#[arg(long, default_value = "16384")] | |
string_arena_size: usize, | |
/// Number of documents to generate | |
#[arg(long, default_value = "10000")] | |
document_count: u32, | |
} | |
/// A single latency measurement in microseconds, tagged with the stage it was taken in.
///
/// Values of this type are sent over the global latency-sample channel and collected
/// for percentile calculation.
#[derive(Clone, Copy, Debug)]
enum LatencySample {
    /// Time spent decompressing a document.
    Decompression(u64),
    /// Time spent building a response.
    ResponseBuilding(u64),
    /// Time spent fetching a document (NOTE(review): not emitted in the code shown here —
    /// presumably recorded elsewhere in the file).
    DocumentFetch(u64),
}
// Global channel for sending latency samples (lock-free!) | |
static LATENCY_SAMPLES: OnceLock<crossbeam_channel::Sender<LatencySample>> = OnceLock::new(); | |
/// Global request/document counters, updated from worker threads without locks.
///
/// Every field is an `AtomicU64`, so it can be bumped through a shared reference.
/// The `*_time_us` fields are running sums of microseconds, not averages.
struct Metrics {
    /// Number of requests handled.
    requests_total: AtomicU64,
    /// Number of documents fetched.
    documents_fetched: AtomicU64,
    /// Sum of decompression time across all fetched documents, in microseconds.
    decompression_time_us: AtomicU64,
    /// Sum of response-building time, in microseconds.
    response_building_time_us: AtomicU64,
    /// Number of times an arena overflowed and data spilled to the heap.
    arena_spillovers: AtomicU64,
    /// Total bytes that spilled to the heap.
    arena_spillover_bytes: AtomicU64,
}
static METRICS: Metrics = Metrics { | |
requests_total: AtomicU64::new(0), | |
documents_fetched: AtomicU64::new(0), | |
decompression_time_us: AtomicU64::new(0), | |
response_building_time_us: AtomicU64::new(0), | |
arena_spillovers: AtomicU64::new(0), | |
arena_spillover_bytes: AtomicU64::new(0), | |
}; | |
impl Metrics { | |
fn record_request(&self) { | |
self.requests_total.fetch_add(1, Ordering::Relaxed); | |
} | |
fn record_document(&self, decompression_us: u64) { | |
self.documents_fetched.fetch_add(1, Ordering::Relaxed); | |
self.decompression_time_us | |
.fetch_add(decompression_us, Ordering::Relaxed); | |
// Send sample to metrics thread (lock-free, non-blocking) | |
if let Some(sender) = LATENCY_SAMPLES.get() { | |
// Use try_send to avoid blocking if the channel is full | |
let _ = sender.try_send(LatencySample::Decompression(decompression_us)); | |
} | |
} | |
fn record_response_building(&self, time_us: u64) { | |
self.response_building_time_us | |
.fetch_add(time_us, Ordering::Relaxed); | |
// Send sample to metrics thread (lock-free, non-blocking) | |
if let Some(sender) = LATENCY_SAMPLES.get() { | |
let _ = sender.try_send(LatencySample::ResponseBuilding(time_us)); | |
} | |
} | |
fn record_spillover_bytes(&self, bytes: u64) { | |
self.arena_spillovers.fetch_add(1, Ordering::Relaxed); | |
self.arena_spillover_bytes | |
.fetch_add(bytes, Ordering::Relaxed); | |
} | |
fn snapshot(&self) -> MetricsSnapshot { | |
MetricsSnapshot { | |
requests_total: self.requests_total.load(Ordering::Relaxed), | |
documents_fetched: self.documents_fetched.load(Ordering::Relaxed), | |
decompression_time_us: self.decompression_time_us.load(Ordering::Relaxed), | |
response_building_time_us: self.response_building_time_us.load(Ordering::Relaxed), | |
arena_spillovers: self.arena_spillovers.load(Ordering::Relaxed), | |
arena_spillover_bytes: self.arena_spillover_bytes.load(Ordering::Relaxed), | |
} | |
} | |
} | |
/// Returns `(p50, p90, p99, p100)` of an ascending-sorted slice.
///
/// Uses the nearest-rank method: the p-th percentile is the element at 1-based
/// rank `ceil(p / 100 * len)`. With 100 samples, p99 is therefore the 99th value
/// and not the maximum. The previous `floor(len * p)` index was one rank high
/// whenever `len * p` was a whole number.
///
/// Returns `(0, 0, 0, 0)` for an empty slice. The caller must pass data that is
/// already sorted ascending; this is not checked.
fn calculate_percentiles_from_slice(sorted_data: &[u64]) -> (u64, u64, u64, u64) {
    let Some(&max) = sorted_data.last() else {
        return (0, 0, 0, 0);
    };
    let len = sorted_data.len() as u128;
    // Integer ceil(len * pct / 100) avoids float rounding surprises. For len >= 1
    // and pct in 1..=100 the rank lies in 1..=len, so `rank - 1` is in bounds.
    let nearest_rank = |pct: u128| -> u64 {
        let rank = (len * pct + 99) / 100;
        sorted_data[(rank - 1) as usize]
    };
    (nearest_rank(50), nearest_rank(90), nearest_rank(99), max)
}
/// Plain-value copy of the global `Metrics` counters at one instant; the
/// reporter diffs consecutive snapshots to get per-interval rates.
#[derive(Clone, Debug)]
struct MetricsSnapshot {
    /// Requests received so far.
    requests_total: u64,
    /// Documents fetched so far.
    documents_fetched: u64,
    /// Cumulative decompression time, in microseconds.
    decompression_time_us: u64,
    /// Cumulative response-building time, in microseconds.
    response_building_time_us: u64,
    /// Cumulative spillover count.
    arena_spillovers: u64,
    /// Cumulative spilled bytes.
    arena_spillover_bytes: u64,
}
struct MetricsReporter { | |
receiver: crossbeam_channel::Receiver<LatencySample>, | |
last_snapshot: MetricsSnapshot, | |
last_time: Instant, | |
decompression_samples: Vec<u64>, | |
response_building_samples: Vec<u64>, | |
document_fetch_samples: Vec<u64>, | |
} | |
impl MetricsReporter { | |
fn new() -> (Self, crossbeam_channel::Sender<LatencySample>) { | |
let (sender, receiver) = crossbeam_channel::bounded::<LatencySample>(10_000); | |
let reporter = Self { | |
receiver, | |
last_snapshot: METRICS.snapshot(), | |
last_time: Instant::now(), | |
decompression_samples: Vec::with_capacity(5000), | |
response_building_samples: Vec::with_capacity(5000), | |
document_fetch_samples: Vec::with_capacity(5000), | |
}; | |
(reporter, sender) | |
} | |
fn collect_samples(&mut self) { | |
while let Ok(sample) = self.receiver.try_recv() { | |
match sample { | |
LatencySample::Decompression(us) => { | |
self.decompression_samples.push(us); | |
if self.decompression_samples.len() > 5000 { | |
self.decompression_samples.drain(0..2500); | |
} | |
} | |
LatencySample::ResponseBuilding(us) => { | |
self.response_building_samples.push(us); | |
if self.response_building_samples.len() > 5000 { | |
self.response_building_samples.drain(0..2500); | |
} | |
} | |
LatencySample::DocumentFetch(us) => { | |
self.document_fetch_samples.push(us); | |
if self.document_fetch_samples.len() > 5000 { | |
self.document_fetch_samples.drain(0..2500); | |
} | |
} | |
} | |
} | |
} | |
fn calculate_percentiles( | |
&mut self, | |
) -> ( | |
(u64, u64, u64, u64), | |
(u64, u64, u64, u64), | |
(u64, u64, u64, u64), | |
) { | |
let decomp_percentiles = if self.decompression_samples.is_empty() { | |
(0, 0, 0, 0) | |
} else { | |
self.decompression_samples.sort_unstable(); | |
calculate_percentiles_from_slice(&self.decompression_samples) | |
}; | |
let response_percentiles = if self.response_building_samples.is_empty() { | |
(0, 0, 0, 0) | |
} else { | |
self.response_building_samples.sort_unstable(); | |
calculate_percentiles_from_slice(&self.response_building_samples) | |
}; | |
let fetch_percentiles = if self.document_fetch_samples.is_empty() { | |
(0, 0, 0, 0) | |
} else { | |
self.document_fetch_samples.sort_unstable(); | |
calculate_percentiles_from_slice(&self.document_fetch_samples) | |
}; | |
(decomp_percentiles, response_percentiles, fetch_percentiles) | |
} | |
fn identify_bottleneck(&self, decomp_p99: u64, response_p99: u64, fetch_p99: u64) -> String { | |
let metrics = [ | |
("FETCH", fetch_p99), | |
("DECOMP", decomp_p99), | |
("RESPONSE", response_p99), | |
]; | |
metrics.iter().max_by_key(|(_, p99)| *p99).map_or_else( | |
|| "❓ UNKNOWN".to_string(), | |
|(name, p99)| { | |
if *p99 > 0 { | |
format!("🔥 {name}") | |
} else { | |
"✅ ALL_FAST".to_string() | |
} | |
}, | |
) | |
} | |
fn report_metrics(&mut self) { | |
self.collect_samples(); | |
let current_snapshot = METRICS.snapshot(); | |
let current_time = Instant::now(); | |
let duration = current_time.duration_since(self.last_time); | |
let requests_delta = current_snapshot.requests_total - self.last_snapshot.requests_total; | |
let docs_delta = current_snapshot.documents_fetched - self.last_snapshot.documents_fetched; | |
let decomp_delta = | |
current_snapshot.decompression_time_us - self.last_snapshot.decompression_time_us; | |
let response_delta = current_snapshot.response_building_time_us | |
- self.last_snapshot.response_building_time_us; | |
let spillover_delta = | |
current_snapshot.arena_spillovers - self.last_snapshot.arena_spillovers; | |
let spillover_bytes_delta = | |
current_snapshot.arena_spillover_bytes - self.last_snapshot.arena_spillover_bytes; | |
if requests_delta > 0 { | |
let rps = requests_delta as f64 / duration.as_secs_f64(); | |
let dps = docs_delta as f64 / duration.as_secs_f64(); | |
let avg_decomp = if docs_delta > 0 { | |
decomp_delta / docs_delta | |
} else { | |
0 | |
}; | |
let avg_response = if requests_delta > 0 { | |
response_delta / requests_delta | |
} else { | |
0 | |
}; | |
let (decomp_percentiles, response_percentiles, fetch_percentiles) = | |
self.calculate_percentiles(); | |
let bottleneck = self.identify_bottleneck( | |
decomp_percentiles.2, | |
response_percentiles.2, | |
fetch_percentiles.2, | |
); | |
info!( | |
rps = format!("{:.1}", rps), | |
dps = format!("{:.1}", dps), | |
bottleneck = bottleneck, | |
avg_decompression_us = avg_decomp, | |
avg_response_building_us = avg_response, | |
document_fetch_p50_us = fetch_percentiles.0, | |
document_fetch_p90_us = fetch_percentiles.1, | |
document_fetch_p99_us = fetch_percentiles.2, | |
document_fetch_p100_us = fetch_percentiles.3, | |
decompression_p50_us = decomp_percentiles.0, | |
decompression_p90_us = decomp_percentiles.1, | |
decompression_p99_us = decomp_percentiles.2, | |
decompression_p100_us = decomp_percentiles.3, | |
response_building_p50_us = response_percentiles.0, | |
response_building_p90_us = response_percentiles.1, | |
response_building_p99_us = response_percentiles.2, | |
response_building_p100_us = response_percentiles.3, | |
spillovers_per_sec = | |
format!("{:.1}", spillover_delta as f64 / duration.as_secs_f64()), | |
spillover_bytes_per_sec = format!( | |
"{:.1}", | |
spillover_bytes_delta as f64 / duration.as_secs_f64() | |
), | |
total_requests = current_snapshot.requests_total, | |
total_documents = current_snapshot.documents_fetched, | |
total_spillovers = current_snapshot.arena_spillovers, | |
total_spillover_bytes = current_snapshot.arena_spillover_bytes, | |
decompression_samples = self.decompression_samples.len(), | |
response_samples = self.response_building_samples.len(), | |
fetch_samples = self.document_fetch_samples.len(), | |
"📊 Performance metrics with percentiles" | |
); | |
} | |
self.last_snapshot = current_snapshot; | |
self.last_time = current_time; | |
} | |
fn start(mut self) { | |
info!("📊 Metrics reporter started - will report every 5 seconds"); | |
loop { | |
thread::sleep(std::time::Duration::from_secs(5)); | |
self.report_metrics(); | |
} | |
} | |
} | |
fn start_metrics_reporter() { | |
let (reporter, sender) = MetricsReporter::new(); | |
// Initialize global sender | |
LATENCY_SAMPLES | |
.set(sender) | |
.expect("Failed to set latency samples channel"); | |
thread::spawn(move || { | |
reporter.start(); | |
}); | |
} | |
/// Arena capacities, in bytes, for each worker thread's `DocReader`.
#[derive(Clone, Debug)]
struct ArenaConfig {
    /// Capacity of the decompression arena.
    decomp_size: usize,
    /// Capacity of the string arena.
    string_size: usize,
}

/// Process-wide arena configuration; `main` sets it once before starting the server.
static ARENA_CONFIG: OnceLock<ArenaConfig> = OnceLock::new();

/// Returns the global arena configuration.
///
/// # Panics
/// Panics with "Arena config not initialized" if `ARENA_CONFIG` is unset.
fn get_arena_config() -> &'static ArenaConfig {
    match ARENA_CONFIG.get() {
        Some(config) => config,
        None => panic!("Arena config not initialized"),
    }
}
struct BumpArena { | |
name: String, | |
buf: RefCell<Vec<u8>>, | |
offset: RefCell<usize>, | |
spilled: RefCell<usize>, | |
spilled_bytes: RefCell<usize>, | |
} | |
impl BumpArena { | |
fn new(name: &str, size: usize) -> Self { | |
info!(arena_name = %name, size_bytes = size, "Creating arena"); | |
Self { | |
name: name.to_string(), | |
buf: RefCell::new(vec![0; size]), | |
offset: RefCell::new(0), | |
spilled: RefCell::new(0), | |
spilled_bytes: RefCell::new(0), | |
} | |
} | |
fn alloc_bytes(&self, len: usize) -> Option<&mut [u8]> { | |
let mut offset = self.offset.borrow_mut(); | |
let buf_ptr = self.buf.borrow_mut().as_mut_ptr(); | |
let buf_len = self.buf.borrow().len(); | |
if *offset + len > buf_len { | |
warn!( | |
arena_name = %self.name, | |
requested_bytes = len, | |
used_bytes = *offset, | |
capacity_bytes = buf_len, | |
"Arena allocation failed - insufficient space" | |
); | |
return None; | |
} | |
let start = *offset; | |
*offset += len; | |
debug!( | |
arena_name = %self.name, | |
allocated_bytes = len, | |
offset = start, | |
total_used = *offset, | |
capacity = buf_len, | |
"Arena allocation successful" | |
); | |
unsafe { Some(std::slice::from_raw_parts_mut(buf_ptr.add(start), len)) } | |
} | |
fn stats(&self) { | |
debug!( | |
arena_name = %self.name, | |
used_bytes = *self.offset.borrow(), | |
capacity_bytes = self.buf.borrow().len(), | |
spilled_allocs = *self.spilled.borrow(), | |
spilled_bytes = *self.spilled_bytes.borrow(), | |
"Arena statistics" | |
); | |
} | |
fn used(&self) -> usize { | |
*self.offset.borrow() | |
} | |
fn capacity(&self) -> usize { | |
self.buf.borrow().len() | |
} | |
fn reset(&self) { | |
let used_before = *self.offset.borrow(); | |
let total_spilled = *self.spilled.borrow(); | |
let total_spilled_bytes = *self.spilled_bytes.borrow(); | |
*self.offset.borrow_mut() = 0; | |
debug!( | |
arena_name = %self.name, | |
freed_bytes = used_before, | |
lifetime_spilled_allocs = total_spilled, | |
lifetime_spilled_bytes = total_spilled_bytes, | |
"Arena reset" | |
); | |
} | |
fn alloc_str(&self, s: &str) -> Cow<str> { | |
let mut offset = self.offset.borrow_mut(); | |
let mut buf = self.buf.borrow_mut(); | |
let bytes = s.as_bytes(); | |
let len = bytes.len(); | |
if *offset + len > buf.len() { | |
*self.spilled.borrow_mut() += 1; | |
*self.spilled_bytes.borrow_mut() += len; | |
// Record spillover in global metrics | |
METRICS.record_spillover_bytes(len as u64); | |
warn!( | |
arena_name = %self.name, | |
string_content = %s, | |
bytes = len, | |
"Arena full, spilling string to heap" | |
); | |
return Cow::Owned(s.to_string()); | |
} | |
buf[*offset..*offset + len].copy_from_slice(bytes); | |
let start = *offset; | |
*offset += len; | |
let result = unsafe { | |
std::str::from_utf8_unchecked(std::slice::from_raw_parts(buf.as_ptr().add(start), len)) | |
}; | |
debug!( | |
arena_name = %self.name, | |
string_content = %s, | |
bytes = len, | |
total_used = *offset, | |
capacity = buf.len(), | |
"String allocated in arena" | |
); | |
Cow::Borrowed(result) | |
} | |
} | |
struct DocReader { | |
/// where we decompress documents into, used doc by doc | |
decomp_arena: BumpArena, | |
/// where we keep the docs requested until request end | |
string_arena: BumpArena, | |
} | |
impl DocReader { | |
fn new(decomp_size: usize, string_size: usize) -> Self { | |
Self { | |
decomp_arena: BumpArena::new("decomp", decomp_size), | |
string_arena: BumpArena::new("string", string_size), | |
} | |
} | |
fn stats(&self) { | |
debug!( | |
decomp_arena_used = self.decomp_arena.used(), | |
decomp_arena_capacity = self.decomp_arena.capacity(), | |
"DocReader decomp arena stats" | |
); | |
self.string_arena.stats(); | |
} | |
} | |
/// A decoded document.
///
/// Each field either borrows from a `DocReader`'s string arena
/// (`Cow::Borrowed`) or owns a heap copy made when that arena was full
/// (`Cow::Owned`).
#[derive(Debug)]
struct Doc<'arena> {
    /// Document title.
    title: Cow<'arena, str>,
    /// Document body text.
    body: Cow<'arena, str>,
}
/// `DocStore` is a simple in-memory doc store that uses a memory-mapped file to store the docs. | |
/// It's used to load the docs into memory and then use the `DocReader` to fetch the docs. | |
struct DocStore { | |
/// compressed docs | |
mmap: Arc<Mmap>, | |
/// doc id -> offset in mmap | |
index: HashMap<String, u64>, | |
} | |
impl DocStore { | |
fn write_sample_docs(path: &Path, document_count: u32) -> std::io::Result<()> { | |
info!(path = ?path, document_count = document_count, "Writing sample docs"); | |
let mut file = OpenOptions::new().create(true).write(true).open(path)?; | |
let count = document_count; | |
file.write_all(&count.to_le_bytes())?; | |
let offset_pos = file.seek(SeekFrom::Current(0))?; | |
file.write_all(&vec![0u8; count as usize * 8])?; | |
let mut offsets = Vec::new(); | |
for i in 0..count { | |
let offset = file.seek(SeekFrom::Current(0))?; | |
offsets.push(offset); | |
let title = format!("Document Title {i}"); | |
let body = format!("This is the body content of document number {i}. It contains some sample text to make the documents more realistic. Additional content to make documents larger: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."); | |
let mut buffer = Vec::new(); | |
buffer.write_all(&(title.len() as u32).to_le_bytes())?; | |
buffer.write_all(title.as_bytes())?; | |
buffer.write_all(&(body.len() as u32).to_le_bytes())?; | |
buffer.write_all(body.as_bytes())?; | |
let decomp_len = buffer.len() as u32; | |
let compressed = lz4_flex::compress_prepend_size(&buffer); | |
let comp_len = compressed.len() as u32; | |
file.write_all(&decomp_len.to_le_bytes())?; | |
file.write_all(&comp_len.to_le_bytes())?; | |
file.write_all(&compressed)?; | |
} | |
file.seek(SeekFrom::Start(offset_pos))?; | |
for offset in offsets { | |
file.write_all(&offset.to_le_bytes())?; | |
} | |
info!( | |
document_count = count, | |
"Sample documents written successfully" | |
); | |
Ok(()) | |
} | |
fn new(path: &Path) -> std::io::Result<Self> { | |
info!(path = ?path, "Loading document store"); | |
let file = File::open(path)?; | |
let mmap = unsafe { Mmap::map(&file)? }; | |
let doc_count = u32::from_le_bytes(mmap[0..4].try_into().unwrap()) as usize; | |
info!(document_count = doc_count, "Document store loaded"); | |
let mut index = HashMap::new(); | |
for i in 0..doc_count { | |
let off = 4 + i * 8; | |
let offset = u64::from_le_bytes(mmap[off..off + 8].try_into().unwrap()); | |
index.insert(format!("doc_{i}"), offset); | |
} | |
info!(index_entries = index.len(), "Document index built"); | |
Ok(Self { | |
mmap: Arc::new(mmap), | |
index, | |
}) | |
} | |
#[allow(clippy::too_many_lines)] | |
fn get_doc<'a>(&'a self, key: &str, reader: &'a DocReader) -> Option<Doc<'a>> { | |
debug!(key = %key, "Fetching document"); | |
let start_time = Instant::now(); | |
let offset = *self.index.get(key)? as usize; | |
let mmap = &self.mmap; | |
let decomp_len = u32::from_le_bytes(mmap[offset..offset + 4].try_into().ok()?) as usize; | |
let comp_len = u32::from_le_bytes(mmap[offset + 4..offset + 8].try_into().ok()?) as usize; | |
let comp_bytes = &mmap[offset + 8..offset + 8 + comp_len]; | |
debug!( | |
key = %key, | |
compressed_bytes = comp_len, | |
decompressed_bytes = decomp_len, | |
"Document compression info" | |
); | |
// Reset decomp arena for reuse | |
reader.decomp_arena.reset(); | |
let Some(out_buf) = reader.decomp_arena.alloc_bytes(decomp_len) else { | |
warn!( | |
key = %key, | |
arena_name = %reader.decomp_arena.name, | |
"Decompression arena too small, spilling to heap" | |
); | |
METRICS.record_spillover_bytes(decomp_len as u64); | |
let heap_buf = lz4_flex::decompress_size_prepended(comp_bytes).ok()?; | |
let result = Self::parse_doc_to_string_arena(&heap_buf, reader)?; | |
debug!( | |
key = %key, | |
elapsed_us = start_time.elapsed().as_micros(), | |
method = "heap", | |
"Document fetched" | |
); | |
return Some(result); | |
}; | |
let decomp_start = Instant::now(); | |
// LZ4 data was compressed with compress_prepend_size, so we need to handle the size prefix | |
// Skip the first 4 bytes which contain the uncompressed size | |
if comp_bytes.len() < 4 { | |
error!( | |
key = %key, | |
compressed_size = comp_bytes.len(), | |
"Compressed data too small to contain size prefix" | |
); | |
return None; | |
} | |
let actual_compressed_data = &comp_bytes[4..]; | |
let n = match lz4_flex::decompress_into(actual_compressed_data, out_buf) { | |
Ok(bytes) => bytes, | |
Err(e) => { | |
error!( | |
key = %key, | |
error = ?e, | |
compressed_size = actual_compressed_data.len(), | |
expected_decompressed_size = decomp_len, | |
"LZ4 decompression failed" | |
); | |
return None; | |
} | |
}; | |
let decomp_time = decomp_start.elapsed().as_micros() as u64; | |
if n != decomp_len { | |
error!( | |
key = %key, | |
expected_bytes = decomp_len, | |
actual_bytes = n, | |
"Decompression length mismatch" | |
); | |
return None; | |
} | |
METRICS.record_document(decomp_time); | |
debug!(key = %key, decompressed_bytes = n, "Document decompressed to arena"); | |
// Parse from decompressed buffer and copy strings to string arena | |
let mut inner_cursor = 0; | |
// Debug: Print first few bytes of decompressed data | |
debug!( | |
key = %key, | |
decompressed_size = n, | |
first_16_bytes = ?&out_buf[0..16.min(n)], | |
"Parsing decompressed document data" | |
); | |
if inner_cursor + 4 > n { | |
error!(key = %key, cursor = inner_cursor, total_size = n, "Not enough data for title length"); | |
return None; | |
} | |
let title_len = u32::from_le_bytes({ | |
match out_buf[inner_cursor..inner_cursor + 4].try_into() { | |
Ok(bytes) => bytes, | |
Err(e) => { | |
error!(key = %key, error = ?e, "Failed to read title length"); | |
return None; | |
} | |
} | |
}) as usize; | |
inner_cursor += 4; | |
debug!(key = %key, title_len = title_len, cursor = inner_cursor, "Read title length"); | |
if inner_cursor + title_len > n { | |
error!( | |
key = %key, | |
cursor = inner_cursor, | |
title_len = title_len, | |
total_size = n, | |
"Not enough data for title content" | |
); | |
return None; | |
} | |
let title_bytes = &out_buf[inner_cursor..inner_cursor + title_len]; | |
let title_str = match std::str::from_utf8(title_bytes) { | |
Ok(s) => s, | |
Err(e) => { | |
error!(key = %key, error = ?e, "Title is not valid UTF-8"); | |
return None; | |
} | |
}; | |
let title = reader.string_arena.alloc_str(title_str); | |
inner_cursor += title_len; | |
debug!(key = %key, title = %title_str, cursor = inner_cursor, "Read title"); | |
if inner_cursor + 4 > n { | |
error!(key = %key, cursor = inner_cursor, total_size = n, "Not enough data for body length"); | |
return None; | |
} | |
let body_len = u32::from_le_bytes({ | |
match out_buf[inner_cursor..inner_cursor + 4].try_into() { | |
Ok(bytes) => bytes, | |
Err(e) => { | |
error!(key = %key, error = ?e, "Failed to read body length"); | |
return None; | |
} | |
} | |
}) as usize; | |
inner_cursor += 4; | |
debug!(key = %key, body_len = body_len, cursor = inner_cursor, "Read body length"); | |
if inner_cursor + body_len > n { | |
error!( | |
key = %key, | |
cursor = inner_cursor, | |
body_len = body_len, | |
total_size = n, | |
"Not enough data for body content" | |
); | |
return None; | |
} | |
let body_bytes = &out_buf[inner_cursor..inner_cursor + body_len]; | |
let body_str = match std::str::from_utf8(body_bytes) { | |
Ok(s) => s, | |
Err(e) => { | |
error!(key = %key, error = ?e, "Body is not valid UTF-8"); | |
return None; | |
} | |
}; | |
let body = reader.string_arena.alloc_str(body_str); | |
debug!( | |
key = %key, | |
title = %title_str, | |
body_preview = %&body_str[..20.min(body_str.len())], | |
elapsed_us = start_time.elapsed().as_micros(), | |
method = "arena", | |
"Document successfully parsed and fetched" | |
); | |
// Record total document fetch time for metrics | |
let total_fetch_time = start_time.elapsed().as_micros() as u64; | |
if let Some(sender) = LATENCY_SAMPLES.get() { | |
let _ = sender.try_send(LatencySample::DocumentFetch(total_fetch_time)); | |
} | |
Some(Doc { title, body }) | |
} | |
fn parse_doc_to_string_arena<'reader>( | |
buf: &[u8], | |
reader: &'reader DocReader, | |
) -> Option<Doc<'reader>> { | |
let mut inner_cursor = 0; | |
let title_len = | |
u32::from_le_bytes(buf[inner_cursor..inner_cursor + 4].try_into().ok()?) as usize; | |
inner_cursor += 4; | |
let title_bytes = &buf[inner_cursor..inner_cursor + title_len]; | |
let title_str = std::str::from_utf8(title_bytes).ok()?; | |
let title = reader.string_arena.alloc_str(title_str); | |
inner_cursor += title_len; | |
let body_len = | |
u32::from_le_bytes(buf[inner_cursor..inner_cursor + 4].try_into().ok()?) as usize; | |
inner_cursor += 4; | |
let body_bytes = &buf[inner_cursor..inner_cursor + body_len]; | |
let body_str = std::str::from_utf8(body_bytes).ok()?; | |
let body = reader.string_arena.alloc_str(body_str); | |
Some(Doc { title, body }) | |
} | |
} | |
// Thread-local persistent DocReader with dual arenas | |
thread_local! { | |
static THREAD_READER: DocReader = { | |
let config = get_arena_config(); | |
DocReader::new(config.decomp_size, config.string_size) | |
}; | |
} | |
fn run_server(store: &Arc<DocStore>) -> std::io::Result<()> { | |
let listener = TcpListener::bind("127.0.0.1:4000")?; | |
info!("Server running at http://127.0.0.1:4000"); | |
// Create a worker pool with crossbeam-channel (much faster than std::mpsc) | |
let (work_tx, work_rx) = crossbeam_channel::unbounded(); | |
let num_workers = std::thread::available_parallelism() | |
.map(|n| (n.get() - 1).max(1)) // Use available CPUs - 1, but at least 1 worker | |
.unwrap_or(4); // Fall back to 4 if detection fails | |
info!(worker_count = num_workers, "Starting worker pool"); | |
// Spawn persistent worker threads - crossbeam-channel handles multi-consumer efficiently | |
for worker_id in 0..num_workers { | |
let work_rx = work_rx.clone(); | |
let store = Arc::clone(store); | |
thread::spawn(move || { | |
info!(worker_id, "Worker started with thread-local arena"); | |
// No mutex needed! crossbeam-channel is lock-free for multi-consumer | |
while let Ok(stream) = work_rx.recv() { | |
handle_connection(stream, &store, worker_id); | |
} | |
info!(worker_id, "Worker shutting down"); | |
}); | |
} | |
// Distribute connections to workers | |
for (conn_id, stream) in listener.incoming().enumerate() { | |
let stream = stream?; | |
debug!(connection_id = conn_id, "New connection accepted"); | |
if work_tx.send(stream).is_err() { | |
error!("Failed to send connection to workers"); | |
break; | |
} | |
} | |
Ok(()) | |
} | |
#[allow(clippy::needless_pass_by_value)] | |
fn handle_connection(stream: std::net::TcpStream, store: &Arc<DocStore>, worker_id: usize) { | |
let peer_addr = stream | |
.peer_addr() | |
.unwrap_or_else(|_| "unknown".parse().unwrap()); | |
debug!(worker_id, peer_addr = %peer_addr, "Worker handling connection"); | |
let reader = BufReader::new(&stream); | |
let mut writer = std::io::BufWriter::new(&stream); | |
for (request_id, line) in reader.lines().enumerate() { | |
let Ok(line) = line else { | |
warn!(worker_id, "Connection broken"); | |
break; | |
}; | |
let keys: Vec<&str> = line.trim().split(',').collect(); | |
debug!( | |
worker_id, | |
request_id, | |
key_count = keys.len(), | |
keys = ?keys, | |
"Processing batch request" | |
); | |
let request_start = Instant::now(); | |
METRICS.record_request(); | |
THREAD_READER.with(|reader| { | |
// Pre-allocate response string to avoid multiple allocations | |
let mut response_parts = Vec::with_capacity(keys.len() * 2 + 10); | |
let batch_start = Instant::now(); | |
response_parts.push(format!( | |
r#"{{"worker_id":{},"request_id":{},"batch_size":{},"results":["#, | |
worker_id, | |
request_id, | |
keys.len() | |
)); | |
let mut first = true; | |
for key in &keys { | |
debug!(worker_id, key = %key, "Looking up document"); | |
if !first { | |
response_parts.push(",".to_string()); | |
} | |
first = false; | |
if let Some(doc) = store.get_doc(key, reader) { | |
// Manual JSON construction to avoid serde_json overhead | |
response_parts.push(format!( | |
r#"{{"key":"{}","found":true,"title":"{}","body":"{}"}}"#, | |
key.replace('"', r#"\""#), // Basic JSON escaping | |
doc.title.as_ref().replace('"', r#"\""#), | |
doc.body.as_ref().replace('"', r#"\""#) | |
)); | |
debug!(worker_id, key = %key, "Document found"); | |
} else { | |
response_parts.push(format!( | |
r#"{{"key":"{}","found":false,"error":"Document not found"}}"#, | |
key.replace('"', r#"\""#) | |
)); | |
debug!(worker_id, key = %key, "Document not found"); | |
} | |
} | |
let processing_time_us = batch_start.elapsed().as_micros(); | |
let response_build_start = Instant::now(); | |
response_parts.push(format!(r#"],"processing_time_us":{processing_time_us}}}"#)); | |
let response = response_parts.concat(); | |
METRICS.record_response_building(response_build_start.elapsed().as_micros() as u64); | |
debug!( | |
worker_id, | |
request_id, | |
response_bytes = response.len(), | |
"Sending response" | |
); | |
let _ = writeln!(writer, "{response}"); | |
let _ = writer.flush(); | |
let request_time = request_start.elapsed(); | |
debug!( | |
worker_id, | |
request_id, | |
elapsed_us = request_time.as_micros(), | |
"Request completed" | |
); | |
debug!(worker_id, "Arena state after request:"); | |
reader.stats(); | |
// Reset string arena after each request, keep decomp for next doc | |
reader.string_arena.reset(); | |
}); | |
} | |
debug!(worker_id, peer_addr = %peer_addr, "Connection finished"); | |
} | |
fn main() -> std::io::Result<()> { | |
let args = Args::parse(); | |
// Initialize async tracing for non-blocking logging | |
std::fs::create_dir_all("./logs").ok(); // Create logs directory | |
let file_appender = tracing_appender::rolling::hourly("./logs", "doc_server.log"); | |
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); | |
tracing_subscriber::fmt() | |
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) | |
.with_writer(non_blocking) | |
.with_ansi(false) // No color codes in log files | |
.init(); | |
let config = ArenaConfig { | |
decomp_size: args.decomp_arena_size, | |
string_size: args.string_arena_size, | |
}; | |
ARENA_CONFIG | |
.set(config.clone()) | |
.expect("Failed to set arena config"); | |
info!( | |
decomp_arena_bytes = config.decomp_size, | |
string_arena_bytes = config.string_size, | |
"Starting doc server with configured arena sizes" | |
); | |
let path = Path::new("docs.bin"); | |
let doc_write_start = Instant::now(); | |
DocStore::write_sample_docs(path, args.document_count)?; | |
let doc_write_time = doc_write_start.elapsed(); | |
info!( | |
document_count = args.document_count, | |
doc_write_time_us = doc_write_time.as_micros(), | |
"Sample documents written" | |
); | |
let store = Arc::new(DocStore::new(path)?); | |
start_metrics_reporter(); | |
// Keep the guard alive to prevent async logger from shutting down | |
let result = run_server(&store); | |
drop(guard); // Explicitly drop to flush remaining logs | |
result | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::io::{BufRead, BufReader, Write}; | |
use std::net::TcpStream; | |
use std::time::{Duration, Instant}; | |
use clap::Parser; | |
use rand::Rng; | |
#[derive(Parser, Debug)] | |
#[command(name = "test_client")] | |
#[command(about = "Document server test client with configurable load testing")] | |
struct Args { | |
/// Number of concurrent connections | |
#[arg(short, long, default_value = "10")] | |
connections: usize, | |
/// Target transactions per second (0 = unlimited) | |
#[arg(short, long, default_value = "0")] | |
tps: usize, | |
/// Number of requests per connection | |
#[arg(short, long, default_value = "10")] | |
requests: usize, | |
/// Test duration in seconds (overrides requests if set) | |
#[arg(short, long)] | |
duration: Option<u64>, | |
/// Skip warmup tests | |
#[arg(long)] | |
skip_warmup: bool, | |
/// Maximum document ID to request (document range: 0 to max_doc_id-1) | |
#[arg(long, default_value = "10000")] | |
max_doc_id: u32, | |
} | |
fn main() -> std::io::Result<()> { | |
let args = Args::parse(); | |
if !args.skip_warmup { | |
warmup_tests()?; | |
} | |
load_test(&args); | |
Ok(()) | |
} | |
/// Sends five single-document requests and one five-document batch over one
/// persistent connection, printing each round-trip time.
///
/// # Errors
/// Returns any I/O error from connecting, writing or reading, and
/// `UnexpectedEof` if the server closes the connection before replying.
fn warmup_tests() -> std::io::Result<()> {
    println!("🔥 Warmup: Connecting to server...");
    let stream = TcpStream::connect("127.0.0.1:4000")?;
    // Requests are tiny and latency is what we measure. Disable Nagle so a
    // request is never held back waiting for the ACK of an earlier segment.
    stream.set_nodelay(true)?;
    let mut reader = BufReader::new(stream.try_clone()?);
    let mut writer = stream.try_clone()?;
    let mut response = String::new();
    println!("Testing persistent connection performance...");
    // Test 1: Single requests on persistent connection
    for i in 0..5 {
        let start = Instant::now();
        warmup_round_trip(&mut writer, &mut reader, &format!("doc_{i}"), &mut response)?;
        let elapsed = start.elapsed();
        println!("Request {i}: {elapsed:?}");
    }
    println!("\nTesting batch request...");
    let start = Instant::now();
    warmup_round_trip(
        &mut writer,
        &mut reader,
        "doc_0,doc_1,doc_2,doc_3,doc_4",
        &mut response,
    )?;
    let elapsed = start.elapsed();
    println!("Batch request: {elapsed:?}");
    println!("Response size: {} bytes", response.len());
    Ok(())
}

/// Sends `request` plus a newline in a single write, then reads one response line
/// into `response` (which is cleared first). A 0-byte read means the server hung
/// up and is reported as `UnexpectedEof`.
fn warmup_round_trip(
    writer: &mut TcpStream,
    reader: &mut BufReader<TcpStream>,
    request: &str,
    response: &mut String,
) -> std::io::Result<()> {
    let mut line = String::with_capacity(request.len() + 1);
    line.push_str(request);
    line.push('\n');
    writer.write_all(line.as_bytes())?;
    writer.flush()?;
    response.clear();
    if reader.read_line(response)? == 0 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "server closed the connection",
        ));
    }
    Ok(())
}
fn load_test(args: &Args) { | |
println!("\n🚀 Load test: {} connections", args.connections); | |
if args.tps > 0 { | |
println!("Target TPS: {} ({}μs interval)", args.tps, 1_000_000 / args.tps); | |
} else { | |
println!("Target TPS: unlimited"); | |
} | |
println!("📚 Document range: doc_0 to doc_{} ({} total documents)", args.max_doc_id - 1, args.max_doc_id); | |
// Calculate per-connection rate if TPS limiting is enabled | |
let request_interval = if args.tps > 0 { | |
Some(Duration::from_micros(1_000_000 * args.connections as u64 / args.tps as u64)) | |
} else { | |
None | |
}; | |
let test_start = Instant::now(); | |
let test_duration = args.duration.map(Duration::from_secs); | |
let handles: Vec<_> = (0..args.connections).map(|thread_id| { | |
let requests = args.requests; | |
let max_doc_id = args.max_doc_id; // Copy the value to avoid borrowing issues | |
std::thread::spawn(move || -> std::io::Result<(u128, usize, usize)> { | |
let stream = TcpStream::connect("127.0.0.1:4000")?; | |
let mut reader = BufReader::new(stream.try_clone()?); | |
let mut writer = stream.try_clone()?; | |
let mut rng = rand::thread_rng(); // Thread-local RNG for better performance | |
let start = Instant::now(); | |
let mut request_count = 0; | |
let mut total_response_size = 0; | |
let mut next_request_time = Instant::now(); | |
loop { | |
// Check if we should stop (duration-based or request-based) | |
if let Some(duration) = test_duration { | |
if start.elapsed() >= duration { | |
break; | |
} | |
} else if request_count >= requests { | |
break; | |
} | |
// Rate limiting: wait until it's time for the next request | |
if let Some(interval) = request_interval { | |
let now = Instant::now(); | |
if now < next_request_time { | |
std::thread::sleep(next_request_time - now); | |
} | |
next_request_time = Instant::now() + interval; | |
} | |
// Random document ID from 0 to max_doc_id-1 for realistic cache miss patterns | |
let doc_id = rng.gen_range(0..max_doc_id); | |
writeln!(writer, "doc_{doc_id}")?; | |
writer.flush()?; | |
let mut response = String::new(); | |
reader.read_line(&mut response)?; | |
total_response_size += response.len(); | |
request_count += 1; | |
} | |
let total_elapsed = start.elapsed().as_micros(); | |
Ok((total_elapsed, request_count, total_response_size)) | |
}) | |
}).collect(); | |
let mut total_time = 0u128; | |
let mut total_requests = 0; | |
let mut total_response_size = 0; | |
let mut successful_connections = 0; | |
for (i, handle) in handles.into_iter().enumerate() { | |
match handle.join().unwrap() { | |
Ok((elapsed_micros, request_count, response_size)) => { | |
total_time += elapsed_micros; | |
total_requests += request_count; | |
total_response_size += response_size; | |
successful_connections += 1; | |
let rps = if elapsed_micros > 0 { | |
request_count as u128 * 1_000_000 / elapsed_micros | |
} else { 0 }; | |
println!("Connection {i}: {request_count} requests in {elapsed_micros}μs ({rps} RPS)"); | |
} | |
Err(e) => { | |
println!("Connection {i} failed: {e}"); | |
} | |
} | |
} | |
let total_test_time = test_start.elapsed(); | |
println!("\n📊 Load Test Results:"); | |
println!("- Successful connections: {successful_connections}/{}", args.connections); | |
println!("- Total requests: {total_requests}"); | |
println!("- Total test time: {:.2}s", total_test_time.as_secs_f64()); | |
#[allow(clippy::cast_precision_loss)] | |
let response_data_kb = total_response_size as f64 / 1024.0; | |
println!("- Total response data: {response_data_kb:.2}KB"); | |
if successful_connections > 0 && total_requests > 0 { | |
#[allow(clippy::cast_sign_loss)] | |
let avg_latency = total_time / successful_connections as u128 / total_requests as u128; | |
#[allow(clippy::cast_precision_loss)] | |
let actual_tps = total_requests as f64 / total_test_time.as_secs_f64(); | |
println!("- Actual TPS: {actual_tps:.1}"); | |
println!("- Average latency: {avg_latency}μs per request"); | |
if args.tps > 0 { | |
#[allow(clippy::cast_precision_loss)] | |
let tps_efficiency = (actual_tps / args.tps as f64 * 100.0).min(100.0); | |
println!("- TPS efficiency: {tps_efficiency:.1}%"); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment