Created
November 4, 2016 15:51
-
-
Save lambdafu/6d537cf30a6430535593f2d31b05d68a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import os | |
import struct | |
class AppendingTiffWriter: | |
fieldSizes = [ | |
0, # None | |
1, # byte | |
1, # ascii | |
2, # short | |
4, # long | |
8, # rational | |
1, # sbyte | |
1, # undefined | |
2, # sshort | |
4, # slong | |
8, # srational | |
4, # float | |
8, # double | |
] | |
# StripOffsets = 273 | |
# FreeOffsets = 288 | |
# TileOffsets = 324 | |
# JPEGQTables = 519 | |
# JPEGDCTables = 520 | |
# JPEGACTables = 521 | |
Tags = set((273, 288, 324, 519, 520, 521)) | |
def __init__(self, fn, new=False): | |
if hasattr(fn, 'read'): | |
self.f = fn | |
self.close_fp = False | |
else: | |
self.name = fn | |
self.close_fp = True | |
try: | |
self.f = io.open(fn, "w+b" if new else "r+b") | |
except IOError: | |
self.f = io.open(fn, "w+b") | |
self.beginning = self.f.tell() | |
self.setup() | |
def setup(self): | |
# Reset everything. | |
self.f.seek(self.beginning, os.SEEK_SET) | |
self.whereToWriteNewIFDOffset = None | |
self.offsetOfNewPage = 0 | |
self.IIMM = IIMM = self.f.read(4) | |
if not IIMM: | |
# empty file - first page | |
self.isFirst = True | |
return | |
self.isFirst = False | |
if IIMM == b"II\x2a\x00": | |
self.setEndian("<") | |
elif IIMM == b"MM\x00\x2a": | |
self.setEndian(">") | |
else: | |
raise RuntimeError("Invalid TIFF file header") | |
self.skipIFDs() | |
self.goToEnd() | |
def finalize(self): | |
if self.isFirst: | |
return | |
# fix offsets | |
self.f.seek(self.offsetOfNewPage) | |
IIMM = self.f.read(4) | |
if not IIMM: | |
# raise RuntimeError("nothing written into new page") | |
# Make it easy to finish a frame without committing to a new one. | |
return | |
if IIMM != self.IIMM: | |
raise RuntimeError("IIMM of new page doesn't match IIMM of " | |
"first page") | |
IFDoffset = self.readLong() | |
IFDoffset += self.offsetOfNewPage | |
self.f.seek(self.whereToWriteNewIFDOffset) | |
self.writeLong(IFDoffset) | |
self.f.seek(IFDoffset) | |
self.fixIFD() | |
def newFrame(self): | |
# Call this to finish a frame. | |
self.finalize() | |
self.setup() | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_value, traceback): | |
if self.close_fp: | |
self.close() | |
return False | |
def tell(self): | |
return self.f.tell() - self.offsetOfNewPage | |
def seek(self, offset, whence): | |
if whence == os.SEEK_SET: | |
offset += self.offsetOfNewPage | |
self.f.seek(offset, whence) | |
return self.tell() | |
def goToEnd(self): | |
self.f.seek(0, os.SEEK_END) | |
pos = self.f.tell() | |
# pad to 16 byte boundary | |
padBytes = 16 - pos % 16 | |
if 0 < padBytes < 16: | |
self.f.write(bytes(bytearray(padBytes))) | |
self.offsetOfNewPage = self.f.tell() | |
def setEndian(self, endian): | |
self.endian = endian | |
self.longFmt = self.endian + "L" | |
self.shortFmt = self.endian + "H" | |
self.tagFormat = self.endian + "HHL" | |
def skipIFDs(self): | |
while True: | |
IFDoffset = self.readLong() | |
if IFDoffset == 0: | |
self.whereToWriteNewIFDOffset = self.f.tell() - 4 | |
break | |
self.f.seek(IFDoffset) | |
numTags = self.readShort() | |
self.f.seek(numTags * 12, os.SEEK_CUR) | |
def write(self, data): | |
return self.f.write(data) | |
def readShort(self): | |
value, = struct.unpack(self.shortFmt, self.f.read(2)) | |
return value | |
def readLong(self): | |
value, = struct.unpack(self.longFmt, self.f.read(4)) | |
return value | |
def rewriteLastShortToLong(self, value): | |
self.f.seek(-2, os.SEEK_CUR) | |
bytesWritten = self.f.write(struct.pack(self.longFmt, value)) | |
if bytesWritten is not None and bytesWritten != 4: | |
raise RuntimeError("wrote only %u bytes but wanted 4" % | |
bytesWritten) | |
def rewriteLastShort(self, value): | |
self.f.seek(-2, os.SEEK_CUR) | |
bytesWritten = self.f.write(struct.pack(self.shortFmt, value)) | |
if bytesWritten is not None and bytesWritten != 2: | |
raise RuntimeError("wrote only %u bytes but wanted 2" % | |
bytesWritten) | |
def rewriteLastLong(self, value): | |
self.f.seek(-4, os.SEEK_CUR) | |
bytesWritten = self.f.write(struct.pack(self.longFmt, value)) | |
if bytesWritten is not None and bytesWritten != 4: | |
raise RuntimeError("wrote only %u bytes but wanted 4" % | |
bytesWritten) | |
def writeShort(self, value): | |
bytesWritten = self.f.write(struct.pack(self.shortFmt, value)) | |
if bytesWritten is not None and bytesWritten != 2: | |
raise RuntimeError("wrote only %u bytes but wanted 2" % | |
bytesWritten) | |
def writeLong(self, value): | |
bytesWritten = self.f.write(struct.pack(self.longFmt, value)) | |
if bytesWritten is not None and bytesWritten != 4: | |
raise RuntimeError("wrote only %u bytes but wanted 4" % | |
bytesWritten) | |
def close(self): | |
self.finalize() | |
self.f.close() | |
def fixIFD(self): | |
numTags = self.readShort() | |
#trace("fixing IFD at %X; number of tags: %u (0x%X)", self.f.tell()-2, | |
# numTags, numTags) | |
for i in range(numTags): | |
tag, fieldType, count = struct.unpack(self.tagFormat, self.f.read(8)) | |
#trace(" at %X: tag %u (0x%X), type %u, count %u", self.f.tell()-8, | |
# tag, tag, fieldType, count) | |
fieldSize = self.fieldSizes[fieldType] | |
totalSize = fieldSize * count | |
isLocal = (totalSize <= 4) | |
if not isLocal: | |
offset = self.readLong() | |
offset += self.offsetOfNewPage | |
self.rewriteLastLong(offset) | |
if tag in self.Tags: | |
curPos = self.f.tell() | |
if isLocal: | |
self.fixOffsets(count, isShort=(fieldSize == 2), | |
isLong=(fieldSize == 4)) | |
self.f.seek(curPos + 4) | |
else: | |
self.f.seek(offset) | |
self.fixOffsets(count, isShort=(fieldSize == 2), | |
isLong=(fieldSize == 4)) | |
self.f.seek(curPos) | |
offset = curPos = None | |
elif isLocal: | |
# skip the locally stored value that is not an offset | |
self.f.seek(4, os.SEEK_CUR) | |
def fixOffsets(self, count, isShort=False, isLong=False): | |
if not isShort and not isLong: | |
raise RuntimeError("offset is neither short nor long") | |
for i in range(count): | |
offset = self.readShort() if isShort else self.readLong() | |
offset += self.offsetOfNewPage | |
if isShort and offset >= 65536: | |
# offset is now too large - we must convert shorts to longs | |
if count != 1: | |
raise RuntimeError("not implemented") # XXX TODO | |
# simple case - the offset is just one and therefore it is | |
# local (not referenced with another offset) | |
self.rewriteLastShortToLong(offset) | |
self.f.seek(-10, os.SEEK_CUR) | |
self.writeShort(4) # rewrite the type to LONG | |
self.f.seek(8, os.SEEK_CUR) | |
elif isShort: | |
self.rewriteLastShort(offset) | |
else: | |
self.rewriteLastLong(offset) | |
if __name__ == '__main__': | |
OUTFILE = "mandelbrot.tiff" | |
from PIL import Image | |
size = (512, 512) | |
extends = [(-3, -2.5, 2, 2.5), | |
(-1.98, -0.05, -1.88, 0.05), | |
(-1.945, -0.005, -1.935, 0.005)] | |
if os.path.exists(OUTFILE): | |
os.unlink(OUTFILE) | |
with AppendingTiffWriter(OUTFILE) as tf: | |
for extend in extends: | |
im = Image.effect_mandelbrot(size, extend, 100) | |
im.save(tf) | |
tf.newFrame() | |
# Or like this: | |
# for extend in extends: | |
# im = Image.effect_mandelbrot(size, extend, 100) | |
# with AppendingTiffWriter(OUTFILE) as tf: | |
# im.save(tf) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment