Created
August 14, 2020 08:32
-
-
Save sorz/6e8b9a327af868f6dfe861c693a7c249 to your computer and use it in GitHub Desktop.
Lastest update on 2019-10-31. Mocked service may have changed since then. Compatibility issues expected.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import logging | |
from aiohttp import web | |
from aiodnsresolver import Resolver, TYPES, DnsRecordDoesNotExist | |
from des import DesKey | |
TENCENT_KEYS = { | |
'000': b'DES-KEY1', | |
'111': b'DES-KEY2', | |
'222': b'DES-KEY3', | |
} | |
async def resolve(request: web.Request) -> (str, [str], int): | |
host = request.query.get('host') or request.query.get('dn') | |
if host is None: | |
raise web.HTTPBadRequest(text='missing query') | |
if not '.' in host: | |
logging.info('not a domain: %s', host) | |
raise web.HTTPBadRequest(text='not a domain') | |
try: | |
ips, ttl = await resolver.query(host) | |
except DnsRecordDoesNotExist: | |
logging.info('NXDOMAIN: %s', host) | |
raise web.HTTPNotFound(text='NXDOMAIN') | |
return host, ips, ttl | |
async def handle_aliyun(request: web.Request) -> web.Response: | |
host, ips, ttl = await resolve(request) | |
return web.json_response(dict(host=host, ips=ips, ttl=ttl)) | |
async def handle_tencent(request: web.Request) -> web.Response: | |
key_id = request.query.get('id') | |
ttl_enabled = request.query.get('ttl') == '1' | |
if key_id is None: | |
_, ips, ttl = await resolve(request) | |
else: | |
key = TENCENT_KEYS.get(key_id) | |
if key is None: | |
logging.info('missing key: %s', key_id) | |
raise web.HTTPNotFound(text='Missing keys') | |
cipher = DesKey(key) | |
host = request.query.get('host') or request.query.get('dn') | |
host = bytes.fromhex(host) | |
host = cipher.decrypt(host, padding=True).decode('utf8') | |
ips, ttl = await resolver.query(host) | |
if ttl_enabled: | |
resp = ';'.join(f'{ip},{ttl}' for ip in ips) | |
else: | |
resp = ';'.join(ips) | |
if key_id is not None: | |
resp = cipher.encrypt(resp.encode('utf8'), padding=True).hex() | |
return web.Response(text=resp) | |
async def handle_zhihu(request: web.Request) -> web.Response: | |
host = request.query.get('host') | |
if ',' not in host: | |
host, ips, ttl = await resolve(request) | |
resp = dict(host=host, ips=ips, ttl=ttl, origin_ttl=ttl) | |
else: | |
resp = [] | |
for host in host.split(','): | |
ips, ttl = await resolver.query(host) | |
resp.append(dict(host=host, ips=ips, ttl=ttl, origin_ttl=ttl)) | |
return web.json_response(dict(dns=resp)) | |
class HostChooser: | |
def __init__(self): | |
self._host_handler = {} | |
async def do_route(self, request: web.Request): | |
handler = self._host_handler.get(request.host) | |
if handler is not None: | |
return await handler(request) | |
logging.info('Unknown host: %s', request.host) | |
raise web.HTTPNotFound() | |
def add(self, host: str, handler): | |
self._host_handler[host] = handler | |
class DNSResolver: | |
_resolve = None | |
_loop = None | |
async def query(self, host: str) -> ([str], int): | |
if self._resolve is None: | |
self._resolve, _ = Resolver() | |
if self._loop is None: | |
self._loop = asyncio.get_event_loop() | |
ips = await self._resolve(host, TYPES.A) | |
ttl = int(max(0, min(ip.expires_at for ip in ips) - self._loop.time())) | |
ips = [str(ip) for ip in ips] | |
return ips, ttl | |
app = web.Application() | |
resolver = DNSResolver() | |
hosts = HostChooser() | |
app.add_routes([web.get('/{path:.*}', hosts.do_route)]) | |
hosts.add('203.107.1.33', handle_aliyun) | |
hosts.add('203.107.1.34', handle_aliyun) | |
hosts.add('203.107.1.65', handle_aliyun) | |
hosts.add('203.107.1.66', handle_aliyun) | |
hosts.add('119.29.29.29', handle_tencent) | |
hosts.add('118.89.204.198', handle_zhihu) | |
if __name__ == '__main__': | |
logging.basicConfig(level=logging.INFO) | |
logging.getLogger('aiodnsresolver').setLevel(logging.WARN) | |
web.run_app(app, host='127.0.0.80', port=8011) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment