Skip to content

Instantly share code, notes, and snippets.

@emcmanus
Last active July 31, 2024 09:50
Show Gist options
  • Save emcmanus/be0e5a02dafb6d9df93a9fdfe7e39ee7 to your computer and use it in GitHub Desktop.
Save emcmanus/be0e5a02dafb6d9df93a9fdfe7e39ee7 to your computer and use it in GitHub Desktop.
Minimal python client for Phoenix Channels
# Copyright 2024 Ed McManus
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software
# and associated documentation files (the “Software”), to deal in the Software without
# restriction, including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or
# substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# This very minimal Phoenix Channel client allows you to:
# 1) Join a channel
# 2) Send messages
# 3) Receive messages
import asyncio
import json
import websockets
PHX_WEBSOCKET_URL = "wss://your_server/socket/websocket"
TOPIC = "your_topic"
class PhoenixChannel:
def __init__(self, websocket_url, topic, heartbeat_interval=30):
self.websocket_url = websocket_url
self.topic = topic
self.websocket = None
self.ref = 0
self.heartbeat_interval = heartbeat_interval
def _get_ref(self):
self.ref += 1
return str(self.ref)
async def connect(self):
self.websocket = await websockets.connect(self.websocket_url)
await self._join_topic()
asyncio.create_task(self._heartbeat())
async def _join_topic(self):
join_message = {
"topic": self.topic,
"event": "phx_join",
"payload": {},
"ref": self._get_ref()
}
await self.websocket.send(json.dumps(join_message))
response = await self.websocket.recv()
print(f"Joined topic: {self.topic}")
print(f"Server response: {response}")
async def send_message(self, event, payload):
message = {
"topic": self.topic,
"event": event,
"payload": payload,
"ref": self._get_ref()
}
await self.websocket.send(json.dumps(message))
print(f"Sent message: {message}")
async def receive_messages(self):
while True:
try:
message = await self.websocket.recv()
parsed_message = json.loads(message)
print(f"Received message: {parsed_message}")
# You can add custom handling for different event types here
except websockets.exceptions.ConnectionClosed:
print("Connection closed")
break
async def _heartbeat(self):
while True:
try:
heartbeat_message = {
"topic": "phoenix",
"event": "heartbeat",
"payload": {},
"ref": self._get_ref()
}
await self.websocket.send(json.dumps(heartbeat_message))
print("Sent heartbeat")
await asyncio.sleep(self.heartbeat_interval)
except websockets.exceptions.ConnectionClosed:
print("Connection closed, stopping heartbeat")
break
async def main():
channel = PhoenixChannel(PHX_WEBSOCKET_URL, TOPIC)
await channel.connect()
# Start receiving messages in the background
receive_task = asyncio.create_task(channel.receive_messages())
# Example: Send a message every 5 seconds
for i in range(5):
await channel.send_message("example", {"some_key": f"value {i}"})
await asyncio.sleep(5)
# Wait for the receive task to complete (it won't in this example, but good practice)
await receive_task
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment