PySpark functions to predict the number of partitions needed to produce well-sized output files (100-300 MB for Parquet), plus a helper to estimate the average byte array size of your string columns.
import re
from math import ceil


def get_files_per_partition(df, partition_key, file_type="parquet", compression="snappy", byte_array_size=256):
    rows = df.count()
    print("Dataset has {} rows".format(rows))
    schema = df.schema
    num_partitions = 1
    if partition_key is not None:
        num_partitions = df.select([partition_key]).distinct().count()
        print("Dataset has {} distinct partition keys".format(num_partitions))
        # The partition key is written as a directory, not a column, so drop it
        # before estimating the per-record size.
        _df = df.drop(partition_key)
        schema = _df.schema
    return get_num_files(rows, num_partitions, schema, file_type=file_type, compression=compression, byte_array_size=byte_array_size)

def get_num_files(rows, num_partitions, schema, file_type="parquet", compression="snappy", byte_array_size=256):
    record_size = _get_record_size(schema, file_type, byte_array_size)
    print("Average Record Size is {} bits".format(record_size))
    min_partitions_for_file_size = _get_min_partition_based_on_file_size(rows, record_size, num_partitions, compression)
    return max(1, min_partitions_for_file_size)

def _get_record_size(schema, file_type, byte_array_size):
    """
    Estimate the size in bits of a single record for the given schema.
    """
    size_mapping = get_size_mapping(file_type, byte_array_size)
    record_size = 0
    for field in schema:
        _type = field.dataType.typeName()
        field_size = size_mapping[_type]
        record_size = record_size + field_size
    return record_size

def get_average_byte_array_size(schema):
    """
    Calculate the average declared size of VARCHAR columns in a schema.
    """
    average_size = 0.0
    string_fields = 0
    total_size = 0.0
    for field in schema.fields:
        # String columns backed by Hive carry their declared type, e.g. varchar(n),
        # in the field metadata under HIVE_TYPE_STRING.
        if field.dataType.typeName() == "string":
            size_string = field.metadata.get('HIVE_TYPE_STRING')
            if size_string:
                result = re.search(r'(?P<type>[^\(\)]+)\((?P<size>\d+)\)', size_string)
                if result:
                    size = result.groupdict().get('size', 0)
                    print("Found field {} with size {}".format(field.name, size))
                    total_size = total_size + int(size)
                    string_fields = string_fields + 1
    if string_fields > 0:
        average_size = total_size / string_fields
    return average_size

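# For example, a hypothetical column declared in Hive as varchar(120) would carry
# 'varchar(120)' under HIVE_TYPE_STRING in its field metadata; the regex above
# captures size = 120, so that column contributes 120 bytes to the average.
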
def _get_min_partition_based_on_file_size(rows, record_size, num_partitions, compression):
    """
    Based on our data size, calculate how many partitions we need to get ~200MB output files.
    """
    compression_ratio = get_compression_ratio(compression)
    avg_rows_per_partition = rows / num_partitions
    avg_partition_size = record_size * avg_rows_per_partition
    print("Average Partition Size is {} bits before compression".format(avg_partition_size))
    # Convert bits to bytes
    avg_partition_size = avg_partition_size / 8
    print("Average Partition Size is {} bytes before compression".format(avg_partition_size))
    # Convert bytes to MB
    avg_partition_size = avg_partition_size / (1024 * 1024)
    print("Average Partition Size is {} MB before compression".format(avg_partition_size))
    avg_partition_size = avg_partition_size * compression_ratio
    print("Average Partition Size is {} MB after compression".format(avg_partition_size))
    # Split into ~200MB chunks
    avg_partition_size = avg_partition_size / 200
    return int(ceil(avg_partition_size))

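# Worked example with made-up numbers, as a sanity check of the arithmetic above:
# 10,000,000 rows at 2,048 bits per record and no partition key come to
# 2.048e10 bits = 2.56e9 bytes ~= 2441.4 MB raw; at the 0.4 snappy ratio that is
# ~976.6 MB, and 976.6 / 200 rounds up to 5, so the function returns 5 output files.
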
def get_size_mapping(file_type, byte_array_size):
    """
    Get a mapping from field type to its approximate size in bits for the given file type.
    """
    if file_type == "parquet":
        # Convert the average byte array size to bits
        string_size = byte_array_size * 8
        PARQUET_SIZE_MAPPING = {
            "integer": 32,
            "long": 64,
            "boolean": 1,
            "float": 32,
            "double": 64,
            "decimal": 64,
            "string": string_size,
            "date": 32,  # Assume no date64
            "timestamp": 96,  # Assume legacy timestamp
        }
        return PARQUET_SIZE_MAPPING
    return {}

def get_compression_ratio(compression):
    """
    Return a floating point scalar for the size after compression.
    """
    if compression == "snappy":
        return 0.4
    return 1.0
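

# Minimal usage sketch (hypothetical): the toy DataFrame and output path below are
# made up for illustration. With no partition key, the helper estimates how many
# ~200MB files the whole dataset should be split into before writing.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, "2018-05-01", "foo"), (2, "2018-05-02", "bar")],
        ["id", "event_date", "payload"],
    )
    num_files = get_files_per_partition(df, None)
    df.repartition(num_files).write.mode("overwrite").parquet("/tmp/partition_sizing_example")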