Created
March 16, 2025 21:00
-
-
Save patrickd-/2ac1caa88e142edab4746d814972bbe8 to your computer and use it in GitHub Desktop.
Remove attachments (such as labels) from CZI microscopy files, e.g. to strip identifying information for privacy.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import struct | |
import sys | |
def list_attachments(filename):
    """
    Scan a ZISRAW (.czi) file and print a summary of every attachment segment.

    File format summary (as per the specification):
    - The file consists of segments. Each segment has a 32-byte header
      followed by segment data.
    - Header structure (32 bytes):
        * Bytes [0:16]:  Segment ID (ANSI text). For attachments this is
          typically "ZISRAWATTACH".
        * Bytes [16:24]: AllocatedSize (little-endian Int64) - total bytes
          reserved for the segment data.
        * Bytes [24:32]: UsedSize (little-endian Int64) - bytes in use
          (not needed for listing, so it is not parsed here).
    - For attachment segments the first 256 bytes of the segment data form
      the fixed part:
        * Bytes [0:4]:    DataSize (Int32) of the following variable data.
        * Bytes [4:16]:   Reserved/spare.
        * Bytes [16:256]: AttachmentEntry structure.
    - In AttachmentEntry:
        * Bytes [0:2]:    SchemaType (b'A1' is the only one decoded).
        * Bytes [12:20]:  FilePosition (Int64 offset of the embedded file).
        * Bytes [40:48]:  ContentFileType (8 bytes).
        * Bytes [48:128]: Name (80-byte null-terminated UTF-8 string).
    - The special segment ID "DELETED" marks a segment as deleted; such
      segments are skipped.

    The file is read sequentially. For each attachment segment the offset,
    SchemaType, ContentFileType, Name and FilePosition are printed.

    Args:
        filename: Path of the .czi file to scan.

    Returns:
        None. All results are written to stdout. If the file cannot be
        accessed, an error message is printed and nothing is scanned.
    """
    try:
        file_size = os.path.getsize(filename)
    except OSError as e:
        print(f"Error accessing file: {e}")
        return

    with open(filename, 'rb') as f:
        offset = 0
        attachment_count = 0
        while offset < file_size:
            f.seek(offset)
            header = f.read(32)
            if len(header) < 32:
                break  # incomplete header; exit loop

            segment_id = header[0:16].rstrip(b'\x00').decode('ascii', errors='replace')
            # AllocatedSize is a *signed* Int64 in the format description.
            # Parsing it signed lets a negative (corrupt) value be detected
            # below instead of silently turning into a huge forward jump.
            allocated_size = struct.unpack_from('<q', header, 16)[0]

            # Segments with ID "DELETED" (and every other non-attachment
            # segment) are simply stepped over.
            if segment_id == "ZISRAWATTACH":
                attachment_count += 1
                print(f'Attachment segment {attachment_count} at offset {offset}:')

                # The fixed part directly follows the 32-byte header.
                fixed_part = f.read(256)
                if len(fixed_part) < 256:
                    print(" Incomplete fixed part; stopping.")
                    break

                # Skip DataSize (4 bytes) and the 12 reserved bytes.
                attachment_entry = fixed_part[16:256]
                schema_type = attachment_entry[0:2]
                print(f" SchemaType: {schema_type}")

                if schema_type == b'A1':
                    file_position = struct.unpack('<Q', attachment_entry[12:20])[0]
                    content_file_type = attachment_entry[40:48]
                    # Keep only the bytes before the first NUL terminator.
                    name_bytes = attachment_entry[48:128].split(b'\x00', 1)[0]
                    name = name_bytes.decode('utf-8', errors='replace')
                    print(f" ContentFileType: {content_file_type}")
                    print(f' Name: "{name}"')
                    print(f" FilePosition (embedded file offset): {file_position}")
                else:
                    print(" AttachmentEntry is not of type A1; skipping detailed info.")

            # A negative size can never be valid and could move the scan
            # backwards (or stall it), so stop instead of following it.
            if allocated_size < 0:
                print(" Corrupt AllocatedSize detected. Stopping scan.")
                break

            # Next segment: header (32 bytes) + segment data (AllocatedSize).
            offset += 32 + allocated_size

    print("Finished scanning for attachment segments.")
# Buffer size used when streaming segment data between files.
_CHUNK_SIZE = 64 * 1024


def _is_same_file(path_a, path_b):
    """Return True when both paths exist and refer to the same file."""
    try:
        return os.path.samefile(path_a, path_b)
    except OSError:
        # At least one path does not exist (or cannot be stat'ed), so they
        # cannot be known to be the same file.
        return False


def _copy_bytes(fin, fout, count):
    """Stream up to `count` bytes from fin to fout; return the bytes copied."""
    copied = 0
    while copied < count:
        chunk = fin.read(min(_CHUNK_SIZE, count - copied))
        if not chunk:
            break  # input ended early
        fout.write(chunk)
        copied += len(chunk)
    return copied


def _write_zeros(fout, count):
    """Write `count` zero bytes to fout in bounded-size chunks."""
    zeros = b'\x00' * min(_CHUNK_SIZE, max(count, 0))
    remaining = count
    while remaining > 0:
        step = min(len(zeros), remaining)
        fout.write(zeros[:step])
        remaining -= step


def remove_attachments_by_name(input_filename, output_filename, target_name):
    """
    Copy a CZI file while blanking every attachment segment with a given name.

    For each segment in the input file:
    - The 32-byte header is read (16-byte ID, AllocatedSize, UsedSize).
    - If the segment ID is "ZISRAWATTACH", the 256-byte fixed part is read
      and the Name field of its AttachmentEntry (80 bytes at offset 48 of
      the entry, i.e. offset 64 of the fixed part) is decoded as UTF-8 up
      to the first NUL byte.
    - If the Name equals target_name exactly, the segment is "removed" in
      the copy:
        * the header ID is replaced by "DELETED" (NUL-padded to 16 bytes),
          AllocatedSize and UsedSize are kept;
        * the whole segment data (AllocatedSize bytes) is written as zeros.
    - All other segments are copied unchanged.

    NOTE: only the ZISRAWATTACH segments themselves are rewritten. Every
    other segment is copied byte-for-byte, so any other segment that also
    stores the attachment name is not altered by this function.

    The file is processed segment by segment in fixed-size chunks, so
    multi-gigabyte files are handled without loading them into memory.

    Args:
        input_filename: Path of the source .czi file (never modified).
        output_filename: Path of the copy to create; it is overwritten if
            it exists. Must not refer to the same file as input_filename.
        target_name: Attachment name to remove (exact, case-sensitive match).

    Returns:
        None. Progress and errors are reported on stdout.
    """
    # Opening the output with 'wb' truncates it immediately; if it is the
    # input file, the data would be destroyed before a byte has been read.
    if _is_same_file(input_filename, output_filename):
        print("Error: input and output must be different files; nothing was written.")
        return

    try:
        file_size = os.path.getsize(input_filename)
    except OSError as e:
        print(f"Error accessing input file: {e}")
        return

    deleted_id = b"DELETED".ljust(16, b'\x00')

    with open(input_filename, 'rb') as fin, open(output_filename, 'wb') as fout:
        offset = 0
        while offset < file_size:
            fin.seek(offset)
            header = fin.read(32)
            if len(header) < 32:
                break  # incomplete header; exit loop

            segment_id = header[0:16].rstrip(b'\x00').decode('ascii', errors='replace')
            allocated_size = struct.unpack_from('<Q', header, 16)[0]
            data_start = offset + 32

            remove_segment = False
            # An attachment segment smaller than its own fixed part is
            # malformed; it is copied verbatim instead of being parsed, so
            # the output never grows beyond the segment's declared size.
            if segment_id == "ZISRAWATTACH" and allocated_size >= 256:
                fixed_part = fin.read(256)
                if len(fixed_part) < 256:
                    print("Incomplete fixed part; aborting.")
                    break
                attachment_entry = fixed_part[16:256]
                name_bytes = attachment_entry[48:128].split(b'\x00', 1)[0]
                attachment_name = name_bytes.decode('utf-8', errors='replace')
                if attachment_name == target_name:
                    remove_segment = True
                    print(f'Removing attachment segment at offset {offset}: Name "{attachment_name}"')

            if remove_segment:
                # Same sizes, new ID: readers skip the segment but every
                # following segment keeps its original file offset.
                fout.write(deleted_id + header[16:32])
                # Never write more zeros than the input actually holds, so
                # a corrupt AllocatedSize cannot blow up the output file.
                _write_zeros(fout, min(allocated_size, file_size - data_start))
            else:
                fout.write(header)
                # Rewind past any fixed part that was peeked at above and
                # copy the complete segment data in one pass.
                fin.seek(data_start)
                copied = _copy_bytes(fin, fout, allocated_size)
                if copied < allocated_size:
                    print(f"Warning: input ended before the segment at offset {offset} was complete.")

            # Advance offset: header (32 bytes) + segment data (AllocatedSize bytes)
            offset = data_start + allocated_size

    print("File copy complete with specified attachments removed.")
def print_usage():
    """Print the command-line help for the `list` and `remove` commands."""
    help_lines = [
        "Usage:",
        " To list attachment segments:",
        " python cziattdel.py list <path_to_czi_file>",
        "",
        " To create a copy with attachments removed by name:",
        " python cziattdel.py remove <input_czi_file> <output_czi_file> <attachment_name>",
        "",
        "Example:",
        ' python cziattdel.py remove input.czi output.czi "Label"',
        "",  # keeps the trailing newline of the help text
    ]
    print("\n".join(help_lines))
def _main(argv):
    """
    Dispatch the command line to the matching sub-command.

    Args:
        argv: Full argument vector, program name first (as in sys.argv).

    Returns:
        Process exit status: 0 after a command was run, 1 when the
        arguments are unusable (too few arguments, wrong argument count
        for `remove`, or an unknown command).
    """
    if len(argv) < 3:
        print_usage()
        return 1

    command = argv[1].lower()

    if command == "list":
        # List attachment segments.
        list_attachments(argv[2])
        return 0

    if command == "remove":
        # Remove (delete) attachment segments by name while copying.
        if len(argv) != 5:
            print_usage()
            return 1
        input_file, output_file, target_name = argv[2:5]
        remove_attachments_by_name(input_file, output_file, target_name)
        return 0

    # An unknown command is a usage error like the ones above, so it must
    # also produce a non-zero exit status for calling scripts.
    print(f"Unknown command: {command}")
    print_usage()
    return 1


if __name__ == '__main__':
    sys.exit(_main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment