Query the Backblaze Drive Stats data set from Daft (www.getdaft.io)
# Use these values for read-only access to the Drive Stats data set:
AWS_ENDPOINT_URL=https://s3.us-west-004.backblazeb2.com
AWS_ACCESS_KEY_ID=0045f0571db506a0000000017
AWS_SECRET_ACCESS_KEY=K004Fs/bgmTk5dgo6GAVm2Waj3Ka+TE
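
As a quick sanity check (a sketch, not part of the gist; it assumes python-dotenv and pyarrow are installed and the .env file above is in the working directory), you can confirm the read-only credentials by listing the top level of the public drivestats-iceberg bucket:

# Sanity-check sketch (hypothetical, not part of the gist): list the top level
# of the drivestats-iceberg bucket with PyArrow's S3 filesystem, using the
# read-only credentials from the .env file above.
import os
from dotenv import load_dotenv
from pyarrow import fs

load_dotenv(override=True)
s3 = fs.S3FileSystem(
    endpoint_override=os.environ['AWS_ENDPOINT_URL'],
    access_key=os.environ['AWS_ACCESS_KEY_ID'],
    secret_key=os.environ['AWS_SECRET_ACCESS_KEY'],
    region='us-west-004',  # matches the endpoint above
)
for info in s3.get_file_info(fs.FileSelector('drivestats-iceberg', recursive=False)):
    print(info.path)

The script below reads the same values via load_dotenv and queries the drivestats table with Daft.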
import datetime
import time
from typing import Tuple

import daft
import logging
import os
import re

from daft import col
from dotenv import load_dotenv
from pyarrow import fs
from pyiceberg.catalog import LOCATION, load_catalog
from pyiceberg.exceptions import NoSuchNamespaceError

# Never put credentials in source code!
# .env file should contain AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
# The .env file in this gist (https://gist.github.com/metadaddy/ec9e645fa0929321b626d8be6e11162e)
# contains credentials with read-only access to the Drive Stats data set
load_dotenv(override=True)

DEFAULT_CONNECT_TIMEOUT = 5
DEFAULT_REQUEST_TIMEOUT = 20

BYTES_IN_EXABYTE = 1_000_000_000_000_000_000.0

ENDPOINT_URL = os.environ['AWS_ENDPOINT_URL']
BUCKET = 'drivestats-iceberg'
NAMESPACE = 'default'
TABLE_NAME = 'drivestats'
def get_region(endpoint: str):
    """
    Extract region from endpoint - you could just configure it separately
    """
    endpoint_pattern = re.compile(r'^https://s3\.([a-zA-Z0-9-]+)\.backblazeb2\.com$')
    region_match = endpoint_pattern.match(endpoint)
    region_name = region_match.group(1)
    return region_name


REGION = get_region(ENDPOINT_URL)


def get_metadata_location(bucket, table_name) -> str | None:
    """
    Given a bucket and table name, return a URI for the most recent Iceberg metadata JSON file
    """
    s3fs = fs.S3FileSystem(
        # We need to explicitly set endpoint and region since the underlying
        # AWS SDK for C++ does not support the environment variables. See
        # https://github.com/aws/aws-sdk-cpp/issues/2587
        endpoint_override=ENDPOINT_URL,
        region=REGION,
        request_timeout=DEFAULT_REQUEST_TIMEOUT,
        connect_timeout=DEFAULT_CONNECT_TIMEOUT,
    )

    # List files at the metadata location
    prefix = f"{bucket}/{table_name}/metadata/"
    files = s3fs.get_file_info(fs.FileSelector(prefix, True, True))

    # Metadata files have suffix '.metadata.json' and are named sequentially with numeric prefixes,
    # so we can simply filter the listing, sort it, and take the last element
    if len(metadata_locations := sorted([file.path for file in files if file.path.endswith('.metadata.json')])) > 0:
        return f's3://{metadata_locations[-1]}'

    return None


def time_collect(df: daft.DataFrame) -> Tuple[daft.DataFrame, float]:
    """
    Helper function to time collect() call on a dataframe
    """
    start = time.perf_counter()
    result = df.collect()
    return result, time.perf_counter() - start
def main():
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    logger.info('Setting log level for daft.iceberg.iceberg_scan to ERROR to suppress warning about unspecified partition filter')
    logging.getLogger('daft.iceberg.iceberg_scan').setLevel(logging.ERROR)

    # Create an in-memory catalog for the purposes of accessing the data. We could
    # equally use an online catalog.
    catalog = load_catalog(
        'iceberg',
        **{
            'uri': 'sqlite:///:memory:',
            # The underlying AWS SDK for C++ does not support AWS_ENDPOINT_URL
            # and AWS_REGION environment variables (see
            # https://github.com/aws/aws-sdk-cpp/issues/2587) so we have to pass
            # them explicitly
            's3.endpoint': ENDPOINT_URL,
            's3.region': REGION,
            's3.request-timeout': DEFAULT_REQUEST_TIMEOUT,
            's3.connect-timeout': DEFAULT_CONNECT_TIMEOUT,
        }
    )

    catalog.create_namespace(NAMESPACE, {LOCATION: f's3://{BUCKET}/'})

    metadata_location = get_metadata_location(BUCKET, TABLE_NAME)
    logger.info(f'Metadata located at {metadata_location}')

    table = catalog.register_table(f'{NAMESPACE}.{TABLE_NAME}', metadata_location)

    drivestats = daft.read_iceberg(table)

    # How many records are in the current Drive Stats dataset?
    count, elapsed_time = time_collect(drivestats.count())
    print(f"Total record count: {count.to_pydict()['count'][0]} ({elapsed_time:.2f} seconds)")

    # How many hard drives was Backblaze spinning on a given date?
    most_recent_day = drivestats.where(col("date") == datetime.date(2025, 3, 31))
    count, elapsed_time = time_collect(most_recent_day.count())
    print(f"Record count for 2025-03-31: {count.to_pydict()['count'][0]} ({elapsed_time:.2f} seconds)")

    # How many exabytes of raw storage was Backblaze managing on a given date?
    total_capacity, elapsed_time = time_collect(
        most_recent_day.agg(
            col('capacity_bytes').sum()
        )
    )
    print(f"Capacity: {total_capacity.to_pydict()['capacity_bytes'][0] / BYTES_IN_EXABYTE:.3f} EB ({elapsed_time:.2f} seconds)")

    # What are the top 10 most common drive models in the dataset?
    result, elapsed_time = time_collect(
        drivestats.groupby('model').agg(
            col('serial_number').count_distinct().alias('count')
        ).sort(col('count'), desc=True).limit(10)
    )
    print('Top 10 most common drive models:')
    result.show(10)
    print(f'({elapsed_time:.2f} seconds)')


if __name__ == "__main__":
    main()
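
As a further example (not part of the gist), the same DataFrame could report how many drives failed on the most recent day. A minimal sketch, assuming the Drive Stats failure column (1 on the day a drive fails), that would sit at the end of main() after most_recent_day is defined:

    # Hypothetical extension: count the drive failures reported on 2025-03-31,
    # assuming the Drive Stats `failure` column (0 = healthy, 1 = failed that day).
    failures, elapsed_time = time_collect(
        most_recent_day.where(col('failure') == 1).count()
    )
    print(f"Failures on 2025-03-31: {failures.to_pydict()['count'][0]} ({elapsed_time:.2f} seconds)")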