Created
October 28, 2024 19:29
-
-
Save GadgetSteve/1c743c5b3de62b78460efcb6c379e24b to your computer and use it in GitHub Desktop.
Send big files to WORK_EMAIL split into chunks & optionally encrypted. Includes reassebly script as mail body.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! python | |
# encoding=utf-8 | |
""" | |
SendBig | |
Send bigger files than the email limit. | |
""" | |
import sys | |
import os | |
import pathlib | |
import argparse | |
import tempfile | |
import win32com.client | |
from tqdm import tqdm | |
import fast_file_encryption as ffe | |
ENV_KEY = "WORK_EMAIL" # Environemant Variable Name | |
MB = 1024 * 1024 # Size of a MegaByte | |
OUTLOOK = win32com.client.Dispatch("Outlook.Application") | |
def send_mail_with_attachment( | |
to_addr: str, | |
title: str, | |
attach: pathlib.Path, | |
body: str = "Auto-sent in chunks - reassemble if necessary!\n", | |
): | |
"""Send an email with an attachment.""" | |
print(f"Sending mail to {to_addr} with {attach} attached.", end=" ", flush=True) | |
mail = OUTLOOK.CreateItem(0) | |
mail.Subject = title | |
mail.To = to_addr | |
mail.Body = body | |
mail.Attachments.Add(attach.absolute().as_posix()) | |
mail.Send() | |
print("Sent") | |
def validate_infiles(in_files: list[str]) -> list[pathlib.Path]: | |
""" | |
Validate the input file list. | |
""" | |
out_list = [] | |
bad_list = [] | |
for fname in in_files: | |
file_path = pathlib.Path(fname) | |
if file_path.is_file(): | |
out_list.append(file_path) | |
else: | |
wclist = list(file_path.parent.glob(file_path.name)) | |
if len(wclist) == 0: | |
bad_list.append(fname) | |
for match in wclist: | |
out_list.append(match) | |
if len(bad_list) > 0: | |
sys.exit(f"ERROR: No Files Matching: {', '.join(bad_list)}") | |
return out_list | |
def arg_parse() -> argparse.Namespace: | |
""" | |
Parse the command line arguments. | |
""" | |
env_email = os.getenv(ENV_KEY, default=None) | |
parser = argparse.ArgumentParser( | |
description="Split the input file into chunks and send", | |
add_help=True, | |
) | |
parser.add_argument( | |
"in_paths", | |
nargs="+", | |
metavar="File(s)", | |
action="store", | |
help="The path(s) to the file(s) that you would like to send", | |
) | |
parser.add_argument( | |
"--max_chunk", | |
"-M", | |
action="store", | |
default=4, | |
type=int, | |
help="The maximum chunk size in MB (default=4).", | |
) | |
parser.add_argument( | |
"--encrypt", | |
"-e", | |
action="store_true", | |
help=f"Encrypt the file before sending", | |
) | |
parser.add_argument( | |
"--to_address", | |
type=str, | |
action="store", | |
default=env_email, | |
required=env_email | |
is None, # If not set in the environment it must be supplied. | |
help=f"The email address to send to (default from {ENV_KEY}={env_email})", | |
) | |
opts = parser.parse_args() | |
opts.in_paths = validate_infiles(opts.in_paths) | |
return opts | |
def chunk_file( | |
file_path: pathlib.Path, chunk_size_mb: int, chunk_dir: pathlib.Path | |
) -> list[pathlib.Path]: | |
"""Chunk a file.""" | |
assert chunk_dir.is_dir(), f"{chunk_dir} is not a directory" | |
chunk_size = chunk_size_mb * MB | |
name_base = file_path.name | |
out_base = pathlib.Path(chunk_dir / name_base) | |
file_size = file_path.stat().st_size | |
# num_chunks = 1 + file_size // chunk_size | |
read_count = chunk_size | |
chunks = [] | |
count = 0 | |
with open(file_path.absolute(), "rb") as infile: | |
# prog = tqdm(total=1 + file_size // MB, unit="MB") | |
while read_count == chunk_size: | |
count += 1 | |
buff = infile.read(chunk_size) | |
read_count = len(buff) | |
if read_count: | |
out_path = out_base.with_suffix(f".{count:03d}") | |
out_path.write_bytes(buff) | |
# prog.update(1 + read_count // MB) | |
chunks.append(out_path) | |
print( | |
f"{file_path} ({file_size / MB} MB) chunked to {count} files in:\n{chunk_dir}" | |
) | |
return chunks | |
SCRIPT_TEXT = """ | |
# Save all of the file attachements and then in a python session at the save location: | |
import pathlib | |
cwd = pathlib.Path.cwd() | |
flist = sorted(list(cwd.glob("{file_path.stem}.[0-9][0-9][0-9]"))) | |
assert len(flist) == {expect}, f"Expected {expect} got %d" % len(flist) | |
dest_path = pathlib.Path("{file_path.name}") | |
with dest_path.open("wb") as outfile: | |
for infile in flist: | |
outfile.write(infile.read_bytes()) | |
print("Merged", len(flist), "files into", dest_path, dest_path.stat().st_size, "Bytes") | |
""" | |
SCRIPT_TEXT_DECODE_PRE = """ | |
# Encrypted file so need to decrypt file, hence you will need to have installed: | |
# fast_file_encryption with pip. | |
# Save private.pem to the save location | |
import fast_file_encryption as ffe | |
""" | |
SCRIPT_TEXT_DECODE_ACTION = """ | |
encrypted_path = dest_path.with_suffix(".ffe") | |
key_path = (cwd / 'private.pem.txt') | |
print("Rename", dest_path, "to", encrypted_path, "prior to decrypting.") | |
dest_path.rename(encrypted_path) | |
print("Decrypting", encrypted_path, "into", dest_path) | |
decryptor = ffe.Decryptor(ffe.read_private_key(key_path)) | |
decryptor.copy_decrypted(encrypted_path, dest_path) | |
print("Decoded into", dest_path, dest_path.stat().st_size, "Bytes") | |
""" | |
TIDY_ENC_ACTION = """ | |
YN = input("Tidy Up? [Y/N]") | |
if YN.lower().startswith("y"): | |
print("Removing", len(flist)+2, "files:") | |
print("\t", encrypted_path) | |
freed = encrypted_path.stat().st_size, "Bytes" | |
encrypted_path.unlink() | |
for infile in flist: | |
print("\t", infile) | |
freed += infile.stat().st_size, "Bytes" | |
infile.unlink() | |
print("\t", (cwd / 'private.pem.txt')) | |
freed += key_path.stat().st_size, "Bytes" | |
key_path.unlink() | |
print("Freed", freed, "Bytes") | |
print("Done") | |
""" | |
TIDY_PLAIN_ACTION = """ | |
YN = input("Tidy Up? [Y/N]") | |
if YN.lower().startswith("y"): | |
print("Removing", len(flist), "files:") | |
print("\t", encrypted_path) | |
freed = 0 | |
for infile in flist: | |
print("\t", infile) | |
freed += infile.stat().st_size, "Bytes" | |
infile.unlink() | |
print("Freed", freed, "Bytes") | |
print("Done") | |
""" | |
SCRIPT_TEXT_DECODE = "\n".join( | |
[ | |
SCRIPT_TEXT_DECODE_PRE, | |
SCRIPT_TEXT, | |
SCRIPT_TEXT_DECODE_ACTION, | |
TIDY_ENC_ACTION, | |
] | |
) | |
def main(): | |
"""The main entry point.""" | |
opts = arg_parse() | |
cwd = pathlib.Path.cwd() | |
print(opts) | |
if opts.encrypt: | |
pub_key_file = cwd / "public.pem" | |
priv_key_file = cwd / "private.pem.txt" | |
body_fmt = "Encrypted {file_path.name} chunked into {expect} chunks followed by the keyfile & decode instructions." | |
if not pub_key_file.exists() or not priv_key_file.exists(): | |
print("Creating key files") | |
ffe.save_key_pair(public_key=pub_key_file, private_key=priv_key_file) | |
else: | |
body_fmt = SCRIPT_TEXT + TIDY_PLAIN_ACTION | |
for file_path in opts.in_paths: | |
assert isinstance( | |
file_path, pathlib.Path | |
), f"{file_path} is not an instance of pathlib.Path" | |
with tempfile.TemporaryDirectory() as chunk_dir: | |
if opts.encrypt: | |
encryptor = ffe.Encryptor(ffe.read_public_key(cwd / "public.pem")) | |
send_file_path = file_path.with_suffix(".ffe") | |
encryptor.copy_encrypted(file_path, send_file_path) | |
else: | |
send_file_path = file_path | |
cd_path = pathlib.Path(chunk_dir) | |
chunks = chunk_file(send_file_path, opts.max_chunk, cd_path) | |
print(f"Sending {len(chunks)} emails to {opts.to_address}") | |
expect = len(chunks) | |
body = body_fmt.format(file_path=file_path, expect=expect) | |
for n, chunk in enumerate(chunks): | |
title = f"{file_path.name}_{n + 1}_of_{expect}" | |
send_mail_with_attachment( | |
opts.to_address, title, chunk, body | |
) | |
if opts.encrypt: | |
title = f"{file_path.name} Unpacking" | |
body = SCRIPT_TEXT_DECODE.format( | |
file_path=file_path, expect=expect | |
) | |
send_mail_with_attachment( | |
opts.to_address, title, priv_key_file, body | |
) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment