Last active
February 26, 2024 15:39
-
-
Save alanhamlett/57ecef296fe582408d387f6fd3319bdd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
wakatime.geocoding_utils | |
~~~~~~~~~~~~~~~~~~~~~~~~ | |
Utils to get location from ip address. | |
""" | |
import traceback | |
import flag # https://pypi.org/project/emoji-country-flag | |
import requests | |
from sqlalchemy.sql.expression import or_ | |
from wakatime import app, json, r | |
from wakatime.constants import ( | |
CACHE_TIMEOUT_ONE_MONTH, | |
GEOCODE_IP_PROVIDER_IPAPI, | |
GEOCODE_IP_PROVIDER_IPINFO, | |
GEOCODE_IP_PROVIDER_IPSTACK, | |
IP_INFO_API_URL, # "https://ipinfo.io/{ip}?token={api_key}" | |
IPAPI_API_URL, # "https://api.ipapi.is/?q={ip}&key={api_key}" | |
IPSTACK_API_URL, # "https://api.ipstack.com/{ip}?access_key={api_key}" | |
) | |
from wakatime.models import City, Country | |
# Timeout handed to every provider HTTP call in this module
# (``requests.get(..., timeout=TIMEOUT)``); requests interprets it in seconds.
TIMEOUT = 2
# Upper bound for the ``_retries`` counter that ``geocode_ip`` uses when
# re-trying the same provider.
MAX_RETRIES = 3
def geocode_ip(ip=None, provider=GEOCODE_IP_PROVIDER_IPAPI, _retries=0):
    """Return a dict of location data for the given IP address, or None.

    Providers are tried in a fixed order: ipapi.is, then ipstack.com, then
    ipinfo.io. ipapi.is and ipstack.com are each attempted once plus up to
    ``MAX_RETRIES`` retries before falling back to the next provider;
    ipinfo.io is attempted a single time.

    Args:
        ip: IP address to look up. When falsy, ``discover_ip`` is asked to
            supply one.
        provider: Provider to start with (one of the
            ``GEOCODE_IP_PROVIDER_*`` constants).
        _retries: Internal counter of retries already spent on ``provider``.
            Callers should leave it at its default.

    Returns:
        The provider response dict (ipstack.com-shaped), or None when no IP
        is available or every provider failed or returned nothing.
    """
    ip = discover_ip(ip)
    if not ip:
        return None

    resp = None
    try:
        if provider == GEOCODE_IP_PROVIDER_IPAPI:
            resp = geocode_ip_using_ipapi(ip=ip)
        elif provider == GEOCODE_IP_PROVIDER_IPSTACK:
            resp = geocode_ip_using_ipstack(ip=ip)
        elif provider == GEOCODE_IP_PROVIDER_IPINFO:
            resp = geocode_ip_using_ipinfo(ip=ip)
    except Exception:
        # Only log the final failed attempt for a provider so transient errors
        # that succeed on retry do not spam the error log.
        if provider == GEOCODE_IP_PROVIDER_IPINFO or _retries >= MAX_RETRIES:
            app.logger.error(f"geocode_ip {provider} error:\n{traceback.format_exc()}")
        # Deliberately no ``return`` here: the provider helpers signal failure
        # by raising, so returning would skip the retry/fallback logic below.

    if resp:
        return resp

    # Retry the same provider first (ipinfo.io is never retried).
    if provider != GEOCODE_IP_PROVIDER_IPINFO and _retries < MAX_RETRIES:
        return geocode_ip(ip=ip, provider=provider, _retries=_retries + 1)

    # Retries exhausted: fall back to the next provider with a fresh counter.
    if provider == GEOCODE_IP_PROVIDER_IPAPI:
        return geocode_ip(ip=ip, provider=GEOCODE_IP_PROVIDER_IPSTACK, _retries=0)
    if provider == GEOCODE_IP_PROVIDER_IPSTACK:
        return geocode_ip(ip=ip, provider=GEOCODE_IP_PROVIDER_IPINFO, _retries=0)

    return None
def geocode_ip_using_ipapi(ip=None):
    """Look up an IP with ipapi.is and return it in ipstack.com response shape.

    A cached result stored in ``r`` under ``geocode-ipapi-<ip>`` is returned
    when present (and its expiry refreshed); otherwise the API is queried and
    the transformed result is cached.

    Args:
        ip: IP address to look up. When falsy, ``discover_ip`` is asked to
            supply one.

    Returns:
        The response dict with ipstack.com-compatible top-level keys added
        (``ip``, ``type``, ``country_code``, ``country_name``, ``city``,
        ``time_zone``, ``provider``), or None when no IP is available.

    Raises:
        Exception: When the response is not JSON or lacks ``ip`` or
            ``location.country``. Missing ``location`` sub-keys surface as
            ``KeyError``.
    """
    ip = discover_ip(ip)
    if not ip:
        return None

    key = "geocode-ipapi-{}".format(ip)

    # The cache is best-effort: any failure reading it falls through to a
    # live lookup instead of failing the request.
    try:
        cache = r.get(key)
        if cache:
            data = json.loads(cache)
            r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
            return data
    except Exception:
        pass

    url = IPAPI_API_URL.format(api_key=app.config["IPAPI_SECRET_KEY"], ip=ip)
    headers = {"User-Agent": "WakaTime/1.0"}
    response = requests.get(url, headers=headers, timeout=TIMEOUT)

    try:
        data = response.json()
    except Exception:
        data = None

    if not data or not data.get("ip") or not (data.get("location") or {}).get("country"):
        raise Exception(f"Response {response.status_code}:\n{response.text}")

    # transform response to have ipstack.com structure for backwards compatibility
    location = data["location"]
    data["ip"] = ip
    # An IPv6 address always contains ":" and may be far shorter than 15
    # characters (e.g. "::1"), so length is not a reliable discriminator.
    data["type"] = "ipv6" if ":" in ip else "ipv4"
    data["country_code"] = location["country_code"]
    data["country_name"] = location["country"]
    data["city"] = location["city"]
    location["country_flag_emoji"] = flag.flag(location["country_code"].upper())
    data["time_zone"] = {"id": location["timezone"]}
    data["provider"] = "ipapi.is"

    # Writing the cache is best-effort as well; a failure must not discard a
    # successful lookup.
    try:
        r.set(key, json.dumps(data))
        r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
    except Exception:
        pass

    return data
def geocode_ip_using_ipstack(ip=None):
    """Look up an IP with ipstack.com and return its JSON response as a dict.

    A cached result stored in ``r`` under ``geocode-ipstack-<ip>`` is returned
    when present (and its expiry refreshed); otherwise the API is queried and
    the result is cached.

    Args:
        ip: IP address to look up. When falsy, ``discover_ip`` is asked to
            supply one.

    Returns:
        The ipstack.com response dict with ``provider`` set to
        ``"ipstack.com"``, or None when no IP is available.

    Raises:
        Exception: When the response is not JSON, reports ``success: false``,
            or lacks ``ip`` or ``country_name``.
    """
    ip = discover_ip(ip)
    if not ip:
        return None

    key = "geocode-ipstack-{}".format(ip)

    # The cache is best-effort: any failure reading it falls through to a
    # live lookup instead of failing the request.
    try:
        cache = r.get(key)
        if cache:
            data = json.loads(cache)
            r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
            return data
    except Exception:
        pass

    url = IPSTACK_API_URL.format(api_key=app.config["IPSTACK_SECRET_KEY"], ip=ip)
    headers = {"User-Agent": "WakaTime/1.0"}
    # TLS certificate verification is left at the requests default (enabled):
    # the URL carries the API key, so it must not be sent to an unverified peer.
    response = requests.get(url, headers=headers, timeout=TIMEOUT)

    try:
        data = response.json()
    except Exception:
        data = None

    if not data or data.get("success") is False or not data.get("ip") or not data.get("country_name"):
        raise Exception(f"Response {response.status_code}:\n{response.text}")

    data["provider"] = "ipstack.com"

    # Writing the cache is best-effort as well; a failure must not discard a
    # successful lookup.
    try:
        r.set(key, json.dumps(data))
        r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
    except Exception:
        pass

    return data
def geocode_ip_using_ipinfo(ip=None):
    """Look up an IP with ipinfo.io and return it in ipstack.com response shape.

    A cached result stored in ``r`` under ``geocode-ipinfo-<ip>`` is returned
    when present (and its expiry refreshed). Otherwise the API is queried, the
    response is enriched from the ``Country`` and ``City`` models, and the
    transformed dict is cached.

    Args:
        ip: IP address to look up. When falsy, ``discover_ip`` is asked to
            supply one.

    Returns:
        A new ipstack.com-shaped dict, or None when no IP is available.

    Raises:
        Exception: When the response is not JSON, lacks ``ip``, ``country``,
            ``city`` or ``timezone``, or when no matching ``Country`` / ``City``
            row exists (the country's capital is tried as a fallback city).
            A missing or malformed ``loc`` field surfaces as ``KeyError``,
            ``IndexError`` or ``ValueError``.
    """
    ip = discover_ip(ip)
    if not ip:
        return None

    key = "geocode-ipinfo-{}".format(ip)

    # The cache is best-effort: any failure reading it falls through to a
    # live lookup instead of failing the request.
    try:
        cache = r.get(key)
        if cache:
            data = json.loads(cache)
            r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
            return data
    except Exception:
        pass

    url = IP_INFO_API_URL.format(api_key=app.config["IPINFO_SECRET_KEY"], ip=ip)
    headers = {"User-Agent": "WakaTime/1.0"}
    response = requests.get(url, headers=headers, timeout=TIMEOUT)

    try:
        data = response.json()
    except Exception:
        data = None

    if not data or not data.get("ip") or not data.get("country") or not data.get("city") or not data.get("timezone"):
        raise Exception(f"Response {response.status_code}:\n{response.text}")

    country_code = data["country"].upper()

    country = Country.query.filter_by(country_code=country_code).first()
    if not country:
        raise Exception(f"geocode_ip ipinfo error missing country {data['country']}:\n{response.text}")

    cities_in_country = City.query.filter_by(country_code=country_code)
    city = cities_in_country.filter(or_(City.ascii_name == data["city"], City.name == data["city"])).first()
    if not city:
        # Unknown city: fall back to the country's capital so a geoname id is
        # still available.
        city = cities_in_country.filter(or_(City.ascii_name == country.capital, City.name == country.capital)).first()
    if not city:
        raise Exception(f"geocode_ip ipinfo error missing city {data['city']}:\n{response.text}")

    # ``loc`` is a "latitude,longitude" string.
    coordinates = data["loc"].split(",")

    transformed = {
        "ip": ip,
        # An IPv6 address always contains ":" and may be far shorter than 15
        # characters (e.g. "::1"), so length is not a reliable discriminator.
        "type": "ipv6" if ":" in ip else "ipv4",
        "country_code": country_code,
        "country_name": country.name,
        "continent_code": country.continent,
        "city": data["city"],
        "zip": data.get("postal"),
        "latitude": float(coordinates[0]),
        "longitude": float(coordinates[1]),
        "location": {
            "geoname_id": city.geoname_id,
            "capital": country.capital,
            "country_flag_emoji": flag.flag(country_code),
        },
        "time_zone": {"id": data["timezone"]},
        "currency": {"code": country.currency_code},
        # ``org`` is not part of the validation above, so treat it as optional
        # rather than failing an otherwise usable response.
        "connection": {"isp": data.get("org")},
        "provider": "ipinfo.io",
    }

    # Writing the cache is best-effort as well; a failure must not discard a
    # successful lookup.
    try:
        r.set(key, json.dumps(transformed))
        r.expire(key, time=CACHE_TIMEOUT_ONE_MONTH)
    except Exception:
        pass

    return transformed
def discover_ip(ip):
    """Return the IP address to geocode.

    Args:
        ip: An explicit IP address, or a falsy value to auto-discover one.

    Returns:
        ``ip`` unchanged when truthy. Otherwise None when
        ``app.config["DEV"]`` is set, else whatever ``get_ip()`` reports for
        the current request (which may itself be None).
    """
    if ip:
        return ip
    if app.config["DEV"]:
        return None
    # ``get_ip`` is defined in this module; no ``utils`` name is imported here,
    # so calling it through ``utils.`` would raise NameError.
    return get_ip()
def get_ip(internal=False):
    """Return the IP address of the current request.

    NOTE(review): ``has_request_context`` and ``request`` are not imported in
    the visible part of this file; presumably they are Flask's. Confirm they
    are in scope.

    Args:
        internal: When False (default) only ``request.remote_addr`` is
            returned, as a string truncated to 120 characters. When True, all
            addresses in ``request.access_route`` are returned, with
            ``remote_addr`` put first if it is not already in the route.

    Returns:
        A single address, or a comma-joined string of addresses when
        ``internal`` is True. None outside a request context or when no
        address is available.
    """
    if not has_request_context():
        return None

    real_ip = request.remote_addr
    if not internal:
        return str(real_ip)[:120] if real_ip else None

    ips = list(request.access_route)
    if real_ip not in ips:
        ips.insert(0, real_ip)

    # Build a real list: on Python 3 ``filter()`` returns an iterator that is
    # always truthy, which would make the emptiness check below a no-op and
    # return "" instead of None.
    ips = [ip for ip in ips if ip]
    if not ips:
        return None
    return ",".join(ips)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment