This will find duplicate files in a folder (you can specify the scan depth).
# Basic usage:
#   python3 duplicate-finder.py --path ~/Downloads --depth 2
#
# With minimum size and custom output:
#   python3 duplicate-finder.py --path ~/Downloads --depth 2 --min-size 1048576 --output my_report.txt
#
# Delete from a previous report:
#   python3 duplicate-finder.py --delete-from-report my_report.txt
#
# Non-interactive deletion, keeping oldest files:
#   python3 duplicate-finder.py --delete-from-report my_report.txt --non-interactive --keep-oldest
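#
# How it works (summary of the implementation below): files are first grouped
# by size, and only same-size files are hashed with SHA-256, so most files are
# never read in full. Note that the `str | None` annotation in save_report()
# requires Python 3.10 or newer.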

import os
import hashlib
import argparse
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Generator
import logging
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

class DuplicateFinder:
    def __init__(self, root_path: str, max_depth: int = 1, min_size: int = 1024):
        self.root_path = Path(root_path)
        self.max_depth = max_depth
        self.min_size = min_size  # Minimum file size to consider (in bytes)
        self.duplicates: Dict[str, List[Path]] = defaultdict(list)
        self.size_match: Dict[int, List[Path]] = defaultdict(list)
        self.total_files = 0
        self.total_duplicates = 0
        self.saved_space = 0

    def get_file_hash(self, file_path: Path, chunk_size: int = 8192) -> str:
        """Calculate SHA-256 hash of a file."""
        sha256_hash = hashlib.sha256()
        try:
            # Read in fixed-size chunks so large files never need to fit in memory
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(chunk_size), b""):
                    sha256_hash.update(chunk)
            return sha256_hash.hexdigest()
        except (PermissionError, OSError) as e:
            logging.error(f"Error reading file {file_path}: {e}")
            return ""

    def get_files_recursively(self) -> Generator[Path, None, None]:
        """Yield files up to max_depth recursively."""
        def scan_directory(current_path: Path, current_depth: int) -> Generator[Path, None, None]:
            if current_depth > self.max_depth:
                return
            try:
                for entry in current_path.iterdir():
                    if entry.is_file():
                        yield entry
                    elif entry.is_dir() and current_depth < self.max_depth:
                        yield from scan_directory(entry, current_depth + 1)
            except (PermissionError, OSError) as e:
                logging.error(f"Error accessing directory {current_path}: {e}")

        yield from scan_directory(self.root_path, 0)
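
    # Depth semantics (descriptive note): the root directory is depth 0, so
    # --depth 1 scans the root plus its immediate subdirectories, --depth 2
    # adds one more level, and so on.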

    def find_duplicates(self) -> None:
        """Find duplicate files first by size, then by hash."""
        logging.info("Starting duplicate file search...")

        # First pass: group files by size
        for file_path in self.get_files_recursively():
            try:
                file_size = file_path.stat().st_size
                if file_size >= self.min_size:  # Skip files smaller than min_size
                    self.size_match[file_size].append(file_path)
                    self.total_files += 1
            except (PermissionError, OSError) as e:
                logging.error(f"Error accessing file {file_path}: {e}")

        # Second pass: compare files of the same size using hashes
        for size, file_list in self.size_match.items():
            if len(file_list) > 1:  # Only hash files that share a size
                for file_path in file_list:
                    file_hash = self.get_file_hash(file_path)
                    if file_hash:  # Only add if the hash was successfully calculated
                        self.duplicates[file_hash].append(file_path)

        # Keep only hashes with more than one file, i.e. actual duplicates
        self.duplicates = {k: v for k, v in self.duplicates.items() if len(v) > 1}

    def save_report(self, report_path: str | None = None) -> None:
        """Save duplicate files report to a file."""
        if not report_path:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            report_path = f"duplicate_files_report_{timestamp}.txt"

        self.total_duplicates = sum(len(files) - 1 for files in self.duplicates.values())

        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("=== Duplicate Files Report ===\n\n")
            f.write(f"Scan path: {self.root_path}\n")
            f.write(f"Scan depth: {self.max_depth} level(s)\n")
            f.write(f"Minimum file size: {self.min_size:,} bytes\n")
            f.write(f"Total files scanned: {self.total_files:,}\n")
            f.write(f"Total duplicates found: {self.total_duplicates:,}\n\n")

            if not self.duplicates:
                f.write("No duplicate files found.\n")
            else:
                for hash_value, file_list in self.duplicates.items():
                    file_size = file_list[0].stat().st_size
                    self.saved_space += file_size * (len(file_list) - 1)
                    f.write(f"\nDuplicate set (Size: {file_size:,} bytes):\n")
                    for file_path in file_list:
                        # Two-space indent; parse_report_file() relies on it
                        f.write(f"  {file_path}\n")

                f.write(f"\nPotential space savings: {self.saved_space:,} bytes ")
                f.write(f"({self.saved_space / (1024 * 1024):.2f} MB)\n")

        logging.info(f"Report saved to: {report_path}")

    def print_summary(self) -> None:
        """Print a summary of the duplicate search results."""
        logging.info("\nDuplicate Search Summary:")
        logging.info(f"Total files scanned: {self.total_files:,}")
        logging.info(f"Duplicate sets found: {len(self.duplicates):,}")
        logging.info(f"Total duplicate files: {self.total_duplicates:,}")
        logging.info(f"Potential space savings: {self.saved_space:,} bytes "
                     f"({self.saved_space / (1024 * 1024):.2f} MB)")

def parse_report_file(report_path: str) -> Dict[str, List[str]]:
    """Parse a duplicate files report and return duplicate sets."""
    duplicate_sets = {}
    current_key = None
    current_files = []

    with open(report_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.rstrip()
            if line.startswith("Duplicate set (Size:"):
                # If we were processing a set, save it
                if current_key and current_files:
                    duplicate_sets[current_key] = current_files
                # Start a new set; the counter suffix keeps same-size sets
                # from colliding as dictionary keys
                current_key = f"{line} [set {len(duplicate_sets) + 1}]"
                current_files = []
            elif line.startswith("  ") and line.strip():  # File paths are indented with 2 spaces
                current_files.append(line.strip())

    # Save the last set
    if current_key and current_files:
        duplicate_sets[current_key] = current_files

    return duplicate_sets
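
# For reference, parse_report_file() expects the layout save_report() writes,
# e.g. (paths and size below are illustrative, not real output):
#
#   Duplicate set (Size: 2,048 bytes):
#     /home/user/a.txt
#     /home/user/copy_of_a.txt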

def delete_duplicates(report_path: str, keep_newest: bool = True, interactive: bool = True) -> None:
    """Delete duplicate files based on a report file."""
    duplicate_sets = parse_report_file(report_path)
    if not duplicate_sets:
        logging.error("No duplicate sets found in the report file.")
        return

    total_sets = len(duplicate_sets)
    total_freed = 0
    logging.info(f"Found {total_sets} sets of duplicate files.")

    for set_key, file_paths in duplicate_sets.items():
        if len(file_paths) < 2:
            continue

        print("\nDuplicate set:")
        for idx, path in enumerate(file_paths, 1):
            # Show file info
            try:
                stat = os.stat(path)
                size = stat.st_size
                mtime = datetime.fromtimestamp(stat.st_mtime)
                print(f"{idx}. {path}")
                print(f"   Size: {size:,} bytes")
                print(f"   Modified: {mtime}")
            except OSError:
                print(f"{idx}. {path} (Unable to access file)")

        # Sort by modification time: newest first when keeping the newest,
        # oldest first when keeping the oldest. (Keeping the first report
        # entry, as before, did not actually honor --keep-oldest.)
        files_with_time = []
        for path in file_paths:
            try:
                files_with_time.append((path, os.path.getmtime(path)))
            except OSError:
                continue
        if not files_with_time:
            continue
        files_with_time.sort(key=lambda x: x[1], reverse=keep_newest)
        to_keep = files_with_time[0][0]
        to_delete = [f[0] for f in files_with_time[1:]]

        if interactive:
            print(f"\nWill keep: {to_keep}")
            print("Will delete:")
            for f in to_delete:
                print(f"  {f}")
            response = input("\nProceed with deletion? (y/n/s - s to skip this set): ").lower()
            if response == 's':
                continue
            elif response != 'y':
                print("Skipping deletion for this set")
                continue

        # Proceed with deletion
        for file_path in to_delete:
            try:
                file_size = os.path.getsize(file_path)
                os.remove(file_path)
                total_freed += file_size
                logging.info(f"Deleted: {file_path}")
            except OSError as e:
                logging.error(f"Error deleting {file_path}: {e}")

    logging.info("\nDeletion complete!")
    logging.info(f"Total space freed: {total_freed:,} bytes ({total_freed / (1024 * 1024):.2f} MB)")

def main():
    parser = argparse.ArgumentParser(
        description='Find and manage duplicate files in directories.'
    )
    parser.add_argument(
        '-p', '--path',
        type=str,
        default='.',
        help='Path to the directory to scan (default: current directory)'
    )
    parser.add_argument(
        '-d', '--depth',
        type=int,
        default=1,
        help='Maximum directory depth to scan (default: 1)'
    )
    parser.add_argument(
        '-s', '--min-size',
        type=int,
        default=1024,
        help='Minimum file size in bytes to consider (default: 1024)'
    )
    parser.add_argument(
        '-o', '--output',
        type=str,
        help='Output report file path (default: duplicate_files_report_TIMESTAMP.txt)'
    )
    parser.add_argument(
        '--delete-from-report',
        type=str,
        help='Path to a previously generated report file to delete duplicates from'
    )
    parser.add_argument(
        '--non-interactive',
        action='store_true',
        help='When deleting, proceed without asking for confirmation'
    )
    parser.add_argument(
        '--keep-oldest',
        action='store_true',
        help='When deleting, keep the oldest file instead of the newest (default: keep newest)'
    )
    args = parser.parse_args()

    if args.delete_from_report:
        # Delete mode
        if not os.path.exists(args.delete_from_report):
            logging.error(f"Report file not found: {args.delete_from_report}")
            return
        delete_duplicates(
            args.delete_from_report,
            keep_newest=not args.keep_oldest,
            interactive=not args.non_interactive
        )
    else:
        # Find mode
        path = os.path.abspath(os.path.expanduser(args.path))
        if not os.path.isdir(path):
            logging.error(f"The specified path does not exist or is not a directory: {path}")
            return
        logging.info(f"Scanning directory: {path}")
        finder = DuplicateFinder(
            root_path=path,
            max_depth=args.depth,
            min_size=args.min_size
        )
        finder.find_duplicates()
        finder.save_report(args.output)
        finder.print_summary()


if __name__ == "__main__":
    main()