Skip to content

Instantly share code, notes, and snippets.

@aont
Created November 6, 2021 02:07
Show Gist options
  • Save aont/9625ee9710688fbde2cb5e48ca53709f to your computer and use it in GitHub Desktop.
Save aont/9625ee9710688fbde2cb5e48ca53709f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# TODO: do not rewrite the uri in the WWW-Authorization header
import sys
import os
import selectors
import socket
import re
# ---- Endpoints -------------------------------------------------------------
# The address values below look like placeholders -- presumably they are meant
# to be replaced with real addresses before running; confirm with the author.

# Address/port the proxy binds its SIP socket (and its media sockets) to.
proxy_address = "proxy_address_local"
proxy_port = 5060
proxy_address_port = f"{proxy_address}:{proxy_port}"

# Upstream VoIP server that client-originated SIP messages are forwarded to.
voip_server_address = "voip_server_address_local"
voip_server_port = 5060
voip_server_address_port = f"{voip_server_address}:{voip_server_port}"

# Maximum datagram size passed to recvfrom().
bufsize = 4096

# Address/port substituted for the VoIP server in messages sent to clients.
global_address = "global_address"
global_port = 5060
global_address_port = f"{global_address}:{global_port}"

# ---- Mutable proxy state ---------------------------------------------------
register_table = {}  # CSeq -> registration record awaiting a reply
user_table = {}      # user name -> record of a registered user
invite_table = {}    # CSeq -> INVITE record awaiting its 200 OK
dialog_table = {}    # CSeq -> {"UserName": ...} for every other request

# ---- Header parsing --------------------------------------------------------
# Contact URI: captures (user, host, port).
contact_pattern = re.compile(r"sip:([^@]+)@([^:]+):(\d+)")
# To/From URI: captures (user, host); the host stops at '>' or ';'.
to_from_pattern = re.compile(r"sip:([^@]+)@([^>;]+)")
class RTPSockets:
    """Context manager owning the UDP sockets used to relay RTP/RTCP.

    Five RTP/RTCP socket pairs are opened on consecutive ports starting at
    ``rtp_port_begin`` (even offset = RTP, odd offset = RTCP).  Every socket
    is bound to the module-level ``proxy_address`` and registered for read
    events on the module-level ``selector`` with the data
    ``{"Protocol": "RTP" | "RTCP", "Port": <port>}``.

    Attributes:
        rtp_sockets: the RTP sockets, in port order.
        rtcp_sockets: the RTCP sockets, in port order.
        sockets: all sockets in port order (rtp0, rtcp0, rtp1, rtcp1, ...),
            so ``sockets[port - rtp_port_begin]`` is the socket for ``port``.
        rtp_port_begin: first port of the range.
    """

    def __init__(self):
        self.rtp_sockets = []
        self.rtcp_sockets = []
        self.sockets = []
        self.rtp_port_begin = 4000
        try:
            for i in range(5):
                rtp_port = self.rtp_port_begin + i * 2
                self.rtp_sockets.append(self._open_socket("RTP", rtp_port))
                self.rtcp_sockets.append(self._open_socket("RTCP", rtp_port + 1))
        except BaseException:
            # __exit__ is never called when construction fails, so release
            # whatever was opened before propagating the error.
            self._close_all()
            raise

    def _open_socket(self, protocol, port):
        """Create, bind and register one UDP socket; return it."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Track the socket before bind() so it is closed even if bind fails.
        self.sockets.append(sock)
        sock.bind((proxy_address, port))
        selector.register(sock, selectors.EVENT_READ, {"Protocol": protocol, "Port": port})
        return sock

    def _close_all(self):
        """Close every socket this object has opened."""
        for sock in self.sockets:
            sock.close()

    def __enter__(self):
        # Return the instance so ``with RTPSockets() as rtpsocks`` binds it.
        return self

    def get_socket_from_portnum(self, portnum):
        """Return the socket bound to ``portnum``.

        Raises:
            IndexError: if ``portnum`` is outside the managed port range.
        """
        ofs = portnum - self.rtp_port_begin
        # Reject negative offsets explicitly; plain list indexing would
        # silently wrap around to the end of the list.
        if not 0 <= ofs < len(self.sockets):
            raise IndexError("port %s is not managed by this RTPSockets" % portnum)
        return self.sockets[ofs]

    def __exit__(self, exc_type, exc_value, traceback):
        self._close_all()
# SIP signalling proxy and RTP/RTCP relay: private helpers followed by the
# event loop.  The loop stays at module level because RTPSockets.__init__
# registers its sockets on the module-level name ``selector`` that the
# ``with`` statement below binds.

# Matches a Content-Length header (long or compact "l" form) at the start of
# a line only, so that text such as "tel:123" inside a URI is left alone.
_CONTENT_LENGTH_PATTERN = re.compile(r"^(Content-Length|l)[ \t]*:[ \t]*\d+", re.MULTILINE)


def _parse_sip_headers(header_lines):
    """Parse ``Name: value`` lines into a dict; a repeated name keeps the last value.

    Raises:
        Exception: if a non-empty line contains no colon.
    """
    sip_data = {}
    for line in header_lines:
        if line == "":
            continue
        param_name, colon, value = line.partition(":")
        if not colon:
            raise Exception(line)
        param_name = param_name.strip()
        value = value.strip()
        sys.stderr.write("[debug] sip %s=%s\n" % (param_name, value))
        sip_data[param_name] = value
    return sip_data


def _parse_sdp(sdp_msg):
    """Parse ``key=value`` SDP lines into a dict; a repeated key keeps the last value."""
    sdp_data = {}
    for line in sdp_msg.split("\r\n"):
        if line == "":
            continue
        param, _, value = line.partition("=")
        sys.stderr.write("[debug] sdp %s=%s\n" % (param, value))
        sdp_data[param] = value
    return sdp_data


def _header_username(sip_data, name, compact_name):
    """Return the user part of the SIP URI in header ``name`` (or its compact form).

    Raises:
        Exception: if the header is absent or holds no ``sip:user@host`` URI.
    """
    raw = sip_data.get(name)
    if raw is None:
        raw = sip_data.get(compact_name)
    if raw is None:
        raise Exception("%s not found" % name)
    m = to_from_pattern.search(raw)
    if m is None:
        raise Exception("m is None")
    return m.group(1)


def _sdp_media_endpoint(sdp_data):
    """Return ``(address, port)`` taken from the SDP ``c=`` and ``m=`` lines.

    Raises:
        KeyError: if the message carried no ``c=`` or ``m=`` line.
    """
    return sdp_data["c"].split(" ")[2], int(sdp_data["m"].split(" ")[1])


def _lookup_user(user_name):
    """Return the registered record for ``user_name``, or log and return None."""
    user_data = user_table.get(user_name)
    if user_data is None:
        sys.stderr.write("[debug] User %s not found\n" % user_name)
    return user_data


def _rewrite_for_client(text, user_data):
    """Translate proxy/server addresses in ``text`` into the client's view."""
    user_local_address = user_data["UserLocalAddress"]
    user_local_address_port = "%s:%s" % (user_local_address, user_data["UserLocalPort"])
    # "address:port" is substituted before the bare address so the port is
    # translated together with its host.
    return text \
        .replace(proxy_address_port, user_local_address_port) \
        .replace(proxy_address, user_local_address) \
        .replace(voip_server_address_port, global_address_port) \
        .replace(voip_server_address, global_address)


def _rewrite_for_server(text, user_data):
    """Translate client-side addresses in ``text`` into the server's view."""
    user_local_address = user_data["UserLocalAddress"]
    user_local_address_port = "%s:%s" % (user_local_address, user_data["UserLocalPort"])
    return text \
        .replace(global_address_port, voip_server_address_port) \
        .replace(global_address, voip_server_address) \
        .replace(user_local_address_port, proxy_address_port) \
        .replace(user_local_address, proxy_address)


def _set_content_length(sip_msg, body):
    """Return ``sip_msg`` with its Content-Length set to the byte size of ``body``."""
    length = len(body.encode())
    return _CONTENT_LENGTH_PATTERN.sub(lambda m: "%s: %s" % (m.group(1), length), sip_msg)


def _send(sock, headline, text, user_data, to_client):
    """Send ``text`` to the client's observed address or to the VoIP server."""
    if to_client:
        sys.stderr.write("[debug] s2c: %s\n" % headline)
        sock.sendto(text.encode(), (user_data["UserGlobalAddress"], user_data["UserGlobalPort"]))
    else:
        sys.stderr.write("[debug] c2s: %s\n" % headline)
        sock.sendto(text.encode(), (voip_server_address, voip_server_port))


def _relay_message(sock, headline, msg_decode, user_data, to_client):
    """Rewrite a whole message for its destination and forward it."""
    rewrite = _rewrite_for_client if to_client else _rewrite_for_server
    _send(sock, headline, rewrite(msg_decode, user_data), user_data, to_client)


def _relay_message_with_sdp(sock, headline, sip_msg, sdp_msg, user_data, to_client,
                            peer_rtp_address=None):
    """Rewrite header and SDP body separately, fix Content-Length, and forward.

    For server-to-client traffic ``peer_rtp_address`` (the media address the
    server advertised) is additionally replaced by ``global_address``.
    """
    def rewrite(text):
        if not to_client:
            return _rewrite_for_server(text, user_data)
        text = _rewrite_for_client(text, user_data)
        # Guard against an empty address: str.replace("", x) would insert x
        # between every character.
        if peer_rtp_address:
            text = text.replace(peer_rtp_address, global_address)
        return text

    body = rewrite(sdp_msg)
    # The length is taken from the rewritten body, then the header itself is
    # rewritten.
    head = rewrite(_set_content_length(sip_msg, body))
    _send(sock, headline, head + "\r\n\r\n" + body, user_data, to_client)


def _record_media_endpoint(invite_data, sdp_data, from_server):
    """Store the SDP media endpoint in ``invite_data`` and return it.

    An endpoint advertised by the server is stored as the peer's; one
    advertised by the client is stored as the user's.
    """
    address, port = _sdp_media_endpoint(sdp_data)
    if from_server:
        invite_data["PeerRTPAddress"] = address
        invite_data["PeerRTPPort"] = port
    else:
        invite_data["UserRTPAddress"] = address
        invite_data["UserRTPPort"] = port
    return address, port


def _handle_sip_datagram(sock):
    """Receive one SIP datagram from ``sock`` and forward it to the other side.

    Routing is decided in this order: a REGISTER request, a reply matching a
    pending REGISTER (by CSeq), an INVITE request, a reply matching a pending
    INVITE (by CSeq), and finally any other message, which is tracked in
    ``dialog_table`` by CSeq.

    NOTE(review): the tables are keyed by the CSeq header value alone, so
    two users sending the same CSeq would collide -- confirm whether that is
    acceptable.
    """
    msg, sender_tuple = sock.recvfrom(bufsize)
    msg_decode = msg.decode()
    # partition() yields an empty body when the blank separator line is
    # missing, instead of mis-slicing the text.
    sip_msg, _separator, sdp_msg = msg_decode.partition("\r\n\r\n")
    header_lines = sip_msg.split("\r\n")
    headline = header_lines[0]
    sys.stderr.write("[debug] ---- received packet: %s from %s ----\n" % (headline, sender_tuple[0]))
    sip_data = _parse_sip_headers(header_lines[1:])
    sdp_data = _parse_sdp(sdp_msg)
    cseq = sip_data.get("CSeq")
    # The sender is compared with the configured server endpoint verbatim.
    from_server = sender_tuple == (voip_server_address, voip_server_port)

    # ---- REGISTER request from a client ----
    if headline.startswith("REGISTER"):
        if cseq is None:
            raise Exception("CSeq not set")
        register_data = {}
        register_table[cseq] = register_data
        # Address the datagram actually arrived from; replies are sent there.
        register_data["UserGlobalAddress"] = sender_tuple[0]
        register_data["UserGlobalPort"] = sender_tuple[1]
        register_data["UserName"] = _header_username(sip_data, "From", "f")
        contact = sip_data.get("Contact")
        if contact is None:
            contact = sip_data.get("m")
        if contact is None:
            raise Exception("Contact not found")
        m = contact_pattern.search(contact)
        if m is None:
            raise Exception("m is None")
        # Address the client advertises for itself in its Contact header.
        register_data["UserLocalAddress"] = m.group(2)
        register_data["UserLocalPort"] = int(m.group(3))
        _relay_message(sock, headline, msg_decode, register_data, False)
        return

    # ---- Message belonging to a pending REGISTER ----
    register_data = register_table.get(cseq) if cseq is not None else None
    if register_data is not None:
        if headline == "SIP/2.0 200 OK":
            # Registration succeeded: answer the client and publish the user.
            _relay_message(sock, headline, msg_decode, register_data, True)
            user_table[register_data["UserName"]] = register_data
            del register_table[cseq]
            sys.stderr.write("[debug] user_table=%s\n\n" % repr(user_table))
        else:
            _relay_message(sock, headline, msg_decode, register_data, from_server)
        return

    # ---- INVITE request (either direction) ----
    if headline.startswith("INVITE"):
        if cseq is None:
            raise Exception("CSeq not set")
        invite_data = {}
        if from_server:
            # Incoming call: the local user is the callee (To).
            username = _header_username(sip_data, "To", "t")
        else:
            # Outgoing call: the local user is the caller (From).
            username = _header_username(sip_data, "From", "f")
        invite_data["UserName"] = username
        rtp_address, _rtp_port = _record_media_endpoint(invite_data, sdp_data, from_server)
        user_data = _lookup_user(username)
        if user_data is None:
            return
        invite_table[cseq] = invite_data
        _relay_message_with_sdp(sock, headline, sip_msg, sdp_msg, user_data, from_server,
                                rtp_address if from_server else None)
        return

    # ---- Message belonging to a pending INVITE ----
    invite_data = invite_table.get(cseq) if cseq is not None else None
    if invite_data is not None:
        # Always use the record of the user this INVITE belongs to.
        user_data = _lookup_user(invite_data["UserName"])
        if user_data is None:
            return
        if headline == "SIP/2.0 200 OK":
            rtp_address, rtp_port = _record_media_endpoint(invite_data, sdp_data, from_server)
            if from_server:
                sys.stderr.write("[debug] peer: %s:%s\n" % (rtp_address, rtp_port))
            _relay_message_with_sdp(sock, headline, sip_msg, sdp_msg, user_data, from_server,
                                    rtp_address if from_server else None)
            # Both media endpoints are known now; publish them on the user
            # record, where the RTP/RTCP relay looks them up.
            user_data["PeerRTPAddress"] = invite_data["PeerRTPAddress"]
            user_data["PeerRTPPort"] = invite_data["PeerRTPPort"]
            user_data["UserRTPAddress"] = invite_data["UserRTPAddress"]
            user_data["UserRTPPort"] = invite_data["UserRTPPort"]
            del invite_table[cseq]
        else:
            _relay_message(sock, headline, msg_decode, user_data, from_server)
        return

    # ---- Any other message ----
    sys.stderr.write("[debug] cseq=%s\n" % cseq)
    if cseq is None:
        return
    dialog_data = dialog_table.get(cseq)
    sys.stderr.write("[debug] dialog_data=%s\n" % repr(dialog_data))
    if dialog_data is None:
        if from_server:
            username = _header_username(sip_data, "To", "t")
        else:
            username = _header_username(sip_data, "From", "f")
        dialog_data = {"UserName": username}
        dialog_table[cseq] = dialog_data
    sys.stderr.write("[debug] dialog_data=%s\n" % repr(dialog_data))
    user_data = _lookup_user(dialog_data["UserName"])
    if user_data is None:
        return
    _relay_message(sock, headline, msg_decode, user_data, from_server)


def _relay_media(sock, label, port_offset):
    """Forward one RTP/RTCP datagram between a user and that user's peer.

    ``port_offset`` is added to the stored RTP ports before matching and
    sending (0 for RTP, 1 for RTCP).  Datagrams from an unknown sender are
    dropped.
    """
    msg, sender_tuple = sock.recvfrom(bufsize)
    sys.stderr.write("[debug] ---- received %s from %s:%s ----\n"
                     % (label, sender_tuple[0], sender_tuple[1]))
    for user_data in user_table.values():
        # Only users with a negotiated call carry media endpoints.
        if "UserRTPAddress" not in user_data:
            continue
        sys.stderr.write("[debug] user_data=%s\n" % repr(user_data))
        user_end = (user_data["UserRTPAddress"], user_data["UserRTPPort"] + port_offset)
        peer_end = (user_data["PeerRTPAddress"], user_data["PeerRTPPort"] + port_offset)
        if user_end == sender_tuple:
            sock.sendto(msg, peer_end)
        elif peer_end == sender_tuple:
            sock.sendto(msg, user_end)


with selectors.DefaultSelector() as selector, \
        socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock1, \
        RTPSockets() as rtpsocks:
    sock1.bind((proxy_address, proxy_port))
    selector.register(sock1, selectors.EVENT_READ, {"Protocol": "SIP", "Port": proxy_port})
    try:
        while True:
            ready = selector.select()
            if not ready:
                break
            for key, _events in ready:
                protocol = key.data["Protocol"]
                if protocol == "SIP":
                    _handle_sip_datagram(sock1)
                elif protocol == "RTP":
                    _relay_media(key.fileobj, "RTP", 0)
                elif protocol == "RTCP":
                    _relay_media(key.fileobj, "RTCP", 1)
    except KeyboardInterrupt:
        sys.stderr.write("Ctrl+C\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment