Fix for the read ready event on local networking which was missing events on new connections in certain race conditions

This commit is contained in:
Johnathan Sharratt
2023-01-19 11:08:31 +11:00
parent 27d020dce4
commit 67a1a89f8c
11 changed files with 95 additions and 228 deletions

View File

@@ -197,9 +197,6 @@ pub struct SocketReceiveFrom {
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait VirtualTcpListener: fmt::Debug + Send + Sync + 'static { pub trait VirtualTcpListener: fmt::Debug + Send + Sync + 'static {
/// Checks how many sockets are waiting to be accepted
async fn peek(&mut self) -> Result<usize>;
/// Tries to accept a new connection /// Tries to accept a new connection
fn try_accept(&mut self) -> Option<Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>>; fn try_accept(&mut self) -> Option<Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>>;
@@ -341,9 +338,6 @@ pub trait VirtualConnectedSocket: VirtualSocket + fmt::Debug + Send + Sync + 'st
/// Recv a packet from the socket /// Recv a packet from the socket
fn try_recv(&mut self) -> Result<Option<SocketReceive>>; fn try_recv(&mut self) -> Result<Option<SocketReceive>>;
/// Peeks for a packet from the socket
async fn peek(&mut self) -> Result<SocketReceive>;
} }
/// Connectionless sockets are able to send and receive datagrams and stream /// Connectionless sockets are able to send and receive datagrams and stream

View File

@@ -104,33 +104,6 @@ pub struct LocalTcpListener {
#[async_trait::async_trait] #[async_trait::async_trait]
impl VirtualTcpListener for LocalTcpListener { impl VirtualTcpListener for LocalTcpListener {
async fn peek(&mut self) -> Result<usize> {
{
let backlog = self.backlog.lock().unwrap();
if backlog.is_empty() == false {
return Ok(backlog.len());
}
}
let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) };
let mut cx = Context::from_waker(&waker);
match self.stream.poll_accept(&mut cx) {
Poll::Ready(Ok((sock, addr))) => {
let mut backlog = self.backlog.lock().unwrap();
backlog.push((
Box::new(LocalTcpStream::new(sock, addr, self.nonblocking)),
addr,
));
Ok(backlog.len())
}
Poll::Ready(Err(err)) => Err(io_err_into_net_error(err)),
Poll::Pending => {
let backlog = self.backlog.lock().unwrap();
Ok(backlog.len())
}
}
}
fn try_accept(&mut self) -> Option<Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>> { fn try_accept(&mut self) -> Option<Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>> {
{ {
let mut backlog = self.backlog.lock().unwrap(); let mut backlog = self.backlog.lock().unwrap();
@@ -255,10 +228,7 @@ pub struct LocalTcpStream {
connect_timeout: Option<Duration>, connect_timeout: Option<Duration>,
linger_timeout: Option<Duration>, linger_timeout: Option<Duration>,
nonblocking: bool, nonblocking: bool,
sent_eof: bool,
shutdown: Option<Shutdown>, shutdown: Option<Shutdown>,
tx_recv: mpsc::UnboundedSender<Result<SocketReceive>>,
rx_recv: mpsc::UnboundedReceiver<Result<SocketReceive>>,
tx_write_ready: mpsc::Sender<()>, tx_write_ready: mpsc::Sender<()>,
rx_write_ready: mpsc::Receiver<()>, rx_write_ready: mpsc::Receiver<()>,
tx_write_poll_ready: mpsc::Sender<()>, tx_write_poll_ready: mpsc::Sender<()>,
@@ -267,7 +237,6 @@ pub struct LocalTcpStream {
impl LocalTcpStream { impl LocalTcpStream {
pub fn new(stream: tokio::net::TcpStream, addr: SocketAddr, nonblocking: bool) -> Self { pub fn new(stream: tokio::net::TcpStream, addr: SocketAddr, nonblocking: bool) -> Self {
let (tx_recv, rx_recv) = mpsc::unbounded_channel();
let (tx_write_ready, rx_write_ready) = mpsc::channel(1); let (tx_write_ready, rx_write_ready) = mpsc::channel(1);
let (tx_write_poll_ready, rx_write_poll_ready) = mpsc::channel(1); let (tx_write_poll_ready, rx_write_poll_ready) = mpsc::channel(1);
Self { Self {
@@ -279,13 +248,10 @@ impl LocalTcpStream {
linger_timeout: None, linger_timeout: None,
nonblocking, nonblocking,
shutdown: None, shutdown: None,
sent_eof: false,
tx_write_ready, tx_write_ready,
rx_write_ready, rx_write_ready,
tx_write_poll_ready, tx_write_poll_ready,
rx_write_poll_ready, rx_write_poll_ready,
tx_recv,
rx_recv,
} }
} }
} }
@@ -485,7 +451,7 @@ impl VirtualConnectedSocket for LocalTcpStream {
} }
async fn flush(&mut self) -> Result<()> { async fn flush(&mut self) -> Result<()> {
self.rx_write_ready.try_recv().ok(); while self.rx_write_ready.try_recv().is_ok() {}
self.tx_write_poll_ready.try_send(()).ok(); self.tx_write_poll_ready.try_send(()).ok();
if self.nonblocking { if self.nonblocking {
let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) }; let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) };
@@ -513,18 +479,12 @@ impl VirtualConnectedSocket for LocalTcpStream {
} }
async fn recv(&mut self) -> Result<SocketReceive> { async fn recv(&mut self) -> Result<SocketReceive> {
if let Ok(ret) = self.rx_recv.try_recv() { Self::recv_now_ext(
return ret; self.nonblocking,
} &mut self.stream,
self.read_timeout.clone(),
tokio::select! { )
ret = Self::recv_now_ext( .await
self.nonblocking,
&mut self.stream,
self.read_timeout.clone(),
) => ret,
ret = self.rx_recv.recv() => ret.unwrap_or(Err(NetworkError::ConnectionAborted))
}
} }
fn try_recv(&mut self) -> Result<Option<SocketReceive>> { fn try_recv(&mut self) -> Result<Option<SocketReceive>> {
@@ -537,17 +497,6 @@ impl VirtualConnectedSocket for LocalTcpStream {
Poll::Pending => Ok(None), Poll::Pending => Ok(None),
} }
} }
async fn peek(&mut self) -> Result<SocketReceive> {
let ret = Self::recv_now_ext(
self.nonblocking,
&mut self.stream,
self.read_timeout.clone(),
)
.await;
self.tx_recv.send(ret.clone()).ok();
ret
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@@ -581,33 +530,23 @@ impl VirtualSocket for LocalTcpStream {
&mut self, &mut self,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<usize>> { ) -> std::task::Poll<Result<usize>> {
let ret = { self.stream
let mut work = Box::pin(Self::recv_now(&mut self.stream, self.read_timeout.clone())); .poll_read_ready(cx)
match work.as_mut().poll(cx) { .map_ok(|_| 1)
Poll::Ready(ret) => ret, .map_err(io_err_into_net_error)
Poll::Pending => return Poll::Pending,
}
};
if let Ok(ret) = ret.as_ref() {
if ret.data.len() == 0 {
if self.sent_eof == true {
return Poll::Pending;
}
self.sent_eof = true;
}
}
self.tx_recv.send(ret).ok();
Poll::Ready(Ok(1))
} }
fn poll_write_ready( fn poll_write_ready(
&mut self, &mut self,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<usize>> { ) -> std::task::Poll<Result<usize>> {
{ loop {
// this wakes the polling loop when something is sent // this wakes this polling ready call whenever the `rx_write_poll_ready` is triggerd
// (which is triggered whenever a send operation is transmitted)
let mut rx = Pin::new(&mut self.rx_write_poll_ready); let mut rx = Pin::new(&mut self.rx_write_poll_ready);
rx.poll_recv(cx).is_ready(); if rx.poll_recv(cx).is_pending() {
break;
}
} }
match self match self
.stream .stream
@@ -615,7 +554,7 @@ impl VirtualSocket for LocalTcpStream {
.map_err(io_err_into_net_error) .map_err(io_err_into_net_error)
{ {
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
if self.tx_write_ready.try_send(()).ok().is_some() { if self.tx_write_ready.try_send(()).is_ok() {
Poll::Ready(Ok(1)) Poll::Ready(Ok(1))
} else { } else {
Poll::Pending Poll::Pending
@@ -969,37 +908,6 @@ impl VirtualConnectedSocket for LocalUdpSocket {
truncated: read == buf_size, truncated: read == buf_size,
})) }))
} }
async fn peek(&mut self) -> Result<SocketReceive> {
let buf_size = 8192;
let mut buf = Vec::with_capacity(buf_size);
unsafe {
buf.set_len(buf_size);
}
let read = self
.socket
.as_blocking_mut()
.map_err(io_err_into_net_error)?
.peek(&mut buf[..])
.map_err(io_err_into_net_error)?;
unsafe {
buf.set_len(read);
}
if read == 0 {
if self.nonblocking {
return Err(NetworkError::WouldBlock);
} else {
return Err(NetworkError::BrokenPipe);
}
}
let buf = Bytes::from(buf);
Ok(SocketReceive {
data: buf,
truncated: read == buf_size,
})
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]

View File

@@ -123,7 +123,7 @@ pub use crate::{
/// This is returned in `RuntimeError`. /// This is returned in `RuntimeError`.
/// Use `downcast` or `downcast_ref` to retrieve the `ExitCode`. /// Use `downcast` or `downcast_ref` to retrieve the `ExitCode`.
#[derive(Error, Debug)] #[derive(Error, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum WasiError { pub enum WasiError {
#[error("WASI exited with code: {0}")] #[error("WASI exited with code: {0}")]
Exit(ExitCode), Exit(ExitCode),

View File

@@ -992,69 +992,6 @@ impl InodeSocket {
Ok(ret) Ok(ret)
} }
pub async fn peek(&self) -> Result<usize, Errno> {
let mut inner = self.inner.write().await;
if let Some(buf) = inner.read_buffer.as_ref() {
if buf.len() > 0 {
return Ok(buf.len());
}
}
let data = match &mut inner.kind {
InodeSocketKind::WebSocket(sock) => {
let read = match sock.try_recv().map_err(net_error_into_wasi_err)? {
Some(a) => a,
None => {
return Ok(0);
}
};
read.data
}
InodeSocketKind::Raw(sock) => {
let read = match sock.try_recv().map_err(net_error_into_wasi_err)? {
Some(a) => a,
None => {
return Ok(0);
}
};
read.data
}
InodeSocketKind::TcpStream(sock) => {
let read = match sock.try_recv().map_err(net_error_into_wasi_err)? {
Some(a) => a,
None => {
return Ok(0);
}
};
read.data
}
InodeSocketKind::UdpSocket(sock) => {
let read = match sock.try_recv().map_err(net_error_into_wasi_err)? {
Some(a) => a,
None => {
return Ok(0);
}
};
read.data
}
InodeSocketKind::TcpListener(sock) => {
return sock.peek().await.map_err(net_error_into_wasi_err);
}
InodeSocketKind::PreSocket { .. } => return Err(Errno::Notconn),
InodeSocketKind::Closed => return Ok(0),
_ => return Err(Errno::Notsup),
};
if data.len() == 0 {
return Ok(0);
}
inner.read_buffer.replace(data);
inner.read_addr.take();
if let Some(buf) = inner.read_buffer.as_ref() {
Ok(buf.len())
} else {
Ok(0)
}
}
pub async fn recv(&self, max_size: usize) -> Result<Bytes, Errno> { pub async fn recv(&self, max_size: usize) -> Result<Bytes, Errno> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
loop { loop {
@@ -1198,9 +1135,6 @@ impl InodeSocket {
pub async fn can_write(&self) -> bool { pub async fn can_write(&self) -> bool {
if let Ok(mut guard) = self.inner.try_write() { if let Ok(mut guard) = self.inner.try_write() {
match &mut guard.kind { match &mut guard.kind {
InodeSocketKind::TcpListener(socket) => {
socket.peek().await.ok().map(|a| a > 0).unwrap_or_default()
}
InodeSocketKind::TcpStream(..) InodeSocketKind::TcpStream(..)
| InodeSocketKind::UdpSocket(..) | InodeSocketKind::UdpSocket(..)
| InodeSocketKind::Raw(..) | InodeSocketKind::Raw(..)

View File

@@ -161,8 +161,9 @@ impl WasiFunctionEnv {
pub fn cleanup(&self, store: &mut Store, exit_code: Option<ExitCode>) { pub fn cleanup(&self, store: &mut Store, exit_code: Option<ExitCode>) {
trace!( trace!(
"wasi[{}]:: cleaning up local thread variables", "wasi[{}:{}]::cleanup - destroying local thread variables",
self.data(store).pid() self.data(store).pid(),
self.data(store).tid()
); );
// Destroy all the local thread variables that were allocated for this thread // Destroy all the local thread variables that were allocated for this thread

View File

@@ -23,12 +23,8 @@ pub fn fd_read<M: MemorySize>(
iovs_len: M::Offset, iovs_len: M::Offset,
nread: WasmPtr<M::Offset, M>, nread: WasmPtr<M::Offset, M>,
) -> Result<Errno, WasiError> { ) -> Result<Errno, WasiError> {
trace!( let pid = ctx.data().pid();
"wasi[{}:{}]::fd_read: fd={}", let tid = ctx.data().tid();
ctx.data().pid(),
ctx.data().tid(),
fd
);
let offset = { let offset = {
let mut env = ctx.data(); let mut env = ctx.data();
@@ -39,7 +35,15 @@ pub fn fd_read<M: MemorySize>(
fd_entry.offset.load(Ordering::Acquire) as usize fd_entry.offset.load(Ordering::Acquire) as usize
}; };
fd_read_internal::<M>(ctx, fd, iovs, iovs_len, offset, nread, true) let ret = fd_read_internal::<M>(ctx, fd, iovs, iovs_len, offset, nread, true);
trace!(
%fd,
"wasi[{}:{}]::fd_read - {:?}",
pid,
tid,
ret
);
ret
} }
/// ### `fd_pread()` /// ### `fd_pread()`
@@ -65,15 +69,19 @@ pub fn fd_pread<M: MemorySize>(
offset: Filesize, offset: Filesize,
nread: WasmPtr<M::Offset, M>, nread: WasmPtr<M::Offset, M>,
) -> Result<Errno, WasiError> { ) -> Result<Errno, WasiError> {
trace!( let pid = ctx.data().pid();
"wasi[{}:{}]::fd_pread: fd={}, offset={}", let tid = ctx.data().tid();
ctx.data().pid(),
ctx.data().tid(),
fd,
offset
);
fd_read_internal::<M>(ctx, fd, iovs, iovs_len, offset as usize, nread, false) let ret = fd_read_internal::<M>(ctx, fd, iovs, iovs_len, offset as usize, nread, false);
trace!(
%fd,
%offset,
"wasi[{}:{}]::fd_pread - {:?}",
pid,
tid,
ret
);
ret
} }
/// ### `fd_pread()` /// ### `fd_pread()`

View File

@@ -84,6 +84,7 @@ pub fn futex_wait<M: MemorySize>(
let remaining = *timeout - delta; let remaining = *timeout - delta;
sub_timeout = Some(Duration::from_nanos(remaining as u64)); sub_timeout = Some(Duration::from_nanos(remaining as u64));
} }
//sub_timeout.replace(sub_timeout.map(|a| a.min(Duration::from_millis(10))).unwrap_or(Duration::from_millis(10)));
// Now wait for it to be triggered // Now wait for it to be triggered
__asyncify(&mut ctx, sub_timeout, async move { __asyncify(&mut ctx, sub_timeout, async move {

View File

@@ -10,31 +10,33 @@ use crate::syscalls::*;
/// * `futex` - Memory location that holds a futex that others may be waiting on /// * `futex` - Memory location that holds a futex that others may be waiting on
pub fn futex_wake<M: MemorySize>( pub fn futex_wake<M: MemorySize>(
ctx: FunctionEnvMut<'_, WasiEnv>, ctx: FunctionEnvMut<'_, WasiEnv>,
futex: WasmPtr<u32, M>, futex_ptr: WasmPtr<u32, M>,
ret_woken: WasmPtr<Bool, M>, ret_woken: WasmPtr<Bool, M>,
) -> Errno { ) -> Errno {
trace!(
"wasi[{}:{}]::futex_wake(offset={})",
ctx.data().pid(),
ctx.data().tid(),
futex.offset()
);
let env = ctx.data(); let env = ctx.data();
let memory = env.memory_view(&ctx); let memory = env.memory_view(&ctx);
let state = env.state.deref(); let state = env.state.deref();
let pointer: u64 = wasi_try!(futex.offset().try_into().map_err(|_| Errno::Overflow)); let pointer: u64 = wasi_try!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow));
let mut woken = false; let mut woken = false;
let mut guard = state.futexs.read().unwrap(); let mut guard = state.futexs.read().unwrap();
if let Some(futex) = guard.get(&pointer) { if let Some(futex) = guard.get(&pointer) {
woken = futex.waker.receiver_count() > 0; woken = futex.waker.receiver_count() > 0;
let _ = futex.waker.send(()); let _ = futex.waker.send(());
trace!(
%woken,
"wasi[{}:{}]::futex_wake(offset={})",
ctx.data().pid(),
ctx.data().tid(),
futex_ptr.offset()
);
} else { } else {
trace!( trace!(
"wasi[{}:{}]::futex_wake - nothing waiting!", "wasi[{}:{}]::futex_wake(offset={}) - nothing waiting",
ctx.data().pid(), ctx.data().pid(),
ctx.data().tid() ctx.data().tid(),
futex_ptr.offset()
); );
} }

View File

@@ -8,31 +8,33 @@ use crate::syscalls::*;
/// * `futex` - Memory location that holds a futex that others may be waiting on /// * `futex` - Memory location that holds a futex that others may be waiting on
pub fn futex_wake_all<M: MemorySize>( pub fn futex_wake_all<M: MemorySize>(
ctx: FunctionEnvMut<'_, WasiEnv>, ctx: FunctionEnvMut<'_, WasiEnv>,
futex: WasmPtr<u32, M>, futex_ptr: WasmPtr<u32, M>,
ret_woken: WasmPtr<Bool, M>, ret_woken: WasmPtr<Bool, M>,
) -> Errno { ) -> Errno {
trace!(
"wasi[{}:{}]::futex_wake_all(offset={})",
ctx.data().pid(),
ctx.data().tid(),
futex.offset()
);
let env = ctx.data(); let env = ctx.data();
let memory = env.memory_view(&ctx); let memory = env.memory_view(&ctx);
let state = env.state.deref(); let state = env.state.deref();
let pointer: u64 = wasi_try!(futex.offset().try_into().map_err(|_| Errno::Overflow)); let pointer: u64 = wasi_try!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow));
let mut woken = false; let mut woken = false;
let mut guard = state.futexs.read().unwrap(); let mut guard = state.futexs.read().unwrap();
if let Some(futex) = guard.get(&pointer) { if let Some(futex) = guard.get(&pointer) {
woken = futex.waker.receiver_count() > 0; woken = futex.waker.receiver_count() > 0;
let _ = futex.waker.send(()); let _ = futex.waker.send(());
trace!(
%woken,
"wasi[{}:{}]::futex_wake_all(offset={})",
ctx.data().pid(),
ctx.data().tid(),
futex_ptr.offset()
);
} else { } else {
trace!( trace!(
"wasi[{}:{}]::futex_wake_all - nothing waiting!", "wasi[{}:{}]::futex_wake_all(offset={}) - nothing waiting",
ctx.data().pid(), ctx.data().pid(),
ctx.data().tid() ctx.data().tid(),
futex_ptr.offset()
); );
} }

View File

@@ -8,19 +8,21 @@ use crate::syscalls::*;
/// ## Parameters /// ## Parameters
/// ///
/// * `tid` - Handle of the thread to wait on /// * `tid` - Handle of the thread to wait on
pub fn thread_join(mut ctx: FunctionEnvMut<'_, WasiEnv>, tid: Tid) -> Result<Errno, WasiError> { pub fn thread_join(
debug!("wasi::thread_join"); mut ctx: FunctionEnvMut<'_, WasiEnv>,
join_tid: Tid,
) -> Result<Errno, WasiError> {
debug!( debug!(
"wasi[{}:{}]::thread_join(tid={})", %join_tid,
"wasi[{}:{}]::thread_join",
ctx.data().pid(), ctx.data().pid(),
ctx.data().tid(), ctx.data().tid(),
tid
); );
wasi_try_ok!(ctx.data().clone().process_signals_and_exit(&mut ctx)?); wasi_try_ok!(ctx.data().clone().process_signals_and_exit(&mut ctx)?);
let env = ctx.data(); let env = ctx.data();
let tid: WasiThreadId = tid.into(); let tid: WasiThreadId = join_tid.into();
let other_thread = env.process.get_thread(&tid); let other_thread = env.process.get_thread(&tid);
if let Some(other_thread) = other_thread { if let Some(other_thread) = other_thread {
wasi_try_ok!(__asyncify(&mut ctx, None, async move { wasi_try_ok!(__asyncify(&mut ctx, None, async move {

View File

@@ -112,7 +112,11 @@ pub fn thread_spawn<M: MemorySize>(
Bool::False => ctx.data(&store).inner().thread_spawn.clone().unwrap(), Bool::False => ctx.data(&store).inner().thread_spawn.clone().unwrap(),
Bool::True => ctx.data(&store).inner().react.clone().unwrap(), Bool::True => ctx.data(&store).inner().react.clone().unwrap(),
_ => { _ => {
debug!("thread failed - failed as the reactor type is not value"); debug!(
"wasi[{}:{}]::thread_spawn - failed as the reactor type is not value",
ctx.data(&store).pid(),
ctx.data(&store).tid()
);
return Errno::Noexec as u32; return Errno::Noexec as u32;
} }
}; };
@@ -122,10 +126,21 @@ pub fn thread_spawn<M: MemorySize>(
let mut ret = Errno::Success; let mut ret = Errno::Success;
if let Err(err) = spawn.call(store, user_data_low as i32, user_data_high as i32) { if let Err(err) = spawn.call(store, user_data_low as i32, user_data_high as i32) {
debug!("thread failed - start: {}", err); debug!(
"wasi[{}:{}]::thread_spawn - thread failed - start: {}",
ctx.data(&store).pid(),
ctx.data(&store).tid(),
err
);
ret = Errno::Noexec; ret = Errno::Noexec;
} }
//trace!("threading: thread callback finished (reactor={}, ret={})", reactor, ret); trace!(
"wasi[{}:{}]::thread_spawn - thread callback finished (reactor={:?}, ret={})",
ctx.data(&store).pid(),
ctx.data(&store).tid(),
reactor,
ret
);
// If we are NOT a reactor then we will only run once and need to clean up // If we are NOT a reactor then we will only run once and need to clean up
if reactor == Bool::False { if reactor == Bool::False {