# LZ7BHTS7
def decompress_lz_buffer(src_buffer: bytes) -> bytes:
    """
    Decompress a byte array using a custom LZ77 algorithm.
    Args:
        src_buffer: Compressed bytes
    Returns:
        Decompressed bytes
    """
    dest_buffer = bytearray()
    src_pos = 0
    control_bits = 0x8000  # Sentinel value: the next control byte must be loaded
    while True:
        while True:
            if control_bits == 0x8000:
                if src_pos >= len(src_buffer):
                    return bytes(dest_buffer)
                # Load the next control byte; the OR'd 0x80 reaches 0x8000
                # after 8 shifts, signalling that the byte is exhausted
                control_bits = (src_buffer[src_pos] << 8) | 0x80
                src_pos += 1
            flags = control_bits & 0x8000
            shifted_bits = control_bits & 0x7fff
            control_bits = shifted_bits << 1
            if flags == 0:
                break
            # Literal byte - copy directly
            if src_pos >= len(src_buffer):
                return bytes(dest_buffer)
            dest_buffer.append(src_buffer[src_pos])
            src_pos += 1
        # First control bit was 0: read the second bit to pick the form
        if shifted_bits == 0x4000:
            if src_pos >= len(src_buffer):
                return bytes(dest_buffer)
            control_bits = (src_buffer[src_pos] << 8) | 0x80
            src_pos += 1
        flags = control_bits & 0x8000
        shifted_bits = control_bits & 0x7fff
        control_bits = shifted_bits << 1
        if flags == 0:
            # Short form: two more control bits give the length, one byte gives the offset
            offset_bits = 0
            if shifted_bits == 0x4000:
                if src_pos >= len(src_buffer):
                    return bytes(dest_buffer)
                control_bits = (src_buffer[src_pos] << 8) | 0x80
                src_pos += 1
            shifted_bits = (control_bits & 0x7fff) << 1
            if (control_bits & 0x8000) != 0:
                offset_bits = 2
            if (control_bits & 0x7fff) == 0x4000:
                if src_pos >= len(src_buffer):
                    return bytes(dest_buffer)
                shifted_bits = (src_buffer[src_pos] << 8) | 0x80
                src_pos += 1
            control_bits = (shifted_bits & 0x7fff) << 1
            if (shifted_bits & 0x8000) != 0:
                offset_bits += 1
            if src_pos >= len(src_buffer):
                return bytes(dest_buffer)
            lookup_pos = len(dest_buffer) + (src_buffer[src_pos] - 0x100)
            src_pos += 1
            copy_length = offset_bits + 2
        else:
            # Long form: 13-bit offset and 3-bit length packed into two bytes
            if src_pos + 1 >= len(src_buffer):
                return bytes(dest_buffer)
            flag_byte = src_buffer[src_pos + 1]
            offset_bits = flag_byte & 7
            lookup_pos = len(dest_buffer) + (((src_buffer[src_pos] << 8) | flag_byte) >> 3) - 0x2000
            if (flag_byte & 7) != 0:
                copy_length = offset_bits + 2
                src_pos += 2
            else:
                # Lower 3 bits are zero: the length comes in a separate byte
                if src_pos + 2 >= len(src_buffer):
                    return bytes(dest_buffer)
                src_pos += 2
                if src_buffer[src_pos] == 0:
                    # A zero length byte ends the stream (or signals corrupt input)
                    return bytes(dest_buffer)
                copy_length = src_buffer[src_pos] + 1
                src_pos += 1
        # Copy from previous output (LZ77 back-reference); overlapping copies
        # work because dest_buffer grows one byte per iteration
        for i in range(copy_length):
            if lookup_pos < 0 or lookup_pos >= len(dest_buffer):
                # Guard against out-of-bounds back-references
                return bytes(dest_buffer)
            dest_buffer.append(dest_buffer[lookup_pos])
            lookup_pos += 1
    return bytes(dest_buffer)
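# Hand-assembled sanity check of the bit format (a constructed example, not
# taken from any real compressed file): control byte 0xC0 = bits 1,1,0,0,0,0
# -> two literals, then a short-form back-reference with length bits 00
# (copy 2 bytes) and offset byte 0xFE, i.e. offset = 0x100 - 0xFE = 2.
assert decompress_lz_buffer(b"\xC0AB\xFE") == b"ABAB"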
# COMPRESSION CODE IMPLEMENTED BY DIFFERENT LLMS BASED ON DECOMPRESSION CODE PROVIDED
#
# GEMINI 2.5 PRO Exp
#
import sys
from collections import defaultdict, deque

# --- Constants ---
WINDOW_SIZE = 8192
MAX_MATCH_LEN = 256
MIN_MATCH_LEN = 2
HASH_KEY_LEN = 3
def find_longest_match_optimized(
    src_buffer: bytes,
    src_pos: int,
    window_start: int,
    max_match_len: int,
    hash_table: defaultdict[bytes, deque[int]],
    min_match_len: int = MIN_MATCH_LEN
) -> tuple[int, int]:
    """
    Finds the longest match using a hash table for optimization.
    Corrected V4: rigorous verification, explicit offset-1 check.
    Args:
        src_buffer: The input data buffer.
        src_pos: The current position in the input buffer to find a match for.
        window_start: The starting index (inclusive) of the valid lookback window.
        max_match_len: The maximum length of a match to consider.
        hash_table: Dictionary mapping HASH_KEY_LEN byte sequences to deques of positions.
        min_match_len: Minimum match length to return (must be >= 2).
    Returns:
        A tuple (best_match_length, best_match_offset).
        Returns (0, 0) if no match of length >= min_match_len is found.
    """
    best_match_len = 0  # 0 means no valid match found yet
    best_match_offset = 0
    src_len = len(src_buffer)
    # Determine the actual maximum length possible from the current position
    current_max_len = min(max_match_len, src_len - src_pos)
    # Not enough remaining bytes for even the smallest possible match
    if current_max_len < min_match_len:
        return 0, 0
    # --- Hash Table Lookup ---
    # Only try the hash lookup if there are enough bytes to form a key
    if src_pos + HASH_KEY_LEN <= src_len:
        key = src_buffer[src_pos : src_pos + HASH_KEY_LEN]
        if key in hash_table:
            candidate_positions = hash_table[key]
            # Iterate from the most recent candidate backwards
            # (favors smaller offsets for equal-length matches)
            for i in range(len(candidate_positions) - 1, -1, -1):
                match_start_pos = candidate_positions[i]
                # Stop searching this key's list once candidates fall outside the window
                if match_start_pos < window_start:
                    break
                # --- Verify and Extend Match ---
                current_match_len = 0
                # Compare byte by byte
                while current_match_len < current_max_len:
                    if src_buffer[match_start_pos + current_match_len] != src_buffer[src_pos + current_match_len]:
                        break  # Mismatch found
                    current_match_len += 1
                # Update the best match if this one meets the minimum length and is
                # strictly longer; for equal lengths, the iteration order already
                # prefers the smaller offset.
                if current_match_len >= min_match_len and current_match_len > best_match_len:
                    best_match_len = current_match_len
                    best_match_offset = src_pos - match_start_pos
                    # If we found the maximum possible length, stop scanning candidates
                    if best_match_len >= current_max_len:
                        break
            # Early exit if the maximum possible length was found
            if best_match_len >= current_max_len:
                # Ensure the offset is valid before returning
                return (best_match_len, best_match_offset) if best_match_offset > 0 else (0, 0)
    # --- Explicit Check for Offset 1 ---
    # Offset 1 (byte runs) is common and can be missed by the 3-byte hash, so
    # check whether it yields a valid match longer than anything found above.
    if current_max_len >= min_match_len:
        prev_pos = src_pos - 1
        if prev_pos >= window_start:
            len_offset1 = 0
            # Calculate the length of the match at offset 1
            while len_offset1 < current_max_len:
                if src_buffer[prev_pos + len_offset1] != src_buffer[src_pos + len_offset1]:
                    break
                len_offset1 += 1
            # best_match_len is either 0 or >= min_match_len here, so a single
            # comparison covers both "longer match" and "hash found nothing".
            if len_offset1 >= min_match_len and len_offset1 > best_match_len:
                best_match_len = len_offset1
                best_match_offset = 1
    # Final validation: the chosen match must meet the minimum length and have a valid offset.
    if best_match_len < min_match_len or best_match_offset <= 0:
        return 0, 0
    return best_match_len, best_match_offset
def compress_lz_buffer_gemini(src_buffer: bytes) -> bytes:
    """
    Compresses a byte array using the custom LZ77 algorithm (optimized + corrected V4),
    compatible with the provided decompress_lz_buffer. Strives for correctness,
    performance, and a good compression ratio. Includes improved offset-1 handling.
    Args:
        src_buffer: Uncompressed bytes.
    Returns:
        Compressed bytes.
    """
    if not src_buffer:
        return b""
    src_len = len(src_buffer)
    src_pos = 0
    dest_buffer = bytearray()
    # Hash table: maps HASH_KEY_LEN-byte prefix -> deque of starting positions
    hash_table = defaultdict(deque)
    # Control bit handling state
    control_byte_pos = -1
    control_bits_count = 0
    current_control_byte = 0

    def write_control_bit(bit: int):
        """Appends a control bit, flushing the control byte to dest_buffer when full."""
        nonlocal control_byte_pos, control_bits_count, current_control_byte, dest_buffer
        if control_bits_count == 0:
            control_byte_pos = len(dest_buffer)
            dest_buffer.append(0)  # Placeholder
        current_control_byte = (current_control_byte << 1) | (bit & 1)
        control_bits_count += 1
        if control_bits_count == 8:
            dest_buffer[control_byte_pos] = current_control_byte
            control_bits_count = 0
            current_control_byte = 0

    # --- Main Compression Loop ---
    last_inserted_hash_pos = -1
    while src_pos < src_len:
        # --- Update Hash Table ---
        window_start = max(0, src_pos - WINDOW_SIZE)
        target_insert_pos = src_pos - 1  # Insert keys for positions up to here
        while last_inserted_hash_pos < target_insert_pos:
            insert_pos = last_inserted_hash_pos + 1
            if insert_pos + HASH_KEY_LEN <= src_len:
                key = src_buffer[insert_pos : insert_pos + HASH_KEY_LEN]
                positions = hash_table[key]
                # Prune entries that fall outside the window for the search at src_pos
                while positions and positions[0] < window_start:
                    positions.popleft()
                positions.append(insert_pos)
            last_inserted_hash_pos = insert_pos
        # --- Find Longest Match ---
        # Uses the improved finder with better offset-1 handling
        match_len, match_offset = find_longest_match_optimized(
            src_buffer, src_pos, window_start, MAX_MATCH_LEN, hash_table, MIN_MATCH_LEN
        )
        # --- Encoding Decision ---
        encode_as_literal = True
        if match_len >= MIN_MATCH_LEN:  # A valid match was found
            # Determine encoding feasibility
            can_encode_short = (2 <= match_len <= 5 and 1 <= match_offset <= 256)
            can_encode_long = (match_len >= 2 and 1 <= match_offset <= WINDOW_SIZE)
            # Prefer the short encoding when applicable
            if can_encode_short:
                write_control_bit(0); write_control_bit(0)
                length_bits = match_len - 2
                write_control_bit((length_bits >> 1) & 1)
                write_control_bit(length_bits & 1)
                offset_byte = (0x100 - match_offset) & 0xFF
                dest_buffer.append(offset_byte)
                src_pos += match_len  # Advance position
                encode_as_literal = False
            elif can_encode_long:  # Use the long encoding otherwise
                write_control_bit(0); write_control_bit(1)
                value_base = (0x2000 - match_offset) << 3
                if 3 <= match_len <= 9:  # Length fits in 3 bits
                    value = value_base | (match_len - 2)
                    dest_buffer.append((value >> 8) & 0xFF)
                    dest_buffer.append(value & 0xFF)
                    src_pos += match_len
                    encode_as_literal = False
                elif match_len >= 2:  # Use an extended length byte
                    value = value_base  # lower 3 bits stay 0
                    dest_buffer.append((value >> 8) & 0xFF)
                    dest_buffer.append(value & 0xFF)
                    dest_buffer.append((match_len - 1) & 0xFF)
                    src_pos += match_len
                    encode_as_literal = False
        # --- Encode as Literal ---
        if encode_as_literal:
            write_control_bit(1)
            dest_buffer.append(src_buffer[src_pos])
            src_pos += 1  # Advance by 1 for the literal
    # --- Finalization ---
    if control_bits_count > 0:
        final_control_byte = current_control_byte << (8 - control_bits_count)
        dest_buffer[control_byte_pos] = final_control_byte & 0xFF
    return bytes(dest_buffer)
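# Round-trip usage sketch (the sample string mirrors the test data used at the
# bottom of this gist; the assertion is the compatibility claim made above):
_sample = b"some00000LongStringRepeating" * 100
assert decompress_lz_buffer(compress_lz_buffer_gemini(_sample)) == _sample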
#
# Claude 3.7 Sonnet (variant 1)
#
def compress_lz_buffer_claude(data: bytes) -> bytes:
    """
    Custom LZ77 compressor, optimized for performance.
    Args:
        data: Uncompressed bytes
    Returns:
        Compressed bytes
    """
    if not data:
        return b''
    result = bytearray()
    pos = 0
    data_len = len(data)
    # Dictionary keyed by a rolling hash for match finding
    position_dict = {}

    # Hash function that works with variable-length input
    def hash_sequence(p, length=4):
        h = 0
        for i in range(min(length, data_len - p)):
            h = ((h * 23) + data[p + i]) & 0xFFFF
        return h

    def find_best_match(pos):
        """Find the best match at the current position using the rolling hash."""
        best_match_len = 0
        best_match_offset = 0
        # Can't match with fewer than 3 bytes left
        if pos + 2 >= data_len:
            return 0, 0
        # Maximum search distance is 0x1FFF (format limitation)
        min_pos = max(0, pos - 0x1FFF)
        # Get the hash for the current position (4-byte hash for better distribution)
        h = hash_sequence(pos)
        # Check all positions with this hash
        if h in position_dict:
            # Process matches from newest to oldest (usually better compression)
            for prev_pos in reversed(position_dict[h]):
                if prev_pos < min_pos:
                    continue
                # Verify the match (hash collision check)
                if data[prev_pos] != data[pos]:
                    continue
                # Calculate the match length
                offset = pos - prev_pos
                match_len = 0
                # Check 4 bytes at once when possible for better performance
                if pos + 3 < data_len and prev_pos + 3 < pos:
                    if data[prev_pos:prev_pos+4] == data[pos:pos+4]:
                        match_len = 4
                    # Continue matching byte by byte
                    while (pos + match_len < data_len and
                           prev_pos + match_len < pos and
                           match_len < 255 and
                           data[prev_pos + match_len] == data[pos + match_len]):
                        match_len += 1
                else:
                    # Byte-by-byte matching
                    while (pos + match_len < data_len and
                           prev_pos + match_len < pos and
                           match_len < 255 and
                           data[prev_pos + match_len] == data[pos + match_len]):
                        match_len += 1
                # Only consider matches of at least 3 bytes
                if match_len >= 3 and match_len > best_match_len:
                    best_match_len = match_len
                    best_match_offset = offset
                    # Early termination for exceptional matches
                    if match_len > 32:
                        break
        # Add the current position to the hash table
        if h not in position_dict:
            position_dict[h] = []
        position_dict[h].append(pos)
        # Limit entries to prevent memory bloat
        if len(position_dict[h]) > 256:
            position_dict[h] = position_dict[h][-192:]  # Keep the most recent positions
        return best_match_len, best_match_offset

    # Encoding costs, measured consistently in bits, for the lazy-matching
    # decision below (the original draft mixed bits and bytes, which made the
    # lazy_cost < current_cost comparison meaningless).
    def calculate_match_cost(length):
        # Short form (length <= 9): 2 control bits + 2 payload bytes
        # Long form (length > 9): 2 control bits + 3 payload bytes
        return 2 + 8 * (2 if length <= 9 else 3)

    def calculate_literal_cost(count):
        # Each literal: 1 control bit + 8 data bits
        return count * 9

    # Process the data - with exact compatibility with the decompressor
    control_byte = 0
    control_bit_pos = 7  # Start with the highest bit (bit 7)
    control_byte_pos = -1
    while pos < data_len:
        # If we need a new control byte
        if control_bit_pos == 7:
            control_byte_pos = len(result)
            result.append(0)  # Placeholder
            control_byte = 0
        # Find the best match at the current position
        best_match_len, best_match_offset = find_best_match(pos)
        # Lazy parsing: check whether skipping bytes gives better compression
        use_lazy = False
        # Only consider lazy evaluation for smaller matches
        if best_match_len >= 3 and best_match_len < 20:
            # Look ahead at most 4 positions
            max_lookahead = min(4, best_match_len - 1)
            for skip in range(1, max_lookahead + 1):
                if pos + skip >= data_len:
                    break
                # Find a match at the skip position
                next_match_len, next_match_offset = find_best_match(pos + skip)
                if next_match_len < 3:
                    continue
                # Compare encoding efficiency (both costs in bits)
                current_cost = calculate_match_cost(best_match_len)
                lazy_cost = calculate_literal_cost(skip) + calculate_match_cost(next_match_len)
                # Use the lazy path if literals + the next match encode more cheaply,
                # or if the next match is significantly better
                if (lazy_cost < current_cost or
                        next_match_len > best_match_len + skip + 1):
                    use_lazy = True
                    break
        if best_match_len < 3 or use_lazy:
            # Literal byte - set the control bit to 1
            control_byte |= (1 << control_bit_pos)
            control_bit_pos -= 1
            # Add the literal byte
            result.append(data[pos])
            pos += 1
        else:
            # Back-reference - leave the first control bit as 0
            control_bit_pos -= 1
            # If we need a new control byte for the next control bit
            if control_bit_pos < 0:
                result[control_byte_pos] = control_byte
                control_byte_pos = len(result)
                result.append(0)
                control_byte = 0
                control_bit_pos = 7
            # Set the next control bit to 1 (second form of back-reference)
            control_byte |= (1 << control_bit_pos)
            control_bit_pos -= 1
            # Calculate the offset in the format expected by the decompressor
            offset_value = (0x2000 - best_match_offset) << 3
            if best_match_len <= 9:
                # Short form: encode the length in the lower 3 bits
                length_bits = best_match_len - 2
                encoded_value = offset_value | length_bits
                result.append((encoded_value >> 8) & 0xFF)
                result.append(encoded_value & 0xFF)
            else:
                # Long form: length in a separate byte (lower 3 bits are 0)
                result.append((offset_value >> 8) & 0xFF)
                result.append(offset_value & 0xFF)
                result.append(best_match_len - 1)  # Length byte
            pos += best_match_len
        # If we've used all 8 bits, flush the control byte
        if control_bit_pos < 0:
            result[control_byte_pos] = control_byte
            control_bit_pos = 7
    # Flush the final partial control byte if there are bits left
    if control_bit_pos < 7:
        result[control_byte_pos] = control_byte
    return bytes(result)
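# The cost model above, restated at module level for a quick worked check.
# These helpers only mirror the nested ones (illustrative, not used by the
# compressor): a 4-byte short-form match costs 2 control bits + 2 payload
# bytes = 18 bits, versus 4 literals at 9 bits each = 36 bits.
def _match_cost_bits(length: int) -> int:
    return 2 + 8 * (2 if length <= 9 else 3)

def _literal_cost_bits(count: int) -> int:
    return count * 9

assert _match_cost_bits(4) == 18 and _literal_cost_bits(4) == 36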
#
# Claude 3.7 Sonnet (variant 2)
#
def compress_lz_buffer_claudev2(data: bytes) -> bytes:
    """
    Optimized LZ77 compressor with a strong compression ratio and performance.
    Args:
        data: Uncompressed bytes
    Returns:
        Compressed bytes
    """
    if not data:
        return b''
    result = bytearray()
    pos = 0
    data_len = len(data)
    # Large hash table for better distribution
    hash_size = 32768  # 32K hash table
    hash_mask = hash_size - 1
    hash_table = [[] for _ in range(hash_size)]
    # Dictionary for recent 3-byte sequences
    recent_matches = {}

    def hash_function(pos):
        """Fast 3-byte hash function."""
        if pos + 2 >= data_len:
            return ((data[pos] << 8) | (data[pos+1] if pos+1 < data_len else 0)) & hash_mask
        # Hash calculation using bit shifts
        return ((data[pos] << 16) | (data[pos+1] << 8) | data[pos+2]) & hash_mask

    def find_best_match(pos):
        best_match_len = 0
        best_match_offset = 0
        # Can't match with fewer than 3 bytes left
        if pos + 2 >= data_len:
            return 0, 0
        # Maximum search distance is 0x1FFF (format limitation)
        min_pos = max(0, pos - 0x1FFF)
        # 1. Quick check for recent 3-byte matches
        key = (data[pos], data[pos+1], data[pos+2])
        if key in recent_matches and recent_matches[key] >= min_pos:
            prev_pos = recent_matches[key]
            # The first 3 bytes are known to match
            match_len = 3
            offset = pos - prev_pos
            # Extend the match byte by byte
            while (pos + match_len < data_len and
                   prev_pos + match_len < pos and
                   match_len < 255 and
                   data[prev_pos + match_len] == data[pos + match_len]):
                match_len += 1
            best_match_len = match_len
            best_match_offset = offset
        # Update the recent-matches dictionary
        recent_matches[key] = pos
        # Periodically clear the dictionary to prevent bloat
        if len(recent_matches) > 8192:
            recent_matches.clear()
        # 2. Hash table lookup
        h = hash_function(pos)
        candidates = hash_table[h]
        # Limit traversal depth for better performance
        max_candidates = min(48, len(candidates))
        # Process newest to oldest
        for i in range(max_candidates):
            prev_pos = candidates[len(candidates) - 1 - i]
            if prev_pos < min_pos:
                break
            # Skip if the first byte doesn't match
            if data[prev_pos] != data[pos]:
                continue
            offset = pos - prev_pos
            # Block-based matching: compare 4 bytes at a time when possible
            match_len = 0
            while (pos + match_len + 3 < data_len and
                   prev_pos + match_len + 3 < pos and
                   match_len < 252 and
                   data[prev_pos + match_len:prev_pos + match_len + 4] == data[pos + match_len:pos + match_len + 4]):
                match_len += 4
            # Finish matching byte by byte
            while (pos + match_len < data_len and
                   prev_pos + match_len < pos and
                   match_len < 255 and
                   data[prev_pos + match_len] == data[pos + match_len]):
                match_len += 1
            # Only consider matches of at least 3 bytes
            if match_len >= 3 and match_len > best_match_len:
                best_match_len = match_len
                best_match_offset = offset
                # Early termination for excellent matches
                if match_len > 40:
                    break
        # Add the current position to the hash table
        hash_table[h].append(pos)
        # Keep hash chains to a reasonable length
        if len(hash_table[h]) > 64:
            hash_table[h] = hash_table[h][-48:]  # Keep the most recent positions
        return best_match_len, best_match_offset

    # Process the data with control-byte encoding
    control_byte = 0
    control_bit_pos = 7
    control_byte_pos = -1
    while pos < data_len:
        # If we need a new control byte
        if control_bit_pos == 7:
            control_byte_pos = len(result)
            result.append(0)  # Placeholder
            control_byte = 0
        # Find the best match at the current position
        best_match_len, best_match_offset = find_best_match(pos)
        # Lazy matching for better compression
        use_literal = False
        # Only consider lazy matching for smaller matches
        if best_match_len >= 3 and best_match_len < 24:
            # Look ahead up to 4 positions
            max_lookahead = min(4, best_match_len - 1)
            for skip in range(1, max_lookahead + 1):
                if pos + skip >= data_len:
                    break
                # Find a match at the skip position
                ahead_len, ahead_offset = find_best_match(pos + skip)
                if ahead_len < 3:
                    continue
                # Emit literals if the next match is significantly better
                if ahead_len > best_match_len + skip or (ahead_len > best_match_len and ahead_len > 16):
                    use_literal = True
                    break
        if best_match_len < 3 or use_literal:
            # Literal byte - set the control bit to 1
            control_byte |= (1 << control_bit_pos)
            control_bit_pos -= 1
            # Add the literal byte
            result.append(data[pos])
            pos += 1
        else:
            # Back-reference - leave the first control bit as 0
            control_bit_pos -= 1
            # If we need a new control byte for the next control bit
            if control_bit_pos < 0:
                result[control_byte_pos] = control_byte
                control_byte_pos = len(result)
                result.append(0)
                control_byte = 0
                control_bit_pos = 7
            # Set the next control bit to 1 (back-reference format)
            control_byte |= (1 << control_bit_pos)
            control_bit_pos -= 1
            # Calculate the offset in the format expected by the decompressor
            offset_value = (0x2000 - best_match_offset) << 3
            if best_match_len <= 9:
                # Short form: encode the length in the lower 3 bits
                length_bits = best_match_len - 2
                encoded_value = offset_value | length_bits
                result.append((encoded_value >> 8) & 0xFF)
                result.append(encoded_value & 0xFF)
            else:
                # Long form: length in a separate byte
                result.append((offset_value >> 8) & 0xFF)
                result.append(offset_value & 0xFF)
                result.append(best_match_len - 1)  # Length byte
            pos += best_match_len
        # If we've used all 8 bits, flush the control byte
        if control_bit_pos < 0:
            result[control_byte_pos] = control_byte
            control_bit_pos = 7
    # Flush the final partial control byte
    if control_bit_pos < 7:
        result[control_byte_pos] = control_byte
    return bytes(result)
#
# o3-mini + GPT-4o
#
from collections import deque

def compress_lz_buffer_o3mini(data: bytes) -> bytes:
    """
    Compress a byte array using a custom LZ77 algorithm compatible with decompress_lz_buffer.
    Control-bit stream:
      - For each literal, emit one control bit 1 followed by one literal byte.
      - For each back-reference, emit two control bits (first 0, then 1),
        then output either 2 bytes (match length 3-9) or 3 bytes (match length >= 10).
    Back-reference encoding:
      - Let offset = distance (1..0x1FFF) and L = match length.
      - Compute x = 0x2000 - offset.
      - For short matches (L in [3, 9]): output two bytes: (x << 3) | (L - 2)
      - For extended matches (L >= 10): output two bytes with the lower 3 bits 0,
        then a third byte equal to (L - 1)
    A dictionary maps 3-byte sequences to a deque of positions (only positions
    within the last 0x1FFF bytes are kept) to speed up finding the best match.
    Args:
        data: Uncompressed bytes.
    Returns:
        Compressed bytes.
    """
    if not data:
        return b""
    n = len(data)
    pos = 0
    result = bytearray()
    control_byte = 0
    control_bit_pos = 7
    control_byte_pos = None

    def flush_control_bit(bit: int):
        nonlocal control_byte, control_bit_pos, control_byte_pos, result
        if control_bit_pos == 7:
            control_byte_pos = len(result)
            result.append(0)
            control_byte = 0
        control_byte |= (bit & 1) << control_bit_pos
        control_bit_pos -= 1
        if control_bit_pos < 0:
            result[control_byte_pos] = control_byte & 0xFF
            control_bit_pos = 7

    dict3 = {}
    max_offset = 0x1FFF

    def find_best_match(cur: int):
        best_len = 0
        best_off = 0
        if cur > n - 3:
            return best_len, best_off
        key = data[cur:cur+3]
        candidates = dict3.get(key)
        if not candidates:
            return best_len, best_off
        # Drop candidates that fall outside the offset window
        while candidates and candidates[0] < cur - max_offset:
            candidates.popleft()
        for cand in reversed(candidates):
            length = 0
            # Overlapping matches (cand + length reaching past cur) are allowed;
            # the decompressor copies byte by byte, so they reproduce correctly
            while cur + length < n and (cand + length) < n and data[cand + length] == data[cur + length] and length < 255:
                length += 1
            if length > best_len:
                best_len = length
                best_off = cur - cand
            if best_len == 255:
                break
        return best_len, best_off

    def update_dict(index: int):
        if index <= n - 3:
            key = data[index:index+3]
            if key not in dict3:
                dict3[key] = deque()
            dict3[key].append(index)

    while pos < n:
        best_len, best_off = find_best_match(pos)
        if best_len < 3:
            flush_control_bit(1)
            result.append(data[pos])
            update_dict(pos)
            pos += 1
        else:
            flush_control_bit(0)
            flush_control_bit(1)
            x = 0x2000 - best_off
            if best_len <= 9:
                value = (x << 3) | (best_len - 2)
                result.append((value >> 8) & 0xFF)
                result.append(value & 0xFF)
            else:
                value = x << 3
                result.append((value >> 8) & 0xFF)
                result.append(value & 0xFF)
                result.append(best_len - 1)
            end = pos + best_len
            for j in range(pos, min(end, n)):
                update_dict(j)
            pos += best_len
    if control_bit_pos != 7:
        result[control_byte_pos] = control_byte & 0xFF
    return bytes(result)
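# Worked example of the back-reference encoding described in the docstring:
# offset = 2, length = 4 -> x = 0x2000 - 2 = 0x1FFE and
# value = (x << 3) | (4 - 2) = 0xFFF2, emitted as the bytes FF F2.
# The decompressor reads offset_bits = 0xF2 & 7 = 2 and copies
# offset_bits + 2 = 4 bytes from 2 positions back.
assert ((0x2000 - 2) << 3) | (4 - 2) == 0xFFF2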
#### NUMBA PORT
#
# Gemini 2.5 Pro Exp
#
import sys
import numpy as np
import numba

# --- Constants ---
WINDOW_SIZE = 8192
MAX_MATCH_LEN = 256
MIN_MATCH_LEN = 2
HASH_KEY_LEN = 3
# Size of the hash table (a prime number often helps distribution).
# Adjust based on typical input size and memory constraints if needed.
HASH_TABLE_SIZE = 16381  # A prime number, roughly 2 * WINDOW_SIZE
# Sentinel value for hash arrays indicating no entry / end of chain
HASH_SENTINEL = -1
@numba.njit(cache=True, fastmath=True)
def calculate_hash(buffer: np.ndarray, pos: int) -> int:
    """Calculates a simple integer hash for HASH_KEY_LEN bytes."""
    # Basic byte combination with a prime multiplier; the caller must
    # ensure that pos + HASH_KEY_LEN <= len(buffer).
    h = 0
    # Unrolled for HASH_KEY_LEN = 3
    h = (h * 257 + buffer[pos]) & 0xFFFFFFFF
    h = (h * 257 + buffer[pos + 1]) & 0xFFFFFFFF
    h = (h * 257 + buffer[pos + 2]) & 0xFFFFFFFF
    return h % HASH_TABLE_SIZE

@numba.njit(cache=True, fastmath=True)
def find_longest_match_numba(
    src_buffer: np.ndarray,   # Input buffer as a numpy uint8 array
    src_pos: int,             # Current position
    window_start: int,        # Start of the lookback window
    max_match_len: int,       # Max allowed match length
    head_array: np.ndarray,   # Hash table heads (int64 array)
    prev_array: np.ndarray,   # Hash chain links (int64 array)
    min_match_len: int        # Minimum match length to return
) -> tuple[int, int]:
    """
    Numba-accelerated function to find the longest match using hash chains.
    Args:
        src_buffer: NumPy uint8 array of input data.
        src_pos: Current position in src_buffer.
        window_start: Start index (inclusive) of the lookback window.
        max_match_len: Max match length allowed by the format.
        head_array: head_array[hash] gives the most recent position for that hash.
        prev_array: prev_array[pos] gives the previous position with the same hash.
        min_match_len: Minimum length for a valid match (typically 2).
    Returns:
        Tuple (best_match_length, best_match_offset), or (0, 0) if no valid match.
    """
    best_match_len: int = 0
    best_match_offset: int = 0
    src_len: int = len(src_buffer)
    # Determine the actual maximum length possible from the current position
    current_max_len: int = min(max_match_len, src_len - src_pos)
    # Not enough remaining bytes for even the smallest possible match
    if current_max_len < min_match_len:
        return 0, 0
    # --- Hash Chain Lookup ---
    # Only use the hash if there are enough bytes left for a key
    hash_match_len: int = 0
    hash_match_offset: int = 0
    if src_pos + HASH_KEY_LEN <= src_len:
        hash_val = calculate_hash(src_buffer, src_pos)
        match_start_pos = head_array[hash_val]
        # Traverse the hash chain for this hash value
        chain_count = 0  # Limit chain traversal depth for performance
        max_chain_depth = 256  # Heuristic limit
        while match_start_pos != HASH_SENTINEL and \
                match_start_pos >= window_start and \
                chain_count < max_chain_depth:
            # --- Verify and Extend Match ---
            # A quick first-byte check rejects most hash collisions cheaply
            if src_buffer[match_start_pos] == src_buffer[src_pos]:
                current_match_len = 0
                # Compare byte by byte
                while current_match_len < current_max_len:
                    if src_buffer[match_start_pos + current_match_len] != src_buffer[src_pos + current_match_len]:
                        break  # Mismatch found
                    current_match_len += 1
                # Update the best match found *via hash* if this one is longer
                if current_match_len > hash_match_len:
                    hash_match_len = current_match_len
                    hash_match_offset = src_pos - match_start_pos
                    # If we found the maximum possible length, stop the chain traversal
                    if hash_match_len >= current_max_len:
                        break
            # Move to the previous position in the chain
            match_start_pos = prev_array[match_start_pos]
            chain_count += 1
    # --- Explicit Check for Offset 1 ---
    # Crucial for capturing short repeats and runs missed by the hash search
    len_offset1: int = 0
    if min_match_len <= current_max_len:
        prev_pos = src_pos - 1
        if prev_pos >= window_start:
            # Calculate the length of the match at offset 1
            while len_offset1 < current_max_len:
                if src_buffer[prev_pos + len_offset1] != src_buffer[src_pos + len_offset1]:
                    break
                len_offset1 += 1
    # --- Decide the Final Match ---
    # Take the strictly longer match; for equal lengths the hash search already
    # found the smallest offset among its candidates, and this format does not
    # price offset 1 differently.
    if len_offset1 >= min_match_len and len_offset1 > hash_match_len:
        best_match_len = len_offset1
        best_match_offset = 1
    elif hash_match_len >= min_match_len:
        best_match_len = hash_match_len
        best_match_offset = hash_match_offset
    # Final validation (also covers the "no valid match" case)
    if best_match_len < min_match_len or best_match_offset <= 0:
        return 0, 0
    return best_match_len, best_match_offset
def compress_lz_buffer_gemini_numba(src_buffer: bytes) -> bytes:
    """
    Compresses a byte array using the custom LZ77 algorithm (Numba accelerated),
    compatible with the provided decompress_lz_buffer.
    Args:
        src_buffer: Uncompressed bytes.
    Returns:
        Compressed bytes.
    """
    if not src_buffer:
        return b""
    src_len = len(src_buffer)
    # Convert the source to a NumPy array for Numba compatibility
    src_np = np.frombuffer(src_buffer, dtype=np.uint8)
    # Initialize the hash table arrays; int64 is used for safety with large inputs
    head_array = np.full(HASH_TABLE_SIZE, HASH_SENTINEL, dtype=np.int64)
    # prev_array needs indices up to src_len - 1
    prev_array = np.full(src_len, HASH_SENTINEL, dtype=np.int64)
    src_pos = 0
    dest_buffer = bytearray()  # Standard bytearray for output
    # Control bit handling state (managed in Python)
    control_byte_pos = -1
    control_bits_count = 0
    current_control_byte = 0

    def write_control_bit(bit: int):
        """Appends a control bit, flushing the control byte to dest_buffer when full."""
        nonlocal control_byte_pos, control_bits_count, current_control_byte, dest_buffer
        if control_bits_count == 0:
            control_byte_pos = len(dest_buffer)
            dest_buffer.append(0)  # Placeholder
        current_control_byte = (current_control_byte << 1) | (bit & 1)
        control_bits_count += 1
        if control_bits_count == 8:
            dest_buffer[control_byte_pos] = current_control_byte
            control_bits_count = 0
            current_control_byte = 0

    # --- Main Compression Loop ---
    last_inserted_hash_pos = -1
    while src_pos < src_len:
        # --- Update Hash Table ---
        window_start = max(0, src_pos - WINDOW_SIZE)
        target_insert_pos = src_pos - 1  # Insert keys for positions up to here
        # Insert hash entries for positions not yet processed
        while last_inserted_hash_pos < target_insert_pos:
            insert_pos = last_inserted_hash_pos + 1
            # Ensure we can form a key of HASH_KEY_LEN bytes starting at insert_pos
            if insert_pos + HASH_KEY_LEN <= src_len:
                # Calculate the hash for the position being inserted
                hash_val = calculate_hash(src_np, insert_pos)
                # Link the previous head to this new position
                prev_array[insert_pos] = head_array[hash_val]
                # Update the head to point to this new position
                head_array[hash_val] = insert_pos
                # Note: pruning happens implicitly in the finder via window_start
            last_inserted_hash_pos = insert_pos
        # --- Find Longest Match ---
        # Call the Numba-accelerated function
        match_len, match_offset = find_longest_match_numba(
            src_np, src_pos, window_start, MAX_MATCH_LEN,
            head_array, prev_array, MIN_MATCH_LEN
        )
        # --- Encoding Decision (identical logic to the pure-Python version) ---
        encode_as_literal = True
        if match_len >= MIN_MATCH_LEN:  # A valid match was found
            # Determine encoding feasibility
            can_encode_short = (2 <= match_len <= 5 and 1 <= match_offset <= 256)
            can_encode_long = (match_len >= 2 and 1 <= match_offset <= WINDOW_SIZE)
            # Prefer the short encoding when applicable
            if can_encode_short:
                write_control_bit(0); write_control_bit(0)
                length_bits = match_len - 2
                write_control_bit((length_bits >> 1) & 1)
                write_control_bit(length_bits & 1)
                offset_byte = (0x100 - match_offset) & 0xFF
                dest_buffer.append(offset_byte)
                src_pos += match_len  # Advance position
                encode_as_literal = False
            elif can_encode_long:  # Use the long encoding otherwise
                write_control_bit(0); write_control_bit(1)
                value_base = (0x2000 - match_offset) << 3
                if 3 <= match_len <= 9:  # Length fits in 3 bits
                    value = value_base | (match_len - 2)
                    dest_buffer.append((value >> 8) & 0xFF)
                    dest_buffer.append(value & 0xFF)
                    src_pos += match_len
                    encode_as_literal = False
                elif match_len >= 2:  # Use an extended length byte
                    value = value_base  # lower 3 bits stay 0
                    dest_buffer.append((value >> 8) & 0xFF)
                    dest_buffer.append(value & 0xFF)
                    dest_buffer.append((match_len - 1) & 0xFF)
                    src_pos += match_len
                    encode_as_literal = False
        # --- Encode as Literal ---
        if encode_as_literal:
            write_control_bit(1)
            dest_buffer.append(src_np[src_pos])  # Access the NumPy array
            src_pos += 1  # Advance by 1 for the literal
    # --- Finalization ---
    if control_bits_count > 0:
        final_control_byte = current_control_byte << (8 - control_bits_count)
        dest_buffer[control_byte_pos] = final_control_byte & 0xFF
    return bytes(dest_buffer)  # Convert the final bytearray back to bytes
#
# Claude 3.7 Sonnet
#
import numba as nb
import numpy as np

@nb.njit(cache=True)
def compress_lz_buffer_numba(data):
    """Numba-optimized LZ77 compression implementation."""
    if len(data) == 0:
        return np.empty(0, dtype=np.uint8)
    # Pre-allocate the result buffer
    max_output_size = len(data) * 2  # Worst case
    result = np.zeros(max_output_size, dtype=np.uint8)
    result_pos = 0
    # Initialize compression state
    pos = 0
    data_len = len(data)
    control_byte = 0
    control_bit_pos = 7
    control_byte_pos = -1
    # Hash table setup - arrays instead of lists/dictionaries for Numba
    hash_size = 32768  # 32K hash table
    hash_mask = hash_size - 1
    max_chain = 48  # Maximum positions to store per hash
    # Store positions in a 2D array (hash_value, chain_index)
    hash_positions = np.full((hash_size, max_chain), -1, dtype=np.int32)
    hash_counts = np.zeros(hash_size, dtype=np.int32)
    # Recent matches in a fixed array keyed by 2-byte sequences
    recent_positions = np.full(65536, -1, dtype=np.int32)  # 2^16 possible keys
    # Main compression loop
    while pos < data_len:
        # Start a new control byte if needed
        if control_bit_pos == 7:
            control_byte_pos = result_pos
            result[result_pos] = 0  # Placeholder for the control byte
            result_pos += 1
            control_byte = 0
        # Find the best match at the current position
        best_match_len = 0
        best_match_offset = 0
        # Defined unconditionally so it exists on every path (the original
        # only defined it inside the branch below but read it in the lazy
        # matching block, which Numba's type inference can reject)
        min_pos = max(0, pos - 0x1FFF)  # Maximum match distance
        # Only try to find matches if there are enough bytes left
        if pos + 2 < data_len:
            # Calculate the hash for the current position
            h = ((data[pos] << 16) | (data[pos+1] << 8) | data[pos+2]) & hash_mask
            # Quick check of recent matches (2-byte key)
            two_byte_key = (data[pos] << 8) | data[pos+1]
            prev_pos = recent_positions[two_byte_key]
            if prev_pos >= min_pos and prev_pos < pos:
                # Check whether the third byte matches too
                if prev_pos + 2 < pos and data[prev_pos + 2] == data[pos + 2]:
                    # We have a match; find its length
                    match_len = 3
                    max_len = min(data_len - pos, pos - prev_pos, 255)
                    while match_len < max_len and data[prev_pos + match_len] == data[pos + match_len]:
                        match_len += 1
                    best_match_len = match_len
                    best_match_offset = pos - prev_pos
            # Update the recent match position
            recent_positions[two_byte_key] = pos
            # Search the hash chain for better matches
            count = hash_counts[h]
            # Process entries newest to oldest
            for i in range(min(max_chain, count)):
                idx = count - 1 - i
                if idx < 0:
                    break
                prev_pos = hash_positions[h, idx]
                # Positions beyond the window only get older along the chain
                if prev_pos < min_pos:
                    break
                # Skip if the first byte doesn't match
                if data[prev_pos] != data[pos]:
                    continue
                # Find the match length
                max_match_len = min(data_len - pos, pos - prev_pos, 255)
                # Block-based matching for speed (4 bytes at once)
                match_len = 0
                while match_len + 4 <= max_match_len:
                    if (data[prev_pos + match_len] == data[pos + match_len] and
                            data[prev_pos + match_len + 1] == data[pos + match_len + 1] and
                            data[prev_pos + match_len + 2] == data[pos + match_len + 2] and
                            data[prev_pos + match_len + 3] == data[pos + match_len + 3]):
                        match_len += 4
                    else:
                        break
                # Finish byte by byte
                while match_len < max_match_len and data[prev_pos + match_len] == data[pos + match_len]:
                    match_len += 1
                # Update the best match if this one is better
                if match_len >= 3 and match_len > best_match_len:
                    best_match_len = match_len
                    best_match_offset = pos - prev_pos
                    # Early termination for excellent matches
                    if match_len > 40:
                        break
            # Add the current position to the hash table
            if hash_counts[h] < max_chain:
                hash_positions[h, hash_counts[h]] = pos
                hash_counts[h] += 1
            else:
                # Shift the chain to make room, keeping the most recent positions
                for j in range(max_chain - 1):
                    hash_positions[h, j] = hash_positions[h, j+1]
                hash_positions[h, max_chain - 1] = pos
        # Lazy matching optimization
        use_literal = False
        if best_match_len >= 3 and best_match_len < 24 and pos + 1 < data_len:
            # Look ahead up to 4 positions
            max_lookahead = min(4, best_match_len - 1)
            for skip in range(1, max_lookahead + 1):
                if pos + skip >= data_len:
                    break
                # Check whether a lazy match might be better
                ahead_len = 0
                if pos + skip + 2 < data_len:
                    # Find a match at the skip position
                    skip_h = ((data[pos+skip] << 16) | (data[pos+skip+1] << 8) | data[pos+skip+2]) & hash_mask
                    # Check the hash chain for the skip position
                    skip_count = hash_counts[skip_h]
                    for i in range(min(16, skip_count)):  # Limit search depth for speed
                        idx = skip_count - 1 - i
                        if idx < 0:
                            break
                        skip_prev = hash_positions[skip_h, idx]
                        if skip_prev < min_pos or skip_prev >= pos + skip:
                            continue
                        if data[skip_prev] != data[pos+skip]:
                            continue
                        # Calculate the match length
                        cur_len = 0
                        max_cur_len = min(data_len - (pos+skip), (pos+skip) - skip_prev, 255)
                        while cur_len < max_cur_len and data[skip_prev + cur_len] == data[pos+skip + cur_len]:
                            cur_len += 1
                        if cur_len >= 3 and cur_len > ahead_len:
                            ahead_len = cur_len
                            if cur_len > 32:
                                break
                # Use a literal if the next match is significantly better
                if ahead_len >= 3 and (ahead_len > best_match_len + skip or
                                       (ahead_len > best_match_len and ahead_len > 16)):
                    use_literal = True
                    break
        # Output a literal or a match
        if best_match_len < 3 or use_literal:
            # Literal byte - set the control bit to 1
            control_byte |= (1 << control_bit_pos)
            control_bit_pos -= 1
            # Add the literal byte
            result[result_pos] = data[pos]
            result_pos += 1
            pos += 1
        else:
            # Match - leave the first control bit as 0
            control_bit_pos -= 1
            # Need a new control byte?
            if control_bit_pos < 0:
                result[control_byte_pos] = control_byte
                control_byte_pos = result_pos
                result[result_pos] = 0
                result_pos += 1
                control_byte = 0
                control_bit_pos = 7
            # Set the next bit for the match format
            control_byte |= (1 << control_bit_pos)
            control_bit_pos -= 1
            # Encode the match
            offset_value = (0x2000 - best_match_offset) << 3
            if best_match_len <= 9:
                # Short format (length in the lower 3 bits)
                length_bits = best_match_len - 2
                encoded_value = offset_value | length_bits
                result[result_pos] = (encoded_value >> 8) & 0xFF
                result_pos += 1
                result[result_pos] = encoded_value & 0xFF
                result_pos += 1
            else:
                # Long format (separate length byte)
                result[result_pos] = (offset_value >> 8) & 0xFF
                result_pos += 1
                result[result_pos] = offset_value & 0xFF
                result_pos += 1
                result[result_pos] = best_match_len - 1
                result_pos += 1
            pos += best_match_len
        # Flush the control byte if all bits are used
        if control_bit_pos < 0:
            result[control_byte_pos] = control_byte
            control_bit_pos = 7
    # Write the final partial control byte if needed
    if control_bit_pos < 7:
        result[control_byte_pos] = control_byte
    # Return only the used portion of the buffer
    return result[:result_pos]

def compress_lz_buffer_claude_numba(data: bytes) -> bytes:
    """
    Numba-optimized LZ77 compressor with a strong compression ratio and performance.
    Args:
        data: Uncompressed bytes
    Returns:
        Compressed bytes
    """
    if not data:
        return b''
    # Convert the input to a numpy array
    np_data = np.frombuffer(data, dtype=np.uint8).copy()
    # Run the Numba-optimized compression
    compressed = compress_lz_buffer_numba(np_data)
    # Convert back to bytes
    return bytes(compressed)
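# Note: the first call to an @njit function triggers JIT compilation, which
# would otherwise dominate the benchmark timings in the test file below.
# cache=True reuses compiled code across processes, but a fresh cache still
# pays the cost once, so a tiny warm-up keeps measurements at steady state:
compress_lz_buffer_gemini_numba(b"warmup")
compress_lz_buffer_claude_numba(b"warmup")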
### TESTING
import time
from custom_lz77 import *
from custom_lz77_numba import *

class Timer:
    def __init__(self, name="Block", unit="seconds"):
        self.name = name
        self.unit = unit
        self.start = None

    def __enter__(self):
        self.start = time.perf_counter()
        return self  # Optional, for accessing the timer object within the block

    def __exit__(self, exc_type, exc_val, exc_tb):
        elapsed = time.perf_counter() - self.start
        if self.unit == "seconds":
            print(f"{self.name} executed in {elapsed:.4f} {self.unit}")
        elif self.unit == "milliseconds":
            print(f"{self.name} executed in {elapsed*1000:.2f} {self.unit}")
        else:
            raise ValueError("Invalid unit. Use 'seconds' or 'milliseconds'")
        # Returning None (falsy) lets any exception from the block propagate

def do_test(b):
    # Use a distinct name per compressor so every output is actually checked
    # (reusing a1/a3 for the Numba variants would silently drop two results)
    with Timer("gemini"):
        print(len(a1 := compress_lz_buffer_gemini(b)))
    with Timer("gemini_numba"):
        print(len(a2 := compress_lz_buffer_gemini_numba(b)))
    with Timer("claude"):
        print(len(a3 := compress_lz_buffer_claude(b)))
    with Timer("claude_v2"):
        print(len(a4 := compress_lz_buffer_claudev2(b)))
    with Timer("claude_numba"):
        print(len(a5 := compress_lz_buffer_claude_numba(b)))
    with Timer("o3mini"):
        print(len(a6 := compress_lz_buffer_o3mini(b)))
    # Every compressed stream must decompress back to the original input
    return all(decompress_lz_buffer(a) == b for a in (a1, a2, a3, a4, a5, a6))

assert do_test(b"some00000LongStringRepeating" * 10_000) is True