mirror of
https://github.com/mii443/wasmer.git
synced 2025-12-06 04:38:25 +00:00
wasix: Remove websocket support
Removes the websocket support from wasix. Websockets are a high level concept. They should not be integrated so deeply into the core of wasix. For client connections, we will want to add support for websocket clients into the custom wasix_http_client bindings. For servers, the support should go into the custom wcgi server bindings.
This commit is contained in:
@@ -1,3 +1 @@
|
||||
pub mod memory;
|
||||
|
||||
pub mod tungstenite_websocket;
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
// FIXME: either just delete, or actually use somewhere!
|
||||
|
||||
// use async_trait::async_trait;
|
||||
// use futures::stream::SplitSink;
|
||||
// use futures::stream::SplitStream;
|
||||
// use futures::SinkExt;
|
||||
// use futures_util::StreamExt;
|
||||
// use wasmer_os::wasmer_wasi::WasiRuntimeImplementation;
|
||||
// use std::pin::Pin;
|
||||
// use std::sync::Arc;
|
||||
// use std::sync::Mutex;
|
||||
// use tokio::net::TcpStream;
|
||||
// #[cfg(feature = "tokio_tungstenite")]
|
||||
// use tokio_tungstenite::{
|
||||
// connect_async, tungstenite::protocol::Message,
|
||||
// MaybeTlsStream, WebSocketStream
|
||||
// };
|
||||
// use wasmer_os::wasmer_wasi::WebSocketAbi;
|
||||
|
||||
// #[allow(unused_imports)]
|
||||
// use tracing::{debug, error, info, instrument, span, trace, warn, Level};
|
||||
|
||||
// pub struct TerminalWebSocket {
|
||||
// sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
|
||||
// stream: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
|
||||
// on_close: Arc<Mutex<Option<Box<dyn Fn() + Send + 'static>>>>,
|
||||
// }
|
||||
|
||||
// impl TerminalWebSocket {
|
||||
// pub async fn new(url: &str) -> Result<TerminalWebSocket, String> {
|
||||
// let url = url::Url::parse(url)
|
||||
// .map_err(|err| err.to_string())?;
|
||||
|
||||
// let (ws_stream, _) = connect_async(url).await
|
||||
// .map_err(|err| format!("failed to connect - {}", err))?;
|
||||
// let (sink, stream) = ws_stream.split();
|
||||
|
||||
// Ok(
|
||||
// TerminalWebSocket {
|
||||
// sink,
|
||||
// stream: Some(stream),
|
||||
// on_close: Arc::new(Mutex::new(None)),
|
||||
// }
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
|
||||
// #[async_trait]
|
||||
// impl WebSocketAbi for TerminalWebSocket {
|
||||
// fn set_onopen(&mut self, mut callback: Box<dyn FnMut()>) {
|
||||
// // We instantly notify that we are open
|
||||
// callback();
|
||||
// }
|
||||
|
||||
// fn set_onclose(&mut self, callback: Box<dyn Fn() + Send + 'static>) {
|
||||
// let mut guard = self.on_close.lock().unwrap();
|
||||
// guard.replace(callback);
|
||||
// }
|
||||
|
||||
// fn set_onmessage(&mut self, callback: Box<dyn Fn(Vec<u8>) + Send + 'static>, runtime: &dyn WasiRuntimeImplementation)
|
||||
// {
|
||||
// if let Some(mut stream) = self.stream.take() {
|
||||
// let on_close = self.on_close.clone();
|
||||
// runtime.task_shared(Box::new(move || Pin::new(Box::new(async move {
|
||||
// while let Some(msg) = stream.next().await {
|
||||
// match msg {
|
||||
// Ok(Message::Binary(msg)) => {
|
||||
// callback(msg);
|
||||
// }
|
||||
// a => {
|
||||
// debug!("received invalid msg: {:?}", a);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// let on_close = on_close.lock().unwrap();
|
||||
// if let Some(on_close) = on_close.as_ref() {
|
||||
// on_close();
|
||||
// }
|
||||
// }))));
|
||||
// }
|
||||
// }
|
||||
|
||||
// async fn send(&mut self, data: Vec<u8>) -> Result<(), String> {
|
||||
// self.sink
|
||||
// .send(Message::binary(data))
|
||||
// .await
|
||||
// .map_err(|err| err.to_string())?;
|
||||
// Ok(())
|
||||
// }
|
||||
// }
|
||||
@@ -33,13 +33,6 @@ pub struct IpRoute {
|
||||
#[async_trait::async_trait]
|
||||
#[allow(unused_variables)]
|
||||
pub trait VirtualNetworking: fmt::Debug + Send + Sync + 'static {
|
||||
/// Establishes a web socket connection
|
||||
/// (note: this does not use the virtual sockets and is standalone
|
||||
/// functionality that works without the network being connected)
|
||||
async fn ws_connect(&self, url: &str) -> Result<Box<dyn VirtualWebSocket + Sync>> {
|
||||
Err(NetworkError::Unsupported)
|
||||
}
|
||||
|
||||
/// Bridges this local network with a remote network, which is required in
|
||||
/// order to make lower level networking calls (such as UDP/TCP)
|
||||
async fn bridge(
|
||||
@@ -283,34 +276,6 @@ pub enum StreamSecurity {
|
||||
DoubleEncryption,
|
||||
}
|
||||
|
||||
/// Interface used for sending and receiving data from a web socket
|
||||
#[async_trait::async_trait]
|
||||
pub trait VirtualWebSocket: fmt::Debug + Send + Sync + 'static {
|
||||
/// Sends out a datagram or stream of bytes on this socket
|
||||
async fn send(&mut self, data: Bytes) -> Result<usize>;
|
||||
|
||||
/// FLushes all the datagrams
|
||||
fn flush(&mut self) -> Result<()>;
|
||||
|
||||
/// Recv a packet from the socket
|
||||
async fn recv(&mut self) -> Result<SocketReceive>;
|
||||
|
||||
/// Recv a packet from the socket
|
||||
fn try_recv(&mut self) -> Result<Option<SocketReceive>>;
|
||||
|
||||
/// Polls the socket for when there is data to be received
|
||||
fn poll_read_ready(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<usize>>;
|
||||
|
||||
/// Polls the socket for when the backpressure allows for writing to the socket
|
||||
fn poll_write_ready(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<usize>>;
|
||||
}
|
||||
|
||||
/// Connected sockets have a persistent connection to a remote peer
|
||||
#[async_trait::async_trait]
|
||||
pub trait VirtualConnectedSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
|
||||
|
||||
@@ -16,7 +16,7 @@ use wasmer_vnet::{
|
||||
io_err_into_net_error, IpCidr, IpRoute, NetworkError, Result, SocketReceive, SocketReceiveFrom,
|
||||
SocketStatus, StreamSecurity, TimeType, VirtualConnectedSocket, VirtualConnectionlessSocket,
|
||||
VirtualIcmpSocket, VirtualNetworking, VirtualRawSocket, VirtualSocket, VirtualTcpListener,
|
||||
VirtualTcpSocket, VirtualUdpSocket, VirtualWebSocket,
|
||||
VirtualTcpSocket, VirtualUdpSocket,
|
||||
};
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
||||
@@ -142,7 +142,6 @@ impl std::fmt::Debug for InodeValFilePollGuard {
|
||||
}
|
||||
InodeSocketKind::UdpSocket(..) => write!(f, "guard-udp-socket"),
|
||||
InodeSocketKind::Raw(..) => write!(f, "guard-raw-socket"),
|
||||
InodeSocketKind::WebSocket(..) => write!(f, "guard-web-socket"),
|
||||
_ => write!(f, "guard-socket"),
|
||||
},
|
||||
_ => write!(f, "guard-socket (locked)"),
|
||||
|
||||
@@ -100,7 +100,7 @@ pub use crate::{
|
||||
},
|
||||
runtime::{
|
||||
task_manager::{VirtualTaskManager, VirtualTaskManagerExt},
|
||||
PluggableRuntimeImplementation, SpawnedMemory, WasiRuntimeImplementation, WebSocketAbi,
|
||||
PluggableRuntimeImplementation, SpawnedMemory, WasiRuntimeImplementation,
|
||||
},
|
||||
wapm::parse_static_webc,
|
||||
};
|
||||
@@ -410,7 +410,6 @@ fn wasix_exports_32(mut store: &mut impl AsStoreMut, env: &FunctionEnv<WasiEnv>)
|
||||
"call_reply" => Function::new_typed_with_env(&mut store, env, call_reply::<Memory32>),
|
||||
"call_fault" => Function::new_typed_with_env(&mut store, env, call_fault),
|
||||
"call_close" => Function::new_typed_with_env(&mut store, env, call_close),
|
||||
"ws_connect" => Function::new_typed_with_env(&mut store, env, ws_connect::<Memory32>),
|
||||
"port_bridge" => Function::new_typed_with_env(&mut store, env, port_bridge::<Memory32>),
|
||||
"port_unbridge" => Function::new_typed_with_env(&mut store, env, port_unbridge),
|
||||
"port_dhcp_acquire" => Function::new_typed_with_env(&mut store, env, port_dhcp_acquire),
|
||||
@@ -543,7 +542,6 @@ fn wasix_exports_64(mut store: &mut impl AsStoreMut, env: &FunctionEnv<WasiEnv>)
|
||||
"call_reply" => Function::new_typed_with_env(&mut store, env, call_reply::<Memory64>),
|
||||
"call_fault" => Function::new_typed_with_env(&mut store, env, call_fault),
|
||||
"call_close" => Function::new_typed_with_env(&mut store, env, call_close),
|
||||
"ws_connect" => Function::new_typed_with_env(&mut store, env, ws_connect::<Memory64>),
|
||||
"port_bridge" => Function::new_typed_with_env(&mut store, env, port_bridge::<Memory64>),
|
||||
"port_unbridge" => Function::new_typed_with_env(&mut store, env, port_unbridge),
|
||||
"port_dhcp_acquire" => Function::new_typed_with_env(&mut store, env, port_dhcp_acquire),
|
||||
|
||||
@@ -15,7 +15,7 @@ use tokio::sync::OwnedRwLockWriteGuard;
|
||||
use wasmer_types::MemorySize;
|
||||
use wasmer_vnet::{
|
||||
DynVirtualNetworking, TimeType, VirtualConnectedSocket, VirtualIcmpSocket, VirtualRawSocket,
|
||||
VirtualTcpListener, VirtualTcpSocket, VirtualUdpSocket, VirtualWebSocket,
|
||||
VirtualTcpListener, VirtualTcpSocket, VirtualUdpSocket,
|
||||
};
|
||||
use wasmer_wasi_types::wasi::{
|
||||
Addressfamily, Errno, Fdflags, Rights, SockProto, Sockoption, Socktype,
|
||||
@@ -53,7 +53,6 @@ pub enum InodeSocketKind {
|
||||
connect_timeout: Option<Duration>,
|
||||
accept_timeout: Option<Duration>,
|
||||
},
|
||||
WebSocket(Box<dyn VirtualWebSocket + Sync>),
|
||||
Icmp(Box<dyn VirtualIcmpSocket + Sync>),
|
||||
Raw(Box<dyn VirtualRawSocket + Sync>),
|
||||
TcpListener(Box<dyn VirtualTcpListener + Sync>),
|
||||
@@ -365,7 +364,6 @@ impl InodeSocket {
|
||||
InodeSocketKind::UdpSocket(sock) => {
|
||||
sock.close().map_err(net_error_into_wasi_err)?;
|
||||
}
|
||||
InodeSocketKind::WebSocket(_) => {}
|
||||
InodeSocketKind::Raw(_) => {}
|
||||
InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
|
||||
InodeSocketKind::Closed => return Err(Errno::Notconn),
|
||||
@@ -388,7 +386,6 @@ impl InodeSocket {
|
||||
.await
|
||||
.map_err(net_error_into_wasi_err)?;
|
||||
}
|
||||
InodeSocketKind::WebSocket(_) => {}
|
||||
InodeSocketKind::Raw(sock) => {
|
||||
VirtualRawSocket::flush(sock.deref_mut())
|
||||
.await
|
||||
@@ -458,7 +455,6 @@ impl InodeSocket {
|
||||
let inner = self.inner.read().await;
|
||||
Ok(match &inner.kind {
|
||||
InodeSocketKind::PreSocket { .. } => WasiSocketStatus::Opening,
|
||||
InodeSocketKind::WebSocket(_) => WasiSocketStatus::Opened,
|
||||
InodeSocketKind::TcpListener(_) => WasiSocketStatus::Opened,
|
||||
InodeSocketKind::TcpStream(_) => WasiSocketStatus::Opened,
|
||||
InodeSocketKind::UdpSocket(_) => WasiSocketStatus::Opened,
|
||||
@@ -942,11 +938,6 @@ impl InodeSocket {
|
||||
let mut inner = self.inner.write().await;
|
||||
|
||||
let ret = match &mut inner.kind {
|
||||
InodeSocketKind::WebSocket(sock) => sock
|
||||
.send(Bytes::from(buf))
|
||||
.await
|
||||
.map(|_| buf_len)
|
||||
.map_err(net_error_into_wasi_err),
|
||||
InodeSocketKind::Raw(sock) => sock
|
||||
.send(Bytes::from(buf))
|
||||
.await
|
||||
@@ -1009,10 +1000,6 @@ impl InodeSocket {
|
||||
}
|
||||
}
|
||||
let data = match &mut inner.kind {
|
||||
InodeSocketKind::WebSocket(sock) => {
|
||||
let read = sock.recv().await.map_err(net_error_into_wasi_err)?;
|
||||
read.data
|
||||
}
|
||||
InodeSocketKind::Raw(sock) => {
|
||||
let read = sock.recv().await.map_err(net_error_into_wasi_err)?;
|
||||
read.data
|
||||
@@ -1092,8 +1079,7 @@ impl InodeSocket {
|
||||
match &mut guard.kind {
|
||||
InodeSocketKind::TcpStream(..)
|
||||
| InodeSocketKind::UdpSocket(..)
|
||||
| InodeSocketKind::Raw(..)
|
||||
| InodeSocketKind::WebSocket(..) => true,
|
||||
| InodeSocketKind::Raw(..) => true,
|
||||
_ => false,
|
||||
}
|
||||
} else {
|
||||
@@ -1112,7 +1098,6 @@ impl InodeSocketInner {
|
||||
InodeSocketKind::TcpStream(socket) => socket.poll_read_ready(cx),
|
||||
InodeSocketKind::UdpSocket(socket) => socket.poll_read_ready(cx),
|
||||
InodeSocketKind::Raw(socket) => socket.poll_read_ready(cx),
|
||||
InodeSocketKind::WebSocket(socket) => socket.poll_read_ready(cx),
|
||||
InodeSocketKind::Icmp(socket) => socket.poll_read_ready(cx),
|
||||
InodeSocketKind::PreSocket { .. } => {
|
||||
std::task::Poll::Ready(Err(wasmer_vnet::NetworkError::IOError))
|
||||
@@ -1132,7 +1117,6 @@ impl InodeSocketInner {
|
||||
InodeSocketKind::TcpStream(socket) => socket.poll_write_ready(cx),
|
||||
InodeSocketKind::UdpSocket(socket) => socket.poll_write_ready(cx),
|
||||
InodeSocketKind::Raw(socket) => socket.poll_write_ready(cx),
|
||||
InodeSocketKind::WebSocket(socket) => socket.poll_write_ready(cx),
|
||||
InodeSocketKind::Icmp(socket) => socket.poll_write_ready(cx),
|
||||
InodeSocketKind::PreSocket { .. } => {
|
||||
std::task::Poll::Ready(Err(wasmer_vnet::NetworkError::IOError))
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
mod stdio;
|
||||
pub mod task_manager;
|
||||
mod ws;
|
||||
|
||||
use self::task_manager::StubTaskManager;
|
||||
pub use self::{
|
||||
stdio::*,
|
||||
task_manager::{SpawnType, SpawnedMemory, VirtualTaskManager, VirtualTaskManagerExt},
|
||||
ws::*,
|
||||
task_manager::{SpawnType, SpawnedMemory, VirtualTaskManager},
|
||||
};
|
||||
|
||||
use std::{
|
||||
@@ -157,25 +155,6 @@ where
|
||||
None
|
||||
}
|
||||
|
||||
/// Make a web socket connection to a particular URL
|
||||
#[cfg(not(feature = "host-ws"))]
|
||||
fn web_socket(
|
||||
&self,
|
||||
url: &str,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Box<dyn WebSocketAbi>, String>>>> {
|
||||
Box::pin(async move { Err("not supported".to_string()) })
|
||||
}
|
||||
|
||||
/// Make a web socket connection to a particular URL
|
||||
#[cfg(feature = "host-ws")]
|
||||
fn web_socket(
|
||||
&self,
|
||||
url: &str,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Box<dyn WebSocketAbi>, String>>>> {
|
||||
let url = url.to_string();
|
||||
Box::pin(async move { Box::new(TerminalWebSocket::new(url.as_str())).await })
|
||||
}
|
||||
|
||||
/// Writes output to the console
|
||||
fn stdout(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + Sync>> {
|
||||
let data = data.to_vec();
|
||||
|
||||
@@ -1,24 +0,0 @@
|
||||
#[cfg(feature = "async_ws")]
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::WasiRuntimeImplementation;
|
||||
|
||||
// This ABI implements a general purpose web socket
|
||||
#[cfg_attr(feature = "async_ws", async_trait)]
|
||||
pub trait WebSocketAbi {
|
||||
fn set_onopen(&mut self, callback: Box<dyn FnMut()>);
|
||||
|
||||
fn set_onclose(&mut self, callback: Box<dyn Fn() + Send + 'static>);
|
||||
|
||||
fn set_onmessage(
|
||||
&mut self,
|
||||
callback: Box<dyn Fn(Vec<u8>) + Send + 'static>,
|
||||
runtime: &dyn WasiRuntimeImplementation,
|
||||
);
|
||||
|
||||
#[cfg(feature = "async_ws")]
|
||||
async fn send(&mut self, data: Vec<u8>) -> Result<(), String>;
|
||||
|
||||
#[cfg(not(feature = "async_ws"))]
|
||||
fn send(&mut self, data: Vec<u8>) -> Result<(), String>;
|
||||
}
|
||||
@@ -78,7 +78,6 @@ mod thread_sleep;
|
||||
mod thread_spawn;
|
||||
mod tty_get;
|
||||
mod tty_set;
|
||||
mod ws_connect;
|
||||
|
||||
pub use bus_call::*;
|
||||
pub use bus_close::*;
|
||||
@@ -160,4 +159,3 @@ pub use thread_sleep::*;
|
||||
pub use thread_spawn::*;
|
||||
pub use tty_get::*;
|
||||
pub use tty_set::*;
|
||||
pub use ws_connect::*;
|
||||
|
||||
@@ -1,56 +0,0 @@
|
||||
use super::*;
|
||||
use crate::syscalls::*;
|
||||
|
||||
/// ### `ws_connect()`
|
||||
/// Connects to a websocket at a particular network URL
|
||||
///
|
||||
/// ## Parameters
|
||||
///
|
||||
/// * `url` - URL of the web socket destination to connect to
|
||||
///
|
||||
/// ## Return
|
||||
///
|
||||
/// Returns a socket handle which is used to send and receive data
|
||||
pub fn ws_connect<M: MemorySize>(
|
||||
mut ctx: FunctionEnvMut<'_, WasiEnv>,
|
||||
url: WasmPtr<u8, M>,
|
||||
url_len: M::Offset,
|
||||
ret_sock: WasmPtr<WasiFd, M>,
|
||||
) -> Result<Errno, WasiError> {
|
||||
debug!(
|
||||
"wasi[{}:{}]::ws_connect",
|
||||
ctx.data().pid(),
|
||||
ctx.data().tid()
|
||||
);
|
||||
let mut env = ctx.data();
|
||||
let memory = env.memory_view(&ctx);
|
||||
let url = unsafe { get_input_str_ok!(&memory, url, url_len) };
|
||||
|
||||
let net = env.net();
|
||||
let tasks = env.tasks.clone();
|
||||
let socket = wasi_try_ok!(__asyncify(&mut ctx, None, async move {
|
||||
net.ws_connect(url.as_str())
|
||||
.await
|
||||
.map_err(net_error_into_wasi_err)
|
||||
})?);
|
||||
env = ctx.data();
|
||||
|
||||
let (memory, state, mut inodes) = env.get_memory_and_wasi_state_and_inodes_mut(&ctx, 0);
|
||||
|
||||
let kind = Kind::Socket {
|
||||
socket: InodeSocket::new(InodeSocketKind::WebSocket(socket)),
|
||||
};
|
||||
|
||||
let inode =
|
||||
state
|
||||
.fs
|
||||
.create_inode_with_default_stat(inodes.deref_mut(), kind, false, "socket".into());
|
||||
let rights = Rights::all_socket();
|
||||
let fd = wasi_try_ok!(state
|
||||
.fs
|
||||
.create_fd(rights, rights, Fdflags::empty(), 0, inode));
|
||||
|
||||
wasi_try_mem_ok!(ret_sock.write(&memory, fd));
|
||||
|
||||
Ok(Errno::Success)
|
||||
}
|
||||
@@ -72,7 +72,6 @@
|
||||
(func (import "wasix_32v1" "call_reply") (param i64 i32 i32 i32) (result i32))
|
||||
(func (import "wasix_32v1" "call_fault") (param i64 i32)
|
||||
(func (import "wasix_32v1" "call_close") (param i64)
|
||||
(func (import "wasix_32v1" "ws_connect") (param i32 i32 i32) (result i32))
|
||||
(func (import "wasix_32v1" "http_request") (param i32 i32 i32 i32 i32 i32 i32 i32) (result i32))
|
||||
(func (import "wasix_32v1" "http_status") (param i32 i32)
|
||||
(func (import "wasix_32v1" "port_bridge") (param i32 i32 i32 i32 i32) (result i32))
|
||||
|
||||
@@ -72,7 +72,6 @@
|
||||
(func (import "wasix_64v1" "call_reply") (param i64 i32 i64 i64) (result i32))
|
||||
(func (import "wasix_64v1" "call_fault") (param i64 i32)
|
||||
(func (import "wasix_64v1" "call_close") (param i64)
|
||||
(func (import "wasix_64v1" "ws_connect") (param i64 i64 i64) (result i32))
|
||||
(func (import "wasix_64v1" "http_request") (param i64 i64 i64 i64 i64 i64 i32 i64) (result i32))
|
||||
(func (import "wasix_64v1" "http_status") (param i32 i64)
|
||||
(func (import "wasix_64v1" "port_bridge") (param i64 i64 i64 i64 i32) (result i32))
|
||||
|
||||
Reference in New Issue
Block a user