Created
March 31, 2025 22:50
-
-
Save pszemraj/63619c80b90f4f265ea0cea3b74cc6cc to your computer and use it in GitHub Desktop.
File Extension Fixer using Magika
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
File Extension Fixer using Magika | |
This script analyzes files using Google's Magika deep learning model to identify | |
their actual content types and fix incorrect file extensions. | |
pip install -U joblib magika tqdm | |
""" | |
import argparse | |
import os | |
import sys | |
from pathlib import Path | |
from typing import Any, List, Optional, Tuple | |
from joblib import Parallel, delayed | |
from magika import Magika | |
from tqdm.auto import tqdm | |
def parse_args() -> argparse.Namespace:
    """
    Parse and validate command line arguments.

    Arguments are read from ``sys.argv``. Invalid values terminate the
    program through ``parser.error`` (usage message on stderr, exit status 2).

    Returns:
        Namespace with the attributes ``directory``, ``dry_run``,
        ``recursive``, ``n_jobs``, ``confidence``, ``ignore_exts`` and
        ``quiet``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("directory", help="Directory to scan for files")
    parser.add_argument(
        "--dry-run", action="store_true", help="Only report issues without renaming"
    )
    parser.add_argument(
        "-r", "--recursive", action="store_true", help="Scan directories recursively"
    )
    parser.add_argument("--n-jobs", type=int, default=4, help="Number of parallel jobs")
    parser.add_argument(
        "--confidence", type=float, default=0.9, help="Minimum confidence threshold"
    )
    parser.add_argument("--ignore-exts", help="Extensions to ignore (comma-separated)")
    parser.add_argument("-q", "--quiet", action="store_true", help="Minimize output")
    args = parser.parse_args()

    # The threshold is compared against a detection score; a value outside
    # [0, 1] (or NaN) would silently accept everything or nothing, so reject
    # it up front instead of producing a confusing "no files need fixes" run.
    if not 0.0 <= args.confidence <= 1.0:
        parser.error("--confidence must be between 0 and 1")

    # joblib raises ValueError for n_jobs=0 only once the scan starts; fail
    # early with a usage error. Negative values are left alone because joblib
    # gives them a meaning (e.g. -1 = all CPUs).
    if args.n_jobs == 0:
        parser.error("--n-jobs must be non-zero")

    return args
def check_file(
    file_path: Path, magika: "Magika", args: argparse.Namespace
) -> Optional[Tuple[Path, str, str, Any]]:
    """
    Check if a file needs its extension fixed based on content type detection.

    Args:
        file_path: Path to the file to analyze
        magika: Initialized Magika instance; only ``identify_path`` is called
        args: Command line arguments; ``ignore_exts`` and ``confidence``
            are read

    Returns:
        Optional tuple containing:
        - file_path: Original file path
        - current_ext: Current file extension (lower-cased, without dot)
        - new_ext: Suggested file extension (first entry of the detection
          result's extension list)
        - result: Magika detection result object
        Returns None if file doesn't need fixing or should be skipped.
    """
    current_ext = file_path.suffix.lower().lstrip(".")

    # Skip ignored extensions. Tokens are normalised so that user input such
    # as "jpg, png" or ".jpg,.png" matches; empty tokens (stray commas) are
    # dropped so they do not accidentally exclude extension-less files.
    if args.ignore_exts:
        ignored = {
            token.strip().lstrip(".")
            for token in args.ignore_exts.lower().split(",")
        }
        ignored.discard("")
        if current_ext in ignored:
            return None

    result = magika.identify_path(file_path)

    # Skip if detection failed or the score is below the requested threshold
    if not result.ok or result.score < args.confidence:
        return None

    suggested_exts = result.output.extensions

    # Nothing to suggest for this content type
    if not suggested_exts:
        return None

    # Current extension is already one of the accepted ones
    if current_ext and current_ext in suggested_exts:
        return None

    return (file_path, current_ext, suggested_exts[0], result)
def rename_file(
    file_info: Tuple[Path, str, str, Any],
) -> Tuple[bool, Path, Optional[Path]]:
    """
    Rename a file to use the suggested extension.

    If the target name is already taken, a numeric suffix is appended to the
    stem (``name_1.ext``, ``name_2.ext``, ...) so the renamed file still ends
    in the suggested extension.

    Args:
        file_info: Tuple containing file path, current extension,
            suggested extension, and detection result

    Returns:
        Tuple containing:
        - success: Whether the rename operation succeeded
        - original_path: Original file path
        - new_path: New file path if successful, None otherwise
    """
    file_path, _, new_ext, _ = file_info
    try:
        new_path = file_path.with_suffix(f".{new_ext}")

        # Handle name collisions by numbering the stem, not the suffix:
        # "photo.png.1" would have the extension ".1", which is exactly the
        # kind of wrong extension this tool exists to fix.
        counter = 1
        while new_path.exists():
            new_path = file_path.with_name(f"{file_path.stem}_{counter}.{new_ext}")
            counter += 1

        # NOTE: the exists() check and the rename are not atomic; callers
        # should not rename files sharing a target name concurrently.
        file_path.rename(new_path)
    except (OSError, ValueError):
        # OSError: filesystem failure (missing file, permissions, ...).
        # ValueError: pathlib rejected the constructed name/suffix.
        return (False, file_path, None)
    return (True, file_path, new_path)
def main() -> int:
    """
    Main function that orchestrates the file extension fixing process.

    Scans the target directory, asks Magika to classify every regular file,
    reports files whose extension disagrees with the detected content type
    and, unless ``--dry-run`` is given, renames them.

    Returns:
        int: Exit code (0 for success, 1 if the directory is missing or any
        rename failed)
    """
    args = parse_args()
    directory = Path(args.directory)

    # is_dir() is already False for a non-existent path
    if not directory.is_dir():
        print(f"Error: Directory not found: {directory}", file=sys.stderr)
        return 1

    magika = Magika()
    if not args.quiet:
        print(f"Using model {magika.get_model_name()} with threshold {args.confidence}")

    candidates = directory.rglob("*") if args.recursive else directory.glob("*")
    files_to_scan = [f for f in candidates if f.is_file()]

    if not args.quiet:
        print(f"Found {len(files_to_scan)} files...")

    # Process files in parallel; the progress bar is suppressed with --quiet
    results = Parallel(n_jobs=args.n_jobs, verbose=0, backend="threading")(
        delayed(check_file)(file_path, magika, args)
        for file_path in tqdm(
            files_to_scan, desc="checking files", disable=args.quiet
        )
    )

    # Filter out None results
    files_to_fix = [r for r in results if r]

    if not files_to_fix:
        if not args.quiet:
            print("No files need extension fixes")
        return 0

    if not args.quiet:
        print(f"\nFound {len(files_to_fix)} files with incorrect extensions:")
        for file_path, current_ext, new_ext, result in files_to_fix:
            current_ext_str = f".{current_ext}" if current_ext else "(none)"
            print(
                f"{file_path}: {current_ext_str} → .{new_ext} ({result.output.label}, {result.score:.2f})"
            )

    # In dry-run mode, we're done
    if args.dry_run:
        if not args.quiet:
            print("\nDry run - no files modified")
        return 0

    # Rename sequentially: rename_file() checks for an existing target and
    # then renames, which is not atomic. Two files mapping to the same new
    # name (e.g. a.jpg and a.jpeg -> a.png) could otherwise race in parallel
    # threads and overwrite each other.
    rename_results = [
        rename_file(file_info)
        for file_info in tqdm(
            files_to_fix, desc="renaming files", disable=args.quiet
        )
    ]

    # Count results
    failed_paths = [path for ok, path, _ in rename_results if not ok]
    success_count = len(rename_results) - len(failed_paths)

    if not args.quiet:
        print(f"\nFixed {success_count} file extensions")

    if failed_paths:
        # Failures are reported even with --quiet and reflected in the exit code
        print(f"Failed to fix {len(failed_paths)} files", file=sys.stderr)
        for path in failed_paths:
            print(f"  {path}", file=sys.stderr)
        return 1

    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment