Created
October 22, 2019 03:42
-
-
Save grantjenks/095de18c51fa8f118b68be80a624c45a to your computer and use it in GitHub Desktop.
Python 3 XML-RPC Using Unix Domain Sockets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import http.client
import socket
import socketserver
import xmlrpc.client
from xmlrpc.server import (
    SimpleXMLRPCDispatcher,
    SimpleXMLRPCRequestHandler,
)
class UnixStreamHTTPConnection(http.client.HTTPConnection):
    """HTTP connection carried over a Unix domain stream socket.

    The ``host`` value given to the constructor is used as the socket
    address passed to ``socket.connect()``, i.e. the path of the Unix
    socket.
    """

    def connect(self):
        """Open an ``AF_UNIX`` stream socket to ``self.host``.

        On success the connected socket is stored in ``self.sock``.

        Raises:
            OSError: If the socket cannot be connected. The new socket
                is closed first and ``self.sock`` is not assigned.
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(self.host)
        except BaseException:
            # Do not leak the descriptor, and never publish an
            # unconnected socket as ``self.sock``.
            sock.close()
            raise
        self.sock = sock
class UnixStreamTransport(xmlrpc.client.Transport):
    """XML-RPC transport that sends requests over a Unix domain socket."""

    def __init__(self, socket_path, *args, **kwargs):
        """Remember the socket path and initialise the base transport.

        Args:
            socket_path: Path of the Unix socket to connect to.
            *args: Extra positional arguments forwarded unchanged to
                ``xmlrpc.client.Transport.__init__``.
            **kwargs: Extra keyword arguments forwarded unchanged to
                ``xmlrpc.client.Transport.__init__``.
        """
        self.socket_path = socket_path
        super().__init__(*args, **kwargs)

    def make_connection(self, host):
        """Return a new connection to ``self.socket_path``.

        Args:
            host: Ignored; the destination is always the socket path
                given to the constructor.

        Returns:
            A ``UnixStreamHTTPConnection`` created for this call.
        """
        return UnixStreamHTTPConnection(self.socket_path)
class UnixStreamXMLRPCClient(xmlrpc.client.ServerProxy):
    """XML-RPC server proxy that reaches its server via a Unix socket."""

    def __init__(self, addr, **kwargs):
        """Create a proxy bound to the Unix socket at ``addr``.

        Args:
            addr: Socket path handed to ``UnixStreamTransport``.
            **kwargs: Extra keyword arguments forwarded unchanged to
                ``xmlrpc.client.ServerProxy.__init__``.
        """
        transport = UnixStreamTransport(addr)
        # The URI carries no host: the transport built above is what
        # receives the socket path.
        super().__init__("http://", transport=transport, **kwargs)
# Client Demo
# Guarded so that importing this module does not issue RPC calls.
if __name__ == "__main__":
    client = UnixStreamXMLRPCClient("/tmp/first.sock")
    print(client.pow(2, 3))  # Returns 2**3 = 8
    print(client.add(2, 3))  # Returns 5
    print(client.mul(5, 2))  # Returns 5*2 = 10

    # Print list of available methods
    print(client.system.listMethods())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socketserver | |
from xmlrpc.server import ( | |
SimpleXMLRPCDispatcher, | |
SimpleXMLRPCRequestHandler, | |
) | |
class UnixStreamXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
    """XML-RPC request handler for use with a Unix stream socket server."""

    # NOTE(review): presumably switched off because the Nagle/TCP_NODELAY
    # option only applies to TCP sockets -- confirm against socketserver.
    disable_nagle_algorithm = False

    def address_string(self):
        """Return ``self.client_address`` unchanged.

        NOTE(review): the inherited implementation presumably expects a
        ``(host, port)`` tuple, which a Unix socket peer does not
        provide -- confirm against ``http.server``.
        """
        return self.client_address
class UnixStreamXMLRPCServer(
    socketserver.UnixStreamServer, SimpleXMLRPCDispatcher
):
    """XML-RPC server that accepts requests on a Unix stream socket.

    Combines ``socketserver.UnixStreamServer`` (socket handling) with
    ``SimpleXMLRPCDispatcher`` (method registration and dispatch), and
    serves each connection with ``UnixStreamXMLRPCRequestHandler``.

    NOTE(review): nothing in this class removes the socket file when
    the server is closed -- check whether callers must unlink a stale
    path before starting a new server on the same address.
    """

    def __init__(
        self,
        addr,
        log_requests=True,
        allow_none=True,
        encoding=None,
        bind_and_activate=True,
        use_builtin_types=True,
    ):
        """Initialise the dispatcher and the underlying socket server.

        Args:
            addr: Server address handed to
                ``socketserver.UnixStreamServer.__init__``.
            log_requests: Stored as ``self.logRequests``.
            allow_none: Passed to ``SimpleXMLRPCDispatcher.__init__``.
            encoding: Passed to ``SimpleXMLRPCDispatcher.__init__``.
            bind_and_activate: Passed to
                ``socketserver.UnixStreamServer.__init__``.
            use_builtin_types: Passed to
                ``SimpleXMLRPCDispatcher.__init__``.
        """
        self.logRequests = log_requests
        # Both bases are initialised explicitly because their
        # constructors take different arguments.
        SimpleXMLRPCDispatcher.__init__(
            self, allow_none, encoding, use_builtin_types
        )
        socketserver.UnixStreamServer.__init__(
            self,
            addr,
            UnixStreamXMLRPCRequestHandler,
            bind_and_activate,
        )
# Server Demo
# Guarded so that importing this module does not start a server.
if __name__ == "__main__":
    server = UnixStreamXMLRPCServer("/tmp/first.sock")

    with server:
        server.register_introspection_functions()

        # Register pow() function; this will use the value of
        # pow.__name__ as the name, which is just 'pow'.
        server.register_function(pow)

        # Register a function under a different name
        def adder_function(x, y):
            return x + y

        server.register_function(adder_function, "add")

        # Register an instance; all the methods of the instance are
        # published as XML-RPC methods (in this case, just 'mul').
        class MyFuncs:
            def mul(self, x, y):
                return x * y

        server.register_instance(MyFuncs())

        # Run the server's main loop
        server.serve_forever()
Works great! Even with supervisor.
Instead of `client = UnixStreamXMLRPCClient("/tmp/first.sock")`, just write:
server = UnixStreamXMLRPCClient("/var/run/supervisor.sock")
print(server.supervisor.getState())
print(server.supervisor.getProcessInfo('gunicorn'))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you for the gist!