Created
April 8, 2016 05:47
-
-
Save dmckean/f66ad16f90c2ef139f2bb0e52ac936a3 to your computer and use it in GitHub Desktop.
A simple HTTP server for downloading MOBI files from Instapaper, suitable for use from a Kindle browser.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
""" | |
A simple HTTP server for downloading MOBI files from Instapaper | |
In addition to username, password, a list of folder names and ids | |
must be provided to access Instapaper folders. These IDs may be | |
found in instapaper folder URLs, e.g. | |
https://www.instapaper.com/u/folder/(id)/(name) | |
Set the id to `None` to reach the home Instapaper folder
USAGE: kinstapaper.py [-h] USERNAME PASSWORD ... | |
""" | |
import os | |
import time | |
from datetime import timedelta, datetime | |
import BaseHTTPServer | |
import SimpleHTTPServer | |
import threading | |
import logging | |
import argparse | |
import requests | |
# Emit every record (DEBUG and above) and prefix each message with the name
# of the thread that produced it.
logging.basicConfig(format='(%(threadName)-10s) %(message)s', level=logging.DEBUG)
class InstaQueue(object):
    """Queue of Instapaper folders waiting to be downloaded as MOBI files.

    The queue also regenerates ``index.html`` in the current working
    directory, which lists every configured folder together with a link to
    its ``<folder>.mobi`` file (when present) and a link that requests an
    update.

    Attributes:
        cookie_time: ``time.time()`` of the last login, or ``None`` if the
            session has not logged in yet.
        session: ``requests.Session`` shared by the login and the downloads.
        folder_queue: folder names waiting to be downloaded, oldest first.
        timer: value, in seconds, written into the page's ``Refresh`` meta
            tag while downloads are pending. Grows by 5 per processed folder
            up to 30 and resets to 5 when the queue is idle.
        auth_data: form fields posted to the Instapaper login URL.
        folders: mapping ``name -> {"id": ..., "rank": ..., "title": ...}``.
    """

    def __init__(self, username, password, folders):
        self.cookie_time = None
        self.session = requests.Session()
        self.folder_queue = list()
        self.timer = 5
        self.auth_data = {
            "username": username,
            "password": password,
        }
        self.folders = folders

    @property
    def queue_not_empty(self):
        """True while at least one folder is waiting to be downloaded."""
        return len(self.folder_queue) > 0

    @property
    def sorted_folder_items(self):
        """List of ``(name, values)`` pairs ordered by ``rank``.

        Folders without a ``rank`` are ordered by their name instead.
        """
        # A plain single-argument lambda replaces the Python-2-only
        # ``lambda (k, v): ...`` form; ``items()`` works on Python 2 and 3.
        return sorted(self.folders.items(),
                      key=lambda item: item[1].get("rank", item[0]))

    @staticmethod
    def _format_age(seconds):
        """Return a short age label such as ``"3d ago"`` or ``"12s ago"``.

        Only the largest non-zero unit (days, hours, minutes, seconds) is
        shown. Negative ages (e.g. a file stamped slightly in the future)
        are reported as ``"0s ago"``.
        """
        seconds = max(int(seconds), 0)
        minutes, secs = divmod(seconds, 60)
        hours, minutes = divmod(minutes, 60)
        days, hours = divmod(hours, 24)
        if days:
            result = "%sd" % days
        elif hours:
            result = "%sh" % hours
        elif minutes:
            result = "%sm" % minutes
        else:
            result = "%ds" % secs
        return "%s ago" % result

    def update_index(self):
        """Rewrite ``index.html`` to reflect the current folder state."""
        now = time.time()
        items = []
        for folder, values in self.sorted_folder_items:
            title = values.get("title", folder)
            mobi_path = "%s.mobi" % folder
            if folder in self.folder_queue:
                items.append("<li>%s - Currently updating</li>" % title)
            elif os.path.exists(mobi_path):
                age = self._format_age(now - os.path.getmtime(mobi_path))
                items.append(
                    "<li><a href='%s.mobi'>%s</a> - <a href='./?update=true&folder=%s'>%s</a></li>"
                    % (folder, title, folder, age))
            else:
                items.append(
                    "<li>%s - <a href='./?update=true&folder=%s'>Download</a></li>"
                    % (title, folder))
        output = "<ul>%s</ul>" % "".join(items)

        # Only ask the browser to auto-refresh while downloads are pending.
        reload_meta = "<meta http-equiv='Refresh' content='%s'>" % self.timer if self.queue_not_empty else ""
        page = ("<html><head><style>* { font-size: 50px; } h1 { font-size: 80px; } a { font-size: inherit; }</style>" +
                "<title>Instapaper Folders</title>%(meta)s</head>" +
                "<body><h1><a href='.'>Instapaper Folders</a></h1>%(body)s" +
                "<p><a href='./?updateAll=true'>Update all</a></p>" +
                "</body></html>") % {"meta": reload_meta, "body": output}
        # The context manager guarantees the page is flushed and closed
        # before the HTTP handler serves it.
        with open("index.html", "w") as index:
            index.write(page)

    def update_cookie(self):
        """Log in to Instapaper and remember when the login happened."""
        self.session.post("https://www.instapaper.com/user/login", data=self.auth_data)
        self.cookie_time = time.time()

    def update_mobi(self, name):
        """Download the MOBI file for folder *name* into ``<folder>.mobi``.

        Unknown folders are logged and ignored. The session logs in again
        when it has never logged in or the last login is over an hour old.
        A non-200 response is logged and leaves any existing file untouched.
        """
        logging.debug("Updating: %s" % name)
        # NOTE: lstrip removes any run of leading '.' and '/' characters,
        # not just a literal "./" prefix.
        folder = name.lstrip("./")
        if folder not in self.folders:
            logging.debug("%s not found" % folder)
            return
        if self.cookie_time is None or (time.time() - self.cookie_time) > 3600:
            self.update_cookie()

        folder_url = "https://www.instapaper.com/mobi"
        folder_id = self.folders[folder].get("id")
        # Folder lists read from a file carry the literal string "None".
        folder_id = None if folder_id == "None" else folder_id
        if folder_id:
            folder_url += "/%s" % folder_id

        folder_response = self.session.get(folder_url)
        if folder_response.status_code == 200:
            with open("%s.mobi" % folder, "wb") as fd:
                for chunk in folder_response:
                    fd.write(chunk)
            logging.debug("%s updated" % name)
        else:
            logging.debug("%s could not be downloaded" % name)

    def push_folder(self, folder):
        """Append *folder* to the download queue unless it is already queued.

        A missing or empty folder name is ignored; queuing it would later
        crash the download loop.
        """
        if not folder:
            logging.debug("No folder specified")
            return
        if folder not in self.folder_queue:
            self.folder_queue.append(folder)
            self.update_index()
            logging.debug("%s added to queue" % folder)
        else:
            logging.debug("%s is already in the queue" % folder)

    def pop_folder(self):
        """Download the oldest queued folder, or reset the timer when idle."""
        if not self.queue_not_empty:
            self.timer = 5
            return
        folder = self.folder_queue[0]
        try:
            self.update_mobi(folder)
        except requests.RequestException as error:
            # A network failure must not kill the watcher loop.
            logging.debug("%s could not be downloaded: %s" % (folder, error))
        finally:
            # Always dequeue so a failing folder cannot block the queue.
            self.timer += 5 if self.timer < 30 else 0
            self.folder_queue.pop(0)
            self.update_index()

    def watch_queue(self):
        """Process the queue forever, one folder every five seconds."""
        logging.debug("Watching download queue")
        while True:
            self.pop_folder()
            time.sleep(5)
# Shared InstaQueue instance used by the request handler; it is assigned by
# test() before the HTTP server thread is started.
instaqueue = None


class InstaRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
    """Static file handler that also accepts update requests.

    Requests without a query string are served as regular files after the
    index page has been refreshed. Requests with a query string may queue
    downloads (``update=true&folder=NAME`` or ``updateAll=true``) and are
    always answered with a redirect back to ``./``.
    """

    def send_head(self):
        """Refresh the index, handle query commands, then serve the file.

        Returns whatever the base class returns for a plain request, or
        ``None`` after sending a redirect for a request with a query string.
        """
        instaqueue.update_index()
        path, has_query, query = self.path.partition("?")
        self.path = path
        if not has_query:
            return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)

        # partition() tolerates arguments without "=", which used to raise
        # ValueError when unpacking the result of split("=", 1).
        kwargs = dict()
        for arg in query.split("&"):
            key, _, value = arg.partition("=")
            kwargs[key] = value

        if kwargs.get("update") == "true":
            folder = kwargs.get("folder")
            # Never queue a missing folder name: it would crash the
            # download loop when it is processed.
            if folder:
                instaqueue.push_folder(folder)
        elif kwargs.get("updateAll") == "true":
            logging.debug("Updating all")
            for folder, _ in instaqueue.sorted_folder_items:
                instaqueue.push_folder(folder)

        # 302 rather than 301: a permanent redirect may be cached by the
        # browser, so later clicks on the same update link would never
        # reach the server.
        self.send_response(302)
        self.send_header('Location', './')
        # An explicit empty body keeps HTTP/1.1 clients from waiting on the
        # persistent connection for data that never arrives.
        self.send_header('Content-Length', '0')
        self.end_headers()
        return None
def server(port, address, protocol):
    """Serve the current directory through InstaRequestHandler.

    Sets the handler's ``protocol_version`` to *protocol*, binds an HTTP
    server to ``(address, port)`` and then blocks in ``serve_forever()``.
    """
    InstaRequestHandler.protocol_version = protocol
    listener = BaseHTTPServer.HTTPServer((address, port), InstaRequestHandler)
    logging.debug("Starting HTTP server")
    listener.serve_forever()
def _read_folder_file(handle):
    """Parse a tab-delimited folder list into a ``name -> settings`` dict.

    Columns are ``name``, ``id``, ``rank`` (optional) and ``title``
    (optional). Lines with fewer than two columns (e.g. blank lines) are
    skipped. *handle* is closed when parsing finishes.
    """
    folders = dict()
    with handle as lines:
        for line in lines:
            cols = line.rstrip().split("\t")
            if len(cols) < 2:
                # Blank or malformed line: there is no id column to read.
                continue
            entry = {"id": cols[1]}
            if len(cols) > 2:
                # Store numeric ranks as integers so "10" sorts after "2";
                # anything else is kept verbatim.
                try:
                    entry["rank"] = int(cols[2])
                except ValueError:
                    entry["rank"] = cols[2]
            if len(cols) > 3:
                entry["title"] = cols[3]
            folders[cols[0]] = entry
    return folders


def test(username, password, *args, **kwargs):
    """Start the HTTP server thread and process the download queue forever.

    Args:
        username: Instapaper account username.
        password: Instapaper account password.
        **kwargs: ``folders`` (an open tab-delimited folder list, optional),
            ``port`` (default 8000), ``address`` (default ``""``) and
            ``protocol`` (default ``"HTTP/1.0"``).

    This function does not return: it ends in ``InstaQueue.watch_queue()``.
    """
    global instaqueue
    #
    # List the folders you want to download; only a "name" and an {"id": "1234567"} k-v pair are required
    # e.g. folders = {
    #     "name": { "rank": 1, "id": 1234567 },
    #     "home": { "rank": 2, "id": None, "title": "Instapaper Home" }
    # }
    # name:  the filename assigned to the mobi file that will be downloaded from Instapaper
    # rank:  specify the preferred order for the folders
    # id:    (optional) this can be found in the address bar's URL when viewing the folder on the web
    #        e.g. https://www.instapaper.com/u/folder/1234567/world-news
    #        The ID is usually a 7-digit number
    #        Enter `None` to specify the primary inbox
    # title: (optional) Name visible from the browser
    #
    folder_dict = {
        "home": {"rank": 1, "id": None, "title": "Home"},
        "news": {"rank": 2, "id": "1234567", "title": "World News"},
        "editorial": {"rank": 3, "id": "1234568", "title": "Editorial"}
    }

    # A folder file, when given, replaces the built-in example folders.
    folder_file = kwargs.get("folders")
    if folder_file:
        folder_list = _read_folder_file(folder_file)
    else:
        folder_list = folder_dict

    instaqueue = InstaQueue(username, password, folder_list)
    instaqueue.update_index()

    # Fall back to the same defaults the command line declares, so callers
    # that omit these keywords no longer hit a KeyError.
    server_thread = threading.Thread(
        name="InstaServer", target=server,
        args=(kwargs.get("port", 8000),
              kwargs.get("address", ""),
              kwargs.get("protocol", "HTTP/1.0")))
    # Daemon thread: the server stops together with the main thread.
    server_thread.daemon = True
    server_thread.start()
    instaqueue.watch_queue()
if __name__ == "__main__":
    # Command-line entry point: collect the credentials and server options,
    # then hand them to test().
    cli = argparse.ArgumentParser()
    cli.add_argument("username", type=str,
                     help="Instapaper Account username (email)")
    cli.add_argument("password", type=str,
                     help="Instapaper Account password")
    cli.add_argument("-p", "--port", dest="port", type=int, default=8000,
                     help="Server Port (default: %(default)s)")
    cli.add_argument("--address", dest="address", type=str, default="",
                     help="Server Address (default: '%(default)s')")
    cli.add_argument("--protocol", dest="protocol", type=str, default="HTTP/1.0",
                     help="Server Protocol (default: '%(default)s')")
    cli.add_argument(
        "-f", "--folder-list", dest="folders", type=argparse.FileType("r"),
        help=("A tab-delimited file specifying a list of folders, using the fields 'name', 'id', 'rank' (optional), 'title' (optional)"
              " e.g. home None 2 Instapaper Home"))
    options = cli.parse_args()

    server_options = {
        "port": options.port,
        "address": options.address,
        "protocol": options.protocol,
        "folders": options.folders,
    }
    test(options.username, options.password, **server_options)
Author
dmckean
commented
Apr 8, 2016
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment