LZ7BHTS7
def decompress_lz_buffer(src_buffer: bytes) -> bytes:
    """
    Decompress a byte array using a custom LZ77 algorithm.

    Args:
        src_buffer: Compressed bytes
    Returns:
        Decompressed bytes
    """
    dest_buffer = bytearray()
    src_pos = 0
    control_bits = 0x8000  # Initialize with marker indicating need to read next control byte
    while True:
        while True:
            if control_bits == 0x8000:
                if src_pos >= len(src_buffer):
                    return bytes(dest_buffer)
                control_bits = (src_buffer[src_pos] << 8) | 0x80
                src_pos += 1
            flags = control_bits & 0x8000
            shifted_bits = control_bits & 0x7fff
            control_bits = shifted_bits << 1
            if flags == 0:
                break
            # Literal byte - copy directly
            if src_pos >= len(src_buffer):
                return bytes(dest_buffer)
            dest_buffer.append(src_buffer[src_pos])
            src_pos += 1
        # Control bit was 0: back-reference. Read the second control bit,
        # refilling the control register first if it is exhausted.
        if shifted_bits == 0x4000:
            if src_pos >= len(src_buffer):
                return bytes(dest_buffer)
            control_bits = (src_buffer[src_pos] << 8) | 0x80
            src_pos += 1
        flags = control_bits & 0x8000
        shifted_bits = control_bits & 0x7fff
        control_bits = shifted_bits << 1
        if flags == 0:
            # Short form: two more control bits give the length, one byte gives the offset
            offset_bits = 0
            if shifted_bits == 0x4000:
                if src_pos >= len(src_buffer):
                    return bytes(dest_buffer)
                control_bits = (src_buffer[src_pos] << 8) | 0x80
                src_pos += 1
            shifted_bits = (control_bits & 0x7fff) << 1
            if (control_bits & 0x8000) != 0:
                offset_bits = 2
            if (control_bits & 0x7fff) == 0x4000:
                if src_pos >= len(src_buffer):
                    return bytes(dest_buffer)
                shifted_bits = (src_buffer[src_pos] << 8) | 0x80
                src_pos += 1
            control_bits = (shifted_bits & 0x7fff) << 1
            if (shifted_bits & 0x8000) != 0:
                offset_bits += 1
            if src_pos >= len(src_buffer):
                return bytes(dest_buffer)
            lookup_pos = len(dest_buffer) + (src_buffer[src_pos] - 0x100)
            src_pos += 1
            copy_length = offset_bits + 2
        else:
            # Long form: two bytes hold a 13-bit offset field and a 3-bit length,
            # with an optional extended length byte when the 3 bits are zero
            if src_pos + 1 >= len(src_buffer):
                return bytes(dest_buffer)
            flag_byte = src_buffer[src_pos + 1]
            offset_bits = flag_byte & 7
            lookup_pos = len(dest_buffer) + (((src_buffer[src_pos] << 8) | flag_byte) >> 3) - 0x2000
            if (flag_byte & 7) != 0:
                copy_length = offset_bits + 2
                src_pos += 2
            else:
                if src_pos + 2 >= len(src_buffer):
                    return bytes(dest_buffer)
                src_pos += 2
                if src_buffer[src_pos] == 0:
                    # A zero extended length byte: treat as end of stream (or corrupt data)
                    return bytes(dest_buffer)
                copy_length = src_buffer[src_pos] + 1
                src_pos += 1
        # Copy from previous output (LZ77 backreference)
        for i in range(copy_length):
            if lookup_pos < 0 or lookup_pos >= len(dest_buffer):
                # Handle potential out-of-bounds errors
                return bytes(dest_buffer)
            dest_buffer.append(dest_buffer[lookup_pos])
            lookup_pos += 1
    return bytes(dest_buffer)
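# Worked example (added; hand-assembled from the bit layout implied by the decompressor
# above, not an official test vector). Control bits are consumed MSB-first from control
# bytes that are fetched lazily: bit 1 = literal byte, bits 0,1 = "long" back-reference
# whose two data bytes pack (0x2000 - offset) in the upper 13 bits and (length - 2) in
# the lower 3 bits.
#
# "ABABABAB" = literal 'A', literal 'B', then copy 6 bytes from offset 2:
#   control byte 0b11011000 = 0xD8, literals 0x41 0x42,
#   back-reference value = ((0x2000 - 2) << 3) | (6 - 2) = 0xFFF4 -> bytes 0xFF, 0xF4
_example_stream = bytes([0xD8, 0x41, 0x42, 0xFF, 0xF4])
assert decompress_lz_buffer(_example_stream) == b"ABABABAB"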
# COMPRESSION CODE IMPLEMENTED BY DIFFERENT LLMS BASED ON DECOMPRESSION CODE PROVIDED
#
# GEMINI 2.5 PRO Exp
#
import sys
from collections import defaultdict, deque
# --- Constants ---
WINDOW_SIZE = 8192
MAX_MATCH_LEN = 256
MIN_MATCH_LEN = 2
HASH_KEY_LEN = 3
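# (Added note) These limits mirror the bitstream consumed by decompress_lz_buffer:
# the long back-reference stores 0x2000 - offset in a 13-bit field (offsets up to 8192),
# the extended length byte stores length - 1 (lengths up to 256), and the short form
# covers lengths 2-5 with single-byte offsets up to 256.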
def find_longest_match_optimized(
    src_buffer: bytes,
    src_pos: int,
    window_start: int,
    max_match_len: int,
    hash_table: defaultdict[bytes, deque[int]],
    min_match_len: int = MIN_MATCH_LEN
) -> tuple[int, int]:
    """
    Finds the longest match using a hash table for optimization.
    Corrected V4: rigorous verification, explicit offset 1 check.

    Args:
        src_buffer: The input data buffer.
        src_pos: The current position in the input buffer to find a match for.
        window_start: The starting index (inclusive) of the valid lookback window.
        max_match_len: The maximum length of a match to consider.
        hash_table: Dictionary mapping HASH_KEY_LEN byte sequences to deques of positions.
        min_match_len: Minimum match length to return (must be >= 2).
    Returns:
        A tuple (best_match_length, best_match_offset).
        Returns (0, 0) if no match of length >= min_match_len is found.
    """
    best_match_len = 0  # 0 indicates no valid match found yet
    best_match_offset = 0
    src_len = len(src_buffer)
    # Determine the actual maximum length possible from the current position
    current_max_len = min(max_match_len, src_len - src_pos)
    # Not enough remaining bytes for even the smallest possible match
    if current_max_len < min_match_len:
        return 0, 0
    # --- Hash Table Lookup ---
    key_found_in_table = False
    # Only try hash lookup if there are enough bytes to form a key
    if src_pos + HASH_KEY_LEN <= src_len:
        key = src_buffer[src_pos : src_pos + HASH_KEY_LEN]
        if key in hash_table:
            key_found_in_table = True
            candidate_positions = hash_table[key]
            # Iterate from most recent candidate backwards (favors smaller offsets for same length)
            for i in range(len(candidate_positions) - 1, -1, -1):
                match_start_pos = candidate_positions[i]
                # Stop searching this key's list if candidates are too old
                if match_start_pos < window_start:
                    break
                # --- Verify and Extend Match ---
                current_match_len = 0
                # Compare byte-by-byte
                while current_match_len < current_max_len:
                    if src_buffer[match_start_pos + current_match_len] != src_buffer[src_pos + current_match_len]:
                        break  # Mismatch found
                    current_match_len += 1
                # Update the best match only if it meets the minimum length and is
                # strictly longer than the current best; for equal lengths the
                # iteration order already prefers the smallest offset.
                if current_match_len >= min_match_len and current_match_len > best_match_len:
                    best_match_len = current_match_len
                    best_match_offset = src_pos - match_start_pos
                    # Optimization: if we found the absolute max possible length, stop.
                    if best_match_len >= current_max_len:
                        break  # Exit inner loop (over candidate positions)
            # Early exit from the function if max length was found in the inner loop
            if best_match_len >= current_max_len:
                # Ensure offset is valid before returning
                return (best_match_len, best_match_offset) if best_match_offset > 0 else (0, 0)
    # --- Explicit Check for Offset 1 ---
    # This is important regardless of HASH_KEY_LEN, as offset 1 is common.
    # Check if offset 1 provides a match that is >= min_match_len and LONGER
    # than the best one found via hash search.
    if current_max_len >= min_match_len:  # Check only if a valid match length is possible
        prev_pos = src_pos - 1
        if prev_pos >= window_start:
            len_offset1 = 0
            # Calculate length of match at offset 1
            while len_offset1 < current_max_len:
                if src_buffer[prev_pos + len_offset1] != src_buffer[src_pos + len_offset1]:
                    break
                len_offset1 += 1
            # Update best match if offset 1 gives a valid match AND it's strictly longer
            # than any match found by the hash search.
            if len_offset1 >= min_match_len and len_offset1 > best_match_len:
                best_match_len = len_offset1
                best_match_offset = 1
            # Also handle the case where hash search found *nothing* valid
            # (best_match_len < min_match_len) but offset 1 provides a valid match.
            elif best_match_len < min_match_len and len_offset1 >= min_match_len:
                best_match_len = len_offset1
                best_match_offset = 1
    # Final validation: ensure the chosen match meets min length and has a valid offset.
    if best_match_len < min_match_len or best_match_offset <= 0:
        return 0, 0
    return best_match_len, best_match_offset
def compress_lz_buffer_gemini(src_buffer: bytes) -> bytes:
    """
    Compresses a byte array using the custom LZ77 algorithm (optimized + corrected V4),
    compatible with the provided decompress_lz_buffer. Strives for correctness,
    performance, and good compression ratio. Includes improved offset 1 handling.

    Args:
        src_buffer: Uncompressed bytes.
    Returns:
        Compressed bytes.
    """
    if not src_buffer:
        return b""
    src_len = len(src_buffer)
    src_pos = 0
    dest_buffer = bytearray()
    # Hash table: maps HASH_KEY_LEN-byte prefix -> deque of starting positions
    hash_table = defaultdict(deque)
    # Control bit handling state
    control_byte_pos = -1
    control_bits_count = 0
    current_control_byte = 0

    def write_control_bit(bit: int):
        """Appends a control bit, flushing the control byte to dest_buffer if full."""
        nonlocal control_byte_pos, control_bits_count, current_control_byte, dest_buffer
        if control_bits_count == 0:
            control_byte_pos = len(dest_buffer)
            dest_buffer.append(0)  # Placeholder
        current_control_byte = (current_control_byte << 1) | (bit & 1)
        control_bits_count += 1
        if control_bits_count == 8:
            dest_buffer[control_byte_pos] = current_control_byte
            control_bits_count = 0
            current_control_byte = 0

    # --- Main Compression Loop ---
    last_inserted_hash_pos = -1
    while src_pos < src_len:
        # --- Update Hash Table ---
        window_start = max(0, src_pos - WINDOW_SIZE)
        target_insert_pos = src_pos - 1  # Insert keys for positions up to here
        while last_inserted_hash_pos < target_insert_pos:
            insert_pos = last_inserted_hash_pos + 1
            if insert_pos + HASH_KEY_LEN <= src_len:
                key = src_buffer[insert_pos : insert_pos + HASH_KEY_LEN]
                positions = hash_table[key]
                # Prune based on the window relevant to the search at src_pos
                while positions and positions[0] < window_start:
                    positions.popleft()
                positions.append(insert_pos)
            last_inserted_hash_pos = insert_pos
        # --- Find Longest Match ---
        # Uses the improved finder with better offset 1 handling
        match_len, match_offset = find_longest_match_optimized(
            src_buffer, src_pos, window_start, MAX_MATCH_LEN, hash_table, MIN_MATCH_LEN
        )
        # --- Encoding Decision ---
        encode_as_literal = True
        if match_len >= MIN_MATCH_LEN:  # A valid match was found
            # Determine encoding feasibility
            can_encode_short = (2 <= match_len <= 5 and 1 <= match_offset <= 256)
            can_encode_long = (match_len >= 2 and 1 <= match_offset <= WINDOW_SIZE)
            # Prefer short encoding if applicable
            if can_encode_short:
                write_control_bit(0); write_control_bit(0)
                length_bits = match_len - 2
                write_control_bit((length_bits >> 1) & 1)
                write_control_bit(length_bits & 1)
                offset_byte = (0x100 - match_offset) & 0xFF
                dest_buffer.append(offset_byte)
                src_pos += match_len  # Advance position
                encode_as_literal = False
            elif can_encode_long:  # Use long encoding otherwise
                write_control_bit(0); write_control_bit(1)
                value_base = (0x2000 - match_offset) << 3
                if 3 <= match_len <= 9:  # Length fits in 3 bits
                    length_bits = match_len - 2
                    value = value_base | length_bits
                    dest_buffer.append((value >> 8) & 0xFF)
                    dest_buffer.append(value & 0xFF)
                    src_pos += match_len
                    encode_as_literal = False
                elif match_len >= 2:  # Use extended length byte
                    length_bits = 0
                    value = value_base | length_bits
                    dest_buffer.append((value >> 8) & 0xFF)
                    dest_buffer.append(value & 0xFF)
                    len_byte = match_len - 1
                    dest_buffer.append(len_byte & 0xFF)
                    src_pos += match_len
                    encode_as_literal = False
        # --- Encode as Literal ---
        if encode_as_literal:
            write_control_bit(1)
            # Append the literal byte from the current position
            dest_buffer.append(src_buffer[src_pos])
            src_pos += 1  # Advance by 1 for the literal
    # --- Finalization: left-align any remaining control bits ---
    if control_bits_count > 0:
        final_control_byte = current_control_byte << (8 - control_bits_count)
        dest_buffer[control_byte_pos] = final_control_byte & 0xFF
    return bytes(dest_buffer)
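# Usage sketch (added example, not part of the original gist): round-trip a small
# buffer through the Gemini compressor and the reference decompressor.
if __name__ == "__main__":
    _sample = b"hello hello hello world" * 4
    _packed = compress_lz_buffer_gemini(_sample)
    print(f"gemini: {len(_sample)} -> {len(_packed)} bytes,",
          "round-trip ok:", decompress_lz_buffer(_packed) == _sample)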
#
# Claude 3.7 Sonnet (variant 1)
#
def compress_lz_buffer_claude(data: bytes) -> bytes:
"""
Custom LZ77 compressor, optimized for performance.
Args:
data: Uncompressed bytes
Returns:
Compressed bytes
"""
if not data:
return b''
result = bytearray()
pos = 0
data_len = len(data)
# Enhanced dictionary using a rolling hash for better match finding
position_dict = {}
# Hash function that works with variable-length input
def hash_sequence(p, length=4):
h = 0
for i in range(min(length, data_len - p)):
h = ((h * 23) + data[p + i]) & 0xFFFF
return h
def find_best_match(pos):
"""Find the best match at current position using rolling hash."""
best_match_len = 0
best_match_offset = 0
# Can't match if less than 3 bytes left
if pos + 2 >= data_len:
return 0, 0
# Maximum search distance is 0x1FFF (format limitation)
min_pos = max(0, pos - 0x1FFF)
# Get hash for current position (4-byte hash for better distribution)
h = hash_sequence(pos)
# Check all positions with this hash
if h in position_dict:
# Process matches from newest to oldest (usually better compression)
for prev_pos in reversed(position_dict[h]):
if prev_pos < min_pos:
continue
# Verify match (hash collision check)
if data[prev_pos] != data[pos]:
continue
# Calculate match length
offset = pos - prev_pos
match_len = 0
# Check 4 bytes at once when possible for better performance
if pos + 3 < data_len and prev_pos + 3 < pos:
if data[prev_pos:prev_pos+4] == data[pos:pos+4]:
match_len = 4
# Continue matching byte by byte
while (pos + match_len < data_len and
prev_pos + match_len < pos and
match_len < 255 and
data[prev_pos + match_len] == data[pos + match_len]):
match_len += 1
else:
# Byte-by-byte matching
while (pos + match_len < data_len and
prev_pos + match_len < pos and
match_len < 255 and
data[prev_pos + match_len] == data[pos + match_len]):
match_len += 1
# Only consider matches of at least 3 bytes
if match_len >= 3 and match_len > best_match_len:
best_match_len = match_len
best_match_offset = offset
# Early termination for exceptional matches
if match_len > 32:
break
# Add current position to hash table
if h not in position_dict:
position_dict[h] = []
position_dict[h].append(pos)
# Limit entries to prevent memory bloat
if len(position_dict[h]) > 256:
position_dict[h] = position_dict[h][-192:] # Keep most recent positions
return best_match_len, best_match_offset
# Calculate encoding costs for optimization decisions
def calculate_match_cost(length):
# Short form (length ≤ 9): 2 control bits + 2 bytes
# Long form (length > 9): 2 control bits + 3 bytes
return 2 + (2 if length <= 9 else 3)
def calculate_literal_cost(count):
# Each literal: 1 control bit + 1 byte
return count * (1 + 8)
# Process the data - with exact compatibility with decompressor
control_byte = 0
control_bit_pos = 7 # Start with the highest bit (bit 7)
control_byte_pos = -1
while pos < data_len:
# If we need a new control byte
if control_bit_pos == 7:
control_byte_pos = len(result)
result.append(0) # Placeholder
control_byte = 0
# Find best match at current position
best_match_len, best_match_offset = find_best_match(pos)
# Optimal parsing: Check if skipping bytes gives better compression
use_lazy = False
# Only consider lazy evaluation for smaller matches
if best_match_len >= 3 and best_match_len < 20:
# Look ahead multiple positions
max_lookahead = min(4, best_match_len - 1) # Look ahead at most 4 positions
for skip in range(1, max_lookahead + 1):
if pos + skip >= data_len:
break
# Find match at skip position
next_match_len, next_match_offset = find_best_match(pos + skip)
if next_match_len < 3:
continue
# Compare encoding efficiency
current_cost = calculate_match_cost(best_match_len)
lazy_cost = calculate_literal_cost(skip) + calculate_match_cost(next_match_len)
# If encoding literals + next match is more efficient,
# or if next match is significantly better
if (lazy_cost < current_cost or
next_match_len > best_match_len + skip + 1):
use_lazy = True
break
if best_match_len < 3 or use_lazy:
# Literal byte - set control bit to 1
control_byte |= (1 << control_bit_pos)
control_bit_pos -= 1
# Add literal byte
result.append(data[pos])
pos += 1
else:
# Back-reference - leave control bit as 0 (already 0 by default)
control_bit_pos -= 1
# If we need a new control byte for the next control bit
if control_bit_pos < 0:
result[control_byte_pos] = control_byte
control_byte_pos = len(result)
result.append(0)
control_byte = 0
control_bit_pos = 7
# Set next control bit to 1 (use second form of back-reference)
control_byte |= (1 << control_bit_pos)
control_bit_pos -= 1
# Calculate offset in the format expected by decompressor
offset_value = (0x2000 - best_match_offset) << 3
if best_match_len <= 9:
# Short form: encode length in lower 3 bits
length_bits = best_match_len - 2
encoded_value = offset_value | length_bits
high_byte = (encoded_value >> 8) & 0xFF
low_byte = encoded_value & 0xFF
result.append(high_byte)
result.append(low_byte)
else:
# Long form: length in separate byte (lower 3 bits are 0)
high_byte = (offset_value >> 8) & 0xFF
low_byte = offset_value & 0xFF
result.append(high_byte)
result.append(low_byte)
result.append(best_match_len - 1) # Length byte
pos += best_match_len
# If we've used all 8 bits, update the control byte
if control_bit_pos < 0:
result[control_byte_pos] = control_byte
control_bit_pos = 7
# Update the final control byte if there are bits left
if control_bit_pos < 7:
result[control_byte_pos] = control_byte
return bytes(result)
#
# Claude 3.7 Sonnet (variant 2)
#
def compress_lz_buffer_claudev2(data: bytes) -> bytes:
"""
Optimized LZ77 compressor with excellent compression ratio and performance.
Args:
data: Uncompressed bytes
Returns:
Compressed bytes
"""
if not data:
return b''
result = bytearray()
pos = 0
data_len = len(data)
# Large hash table for better distribution
hash_size = 32768 # 32K hash table
hash_mask = hash_size - 1
hash_table = [[] for _ in range(hash_size)]
# Dictionary for recent 3-byte sequences
recent_matches = {}
def hash_function(pos):
"""Fast 3-byte hash function"""
if pos + 2 >= data_len:
return ((data[pos] << 8) | (data[pos+1] if pos+1 < data_len else 0)) & hash_mask
# Optimized hash calculation using bit shifts
return ((data[pos] << 16) | (data[pos+1] << 8) | data[pos+2]) & hash_mask
def find_best_match(pos):
best_match_len = 0
best_match_offset = 0
# Can't match if less than 3 bytes left
if pos + 2 >= data_len:
return 0, 0
# Maximum search distance is 0x1FFF (format limitation)
min_pos = max(0, pos - 0x1FFF)
# 1. Quick check for recent 3-byte matches
if pos + 2 < data_len:
key = (data[pos], data[pos+1], data[pos+2])
if key in recent_matches and recent_matches[key] >= min_pos:
prev_pos = recent_matches[key]
# We know first 3 bytes match
match_len = 3
offset = pos - prev_pos
# Extend match byte by byte
while (pos + match_len < data_len and
prev_pos + match_len < pos and
match_len < 255 and
data[prev_pos + match_len] == data[pos + match_len]):
match_len += 1
if match_len >= 3:
best_match_len = match_len
best_match_offset = offset
# Update recent matches dictionary
recent_matches[key] = pos
# Periodically clear dictionary to prevent bloat
if len(recent_matches) > 8192:
recent_matches.clear()
# 2. Hash table lookup
h = hash_function(pos)
candidates = hash_table[h]
# Limit traversal depth for better performance
max_candidates = min(48, len(candidates))
# Process newest to oldest
for i in range(max_candidates):
if i >= len(candidates):
break
prev_pos = candidates[len(candidates) - 1 - i]
if prev_pos < min_pos:
break
# Skip if first byte doesn't match
if data[prev_pos] != data[pos]:
continue
# Optimization: Check 4 bytes at once when possible
offset = pos - prev_pos
# Block-based matching strategy
match_len = 0
while (pos + match_len + 3 < data_len and
prev_pos + match_len + 3 < pos and
match_len < 252 and
data[prev_pos + match_len:prev_pos + match_len + 4] == data[pos + match_len:pos + match_len + 4]):
match_len += 4
# Finish matching byte by byte
while (pos + match_len < data_len and
prev_pos + match_len < pos and
match_len < 255 and
data[prev_pos + match_len] == data[pos + match_len]):
match_len += 1
# Only consider matches of at least 3 bytes
if match_len >= 3 and match_len > best_match_len:
best_match_len = match_len
best_match_offset = offset
# Early termination for excellent matches
if match_len > 40:
break
# Add current position to hash table
hash_table[h].append(pos)
# Keep hash chains to a reasonable length
if len(hash_table[h]) > 64:
hash_table[h] = hash_table[h][-48:] # Keep most recent positions
return best_match_len, best_match_offset
# Process the data with control byte encoding
control_byte = 0
control_bit_pos = 7
control_byte_pos = -1
while pos < data_len:
# If we need a new control byte
if control_bit_pos == 7:
control_byte_pos = len(result)
result.append(0) # Placeholder
control_byte = 0
# Find best match at current position
best_match_len, best_match_offset = find_best_match(pos)
# Advanced lazy matching for better compression
use_literal = False
# Only consider lazy matching for smaller matches
if best_match_len >= 3 and best_match_len < 24:
# Look ahead up to 4 positions
max_lookahead = min(4, best_match_len - 1)
for skip in range(1, max_lookahead + 1):
if pos + skip >= data_len:
break
# Find match at skip position
ahead_len, ahead_offset = find_best_match(pos + skip)
if ahead_len < 3:
continue
# Use literals if next match is significantly better
if ahead_len > best_match_len + skip or (ahead_len > best_match_len and ahead_len > 16):
use_literal = True
break
if best_match_len < 3 or use_literal:
# Literal byte - set control bit to 1
control_byte |= (1 << control_bit_pos)
control_bit_pos -= 1
# Add literal byte
result.append(data[pos])
pos += 1
else:
# Back-reference - leave control bit as 0
control_bit_pos -= 1
# If we need a new control byte for the next control bit
if control_bit_pos < 0:
result[control_byte_pos] = control_byte
control_byte_pos = len(result)
result.append(0)
control_byte = 0
control_bit_pos = 7
# Set next control bit to 1 (for back-reference format)
control_byte |= (1 << control_bit_pos)
control_bit_pos -= 1
# Calculate offset in the format expected by decompressor
offset_value = (0x2000 - best_match_offset) << 3
if best_match_len <= 9:
# Short form: encode length in lower 3 bits
length_bits = best_match_len - 2
encoded_value = offset_value | length_bits
high_byte = (encoded_value >> 8) & 0xFF
low_byte = encoded_value & 0xFF
result.append(high_byte)
result.append(low_byte)
else:
# Long form: length in separate byte
high_byte = (offset_value >> 8) & 0xFF
low_byte = offset_value & 0xFF
result.append(high_byte)
result.append(low_byte)
result.append(best_match_len - 1) # Length byte
pos += best_match_len
# If we've used all 8 bits, update the control byte
if control_bit_pos < 0:
result[control_byte_pos] = control_byte
control_bit_pos = 7
# Update the final control byte
if control_bit_pos < 7:
result[control_byte_pos] = control_byte
return bytes(result)
#
# o3-mini + GPT4o
#
#
from collections import deque
def compress_lz_buffer_o3mini(data: bytes) -> bytes:
    """
    Compress a byte array using a custom LZ77 algorithm compatible with decompress_lz_buffer.

    Control-bit stream:
      - For each literal, emit one control bit 1 followed by one literal byte.
      - For each back-reference, emit two control bits:
        first bit 0 (leaving the bit as 0) then second bit 1,
        then output either 2 bytes (for a match of length 3-9) or 3 bytes (for a match of length >= 10).

    Back-reference encoding:
      - Let offset = distance (1..0x1FFF) and L = match length.
      - Compute x = 0x2000 - offset.
      - For short matches (L in [3,9]): output two bytes: (x << 3) | (L - 2)
      - For extended matches (L >= 10): output two bytes with lower 3 bits 0, then a third byte equal to (L - 1)

    A dictionary maps 3-byte sequences to a deque of positions (only positions within the
    last 0x1FFF bytes are kept) to speed up finding the best match.

    Args:
        data: Uncompressed bytes.
    Returns:
        Compressed bytes.
    """
    if not data:
        return b""
    n = len(data)
    pos = 0
    result = bytearray()
    control_byte = 0
    control_bit_pos = 7
    control_byte_pos = None

    def flush_control_bit(bit: int):
        nonlocal control_byte, control_bit_pos, control_byte_pos, result
        if control_bit_pos == 7:
            control_byte_pos = len(result)
            result.append(0)
            control_byte = 0
        control_byte |= (bit & 1) << control_bit_pos
        control_bit_pos -= 1
        if control_bit_pos < 0:
            result[control_byte_pos] = control_byte & 0xFF
            control_bit_pos = 7

    dict3 = {}
    max_offset = 0x1FFF

    def find_best_match(cur: int):
        best_len = 0
        best_off = 0
        if cur > n - 3:
            return best_len, best_off
        key = data[cur:cur+3]
        candidates = dict3.get(key)
        if not candidates:
            return best_len, best_off
        while candidates and candidates[0] < cur - max_offset:
            candidates.popleft()
        for cand in reversed(candidates):
            length = 0
            while cur + length < n and (cand + length) < n and data[cand + length] == data[cur + length] and length < 255:
                length += 1
            if length > best_len:
                best_len = length
                best_off = cur - cand
            if best_len == 255:
                break
        return best_len, best_off

    def update_dict(index: int):
        if index <= n - 3:
            key = data[index:index+3]
            if key not in dict3:
                dict3[key] = deque()
            dict3[key].append(index)

    while pos < n:
        best_len, best_off = find_best_match(pos)
        if best_len < 3:
            flush_control_bit(1)
            result.append(data[pos])
            update_dict(pos)
            pos += 1
        else:
            if best_len > 256:
                best_len = 256
            flush_control_bit(0)
            flush_control_bit(1)
            x = 0x2000 - best_off
            if best_len <= 9:
                value = (x << 3) | (best_len - 2)
                result.append((value >> 8) & 0xFF)
                result.append(value & 0xFF)
            else:
                value = (x << 3)
                result.append((value >> 8) & 0xFF)
                result.append(value & 0xFF)
                result.append(best_len - 1)
            end = pos + best_len
            for j in range(pos, min(end, n)):
                update_dict(j)
            pos += best_len
    if control_bit_pos != 7:
        result[control_byte_pos] = control_byte & 0xFF
    return bytes(result)
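# Encoding walk-through (added; the numbers follow the docstring above): for a match of
# length 6 at distance 2, x = 0x2000 - 2 = 0x1FFE, and since 6 <= 9 the two emitted
# bytes are (x << 3) | (6 - 2) = 0xFFF4 -> 0xFF, 0xF4. A length-20 match at the same
# distance would instead emit 0xFF, 0xF0 followed by the extended length byte 19.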
#### NUMBA PORT
#
# Gemini 2.5 Pro Exp
#
import sys
import numpy as np
import numba
# --- Constants ---
WINDOW_SIZE = 8192
MAX_MATCH_LEN = 256
MIN_MATCH_LEN = 2
HASH_KEY_LEN = 3
# Size of the hash table (prime number often helps distribution)
# Adjust based on typical input size and memory constraints if needed
HASH_TABLE_SIZE = 16381 # A prime number roughly 2*WINDOW_SIZE
# Sentinel value for hash arrays indicating no entry or end of chain
HASH_SENTINEL = -1
@numba.njit(cache=True, fastmath=True)
def calculate_hash(buffer: np.ndarray, pos: int) -> int:
"""Calculates a simple integer hash for HASH_KEY_LEN bytes."""
# Simple rolling hash or just combine bytes - ensure it fits in standard int
# Using a basic combination here. More sophisticated hashes exist.
# Check bounds: pos + HASH_KEY_LEN <= len(buffer) must be ensured by caller
h = 0
# Unrolled for HASH_KEY_LEN = 3
h = (h * 257 + buffer[pos]) & 0xFFFFFFFF # Use a prime multiplier
h = (h * 257 + buffer[pos + 1]) & 0xFFFFFFFF
h = (h * 257 + buffer[pos + 2]) & 0xFFFFFFFF
return h % HASH_TABLE_SIZE
@numba.njit(cache=True, fastmath=True)
def find_longest_match_numba(
src_buffer: np.ndarray, # Input buffer as numpy uint8 array
src_pos: int, # Current position
window_start: int, # Start of lookback window
max_match_len: int, # Max allowed match length
head_array: np.ndarray, # Hash table head (int32/int64 array)
prev_array: np.ndarray, # Hash table prev links (int32/int64 array)
min_match_len: int # Minimum match length to return
) -> tuple[int, int]:
"""
Numba-accelerated function to find the longest match using hash chains.
Args:
src_buffer: NumPy uint8 array of input data.
src_pos: Current position in src_buffer.
window_start: Start index (inclusive) of the lookback window.
max_match_len: Max match length allowed by format.
head_array: Array where head_array[hash] gives the most recent pos for that hash.
prev_array: Array where prev_array[pos] gives the previous pos with the same hash.
min_match_len: Minimum length for a valid match (typically 2).
Returns:
Tuple (best_match_length, best_match_offset), or (0, 0) if no valid match.
"""
best_match_len: int = 0
best_match_offset: int = 0
src_len: int = len(src_buffer)
# Determine the actual maximum length possible from the current position
current_max_len: int = min(max_match_len, src_len - src_pos)
# Not enough remaining bytes for even the smallest possible match
if current_max_len < min_match_len:
return 0, 0
# --- Hash Chain Lookup ---
# Only use hash if we have enough bytes left for a key
hash_match_len: int = 0
hash_match_offset: int = 0
if src_pos + HASH_KEY_LEN <= src_len:
hash_val = calculate_hash(src_buffer, src_pos)
match_start_pos = head_array[hash_val]
# Traverse the hash chain for this hash value
chain_count = 0 # Limit chain traversal depth for performance/safety
max_chain_depth = 256 # Heuristic limit
while match_start_pos != HASH_SENTINEL and \
match_start_pos >= window_start and \
chain_count < max_chain_depth:
# --- Verify and Extend Match ---
# Check the first few bytes first for quick rejection if needed
# (already implicitly done by hash, but explicit check is safe)
if src_buffer[match_start_pos] == src_buffer[src_pos]:
current_match_len = 0
# Compare byte-by-byte
while current_match_len < current_max_len:
if src_buffer[match_start_pos + current_match_len] != src_buffer[src_pos + current_match_len]:
break # Mismatch found
current_match_len += 1
# Update best match found *via hash* if this one is longer
if current_match_len > hash_match_len:
hash_match_len = current_match_len
hash_match_offset = src_pos - match_start_pos
# Optimization: If we found the absolute max possible length, stop chain traversal
if hash_match_len >= current_max_len:
break
# --- End Verification ---
# Move to the previous position in the chain
match_start_pos = prev_array[match_start_pos]
chain_count += 1
# --- Explicit Check for Offset 1 ---
# Crucial for capturing short repeats and potentially longer matches missed by hash
len_offset1: int = 0
offset1_match_offset: int = 0
if min_match_len <= current_max_len: # Check only if a valid match length is possible
prev_pos = src_pos - 1
if prev_pos >= window_start:
# Calculate length of match at offset 1
while len_offset1 < current_max_len:
if src_buffer[prev_pos + len_offset1] != src_buffer[src_pos + len_offset1]:
break
len_offset1 += 1
if len_offset1 >= min_match_len:
offset1_match_offset = 1
# --- Decide Final Match ---
# Prioritize the longest match. If lengths are equal, offset 1 is usually
# slightly cheaper to encode in some LZ variants, but this format doesn't
# differentiate much. Standard LZ77 just takes the longest. If lengths are equal,
# the hash search naturally found the smallest offset among hash candidates.
# Let's prioritize strictly longer matches.
if len_offset1 > hash_match_len:
best_match_len = len_offset1
best_match_offset = offset1_match_offset
elif hash_match_len >= min_match_len: # Only consider hash match if valid
best_match_len = hash_match_len
best_match_offset = hash_match_offset
else: # Neither hash nor offset 1 produced a valid match >= min_match_len
# Check if offset 1 produced *any* valid match when hash didn't
if len_offset1 >= min_match_len:
best_match_len = len_offset1
best_match_offset = offset1_match_offset
else: # No valid match found at all
best_match_len = 0
best_match_offset = 0
# Final validation (redundant if logic above is correct, but safe)
if best_match_len < min_match_len or best_match_offset <= 0:
return 0, 0
return best_match_len, best_match_offset
def compress_lz_buffer_gemini_numba(src_buffer: bytes) -> bytes:
"""
Compresses a byte array using the custom LZ77 algorithm (Numba accelerated),
compatible with the provided decompress_lz_buffer.
Args:
src_buffer: Uncompressed bytes.
Returns:
Compressed bytes.
"""
if not src_buffer:
return b""
src_len = len(src_buffer)
# Convert source to NumPy array for Numba compatibility
src_np = np.frombuffer(src_buffer, dtype=np.uint8)
# Initialize hash table arrays (use int32/int64 based on expected src_len)
# Using int64 for safety with potentially large inputs
head_array = np.full(HASH_TABLE_SIZE, HASH_SENTINEL, dtype=np.int64)
# prev_array needs indices up to src_len - 1
prev_array = np.full(src_len, HASH_SENTINEL, dtype=np.int64)
src_pos = 0
dest_buffer = bytearray() # Use standard bytearray for output
# Control bit handling state (managed in Python)
control_byte_pos = -1
control_bits_count = 0
current_control_byte = 0
def write_control_bit(bit: int):
"""Appends a control bit, flushing the control byte to dest_buffer if full."""
nonlocal control_byte_pos, control_bits_count, current_control_byte, dest_buffer
if control_bits_count == 0:
control_byte_pos = len(dest_buffer)
dest_buffer.append(0) # Placeholder
current_control_byte = (current_control_byte << 1) | (bit & 1)
control_bits_count += 1
if control_bits_count == 8:
dest_buffer[control_byte_pos] = current_control_byte
control_bits_count = 0
current_control_byte = 0
# --- Main Compression Loop ---
last_inserted_hash_pos = -1
while src_pos < src_len:
# --- Update Hash Table ---
window_start = max(0, src_pos - WINDOW_SIZE)
target_insert_pos = src_pos - 1 # Insert keys for positions up to here
# Insert hash entries for positions not yet processed
while last_inserted_hash_pos < target_insert_pos:
insert_pos = last_inserted_hash_pos + 1
# Ensure we can form a key of HASH_KEY_LEN bytes starting at insert_pos
if insert_pos + HASH_KEY_LEN <= src_len:
# Calculate hash for the position being inserted
hash_val = calculate_hash(src_np, insert_pos)
# Link the previous head to this new position
prev_array[insert_pos] = head_array[hash_val]
# Update the head to point to this new position
head_array[hash_val] = insert_pos
# Note: Pruning happens implicitly in the finder by checking window_start
last_inserted_hash_pos = insert_pos
# --- Find Longest Match ---
# Call the Numba-accelerated function
match_len, match_offset = find_longest_match_numba(
src_np, src_pos, window_start, MAX_MATCH_LEN,
head_array, prev_array, MIN_MATCH_LEN
)
# --- Encoding Decision (Identical logic to the correct Python version) ---
encode_as_literal = True
if match_len >= MIN_MATCH_LEN: # A valid match was found
# Determine encoding feasibility
can_encode_short = (2 <= match_len <= 5 and 1 <= match_offset <= 256)
can_encode_long = (match_len >= 2 and 1 <= match_offset <= WINDOW_SIZE)
# Prefer short encoding if applicable
if can_encode_short:
write_control_bit(0); write_control_bit(0)
length_bits = match_len - 2
write_control_bit((length_bits >> 1) & 1)
write_control_bit(length_bits & 1)
offset_byte = (0x100 - match_offset) & 0xFF
dest_buffer.append(offset_byte)
src_pos += match_len # Advance position
encode_as_literal = False
elif can_encode_long: # Use long encoding otherwise
write_control_bit(0); write_control_bit(1)
value_base = (0x2000 - match_offset) << 3
if 3 <= match_len <= 9: # Length fits in 3 bits
length_bits = match_len - 2
value = value_base | length_bits
dest_buffer.append((value >> 8) & 0xFF)
dest_buffer.append(value & 0xFF)
src_pos += match_len
encode_as_literal = False
elif match_len >= 2: # Use extended length byte
length_bits = 0
value = value_base | length_bits
dest_buffer.append((value >> 8) & 0xFF)
dest_buffer.append(value & 0xFF)
len_byte = match_len - 1
dest_buffer.append(len_byte & 0xFF)
src_pos += match_len
encode_as_literal = False
# --- Encode as Literal ---
if encode_as_literal:
write_control_bit(1)
# Append the byte from the correct current position
dest_buffer.append(src_np[src_pos]) # Access NumPy array
src_pos += 1 # Advance by 1 for the literal
# --- Finalization ---
if control_bits_count > 0:
final_control_byte = current_control_byte << (8 - control_bits_count)
dest_buffer[control_byte_pos] = final_control_byte & 0xFF
return bytes(dest_buffer) # Convert final bytearray back to bytes
#
# Claude 3.7 sonnet
#
import numba as nb
import numpy as np
@nb.njit(cache=True)
def compress_lz_buffer_numba(data):
"""Numba-optimized LZ77 compression implementation"""
if len(data) == 0:
return np.empty(0, dtype=np.uint8)
# Pre-allocate result buffer
max_output_size = len(data) * 2 # Worst case scenario
result = np.zeros(max_output_size, dtype=np.uint8)
result_pos = 0
# Initialize compression state
pos = 0
data_len = len(data)
control_byte = 0
control_bit_pos = 7
control_byte_pos = -1
# Hash table setup - use arrays instead of lists/dictionaries
hash_size = 32768 # 32K hash table
hash_mask = hash_size - 1
max_chain = 48 # Maximum positions to store per hash
# Store positions in a 2D array (hash_value, chain_index)
hash_positions = np.full((hash_size, max_chain), -1, dtype=np.int32)
hash_counts = np.zeros(hash_size, dtype=np.int32)
# Recent matches using a fixed array (for 2-byte keys)
recent_positions = np.full(65536, -1, dtype=np.int32) # 2^16 possible combinations
# Main compression loop
while pos < data_len:
# Start a new control byte if needed
if control_bit_pos == 7:
control_byte_pos = result_pos
result[result_pos] = 0 # Placeholder for control byte
result_pos += 1
control_byte = 0
# Find best match at current position
best_match_len = 0
best_match_offset = 0
# Only try to find matches if we have enough bytes
if pos + 2 < data_len:
# Calculate hash for current position
h = ((data[pos] << 16) | (data[pos+1] << 8) | data[pos+2]) & hash_mask
min_pos = max(0, pos - 0x1FFF) # Maximum match distance
# Quick check recent matches (using 2-byte key)
two_byte_key = (data[pos] << 8) | data[pos+1]
prev_pos = recent_positions[two_byte_key]
if prev_pos >= min_pos and prev_pos < pos:
# Check if third byte matches too
if prev_pos + 2 < pos and data[prev_pos + 2] == data[pos + 2]:
# We have a match, find its length
match_len = 3
max_len = min(data_len - pos, pos - prev_pos, 255)
while match_len < max_len and data[prev_pos + match_len] == data[pos + match_len]:
match_len += 1
# Update best match
if match_len >= 3:
best_match_len = match_len
best_match_offset = pos - prev_pos
# Update recent match position
recent_positions[two_byte_key] = pos
# Search hash chain for better matches
count = hash_counts[h]
# Process newest to oldest entries
for i in range(min(max_chain, count)):
idx = count - 1 - i
if idx < 0:
break
prev_pos = hash_positions[h, idx]
# Skip too old positions
if prev_pos < min_pos:
break
# Skip if first byte doesn't match
if data[prev_pos] != data[pos]:
continue
# Find match length
max_match_len = min(data_len - pos, pos - prev_pos, 255)
# Block-based matching for speed (4 bytes at once)
match_len = 0
while match_len + 4 <= max_match_len:
if (data[prev_pos + match_len] == data[pos + match_len] and
data[prev_pos + match_len + 1] == data[pos + match_len + 1] and
data[prev_pos + match_len + 2] == data[pos + match_len + 2] and
data[prev_pos + match_len + 3] == data[pos + match_len + 3]):
match_len += 4
else:
break
# Finish byte by byte
while match_len < max_match_len and data[prev_pos + match_len] == data[pos + match_len]:
match_len += 1
# Update best match if this one is better
if match_len >= 3 and match_len > best_match_len:
best_match_len = match_len
best_match_offset = pos - prev_pos
# Early termination for excellent matches
if match_len > 40:
break
# Add current position to hash table
if hash_counts[h] < max_chain:
hash_positions[h, hash_counts[h]] = pos
hash_counts[h] += 1
else:
# Shift array to make room, keeping most recent positions
for j in range(max_chain - 1):
hash_positions[h, j] = hash_positions[h, j+1]
hash_positions[h, max_chain - 1] = pos
# Lazy matching optimization
use_literal = False
if best_match_len >= 3 and best_match_len < 24 and pos + 1 < data_len:
# Look ahead up to 4 positions
max_lookahead = min(4, best_match_len - 1)
for skip in range(1, max_lookahead + 1):
if pos + skip >= data_len:
break
# Check if lazy match might be better
ahead_len = 0
ahead_offset = 0
if pos + skip + 2 < data_len:
# Find match at skip position
skip_h = ((data[pos+skip] << 16) | (data[pos+skip+1] << 8) | data[pos+skip+2]) & hash_mask
# Check hash chain for skip position
skip_count = hash_counts[skip_h]
for i in range(min(16, skip_count)): # Limit search depth for speed
idx = skip_count - 1 - i
if idx < 0:
break
skip_prev = hash_positions[skip_h, idx]
if skip_prev < min_pos or skip_prev >= pos + skip:
continue
if data[skip_prev] != data[pos+skip]:
continue
# Calculate match length
cur_len = 0
max_cur_len = min(data_len - (pos+skip), (pos+skip) - skip_prev, 255)
while cur_len < max_cur_len and data[skip_prev + cur_len] == data[pos+skip + cur_len]:
cur_len += 1
if cur_len >= 3 and cur_len > ahead_len:
ahead_len = cur_len
ahead_offset = (pos+skip) - skip_prev
if cur_len > 32:
break
# Use literal if next match is significantly better
if ahead_len >= 3 and (ahead_len > best_match_len + skip or
(ahead_len > best_match_len and ahead_len > 16)):
use_literal = True
break
# Output literal or match
if best_match_len < 3 or use_literal:
# Literal byte - set control bit to 1
control_byte |= (1 << control_bit_pos)
control_bit_pos -= 1
# Add literal byte
result[result_pos] = data[pos]
result_pos += 1
pos += 1
else:
# Match - leave control bit as 0
control_bit_pos -= 1
# Need new control byte?
if control_bit_pos < 0:
result[control_byte_pos] = control_byte
control_byte_pos = result_pos
result[result_pos] = 0
result_pos += 1
control_byte = 0
control_bit_pos = 7
# Set next bit for match format
control_byte |= (1 << control_bit_pos)
control_bit_pos -= 1
# Encode match
offset_value = (0x2000 - best_match_offset) << 3
if best_match_len <= 9:
# Short format (length in lower 3 bits)
length_bits = best_match_len - 2
encoded_value = offset_value | length_bits
result[result_pos] = (encoded_value >> 8) & 0xFF
result_pos += 1
result[result_pos] = encoded_value & 0xFF
result_pos += 1
else:
# Long format (separate length byte)
result[result_pos] = (offset_value >> 8) & 0xFF
result_pos += 1
result[result_pos] = offset_value & 0xFF
result_pos += 1
result[result_pos] = best_match_len - 1
result_pos += 1
pos += best_match_len
# Finalize control byte if all bits used
if control_bit_pos < 0:
result[control_byte_pos] = control_byte
control_bit_pos = 7
# Write final control byte if needed
if control_bit_pos < 7:
result[control_byte_pos] = control_byte
# Return only the used portion of the buffer
return result[:result_pos]
def compress_lz_buffer_claude_numba(data: bytes) -> bytes:
"""
Numba-optimized LZ77 compressor with excellent compression ratio and performance.
Args:
data: Uncompressed bytes
Returns:
Compressed bytes
"""
if not data:
return b''
# Convert input to numpy array
np_data = np.frombuffer(data, dtype=np.uint8).copy()
# Run Numba-optimized compression
compressed = compress_lz_buffer_numba(np_data)
# Convert back to bytes
return bytes(compressed)
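# (Added note) The @numba.njit functions compile on their first call, so the first timed
# run in the benchmark below includes JIT compilation. A warm-up sketch, assuming the
# timings are meant to measure steady-state performance only:
#
#   compress_lz_buffer_gemini_numba(b"warm-up " * 16)
#   compress_lz_buffer_claude_numba(b"warm-up " * 16)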
### TESTING
import time
from custom_lz77 import *
from custom_lz77_numba import *
class Timer:
    def __init__(self, name="Block", unit="seconds"):
        self.name = name
        self.unit = unit
        self.start = None

    def __enter__(self):
        self.start = time.perf_counter()
        return self  # Optional, for accessing the timer object within the block

    def __exit__(self, exc_type, exc_val, exc_tb):
        elapsed = time.perf_counter() - self.start
        if self.unit == "seconds":
            print(f"{self.name} executed in {elapsed:.4f} {self.unit}")
        elif self.unit == "milliseconds":
            print(f"{self.name} executed in {elapsed*1000:.2f} {self.unit}")
        else:
            raise ValueError("Invalid unit. Use 'seconds' or 'milliseconds'")
        if exc_type:
            raise  # Re-raise the exception


def do_test(b):
    with Timer("gemini"):
        print(len(a1 := compress_lz_buffer_gemini(b)))
    with Timer("gemini_numba"):
        print(len(a2 := compress_lz_buffer_gemini_numba(b)))
    with Timer("claude"):
        print(len(a3 := compress_lz_buffer_claude(b)))
    with Timer("claude_v2"):
        print(len(a4 := compress_lz_buffer_claudev2(b)))
    with Timer("claude_numba"):
        print(len(a5 := compress_lz_buffer_claude_numba(b)))
    with Timer("o3mini"):
        print(len(a6 := compress_lz_buffer_o3mini(b)))
    # Every compressed buffer must decompress back to the original input
    return all(decompress_lz_buffer(a) == b for a in (a1, a2, a3, a4, a5, a6))
assert do_test(b"some00000LongStringRepeating" * 10_000) is True
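# Additional check sketch (added; hypothetical inputs): the repeated string above is the
# easy case for LZ77, so less compressible data may be worth exercising as well, e.g.
#
#   import os
#   assert do_test(os.urandom(4096) + b"tail" * 1024) is True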