Last active: January 20, 2022 17:56
-
-
Save oscarknagg/151432d56614ec431048b9cde540cb13 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os.path | |
import psutil | |
import pyarrow as pa | |
import numpy as np | |
from pyarrow import parquet as pq | |
import time | |
# Number of consecutive samples in every window the benchmarks yield.
WINDOW_LENGTH = 1000
# Number of (overlapping) windows each streaming benchmark yields.
N = 10 ** 6
# Bytes per mebibyte; used to turn byte counts into the "MB" figures printed.
_1_MB = 1 << 20
def setup(n_points=10 ** 9, arrow_path="timeseries.arrow", parquet_path="timeseries.parquet"):
    """Write a synthetic float32 timeseries to an Arrow IPC file and a Parquet file.

    The series is ``0, 1, 2, ...`` stored as float32 in a single column named
    ``"timeseries"``, with one NaN placed at its midpoint. The same table is written
    in both formats so that streaming speed and on-disk size can be compared.

    Args:
        n_points: Length of the series. Defaults to 10**9 (about 3.7 GiB as float32),
            matching the original benchmark; pass a smaller value for quick runs.
        arrow_path: Destination of the Arrow IPC file (random-access file format).
        parquet_path: Destination of the Parquet file.

    Raises:
        ValueError: if ``n_points`` is less than 1 (there would be no midpoint
            to hold the NaN).
    """
    n_points = int(n_points)
    if n_points < 1:
        raise ValueError("n_points must be at least 1, got {}".format(n_points))
    # NOTE: float32 cannot represent every integer above 2**24 exactly, so large
    # values are rounded; only the size and layout of the data matter here.
    timeseries = np.arange(n_points, dtype="float32")
    # Add a NaN at the midpoint.
    timeseries[n_points // 2] = np.nan
    table = pa.Table.from_arrays(arrays=[timeseries], names=["timeseries"])
    # Write as Arrow (IPC file format, so it can later be memory-mapped).
    with pa.OSFile(arrow_path, 'wb') as sink:
        with pa.RecordBatchFileWriter(sink, table.schema) as writer:
            writer.write_table(table)
    # Write as Parquet to compare file sizes.
    pq.write_table(table, parquet_path)
def stream_from_arrow(file):
    """Yield ``N`` overlapping windows of ``WINDOW_LENGTH`` samples from an Arrow IPC file.

    The file is memory-mapped and loaded with ``RecordBatchFileReader.read_all()``.
    Window ``i`` covers samples ``[i, i + WINDOW_LENGTH)`` of the ``"timeseries"``
    column and is converted to a NumPy array. After the final window, the function
    prints the iteration time (with and without the initial open/read) and the
    change in process RSS across the read.

    Args:
        file: Path to an Arrow IPC file containing a ``"timeseries"`` column.

    Yields:
        numpy.ndarray: one window per iteration.
    """
    process = psutil.Process()
    memory0 = process.memory_info()
    # perf_counter is monotonic and high-resolution; time.time() follows the wall
    # clock, which can jump, and is not intended for measuring intervals.
    t0 = time.perf_counter()
    # Keep the memory map open while windows are being yielded, and close it
    # deterministically afterwards instead of leaving it to garbage collection.
    with pa.memory_map(file, 'r') as source:
        table_parrow = pa.ipc.RecordBatchFileReader(source).read_all()
        # NOTE: RSS is sampled before iterating, so mapped pages faulted in by the
        # loop below are not included in the reported figure.
        memory1 = process.memory_info()
        t1 = time.perf_counter()
        # The column lookup is loop-invariant, so do it once.
        column = table_parrow["timeseries"]
        for i in range(N):
            yield column[i:i + WINDOW_LENGTH].to_numpy()
        t2 = time.perf_counter()
    print("Yielded {} windows from Arrow file in {:.3f}s ({:.3f}s including I/O)".format(
        N,
        t2 - t1,
        t2 - t0
    ))
    print("Used {:.1f}MB of additional memory".format(
        (memory1.rss - memory0.rss) / _1_MB
    ))
    print()
def stream_from_parquet(file):
    """Yield ``N`` overlapping windows of ``WINDOW_LENGTH`` samples from a Parquet file.

    The whole ``"timeseries"`` column is read and converted to a single NumPy array
    up front. Window ``i`` is then the NumPy slice ``[i, i + WINDOW_LENGTH)`` of that
    array. After the final window, the function prints the iteration time (with and
    without the initial read) and the change in process RSS across the read.

    Args:
        file: Path to a Parquet file containing a ``"timeseries"`` column.

    Yields:
        numpy.ndarray: one window per iteration (a slice of the loaded array).
    """
    process = psutil.Process()
    memory0 = process.memory_info()
    # perf_counter is monotonic and high-resolution; time.time() follows the wall
    # clock and is not intended for measuring intervals.
    t0 = time.perf_counter()
    timeseries = pq.read_table(file)["timeseries"].to_numpy()
    memory1 = process.memory_info()
    t1 = time.perf_counter()
    for i in range(N):
        yield timeseries[i:i + WINDOW_LENGTH]
    t2 = time.perf_counter()
    print("Yielded {} windows from Parquet file in {:.3f}s ({:.3f}s including I/O)".format(
        N,
        t2 - t1,
        t2 - t0
    ))
    print("Used {:.1f}MB of additional memory".format(
        (memory1.rss - memory0.rss) / _1_MB
    ))
    print()
def main():
    """Create both benchmark files, drain each window stream, then print file sizes in MB."""
    setup()
    arrow_path, parquet_path = "timeseries.arrow", "timeseries.parquet"
    benchmarks = (
        (stream_from_arrow, arrow_path),
        (stream_from_parquet, parquet_path),
    )
    for stream, path in benchmarks:
        # Exhausting the generator triggers its timing/memory report.
        for _ in stream(path):
            pass
    sizes_mb = [os.path.getsize(p) / _1_MB for p in (arrow_path, parquet_path)]
    print("timeseries.arrow: {:.1f}MB, timeseries.parquet: {:.1f}MB".format(*sizes_mb))
if __name__ == '__main__':
    # This script compares the speed of loading subsections of a
    # timeseries from disk when stored in either Arrow or Parquet
    # format.
    #
    # Expected output:
    #   Yielded 1000000 windows from Arrow file in 3.832s (3.832s including I/O)
    #   Used 0.0MB of additional memory
    #
    #   Yielded 1000000 windows from Parquet file in 0.169s (5.374s including I/O)
    #   Used 7768.2MB of additional memory
    #
    #   timeseries.arrow: 3814.7MB, timeseries.parquet: 457.4MB
    try:
        main()
    finally:
        # Best-effort cleanup. If main() failed before a file was written,
        # os.remove would raise FileNotFoundError and mask the original error.
        for path in ("timeseries.arrow", "timeseries.parquet"):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.