Skip to content

Instantly share code, notes, and snippets.

@real-ali
Last active March 9, 2025 09:12
Show Gist options
  • Save real-ali/aabbe751e3564f4dc02a672c4d4196e6 to your computer and use it in GitHub Desktop.
Save real-ali/aabbe751e3564f4dc02a672c4d4196e6 to your computer and use it in GitHub Desktop.
Rust WebSocket - room-based
mod application;
mod domain;
mod infrastructure;
use futures_util::{SinkExt, StreamExt};
use infrastructure::ClientMessage;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::{mpsc, Mutex};
use tokio_tungstenite::accept_async;
/// Shared registry of rooms: maps a room id (as a string) to the outbound
/// message channels of the connections currently registered in that room.
/// Wrapped in `Arc<Mutex<..>>` so every connection task can share and mutate it.
type Room = Arc<Mutex<HashMap<String, Vec<mpsc::UnboundedSender<String>>>>>;
#[tokio::main]
async fn main() {
let rooms: Room = Arc::new(Mutex::new(HashMap::new()));
let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
while let Ok((stream, _)) = listener.accept().await {
let rooms = rooms.clone();
tokio::spawn(async move {
if let Ok(ws_stream) = accept_async(stream).await {
let (ws_sender, mut ws_receiver) = ws_stream.split();
let (tx, mut rx): (
mpsc::UnboundedSender<String>,
mpsc::UnboundedReceiver<String>,
) = mpsc::unbounded_channel();
// Spawn a task to forward messages from the channel to the WebSocket
let ws_sender = Arc::new(Mutex::new(ws_sender));
// Clone the `Arc<Mutex<...>>` for use in the spawned task
let ws_sender_clone = Arc::clone(&ws_sender);
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
let mut sender = ws_sender_clone.lock().await;
if sender.send(message.into()).await.is_err() {
break; // Stop if the WebSocket connection is closed
}
}
});
while let Some(Ok(message)) = ws_receiver.next().await {
if let Ok(text) = message.to_text() {
if let Ok(client_message) = serde_json::from_str::<ClientMessage>(text) {
match client_message {
ClientMessage::CreateRoom { room } => {
let room_id = room.id();
let mut rooms_lock = rooms.lock().await;
rooms_lock.insert(room_id.to_string(), vec![tx.clone()]);
let response = json!({
"type": "RoomCreated",
"room_id": room_id
});
let mut sender = ws_sender.lock().await;
sender.send(response.to_string().into()).await.unwrap();
}
ClientMessage::Offer {
room,
caller,
callee,
} => {
let room_id = room.id();
let mut rooms_lock = rooms.lock().await;
if let Some(participants) =
rooms_lock.get_mut(&room_id.to_string())
{
for participant_tx in participants {
let response = json!({
"type": "Offer",
"caller": caller,
"callee": callee
});
participant_tx.send(response.to_string()).unwrap();
}
}
}
ClientMessage::Answer { callee } => {
let room_id = callee.room_id();
println!("Answer received for room: {}", room_id);
let mut rooms_lock = rooms.lock().await;
if let Some(participants) =
rooms_lock.get_mut(&room_id.to_string())
{
for participant_tx in participants {
let response = json!({
"type": "Answer",
"callee": callee,
"room_id": room_id
});
participant_tx.send(response.to_string()).unwrap();
}
} else {
println!("Room {} not found", room_id);
}
}
_ => {}
}
}
}
}
// Remove the user from the room when they disconnect
let mut rooms_lock = rooms.lock().await;
for (_, participants) in rooms_lock.iter_mut() {
participants.retain(|participant_tx| !participant_tx.is_closed());
}
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment