Skip to content

Instantly share code, notes, and snippets.

@hundredwatt
Created July 3, 2025 17:58
Show Gist options
  • Save hundredwatt/a1e69ff300de941041d824e49249d3d7 to your computer and use it in GitHub Desktop.
Save hundredwatt/a1e69ff300de941041d824e49249d3d7 to your computer and use it in GitHub Desktop.
import hashlib

import cityhash
class Cell:
    """One bucket of an Invertible Bloom Filter.

    A cell accumulates three values over every item routed to it:

    * ``item_sum`` -- XOR of the items,
    * ``hash_sum`` -- XOR of a per-item checksum (see ``_checksum``),
    * ``count``    -- number of inserts minus number of deletes.

    Items must be integers because they are combined with ``^``.
    """

    def __init__(self, index, ibf):
        """Create an empty cell.

        Args:
            index: Position of this cell in the owning filter's cell list.
            ibf: Owning filter. Only its ``cell_indices(item)`` method is
                used, to check that a candidate item really maps to this cell.
        """
        self.index = index
        self.item_sum = 0
        self.hash_sum = 0
        self.count = 0
        self.ibf = ibf

    @staticmethod
    def _checksum(item):
        """Return a 64-bit checksum of *item* that is not linear under XOR.

        In CPython the built-in ``hash()`` is the identity on small
        non-negative integers, so a ``hash_sum`` built from it always equals
        ``item_sum`` and can never expose a cell that holds several items.
        A cryptographic digest has no such structure.
        """
        digest = hashlib.blake2b(str(item).encode("utf-8"), digest_size=8).digest()
        return int.from_bytes(digest, "big")

    def is_zero(self):
        """Return True when the cell holds nothing (all three sums are 0)."""
        return self.item_sum == 0 and self.hash_sum == 0 and self.count == 0

    def __repr__(self):
        return f"Cell(index={self.index}, item_sum={self.item_sum}, hash_sum={self.hash_sum}, count={self.count})"

    def read(self):
        """Return the accumulated item; only meaningful for a pure cell."""
        return self.item_sum

    def is_pure(self):
        """Return True when the cell appears to hold exactly one item.

        Three conditions must all hold:

        1. the net count is +1 or -1;
        2. the stored checksum matches the checksum of ``item_sum`` -- this
           rejects cells where several items merely net out to a count of
           +/-1;
        3. ``item_sum`` actually maps to this cell's index.
        """
        if self.count not in (1, -1):
            return False
        candidate = self.item_sum
        if self.hash_sum != self._checksum(candidate):
            return False
        return self.index in self.ibf.cell_indices(candidate)

    def insert(self, item):
        """Fold *item* into the cell and increment the count."""
        self.item_sum ^= item
        self.hash_sum ^= self._checksum(item)
        self.count += 1

    def delete(self, item):
        """Fold *item* out of the cell (XOR is its own inverse) and decrement the count."""
        self.item_sum ^= item
        self.hash_sum ^= self._checksum(item)
        self.count -= 1
class IBF:
    """Invertible Bloom Filter over integer items.

    Each item is routed to ``k`` cells chosen by seeded CityHash64. Two
    filters built with the same ``size`` and ``k`` can be subtracted, and the
    result decoded to recover the items present in only one of them.
    """

    def __init__(self, size, k):
        """Create an empty filter.

        Args:
            size: Number of cells.
            k: Number of hash functions (cells touched per item).
        """
        self.k = k
        self.size = size
        self.cells = [Cell(i, self) for i in range(self.size)]

    def cell_indices(self, item):
        """Return the k cell indices for *item*.

        The item is hashed via its decimal string with seeds ``0..k-1``.
        Different seeds may land on the same index, so the returned list can
        contain duplicates.
        """
        # The encoded form does not depend on the seed; build it once.
        encoded = str(item).encode('utf-8')
        return [
            cityhash.CityHash64WithSeed(encoded, seed=seed) % self.size
            for seed in range(self.k)
        ]

    def insert(self, item):
        """Insert *item* into every cell it maps to."""
        for index in self.cell_indices(item):
            self.cells[index].insert(item)

    def delete(self, item):
        """Delete *item* from every cell it maps to."""
        for index in self.cell_indices(item):
            self.cells[index].delete(item)

    def subtract_ibf(self, other):
        """Return a new IBF holding ``self - other``; neither operand is modified.

        Raises:
            ValueError: If the two filters differ in ``size`` or ``k``.
        """
        if self.size != other.size or self.k != other.k:
            raise ValueError("IBFs must have the same size and k to subtract")
        result = IBF(self.size, self.k)
        for out, mine, theirs in zip(result.cells, self.cells, other.cells):
            # Items present in both filters cancel under XOR; counts subtract.
            out.item_sum = mine.item_sum ^ theirs.item_sum
            out.hash_sum = mine.hash_sum ^ theirs.hash_sum
            out.count = mine.count - theirs.count
        return result

    def decode(self):
        """Recover the items stored in this filter by peeling pure cells.

        Decoding is destructive: every recovered item is peeled out of this
        filter's cells, so a successful decode leaves all cells zero.

        Returns:
            A ``(success, added, removed)`` tuple. ``added`` holds items
            read from cells with count +1, ``removed`` those read from cells
            with count -1. ``success`` is False (and a message is printed)
            when non-zero cells remain; the sets are then partial.
        """
        added = set()
        removed = set()
        # In a consistent filter every peel uses up its own pure cell, so a
        # clean decode should need at most `size` peels. A cell that only
        # looks pure can start a peel/undo cycle that would otherwise never
        # end; the 4x slack tolerates a few such missteps before giving up.
        peels_left = 4 * self.size
        progressed = True
        while progressed and peels_left > 0:
            progressed = False
            for cell in self.cells:
                if peels_left <= 0:
                    break
                if not cell.is_pure():
                    continue
                item = cell.read()
                count = cell.count
                # The sign of the count says which side the item came from.
                if count == 1:
                    added.add(item)
                else:
                    removed.add(item)
                self.peel(item, count)
                peels_left -= 1
                progressed = True
        # Any leftover content means some items could not be isolated.
        if not all(cell.is_zero() for cell in self.cells):
            print("DECODING FAILED: Not all cells are zero at the end.")
            return False, added, removed
        return True, added, removed

    def peel(self, item, count):
        """Remove a decoded item by applying the inverse of how it entered.

        Args:
            item: The item read from a pure cell.
            count: That cell's count; +1 is undone with a delete, -1 with an
                insert. Any other value is ignored.
        """
        if count == 1:
            self.delete(item)
        elif count == -1:
            self.insert(item)
if __name__ == "__main__":
    # Demo: reconcile two small sets that differ by one element each way
    # (2 appears only in A, 3 appears only in B).
    members_a = [1, 2, 4, 5, 6, 7, 9, 10]
    members_b = [1, 3, 4, 5, 6, 7, 9, 10]
    num_cells, num_hashes = 100, 4

    # Encode each set into its own filter, using identical parameters so the
    # two filters can be subtracted.
    filters = []
    for members in (members_a, members_b):
        encoded = IBF(size=num_cells, k=num_hashes)
        for member in members:
            encoded.insert(member)
        filters.append(encoded)
    filter_a, filter_b = filters

    # Subtract B from A and decode the difference.
    success, decoded_added, decoded_removed = filter_a.subtract_ibf(filter_b).decode()

    # Ground truth computed directly from the sets.
    expected_added = set(members_a) - set(members_b)
    expected_removed = set(members_b) - set(members_a)

    print("--- Verification ---")
    print(f"Decoding successful: {success}\n")

    print(f"Expected added (in A, not B): {len(expected_added)} items")
    print(f"Decoded added: {len(decoded_added)} items")
    print(f"Added items match: {expected_added == decoded_added}\n")

    print(f"Expected removed (in B, not A): {len(expected_removed)} items")
    print(f"Decoded removed: {len(decoded_removed)} items")
    print(f"Removed items match: {expected_removed == decoded_removed}\n")

    # Dump the actual items to make a mismatch easy to inspect.
    print(f"Expected added items: {sorted(expected_added)}")
    print(f"Decoded added items: {sorted(decoded_added)}")
    print(f"Expected removed items: {sorted(expected_removed)}")
    print(f"Decoded removed items: {sorted(decoded_removed)}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment