Forked from jikamens/download-helpscout-mailbox.py
Last active
October 22, 2017 23:53
-
-
Save avioli/88fbd7fac279964ffc422ad94db03f82 to your computer and use it in GitHub Desktop.
Python script for downloading conversations, customers, and attachments from a Help Scout mailbox
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""Download conversations, customers, and attachments from a Help Scout mailbox | |
Written by Jonathan Kamens ([email protected]). | |
Released into the public domain. | |
Email me patches if you have enhancements you'd like me to incorporate. Don't | |
bother emailing me bug reports or suggestions; this script does exactly what I | |
need it to do, and I'm not planning on spending any time doing additional | |
coding on it myself. | |
""" | |
import argparse | |
import base64 | |
import errno | |
import json | |
import os | |
import requests | |
import sys | |
import time | |
def main():
    """Parse the command line and either list mailboxes or download one.

    Steps, in order:

    1. Build the HTTP basic-auth object from ``--api-key`` and stash it on
       ``args`` so every API helper can reuse it.
    2. Change into ``--target-directory`` when one was given, exiting with a
       readable message if it is missing or is not a directory.
    3. With ``--list-mailboxes``, print the mailboxes and stop.  Otherwise
       resolve ``--mailbox-name`` to an identifier if needed, normalise the
       ``--status`` filter, walk the conversations and, with
       ``--list-owners``, print the owners collected during the walk.
    """
    args = parse_args()
    # The API key travels as the basic-auth user name; the password slot is
    # filled with a placeholder.
    args.auth = requests.auth.HTTPBasicAuth(args.api_key, 'notused')

    if args.target_directory:
        try:
            os.chdir(args.target_directory)
        except OSError as ex:
            if ex.errno == errno.ENOENT:
                sys.exit("ERROR: The directory {} does not exist".format(
                    args.target_directory))
            elif ex.errno == errno.ENOTDIR:
                sys.exit("ERROR: {} is not a directory".format(
                    args.target_directory))
            # Any other failure (e.g. permissions) is unexpected: re-raise.
            raise

    if args.list_mailboxes:
        list_mailboxes(args)
        return

    if args.mailbox_name:
        for name, _id in get_mailboxes(args):
            if args.mailbox_name == name:
                args.mailbox_id = str(_id)
                break
        else:
            # The loop finished without a match.
            sys.exit('Could not find a mailbox named "{}"'.format(
                args.mailbox_name))

    if args.status:
        # Accept repeated --status flags, comma-separated lists, or any mix
        # of the two, and strip stray whitespace around each value.
        args.status = [status.strip()
                       for value in args.status
                       for status in value.split(',')]

    walk_conversations(args)
    if args.list_owners:
        list_owners()
def parse_args(argv=None):
    """Build the command-line parser and parse the arguments.

    Exactly one of ``--list-mailboxes``, ``--mailbox-name`` or
    ``--mailbox-id`` is required, and ``--api-key`` is always required.
    ``--list-owners`` and ``--owner`` are mutually exclusive.  ``--status``
    may be repeated; the values are collected into a list.

    :param argv: Optional list of argument strings.  When ``None`` (the
        default) the arguments are taken from ``sys.argv``.
    :returns: The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(
        description="Download conversations, customers, and attachments "
                    "from a Help Scout mailbox")
    parser.add_argument("--verbose", action="store_true")

    # What to operate on: exactly one of these must be given.
    group1 = parser.add_mutually_exclusive_group(required=True)
    group1.add_argument("--list-mailboxes", action="store_true",
                        help="List available mailboxes instead of "
                        "downloading")
    group1.add_argument("--mailbox-name", action="store",
                        help="The name of the mailbox whose content should "
                        "be downloaded")
    group1.add_argument("--mailbox-id", action="store",
                        help="The identifier of the mailbox whose content "
                        "should be downloaded")

    parser.add_argument("--api-key", action="store", required=True,
                        help="The API key to use to access Help Scout")
    parser.add_argument("--target-directory", action="store",
                        help="Directory in which to put downloaded data "
                        "(default: current directory)")

    # Owner handling: list them, or filter by one, but not both.
    group2 = parser.add_mutually_exclusive_group(required=False)
    group2.add_argument("--list-owners", action="store_true",
                        help="List available owners instead of downloading")
    group2.add_argument("--owner", action="store",
                        help="Download only conversations for given owner's "
                        "email")

    # The two help fragments are concatenated, so the separating space must
    # be part of one of them.
    parser.add_argument("--status", action="append",
                        help="Only download conversations with given status. "
                        "(Use multiple args to match more than one)")
    return parser.parse_args(argv)
def list_mailboxes(args):
    """Print each mailbox as a row: name left-aligned, identifier right-aligned."""
    row = "{:<40s} {:>6s}"
    for mailbox_name, mailbox_id in get_mailboxes(args):
        line = row.format(mailbox_name, str(mailbox_id))
        print(line)
def get_mailboxes(args):
    """Yield ``(name, id)`` for every mailbox, following pagination.

    Pages are requested one at a time, starting at 1, until the page count
    reported in the response has been reached.

    :param args: Parsed arguments; passed through to ``api_q``.
    """
    page = 1
    while True:
        mailboxes = api_q(args, "mailboxes.json?page={}", page)
        pages = mailboxes["pages"]
        for mailbox in mailboxes["items"]:
            yield (mailbox["name"], mailbox["id"])
        # ">=" rather than "==": a reported page count of 0 must still end
        # the loop instead of requesting pages forever.
        if page >= pages:
            break
        page += 1
def walk_conversations(args):
    """Page through the mailbox's conversations and process each one.

    For every conversation on every page:

    * with ``args.list_owners`` set, only record its owner via ``add_owner``;
    * otherwise skip it when it fails the ``args.status`` or ``args.owner``
      filter, and download it via ``download_conversation`` when it passes.

    With ``args.verbose`` set, a start line is printed for the first page and
    a progress line with an estimate of the remaining time is printed after
    every page except the last.

    :param args: Parsed arguments; reads ``mailbox_id``, ``verbose``,
        ``list_owners``, ``status`` and ``owner``.
    """
    page = 1
    start_time = time.time()
    while True:
        conversations = api_q(args, "mailboxes/{}/conversations.json?page={}",
                              args.mailbox_id, page)
        pages = conversations["pages"]
        if page == 1 and args.verbose:
            print("Starting page {} of {}".format(page, pages))

        for conversation in conversations["items"]:
            if args.list_owners:
                add_owner(conversation)
                continue
            if args.status and conversation["status"] not in args.status:
                continue
            if args.owner:
                owner = conversation["owner"]
                # Unowned conversations never match an owner filter.
                if not owner or args.owner != owner["email"]:
                    continue
            download_conversation(args, conversation["id"])

        # ">=" rather than "==": a reported page count of 0 must still end
        # the loop.
        if page >= pages:
            break

        if args.verbose:
            elapsed_time = time.time() - start_time
            # Extrapolate linearly from the pages completed so far.  This is
            # the same estimate as scaling by item counts, but it cannot
            # divide by zero when no items have been seen yet.
            estimated_total_time = elapsed_time * pages / page
            estimated_time_left = estimated_total_time - elapsed_time
            print("Finished page {} of {}, estimated {:.0f} seconds remaining".
                  format(page, pages, estimated_time_left))
        page += 1
def download_conversation(args, id):
    """Download one conversation with its threads and attachments.

    Files are written relative to the current directory::

        conversations/<id>/conversation.json
        conversations/<id>/threads/<thread id>/thread.json
        conversations/<id>/threads/<thread id>/attachments/<attachment id>/
            attachment.json      (attachment metadata)
            <file name>          (decoded attachment data)

    The conversation's customer is handed to ``save_customer``.

    :param args: Parsed arguments; passed through to the API helpers.
    :param id: Identifier of the conversation to download.
    """
    _ensure_dir("conversations")
    # The on-disk directory name doubles as the API resource path.
    conversation_dir = "conversations/{}".format(id)
    conversation = api_q(args, "{}.json", conversation_dir)["item"]
    save_customer(args, conversation["customer"])

    # Threads are stored in their own files, so detach them before dumping
    # the conversation itself.
    threads = conversation.pop("threads")
    _ensure_dir(conversation_dir)
    _write_json(conversation, "{}/conversation.json".format(conversation_dir))

    threads_dir = "{}/threads".format(conversation_dir)
    _ensure_dir(threads_dir)
    for thread in threads:
        # NOTE(review): this passes the conversation-level customer again for
        # every thread, which save_customer() de-duplicates.  Presumably a
        # per-thread person was intended -- confirm against the API schema.
        save_customer(args, conversation["customer"])
        thread_dir = "{}/{}".format(threads_dir, thread["id"])
        _ensure_dir(thread_dir)
        attachments = thread.pop("attachments", None)
        _write_json(thread, "{}/thread.json".format(thread_dir))
        if not attachments:
            continue

        attachments_dir = "{}/attachments".format(thread_dir)
        _ensure_dir(attachments_dir)
        for attachment in attachments:
            attachment_dir = "{}/{}".format(attachments_dir, attachment["id"])
            _ensure_dir(attachment_dir)
            _write_json(attachment,
                        "{}/attachment.json".format(attachment_dir))
            attachment_data = base64.b64decode(
                api_q(args, "attachments/{}/data.json",
                      attachment["id"])["item"]["data"])

            # The file name comes from the API response; keep only its final
            # path component so it cannot point outside attachment_dir.
            data_name = os.path.basename(attachment["fileName"])
            if data_name == "attachment.json":
                # Ugh.  Do not clobber the metadata file written above.
                data_name = "attachment_data.json"
            with open("{}/{}".format(attachment_dir, data_name), "wb") as f:
                f.write(attachment_data)


def _ensure_dir(path):
    """Create the directory *path* unless it already exists."""
    if not os.path.isdir(path):
        os.mkdir(path)


def _write_json(obj, path):
    """Dump *obj* to the file *path* with json_dump, then close the file."""
    with open(path, "w") as f:
        json_dump(obj, f)
# Module-level bookkeeping filled in by add_owner().
has_unowned = False  # becomes True once a conversation without owner is seen
owners = set()       # distinct (email, id) pairs of conversation owners


def add_owner(conversation):
    """Record the owner of *conversation* in the module-level bookkeeping.

    A conversation with a falsy ``"owner"`` entry only flips ``has_unowned``;
    otherwise the owner's ``(email, id)`` pair is added to ``owners``.
    """
    global has_unowned
    owner = conversation["owner"]
    if owner:
        owners.add((owner["email"], owner["id"]))
    else:
        has_unowned = True
def list_owners():
    """Print every collected owner: e-mail left-aligned, identifier right-aligned."""
    row = "{:<64s} {:>8s}"
    for entry in owners:
        email, owner_id = entry[0], entry[1]
        print(row.format(email, str(owner_id)))
# Identifiers of customers already written to disk during this run.
saved_customers = set()


def save_customer(args, customer):
    """Fetch a customer's full record and store it as ``customers/<id>.json``.

    Nothing happens when *customer* is falsy, when its ``"type"`` is not
    ``"customer"``, or when that customer was already saved during this run.

    :param args: Parsed arguments; passed through to ``api_q``.
    :param customer: Customer reference with at least ``"type"`` and
        ``"id"`` keys, or a falsy value.
    """
    if not customer:
        return
    if customer["type"] != "customer":
        return
    if customer["id"] in saved_customers:
        return
    if not os.path.isdir("customers"):
        os.mkdir("customers")
    # Replace the short reference with the full record from the API.
    customer = api_q(args, "customers/{}.json", customer["id"])["item"]
    # Close the file deterministically instead of leaving it to the
    # garbage collector.
    with open("customers/{}.json".format(customer["id"]), "w") as f:
        json_dump(customer, f)
    saved_customers.add(customer["id"])
_API_BASE_URL = "https://api.helpscout.net/v1/"
# Seconds to wait for the server before giving up; without a timeout a
# stalled connection would block the download forever.
_API_TIMEOUT_SECONDS = 60


def api_q(args, req, *pargs, **kwargs):
    """Issue a GET request against the Help Scout API and decode the JSON.

    :param args: Parsed arguments; ``args.auth`` supplies the credentials.
    :param req: Resource path relative to the API base URL.  It is a
        ``str.format`` template filled in with *pargs* and *kwargs*.
    :returns: The decoded JSON body of the response.
    :raises requests.exceptions.HTTPError: If the server answers with a
        4xx/5xx status, so failures surface here instead of as a missing
        key in the caller.
    :raises requests.exceptions.Timeout: If the server does not respond
        within the timeout.
    """
    url = _API_BASE_URL + req.format(*pargs, **kwargs)
    response = requests.get(url, auth=args.auth,
                            timeout=_API_TIMEOUT_SECONDS)
    response.raise_for_status()
    return response.json()
def json_dump(obj, f):
    """Write *obj* to the file object *f* as JSON with sorted keys and 2-space indent."""
    encoder = json.JSONEncoder(sort_keys=True, indent=2)
    for chunk in encoder.iterencode(obj):
        f.write(chunk)
# Run the downloader only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment