Last active
April 15, 2022 09:09
-
-
Save dariocazzani/1f358d7c6b336a18bb06f8f543298876 to your computer and use it in GitHub Desktop.
Receive packets via UDP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Simple udp socket server | |
''' | |
import socket | |
import sys | |
import pyaudio | |
import numpy as np | |
class Receiver(object): | |
def __init__(self): | |
self.HOST = '' # Symbolic name meaning all available interfaces | |
self.PORT = 9999 # Arbitrary non-privileged port | |
self._create_socket() | |
self._bind_socket() | |
print('Socket bind complete') | |
self._init_pyaudio() | |
print('PyAudio initialized') | |
def _init_pyaudio(self): | |
try: | |
p = pyaudio.PyAudio() | |
self.stream = p.open(format=p.get_format_from_width(2), | |
channels=1, | |
rate=16000, | |
output=True) | |
except Exception as e: | |
print('Error in creating PyAudio: {}'.format(e)) | |
sys.exit() | |
def _create_socket(self): | |
# Datagram (udp) socket | |
try : | |
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
print('Socket created') | |
except socket.error, msg : | |
print('Failed to create socket. Error Code : {}, Message: '.format(msg[0], msg[1])) | |
sys.exit() | |
def _bind_socket(self): | |
# Bind socket to local host and port | |
try: | |
self.s.bind((self.HOST, self.PORT)) | |
except socket.error , msg: | |
print('Bind failed. Error Code : {}, Message: '.format(msg[0], msg[1])) | |
sys.exit() | |
def play(self, data): | |
self.stream.write(data) | |
def receive(self): | |
#now keep talking with the client | |
while 1: | |
# receive data from client (data, addr) | |
d = self.s.recvfrom(1024) | |
data = d[0] | |
addr = d[1] | |
samples = np.fromstring(data, np.float32) | |
self.play(samples) | |
if not data: | |
break | |
print('received packet of size {} from {}'.format(len(data), addr)) | |
print('Closing socket') | |
self.s.close() | |
self.stream.stop_stream() | |
self.stream.close() | |
def main(): | |
r = Receiver() | |
r.receive() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment