Skip to content

Instantly share code, notes, and snippets.

@urschrei
Last active June 12, 2025 08:54
Show Gist options
  • Save urschrei/5258588 to your computer and use it in GitHub Desktop.
Save urschrei/5258588 to your computer and use it in GitHub Desktop.
Extract attachments from EML files in the current dir, and write them to the output subdir. Now with recursion and robust filename handling
#!/usr/bin/env python3
"""
2025 update:
- Recursive extraction from nested EML files
- Robust filename handling with sanitization and deduplication
- Proper logging instead of print statements
- Enhanced error handling and validation
- Binary file reading for better encoding support
- Cross-platform filename compatibility
- Depth-limited recursion to prevent infinite loops
"""
import email
import glob
import logging
import os
import re
import sys
import unicodedata
from collections import defaultdict
from email import policy
from multiprocessing import Pool
from pathlib import Path
# File extension (without the leading dot) used by main() to glob for mail files.
EXTENSION = "eml"
# Maximum length, in characters, that sanitize_filename() allows for a filename.
MAX_FILENAME_LENGTH = 255
# Character class replaced with "_" by sanitize_filename():
# < > : " / \ | ? * and the ASCII control characters 0x00-0x1f.
INVALID_CHARS = r'[<>:"/\\|?*\x00-\x1f]'
# Configure logging
def setup_logging(level=logging.INFO):
    """Configure root logging to stdout and to ``eml_extractor.log``.

    ``logging.basicConfig`` is a no-op when the root logger already has
    handlers, so calling this more than once in a process is harmless.

    Args:
        level: Minimum severity handed to ``logging.basicConfig``.

    Returns:
        logging.Logger: The logger named after this module.
    """
    logging.basicConfig(
        level=level,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.StreamHandler(sys.stdout),
            # Attachment names are arbitrary Unicode; pin UTF-8 so writing
            # them to the log file does not depend on the locale encoding.
            logging.FileHandler("eml_extractor.log", encoding="utf-8"),
        ],
    )
    return logging.getLogger(__name__)
def sanitize_filename(filename):
    """
    Sanitize filename for cross-platform compatibility.

    Steps, in order: NFKD-normalize the text, replace every character
    matched by ``INVALID_CHARS`` with ``_``, collapse whitespace runs to a
    single space, strip leading/trailing spaces and dots, and finally cap
    the result at ``MAX_FILENAME_LENGTH`` characters while keeping as much
    of the extension as fits.

    Args:
        filename: Raw attachment name; may be empty or ``None``.

    Returns:
        str: A non-empty filename; ``"unnamed_attachment"`` when nothing
        usable is left.
    """
    if not filename:
        return "unnamed_attachment"
    # Normalize unicode characters
    filename = unicodedata.normalize("NFKD", filename)
    # Remove or replace invalid characters
    filename = re.sub(INVALID_CHARS, "_", filename)
    filename = re.sub(r"\s+", " ", filename)  # Normalize whitespace
    filename = filename.strip(" .")  # Remove leading/trailing spaces and dots
    # Handle length limit
    if len(filename) > MAX_FILENAME_LENGTH:
        name, ext = os.path.splitext(filename)
        # An over-long "extension" must be cut too; otherwise the budget for
        # the name goes negative and the limit is silently not enforced.
        # One character is always reserved for the name part.
        ext = ext[: MAX_FILENAME_LENGTH - 1]
        name = name[: MAX_FILENAME_LENGTH - len(ext)]
        # Truncation can expose a trailing space or dot again.
        filename = (name + ext).rstrip(" .")
    return filename or "unnamed_attachment"
def get_unique_filename(output_dir, filename, file_counter):
    """
    Generate unique filename to avoid overwrites.

    The sanitized name is used as-is the first time it is seen. Later
    requests get ``<name>_<n><ext>`` with an increasing ``n``. A candidate is
    rejected when it was already handed out through ``file_counter`` or when
    a file of that name already exists in ``output_dir``, so neither earlier
    attachments nor files written by another EML file are overwritten.

    Args:
        output_dir: Directory the attachment will be written to.
        filename: Raw attachment name (sanitized here).
        file_counter: Dict mapping handed-out names to their duplicate
            counter; updated in place.

    Returns:
        str: ``output_dir`` joined with the chosen unique filename.
    """
    base_name = sanitize_filename(filename)
    name, ext = os.path.splitext(base_name)
    candidate = base_name
    # .get() avoids inserting a key when file_counter is a defaultdict.
    count = file_counter.get(base_name, 0)
    while candidate in file_counter or os.path.exists(
        os.path.join(output_dir, candidate)
    ):
        count += 1
        candidate = f"{name}_{count}{ext}"
    file_counter[base_name] = count
    # Reserve the generated name so a real attachment called e.g. "a_1.txt"
    # cannot later claim the same path.
    file_counter.setdefault(candidate, 0)
    return os.path.join(output_dir, candidate)
def extract_attachments_recursive(msg, output_dir, file_counter, depth=0, max_depth=10):
    """
    Recursively extract attachments, including from nested EML files.

    Every attachment yielded by ``msg.iter_attachments()`` is written to
    ``output_dir``. Attachments that are themselves e-mails are written out
    and then searched for further attachments.

    Args:
        msg: Email message object (must provide ``iter_attachments``)
        output_dir: Output directory path
        file_counter: Dict tracking filename duplicates
        depth: Current recursion depth
        max_depth: Maximum recursion depth to prevent infinite loops

    Returns:
        int: Number of attachments extracted, nested ones included
    """
    logger = logging.getLogger(__name__)
    if depth > max_depth:
        logger.warning(
            f"Maximum recursion depth ({max_depth}) reached, skipping further nesting"
        )
        return 0

    # Fallback extensions for attachments that carry no filename.
    ext_map = {
        "text/plain": ".txt",
        "text/html": ".html",
        "image/jpeg": ".jpg",
        "image/png": ".png",
        "application/pdf": ".pdf",
        "message/rfc822": ".eml",
    }
    indent = " " * depth
    attachment_count = 0

    for part in msg.iter_attachments():
        try:
            filename = part.get_filename()
            content_type = part.get_content_type()
            if not filename:
                # Generate filename based on content type
                ext = ext_map.get(content_type, ".bin")
                filename = f"attachment_{attachment_count + 1}{ext}"
            try:
                nested_msg = None
                if content_type == "message/rfc822" and part.is_multipart():
                    # A parsed message/rfc822 part holds the embedded message
                    # as a sub-object; get_payload(decode=True) returns None
                    # for it, so serialize the embedded message instead.
                    nested_msg = part.get_payload(0)
                    payload = nested_msg.as_bytes()
                else:
                    payload = part.get_payload(decode=True)
                if payload is None:
                    logger.warning(f"Empty payload for {filename}")
                    continue

                # Reserve the output name only once there is data to write.
                output_path = get_unique_filename(output_dir, filename, file_counter)
                with open(output_path, "wb") as of:
                    of.write(payload)
                attachment_count += 1
                logger.info(f"{indent}Extracted: {os.path.basename(output_path)}")

                # Heuristic for e-mails attached under another content type:
                # parse the bytes and accept them only if typical headers exist.
                if nested_msg is None and (
                    filename.lower().endswith(".eml")
                    or content_type in ("message/rfc822", "text/plain")
                ):
                    try:
                        candidate = email.message_from_bytes(
                            payload, policy=policy.default
                        )
                        if candidate.get("Message-ID") or candidate.get("From"):
                            nested_msg = candidate
                    except Exception as e:
                        # Not a valid email message, continue normally
                        logger.debug(f"File {filename} not a valid nested email: {e}")

                if nested_msg is not None:
                    logger.info(
                        f"{indent}Processing nested EML: {os.path.basename(output_path)}"
                    )
                    attachment_count += extract_attachments_recursive(
                        nested_msg,
                        output_dir,
                        file_counter,
                        depth + 1,
                        max_depth,
                    )
            except (TypeError, OSError) as e:
                logger.error(f"Error extracting {filename}: {e}")
        except Exception as e:
            # One broken part must not abort the remaining attachments.
            logger.error(f"Error processing attachment: {e}")
    return attachment_count
def extract(filename):
    """
    Pull all attachments out of a single EML file into the ``output`` directory.

    The file is read as bytes, parsed with the default email policy and handed
    to ``extract_attachments_recursive``. Any exception raised while reading or
    extracting is logged and reported as zero attachments.

    Args:
        filename: Path of the EML file to process.

    Returns:
        tuple: (files_processed, attachments_extracted); the first element
        is always 1.
    """
    log = logging.getLogger(__name__)
    target_dir = Path("output")
    target_dir.mkdir(exist_ok=True)
    seen_names = defaultdict(int)

    try:
        # Binary read leaves charset handling to the email parser.
        with open(filename, "rb") as handle:
            raw = handle.read()
        message = email.message_from_bytes(raw, policy=policy.default)

        log.info(f"Processing: {filename}")
        extracted = extract_attachments_recursive(
            message, str(target_dir), seen_names
        )

        if extracted:
            log.info(f"Extracted {extracted} attachment(s) from {filename}")
        else:
            log.info(f"No attachments found in {filename}")
        return 1, extracted
    except Exception as e:
        log.error(f"Error processing {filename}: {e}")
        return 1, 0
def main():
    """Process every ``*.eml`` file in the current directory in parallel.

    Sets up logging, hands each file to ``extract`` through a process pool
    and logs a summary of files processed and attachments extracted.
    Returns early with a warning when no matching files exist.
    """
    logger = setup_logging()
    eml_files = glob.glob(f"*.{EXTENSION}")
    if not eml_files:
        logger.warning(f"No .{EXTENSION} files found in current directory")
        return
    logger.info(f"Found {len(eml_files)} EML file(s) to process")
    try:
        # Process files in parallel. Workers started with the "spawn" or
        # "forkserver" method do not inherit this process's logging setup,
        # so each worker configures logging itself; in forked workers the
        # call is a no-op because the root logger already has handlers.
        with Pool(initializer=setup_logging) as pool:
            results = pool.map(extract, eml_files)
        # Calculate totals: results is a list of (files, attachments) pairs.
        total_files, total_attachments = map(sum, zip(*results))
        logger.info(
            f"Summary: Files processed: {total_files}, Attachments extracted: {total_attachments}"
        )
    except KeyboardInterrupt:
        logger.info("Processing interrupted by user")
    except Exception as e:
        logger.error(f"Error during processing: {e}")


if __name__ == "__main__":
    main()
@jjkavalam
Copy link

It just worked. Thank you! (GitHub should probably add a comment box to repositories as well. Sometimes you just want to say thanks, and there is no simple way to do it.)

@abiank
Copy link

abiank commented Jan 15, 2025

Saved half of my morning, thank you

@scivision
Copy link

See https://gist.github.com/scivision/12d4177b743fafc9e5ff37d14bd44e8d for a version that handles a single .eml file; one could call it in a loop to keep the iteration over .eml files separate from the extraction logic.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment