Created
September 28, 2020 00:09
-
-
Save csm10495/b188b266a49610d6d4e3fd690663b183 to your computer and use it in GitHub Desktop.
simple_compression.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Simple compression example | |
(C) - Charles Machalow - MIT License | |
Attempts to compress using a 'bit-compression' algorithm. Basically it goes | |
byte by byte trying to shrink each byte to the minimum number of bits needed to | |
represent each byte. If all 8 bits are needed, it will likely yield a larger than | |
original file. Often all 8 aren't needed if the file is something like ascii. | |
Some optimizations to speed that would likely speed this up greatly | |
- Stop going to string for to get binary representations (use math instead) | |
- Don't modify bit_strings, instead iterate along them | |
- Create a translation_table attribute that gives the table from byte to bit-length-correct bit string | |
(this is instead of computing it over and over at runtime, which is what it does now) | |
- Can split input data into multiple sections, then use threads/processes to compress instead of having | |
one thread go through the whole data. | |
''' | |
def bytes_to_number(data): | |
return int.from_bytes(data, 'little', signed=False) | |
def bits_to_bytes(bit_string, bits_in_byte=8): | |
if bit_string.startswith('0b'): | |
bit_string = bit_string[2:] | |
while len(bit_string) % bits_in_byte != 0: | |
bit_string = '0' + bit_string | |
ret_bytes = b'' | |
while bit_string != '': | |
this_byte = int(bit_string[:bits_in_byte], 2) | |
bit_string = bit_string[bits_in_byte:] | |
ret_bytes += coerce_to_byte(this_byte) | |
return ret_bytes | |
def bytes_to_bits(the_bytes, bits_in_byte=8): | |
ret_bits = '' | |
for b in the_bytes: | |
unexpanded_bits = bin(b)[2:] | |
while len(unexpanded_bits) % bits_in_byte != 0: | |
unexpanded_bits = '0' + unexpanded_bits | |
ret_bits += unexpanded_bits | |
return ret_bits | |
def number_to_bytes(num, num_bytes=None): | |
if num_bytes is None: | |
num_bits = num.bit_length() | |
while num_bits % 8 != 0: | |
num_bits += 1 | |
assert num_bits % 8 == 0, 'math error' | |
num_bytes = int(num_bits / 8) | |
return num.to_bytes(num_bytes, 'little', signed=False) | |
def inverted_dict(d): | |
return {v: k for k, v in d.items()} | |
def coerce_to_bytes(d): | |
if isinstance(d, str): | |
d = d.encode('utf-8') | |
return d | |
def coerce_to_byte(d): | |
if isinstance(d, str): | |
d = d[0].encode('utf-8') | |
elif isinstance(d, int): | |
d = bytes([d]) | |
return d | |
class TranslationTable: | |
VERSION = b'1' | |
EYE_CATCHER = b'COMP' | |
HEADER_LENGTH_SIZE_IN_BYTES = 2 | |
def __init__(self, translation_data=None, data_to_compress=None): | |
self.HEADER_LENGTH_IN_BYTES = len(self.VERSION) + len(self.EYE_CATCHER) + self.HEADER_LENGTH_SIZE_IN_BYTES | |
if translation_data is not None and data_to_compress is not None: | |
raise ValueError("Only one of translation_data/data_to_compress should be given.") | |
if translation_data: | |
self._init_from_translation_data(translation_data) | |
if data_to_compress: | |
self._init_from_data_to_compress(data_to_compress) | |
assert isinstance(self._translations, dict) | |
def __getitem__(self, key): | |
return self._translations[key] | |
def __eq__(self, other): | |
return self._translations == other._translations | |
def _init_from_translation_data(self, translation_data): | |
self._translations = {} | |
if translation_data.startswith(self.EYE_CATCHER): | |
translation_data_length = bytes_to_number(translation_data[5:7]) | |
translation_data = translation_data[7:7+translation_data_length] # skip header and raw data | |
assert len(translation_data) % 2 == 0, 'translation_data should have an even length' | |
for idx in range(0, len(translation_data), 2): | |
actual_byte = coerce_to_byte(translation_data[idx]) | |
compressed_byte = coerce_to_byte(translation_data[idx + 1]) | |
self._translations[actual_byte] = compressed_byte | |
def _init_from_data_to_compress(self, data_to_compress): | |
data_to_compress = coerce_to_bytes(data_to_compress) | |
self._translations = {} | |
max_num = 1 | |
for byte in data_to_compress: | |
if coerce_to_byte(byte) not in self._translations: | |
self._translations[coerce_to_byte(byte)] = coerce_to_byte(max_num) | |
max_num += 1 | |
def is_compressible(self): | |
return len(self._translations) < 0xFF | |
def get_item_length_in_bits(self): | |
return max(v[0] for v in self._translations.values()).bit_length() | |
def get_translation_table_raw(self): | |
ret_bytes = b'' | |
for uncompressed, compressed in self._translations.items(): | |
ret_bytes += uncompressed + compressed | |
return ret_bytes | |
def get_translation_table_with_header(self): | |
raw = self.get_translation_table_raw() | |
# Byte 0-3: COMP | |
# Byte 4: version = 1 | |
# Byte 5-6: length of table | |
# Byte 7:7+length: table | |
ret_bytes = self.EYE_CATCHER + self.VERSION + number_to_bytes(len(raw), self.HEADER_LENGTH_SIZE_IN_BYTES) + raw | |
return ret_bytes | |
def get_inverted_translation_dict(self): | |
return inverted_dict(self._translations) | |
test_data = b'abc' | |
t = TranslationTable(data_to_compress=test_data) | |
t2 = TranslationTable(translation_data=t.get_translation_table_raw()) | |
assert t2 == t | |
t3 = TranslationTable(translation_data=t.get_translation_table_with_header()) | |
assert t3 == t | |
class Compressor: | |
def __init__(self, raw_data): | |
self.raw_data = coerce_to_bytes(raw_data) | |
def compress(self, add_table_header=True): | |
translation_table = TranslationTable(data_to_compress=self.raw_data) | |
item_bit_length = translation_table.get_item_length_in_bits() | |
bit_string = '' | |
for byte in self.raw_data: | |
this_byte = coerce_to_byte(byte) | |
this_byte_compressed_to_x_bits = bin(translation_table[this_byte][0])[2:].rjust(item_bit_length, '0') | |
assert len(this_byte_compressed_to_x_bits) == item_bit_length | |
bit_string += this_byte_compressed_to_x_bits | |
ret_bytes = bits_to_bytes(bit_string, 8) | |
if add_table_header: | |
ret_bytes = translation_table.get_translation_table_with_header() + ret_bytes | |
return ret_bytes | |
t4 = TranslationTable(translation_data=Compressor(test_data).compress()) | |
assert t4 == t | |
class Uncompressor: | |
def __init__(self, compressed_data): | |
self.compressed_data = coerce_to_bytes(compressed_data) | |
def uncompress(self): | |
translation_table = TranslationTable(translation_data=self.compressed_data) | |
reverse_translation_dict = translation_table.get_inverted_translation_dict() | |
actual_compressed_data = self.compressed_data[len(translation_table.get_translation_table_with_header()):] | |
item_bit_length = translation_table.get_item_length_in_bits() | |
in_bits = bytes_to_bits(actual_compressed_data, item_bit_length) | |
expanded_items = b'' | |
current_offset = 0 | |
# expand | |
while in_bits[current_offset:current_offset+item_bit_length]: | |
the_bits = in_bits[current_offset:current_offset+item_bit_length] | |
expanded_items += coerce_to_byte(int(the_bits, 2)) | |
current_offset += item_bit_length | |
# convert | |
ret_bytes = b'' | |
for byte in expanded_items: | |
ret_bytes += reverse_translation_dict[coerce_to_byte(byte)] | |
return ret_bytes | |
compressed = Compressor(test_data).compress() | |
uncompressed = Uncompressor(compressed).uncompress() | |
assert uncompressed == test_data | |
def compression_ratio(raw_data): | |
compressed = Compressor(raw_data).compress() | |
ratio = len(compressed) / len(raw_data) | |
return ratio | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment