Skip to content

Instantly share code, notes, and snippets.

@GadgetSteve
Created October 28, 2024 19:29
Show Gist options
  • Save GadgetSteve/1c743c5b3de62b78460efcb6c379e24b to your computer and use it in GitHub Desktop.
Save GadgetSteve/1c743c5b3de62b78460efcb6c379e24b to your computer and use it in GitHub Desktop.
Send big files to WORK_EMAIL split into chunks & optionally encrypted. Includes reassembly script as mail body.
#! python
# encoding=utf-8
"""
SendBig
Send bigger files than the email limit.
"""
import sys
import os
import pathlib
import argparse
import tempfile
import win32com.client
from tqdm import tqdm
import fast_file_encryption as ffe
# Name of the environment variable that supplies the default recipient address.
ENV_KEY = "WORK_EMAIL"
# Number of bytes in one megabyte; chunk sizes are specified in these units.
MB = 1024 * 1024
# Outlook COM automation object, created once when the module is imported and
# shared by every send. Importing this module therefore needs an Outlook
# installation reachable through COM.
OUTLOOK = win32com.client.Dispatch("Outlook.Application")
def send_mail_with_attachment(
    to_addr: str,
    title: str,
    attach: pathlib.Path,
    body: str = "Auto-sent in chunks - reassemble if necessary!\n",
):
    """Create one Outlook message with a single attachment and send it.

    Args:
        to_addr: Recipient address placed in the message's ``To`` field.
        title: Subject line of the message.
        attach: File to attach; it is passed to Outlook as an absolute path
            written with forward slashes.
        body: Plain-text body of the message.

    Progress is reported on stdout: a "Sending ..." line is flushed before
    the message is built and "Sent" is appended once ``Send()`` returns.
    """
    print(f"Sending mail to {to_addr} with {attach} attached.", end=" ", flush=True)
    message = OUTLOOK.CreateItem(0)  # 0 is Outlook's olMailItem item type
    message.To = to_addr
    message.Subject = title
    message.Body = body
    attachment_source = attach.absolute().as_posix()
    message.Attachments.Add(attachment_source)
    message.Send()
    print("Sent")
def validate_infiles(in_files: list[str]) -> list[pathlib.Path]:
    """
    Resolve command-line file arguments into a list of existing regular files.

    An argument naming an existing file is used as-is. Any other argument has
    its final path component treated as a wildcard pattern that is expanded
    inside the argument's parent directory, so wildcards work even when the
    shell passes them through unexpanded. Only regular files are kept from an
    expansion (directories cannot be chunked and mailed), and the matches are
    sorted so files are always processed in the same order.

    Args:
        in_files: File names and/or wildcard patterns from the command line.

    Returns:
        The matching files, in argument order (sorted within each pattern).

    Raises:
        SystemExit: if any argument matches no file; the message lists every
            such argument.
    """
    found: list[pathlib.Path] = []
    unmatched: list[str] = []
    for fname in in_files:
        file_path = pathlib.Path(fname)
        if file_path.is_file():
            found.append(file_path)
            continue
        # An empty final component ("." or a bare root) is not a valid glob
        # pattern, so treat it as matching nothing instead of calling glob("").
        pattern = file_path.name
        if pattern:
            matches = sorted(
                hit for hit in file_path.parent.glob(pattern) if hit.is_file()
            )
        else:
            matches = []
        if matches:
            found.extend(matches)
        else:
            unmatched.append(fname)
    if unmatched:
        sys.exit(f"ERROR: No Files Matching: {', '.join(unmatched)}")
    return found
def _positive_int(text: str) -> int:
    """argparse ``type`` callback that accepts only integers of at least 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"{text!r} is not a whole number") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"{text!r} must be at least 1")
    return value


def arg_parse() -> argparse.Namespace:
    """
    Parse the command line arguments.

    The recipient address defaults to the value of the environment variable
    named by ``ENV_KEY``; when that variable is unset, ``--to_address``
    becomes a required option.

    Returns:
        The parsed options, with ``in_paths`` already converted by
        ``validate_infiles`` into a list of ``pathlib.Path`` objects.

    Raises:
        SystemExit: from argparse on invalid options, or from
            ``validate_infiles`` when an input argument matches no file.
    """
    env_email = os.getenv(ENV_KEY, default=None)
    # argparse %-formats help strings, so a literal "%" in the address must be
    # doubled or displaying --help would fail.
    shown_default = str(env_email).replace("%", "%%")
    parser = argparse.ArgumentParser(
        description="Split the input file into chunks and send",
        add_help=True,
    )
    parser.add_argument(
        "in_paths",
        nargs="+",
        metavar="File(s)",
        action="store",
        help="The path(s) to the file(s) that you would like to send",
    )
    parser.add_argument(
        "--max_chunk",
        "-M",
        action="store",
        default=4,
        # Zero would make the chunking loop spin forever and a negative size
        # would disable chunking altogether, so reject both up front.
        type=_positive_int,
        help="The maximum chunk size in MB (default=4).",
    )
    parser.add_argument(
        "--encrypt",
        "-e",
        action="store_true",
        help="Encrypt the file before sending",
    )
    parser.add_argument(
        "--to_address",
        type=str,
        action="store",
        default=env_email,
        # If not set in the environment it must be supplied.
        required=env_email is None,
        help=f"The email address to send to (default from {ENV_KEY}={shown_default})",
    )
    opts = parser.parse_args()
    opts.in_paths = validate_infiles(opts.in_paths)
    return opts
def chunk_file(
    file_path: pathlib.Path, chunk_size_mb: int, chunk_dir: pathlib.Path
) -> list[pathlib.Path]:
    """Split a file into numbered chunk files of at most ``chunk_size_mb`` MB.

    Each chunk is written into ``chunk_dir`` and named after the source file
    with its suffix replaced by a three-digit sequence number (``name.001``,
    ``name.002``, ...). The reassembly script mailed to the recipient globs
    for exactly that naming scheme, so it must not change.

    Args:
        file_path: The file to split.
        chunk_size_mb: Maximum size of each chunk, in megabytes (>= 1).
        chunk_dir: Existing directory that receives the chunk files.

    Returns:
        The chunk paths in order; empty when the source file is empty.

    Raises:
        NotADirectoryError: if ``chunk_dir`` is not an existing directory.
        ValueError: if ``chunk_size_mb`` is less than 1.
    """
    if not chunk_dir.is_dir():
        raise NotADirectoryError(f"{chunk_dir} is not a directory")
    if chunk_size_mb < 1:
        # read(0) always returns b"" and read(<0) returns the whole file, so
        # neither can be used as a chunk size.
        raise ValueError(f"chunk_size_mb must be at least 1, got {chunk_size_mb}")
    chunk_size = chunk_size_mb * MB
    out_base = chunk_dir / file_path.name
    file_size = file_path.stat().st_size
    chunks: list[pathlib.Path] = []
    with open(file_path.absolute(), "rb") as infile:
        # An empty read marks end-of-file, so a file whose size is an exact
        # multiple of the chunk size does not produce a phantom extra chunk.
        while buff := infile.read(chunk_size):
            out_path = out_base.with_suffix(f".{len(chunks) + 1:03d}")
            out_path.write_bytes(buff)
            chunks.append(out_path)
    print(
        f"{file_path} ({file_size / MB} MB) chunked to {len(chunks)} files in:\n{chunk_dir}"
    )
    return chunks
# Templates for the Python reassembly script that is mailed to the recipient.
# They are filled in with str.format(file_path=..., expect=...), so any literal
# brace added to them must be doubled. Each compound statement is followed by a
# blank line so the text can also be pasted into an interactive session.

# Merges the saved chunk files back into a single file named like the original.
# Defines cwd, flist and dest_path for the templates that follow it.
SCRIPT_TEXT = """
# Save all of the file attachments and then in a python session at the save location:
import pathlib
cwd = pathlib.Path.cwd()
flist = sorted(cwd.glob("{file_path.stem}.[0-9][0-9][0-9]"))
assert len(flist) == {expect}, f"Expected {expect} got %d" % len(flist)
dest_path = pathlib.Path("{file_path.name}")
with dest_path.open("wb") as outfile:
    for infile in flist:
        outfile.write(infile.read_bytes())

print("Merged", len(flist), "files into", dest_path, dest_path.stat().st_size, "Bytes")
"""

# Preamble of the decrypting variant; the key file is named exactly as it is
# attached to the mail that carries these instructions.
SCRIPT_TEXT_DECODE_PRE = """
# Encrypted file so need to decrypt file, hence you will need to have installed:
# fast_file_encryption with pip.
# Save private.pem.txt to the save location
import fast_file_encryption as ffe
"""

# Renames the merged (still encrypted) file to *.ffe and decrypts it back into
# dest_path. Defines encrypted_path and key_path for TIDY_ENC_ACTION.
SCRIPT_TEXT_DECODE_ACTION = """
encrypted_path = dest_path.with_suffix(".ffe")
key_path = (cwd / 'private.pem.txt')
print("Rename", dest_path, "to", encrypted_path, "prior to decrypting.")
dest_path.rename(encrypted_path)
print("Decrypting", encrypted_path, "into", dest_path)
decryptor = ffe.Decryptor(ffe.read_private_key(key_path))
decryptor.copy_decrypted(encrypted_path, dest_path)
print("Decoded into", dest_path, dest_path.stat().st_size, "Bytes")
"""

# Optional clean-up after decrypting: removes the chunks, the merged encrypted
# file and the private key, and reports the number of bytes freed.
TIDY_ENC_ACTION = """
YN = input("Tidy Up? [Y/N]")
if YN.lower().startswith("y"):
    print("Removing", len(flist) + 2, "files:")
    print("\\t", encrypted_path)
    freed = encrypted_path.stat().st_size
    encrypted_path.unlink()
    for infile in flist:
        print("\\t", infile)
        freed += infile.stat().st_size
        infile.unlink()
    print("\\t", key_path)
    freed += key_path.stat().st_size
    key_path.unlink()
    print("Freed", freed, "Bytes")

print("Done")
"""

# Optional clean-up for the unencrypted case: only the chunk files exist here,
# so nothing but flist may be referenced.
TIDY_PLAIN_ACTION = """
YN = input("Tidy Up? [Y/N]")
if YN.lower().startswith("y"):
    print("Removing", len(flist), "files:")
    freed = 0
    for infile in flist:
        print("\\t", infile)
        freed += infile.stat().st_size
        infile.unlink()
    print("Freed", freed, "Bytes")

print("Done")
"""

# Complete script for an encrypted transfer: import, merge, decrypt, tidy up.
SCRIPT_TEXT_DECODE = "\n".join(
    [
        SCRIPT_TEXT_DECODE_PRE,
        SCRIPT_TEXT,
        SCRIPT_TEXT_DECODE_ACTION,
        TIDY_ENC_ACTION,
    ]
)
def main():
    """The main entry point.

    Parses the command line, then for each input file: optionally encrypts
    it, splits it into chunks inside a temporary directory and mails every
    chunk as an attachment. Unencrypted transfers carry the reassembly script
    in each mail body; encrypted transfers finish with one extra mail that
    carries the private key and the merge-and-decrypt script.

    The key pair lives in the current working directory (``public.pem`` and
    ``private.pem.txt``) and is created on first use.
    """
    opts = arg_parse()
    cwd = pathlib.Path.cwd()
    print(opts)
    encryptor = None
    if opts.encrypt:
        pub_key_file = cwd / "public.pem"
        priv_key_file = cwd / "private.pem.txt"
        body_fmt = "Encrypted {file_path.name} chunked into {expect} chunks followed by the keyfile & decode instructions."
        if not pub_key_file.exists() or not priv_key_file.exists():
            print("Creating key files")
            ffe.save_key_pair(public_key=pub_key_file, private_key=priv_key_file)
        # The key is the same for every file, so load it once up front.
        encryptor = ffe.Encryptor(ffe.read_public_key(pub_key_file))
    else:
        body_fmt = SCRIPT_TEXT + TIDY_PLAIN_ACTION
    for file_path in opts.in_paths:
        with tempfile.TemporaryDirectory() as chunk_dir:
            cd_path = pathlib.Path(chunk_dir)
            if encryptor is not None:
                # Encrypt into the temporary directory so no stray *.ffe file
                # is left beside the source, and a source that already ends
                # in .ffe is never overwritten by its own encrypted copy.
                send_file_path = cd_path / file_path.with_suffix(".ffe").name
                encryptor.copy_encrypted(file_path, send_file_path)
            else:
                send_file_path = file_path
            chunks = chunk_file(send_file_path, opts.max_chunk, cd_path)
            expect = len(chunks)
            if not chunks:
                # Only an empty input produces no chunks; say so instead of
                # silently sending nothing.
                print(f"{file_path} is empty - nothing to send.")
                continue
            print(f"Sending {expect} emails to {opts.to_address}")
            body = body_fmt.format(file_path=file_path, expect=expect)
            for n, chunk in enumerate(chunks, start=1):
                title = f"{file_path.name}_{n}_of_{expect}"
                send_mail_with_attachment(opts.to_address, title, chunk, body)
            if opts.encrypt:
                # NOTE(review): the private key goes to the same address as
                # the encrypted chunks, so this guards the data in transit
                # and at rest in separate messages, not against someone who
                # can read the whole mailbox.
                title = f"{file_path.name} Unpacking"
                body = SCRIPT_TEXT_DECODE.format(file_path=file_path, expect=expect)
                send_mail_with_attachment(
                    opts.to_address, title, priv_key_file, body
                )


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment