Skip to content

Instantly share code, notes, and snippets.

@sharmaeklavya2
Created June 20, 2025 02:35
Show Gist options
  • Save sharmaeklavya2/299144d1f8d8dab5a21d9e9356a74f3b to your computer and use it in GitHub Desktop.
Save sharmaeklavya2/299144d1f8d8dab5a21d9e9356a74f3b to your computer and use it in GitHub Desktop.
Recursively compress pictures
#!/usr/bin/env python3
"""
Compresses all photos in a directory recursively.
Outputs a new directory tree instead of replacing original photos.
Tools 'djpeg', 'moz-cjpeg', and 'exiftool' must be installed
('pamscale' is also needed when --resize-factor is used).
"""
import sys
import argparse
import subprocess
import os
import shutil
import time
from os.path import join as pjoin
# True when stderr is attached to a terminal; printStats only emits its
# in-place progress line in that case.
IS_TTY = sys.stderr.isatty()
# Timestamp taken when the module is loaded; printStats measures elapsed
# time from it.
START_TIME = time.time()
# File names that compressRecursive skips while walking a directory.
JUNK_FILES = {'.DS_Store', 'Thumbs.db'}
def main():
    """Parse the command line, process the input path, and print a summary.

    The parsed options and a fresh dict of counters are passed to
    compressRecursive; once it returns, the counters are reported through
    printStats.
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # where to read from and where to write to
    cli.add_argument(
        '-i', '--input',
        required=True,
        help='path to input file/directory',
    )
    cli.add_argument(
        '-o', '--output',
        required=True,
        help='path to output file/directory',
    )

    # behaviour switches
    cli.add_argument('--dry-run', default=False, action='store_true')
    cli.add_argument('--overwrite', default=False, action='store_true')
    cli.add_argument(
        '--copy-factor',
        type=float,
        default=1.05,
        help='copy original if compressed size is within this factor of original',
    )
    cli.add_argument(
        '--no-exif',
        dest='copy_exif',
        action='store_false',
        default=True,
        help='do not copy EXIF metadata',
    )

    # external programs and their tuning knobs
    cli.add_argument(
        '--rexec',
        default='pamscale',
        help='resize program to use',
    )
    cli.add_argument(
        '--cexec',
        default='moz-cjpeg',
        help='compression program to use',
    )
    cli.add_argument(
        '--quality',
        type=int,
        default=70,
        help='compression quality',
    )
    cli.add_argument('--resize-factor', type=float)
    cli.add_argument('--resize-filter', default='lanczos')

    options = cli.parse_args()

    # running totals shared by every call below
    counters = {
        'nCompressed': 0,
        'nCopied': 0,
        'nIgnored': 0,
        'nNonImg': 0,
        'oldSize': 0.0,
        'newSize': 0.0,
    }
    compressRecursive(options.input, options.output, options, counters)
    printStats(counters)
# [compress files]=============================================================
def compressRecursive(ipath, opath, args, stats):
    """Mirror ipath into opath, handing every non-directory to compressFile.

    When ipath is a directory, opath is created (or, with args.dry_run, a
    '$ mkdir' line is printed instead) and each entry whose name is not in
    JUNK_FILES is processed recursively under the same name. Anything that
    is not a directory is passed straight to compressFile.
    """
    if not os.path.isdir(ipath):
        compressFile(ipath, opath, args, stats)
        return

    if not args.dry_run:
        os.makedirs(opath, exist_ok=True)
    else:
        print('$ mkdir ' + opath)

    for entry in os.listdir(ipath):
        if entry in JUNK_FILES:
            continue
        compressRecursive(pjoin(ipath, entry), pjoin(opath, entry),
                          args, stats)
def chainRun(argsSeq, fpath, dryRun):
    """Run the commands in argsSeq as a pipeline whose output goes to fpath.

    Each element of argsSeq is the argument list of one process. The stdout
    of every stage is connected to the stdin of the next stage, and the
    stdout of the final stage is written to the file fpath, i.e. the
    equivalent of `a | b | c > fpath`.

    When dryRun is true nothing is executed and fpath is not touched; the
    equivalent shell command line is printed instead.

    Raises:
        subprocess.CalledProcessError: if any stage exits with a non-zero
            status. The first failing stage in pipeline order is reported.
            The incomplete output file is removed before raising so that it
            cannot be mistaken for a finished result.
    """
    if dryRun:
        parts = [' '.join(args) for args in argsSeq]
        cmd = ' '.join(['$', ' | '.join(parts), '>', fpath])
        print(cmd)
        return

    procs = []
    created = False
    succeeded = False
    try:
        with open(fpath, 'wb') as fp:
            # Only clean up fpath if this call actually opened it.
            created = True
            stdin = None
            try:
                for i, args in enumerate(argsSeq):
                    isFinal = (i + 1) == len(argsSeq)
                    stdout = fp if isFinal else subprocess.PIPE
                    proc = subprocess.Popen(args, stdin=stdin, stdout=stdout)
                    procs.append(proc)
                    if stdin is not None:
                        # allow the previous process to receive a SIGPIPE
                        # if the current process exits
                        stdin.close()
                    stdin = proc.stdout
            finally:
                # If a stage could not be started, our copy of the previous
                # stage's pipe is still open; close it so that stage is not
                # left blocked writing to a pipe nobody reads.
                if stdin is not None:
                    stdin.close()
                # Wait for every stage (not just the last one) so that no
                # child is left unreaped and all exit statuses are known.
                for proc in procs:
                    proc.wait()

        for args, proc in zip(argsSeq, procs):
            if proc.returncode != 0:
                raise subprocess.CalledProcessError(proc.returncode, args)
        succeeded = True
    finally:
        if created and not succeeded:
            # Best-effort removal of the partial output; the original error
            # is the one that matters, so a failed removal is ignored.
            try:
                os.remove(fpath)
            except OSError:
                pass
def compressFile(ipath, opath, args, stats):
    """Produce opath from the single file ipath and update the counters.

    Outcomes, in the order they are checked:
      * opath already exists and is not an overwritable JPEG: counted in
        stats['nIgnored'], nothing is written.
      * ipath is not a JPEG (by extension) and opath does not exist: copied
        verbatim (or a 'cp' line is printed on a dry run) and counted in
        stats['nNonImg'].
      * otherwise the file goes through the djpeg -> optional resize ->
        compressor pipeline via chainRun. If the input is not larger than
        the result times args.copy_factor, the original is copied over the
        result and counted in stats['nCopied'] instead of
        stats['nCompressed'].

    Unless the original was copied, or args.copy_exif is false, exiftool is
    then run to transfer the Orientation and DateTimeOriginal tags. Outside
    dry runs a progress line is emitted through printStats at the end.
    """
    suffix = os.path.splitext(ipath)[1]
    outputExists = os.path.exists(opath)
    isJpeg = suffix.lower() in ('.jpg', '.jpeg')

    # an existing output is only replaced for JPEGs, and only on request
    if outputExists and (not args.overwrite or not isJpeg):
        stats['nIgnored'] += 1
        return

    # anything that is not a JPEG is carried over unchanged
    if not (outputExists or isJpeg):
        stats['nNonImg'] += 1
        if not args.dry_run:
            shutil.copyfile(ipath, opath)
        else:
            print('cp {} {}'.format(repr(ipath), repr(opath)))
        return

    # decode -> (resize) -> encode
    pipeline = [['djpeg', ipath]]
    if args.resize_factor:
        resizeCmd = [args.rexec, str(args.resize_factor),
                     '-filter=' + args.resize_filter]
        pipeline.append(resizeCmd)
    pipeline.append([args.cexec, '-quality', str(args.quality)])
    chainRun(pipeline, opath, args.dry_run)
    stats['nCompressed'] += 1

    # account for sizes; fall back to the original when the gain is too small
    keptOriginal = False
    if not args.dry_run:
        inputBytes = os.stat(ipath).st_size
        outputBytes = os.stat(opath).st_size
        stats['oldSize'] += inputBytes
        if inputBytes <= outputBytes * args.copy_factor:
            shutil.copyfile(ipath, opath)
            keptOriginal = True
            # this file was provisionally counted as compressed above
            stats['nCompressed'] -= 1
            stats['nCopied'] += 1
            stats['newSize'] += inputBytes
        else:
            stats['newSize'] += outputBytes

    # transfer selected metadata tags from the original to the new file
    if args.copy_exif and not keptOriginal:
        exifCmd = [
            'exiftool', '-quiet', '-quiet',
            '-overwrite_original', '-TagsFromFile', ipath,
            '-Orientation#', '-DateTimeOriginal', opath,
        ]
        if not args.dry_run:
            subprocess.run(exifCmd, check=True)
        else:
            print('$ ' + ' '.join(exifCmd))

    if not args.dry_run:
        printStats(stats, progressMode=True)
# [print status]===============================================================
def hsizeStr(x):
    """Render a byte count x as a short human-readable string.

    Decimal units are used (1 KB = 1000 B). Values of at least 1 KB are
    shown with two decimals in a 6-character right-aligned field followed by
    'KB', 'MB' or 'GB'; smaller values are shown as given, right-aligned in
    6 characters, followed by 'B'.
    """
    # largest unit first, so the first threshold reached wins
    for scale, unit in ((1e9, 'GB'), (1e6, 'MB'), (1e3, 'KB')):
        if x >= scale:
            return '{:>6.2f} {}'.format(x / scale, unit)
    return '{:>6} B'.format(x)
def timeStr(x):
    """Format a duration of x seconds as 'MM:SS', or 'H:MM:SS' from one hour.

    Fractions of a second are dropped. Minutes and seconds are zero-padded
    to two digits; the hour field is unpadded and left out while it is zero.
    """
    hours, remainder = divmod(x, 3600)
    minutes, seconds = divmod(remainder, 60)
    clock = '{:02}:{:02}'.format(int(minutes), int(seconds))
    hours = int(hours)
    if hours > 0:
        return '{}:{}'.format(hours, clock)
    return clock
def printStats(stats, progressMode=False):
    """Print a one-line summary of the counters in stats.

    The line holds the elapsed time since START_TIME, the number of
    compressed files, any non-zero copied / ignored / non-image counts, and
    the accumulated old and new sizes.

    With progressMode the line is written to stderr, terminated by a
    carriage return and flushed, and nothing at all is printed when stderr
    is not a terminal. Otherwise the line is printed normally to stdout.
    """
    if progressMode and not IS_TTY:
        return

    elapsed = time.time() - START_TIME
    fields = [
        timeStr(elapsed),
        'compressed {}'.format(stats['nCompressed']),
    ]

    # these counters are only mentioned once they are non-zero
    optionalFields = (
        ('nCopied', 'copied {}'),
        ('nIgnored', 'ignored {}'),
        ('nNonImg', '{} non-images'),
    )
    for key, template in optionalFields:
        count = stats[key]
        if count > 0:
            fields.append(template.format(count))

    fields.append('old size: ' + hsizeStr(stats['oldSize']))
    fields.append('new size: ' + hsizeStr(stats['newSize']))
    line = ', '.join(fields)

    if not progressMode:
        print(line)
    else:
        print(line, file=sys.stderr, end='\r', flush=True)
# [main]=======================================================================
# Run the command-line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment