Last active
January 3, 2016 01:55
-
-
Save TheEnigmaBlade/1deb3f1b37208ab4cf22 to your computer and use it in GitHub Desktop.
Migrates a Hummingbird anime list to MAL. Manga list migration isn't finished because I didn't need it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Config
# Path of the exported Hummingbird account data (JSON) that migrate() and
# convert_ids() read.
hb_input = "hb_acct_data.json"
# File name of the SQLite database opened by init_db(); it stores the
# Hummingbird ID -> MAL ID mappings.
db_name = "hb_to_mal.db"
# User-Agent header values sent with Hummingbird and MyAnimeList requests.
hb_user_agent = "Hummingbird anime info puller:v0.1"
mal_user_agent = "Hummingbird to MAL migration thingy:v0.1"
# main() runs migrate() when this is true and convert_ids() otherwise.
enable_mal_migrate = True
# Account details. hb_username is not referenced by the visible code;
# mal_username/mal_password are used for the MAL list download and for
# HTTP basic auth when posting entries. Avoid committing real credentials.
hb_username = ""
mal_username = ""
mal_password = ""
# Main
import json | |
from functools import reduce | |
def main():
    """Open the ID database, then run the mode selected by the config.

    ``enable_mal_migrate`` chooses between pushing the list to MAL
    (``migrate``) and only resolving Hummingbird IDs (``convert_ids``).
    """
    init_db(db_name)
    selected_mode = migrate if enable_mal_migrate else convert_ids
    selected_mode()
def migrate():
    """Push the anime entries from the Hummingbird export to MyAnimeList.

    Reads ``hb_input``, downloads the current MAL list, and for every
    Hummingbird entry that has a stored MAL ID either adds it (not on MAL
    yet) or updates it (watched count, status or score differ). Entries
    without a stored MAL ID are skipped, so ``convert_ids`` has to have
    populated the ID table beforehand. A summary of the MAL IDs whose
    request failed is printed at the end.
    """
    hb_anime = None
    with open(hb_input, "r", encoding="UTF-8") as file:
        hb_data = json.load(file)
    if "anime" in hb_data:
        hb_anime = {
            extract_hb_id(entry, "anime"): extract_hb_anime_info(entry)
            for entry in hb_data["anime"]
        }

    existing_anime = get_mal_all_anime()
    # Only None signals a failed download. An empty dict is a valid (empty)
    # MAL list and must not abort the migration.
    if existing_anime is None:
        print("Cannot continue")
        return

    compared_keys = ("watched", "status", "score")
    failed_anime_ids = []
    for hb_id, hb_info in (hb_anime or {}).items():
        mal_id = load_anime_ids(hb_id)
        if not mal_id:
            continue
        mal_info = convert_hb_info(hb_info)

        old_mal_info = existing_anime.get(mal_id)
        if old_mal_info is None:
            print("Adding {}".format(mal_id))
            posted = post_mal_anime(mal_id, mal_info, exists=False)
        elif any(old_mal_info[key] != mal_info[key] for key in compared_keys):
            print("Updating {}".format(mal_id))
            posted = post_mal_anime(mal_id, mal_info, exists=True)
        else:
            # Already on MAL with identical data: nothing to send.
            continue

        if not posted:
            failed_anime_ids.append(mal_id)

    print()
    print("Summary of failed adds")
    print("----------------------")
    for failed_id in failed_anime_ids:
        print(failed_id)
def convert_ids():
    """Resolve every Hummingbird ID in the export to a MAL ID.

    Reads ``hb_input``, converts each anime and manga ID, and prints a
    summary of the IDs that could not be converted together with a guessed
    reason derived from the entry's slug.
    """
    with open(hb_input, "r", encoding="UTF-8") as file:
        hb_data = json.load(file)

    def get_data(medium):
        # Maps Hummingbird ID -> slug (a plain string) for one medium.
        return {extract_hb_id(x, medium): x[medium]["slug"] for x in hb_data[medium]}

    hb_anime = get_data("anime") if "anime" in hb_data else None
    hb_manga = get_data("manga") if "manga" in hb_data else None

    failed_anime_ids = []
    if hb_anime:
        for hb_id in hb_anime:
            if not convert_hb_anime_id(hb_id):
                failed_anime_ids.append(hb_id)

    failed_manga_ids = []
    if hb_manga:
        for hb_id in hb_manga:
            # NOTE(review): convert_hb_manga_id is not defined in the visible
            # part of this file; confirm it exists before feeding manga data.
            if not convert_hb_manga_id(hb_id):
                failed_manga_ids.append(hb_id)

    print()
    print("Summary of failed conversions")
    print("-----------------------------")
    # The mapping values are already the slug strings, so they are passed
    # straight to guess_fail_reason.
    for hb_id in failed_anime_ids:
        print("{}, reason: {}".format(hb_id, guess_fail_reason(hb_anime[hb_id])))
    print()
    for hb_id in failed_manga_ids:
        print("{}, reason: {}".format(hb_id, guess_fail_reason(hb_manga[hb_id])))
# Support
from datetime import datetime | |
def extract_hb_id(hb_obj, medium):
    """Return the ``"id"`` stored in the ``medium`` sub-object of a library entry."""
    medium_info = hb_obj[medium]
    return medium_info["id"]
def extract_hb_anime_info(hb_obj):
    """Pick the fields used for migration out of one Hummingbird library entry.

    The rating is converted to ``float``; a missing/empty rating becomes 0.
    """
    info = {}
    info["status"] = hb_obj["status"]
    info["watched"] = hb_obj["episodes_watched"]
    info["notes"] = hb_obj["notes"]
    raw_rating = hb_obj["rating"]
    if raw_rating:
        info["rating"] = float(raw_rating)
    else:
        info["rating"] = 0
    info["last_watched"] = hb_obj["last_watched"]
    info["rewatch_count"] = hb_obj["rewatch_count"]
    return info
def extract_hb_manga_info(hb_obj):
    """Placeholder for manga extraction; currently always returns an empty dict."""
    return dict()
def convert_hb_info(hb_info):
    """Translate extracted Hummingbird entry info into MAL-style values.

    Returns a new dict; ``hb_info`` itself is left untouched. Conversions:

    * ``status``       -> numeric status via ``convert_hb_status``
    * ``rating``       -> additional ``score`` key holding ``int(rating * 2)``
                          (the original ``rating`` value is kept as well)
    * ``last_watched`` -> ``"%m%d%Y"`` string when a timestamp is present
    * everything else  -> ``None`` replaced by ``0`` for the numeric counters
                          and by ``""`` otherwise

    Raises ``ValueError`` if ``last_watched`` does not match the
    ``%Y-%m-%dT%H:%M:%S.%fZ`` format.
    """
    # Counters must stay numeric: a real 0 has to survive so it can be
    # compared against the integers parsed from the MAL list.
    numeric_keys = ("watched", "rewatch_count")

    def nonone(val, val_type="str"):
        if val is not None:
            return val
        if val_type == "int" or val_type == "float":
            return 0
        return ""

    mal_info = dict(hb_info)
    for key, value in hb_info.items():
        if key == "status":
            mal_info[key] = convert_hb_status(value)
        elif key == "rating":
            mal_info["score"] = int((value or 0) * 2)
        elif key == "last_watched" and value:
            watched_at = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
            # Write into the result, not the caller's dict.
            mal_info[key] = watched_at.strftime("%m%d%Y")
        else:
            mal_info[key] = nonone(value, "int" if key in numeric_keys else "str")
    return mal_info
def convert_hb_status(status):
    """Map a Hummingbird status string to its numeric MAL status code.

    Returns ``None`` for any status that is not recognised.
    """
    status_codes = (
        ("currently_watching", 1),
        ("completed", 2),
        ("on_hold", 3),
        ("dropped", 4),
        ("plan_to_watch", 6),
    )
    for hb_name, mal_code in status_codes:
        if status == hb_name:
            return mal_code
    return None
def convert_hb_anime_id(hb_id):
    """Return the MAL ID for a Hummingbird anime ID, or ``None`` on failure.

    The stored mapping is consulted first; on a miss the Hummingbird API is
    queried and a successful result is saved for later runs.
    """
    print("Converting HB anime ID: {}".format(hb_id))

    # load_anime_ids already returns the bare MAL ID (or None), not a row.
    stored_mal_id = load_anime_ids(hb_id)
    if stored_mal_id:
        print(" Stored MAL ID: {}".format(stored_mal_id))
        return stored_mal_id

    anime = get_hb_anime(hb_id)
    mal_id = anime["mal_id"] if anime and "mal_id" in anime else None
    # A null mal_id means there is no MAL counterpart; treat it as a failure
    # instead of caching an empty mapping.
    if mal_id:
        print(" New MAL ID: {}".format(mal_id))
        save_anime_ids(hb_id, mal_id)
        return mal_id

    print(" Failed")
    return None
def guess_fail_reason(slug):
    """Guess why an ID conversion failed, based on keywords in the slug.

    The match is case-insensitive; the first matching keyword wins. When no
    keyword matches, the lower-cased slug is echoed back as "unknown".
    """
    lowered = slug.lower()
    known_reasons = (
        ("rwby", "RWBY (not anime)"),
        ("korra", "The Legend of Korra (not anime)"),
        ("avatar", "Avatar: The Last Airbender (not anime)"),
    )
    for keyword, reason in known_reasons:
        if keyword in lowered:
            return reason
    return "unknown (" + lowered + ")"
# API access
import requests | |
from time import time, sleep | |
from functools import wraps | |
# Timestamp (from time()) of the most recently completed rate-limited call.
_last_request_time = 0
# Minimum number of seconds between two rate-limited calls.
_max_request_wait = 2


def rate_limit(f):
    """Decorator that spaces out calls to the wrapped functions.

    All decorated functions share one timestamp: if fewer than
    ``_max_request_wait`` seconds have passed since the previous decorated
    call returned, the wrapper sleeps for the remainder before calling ``f``.
    """
    @wraps(f)
    def throttled(*args, **kwargs):
        global _last_request_time
        elapsed = time() - _last_request_time
        if elapsed < _max_request_wait:
            sleep(_max_request_wait - elapsed)
        result = f(*args, **kwargs)
        _last_request_time = time()
        return result

    return throttled
def create_hb_url(path, *args, **kwargs):
    """Build a Hummingbird API v1 URL from a path template.

    A single leading "/" is dropped and the template is filled in with
    ``str.format(*args, **kwargs)``. Returns ``None`` for an empty path.
    """
    if not path:
        return None
    relative = path[1:] if path[0] == "/" else path
    return "https://hummingbird.me/api/v1/" + relative.format(*args, **kwargs)
@rate_limit
def get_hb_anime_list(username):
    """Fetch a user's library from the Hummingbird API.

    Returns the decoded JSON body, or ``None`` (after printing the status
    code) when the response status is not 200.
    """
    endpoint = create_hb_url("users/{username}/library", username=username)
    request_headers = {"User-Agent": hb_user_agent}
    response = requests.get(endpoint, headers=request_headers)
    if response.status_code == 200:
        return response.json()
    print("Error: request failed, status={}".format(response.status_code))
    return None
@rate_limit
def get_hb_anime(hb_id):
    """Fetch one anime record from the Hummingbird API by its ID.

    Returns the decoded JSON body, or ``None`` (after printing the status
    code) when the response status is not 200.
    """
    endpoint = create_hb_url("anime/{id}", id=hb_id)
    request_headers = {"User-Agent": hb_user_agent}
    response = requests.get(endpoint, headers=request_headers)
    if response.status_code == 200:
        return response.json()
    print("Error: request failed, status={}".format(response.status_code))
    return None
def create_mal_url(path, *args, **kwargs):
    """Build a MyAnimeList API URL from a path template.

    A single leading "/" is dropped and the template is filled in with
    ``str.format(*args, **kwargs)``. Returns ``None`` for an empty path.
    """
    if not path:
        return None
    if path.startswith("/"):
        path = path[1:]
    filled_path = path.format(*args, **kwargs)
    return "http://myanimelist.net/api/" + filled_path
@rate_limit
def get_mal_all_anime():
    """Download the complete anime list of ``mal_username`` from MAL.

    Returns a dict mapping the integer MAL anime ID to
    ``{"watched": int, "status": int, "score": int}``. Returns ``None``
    (after printing an error) when the response status is 300 or above, or
    when the body contains the Incapsula block marker.
    """
    # xml.etree.cElementTree was removed in Python 3.9; ElementTree exposes
    # the same fromstring() API.
    from xml.etree import ElementTree

    headers = {"User-Agent": mal_user_agent}
    params = {"u": mal_username, "status": "all", "type": "anime"}
    response = requests.get("http://myanimelist.net/malappinfo.php", params=params, headers=headers)

    if response.status_code >= 300:
        print("Error: all anime response code {}".format(response.status_code))
        return None
    if "_Incapsula_Resource" in response.text:
        print("Error: Request blocked by Incapsula protection")
        return None

    result = {}
    for raw_entry in ElementTree.fromstring(response.text):
        entry = {child.tag: child.text for child in raw_entry}
        # Children of the root that carry no anime ID are not list entries.
        if "series_animedb_id" not in entry:
            continue
        result[int(entry["series_animedb_id"])] = {
            "watched": int(entry["my_watched_episodes"]),
            "status": int(entry["my_status"]),
            "score": int(entry["my_score"]),
        }
    return result
@rate_limit
def post_mal_anime(mal_id, info, exists=False):
    """Send one anime entry to MAL, adding it or updating an existing one.

    ``exists`` selects the update endpoint instead of the add endpoint.
    Returns ``True`` when MAL answers 200, 201 or 202; otherwise prints the
    status code and response body and returns ``False``.
    """
    template = "animelist/update/{id}.xml" if exists else "animelist/add/{id}.xml"
    url = create_mal_url(template, id=mal_id)
    headers = {"User-Agent": mal_user_agent}
    payload = {"data": generate_mal_anime_xml(info)}
    credentials = (mal_username, mal_password)

    response = requests.post(url, auth=credentials, headers=headers, data=payload)
    if response.status_code in (200, 201, 202):
        return True
    print("Error: request failed, status={}".format(response.status_code))
    print(response.text)
    return False
def generate_mal_anime_xml(info):
    """Render the XML payload for a MAL anime add/update request.

    ``info`` must provide ``status``, ``watched``, ``notes``,
    ``last_watched`` and ``rewatch_count``, plus ``score`` (preferred) or
    ``rating``. ``last_watched`` is emitted as the start date for status 1
    and as the finish date for status 2; otherwise both dates are empty.
    """
    from xml.sax.saxutils import escape

    status = info["status"]
    last_watched = info["last_watched"]
    fields = {
        "status": status,
        "watched": info["watched"],
        # Prefer the converted MAL score; fall back to the raw rating so
        # callers that never set "score" keep working.
        "score": info["score"] if "score" in info else info["rating"],
        # Notes are free text: escape &, < and > so the XML stays well-formed.
        "notes": escape(str(info["notes"])),
        "start_date": last_watched if status == 1 else "",
        "finish_date": last_watched if status == 2 else "",
        "rewatch_count": info["rewatch_count"],
    }
    template = (
        "<?xml version='1.0' encoding='UTF-8'?>"
        "<entry>"
        "<status>{status}</status>"
        "<episode>{watched}</episode>"
        "<score>{score}</score>"
        "<comments>{notes}</comments>"
        "<date_start>{start_date}</date_start>"
        "<date_finish>{finish_date}</date_finish>"
        "<times_rewatched>{rewatch_count}</times_rewatched>"
        "</entry>"
    )
    return template.format(**fields)
# Data storage
import dataset | |
# Database handle and the two ID-mapping tables; all set by init_db().
db = None
anime_ids = None
manga_ids = None


def init_db(name):
    """Connect to the SQLite database file ``name`` and bind the ID tables.

    Sets the module-level ``db``, ``anime_ids`` and ``manga_ids``.
    """
    global db, anime_ids, manga_ids
    connection_url = "sqlite:///" + name
    db = dataset.connect(connection_url)
    anime_ids, manga_ids = db["anime"], db["manga"]
def save_ids(table, hb_id, mal_id):
    """Insert or update the ``hb_id`` -> ``mal_id`` row in ``table``, keyed on ``hb_id``."""
    row = {"hb_id": hb_id, "mal_id": mal_id}
    table.upsert(row, ["hb_id"])
def save_anime_ids(hb_id, mal_id):
    """Store an anime ID mapping in the module-level ``anime_ids`` table."""
    # Reading a module global needs no ``global`` declaration.
    save_ids(anime_ids, hb_id, mal_id)
def save_manga_ids(hb_id, mal_id):
    """Store a manga ID mapping in the module-level ``manga_ids`` table."""
    # Reading a module global needs no ``global`` declaration.
    save_ids(manga_ids, hb_id, mal_id)
def load_ids(table, hb_id):
    """Return the ``mal_id`` stored for ``hb_id`` in ``table``, or ``None`` if no row is found."""
    match = table.find_one(hb_id=hb_id)
    return match["mal_id"] if match else None
def load_anime_ids(hb_id):
    """Look up the stored MAL ID for a Hummingbird anime ID (``None`` if absent)."""
    # Reading a module global needs no ``global`` declaration.
    return load_ids(anime_ids, hb_id)
def load_manga_ids(hb_id):
    """Look up the stored MAL ID for a Hummingbird manga ID (``None`` if absent)."""
    # Reading a module global needs no ``global`` declaration.
    return load_ids(manga_ids, hb_id)
### | |
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment