Query the Backblaze Drive Stats data set from Daft (www.getdaft.io)
# Use these values for read-only access to the Drive Stats data set:
AWS_ENDPOINT_URL=https://s3.us-west-004.backblazeb2.com
AWS_ACCESS_KEY_ID=0045f0571db506a0000000017
AWS_SECRET_ACCESS_KEY=K004Fs/bgmTk5dgo6GAVm2Waj3Ka+TE
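
Before running the script below, you can confirm that these values load correctly. This is a minimal standalone sketch (not part of the gist's files), assuming the values above are saved as a .env file in the working directory; it uses the same python-dotenv call the script itself makes:

import os
from dotenv import load_dotenv

# Load the .env file and report whether each expected variable is present
load_dotenv(override=True)
for name in ('AWS_ENDPOINT_URL', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    print(f'{name} is {"set" if os.environ.get(name) else "missing"}')
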
import datetime
import logging
import os
import re
import time
from typing import Tuple

import daft
from daft import col
from dotenv import load_dotenv
from pyarrow import fs
from pyiceberg.catalog import LOCATION, load_catalog

# Never put credentials in source code!
# .env file should contain AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
# The .env file in this gist (https://gist.github.com/metadaddy/ec9e645fa0929321b626d8be6e11162e)
# contains credentials with read-only access to the Drive Stats data set
load_dotenv(override=True)
DEFAULT_CONNECT_TIMEOUT = 5
DEFAULT_REQUEST_TIMEOUT = 20
BYTES_IN_EXABYTE = 1_000_000_000_000_000_000.0
ENDPOINT_URL = os.environ['AWS_ENDPOINT_URL']
BUCKET = 'drivestats-iceberg'
NAMESPACE = 'default'
TABLE_NAME = 'drivestats'


def get_region(endpoint: str):
    """
    Extract region from endpoint - you could just configure it separately
    """
    endpoint_pattern = re.compile(r'^https://s3\.([a-zA-Z0-9-]+)\.backblazeb2\.com$')
    region_match = endpoint_pattern.match(endpoint)
    region_name = region_match.group(1)
    return region_name


REGION = get_region(ENDPOINT_URL)


def get_metadata_location(bucket, table_name) -> str | None:
    """
    Given a bucket and table name, return a URI for the most recent Iceberg metadata JSON file
    """
    s3fs = fs.S3FileSystem(
        # We need to explicitly set endpoint and region since the underlying
        # AWS SDK for C++ does not support the environment variables. See
        # https://github.com/aws/aws-sdk-cpp/issues/2587
        endpoint_override=ENDPOINT_URL,
        region=REGION,
        request_timeout=DEFAULT_REQUEST_TIMEOUT,
        connect_timeout=DEFAULT_CONNECT_TIMEOUT,
    )

    # List files at the metadata location
    prefix = f"{bucket}/{table_name}/metadata/"
    files = s3fs.get_file_info(fs.FileSelector(prefix, True, True))

    # Metadata files have suffix '.metadata.json' and are named sequentially with numeric prefixes,
    # so we can simply filter the listing, sort it, and take the last element
    if len(metadata_locations := sorted([file.path for file in files if file.path.endswith('.metadata.json')])) > 0:
        return f's3://{metadata_locations[-1]}'
    return None


def time_collect(df: daft.DataFrame) -> Tuple[daft.DataFrame, float]:
    """
    Helper function to time collect() call on a dataframe
    """
    start = time.perf_counter()
    result = df.collect()
    return result, time.perf_counter() - start


def main():
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    logger.info('Setting log level for daft.iceberg.iceberg_scan to ERROR to suppress warning about unspecified partition filter')
    logging.getLogger('daft.iceberg.iceberg_scan').setLevel(logging.ERROR)

    # Create an in-memory catalog for the purposes of accessing the data. We could
    # equally use an online catalog.
    catalog = load_catalog(
        'iceberg',
        **{
            'uri': 'sqlite:///:memory:',
            # The underlying AWS SDK for C++ does not support AWS_ENDPOINT_URL
            # and AWS_REGION environment variables (see
            # https://github.com/aws/aws-sdk-cpp/issues/2587) so we have to pass
            # them explicitly
            's3.endpoint': ENDPOINT_URL,
            's3.region': REGION,
            's3.request-timeout': DEFAULT_REQUEST_TIMEOUT,
            's3.connect-timeout': DEFAULT_CONNECT_TIMEOUT,
        }
    )
    catalog.create_namespace(NAMESPACE, {LOCATION: f's3://{BUCKET}/'})

    metadata_location = get_metadata_location(BUCKET, TABLE_NAME)
    logger.info(f'Metadata located at {metadata_location}')

    table = catalog.register_table(f'{NAMESPACE}.{TABLE_NAME}', metadata_location)
    drivestats = daft.read_iceberg(table)

    # How many records are in the current Drive Stats dataset?
    count, elapsed_time = time_collect(drivestats.count())
    print(f'Total record count: {count.to_pydict()["count"][0]} ({elapsed_time:.2f} seconds)')

    # How many hard drives was Backblaze spinning on a given date?
    most_recent_day = drivestats.where(col("date") == datetime.date(2025, 3, 31))
    count, elapsed_time = time_collect(most_recent_day.count())
    print(f'Record count for 2025-03-31: {count.to_pydict()["count"][0]} ({elapsed_time:.2f} seconds)')

    # How many exabytes of raw storage was Backblaze managing on a given date?
    total_capacity, elapsed_time = time_collect(
        most_recent_day.agg(
            col('capacity_bytes').sum()
        )
    )
    print(f'Capacity: {total_capacity.to_pydict()["capacity_bytes"][0] / BYTES_IN_EXABYTE:.3f} EB ({elapsed_time:.2f} seconds)')

    # What are the top 10 most common drive models in the dataset?
    result, elapsed_time = time_collect(
        drivestats.groupby('model').agg(
            col('serial_number').count_distinct().alias('count')
        ).sort(col('count'), desc=True).limit(10)
    )
    print('Top 10 most common drive models:')
    result.show(10)
    print(f'({elapsed_time:.2f} seconds)')


if __name__ == "__main__":
    main()
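
The same pattern extends to other aggregations. As a further, hypothetical illustration, here is a sketch of a snippet that could be appended at the end of main() (it reuses the most_recent_day dataframe, time_collect and col from the script above, plus Daft's mean() aggregation) to report the ten largest average capacities per drive model on the snapshot date:

    # Hypothetical follow-on query: average capacity per drive model on 2025-03-31,
    # sorted so the largest averages come first
    avg_capacity, elapsed_time = time_collect(
        most_recent_day.groupby('model').agg(
            col('capacity_bytes').mean().alias('avg_capacity_bytes')
        ).sort(col('avg_capacity_bytes'), desc=True).limit(10)
    )
    avg_capacity.show(10)
    print(f'({elapsed_time:.2f} seconds)')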