Created
June 20, 2025 02:35
-
-
Save sharmaeklavya2/299144d1f8d8dab5a21d9e9356a74f3b to your computer and use it in GitHub Desktop.
Recursively compress pictures
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Compresses all photos in a directory recursively. | |
Outputs a new directory tree instead of replacing original photos. | |
Tools 'djpeg', 'moz-cjpeg', and 'exiftool' must be installed. | |
""" | |
import sys | |
import argparse | |
import subprocess | |
import os | |
import shutil | |
import time | |
from os.path import join as pjoin | |
# True when stderr is attached to a terminal; progress output is skipped
# otherwise (see printStats).
IS_TTY = sys.stderr.isatty()
# Wall-clock time at import, used to report elapsed time in the statistics.
START_TIME = time.time()
# Directory entries that are skipped entirely while walking the input tree.
JUNK_FILES = {'.DS_Store', 'Thumbs.db'}
def main():
    """Parse the command line, process the input tree and print a summary."""
    cli = argparse.ArgumentParser(description=__doc__)

    # Input / output locations.
    cli.add_argument(
        '-i', '--input', required=True,
        help='path to input file/directory')
    cli.add_argument(
        '-o', '--output', required=True,
        help='path to output file/directory')

    # Behaviour switches.
    cli.add_argument('--dry-run', action='store_true')
    cli.add_argument('--overwrite', action='store_true')
    cli.add_argument(
        '--copy-factor', type=float, default=1.05,
        help='copy original if compressed size is within this factor of original')
    cli.add_argument(
        '--no-exif', dest='copy_exif', action='store_false', default=True,
        help='do not copy EXIF metadata')

    # External tools and their tuning knobs.
    cli.add_argument(
        '--rexec', default='pamscale',
        help='resize program to use')
    cli.add_argument(
        '--cexec', default='moz-cjpeg',
        help='compression program to use')
    cli.add_argument(
        '--quality', type=int, default=70,
        help='compression quality')
    cli.add_argument('--resize-factor', type=float)
    cli.add_argument('--resize-filter', default='lanczos')

    opts = cli.parse_args()

    # Running totals shared by every file that gets processed.
    counters = dict(
        nCompressed=0, nCopied=0, nIgnored=0, nNonImg=0,
        oldSize=0.0, newSize=0.0,
    )
    compressRecursive(opts.input, opts.output, opts, counters)
    printStats(counters)
# [compress files]============================================================= | |
def compressRecursive(ipath, opath, args, stats):
    """Mirror the tree rooted at ipath into opath, processing every file.

    Directories are recreated under opath (or only announced in dry-run
    mode); every non-directory is handed to compressFile. Entries named in
    JUNK_FILES are skipped.

    The output root (args.output) is never descended into. Without that
    guard, an output directory located inside the input directory would be
    created, then listed as a child of the input, and the walk would recurse
    into its own output without end.

    Args:
        ipath: input file or directory.
        opath: corresponding output file or directory.
        args: parsed options; reads dry_run and (if present) output.
        stats: counters dict, passed through to compressFile.
    """
    if not os.path.isdir(ipath):
        compressFile(ipath, opath, args, stats)
        return

    if args.dry_run:
        print('$ mkdir ' + opath)
    else:
        os.makedirs(opath, exist_ok=True)

    # Resolve the output root once per directory so symlinks and relative
    # spellings of the same location still compare equal.
    outArg = getattr(args, 'output', None)
    outRoot = os.path.realpath(outArg) if outArg is not None else None

    for fname in os.listdir(ipath):
        childIn = pjoin(ipath, fname)
        if outRoot is not None and os.path.realpath(childIn) == outRoot:
            # this child is our own output tree: do not process it as input
            continue
        if fname in JUNK_FILES:
            continue
        compressRecursive(childIn, pjoin(opath, fname), args, stats)
def chainRun(argsSeq, fpath, dryRun):
    """Run the commands in argsSeq as a pipeline whose output goes to fpath.

    Equivalent to the shell line ``cmd1 | cmd2 | ... > fpath``. In dry-run
    mode the command line is only printed.

    Every stage is waited for (so no zombie processes are left behind) and
    every exit status is checked. If a stage cannot be started or exits with
    a non-zero status, the partially written fpath is removed so that a later
    run does not mistake it for a finished result.

    Args:
        argsSeq: sequence of argument lists, one per pipeline stage.
        fpath: path of the file receiving the last stage's stdout.
        dryRun: if true, print the pipeline instead of running it.

    Raises:
        subprocess.CalledProcessError: a stage exited with non-zero status
            (the first failing stage in pipeline order is reported).
        OSError: fpath could not be opened or a program could not be started.
    """
    if dryRun:
        parts = [' '.join(args) for args in argsSeq]
        cmd = ' '.join(['$', ' | '.join(parts), '>', fpath])
        print(cmd)
        return

    procs = []
    succeeded = False
    # Opened outside the try block: if this fails there is nothing to clean.
    fp = open(fpath, 'wb')
    try:
        with fp:
            stdin = None
            try:
                for i, args in enumerate(argsSeq):
                    isFinal = (i + 1) == len(argsSeq)
                    stdout = fp if isFinal else subprocess.PIPE
                    try:
                        proc = subprocess.Popen(
                            args, stdin=stdin, stdout=stdout)
                    finally:
                        if stdin is not None:
                            # Drop our copy of the pipe's read end so the
                            # previous process receives a SIGPIPE if the
                            # current one exits (or never started).
                            stdin.close()
                    procs.append(proc)
                    stdin = proc.stdout
            finally:
                # Reap every stage that was started, not just the last one.
                for proc in procs:
                    proc.wait()
        for proc in procs:
            if proc.returncode != 0:
                raise subprocess.CalledProcessError(
                    proc.returncode, proc.args)
        succeeded = True
    finally:
        if not succeeded:
            # Do not leave a truncated/corrupt output file behind.
            try:
                os.remove(fpath)
            except FileNotFoundError:
                pass
def compressFile(ipath, opath, args, stats):
    """Process a single file: compress it if it is a JPEG, else copy it.

    Behaviour, in order:
      * If opath exists and is not going to be overwritten (overwriting only
        happens for JPEGs when args.overwrite is set), count it as ignored.
      * A non-JPEG whose output does not exist yet is copied verbatim.
      * A JPEG is decoded with djpeg, optionally resized with args.rexec, and
        re-encoded with args.cexec into opath. If the result is not smaller
        than the original by at least args.copy_factor, the original is
        copied instead. Otherwise selected EXIF tags are copied over with
        exiftool unless args.copy_exif is false.

    In dry-run mode the commands are printed instead of executed and no
    sizes are recorded.

    Args:
        ipath: path of the input file.
        opath: path of the output file.
        args: parsed options (overwrite, dry_run, resize_factor, rexec,
            resize_filter, cexec, quality, copy_factor, copy_exif).
        stats: counters dict, updated in place.

    Raises:
        shutil.SameFileError: ipath and opath are the same file and it would
            be overwritten. Raised before anything is written, because the
            output is truncated before the decoder reads the input and the
            original would otherwise be destroyed.
        subprocess.CalledProcessError: exiftool failed.
    """
    _, ext = os.path.splitext(ipath)
    exists = os.path.exists(opath)
    is_jpg = ext.lower() in ('.jpg', '.jpeg')
    if exists and not (args.overwrite and is_jpg):
        stats['nIgnored'] += 1
        return
    if (not exists) and (not is_jpg):
        stats['nNonImg'] += 1
        if args.dry_run:
            print('cp {} {}'.format(repr(ipath), repr(opath)))
        else:
            shutil.copyfile(ipath, opath)
        return

    # From here on opath is (re)written. Refuse to do that in place.
    if exists and os.path.samefile(ipath, opath):
        raise shutil.SameFileError(
            '{!r} and {!r} are the same file'.format(ipath, opath))

    # compress
    argsSeq = []
    argsSeq.append(['djpeg', ipath])
    if args.resize_factor:
        argsSeq.append([args.rexec, str(args.resize_factor),
                        '-filter=' + args.resize_filter])
    argsSeq.append([args.cexec, '-quality', str(args.quality)])
    chainRun(argsSeq, opath, args.dry_run)
    stats['nCompressed'] += 1

    # update sizes and copy if not enough compression
    copied = False
    if not args.dry_run:
        oldSize = os.stat(ipath).st_size
        newSize = os.stat(opath).st_size
        stats['oldSize'] += oldSize
        if oldSize <= newSize * args.copy_factor:
            # compression did not pay off: keep the original bytes instead
            shutil.copyfile(ipath, opath)
            copied = True
            stats['nCompressed'] -= 1
            stats['nCopied'] += 1
            stats['newSize'] += oldSize
        else:
            stats['newSize'] += newSize

    # copy EXIF metadata (a verbatim copy already carries its metadata)
    if not copied and args.copy_exif:
        exifArgs = ['exiftool', '-quiet', '-quiet',
                    '-overwrite_original', '-TagsFromFile', ipath,
                    '-Orientation#', '-DateTimeOriginal', opath]
        if args.dry_run:
            print('$ ' + ' '.join(exifArgs))
        else:
            subprocess.run(exifArgs, check=True)

    if not args.dry_run:
        printStats(stats, progressMode=True)
# [print status]=============================================================== | |
def hsizeStr(x):
    """Return x (a size in bytes) as a short human-readable string.

    Sizes from 1e3 bytes up are shown with two decimals in decimal (SI)
    units KB, MB or GB; smaller sizes are shown as a whole number of bytes.

    The running totals this is called with are floats, so the byte branch
    converts to int before formatting; otherwise a size such as 512.0 would
    be rendered as '512.0 B'.
    """
    for threshold, unit in ((1e9, 'GB'), (1e6, 'MB'), (1e3, 'KB')):
        if x >= threshold:
            return '{:>6.2f} {}'.format(x / threshold, unit)
    return '{:>6} B'.format(int(x))
def timeStr(x):
    """Format a duration of x seconds as 'MM:SS', or 'H:MM:SS' from 1 hour.

    Fractional seconds are discarded. Minutes and seconds are zero-padded to
    two digits; the hour field is not padded.
    """
    wholeSeconds = int(x // 1)
    hours, remainder = divmod(wholeSeconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    clock = '{:02}:{:02}'.format(minutes, seconds)
    if hours > 0:
        return '{}:{}'.format(hours, clock)
    return clock
def printStats(stats, progressMode=False):
    """Print a one-line summary of the counters in stats.

    The line holds the elapsed time, the number of compressed files, any
    non-zero copied / ignored / non-image counts, and the old and new total
    sizes.

    With progressMode set, the line goes to stderr terminated by a carriage
    return (so the next one overwrites it), and nothing is printed unless
    stderr is a terminal. Otherwise the line is printed normally to stdout.
    """
    if progressMode and not IS_TTY:
        return

    elapsed = time.time() - START_TIME
    fields = [timeStr(elapsed),
              'compressed {}'.format(stats['nCompressed'])]

    # counters that are only worth mentioning when non-zero
    optional = (
        ('nCopied', 'copied {}'),
        ('nIgnored', 'ignored {}'),
        ('nNonImg', '{} non-images'),
    )
    fields.extend(template.format(stats[key])
                  for key, template in optional if stats[key] > 0)

    fields.append('old size: ' + hsizeStr(stats['oldSize']))
    fields.append('new size: ' + hsizeStr(stats['newSize']))

    line = ', '.join(fields)
    if progressMode:
        print(line, file=sys.stderr, end='\r', flush=True)
    else:
        print(line)
# [main]======================================================================= | |
# Run the command-line interface only when executed as a script,
# not when imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment