This commit is contained in:
mii443
2024-08-31 18:57:58 +09:00
commit 3203e4b5c5
8 changed files with 3641 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

3057
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

35
Cargo.toml Normal file
View File

@@ -0,0 +1,35 @@
[package]
name = "rs-easy-p2p"
version = "0.1.0"
edition = "2021"
[lib]
name = "easyp2p"
path = "src/lib.rs"
[[bin]]
name = "rs-easy-p2p"
path = "src/bin.rs"
[[bin]]
name = "signaling-server"
path = "src/signaling.rs"
[dependencies]
anyhow = "1.0.86"
webrtc = "0.11.0"
tokio = { version = "1.38.0", features = ["full"] }
serde = { version = "1.0.209", features = ["derive"] }
serde_json = "1.0.127"
base64 = "0.22.1"
zstd = "0.13.2"
bincode = "1.3.3"
bytes = "1.7.1"
axum = "0.7.5"
tower-http = { version = "0.5.2", features = ["cors"] }
futures = "0.3.30"
uuid = { version = "1.10.0", features = ["v4"] }
async-stream = "0.3.5"
rand = "0.8.5"
reqwest = { version = "0.12.7", features = ["json", "stream"] }
futures-util = "0.3.30"

78
src/bin.rs Normal file
View File

@@ -0,0 +1,78 @@
use std::io::Write;
use easyp2p::p2p::P2P;
use anyhow::Result;
fn read_line() -> String {
let mut line = String::new();
std::io::stdin().read_line(&mut line).unwrap();
line = line.trim().to_owned();
line
}
#[tokio::main]
async fn main() -> Result<()> {
print!("> ");
std::io::stdout().flush().unwrap();
let line = read_line();
match &*line {
"a" => client_a().await,
_ => client_b(&line).await
}
}
async fn client_a() -> Result<()> {
let mut p2p = P2P::new(None).await?;
let code = p2p.create_code("http://localhost:3000").await?;
println!("{code}");
p2p.wait_open().await;
println!("Connected!");
tokio::spawn({
let receive = p2p.receive_data.clone();
async move {
let mut receive = receive.lock().await;
while let Some(data) = receive.recv().await {
println!("{}", String::from_utf8(data.to_vec()).unwrap());
}
}
});
loop {
let line = read_line();
p2p.send_text(&line).await?;
}
}
async fn client_b(code: &str) -> Result<()> {
let mut p2p = P2P::new(None).await?;
p2p.connect_with_code("http://localhost:3000", code).await?;
p2p.wait_open().await;
println!("Connected!");
tokio::spawn({
let receive = p2p.receive_data.clone();
async move {
let mut receive = receive.lock().await;
while let Some(data) = receive.recv().await {
println!("Received: {}", String::from_utf8(data.to_vec()).unwrap());
}
}
});
loop {
let line = read_line();
p2p.send_text(&line).await?;
}
}

2
src/lib.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod p2p;
pub mod p2p_option;

339
src/p2p.rs Normal file
View File

@@ -0,0 +1,339 @@
use std::sync::Arc;
use crate::p2p_option::P2POption;
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use base64::{prelude::BASE64_STANDARD, Engine};
use futures::StreamExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc::{Receiver, Sender}, Mutex};
use webrtc::{api::{interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder}, data_channel::{data_channel_message::DataChannelMessage, RTCDataChannel}, ice_transport::{ice_candidate::RTCIceCandidate, ice_gatherer::RTCIceGatherer, ice_server::RTCIceServer}, interceptor::registry::Registry, peer_connection::{configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, sdp::session_description::RTCSessionDescription, RTCPeerConnection}};
#[derive(Clone, Serialize, Deserialize)]
pub struct SessionDescription {
pub session_description: String,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct ConnectionCode {
pub connection_code: String,
}
pub struct P2P {
peer_connection: Arc<Mutex<RTCPeerConnection>>,
send_data: Arc<Mutex<Sender<SendData>>>,
send_data_rx: Arc<Mutex<Receiver<SendData>>>,
pub receive_data: Arc<Mutex<Receiver<Bytes>>>,
receive_data_tx: Arc<Mutex<Sender<Bytes>>>,
done_rx: Receiver<()>,
on_open: Receiver<()>,
on_open_tx: Arc<Mutex<Sender<()>>>
}
enum SendData {
Bytes(Bytes),
String(String)
}
impl P2P {
pub async fn connect_with_code(&mut self, signaling_server: &str, code: &str) -> Result<()> {
let client = Client::new();
let res = client
.get(format!("{}/session/{}", signaling_server, code))
.send()
.await?;
let session: SessionDescription = if res.status().is_success() {
res.json().await?
} else {
return Err(anyhow!("Failed to get remote session description"));
};
let answer = self.set_offer(&session.session_description, true).await?;
let answer = SessionDescription {
session_description: answer
};
let res = client
.post(format!("{}/session/{}", signaling_server, code))
.json(&answer)
.send()
.await?;
if res.status().is_success() {
Ok(())
} else {
Err(anyhow!("Failed to send local session description"))
}
}
pub async fn create_code(&mut self, signaling_server: &str) -> Result<String> {
let client = Client::new();
let description = SessionDescription {
session_description: self.create_offer(true).await?,
};
let res = client
.post(format!("{}/register", signaling_server))
.json(&description)
.send()
.await?;
let mut stream = res.bytes_stream();
let (connection_code_tx, mut connection_code_rx) = tokio::sync::mpsc::channel::<String>(1);
tokio::spawn({
let peer_connection = self.peer_connection.clone();
async move {
let mut connection_code_received = false;
'outer: while let Some(item) = stream.next().await {
let chunk = item.unwrap();
let data = String::from_utf8_lossy(&chunk);
for line in data.lines() {
if line.starts_with("data:") {
let value = line.trim_start_matches("data:").trim();
if connection_code_received {
peer_connection.lock().await
.set_remote_description(
Self::decode_description(value, true).unwrap()
).await.unwrap();
break 'outer;
} else {
connection_code_tx.send(value.to_string()).await.unwrap();
connection_code_received = true;
}
}
}
}
}
});
connection_code_rx.recv().await.context("Failed to get connection code")
}
pub async fn wait_open(&mut self) {
self.on_open.recv().await;
}
pub async fn send(&mut self, data: Bytes) -> Result<()> {
self.send_data.lock().await.send(SendData::Bytes(data)).await.context("Failed to send data")
}
pub async fn send_text(&mut self, data: &str) -> Result<()> {
self.send_data.lock().await.send(SendData::String(data.to_string())).await.context("Failed to send data")
}
pub async fn receive(&mut self) -> Option<Bytes> {
self.receive_data.lock().await.recv().await
}
pub async fn set_answer(&mut self, answer: &str, compress: bool) -> Result<()> {
self.peer_connection.lock().await.set_remote_description(Self::decode_description(answer, compress)?).await.context("Failed to set answer")
}
pub async fn set_offer(&mut self, offer: &str, compress: bool) -> Result<String> {
let offer = Self::decode_description(offer, compress)?;
let peer_connection = self.peer_connection.lock().await;
let receive_data_tx = self.receive_data_tx.clone();
let on_open_tx = self.on_open_tx.clone();
let send_data_rx = self.send_data_rx.clone();
peer_connection
.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
let receive_data_tx = receive_data_tx.clone();
let on_open_tx = on_open_tx.clone();
let send_data_rx = send_data_rx.clone();
Box::pin(async move {
let d2 = Arc::clone(&d);
d.on_message(Box::new(move |msg: DataChannelMessage| {
let receive_data_tx = receive_data_tx.clone();
Box::pin(async move {
receive_data_tx.lock().await.send(msg.data).await.unwrap();
})
}));
d.on_open(Box::new(move || {
let on_open_tx = on_open_tx.clone();
let send_data_rx = send_data_rx.clone();
Box::pin(async move {
on_open_tx.lock().await.send(()).await.unwrap();
while let Some(data) = send_data_rx.lock().await.recv().await {
match data {
SendData::Bytes(bytes) => {
d2.send(&bytes).await.unwrap();
},
SendData::String(string) => {
d2.send_text(string).await.unwrap();
}
}
}
})
}))
})
}));
peer_connection.set_remote_description(offer).await?;
let answer = peer_connection.create_answer(None).await?;
let mut gather_complete = peer_connection.gathering_complete_promise().await;
peer_connection.set_local_description(answer).await?;
let _ = gather_complete.recv().await;
Self::encode_description(&peer_connection.local_description().await.context("Failed to generate local_description.")?, compress)
}
pub async fn create_offer(&mut self, compress: bool) -> Result<String> {
let peer_connection = Arc::clone(&self.peer_connection);
let ice_candidates = Arc::new(Mutex::new(Vec::new()));
let ice_candidates_clone = Arc::clone(&ice_candidates);
peer_connection.lock().await.on_ice_candidate(Box::new(move |c: Option<RTCIceCandidate>| {
let ice_candidates = Arc::clone(&ice_candidates_clone);
Box::pin(async move {
if let Some(candidate) = c {
ice_candidates.lock().await.push(candidate);
}
})
}));
let data_channel = self.peer_connection.lock().await.create_data_channel("data", None).await?;
let receive_data_tx = self.receive_data_tx.clone();
data_channel.on_message(Box::new(move |msg: DataChannelMessage| {
let receive_data_tx = receive_data_tx.clone();
Box::pin(async move {
receive_data_tx.lock().await.send(msg.data).await.unwrap();
})
}));
let d1 = Arc::clone(&data_channel);
let on_open_tx = self.on_open_tx.clone();
let send_data_rx = self.send_data_rx.clone();
data_channel.on_open(Box::new(move || {
let d2 = Arc::clone(&d1);
let on_open_tx = on_open_tx.clone();
let send_data_rx = send_data_rx.clone();
Box::pin(async move {
on_open_tx.lock().await.send(()).await.unwrap();
while let Some(data) = send_data_rx.lock().await.recv().await {
match data {
SendData::Bytes(bytes) => {
d2.send(&bytes).await.unwrap();
},
SendData::String(string) => {
d2.send_text(string).await.unwrap();
}
}
}
})
}));
let offer = peer_connection.lock().await.create_offer(None).await?;
peer_connection.lock().await.set_local_description(offer).await?;
let mut gather_complete = peer_connection.lock().await.gathering_complete_promise().await;
let _ = gather_complete.recv().await;
let local_desc = peer_connection.lock().await.local_description().await.context("Failed to generate local_description.")?;
let mut sdp = local_desc.sdp.clone();
sdp = sdp.lines()
.filter(|line| !line.starts_with("a=candidate:udp"))
.collect::<Vec<&str>>()
.join("\r\n");
Self::encode_description(&RTCSessionDescription::offer(sdp)?, compress)
}
fn encode_description(description: &RTCSessionDescription, compress: bool) -> Result<String> {
let json_str = serde_json::to_string(description)?;
if compress {
let compressed = zstd::encode_all(json_str.as_bytes(), 3).unwrap();
Ok(BASE64_STANDARD.encode(compressed))
} else {
Ok(BASE64_STANDARD.encode(&json_str))
}
}
fn decode_description(description: &str, compress: bool) -> Result<RTCSessionDescription> {
let description = if compress {
String::from_utf8(zstd::decode_all(BASE64_STANDARD.decode(description)?.as_slice())?)?
} else {
String::from_utf8(BASE64_STANDARD.decode(description)?)?
};
serde_json::from_str(&description).context("Failed to parse description")
}
pub async fn new(option: Option<P2POption>) -> Result<Self> {
let mut m = MediaEngine::default();
let _ = m.register_default_codecs();
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();
let config = if let Some(option) = option {
option
} else {
P2POption {
rtc_configuration: RTCConfiguration {
ice_servers: vec![
RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}
],
..Default::default()
}
}
};
let peer_connection = Arc::new(Mutex::new(api.new_peer_connection(config.rtc_configuration).await?));
let (done_tx, done_rx) = tokio::sync::mpsc::channel::<()>(1);
peer_connection.lock().await.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
if s == RTCPeerConnectionState::Failed || s == RTCPeerConnectionState::Disconnected {
let _ = done_tx.try_send(());
}
Box::pin(async {})
}));
let (send_data_tx, send_data_rx) = tokio::sync::mpsc::channel::<SendData>(128);
let (on_open_tx, on_open_rx) = tokio::sync::mpsc::channel::<()>(1);
let (receive_data_tx, receive_data_rx) = tokio::sync::mpsc::channel::<Bytes>(128);
Ok(
Self {
peer_connection,
send_data: Arc::new(Mutex::new(send_data_tx)),
send_data_rx: Arc::new(Mutex::new(send_data_rx)),
receive_data: Arc::new(Mutex::new(receive_data_rx)),
receive_data_tx: Arc::new(Mutex::new(receive_data_tx)),
done_rx,
on_open: on_open_rx,
on_open_tx: Arc::new(Mutex::new(on_open_tx))
}
)
}
}

5
src/p2p_option.rs Normal file
View File

@@ -0,0 +1,5 @@
use webrtc::peer_connection::configuration::RTCConfiguration;
pub struct P2POption {
pub rtc_configuration: RTCConfiguration
}

124
src/signaling.rs Normal file
View File

@@ -0,0 +1,124 @@
use anyhow::Result;
use axum::{
extract::{Path, State},
response::{sse::Event, Sse},
routing::{get, post},
Json, Router,
};
use easyp2p::p2p::SessionDescription;
use futures::stream::Stream;
use rand::Rng;
use tower_http::cors::CorsLayer;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{broadcast, Mutex};
struct AppState {
sessions: Mutex<HashMap<String, String>>,
broadcasters: Mutex<HashMap<String, broadcast::Sender<String>>>,
}
#[tokio::main]
async fn main() -> Result<()> {
let app = app();
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?;
Ok(())
}
fn app() -> Router {
let app_state = Arc::new(AppState {
sessions: Mutex::new(HashMap::new()),
broadcasters: Mutex::new(HashMap::new()),
});
Router::new()
.route("/register", post(register))
.route("/session/:connection_code", get(get_session).post(send_session))
.layer(CorsLayer::permissive())
.with_state(app_state)
}
fn generate_connection_code() -> String {
const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
const CODE_LEN: usize = 6;
let mut rng = rand::thread_rng();
(0..CODE_LEN)
.map(|_| {
let idx = rng.gen_range(0..CHARSET.len());
CHARSET[idx] as char
})
.collect()
}
async fn get_unique_connection_code(sessions: &Mutex<HashMap<String, String>>) -> String {
loop {
let code = generate_connection_code();
let exists = sessions.lock().await.contains_key(&code);
if !exists {
return code;
}
}
}
async fn register(
State(state): State<Arc<AppState>>,
Json(payload): Json<SessionDescription>,
) -> Sse<impl Stream<Item = Result<Event, axum::Error>>> {
let connection_code = get_unique_connection_code(&state.sessions).await;
let (tx, rx) = broadcast::channel(100);
state
.sessions
.lock()
.await
.insert(connection_code.clone(), payload.session_description);
state
.broadcasters
.lock()
.await
.insert(connection_code.clone(), tx);
let stream = async_stream::stream! {
yield Ok(Event::default().event("connection_code").data(connection_code.clone()));
let mut receiver = rx;
if let Ok(msg) = receiver.recv().await {
yield Ok(Event::default().event("peer_description").data(msg));
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
let mut sessions = state.sessions.lock().await;
let mut broadcasters = state.broadcasters.lock().await;
sessions.remove(&connection_code);
broadcasters.remove(&connection_code);
});
}
};
Sse::new(stream)
}
async fn get_session(
Path(connection_code): Path<String>,
State(state): State<Arc<AppState>>,
) -> Json<SessionDescription> {
let sessions = state.sessions.lock().await;
let session_description = sessions
.get(&connection_code)
.cloned()
.unwrap_or_else(|| "".to_string());
Json(SessionDescription { session_description })
}
async fn send_session(
Path(connection_code): Path<String>,
State(state): State<Arc<AppState>>,
Json(payload): Json<SessionDescription>,
) {
if let Some(tx) = state.broadcasters.lock().await.get(&connection_code) {
let _ = tx.send(payload.session_description);
}
}