diff --git a/lib/vnet/src/lib.rs b/lib/vnet/src/lib.rs index a004504cc..5e18c987e 100644 --- a/lib/vnet/src/lib.rs +++ b/lib/vnet/src/lib.rs @@ -197,9 +197,6 @@ pub struct SocketReceiveFrom { #[async_trait::async_trait] pub trait VirtualTcpListener: fmt::Debug + Send + Sync + 'static { - /// Checks how many sockets are waiting to be accepted - async fn peek(&mut self) -> Result; - /// Tries to accept a new connection fn try_accept(&mut self) -> Option, SocketAddr)>>; @@ -341,9 +338,6 @@ pub trait VirtualConnectedSocket: VirtualSocket + fmt::Debug + Send + Sync + 'st /// Recv a packet from the socket fn try_recv(&mut self) -> Result>; - - /// Peeks for a packet from the socket - async fn peek(&mut self) -> Result; } /// Connectionless sockets are able to send and receive datagrams and stream diff --git a/lib/wasi-local-networking/src/lib.rs b/lib/wasi-local-networking/src/lib.rs index ef4c86e14..f6bccd34f 100644 --- a/lib/wasi-local-networking/src/lib.rs +++ b/lib/wasi-local-networking/src/lib.rs @@ -104,33 +104,6 @@ pub struct LocalTcpListener { #[async_trait::async_trait] impl VirtualTcpListener for LocalTcpListener { - async fn peek(&mut self) -> Result { - { - let backlog = self.backlog.lock().unwrap(); - if backlog.is_empty() == false { - return Ok(backlog.len()); - } - } - - let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) }; - let mut cx = Context::from_waker(&waker); - match self.stream.poll_accept(&mut cx) { - Poll::Ready(Ok((sock, addr))) => { - let mut backlog = self.backlog.lock().unwrap(); - backlog.push(( - Box::new(LocalTcpStream::new(sock, addr, self.nonblocking)), - addr, - )); - Ok(backlog.len()) - } - Poll::Ready(Err(err)) => Err(io_err_into_net_error(err)), - Poll::Pending => { - let backlog = self.backlog.lock().unwrap(); - Ok(backlog.len()) - } - } - } - fn try_accept(&mut self) -> Option, SocketAddr)>> { { let mut backlog = self.backlog.lock().unwrap(); @@ -255,10 +228,7 @@ pub struct LocalTcpStream { connect_timeout: Option, linger_timeout: Option, nonblocking: bool, - sent_eof: bool, shutdown: Option, - tx_recv: mpsc::UnboundedSender>, - rx_recv: mpsc::UnboundedReceiver>, tx_write_ready: mpsc::Sender<()>, rx_write_ready: mpsc::Receiver<()>, tx_write_poll_ready: mpsc::Sender<()>, @@ -267,7 +237,6 @@ pub struct LocalTcpStream { impl LocalTcpStream { pub fn new(stream: tokio::net::TcpStream, addr: SocketAddr, nonblocking: bool) -> Self { - let (tx_recv, rx_recv) = mpsc::unbounded_channel(); let (tx_write_ready, rx_write_ready) = mpsc::channel(1); let (tx_write_poll_ready, rx_write_poll_ready) = mpsc::channel(1); Self { @@ -279,13 +248,10 @@ impl LocalTcpStream { linger_timeout: None, nonblocking, shutdown: None, - sent_eof: false, tx_write_ready, rx_write_ready, tx_write_poll_ready, rx_write_poll_ready, - tx_recv, - rx_recv, } } } @@ -485,7 +451,7 @@ impl VirtualConnectedSocket for LocalTcpStream { } async fn flush(&mut self) -> Result<()> { - self.rx_write_ready.try_recv().ok(); + while self.rx_write_ready.try_recv().is_ok() {} self.tx_write_poll_ready.try_send(()).ok(); if self.nonblocking { let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) }; @@ -513,18 +479,12 @@ impl VirtualConnectedSocket for LocalTcpStream { } async fn recv(&mut self) -> Result { - if let Ok(ret) = self.rx_recv.try_recv() { - return ret; - } - - tokio::select! { - ret = Self::recv_now_ext( - self.nonblocking, - &mut self.stream, - self.read_timeout.clone(), - ) => ret, - ret = self.rx_recv.recv() => ret.unwrap_or(Err(NetworkError::ConnectionAborted)) - } + Self::recv_now_ext( + self.nonblocking, + &mut self.stream, + self.read_timeout.clone(), + ) + .await } fn try_recv(&mut self) -> Result> { @@ -537,17 +497,6 @@ impl VirtualConnectedSocket for LocalTcpStream { Poll::Pending => Ok(None), } } - - async fn peek(&mut self) -> Result { - let ret = Self::recv_now_ext( - self.nonblocking, - &mut self.stream, - self.read_timeout.clone(), - ) - .await; - self.tx_recv.send(ret.clone()).ok(); - ret - } } #[async_trait::async_trait] @@ -581,33 +530,23 @@ impl VirtualSocket for LocalTcpStream { &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let ret = { - let mut work = Box::pin(Self::recv_now(&mut self.stream, self.read_timeout.clone())); - match work.as_mut().poll(cx) { - Poll::Ready(ret) => ret, - Poll::Pending => return Poll::Pending, - } - }; - if let Ok(ret) = ret.as_ref() { - if ret.data.len() == 0 { - if self.sent_eof == true { - return Poll::Pending; - } - self.sent_eof = true; - } - } - self.tx_recv.send(ret).ok(); - Poll::Ready(Ok(1)) + self.stream + .poll_read_ready(cx) + .map_ok(|_| 1) + .map_err(io_err_into_net_error) } fn poll_write_ready( &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - { - // this wakes the polling loop when something is sent + loop { + // this wakes this polling ready call whenever the `rx_write_poll_ready` is triggerd + // (which is triggered whenever a send operation is transmitted) let mut rx = Pin::new(&mut self.rx_write_poll_ready); - rx.poll_recv(cx).is_ready(); + if rx.poll_recv(cx).is_pending() { + break; + } } match self .stream @@ -615,7 +554,7 @@ impl VirtualSocket for LocalTcpStream { .map_err(io_err_into_net_error) { Poll::Ready(Ok(())) => { - if self.tx_write_ready.try_send(()).ok().is_some() { + if self.tx_write_ready.try_send(()).is_ok() { Poll::Ready(Ok(1)) } else { Poll::Pending @@ -969,37 +908,6 @@ impl VirtualConnectedSocket for LocalUdpSocket { truncated: read == buf_size, })) } - - async fn peek(&mut self) -> Result { - let buf_size = 8192; - let mut buf = Vec::with_capacity(buf_size); - unsafe { - buf.set_len(buf_size); - } - - let read = self - .socket - .as_blocking_mut() - .map_err(io_err_into_net_error)? - .peek(&mut buf[..]) - .map_err(io_err_into_net_error)?; - unsafe { - buf.set_len(read); - } - if read == 0 { - if self.nonblocking { - return Err(NetworkError::WouldBlock); - } else { - return Err(NetworkError::BrokenPipe); - } - } - - let buf = Bytes::from(buf); - Ok(SocketReceive { - data: buf, - truncated: read == buf_size, - }) - } } #[async_trait::async_trait] diff --git a/lib/wasi/src/lib.rs b/lib/wasi/src/lib.rs index bd1110020..5c1444560 100644 --- a/lib/wasi/src/lib.rs +++ b/lib/wasi/src/lib.rs @@ -123,7 +123,7 @@ pub use crate::{ /// This is returned in `RuntimeError`. /// Use `downcast` or `downcast_ref` to retrieve the `ExitCode`. -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum WasiError { #[error("WASI exited with code: {0}")] Exit(ExitCode), diff --git a/lib/wasi/src/net/socket.rs b/lib/wasi/src/net/socket.rs index 4b2961c05..0ed8f5a35 100644 --- a/lib/wasi/src/net/socket.rs +++ b/lib/wasi/src/net/socket.rs @@ -992,69 +992,6 @@ impl InodeSocket { Ok(ret) } - pub async fn peek(&self) -> Result { - let mut inner = self.inner.write().await; - if let Some(buf) = inner.read_buffer.as_ref() { - if buf.len() > 0 { - return Ok(buf.len()); - } - } - let data = match &mut inner.kind { - InodeSocketKind::WebSocket(sock) => { - let read = match sock.try_recv().map_err(net_error_into_wasi_err)? { - Some(a) => a, - None => { - return Ok(0); - } - }; - read.data - } - InodeSocketKind::Raw(sock) => { - let read = match sock.try_recv().map_err(net_error_into_wasi_err)? { - Some(a) => a, - None => { - return Ok(0); - } - }; - read.data - } - InodeSocketKind::TcpStream(sock) => { - let read = match sock.try_recv().map_err(net_error_into_wasi_err)? { - Some(a) => a, - None => { - return Ok(0); - } - }; - read.data - } - InodeSocketKind::UdpSocket(sock) => { - let read = match sock.try_recv().map_err(net_error_into_wasi_err)? { - Some(a) => a, - None => { - return Ok(0); - } - }; - read.data - } - InodeSocketKind::TcpListener(sock) => { - return sock.peek().await.map_err(net_error_into_wasi_err); - } - InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn), - InodeSocketKind::Closed => return Ok(0), - _ => return Err(Errno::Notsup), - }; - if data.len() == 0 { - return Ok(0); - } - inner.read_buffer.replace(data); - inner.read_addr.take(); - if let Some(buf) = inner.read_buffer.as_ref() { - Ok(buf.len()) - } else { - Ok(0) - } - } - pub async fn recv(&self, max_size: usize) -> Result { let mut inner = self.inner.write().await; loop { @@ -1198,9 +1135,6 @@ impl InodeSocket { pub async fn can_write(&self) -> bool { if let Ok(mut guard) = self.inner.try_write() { match &mut guard.kind { - InodeSocketKind::TcpListener(socket) => { - socket.peek().await.ok().map(|a| a > 0).unwrap_or_default() - } InodeSocketKind::TcpStream(..) | InodeSocketKind::UdpSocket(..) | InodeSocketKind::Raw(..) diff --git a/lib/wasi/src/state/func_env.rs b/lib/wasi/src/state/func_env.rs index de3e6ab9b..12cafe8a9 100644 --- a/lib/wasi/src/state/func_env.rs +++ b/lib/wasi/src/state/func_env.rs @@ -161,8 +161,9 @@ impl WasiFunctionEnv { pub fn cleanup(&self, store: &mut Store, exit_code: Option) { trace!( - "wasi[{}]:: cleaning up local thread variables", - self.data(store).pid() + "wasi[{}:{}]::cleanup - destroying local thread variables", + self.data(store).pid(), + self.data(store).tid() ); // Destroy all the local thread variables that were allocated for this thread diff --git a/lib/wasi/src/syscalls/wasi/fd_read.rs b/lib/wasi/src/syscalls/wasi/fd_read.rs index fe1b792fd..c05b5973b 100644 --- a/lib/wasi/src/syscalls/wasi/fd_read.rs +++ b/lib/wasi/src/syscalls/wasi/fd_read.rs @@ -23,12 +23,8 @@ pub fn fd_read( iovs_len: M::Offset, nread: WasmPtr, ) -> Result { - trace!( - "wasi[{}:{}]::fd_read: fd={}", - ctx.data().pid(), - ctx.data().tid(), - fd - ); + let pid = ctx.data().pid(); + let tid = ctx.data().tid(); let offset = { let mut env = ctx.data(); @@ -39,7 +35,15 @@ pub fn fd_read( fd_entry.offset.load(Ordering::Acquire) as usize }; - fd_read_internal::(ctx, fd, iovs, iovs_len, offset, nread, true) + let ret = fd_read_internal::(ctx, fd, iovs, iovs_len, offset, nread, true); + trace!( + %fd, + "wasi[{}:{}]::fd_read - {:?}", + pid, + tid, + ret + ); + ret } /// ### `fd_pread()` @@ -65,15 +69,19 @@ pub fn fd_pread( offset: Filesize, nread: WasmPtr, ) -> Result { - trace!( - "wasi[{}:{}]::fd_pread: fd={}, offset={}", - ctx.data().pid(), - ctx.data().tid(), - fd, - offset - ); + let pid = ctx.data().pid(); + let tid = ctx.data().tid(); - fd_read_internal::(ctx, fd, iovs, iovs_len, offset as usize, nread, false) + let ret = fd_read_internal::(ctx, fd, iovs, iovs_len, offset as usize, nread, false); + trace!( + %fd, + %offset, + "wasi[{}:{}]::fd_pread - {:?}", + pid, + tid, + ret + ); + ret } /// ### `fd_pread()` diff --git a/lib/wasi/src/syscalls/wasix/futex_wait.rs b/lib/wasi/src/syscalls/wasix/futex_wait.rs index 5b97750c2..2d5ce315f 100644 --- a/lib/wasi/src/syscalls/wasix/futex_wait.rs +++ b/lib/wasi/src/syscalls/wasix/futex_wait.rs @@ -84,6 +84,7 @@ pub fn futex_wait( let remaining = *timeout - delta; sub_timeout = Some(Duration::from_nanos(remaining as u64)); } + //sub_timeout.replace(sub_timeout.map(|a| a.min(Duration::from_millis(10))).unwrap_or(Duration::from_millis(10))); // Now wait for it to be triggered __asyncify(&mut ctx, sub_timeout, async move { diff --git a/lib/wasi/src/syscalls/wasix/futex_wake.rs b/lib/wasi/src/syscalls/wasix/futex_wake.rs index c6188556d..f1b63ffe9 100644 --- a/lib/wasi/src/syscalls/wasix/futex_wake.rs +++ b/lib/wasi/src/syscalls/wasix/futex_wake.rs @@ -10,31 +10,33 @@ use crate::syscalls::*; /// * `futex` - Memory location that holds a futex that others may be waiting on pub fn futex_wake( ctx: FunctionEnvMut<'_, WasiEnv>, - futex: WasmPtr, + futex_ptr: WasmPtr, ret_woken: WasmPtr, ) -> Errno { - trace!( - "wasi[{}:{}]::futex_wake(offset={})", - ctx.data().pid(), - ctx.data().tid(), - futex.offset() - ); let env = ctx.data(); let memory = env.memory_view(&ctx); let state = env.state.deref(); - let pointer: u64 = wasi_try!(futex.offset().try_into().map_err(|_| Errno::Overflow)); + let pointer: u64 = wasi_try!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow)); let mut woken = false; let mut guard = state.futexs.read().unwrap(); if let Some(futex) = guard.get(&pointer) { woken = futex.waker.receiver_count() > 0; let _ = futex.waker.send(()); + trace!( + %woken, + "wasi[{}:{}]::futex_wake(offset={})", + ctx.data().pid(), + ctx.data().tid(), + futex_ptr.offset() + ); } else { trace!( - "wasi[{}:{}]::futex_wake - nothing waiting!", + "wasi[{}:{}]::futex_wake(offset={}) - nothing waiting", ctx.data().pid(), - ctx.data().tid() + ctx.data().tid(), + futex_ptr.offset() ); } diff --git a/lib/wasi/src/syscalls/wasix/futex_wake_all.rs b/lib/wasi/src/syscalls/wasix/futex_wake_all.rs index 7867f5d90..d54ca24be 100644 --- a/lib/wasi/src/syscalls/wasix/futex_wake_all.rs +++ b/lib/wasi/src/syscalls/wasix/futex_wake_all.rs @@ -8,31 +8,33 @@ use crate::syscalls::*; /// * `futex` - Memory location that holds a futex that others may be waiting on pub fn futex_wake_all( ctx: FunctionEnvMut<'_, WasiEnv>, - futex: WasmPtr, + futex_ptr: WasmPtr, ret_woken: WasmPtr, ) -> Errno { - trace!( - "wasi[{}:{}]::futex_wake_all(offset={})", - ctx.data().pid(), - ctx.data().tid(), - futex.offset() - ); let env = ctx.data(); let memory = env.memory_view(&ctx); let state = env.state.deref(); - let pointer: u64 = wasi_try!(futex.offset().try_into().map_err(|_| Errno::Overflow)); + let pointer: u64 = wasi_try!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow)); let mut woken = false; let mut guard = state.futexs.read().unwrap(); if let Some(futex) = guard.get(&pointer) { woken = futex.waker.receiver_count() > 0; let _ = futex.waker.send(()); + trace!( + %woken, + "wasi[{}:{}]::futex_wake_all(offset={})", + ctx.data().pid(), + ctx.data().tid(), + futex_ptr.offset() + ); } else { trace!( - "wasi[{}:{}]::futex_wake_all - nothing waiting!", + "wasi[{}:{}]::futex_wake_all(offset={}) - nothing waiting", ctx.data().pid(), - ctx.data().tid() + ctx.data().tid(), + futex_ptr.offset() ); } diff --git a/lib/wasi/src/syscalls/wasix/thread_join.rs b/lib/wasi/src/syscalls/wasix/thread_join.rs index 1b6abb947..aebd6943e 100644 --- a/lib/wasi/src/syscalls/wasix/thread_join.rs +++ b/lib/wasi/src/syscalls/wasix/thread_join.rs @@ -8,19 +8,21 @@ use crate::syscalls::*; /// ## Parameters /// /// * `tid` - Handle of the thread to wait on -pub fn thread_join(mut ctx: FunctionEnvMut<'_, WasiEnv>, tid: Tid) -> Result { - debug!("wasi::thread_join"); +pub fn thread_join( + mut ctx: FunctionEnvMut<'_, WasiEnv>, + join_tid: Tid, +) -> Result { debug!( - "wasi[{}:{}]::thread_join(tid={})", + %join_tid, + "wasi[{}:{}]::thread_join", ctx.data().pid(), ctx.data().tid(), - tid ); wasi_try_ok!(ctx.data().clone().process_signals_and_exit(&mut ctx)?); let env = ctx.data(); - let tid: WasiThreadId = tid.into(); + let tid: WasiThreadId = join_tid.into(); let other_thread = env.process.get_thread(&tid); if let Some(other_thread) = other_thread { wasi_try_ok!(__asyncify(&mut ctx, None, async move { diff --git a/lib/wasi/src/syscalls/wasix/thread_spawn.rs b/lib/wasi/src/syscalls/wasix/thread_spawn.rs index ed15f88e8..2b9116812 100644 --- a/lib/wasi/src/syscalls/wasix/thread_spawn.rs +++ b/lib/wasi/src/syscalls/wasix/thread_spawn.rs @@ -112,7 +112,11 @@ pub fn thread_spawn( Bool::False => ctx.data(&store).inner().thread_spawn.clone().unwrap(), Bool::True => ctx.data(&store).inner().react.clone().unwrap(), _ => { - debug!("thread failed - failed as the reactor type is not value"); + debug!( + "wasi[{}:{}]::thread_spawn - failed as the reactor type is not value", + ctx.data(&store).pid(), + ctx.data(&store).tid() + ); return Errno::Noexec as u32; } }; @@ -122,10 +126,21 @@ pub fn thread_spawn( let mut ret = Errno::Success; if let Err(err) = spawn.call(store, user_data_low as i32, user_data_high as i32) { - debug!("thread failed - start: {}", err); + debug!( + "wasi[{}:{}]::thread_spawn - thread failed - start: {}", + ctx.data(&store).pid(), + ctx.data(&store).tid(), + err + ); ret = Errno::Noexec; } - //trace!("threading: thread callback finished (reactor={}, ret={})", reactor, ret); + trace!( + "wasi[{}:{}]::thread_spawn - thread callback finished (reactor={:?}, ret={})", + ctx.data(&store).pid(), + ctx.data(&store).tid(), + reactor, + ret + ); // If we are NOT a reactor then we will only run once and need to clean up if reactor == Bool::False {