Last active June 12, 2025 08:54
Extract attachments from EML files in the current dir, and write them to the output subdir. Now with recursion and robust filename handling
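For reference, the script is meant to be run with Python 3 from the directory that contains the .eml files; attachments are written to the output subdirectory and progress is logged to eml_extractor.log. A minimal programmatic sketch, assuming the gist has been saved locally as a module named eml_extractor (the module name is an assumption, not part of the gist):

# Hypothetical module name; the gist does not prescribe one.
import eml_extractor

# Scans *.eml in the current working directory and writes attachments to ./output
eml_extractor.main()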
#!/usr/bin/env python3
"""
2025 update:
- Recursive extraction from nested EML files
- Robust filename handling with sanitization and deduplication
- Proper logging instead of print statements
- Enhanced error handling and validation
- Binary file reading for better encoding support
- Cross-platform filename compatibility
- Depth-limited recursion to prevent infinite loops
"""
import email
import glob
import logging
import os
import re
import sys
import unicodedata
from collections import defaultdict
from email import policy
from multiprocessing import Pool
from pathlib import Path

EXTENSION = "eml"
MAX_FILENAME_LENGTH = 255
INVALID_CHARS = r'[<>:"/\\|?*\x00-\x1f]'


# Configure logging
def setup_logging(level=logging.INFO):
    """Setup logging configuration."""
    logging.basicConfig(
        level=level,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler("eml_extractor.log"),
        ],
    )
    return logging.getLogger(__name__)


def sanitize_filename(filename):
    """
    Sanitize filename for cross-platform compatibility.
    Handles newlines, invalid characters, and length limits.
    """
    if not filename:
        return "unnamed_attachment"
    # Normalize unicode characters
    filename = unicodedata.normalize("NFKD", filename)
    # Remove or replace invalid characters
    filename = re.sub(INVALID_CHARS, "_", filename)
    filename = re.sub(r"\s+", " ", filename)  # Normalize whitespace
    filename = filename.strip(" .")  # Remove leading/trailing spaces and dots
    # Handle length limit
    if len(filename) > MAX_FILENAME_LENGTH:
        name, ext = os.path.splitext(filename)
        max_name_len = MAX_FILENAME_LENGTH - len(ext)
        filename = name[:max_name_len] + ext
    return filename or "unnamed_attachment"


def get_unique_filename(output_dir, filename, file_counter):
    """
    Generate unique filename to avoid overwrites.
    Uses counter dict to track duplicates per original name.
    """
    base_name = sanitize_filename(filename)
    if base_name not in file_counter:
        file_counter[base_name] = 0
        return os.path.join(output_dir, base_name)
    file_counter[base_name] += 1
    name, ext = os.path.splitext(base_name)
    unique_name = f"{name}_{file_counter[base_name]}{ext}"
    return os.path.join(output_dir, unique_name)


def extract_attachments_recursive(msg, output_dir, file_counter, depth=0, max_depth=10):
    """
    Recursively extract attachments, including from nested EML files.

    Args:
        msg: Email message object
        output_dir: Output directory path
        file_counter: Dict tracking filename duplicates
        depth: Current recursion depth
        max_depth: Maximum recursion depth to prevent infinite loops

    Returns:
        int: Number of attachments extracted
    """
    logger = logging.getLogger(__name__)
    if depth > max_depth:
        logger.warning(
            f"Maximum recursion depth ({max_depth}) reached, skipping further nesting"
        )
        return 0
    attachment_count = 0
    for part in msg.iter_attachments():
        try:
            filename = part.get_filename()
            content_type = part.get_content_type()
            if not filename:
                # Generate filename based on content type
                ext_map = {
                    "text/plain": ".txt",
                    "text/html": ".html",
                    "image/jpeg": ".jpg",
                    "image/png": ".png",
                    "application/pdf": ".pdf",
                }
                ext = ext_map.get(content_type, ".bin")
                filename = f"attachment_{attachment_count + 1}{ext}"
            output_path = get_unique_filename(output_dir, filename, file_counter)
            try:
                payload = part.get_payload(decode=True)
                if payload is None:
                    logger.warning(f"Empty payload for {filename}")
                    continue
                with open(output_path, "wb") as of:
                    of.write(payload)
                attachment_count += 1
                logger.info(
                    f"{' ' * depth}Extracted: {os.path.basename(output_path)}"
                )
                # Check if this is a nested EML file and recurse
                if filename.lower().endswith(".eml") or content_type in [
                    "message/rfc822",
                    "text/plain",
                ]:
                    try:
                        # Try to parse as email message
                        nested_msg = email.message_from_bytes(
                            payload, policy=policy.default
                        )
                        if nested_msg.get("Message-ID") or nested_msg.get("From"):
                            logger.info(
                                f"{' ' * depth}Processing nested EML: {os.path.basename(output_path)}"
                            )
                            nested_count = extract_attachments_recursive(
                                nested_msg,
                                output_dir,
                                file_counter,
                                depth + 1,
                                max_depth,
                            )
                            attachment_count += nested_count
                    except Exception as e:
                        # Not a valid email message, continue normally
                        logger.debug(f"File {filename} not a valid nested email: {e}")
            except (TypeError, OSError) as e:
                logger.error(f"Error extracting {filename}: {e}")
        except Exception as e:
            logger.error(f"Error processing attachment: {e}")
    return attachment_count


def extract(filename):
    """
    Extract attachments from an EML file with recursive processing.

    Returns:
        tuple: (files_processed, attachments_extracted)
    """
    logger = logging.getLogger(__name__)
    output_dir = Path("output")
    output_dir.mkdir(exist_ok=True)
    file_counter = defaultdict(int)
    try:
        with open(filename, "rb") as f:  # Read as binary for better encoding handling
            msg = email.message_from_bytes(f.read(), policy=policy.default)
        logger.info(f"Processing: {filename}")
        attachment_count = extract_attachments_recursive(
            msg, str(output_dir), file_counter
        )
        if attachment_count == 0:
            logger.info(f"No attachments found in {filename}")
        else:
            logger.info(
                f"Extracted {attachment_count} attachment(s) from {filename}"
            )
        return 1, attachment_count
    except Exception as e:
        logger.error(f"Error processing {filename}: {e}")
        return 1, 0


def main():
    """Main function with better error handling and progress tracking."""
    logger = setup_logging()
    eml_files = list(glob.glob(f"*.{EXTENSION}"))
    if not eml_files:
        logger.warning(f"No .{EXTENSION} files found in current directory")
        return
    logger.info(f"Found {len(eml_files)} EML file(s) to process")
    try:
        # Process files in parallel
        with Pool() as pool:
            results = pool.map(extract, eml_files)
        # Calculate totals
        total_files, total_attachments = map(sum, zip(*results))
        logger.info(
            f"Summary: Files processed: {total_files}, Attachments extracted: {total_attachments}"
        )
    except KeyboardInterrupt:
        logger.info("Processing interrupted by user")
    except Exception as e:
        logger.error(f"Error during processing: {e}")


if __name__ == "__main__":
    main()
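As a quick illustration of the filename handling above (a sketch only, assuming sanitize_filename and get_unique_filename are in scope, e.g. in the same interpreter session; the example filenames are made up):

from collections import defaultdict

counter = defaultdict(int)
# Characters that are invalid on Windows/macOS are replaced with underscores
print(sanitize_filename('report:2025/Q1?.pdf'))               # report_2025_Q1_.pdf
# The first occurrence keeps its name; duplicates get a numeric suffix
print(get_unique_filename("output", "invoice.pdf", counter))  # output/invoice.pdf (POSIX path shown)
print(get_unique_filename("output", "invoice.pdf", counter))  # output/invoice_1.pdf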
Saved half of my morning, thank you
See https://gist.github.com/scivision/12d4177b743fafc9e5ff37d14bd44e8d for a version that handles a single .eml file; one could loop over that to keep the iteration over .eml files separate from the extraction logic itself.
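A minimal sketch of that approach, assuming the extract() function from this gist is importable (the module name eml_extractor is hypothetical): iterate over the files yourself and hand each one to the per-file extractor.

from pathlib import Path

from eml_extractor import extract  # hypothetical module name for this gist

# Keep the file iteration separate from the extraction itself
for eml_path in sorted(Path(".").glob("*.eml")):
    _, n_attachments = extract(str(eml_path))
    print(f"{eml_path.name}: {n_attachments} attachment(s)")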
Just worked. Thank you! (GitHub should probably add a comment box to repositories as well; sometimes you just want to say thanks and there is no simple way to do it.)