Skip to content

Instantly share code, notes, and snippets.

@TheEnigmaBlade
Last active January 3, 2016 01:55
Show Gist options
  • Save TheEnigmaBlade/1deb3f1b37208ab4cf22 to your computer and use it in GitHub Desktop.
Save TheEnigmaBlade/1deb3f1b37208ab4cf22 to your computer and use it in GitHub Desktop.
Migrates a Hummingbird anime list to MAL. Manga list migration isn't finished because I didn't need it.
# Config
# Path of the Hummingbird export file that migrate() and convert_ids() load as JSON.
hb_input = "hb_acct_data.json"
# SQLite file name passed to init_db() for the Hummingbird -> MAL ID tables.
db_name = "hb_to_mal.db"
# User-Agent header sent with Hummingbird API requests.
hb_user_agent = "Hummingbird anime info puller:v0.1"
# User-Agent header sent with MyAnimeList requests.
mal_user_agent = "Hummingbird to MAL migration thingy:v0.1"
# True: main() runs migrate(); False: main() runs convert_ids() instead.
enable_mal_migrate = True
# NOTE(review): hb_username is not read anywhere in the visible code.
hb_username = ""
# MAL credentials: the user name selects the list fetched by get_mal_all_anime(),
# and user name + password are sent as HTTP basic auth by post_mal_anime().
mal_username = ""
mal_password = ""
# Main
import json
from functools import reduce
def main():
    """Open the ID database, then run the mode selected by the config."""
    init_db(db_name)
    # enable_mal_migrate selects the migration; otherwise only IDs are converted.
    run_mode = migrate if enable_mal_migrate else convert_ids
    run_mode()
def migrate():
    """Push every Hummingbird anime entry with a known MAL ID to MyAnimeList.

    Reads the export named by ``hb_input``, fetches the current MAL list, and
    for each Hummingbird entry whose MAL ID is stored in the database either
    adds it (not on MAL yet) or updates it (watched/status/score differ).
    Entries without a stored MAL ID are skipped. MAL IDs whose request failed
    are printed in a summary at the end.
    """
    with open(hb_input, "r", encoding="UTF-8") as file:
        hb_data = json.load(file)

    hb_anime = {}
    if "anime" in hb_data:
        hb_anime = {
            extract_hb_id(x, "anime"): extract_hb_anime_info(x)
            for x in hb_data["anime"]
        }

    existing_anime = get_mal_all_anime()
    # None signals a failed fetch. An empty dict is a valid, empty MAL list
    # (e.g. a brand-new account) and must not abort the migration.
    if existing_anime is None:
        print("Cannot continue")
        return

    failed_anime_ids = []
    for hb_id, hb_info in hb_anime.items():
        mal_id = load_anime_ids(hb_id)
        if not mal_id:
            # No stored HB -> MAL mapping for this entry.
            continue

        mal_info = convert_hb_info(hb_info)
        exists = mal_id in existing_anime
        if exists:
            old_mal_info = existing_anime[mal_id]
            changed = any(
                old_mal_info[key] != mal_info[key]
                for key in ("watched", "status", "score")
            )
            if not changed:
                continue
            print("Updating {}".format(mal_id))
        else:
            print("Adding {}".format(mal_id))

        if not post_mal_anime(mal_id, mal_info, exists=exists):
            failed_anime_ids.append(mal_id)

    print()
    print("Summary of failed adds")
    print("----------------------")
    for failed_id in failed_anime_ids:
        print(failed_id)
def convert_ids():
    """Resolve and store the MAL ID of every title in the Hummingbird export.

    Loads the export named by ``hb_input``, runs the ID converter for each
    anime (and manga, when present), and prints the Hummingbird IDs that could
    not be converted together with a guessed reason based on the title slug.
    """
    with open(hb_input, "r", encoding="UTF-8") as file:
        hb_data = json.load(file)

    def get_data(medium):
        # Map each entry's Hummingbird ID to its slug for the failure report.
        return {extract_hb_id(x, medium): x[medium]["slug"] for x in hb_data[medium]}

    hb_anime = get_data("anime") if "anime" in hb_data else {}
    hb_manga = get_data("manga") if "manga" in hb_data else {}

    failed_anime_ids = []
    for hb_id in hb_anime:
        if not convert_hb_anime_id(hb_id):
            failed_anime_ids.append(hb_id)

    failed_manga_ids = []
    for hb_id in hb_manga:
        # NOTE(review): convert_hb_manga_id is not defined in the visible part
        # of this file; an export containing manga will raise NameError here
        # unless it is provided elsewhere.
        if not convert_hb_manga_id(hb_id):
            failed_manga_ids.append(hb_id)

    print()
    print("Summary of failed conversions")
    print("-----------------------------")
    # The mappings hold the slug string itself, so it is passed directly.
    for hb_id in failed_anime_ids:
        print("{}, reason: {}".format(hb_id, guess_fail_reason(hb_anime[hb_id])))
    print()
    for hb_id in failed_manga_ids:
        print("{}, reason: {}".format(hb_id, guess_fail_reason(hb_manga[hb_id])))
# Support
from datetime import datetime
def extract_hb_id(hb_obj, medium):
    """Return the ``id`` stored under the *medium* key of a library entry."""
    media = hb_obj[medium]
    return media["id"]
def extract_hb_anime_info(hb_obj):
    """Pick the fields of a Hummingbird library entry used for migration.

    The rating is returned as a float, or 0 when the entry's rating is falsy.
    """
    info = {}
    info["status"] = hb_obj["status"]
    info["watched"] = hb_obj["episodes_watched"]
    info["notes"] = hb_obj["notes"]
    raw_rating = hb_obj["rating"]
    if raw_rating:
        info["rating"] = float(raw_rating)
    else:
        info["rating"] = 0
    info["last_watched"] = hb_obj["last_watched"]
    info["rewatch_count"] = hb_obj["rewatch_count"]
    return info
def extract_hb_manga_info(hb_obj):
    """Placeholder for manga extraction; always returns an empty dict."""
    return dict()
def convert_hb_info(hb_info):
    """Translate extracted Hummingbird fields into the values MAL expects.

    Returns a new dict; *hb_info* is not modified. Relative to the input:
    ``status`` is mapped through ``convert_hb_status``, a ``score`` key holding
    the doubled integer rating is added (the original ``rating`` is kept),
    ``last_watched`` is reformatted from an ISO timestamp to ``MMDDYYYY``, and
    empty values are replaced by ``0`` for counters or ``""`` for text.
    """
    # Counters must default to 0, not "", so they compare equal to MAL's ints.
    numeric_keys = ("watched", "rewatch_count")

    def nonone(val, val_type="str"):
        # Replace a falsy value with the neutral value for its type.
        if val:
            return val
        if val_type == "int" or val_type == "float":
            return 0
        return ""

    mal_info = dict(hb_info)
    for key, value in hb_info.items():
        if key == "status":
            mal_info[key] = convert_hb_status(value)
        elif key == "rating":
            # Doubling the rating yields MAL's integer score; no rating -> 0.
            mal_info["score"] = int((value or 0) * 2)
        elif key == "last_watched" and value:
            d = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
            # Write into the result, never back into the caller's dict.
            mal_info[key] = d.strftime("%m%d%Y")
        else:
            mal_info[key] = nonone(value, "int" if key in numeric_keys else "str")
    return mal_info
def convert_hb_status(status):
    """Map a Hummingbird status name to its MAL status code.

    Returns None for a status name that is not listed.
    """
    mal_codes = {
        "currently_watching": 1,
        "completed": 2,
        "on_hold": 3,
        "dropped": 4,
        "plan_to_watch": 6,
    }
    return mal_codes.get(status)
def convert_hb_anime_id(hb_id):
    """Return the MAL ID for a Hummingbird anime ID, or None if unresolved.

    A mapping already stored in the database is used first; otherwise the
    anime is fetched from the Hummingbird API and, when the response carries a
    non-empty ``mal_id``, the mapping is stored before being returned.
    """
    print("Converting HB anime ID: {}".format(hb_id))

    # load_anime_ids returns the stored MAL ID itself (or None), not a row.
    stored_mal_id = load_anime_ids(hb_id)
    if stored_mal_id:
        print(" Stored MAL ID: {}".format(stored_mal_id))
        return stored_mal_id

    anime = get_hb_anime(hb_id)
    # Require a usable value so a null mal_id is never cached as a mapping.
    if anime and "mal_id" in anime and anime["mal_id"]:
        mal_id = anime["mal_id"]
        print(" New MAL ID: {}".format(mal_id))
        save_anime_ids(hb_id, mal_id)
        return mal_id

    print(" Failed")
    return None
def guess_fail_reason(slug):
    """Guess from a title slug why no MAL ID was found for it.

    The slug is lower-cased and checked against known non-anime titles in a
    fixed order; when none matches, the lower-cased slug is echoed back.
    """
    lowered = slug.lower()
    known_titles = (
        ("rwby", "RWBY (not anime)"),
        ("korra", "The Legend of Korra (not anime)"),
        ("avatar", "Avatar: The Last Airbender (not anime)"),
    )
    for needle, reason in known_titles:
        if needle in lowered:
            return reason
    return "unknown (" + lowered + ")"
# API access
import requests
from time import time, sleep
from functools import wraps
# time() value recorded after the most recent rate-limited call; 0 = none yet.
_last_request_time = 0
# Minimum number of seconds kept between two rate-limited calls.
_max_request_wait = 2


def rate_limit(f):
    """Decorate *f* so calls are spaced at least ``_max_request_wait`` apart.

    The spacing is shared by every decorated function because they all use
    the same module-level timestamp. The timestamp is refreshed even when *f*
    raises, so a failed call cannot be retried immediately.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        global _last_request_time
        time_diff = time() - _last_request_time
        if time_diff < _max_request_wait:
            sleep(_max_request_wait - time_diff)
        try:
            return f(*args, **kwargs)
        finally:
            # Record the attempt whether it succeeded or raised.
            _last_request_time = time()
    return wrapper
def create_hb_url(path, *args, **kwargs):
    """Build a Hummingbird API v1 URL from a path template.

    One leading slash is dropped from *path*, the remainder is filled in with
    ``str.format(*args, **kwargs)`` and appended to the API base. Returns None
    for an empty or missing path.
    """
    if not path:
        return None
    relative = path[1:] if path[0] == "/" else path
    return "https://hummingbird.me/api/v1/" + relative.format(*args, **kwargs)
@rate_limit
def get_hb_anime_list(username):
    """Fetch a user's Hummingbird library as decoded JSON.

    Prints an error and returns None when the response status is not 200.
    """
    response = requests.get(
        create_hb_url("users/{username}/library", username=username),
        headers={"User-Agent": hb_user_agent},
    )
    if response.status_code == 200:
        return response.json()
    print("Error: request failed, status={}".format(response.status_code))
    return None
@rate_limit
def get_hb_anime(hb_id):
    """Fetch one anime record from the Hummingbird API as decoded JSON.

    Prints an error and returns None when the response status is not 200.
    """
    response = requests.get(
        create_hb_url("anime/{id}", id=hb_id),
        headers={"User-Agent": hb_user_agent},
    )
    if response.status_code == 200:
        return response.json()
    print("Error: request failed, status={}".format(response.status_code))
    return None
def create_mal_url(path, *args, **kwargs):
    """Build a MyAnimeList API URL from a path template.

    One leading slash is dropped from *path*, the remainder is filled in with
    ``str.format(*args, **kwargs)`` and appended to the API base. Returns None
    for an empty or missing path.
    """
    if not path:
        return None
    relative = path[1:] if path[0] == "/" else path
    # NOTE(review): plain-HTTP base; post_mal_anime sends basic-auth
    # credentials to URLs built here.
    return "http://myanimelist.net/api/" + relative.format(*args, **kwargs)
@rate_limit
def get_mal_all_anime():
    """Fetch the configured MAL user's anime list.

    Returns a dict mapping each MAL anime ID (int) to a dict with the integer
    fields ``watched``, ``status`` and ``score``. The dict is empty when the
    response contains no list entries. Returns None, after printing an error,
    when the HTTP status is 300 or above or the response is an Incapsula
    block page.
    """
    # xml.etree.cElementTree was removed in Python 3.9; ElementTree uses the
    # C accelerator automatically when it is available.
    from xml.etree import ElementTree

    headers = {"User-Agent": mal_user_agent}
    params = {"u": mal_username, "status": "all", "type": "anime"}
    response = requests.get("http://myanimelist.net/malappinfo.php", params=params, headers=headers)
    if response.status_code >= 300:
        print("Error: all anime response code {}".format(response.status_code))
        return None
    if "_Incapsula_Resource" in response.text:
        print("Error: Request blocked by Incapsula protection")
        return None

    result = {}
    for raw_entry in ElementTree.fromstring(response.text):
        # Flatten the element's children into a tag -> text mapping.
        entry = {attr.tag: attr.text for attr in raw_entry}
        # Only children carrying an anime ID are list entries.
        if "series_animedb_id" in entry:
            entry_id = int(entry["series_animedb_id"])
            result[entry_id] = {
                "watched": int(entry["my_watched_episodes"]),
                "status": int(entry["my_status"]),
                "score": int(entry["my_score"]),
            }
    return result
@rate_limit
def post_mal_anime(mal_id, info, exists=False):
    """Add or update one anime entry on MAL.

    Posts the XML produced by ``generate_mal_anime_xml(info)`` to the update
    endpoint when *exists* is true and to the add endpoint otherwise, using
    the configured MAL credentials. Returns True for a 200/201/202 response;
    otherwise prints the status and response body and returns False.
    """
    template = "animelist/update/{id}.xml" if exists else "animelist/add/{id}.xml"
    url = create_mal_url(template, id=mal_id)
    headers = {"User-Agent": mal_user_agent}
    data = {"data": generate_mal_anime_xml(info)}
    response = requests.post(url, auth=(mal_username, mal_password), headers=headers, data=data)
    if response.status_code in (200, 201, 202):
        return True
    print("Error: request failed, status={}".format(response.status_code))
    print(response.text)
    return False
def generate_mal_anime_xml(info):
    """Render the XML payload for a MAL anime add/update request.

    *info* must provide ``status``, ``watched``, ``notes``, ``last_watched``
    and ``rewatch_count``. The ``<score>`` element uses ``info["score"]`` when
    present and falls back to ``info["rating"]`` for callers that supply only
    a rating. ``last_watched`` becomes the start date for status 1 and the
    finish date for status 2. All values are XML-escaped.
    """
    from xml.sax.saxutils import escape

    status = info["status"]
    last_watched = info["last_watched"]
    start_date = last_watched if status == 1 else ""
    finish_date = last_watched if status == 2 else ""
    # Prefer the converted score over the raw 0-5 rating.
    score = info["score"] if "score" in info else info["rating"]

    fields = (
        ("status", status),
        ("episode", info["watched"]),
        ("score", score),
        ("comments", info["notes"]),
        ("date_start", start_date),
        ("date_finish", finish_date),
        ("times_rewatched", info["rewatch_count"]),
    )
    # Escape every value so notes containing <, > or & keep the XML valid.
    body = "".join(
        "<{0}>{1}</{0}>".format(tag, escape(str(value))) for tag, value in fields
    )
    return "<?xml version='1.0' encoding='UTF-8'?><entry>" + body + "</entry>"
# Data storage
import dataset
# Database handle; stays None until init_db() connects it.
db = None
# Tables assigned by init_db() from db["anime"] and db["manga"]; the save_*/load_*
# helpers store and look up hb_id -> mal_id rows through them.
anime_ids = None
manga_ids = None
def init_db(name):
    """Connect to the SQLite file *name* and bind the module-level tables."""
    global db, anime_ids, manga_ids
    connection_url = "sqlite:///" + name
    db = dataset.connect(connection_url)
    anime_ids, manga_ids = db["anime"], db["manga"]
def save_ids(table, hb_id, mal_id):
    """Insert or update the row for *hb_id* in *table*, keyed on ``hb_id``."""
    row = {"hb_id": hb_id, "mal_id": mal_id}
    table.upsert(row, ["hb_id"])
def save_anime_ids(hb_id, mal_id):
    """Store an anime HB -> MAL ID mapping in the anime table."""
    # anime_ids is only read here, so no ``global`` declaration is needed.
    save_ids(anime_ids, hb_id=hb_id, mal_id=mal_id)
def save_manga_ids(hb_id, mal_id):
    """Store a manga HB -> MAL ID mapping in the manga table."""
    # manga_ids is only read here, so no ``global`` declaration is needed.
    save_ids(manga_ids, hb_id=hb_id, mal_id=mal_id)
def load_ids(table, hb_id):
    """Return the ``mal_id`` stored for *hb_id* in *table*, or None if absent."""
    match = table.find_one(hb_id=hb_id)
    return match["mal_id"] if match else None
def load_anime_ids(hb_id):
    """Return the stored MAL ID for an anime's Hummingbird ID, or None."""
    # anime_ids is only read here, so no ``global`` declaration is needed.
    return load_ids(anime_ids, hb_id=hb_id)
def load_manga_ids(hb_id):
    """Return the stored MAL ID for a manga's Hummingbird ID, or None."""
    # manga_ids is only read here, so no ``global`` declaration is needed.
    return load_ids(manga_ids, hb_id=hb_id)
###
# Run the tool only when executed as a script, not when imported.
if "__main__" == __name__:
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment