/* * Rust signaling server example for libdatachannel * Copyright (c) 2020 Paul-Louis Ageneau * * This program is free software: you can redistribute it and/or modify it * under the terms of the GNU Affero General Public License as published * by the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ extern crate tokio; extern crate tungstenite; extern crate futures_util; extern crate futures_channel; extern crate json; use std::env; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tokio::net::{TcpListener, TcpStream}; use tungstenite::protocol::Message; use tungstenite::handshake::server::{Request, Response}; use futures_util::{future, pin_mut, StreamExt}; use futures_util::stream::TryStreamExt; use futures_channel::mpsc; type Id = String; type Tx = mpsc::UnboundedSender; type ClientsMap = Arc>>; async fn handle(clients: ClientsMap, stream: TcpStream) { let mut client_id = Id::new(); let callback = |req: &Request, response: Response| { let path: &str = req.uri().path(); let tokens: Vec<&str> = path.split('/').collect(); client_id = tokens[1].to_string(); return Ok(response); }; let websocket = tokio_tungstenite::accept_hdr_async(stream, callback) .await.expect("WebSocket handshake failed"); println!("Client {} connected", &client_id); let (tx, rx) = mpsc::unbounded(); clients.lock().unwrap().insert(client_id.clone(), tx); let (outgoing, incoming) = websocket.split(); let forward = rx.map(Ok).forward(outgoing); let process = incoming.try_for_each(|msg| { if msg.is_text() { let text = msg.to_text().unwrap(); println!("Client {} << {}", &client_id, &text); // Parse let mut content = json::parse(text).unwrap(); let remote_id = content["id"].to_string(); let mut locked = clients.lock().unwrap(); match locked.get_mut(&remote_id) { Some(remote) => { // Format content.insert("id", client_id.clone()).unwrap(); let text = json::stringify(content); // Send to remote println!("Client {} >> {}", &remote_id, &text); remote.unbounded_send(Message::text(text)).unwrap(); }, _ => println!("Client {} not found", &remote_id), } } future::ok(()) }); pin_mut!(process, forward); future::select(process, forward).await; println!("Client {} disconnected", &client_id); clients.lock().unwrap().remove(&client_id); } #[tokio::main] async fn main() -> Result<(), std::io::Error> { let service = env::args().nth(1).unwrap_or("8000".to_string()); let endpoint = if service.contains(':') { service } else { format!("127.0.0.1:{}", service) }; println!("Listening on {}", endpoint); let mut listener = TcpListener::bind(endpoint) .await.expect("Listener binding failed"); let clients = ClientsMap::new(Mutex::new(HashMap::new())); while let Ok((stream, _)) = listener.accept().await { tokio::spawn(handle(clients.clone(), stream)); } return Ok(()) }