Last active
November 7, 2023 00:59
-
-
Save 0xpizza/3d63308a2baff93793fe03dcfeed8622 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding:ascii | |
import argparse | |
import asyncio | |
import http.server | |
import logging | |
import ssl | |
import socket | |
import socketserver | |
import selectors | |
import threading | |
import warnings | |
from functools import partial | |
# Module-level configuration: logging format, listen address, and the
# deliberately permissive TLS context used for the upstream leg of the proxy.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s'
)

# Address the proxy binds to: whatever the machine's own hostname resolves to.
try:
    LOCALIP = socket.gethostbyname(socket.gethostname())
except OSError:
    # The hostname is not resolvable (common in containers and on machines
    # without a matching hosts entry). Fall back to loopback instead of
    # crashing at import time.
    LOCALIP = '127.0.0.1'
LOCALPORT = 8000

# Cipher-string exclusions for protocol versions this Python/OpenSSL build
# reports as unavailable.
# NOTE(review): in OpenSSL cipher-string syntax `SSLv3` appears to select the
# cipher suites *introduced* with SSLv3 (still used by TLS 1.0-1.2), so
# `!SSLv3` may remove far more than intended, and `TLS1.0`/`TLS1.1` do not look
# like recognised keywords - confirm against the OpenSSL ciphers manual.
addl_ciphers = []
if not ssl.HAS_SSLv3:
    addl_ciphers.append(':!SSLv3')
if not ssl.HAS_TLSv1:
    addl_ciphers.append(':!TLS1.0')
if not ssl.HAS_TLSv1_1:
    addl_ciphers.append(':!TLS1.1')
if addl_ciphers:
    # Only report when something is actually missing.
    print(
        'Your python installation does not support:',
        ', '.join(s.replace(':!', '') for s in addl_ciphers)
    )

# Warnings raised while building this intentionally lax context are silenced.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    INSECURE_CONTEXT = ssl.SSLContext()
    INSECURE_CONTEXT.set_ciphers('ALL' + ''.join(addl_ciphers))
class ProxyProtocol(asyncio.Protocol):
    """Relay every byte received on one transport to a linked peer transport.

    Two instances form a tunnel: each one's ``linked_transport`` is the other
    one's own transport.  When either side reaches EOF or loses its
    connection, both transports are closed and ``connection_lock`` is set so
    that whoever awaits it learns the tunnel is finished.

    Args:
        event_loop: Loop the transports run on (kept for callers that read it).
        connection_lock: ``asyncio.Event``-like object; set when the tunnel ends.
        linked_transport: Peer transport to forward to; may be assigned later.
    """

    def __init__(self, event_loop, connection_lock, linked_transport=None):
        self.linked_transport = linked_transport
        self.connection_lock = connection_lock
        self.event_loop = event_loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        peer = self.linked_transport
        if peer is None or peer.is_closing():
            # Nowhere to deliver the data: end the tunnel rather than raise
            # AttributeError inside the event loop or drop bytes silently.
            self._teardown()
            return
        peer.write(data)

    def eof_received(self):
        # FIN from this side: close both directions.  Returning None lets the
        # transport close itself as well.
        self._teardown()

    def connection_lost(self, exc):
        # Covers resets and errors, where eof_received() is never called.
        self._teardown()

    def _teardown(self):
        """Close both ends of the tunnel and signal completion (idempotent)."""
        for transport in (self.transport, self.linked_transport):
            if transport is not None and not transport.is_closing():
                transport.close()
        # Protocol callbacks run on the loop thread, so a direct set() is safe.
        self.connection_lock.set()


class HttpProxyHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler that only implements ``CONNECT``.

    On ``CONNECT host:port`` it opens a TCP connection to the target, replies
    ``200``, and then hands both sockets to the asyncio loop, which speaks TLS
    (``INSECURE_CONTEXT``) to the target and plaintext to the client.

    Args:
        event_loop: Keyword-only; the asyncio loop (running in another thread)
            that will drive the tunnel.  Required.

    Raises:
        ValueError: If ``event_loop`` is not supplied.
    """

    def __init__(self, *args, **kwargs):
        self.event_loop = kwargs.pop('event_loop', None)
        if self.event_loop is None:
            raise ValueError('Missing argument: `event_loop`')
        # Must come last: the base constructor handles the whole request
        # before it returns, so anything set afterwards is never seen.
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Reject non-CONNECT methods with 405 and a short explanation."""
        body = b'This server accepts CONNECT commands only. Configure this connection as an HTTP proxy endpoint to use the CONNECT command.'
        # The reason phrase must be str; bytes would be rendered as "b'...'".
        self.send_response(405, 'Method Not Allowed')
        self.send_header('Allow', 'CONNECT')
        self.send_header('Content-Type', 'text/plain; charset=ascii')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        if self.command != 'HEAD':
            self.wfile.write(body)
        self.wfile.flush()

    # Class-level aliases so they exist while the request is being dispatched.
    do_HEAD = do_GET
    do_POST = do_GET

    def do_CONNECT(self):
        """Open the upstream connection and block until the tunnel closes."""
        try:
            host, port = self.path.rsplit(':', 1)
            port = int(port)
            if not 0 < port < 65536:
                raise ValueError(f'port out of range: {port}')
        except ValueError as e:
            self.send_error(
                400, 'Bad Request',
                f'Malformed CONNECT target {self.path!r}: {e}',
            )
            return
        if host.startswith('[') and host.endswith(']'):
            # Bracketed IPv6 literal, e.g. "[::1]:443".
            host = host[1:-1]

        try:
            sock = socket.create_connection((host, port))
        except OSError as e:
            logging.error('CONNECT to %s failed: %s', self.path, e)
            # send_error() emits the complete response (headers and body).
            self.send_error(502, 'Bad Gateway', str(e))
            return
        sock.setblocking(False)
        self.sock_remote = sock
        self.remote_host = host

        self.send_response(200, 'OK')
        # Without end_headers() the status line stays in the header buffer
        # and the client never learns the tunnel is open.
        self.end_headers()
        # The connection now carries tunnelled bytes, not further HTTP requests.
        self.close_connection = True
        # NOTE(review): bytes the client sent before reading the 200 reply may
        # already sit in self.rfile's buffer and would not be forwarded.

        try:
            # The loop runs in another thread, so schedule thread-safely and
            # wait: returning early would let socketserver close the client
            # socket while the tunnel is still using it.
            future = asyncio.run_coroutine_threadsafe(
                self.proxy_connection(), self.event_loop
            )
            future.result()
        except Exception:
            # Thread boundary: report the failure instead of killing the
            # worker thread with an unhandled traceback.
            logging.exception('Tunnel to %s:%d failed', host, port)

    async def proxy_connection(self):
        """Transfer control of the sockets to async event loop"""
        connection_lock = asyncio.Event()
        factory = partial(ProxyProtocol, self.event_loop, connection_lock)
        remote_transport = local_transport = None
        try:
            # server_hostname is mandatory when ssl= is combined with sock=.
            remote_transport, remote_protocol = await self.event_loop.create_connection(
                factory, ssl=INSECURE_CONTEXT, sock=self.sock_remote,
                server_hostname=self.remote_host,
            )
            # Hold upstream data back until there is a peer to forward it to.
            remote_transport.pause_reading()
            local_transport, local_protocol = await self.event_loop.create_connection(
                factory, sock=self.request
            )
            remote_protocol.linked_transport = local_transport
            local_protocol.linked_transport = remote_transport
            remote_transport.resume_reading()
            # Pass control to the transport chain. Await RST or FIN.
            await connection_lock.wait()
        finally:
            for transport in (remote_transport, local_transport):
                if transport is not None:
                    transport.close()
            if remote_transport is None:
                # The TLS leg never came up; make sure the raw socket is closed.
                self.sock_remote.close()
class HttpServer():
    """Run the threaded CONNECT proxy plus the asyncio loop that drives tunnels.

    The asyncio loop runs in a daemon thread; the blocking ``socketserver``
    accept loop runs in the calling thread until interrupted.
    """

    def __init__(self):
        # A dedicated loop: asyncio.get_event_loop() is deprecated when no
        # loop is running, and this loop is driven from a worker thread anyway.
        self.event_loop = asyncio.new_event_loop()

    def run(self):
        """Serve on (LOCALIP, LOCALPORT) until Ctrl-C, then shut down cleanly."""
        loop_thread = threading.Thread(
            target=self.event_loop.run_forever,
            daemon=True,
        )
        loop_thread.start()
        factory = partial(HttpProxyHandler, event_loop=self.event_loop)
        try:
            # The context manager closes the listening socket on exit.
            with socketserver.ThreadingTCPServer((LOCALIP, LOCALPORT), factory) as server:
                # Handler threads may be parked on live tunnels; they must not
                # keep the process alive after shutdown.
                server.daemon_threads = True
                print(f'Serving on: http://{LOCALIP}:{LOCALPORT}')
                server.serve_forever()
        except KeyboardInterrupt:
            print('Shutting down...')
        finally:
            self._stop_event_loop(loop_thread)

    def _stop_event_loop(self, loop_thread):
        """Stop the loop running in *loop_thread*, cancel its tasks, close it."""
        loop = self.event_loop
        if loop.is_closed():
            return
        # loop.stop() is not thread-safe; it has to be scheduled onto the loop.
        # If the loop has not started yet, the callback stops it on startup.
        loop.call_soon_threadsafe(loop.stop)
        loop_thread.join(timeout=5)
        if loop_thread.is_alive():
            logging.warning('Event loop did not stop in time; leaving it to the daemon thread')
            return
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            # Let cancelled tasks run their cleanup before the loop is closed.
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
def main():
    """Script entry point: serve until interrupted, then report completion."""
    proxy = HttpServer()
    proxy.run()
    print('Done.')


# Only start the proxy when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment