@sankalpsingha · Created December 20, 2024 21:13
This script finds duplicate files in a folder (you can specify the scan depth).
# Basic usage:
#   python3 duplicate-finder.py --path ~/Downloads --depth 2
#
# With minimum size and custom output:
#   python3 duplicate-finder.py --path ~/Downloads --depth 2 --min-size 1048576 --output my_report.txt
#
# Delete from a previous report:
#   python3 duplicate-finder.py --delete-from-report my_report.txt
#
# Non-interactive deletion, keeping oldest files:
#   python3 duplicate-finder.py --delete-from-report my_report.txt --non-interactive --keep-oldest
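#
# For reference, --delete-from-report parses the layout that save_report() writes.
# A sketch of that report (hypothetical paths and sizes, shown only for illustration):
#
#   === Duplicate Files Report ===
#   ...
#   Duplicate set (Size: 2,097,152 bytes):
#     /home/user/Downloads/photo.jpg
#     /home/user/Downloads/photo (1).jpg
#
#   Potential space savings: 2,097,152 bytes (2.00 MB)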
import os
import hashlib
import argparse
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Generator
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


class DuplicateFinder:
    def __init__(self, root_path: str, max_depth: int = 1, min_size: int = 1024):
        self.root_path = Path(root_path)
        self.max_depth = max_depth
        self.min_size = min_size  # Minimum file size to consider (in bytes)
        self.duplicates: Dict[str, List[Path]] = defaultdict(list)
        self.size_match: Dict[int, List[Path]] = defaultdict(list)
        self.total_files = 0
        self.total_duplicates = 0
        self.saved_space = 0

    def get_file_hash(self, file_path: Path, chunk_size: int = 8192) -> str:
        """Calculate SHA256 hash of a file."""
        sha256_hash = hashlib.sha256()
        try:
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(chunk_size), b""):
                    sha256_hash.update(chunk)
            return sha256_hash.hexdigest()
        except (PermissionError, OSError) as e:
            logging.error(f"Error reading file {file_path}: {e}")
            return ""

    def get_files_recursively(self) -> Generator[Path, None, None]:
        """Yield files up to max_depth recursively."""
        def scan_directory(current_path: Path, current_depth: int) -> Generator[Path, None, None]:
            if current_depth > self.max_depth:
                return
            try:
                for entry in current_path.iterdir():
                    if entry.is_file():
                        yield entry
                    elif entry.is_dir() and current_depth < self.max_depth:
                        yield from scan_directory(entry, current_depth + 1)
            except (PermissionError, OSError) as e:
                logging.error(f"Error accessing directory {current_path}: {e}")

        yield from scan_directory(self.root_path, 0)

    def find_duplicates(self) -> None:
        """Find duplicate files first by size, then by hash."""
        logging.info("Starting duplicate file search...")

        # First pass: group files by size
        for file_path in self.get_files_recursively():
            try:
                file_size = file_path.stat().st_size
                if file_size >= self.min_size:  # Skip files smaller than min_size
                    self.size_match[file_size].append(file_path)
                    self.total_files += 1
            except (PermissionError, OSError) as e:
                logging.error(f"Error accessing file {file_path}: {e}")

        # Second pass: compare files of the same size using hashes
        for size, file_list in self.size_match.items():
            if len(file_list) > 1:  # Only hash files that share a size with another file
                for file_path in file_list:
                    file_hash = self.get_file_hash(file_path)
                    if file_hash:  # Only add if the hash was successfully calculated
                        self.duplicates[file_hash].append(file_path)

        # Remove entries that don't have duplicates
        self.duplicates = {k: v for k, v in self.duplicates.items() if len(v) > 1}

    def save_report(self, report_path: str | None = None) -> None:
        """Save the duplicate files report to a file."""
        if not report_path:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            report_path = f"duplicate_files_report_{timestamp}.txt"

        self.total_duplicates = sum(len(files) - 1 for files in self.duplicates.values())

        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("=== Duplicate Files Report ===\n\n")
            f.write(f"Scan path: {self.root_path}\n")
            f.write(f"Scan depth: {self.max_depth} level(s)\n")
            f.write(f"Minimum file size: {self.min_size:,} bytes\n")
            f.write(f"Total files scanned: {self.total_files:,}\n")
            f.write(f"Total duplicates found: {self.total_duplicates:,}\n\n")

            if not self.duplicates:
                f.write("No duplicate files found.\n")
                return

            for hash_value, file_list in self.duplicates.items():
                if len(file_list) > 1:
                    file_size = file_list[0].stat().st_size
                    self.saved_space += file_size * (len(file_list) - 1)
                    f.write(f"\nDuplicate set (Size: {file_size:,} bytes):\n")
                    for file_path in file_list:
                        # Two-space indent; parse_report_file() relies on it to spot paths
                        f.write(f"  {file_path}\n")

            f.write(f"\nPotential space savings: {self.saved_space:,} bytes ")
            f.write(f"({self.saved_space / (1024 * 1024):.2f} MB)\n")

        logging.info(f"Report saved to: {report_path}")

    def print_summary(self) -> None:
        """Print a summary of the duplicate search results."""
        logging.info("\nDuplicate Search Summary:")
        logging.info(f"Total files scanned: {self.total_files:,}")
        logging.info(f"Duplicate sets found: {len(self.duplicates):,}")
        logging.info(f"Total duplicate files: {self.total_duplicates:,}")
        logging.info(f"Potential space savings: {self.saved_space:,} bytes "
                     f"({self.saved_space / (1024 * 1024):.2f} MB)")


def parse_report_file(report_path: str) -> Dict[str, List[str]]:
    """Parse a duplicate files report and return the duplicate sets it lists."""
    duplicate_sets = {}
    current_key = None
    current_files = []
    set_index = 0

    with open(report_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    for line in lines:
        line = line.rstrip()
        if line.startswith("Duplicate set (Size:"):
            # If we were processing a set, save it
            if current_key and current_files:
                duplicate_sets[current_key] = current_files
            # Start a new set; add an index so two sets with the same size don't collide
            set_index += 1
            current_key = f"{line} #{set_index}"
            current_files = []
        elif line.startswith("  ") and line.strip():  # File paths are indented with 2 spaces
            file_path = line.strip()
            if file_path:
                current_files.append(file_path)

    # Save the last set
    if current_key and current_files:
        duplicate_sets[current_key] = current_files

    return duplicate_sets
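
# For illustration, parse_report_file() maps each "Duplicate set" header (suffixed with an
# index so equal sizes stay distinct) to the paths listed under it. A hypothetical result,
# with placeholder paths:
#
#   {
#       "Duplicate set (Size: 2,097,152 bytes): #1": [
#           "/home/user/Downloads/photo.jpg",
#           "/home/user/Downloads/photo (1).jpg",
#       ],
#   }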


def delete_duplicates(report_path: str, keep_newest: bool = True, interactive: bool = True) -> None:
    """Delete duplicate files based on a report file."""
    duplicate_sets = parse_report_file(report_path)

    if not duplicate_sets:
        logging.error("No duplicate sets found in the report file.")
        return

    total_sets = len(duplicate_sets)
    total_freed = 0
    logging.info(f"Found {total_sets} sets of duplicate files.")

    for _, file_paths in duplicate_sets.items():
        if len(file_paths) < 2:
            continue

        print("\nDuplicate set:")
        for idx, path in enumerate(file_paths, 1):
            # Show basic file info for each copy
            try:
                stat = os.stat(path)
                size = stat.st_size
                mtime = datetime.fromtimestamp(stat.st_mtime)
                print(f"{idx}. {path}")
                print(f"   Size: {size:,} bytes")
                print(f"   Modified: {mtime}")
            except OSError:
                print(f"{idx}. {path} (Unable to access file)")

        # Sort the copies by modification time and keep either the newest or the oldest;
        # files that can no longer be accessed are skipped.
        files_with_time = []
        for path in file_paths:
            try:
                files_with_time.append((path, os.path.getmtime(path)))
            except OSError:
                continue
        if not files_with_time:
            continue
        files_with_time.sort(key=lambda x: x[1], reverse=keep_newest)
        to_keep = files_with_time[0][0]
        to_delete = [f[0] for f in files_with_time[1:]]

        if interactive:
            print(f"\nWill keep: {to_keep}")
            print("Will delete:")
            for f in to_delete:
                print(f"  {f}")
            response = input("\nProceed with deletion? (y/n/s - s to skip this set): ").lower()
            if response == 's':
                continue
            elif response != 'y':
                print("Skipping deletion for this set")
                continue

        # Proceed with deletion
        for file_path in to_delete:
            try:
                file_size = os.path.getsize(file_path)
                os.remove(file_path)
                total_freed += file_size
                logging.info(f"Deleted: {file_path}")
            except OSError as e:
                logging.error(f"Error deleting {file_path}: {e}")

    logging.info("\nDeletion complete!")
    logging.info(f"Total space freed: {total_freed:,} bytes ({total_freed / (1024 * 1024):.2f} MB)")


def main():
    parser = argparse.ArgumentParser(
        description='Find and manage duplicate files in directories.'
    )
    parser.add_argument(
        '-p', '--path',
        type=str,
        default='.',
        help='Path to the directory to scan (default: current directory)'
    )
    parser.add_argument(
        '-d', '--depth',
        type=int,
        default=1,
        help='Maximum directory depth to scan (default: 1)'
    )
    parser.add_argument(
        '-s', '--min-size',
        type=int,
        default=1024,
        help='Minimum file size in bytes to consider (default: 1024)'
    )
    parser.add_argument(
        '-o', '--output',
        type=str,
        help='Output report file path (default: duplicate_files_report_TIMESTAMP.txt)'
    )
    parser.add_argument(
        '--delete-from-report',
        type=str,
        help='Path to a previously generated report file to delete duplicates from'
    )
    parser.add_argument(
        '--non-interactive',
        action='store_true',
        help='When deleting, proceed without asking for confirmation'
    )
    parser.add_argument(
        '--keep-oldest',
        action='store_true',
        help='When deleting, keep oldest file instead of newest (default: keep newest)'
    )
    args = parser.parse_args()

    if args.delete_from_report:
        # Delete mode
        if not os.path.exists(args.delete_from_report):
            logging.error(f"Report file not found: {args.delete_from_report}")
            return
        delete_duplicates(
            args.delete_from_report,
            keep_newest=not args.keep_oldest,
            interactive=not args.non_interactive
        )
    else:
        # Find mode
        path = os.path.expanduser(args.path)
        path = os.path.abspath(path)
        if not os.path.exists(path):
            logging.error(f"The specified path does not exist: {path}")
            return

        logging.info(f"Scanning directory: {path}")
        finder = DuplicateFinder(
            root_path=path,
            max_depth=args.depth,
            min_size=args.min_size
        )
        finder.find_duplicates()
        finder.save_report(args.output)
        finder.print_summary()


if __name__ == "__main__":
    main()
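
# Programmatic use (a minimal sketch mirroring what main() does; the path and the
# report filename below are placeholders):
#
#   finder = DuplicateFinder(root_path="/home/user/Downloads", max_depth=2, min_size=4096)
#   finder.find_duplicates()
#   finder.save_report("my_report.txt")
#   finder.print_summary()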