Skip to content

Instantly share code, notes, and snippets.

@pszemraj
Created March 31, 2025 22:50
Show Gist options
  • Save pszemraj/63619c80b90f4f265ea0cea3b74cc6cc to your computer and use it in GitHub Desktop.
Save pszemraj/63619c80b90f4f265ea0cea3b74cc6cc to your computer and use it in GitHub Desktop.
File Extension Fixer using Magika
#!/usr/bin/env python3
"""
File Extension Fixer using Magika
This script analyzes files using Google's Magika deep learning model to identify
their actual content types and fix incorrect file extensions.
pip install -U joblib magika tqdm
"""
import argparse
import os
import sys
from pathlib import Path
from typing import Any, List, Optional, Tuple
from joblib import Parallel, delayed
from magika import Magika
from tqdm.auto import tqdm
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """
    Parse and validate command line arguments.

    Args:
        argv: Argument strings to parse, without the program name. When
            None (the default), argparse reads them from sys.argv[1:].

    Returns:
        Namespace with the attributes ``directory``, ``dry_run``,
        ``recursive``, ``n_jobs``, ``confidence``, ``ignore_exts`` and
        ``quiet``.

    Raises:
        SystemExit: Raised by argparse (status 2) on malformed arguments,
            when --confidence lies outside [0, 1], or when --n-jobs is 0.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("directory", help="Directory to scan for files")
    parser.add_argument(
        "--dry-run", action="store_true", help="Only report issues without renaming"
    )
    parser.add_argument(
        "-r", "--recursive", action="store_true", help="Scan directories recursively"
    )
    parser.add_argument("--n-jobs", type=int, default=4, help="Number of parallel jobs")
    parser.add_argument(
        "--confidence", type=float, default=0.9, help="Minimum confidence threshold"
    )
    parser.add_argument("--ignore-exts", help="Extensions to ignore (comma-separated)")
    parser.add_argument("-q", "--quiet", action="store_true", help="Minimize output")
    args = parser.parse_args(argv)

    # The threshold is compared against a detection score; a value outside
    # [0, 1] (e.g. "90" meant as a percentage) would silently accept or
    # reject every file. The chained comparison also rejects NaN.
    # NOTE(review): assumes detection scores lie in [0, 1] - confirm.
    if not 0.0 <= args.confidence <= 1.0:
        parser.error("--confidence must be between 0 and 1")

    # Fail here with a usage message instead of a traceback later.
    # NOTE(review): assumes joblib rejects n_jobs=0 - confirm.
    if args.n_jobs == 0:
        parser.error("--n-jobs must not be 0")

    return args
def check_file(
    file_path: Path, magika: Magika, args: argparse.Namespace
) -> Optional[Tuple[Path, str, str, Any]]:
    """
    Check if a file needs its extension fixed based on content type detection.

    Args:
        file_path: Path to the file to analyze
        magika: Initialized Magika instance for content type detection
        args: Command line arguments; reads ``ignore_exts`` (comma-separated
            string or None) and ``confidence`` (minimum accepted score)

    Returns:
        Optional tuple containing:
        - file_path: Original file path
        - current_ext: Current file extension (lower-cased, without dot)
        - new_ext: Suggested file extension (first one reported by Magika)
        - result: Magika detection result object
        Returns None if file doesn't need fixing or should be skipped.
    """
    current_ext = file_path.suffix.lower().lstrip(".")

    # Skip ignored extensions. Tokens are normalised so that ".txt", " txt"
    # and "TXT" all match, and empty tokens (from a trailing or doubled
    # comma) are dropped - otherwise "" would match every extensionless file.
    if args.ignore_exts:
        ignored = {
            token.strip().lstrip(".")
            for token in args.ignore_exts.lower().split(",")
        }
        ignored.discard("")
        if current_ext in ignored:
            return None

    result = magika.identify_path(file_path)

    # Skip if invalid or low confidence
    if not result.ok or result.score < args.confidence:
        return None

    suggested_exts = result.output.extensions

    # Skip if Magika offers no extension for this content type
    if not suggested_exts:
        return None

    # Skip if the current extension is already one of the accepted ones
    if current_ext and current_ext in suggested_exts:
        return None

    return (file_path, current_ext, suggested_exts[0], result)
def _reserve_target(file_path: Path, new_ext: str) -> Path:
    """Atomically claim an unused destination name for file_path with new_ext."""
    candidate = file_path.with_suffix(f".{new_ext}")
    counter = 0
    while True:
        try:
            # O_CREAT | O_EXCL fails if the name already exists, so two
            # workers can never be handed the same destination.
            fd = os.open(candidate, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            # Keep the corrected extension last: "name_1.ext", not "name.ext.1".
            counter += 1
            candidate = file_path.with_name(f"{file_path.stem}_{counter}.{new_ext}")
        else:
            os.close(fd)
            return candidate


def rename_file(
    file_info: Tuple[Path, str, str, Any],
) -> Tuple[bool, Path, Optional[Path]]:
    """
    Rename a file to use the suggested extension.

    The destination name is first reserved by creating an empty placeholder
    exclusively, and the source is then moved over that placeholder. This
    avoids the check-then-rename race in which two concurrently renamed files
    (e.g. ``a.txt`` and ``a.dat`` both becoming ``a.png``) could overwrite
    one another. On a name collision a numeric counter is inserted before
    the extension (``a_1.png``, ``a_2.png``, ...).

    Args:
        file_info: Tuple containing file path, current extension,
            suggested extension, and detection result

    Returns:
        Tuple containing:
        - success: Whether the rename operation succeeded
        - original_path: Original file path
        - new_path: New file path if successful, None otherwise
    """
    file_path, _, new_ext, _ = file_info

    try:
        new_path = _reserve_target(file_path, new_ext)
    except (OSError, ValueError):
        # OSError: destination directory not writable, etc.
        # ValueError: pathlib rejected the generated file name.
        return (False, file_path, None)

    try:
        # os.replace overwrites the placeholder we just created.
        os.replace(file_path, new_path)
    except OSError:
        # Best effort: do not leave the empty placeholder behind.
        try:
            os.unlink(new_path)
        except OSError:
            pass
        return (False, file_path, None)

    return (True, file_path, new_path)
def main() -> int:
    """
    Main function that orchestrates the file extension fixing process.

    Steps: parse arguments, collect the files under the target directory,
    run content detection on each file in a thread pool, report the
    mismatches, and (unless --dry-run is set) rename the affected files.

    Informational output is suppressed by --quiet; error messages are
    written to stderr regardless.

    Returns:
        int: Exit code (0 for success, 1 if the directory is invalid or any
        rename failed)
    """
    args = parse_args()
    directory = Path(args.directory)

    # is_dir() is already False for a non-existent path.
    if not directory.is_dir():
        print(f"Error: Directory not found: {directory}", file=sys.stderr)
        return 1

    magika = Magika()
    if not args.quiet:
        print(f"Using model {magika.get_model_name()} with threshold {args.confidence}")

    candidates = directory.rglob("*") if args.recursive else directory.glob("*")
    files_to_scan = [f for f in candidates if f.is_file()]
    if not args.quiet:
        print(f"Found {len(files_to_scan)} files...")

    # Process files in parallel; the progress bar is hidden in quiet mode.
    results = Parallel(n_jobs=args.n_jobs, verbose=0, backend="threading")(
        delayed(check_file)(file_path, magika, args)
        for file_path in tqdm(
            files_to_scan, desc="checking files", disable=args.quiet
        )
    )

    # Filter out None results (files that are fine or were skipped)
    files_to_fix = [r for r in results if r]
    if not files_to_fix:
        if not args.quiet:
            print("No files need extension fixes")
        return 0

    if not args.quiet:
        print(f"\nFound {len(files_to_fix)} files with incorrect extensions:")
        for file_path, current_ext, new_ext, result in files_to_fix:
            current_ext_str = f".{current_ext}" if current_ext else "(none)"
            print(
                f"{file_path}: {current_ext_str} → .{new_ext} ({result.output.label}, {result.score:.2f})"
            )

    # In dry-run mode, we're done
    if args.dry_run:
        if not args.quiet:
            print("\nDry run - no files modified")
        return 0

    rename_results = Parallel(n_jobs=args.n_jobs, verbose=0, backend="threading")(
        delayed(rename_file)(file_info)
        for file_info in tqdm(
            files_to_fix, desc="renaming files", disable=args.quiet
        )
    )

    # Split outcomes: each result is (success, original_path, new_path)
    failed_paths = [original for ok, original, _ in rename_results if not ok]
    success_count = len(rename_results) - len(failed_paths)

    if not args.quiet:
        print(f"\nFixed {success_count} file extensions")

    if failed_paths:
        # Name the failures so the user can act on them, and signal the
        # partial failure through the exit code.
        print(f"Failed to fix {len(failed_paths)} files", file=sys.stderr)
        for path in failed_paths:
            print(f"  {path}", file=sys.stderr)
        return 1

    return 0
# Run the tool only when executed as a script and hand main()'s return value
# to the interpreter as the process exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment