Many performance and memory optimizations

- Avoiding a memory copy on all socket reads and writes using a new `copy_from_slice` instead
- File openers no longer box the implementation avoiding a memory allocation on all file access
- Polling sockets rather than using an async function which significantly reduces locking contention and removes an box operation
- Futex now uses wakers rather than broadcasts which makes them more efficient and durable
- Converted many async functions into sync functions in vnet
- Sleeping no longer allocates memory
- Avoiding a number of WasiEnv clones which was impacting performance and memory efficiency
This commit is contained in:
Johnathan Sharratt
2023-02-05 10:16:41 +11:00
committed by Christoph Herzog
parent 7f8c0858b3
commit 26d4a6a0de
71 changed files with 2025 additions and 2123 deletions

View File

@@ -135,6 +135,9 @@ coverage = []
[profile.dev]
split-debuginfo = "unpacked"
#[profile.release]
#debug = true
[[bench]]
name = "static_and_dynamic_functions"
harness = false

View File

@@ -311,6 +311,17 @@ impl<'a, T: ValueType> WasmSlice<'a, T> {
self.buffer.write(self.offset, bytes)
}
/// Reads this `WasmSlice` into a `slice`.
#[inline]
pub fn read_to_slice<'b>(
self,
buf: &'b mut [MaybeUninit<u8>],
) -> Result<usize, MemoryAccessError> {
let len = self.len.try_into().expect("WasmSlice length overflow");
self.buffer.read_uninit(self.offset, buf)?;
Ok(len)
}
/// Reads this `WasmSlice` into a `Vec`.
#[inline]
pub fn read_to_vec(self) -> Result<Vec<T>, MemoryAccessError> {

View File

@@ -313,6 +313,17 @@ impl<'a, T: ValueType> WasmSlice<'a, T> {
self.buffer.write(self.offset, bytes)
}
/// Reads this `WasmSlice` into a `slice`.
#[inline]
pub fn copy_to_slice<'b>(
self,
buf: &'b mut [MaybeUninit<u8>],
) -> Result<usize, MemoryAccessError> {
let len = self.len.try_into().expect("WasmSlice length overflow");
self.buffer.read_uninit(self.offset, buf)?;
Ok(len)
}
/// Reads this `WasmSlice` into a `Vec`.
#[inline]
pub fn read_to_vec(self) -> Result<Vec<T>, MemoryAccessError> {

View File

@@ -41,14 +41,14 @@ impl FileSystem for EmptyFileSystem {
}
fn new_open_options(&self) -> OpenOptions {
OpenOptions::new(Box::new(EmptyFileSystem::default()))
OpenOptions::new(self)
}
}
impl FileOpener for EmptyFileSystem {
#[allow(unused_variables)]
fn open(
&mut self,
&self,
path: &Path,
conf: &OpenOptionsConfig,
) -> Result<Box<dyn VirtualFile + Send + Sync + 'static>> {

View File

@@ -120,7 +120,7 @@ impl crate::FileSystem for FileSystem {
}
fn new_open_options(&self) -> OpenOptions {
OpenOptions::new(Box::new(FileOpener))
OpenOptions::new(self)
}
fn metadata(&self, path: &Path) -> Result<Metadata> {
@@ -188,12 +188,9 @@ impl TryInto<Metadata> for std::fs::Metadata {
}
}
#[derive(Debug, Clone)]
pub struct FileOpener;
impl crate::FileOpener for FileOpener {
impl crate::FileOpener for FileSystem {
fn open(
&mut self,
&self,
path: &Path,
conf: &OpenOptionsConfig,
) -> Result<Box<dyn VirtualFile + Send + Sync + 'static>> {

View File

@@ -80,7 +80,7 @@ impl dyn FileSystem + 'static {
pub trait FileOpener {
fn open(
&mut self,
&self,
path: &Path,
conf: &OpenOptionsConfig,
) -> Result<Box<dyn VirtualFile + Send + Sync + 'static>>;
@@ -134,19 +134,19 @@ impl OpenOptionsConfig {
}
}
impl fmt::Debug for OpenOptions {
impl<'a> fmt::Debug for OpenOptions<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.conf.fmt(f)
}
}
pub struct OpenOptions {
opener: Box<dyn FileOpener>,
pub struct OpenOptions<'a> {
opener: &'a dyn FileOpener,
conf: OpenOptionsConfig,
}
impl OpenOptions {
pub fn new(opener: Box<dyn FileOpener>) -> Self {
impl<'a> OpenOptions<'a> {
pub fn new(opener: &'a dyn FileOpener) -> Self {
Self {
opener,
conf: OpenOptionsConfig {

View File

@@ -5,17 +5,11 @@ use std::borrow::Cow;
use std::path::Path;
use tracing::*;
/// The type that is responsible to open a file.
#[derive(Debug, Clone)]
pub struct FileOpener {
pub(super) filesystem: FileSystem,
}
impl FileOpener {
impl FileSystem {
/// Inserts a readonly file into the file system that uses copy-on-write
/// (this is required for zero-copy creation of the same file)
pub fn insert_ro_file(&mut self, path: &Path, contents: Cow<'static, [u8]>) -> Result<()> {
let _ = crate::FileSystem::remove_file(&self.filesystem, path);
pub fn insert_ro_file(&self, path: &Path, contents: Cow<'static, [u8]>) -> Result<()> {
let _ = crate::FileSystem::remove_file(self, path);
let (inode_of_parent, maybe_inode_of_file, name_of_file) = self.insert_inode(path)?;
let inode_of_parent = match inode_of_parent {
@@ -32,7 +26,7 @@ impl FileOpener {
// The file doesn't already exist; it's OK to create it if
None => {
// Write lock.
let mut fs = self.filesystem.inner.write().map_err(|_| FsError::Lock)?;
let mut fs = self.inner.write().map_err(|_| FsError::Lock)?;
let file = ReadOnlyFile::new(contents);
let file_len = file.len() as u64;
@@ -76,11 +70,11 @@ impl FileOpener {
/// Inserts a arc file into the file system that references another file
/// in another file system (does not copy the real data)
pub fn insert_arc_file(
&mut self,
&self,
path: PathBuf,
fs: Arc<dyn crate::FileSystem + Send + Sync>,
) -> Result<()> {
let _ = crate::FileSystem::remove_file(&self.filesystem, path.as_path());
let _ = crate::FileSystem::remove_file(self, path.as_path());
let (inode_of_parent, maybe_inode_of_file, name_of_file) =
self.insert_inode(path.as_path())?;
@@ -98,7 +92,7 @@ impl FileOpener {
// The file doesn't already exist; it's OK to create it if
None => {
// Write lock.
let mut fs_lock = self.filesystem.inner.write().map_err(|_| FsError::Lock)?;
let mut fs_lock = self.inner.write().map_err(|_| FsError::Lock)?;
// Read the metadata or generate a dummy one
let meta = match fs.metadata(&path) {
@@ -145,11 +139,11 @@ impl FileOpener {
/// Inserts a arc directory into the file system that references another file
/// in another file system (does not copy the real data)
pub fn insert_arc_directory(
&mut self,
&self,
path: PathBuf,
fs: Arc<dyn crate::FileSystem + Send + Sync>,
) -> Result<()> {
let _ = crate::FileSystem::remove_dir(&self.filesystem, path.as_path());
let _ = crate::FileSystem::remove_dir(self, path.as_path());
let (inode_of_parent, maybe_inode_of_file, name_of_file) =
self.insert_inode(path.as_path())?;
@@ -167,7 +161,7 @@ impl FileOpener {
// The file doesn't already exist; it's OK to create it if
None => {
// Write lock.
let mut fs_lock = self.filesystem.inner.write().map_err(|_| FsError::Lock)?;
let mut fs_lock = self.inner.write().map_err(|_| FsError::Lock)?;
// Creating the file in the storage.
let inode_of_file = fs_lock.storage.vacant_entry().key();
@@ -209,11 +203,11 @@ impl FileOpener {
/// Inserts a arc file into the file system that references another file
/// in another file system (does not copy the real data)
pub fn insert_custom_file(
&mut self,
&self,
path: PathBuf,
file: Box<dyn crate::VirtualFile + Send + Sync>,
) -> Result<()> {
let _ = crate::FileSystem::remove_file(&self.filesystem, path.as_path());
let _ = crate::FileSystem::remove_file(self, path.as_path());
let (inode_of_parent, maybe_inode_of_file, name_of_file) =
self.insert_inode(path.as_path())?;
@@ -231,7 +225,7 @@ impl FileOpener {
// The file doesn't already exist; it's OK to create it if
None => {
// Write lock.
let mut fs_lock = self.filesystem.inner.write().map_err(|_| FsError::Lock)?;
let mut fs_lock = self.inner.write().map_err(|_| FsError::Lock)?;
// Creating the file in the storage.
let inode_of_file = fs_lock.storage.vacant_entry().key();
@@ -269,11 +263,11 @@ impl FileOpener {
}
fn insert_inode(
&mut self,
&self,
path: &Path,
) -> Result<(InodeResolution, Option<InodeResolution>, OsString)> {
// Read lock.
let fs = self.filesystem.inner.read().map_err(|_| FsError::Lock)?;
let fs = self.inner.read().map_err(|_| FsError::Lock)?;
// Check the path has a parent.
let parent_of_path = path.parent().ok_or(FsError::BaseNotDirectory)?;
@@ -309,9 +303,9 @@ impl FileOpener {
}
}
impl crate::FileOpener for FileOpener {
impl crate::FileOpener for FileSystem {
fn open(
&mut self,
&self,
path: &Path,
conf: &OpenOptionsConfig,
) -> Result<Box<dyn VirtualFile + Send + Sync + 'static>> {
@@ -370,7 +364,7 @@ impl crate::FileOpener for FileOpener {
};
// Write lock.
let mut fs = self.filesystem.inner.write().map_err(|_| FsError::Lock)?;
let mut fs = self.inner.write().map_err(|_| FsError::Lock)?;
let inode = fs.storage.get_mut(inode_of_file);
match inode {
@@ -456,7 +450,7 @@ impl crate::FileOpener for FileOpener {
// 2. `create` is used with `write` or `append`.
None if (create_new || create) && (write || append) => {
// Write lock.
let mut fs = self.filesystem.inner.write().map_err(|_| FsError::Lock)?;
let mut fs = self.inner.write().map_err(|_| FsError::Lock)?;
let file = File::new();
@@ -500,7 +494,7 @@ impl crate::FileOpener for FileOpener {
Ok(Box::new(FileHandle::new(
inode_of_file,
self.filesystem.clone(),
self.clone(),
read,
write || append || truncate,
append,

View File

@@ -21,10 +21,8 @@ pub struct FileSystem {
}
impl FileSystem {
pub fn new_open_options_ext(&self) -> FileOpener {
FileOpener {
filesystem: self.clone(),
}
pub fn new_open_options_ext(&self) -> &FileSystem {
self
}
pub fn union(&self, other: &Arc<dyn crate::FileSystem + Send + Sync>) {
@@ -526,9 +524,7 @@ impl crate::FileSystem for FileSystem {
}
fn new_open_options(&self) -> OpenOptions {
OpenOptions::new(Box::new(FileOpener {
filesystem: self.clone(),
}))
OpenOptions::new(self)
}
}

View File

@@ -4,7 +4,6 @@ mod filesystem;
mod stdio;
use file::{File, FileHandle, ReadOnlyFile};
pub use file_opener::FileOpener;
pub use filesystem::FileSystem;
pub use stdio::{Stderr, Stdin, Stdout};

View File

@@ -43,16 +43,9 @@ impl StaticFileSystem {
}
/// Custom file opener, returns a WebCFile
#[derive(Debug)]
struct WebCFileOpener {
pub package: String,
pub volumes: Arc<webc::IndexMap<String, webc::Volume<'static>>>,
pub memory: Arc<MemFileSystem>,
}
impl FileOpener for WebCFileOpener {
impl FileOpener for StaticFileSystem {
fn open(
&mut self,
&self,
path: &Path,
_conf: &OpenOptionsConfig,
) -> Result<Box<dyn VirtualFile + Send + Sync>, FsError> {
@@ -338,11 +331,7 @@ impl FileSystem for StaticFileSystem {
}
}
fn new_open_options(&self) -> OpenOptions {
OpenOptions::new(Box::new(WebCFileOpener {
package: self.package.clone(),
volumes: self.volumes.clone(),
memory: self.memory.clone(),
}))
OpenOptions::new(self)
}
fn symlink_metadata(&self, path: &Path) -> Result<Metadata, FsError> {
let path = normalizes_path(path);

View File

@@ -30,7 +30,7 @@ impl TmpFileSystem {
Self::default()
}
pub fn new_open_options_ext(&self) -> mem_fs::FileOpener {
pub fn new_open_options_ext(&self) -> &mem_fs::FileSystem {
self.fs.new_open_options_ext()
}

View File

@@ -364,10 +364,7 @@ impl FileSystem for UnionFileSystem {
Err(ret_error)
}
fn new_open_options(&self) -> OpenOptions {
let opener = Box::new(UnionFileOpener {
mounts: self.mounts.clone(),
});
OpenOptions::new(opener)
OpenOptions::new(self)
}
}
@@ -413,14 +410,9 @@ fn filter_mounts(
ret.into_iter()
}
#[derive(Debug)]
pub struct UnionFileOpener {
mounts: Vec<MountPoint>,
}
impl FileOpener for UnionFileOpener {
impl FileOpener for UnionFileSystem {
fn open(
&mut self,
&self,
path: &Path,
conf: &OpenOptionsConfig,
) -> Result<Box<dyn VirtualFile + Send + Sync>> {

View File

@@ -46,23 +46,13 @@ where
}
/// Custom file opener, returns a WebCFile
#[derive(Debug)]
struct WebCFileOpener<T>
where
T: std::fmt::Debug + Send + Sync + 'static,
{
pub package: String,
pub webc: Arc<T>,
pub memory: Arc<MemFileSystem>,
}
impl<T> FileOpener for WebCFileOpener<T>
impl<T> FileOpener for WebcFileSystem<T>
where
T: std::fmt::Debug + Send + Sync + 'static,
T: Deref<Target = WebC<'static>>,
{
fn open(
&mut self,
&self,
path: &Path,
_conf: &OpenOptionsConfig,
) -> Result<Box<dyn VirtualFile + Send + Sync>, FsError> {
@@ -357,11 +347,7 @@ where
}
}
fn new_open_options(&self) -> OpenOptions {
OpenOptions::new(Box::new(WebCFileOpener {
package: self.package.clone(),
webc: self.webc.clone(),
memory: self.memory.clone(),
}))
OpenOptions::new(self)
}
fn symlink_metadata(&self, path: &Path) -> Result<Metadata, FsError> {
let path = normalizes_path(path);

View File

@@ -1,10 +1,13 @@
use std::fmt;
use std::mem::MaybeUninit;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use thiserror::Error;
@@ -55,37 +58,37 @@ pub trait VirtualNetworking: fmt::Debug + Send + Sync + 'static {
}
/// Adds a static IP address to the interface with a netmask prefix
async fn ip_add(&self, ip: IpAddr, prefix: u8) -> Result<()> {
fn ip_add(&self, ip: IpAddr, prefix: u8) -> Result<()> {
Err(NetworkError::Unsupported)
}
/// Removes a static (or dynamic) IP address from the interface
async fn ip_remove(&self, ip: IpAddr) -> Result<()> {
fn ip_remove(&self, ip: IpAddr) -> Result<()> {
Err(NetworkError::Unsupported)
}
/// Clears all the assigned IP addresses for this interface
async fn ip_clear(&self) -> Result<()> {
fn ip_clear(&self) -> Result<()> {
Err(NetworkError::Unsupported)
}
/// Lists all the IP addresses currently assigned to this interface
async fn ip_list(&self) -> Result<Vec<IpCidr>> {
fn ip_list(&self) -> Result<Vec<IpCidr>> {
Err(NetworkError::Unsupported)
}
/// Returns the hardware MAC address for this interface
async fn mac(&self) -> Result<[u8; 6]> {
fn mac(&self) -> Result<[u8; 6]> {
Err(NetworkError::Unsupported)
}
/// Adds a default gateway to the routing table
async fn gateway_set(&self, ip: IpAddr) -> Result<()> {
fn gateway_set(&self, ip: IpAddr) -> Result<()> {
Err(NetworkError::Unsupported)
}
/// Adds a specific route to the routing table
async fn route_add(
fn route_add(
&self,
cidr: IpCidr,
via_router: IpAddr,
@@ -96,17 +99,17 @@ pub trait VirtualNetworking: fmt::Debug + Send + Sync + 'static {
}
/// Removes a routing rule from the routing table
async fn route_remove(&self, cidr: IpAddr) -> Result<()> {
fn route_remove(&self, cidr: IpAddr) -> Result<()> {
Err(NetworkError::Unsupported)
}
/// Clears the routing table for this interface
async fn route_clear(&self) -> Result<()> {
fn route_clear(&self) -> Result<()> {
Err(NetworkError::Unsupported)
}
/// Lists all the routes defined in the routing table for this interface
async fn route_list(&self) -> Result<Vec<IpRoute>> {
fn route_list(&self) -> Result<Vec<IpRoute>> {
Err(NetworkError::Unsupported)
}
@@ -152,7 +155,6 @@ pub trait VirtualNetworking: fmt::Debug + Send + Sync + 'static {
&self,
addr: SocketAddr,
peer: SocketAddr,
timeout: Option<Duration>,
) -> Result<Box<dyn VirtualTcpSocket + Sync>> {
Err(NetworkError::Unsupported)
}
@@ -170,25 +172,6 @@ pub trait VirtualNetworking: fmt::Debug + Send + Sync + 'static {
pub type DynVirtualNetworking = Arc<dyn VirtualNetworking>;
#[derive(Debug, Clone)]
pub struct SocketReceive {
/// Data that was received
pub data: Bytes,
/// Indicates if the data was truncated (e.g. UDP packet)
pub truncated: bool,
}
#[derive(Debug, Clone)]
pub struct SocketReceiveFrom {
/// Data that was received
pub data: Bytes,
/// Indicates if the data was truncated (e.g. UDP packet)
pub truncated: bool,
/// Peer sender address of the data
pub addr: SocketAddr,
}
#[async_trait::async_trait]
pub trait VirtualTcpListener: fmt::Debug + Send + Sync + 'static {
/// Tries to accept a new connection
fn try_accept(&mut self) -> Option<Result<(Box<dyn VirtualTcpSocket + Sync>, SocketAddr)>>;
@@ -205,38 +188,19 @@ pub trait VirtualTcpListener: fmt::Debug + Send + Sync + 'static {
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<usize>>;
/// Sets the accept timeout
fn set_timeout(&mut self, timeout: Option<Duration>) -> Result<()>;
/// Gets the accept timeout
fn timeout(&self) -> Result<Option<Duration>>;
/// Returns the local address of this TCP listener
fn addr_local(&self) -> Result<SocketAddr>;
/// Sets how many network hops the packets are permitted for new connections
async fn set_ttl(&mut self, ttl: u8) -> Result<()>;
fn set_ttl(&mut self, ttl: u8) -> Result<()>;
/// Returns the maximum number of network hops before packets are dropped
fn ttl(&self) -> Result<u8>;
/// Determines if the socket is blocking or not
fn set_nonblocking(&mut self, nonblocking: bool) -> Result<()>;
// Returns true if the socket is nonblocking
fn nonblocking(&self) -> Result<bool>;
}
#[async_trait::async_trait]
pub trait VirtualSocket: fmt::Debug + Send + Sync + 'static {
/// Sets how many network hops the packets are permitted for new connections
async fn set_ttl(&mut self, ttl: u32) -> Result<()>;
/// Determines if the socket is blocking or not
fn set_nonblocking(&mut self, nonblocking: bool) -> Result<()>;
// Returns true if the socket is nonblocking
fn nonblocking(&self) -> Result<bool>;
fn set_ttl(&mut self, ttl: u32) -> Result<()>;
/// Returns the maximum number of network hops before packets are dropped
fn ttl(&self) -> Result<u32>;
@@ -277,7 +241,6 @@ pub enum StreamSecurity {
}
/// Connected sockets have a persistent connection to a remote peer
#[async_trait::async_trait]
pub trait VirtualConnectedSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
/// Determines how long the socket will remain in a TIME_WAIT
/// after it disconnects (only the one that initiates the close will
@@ -289,63 +252,93 @@ pub trait VirtualConnectedSocket: VirtualSocket + fmt::Debug + Send + Sync + 'st
/// after it disconnects
fn linger(&self) -> Result<Option<Duration>>;
/// Sends out a datagram or stream of bytes on this socket
async fn send(&mut self, data: Bytes) -> Result<usize>;
/// Tries to send out a datagram or stream of bytes on this socket
fn try_send(&mut self, data: &[u8]) -> Result<usize>;
/// FLushes all the datagrams
async fn flush(&mut self) -> Result<()>;
/// Sends out a datagram or stream of bytes on this socket
fn poll_send(&mut self, cx: &mut Context<'_>, data: &[u8]) -> Poll<Result<usize>>;
/// Attempts to flush the object, ensuring that any buffered data reach
/// their destination.
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>;
/// Closes the socket
fn close(&mut self) -> Result<()>;
/// Recv a packet from the socket
async fn recv(&mut self) -> Result<SocketReceive>;
fn poll_recv<'a>(
&mut self,
cx: &mut Context<'_>,
buf: &'a mut [MaybeUninit<u8>],
) -> Poll<Result<usize>>;
/// Recv a packet from the socket
fn try_recv(&mut self) -> Result<Option<SocketReceive>>;
fn try_recv<'a>(&mut self, buf: &'a mut [MaybeUninit<u8>]) -> Result<usize>;
}
/// Connectionless sockets are able to send and receive datagrams and stream
/// bytes to multiple addresses at the same time (peer-to-peer)
#[async_trait::async_trait]
pub trait VirtualConnectionlessSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
/// Sends out a datagram or stream of bytes on this socket
/// to a specific address
async fn send_to(&mut self, data: Bytes, addr: SocketAddr) -> Result<usize>;
fn poll_send_to(
&mut self,
cx: &mut Context<'_>,
data: &[u8],
addr: SocketAddr,
) -> Poll<Result<usize>>;
/// Sends out a datagram or stream of bytes on this socket
/// to a specific address
fn try_send_to(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize>;
/// Recv a packet from the socket
async fn recv_from(&mut self) -> Result<SocketReceiveFrom>;
fn poll_recv_from<'a>(
&mut self,
cx: &mut Context<'_>,
buf: &'a mut [MaybeUninit<u8>],
) -> Poll<Result<(usize, SocketAddr)>>;
/// Recv a packet from the socket
fn try_recv_from(&mut self) -> Result<Option<SocketReceiveFrom>>;
fn try_recv_from<'a>(&mut self, buf: &'a mut [MaybeUninit<u8>]) -> Result<(usize, SocketAddr)>;
}
/// ICMP sockets are low level devices bound to a specific address
/// that can send and receive ICMP packets
#[async_trait::async_trait]
pub trait VirtualIcmpSocket:
VirtualConnectionlessSocket + fmt::Debug + Send + Sync + 'static
{
}
#[async_trait::async_trait]
pub trait VirtualRawSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
/// Sends out a raw packet on this socket
async fn send(&mut self, data: Bytes) -> Result<usize>;
/// Sends out a datagram or stream of bytes on this socket
fn poll_send(&mut self, cx: &mut Context<'_>, data: &[u8]) -> Poll<Result<usize>>;
/// FLushes all the datagrams
async fn flush(&mut self) -> Result<()>;
/// Sends out a datagram or stream of bytes on this socket
fn try_send(&mut self, data: &[u8]) -> Result<usize>;
/// Attempts to flush the object, ensuring that any buffered data reach
/// their destination.
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>;
/// Attempts to flush the object, ensuring that any buffered data reach
/// their destination.
fn try_flush(&mut self) -> Result<()>;
/// Recv a packet from the socket
async fn recv(&mut self) -> Result<SocketReceive>;
fn poll_recv<'a>(
&mut self,
cx: &mut Context<'_>,
buf: &'a mut [MaybeUninit<u8>],
) -> Poll<Result<usize>>;
/// Recv a packet from the socket
fn try_recv(&mut self) -> Result<Option<SocketReceive>>;
fn try_recv<'a>(&mut self, buf: &'a mut [MaybeUninit<u8>]) -> Result<usize>;
/// Tells the raw socket and its backing switch that all packets
/// should be received by this socket even if they are not
/// destined for this device
async fn set_promiscuous(&mut self, promiscuous: bool) -> Result<()>;
fn set_promiscuous(&mut self, promiscuous: bool) -> Result<()>;
/// Returns if the socket is running in promiscuous mode whereby it
/// will receive all packets even if they are not destined for the
@@ -353,23 +346,7 @@ pub trait VirtualRawSocket: VirtualSocket + fmt::Debug + Send + Sync + 'static {
fn promiscuous(&self) -> Result<bool>;
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimeType {
ReadTimeout,
WriteTimeout,
AcceptTimeout,
ConnectTimeout,
Linger,
}
#[async_trait::async_trait]
pub trait VirtualTcpSocket: VirtualConnectedSocket + fmt::Debug + Send + Sync + 'static {
/// Sets the timeout for a specific action on the socket
fn set_opt_time(&mut self, ty: TimeType, timeout: Option<Duration>) -> Result<()>;
/// Returns one of the previous set timeouts
fn opt_time(&self, ty: TimeType) -> Result<Option<Duration>>;
/// Sets the receive buffer size which acts as a trottle for how
/// much data is buffered on this side of the pipe
fn set_recv_buf_size(&mut self, size: usize) -> Result<()>;
@@ -390,7 +367,7 @@ pub trait VirtualTcpSocket: VirtualConnectedSocket + fmt::Debug + Send + Sync +
/// the peer is sent immediately rather than waiting for a bigger
/// batch of data, this reduces latency but increases encapsulation
/// overhead.
async fn set_nodelay(&mut self, reuse: bool) -> Result<()>;
fn set_nodelay(&mut self, reuse: bool) -> Result<()>;
/// Indicates if the NO_DELAY flag is set which means that data
/// is immediately sent to the peer without waiting. This reduces
@@ -403,20 +380,15 @@ pub trait VirtualTcpSocket: VirtualConnectedSocket + fmt::Debug + Send + Sync +
/// Shuts down either the READER or WRITER sides of the socket
/// connection.
async fn shutdown(&mut self, how: Shutdown) -> Result<()>;
fn shutdown(&mut self, how: Shutdown) -> Result<()>;
/// Return true if the socket is closed
fn is_closed(&self) -> bool;
}
#[async_trait::async_trait]
pub trait VirtualUdpSocket:
VirtualConnectedSocket + VirtualConnectionlessSocket + fmt::Debug + Send + Sync + 'static
VirtualConnectionlessSocket + fmt::Debug + Send + Sync + 'static
{
/// Connects to a destination peer so that the normal
/// send/recv operations can be used.
async fn connect(&mut self, addr: SocketAddr) -> Result<()>;
/// Sets a flag that means that the UDP socket is able
/// to receive and process broadcast packets.
fn set_broadcast(&mut self, broadcast: bool) -> Result<()>;

File diff suppressed because it is too large Load Diff

View File

@@ -247,6 +247,8 @@ impl ModuleCache {
#[cfg(test)]
#[cfg(feature = "sys")]
mod tests {
use std::time::Duration;
use tracing_subscriber::{
filter, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, Layer,
};
@@ -275,7 +277,9 @@ mod tests {
for _ in 0..2 {
let webc = cache.get_webc("sharrattj/dash", &rt, &tasks).unwrap();
store.push(webc);
tasks.runtime().block_on(tasks.sleep_now(0.into(), 1000));
tasks
.runtime()
.block_on(tasks.sleep_now(Duration::from_secs(1)));
}
}
}

View File

@@ -1,5 +1,4 @@
use std::{
collections::HashMap,
future::Future,
io::{IoSlice, SeekFrom},
ops::{Deref, DerefMut},
@@ -39,36 +38,23 @@ pub(crate) enum InodeValFilePollGuardMode {
counter: Arc<AtomicU64>,
},
Socket {
inner: Arc<tokio::sync::RwLock<InodeSocketInner>>,
lock_state: InodeValFilePollGuardSocketLocking,
inner: Arc<RwLock<InodeSocketInner>>,
},
}
pub(crate) enum InodeValFilePollGuardSocketLocking {
Locking(
Pin<
Box<
dyn Future<Output = tokio::sync::OwnedRwLockWriteGuard<InodeSocketInner>>
+ Send
+ Sync
+ 'static,
>,
>,
),
Locked(tokio::sync::OwnedRwLockWriteGuard<InodeSocketInner>),
}
pub(crate) struct InodeValFilePollGuard {
pub(crate) fd: u32,
pub(crate) peb: PollEventSet,
pub(crate) subscription: Subscription,
pub(crate) mode: InodeValFilePollGuardMode,
pub(crate) subscriptions: HashMap<PollEventSet, Subscription>,
}
impl InodeValFilePollGuard {
pub(crate) fn new(
fd: u32,
peb: PollEventSet,
subscription: Subscription,
guard: &Kind,
subscriptions: HashMap<PollEventSet, Subscription>,
) -> Option<Self> {
let mode = match guard.deref() {
Kind::EventNotifications {
@@ -91,22 +77,9 @@ impl InodeValFilePollGuard {
counter: counter.clone(),
}
}
Kind::Socket { socket } => {
if let Ok(guard) = socket.inner.clone().try_write_owned() {
InodeValFilePollGuardMode::Socket {
inner: socket.inner.clone(),
lock_state: InodeValFilePollGuardSocketLocking::Locked(guard),
}
} else {
let socket = socket.clone();
InodeValFilePollGuardMode::Socket {
inner: socket.inner.clone(),
lock_state: InodeValFilePollGuardSocketLocking::Locking(Box::pin(
socket.inner.write_owned(),
)),
}
}
}
Kind::Socket { socket } => InodeValFilePollGuardMode::Socket {
inner: socket.inner.clone(),
},
Kind::File {
handle: Some(handle),
..
@@ -118,7 +91,8 @@ impl InodeValFilePollGuard {
Some(Self {
fd,
mode,
subscriptions,
peb,
subscription,
})
}
}
@@ -130,22 +104,22 @@ impl std::fmt::Debug for InodeValFilePollGuard {
InodeValFilePollGuardMode::EventNotifications { .. } => {
write!(f, "guard-notifications")
}
InodeValFilePollGuardMode::Socket { lock_state, .. } => match lock_state {
InodeValFilePollGuardSocketLocking::Locked(guard) => match guard.kind {
InodeSocketKind::TcpListener(..) => write!(f, "guard-tcp-listener"),
InodeSocketKind::TcpStream(ref stream) => {
if stream.is_closed() {
InodeValFilePollGuardMode::Socket { inner } => {
let inner = inner.read().unwrap();
match inner.kind {
InodeSocketKind::TcpListener { .. } => write!(f, "guard-tcp-listener"),
InodeSocketKind::TcpStream { ref socket, .. } => {
if socket.is_closed() {
write!(f, "guard-tcp-stream (closed)")
} else {
write!(f, "guard-tcp-stream")
}
}
InodeSocketKind::UdpSocket(..) => write!(f, "guard-udp-socket"),
InodeSocketKind::UdpSocket { .. } => write!(f, "guard-udp-socket"),
InodeSocketKind::Raw(..) => write!(f, "guard-raw-socket"),
_ => write!(f, "guard-socket"),
},
_ => write!(f, "guard-socket (locked)"),
},
}
}
}
}
}
@@ -167,7 +141,8 @@ impl InodeValFilePollGuard {
pub(crate) struct InodeValFilePollGuardJoin<'a> {
mode: &'a mut InodeValFilePollGuardMode,
fd: u32,
subscriptions: HashMap<PollEventSet, Subscription>,
peb: PollEventSet,
subscription: Subscription,
}
impl<'a> InodeValFilePollGuardJoin<'a> {
@@ -175,7 +150,8 @@ impl<'a> InodeValFilePollGuardJoin<'a> {
Self {
mode: &mut guard.mode,
fd: guard.fd,
subscriptions: guard.subscriptions.clone(),
peb: guard.peb,
subscription: guard.subscription,
}
}
pub(crate) fn fd(&self) -> u32 {
@@ -184,37 +160,34 @@ impl<'a> InodeValFilePollGuardJoin<'a> {
}
impl<'a> Future for InodeValFilePollGuardJoin<'a> {
type Output = Vec<Event>;
type Output = Event;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let mut has_read = None;
let mut has_write = None;
let mut has_close = None;
let mut has_read = false;
let mut has_write = false;
let mut has_close = false;
let mut has_hangup = false;
let mut ret = Vec::new();
for (set, s) in self.subscriptions.iter() {
for in_event in iterate_poll_events(*set) {
match in_event {
PollEvent::PollIn => {
has_read = Some(*s);
}
PollEvent::PollOut => {
has_write = Some(*s);
}
PollEvent::PollHangUp => {
has_hangup = true;
has_close = Some(*s);
}
PollEvent::PollError | PollEvent::PollInvalid => {
if !has_hangup {
has_close = Some(*s);
}
for in_event in iterate_poll_events(self.peb) {
match in_event {
PollEvent::PollIn => {
has_read = true;
}
PollEvent::PollOut => {
has_write = true;
}
PollEvent::PollHangUp => {
has_hangup = true;
has_close = true;
}
PollEvent::PollError | PollEvent::PollInvalid => {
if !has_hangup {
has_close = true;
}
}
}
}
if let Some(s) = has_close.as_ref() {
if has_close {
let is_closed = match &mut self.mode {
InodeValFilePollGuardMode::File(file) => {
let mut guard = file.write().unwrap();
@@ -222,29 +195,11 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
file.poll_shutdown(cx).is_ready()
}
InodeValFilePollGuardMode::EventNotifications { .. } => false,
InodeValFilePollGuardMode::Socket {
ref inner,
ref mut lock_state,
..
} => {
let guard = match lock_state {
InodeValFilePollGuardSocketLocking::Locking(locking) => {
match locking.as_mut().poll(cx) {
Poll::Ready(guard) => {
*lock_state = InodeValFilePollGuardSocketLocking::Locked(guard);
match lock_state {
InodeValFilePollGuardSocketLocking::Locked(guard) => guard,
_ => unreachable!(),
}
}
Poll::Pending => return Poll::Pending,
}
}
InodeValFilePollGuardSocketLocking::Locked(guard) => guard,
};
InodeValFilePollGuardMode::Socket { ref inner } => {
let mut guard = inner.write().unwrap();
let is_closed = if let InodeSocketKind::Closed = guard.kind {
true
} else if has_read.is_some() || has_write.is_some() {
} else if has_read || has_write {
// this will be handled in the read/write poll instead
false
} else {
@@ -261,21 +216,15 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
_ => false,
}
};
// Release the lock so we don't cause any blocking issues
drop(guard);
*lock_state = InodeValFilePollGuardSocketLocking::Locking(Box::pin(
inner.clone().write_owned(),
));
is_closed
}
};
if is_closed {
ret.push(Event {
userdata: s.userdata,
return Poll::Ready(Event {
userdata: self.subscription.userdata,
error: Errno::Success,
type_: s.type_,
u: match s.type_ {
type_: self.subscription.type_,
u: match self.subscription.type_ {
Eventtype::FdRead | Eventtype::FdWrite => EventUnion {
fd_readwrite: EventFdReadwrite {
nbytes: 0,
@@ -291,7 +240,7 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
});
}
}
if let Some(s) = has_read {
if has_read {
let mut poll_result = match &mut self.mode {
InodeValFilePollGuardMode::File(file) => {
let mut guard = file.write().unwrap();
@@ -317,37 +266,12 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
})
}
}
InodeValFilePollGuardMode::Socket {
ref inner,
ref mut lock_state,
} => {
let guard = match lock_state {
InodeValFilePollGuardSocketLocking::Locking(locking) => {
match locking.as_mut().poll(cx) {
Poll::Ready(guard) => {
*lock_state = InodeValFilePollGuardSocketLocking::Locked(guard);
match lock_state {
InodeValFilePollGuardSocketLocking::Locked(guard) => guard,
_ => unreachable!(),
}
}
Poll::Pending => return Poll::Pending,
}
}
InodeValFilePollGuardSocketLocking::Locked(guard) => guard,
};
let res = guard.poll_read_ready(cx).map_err(net_error_into_io_err);
// drop the lock so we don't block things
drop(guard);
*lock_state = InodeValFilePollGuardSocketLocking::Locking(Box::pin(
inner.clone().write_owned(),
));
res
InodeValFilePollGuardMode::Socket { ref inner } => {
let mut guard = inner.write().unwrap();
guard.poll_read_ready(cx).map_err(net_error_into_io_err)
}
};
if let Some(s) = has_close.as_ref() {
if has_close {
poll_result = match poll_result {
Poll::Ready(Err(err))
if err.kind() == std::io::ErrorKind::ConnectionAborted
@@ -357,11 +281,11 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
|| err.kind() == std::io::ErrorKind::NotConnected
|| err.kind() == std::io::ErrorKind::UnexpectedEof =>
{
ret.push(Event {
userdata: s.userdata,
return Poll::Ready(Event {
userdata: self.subscription.userdata,
error: Errno::Success,
type_: s.type_,
u: match s.type_ {
type_: self.subscription.type_,
u: match self.subscription.type_ {
Eventtype::FdRead | Eventtype::FdWrite => EventUnion {
fd_readwrite: EventFdReadwrite {
nbytes: 0,
@@ -375,7 +299,6 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
Eventtype::Clock => EventUnion { clock: 0 },
},
});
Poll::Pending
}
a => a,
};
@@ -389,11 +312,11 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
0
}
};
ret.push(Event {
userdata: s.userdata,
return Poll::Ready(Event {
userdata: self.subscription.userdata,
error,
type_: s.type_,
u: match s.type_ {
type_: self.subscription.type_,
u: match self.subscription.type_ {
Eventtype::FdRead | Eventtype::FdWrite => EventUnion {
fd_readwrite: EventFdReadwrite {
nbytes: bytes_available as u64,
@@ -409,7 +332,7 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
});
}
}
if let Some(s) = has_write {
if has_write {
let mut poll_result = match &mut self.mode {
InodeValFilePollGuardMode::File(file) => {
let mut guard = file.write().unwrap();
@@ -435,37 +358,12 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
})
}
}
InodeValFilePollGuardMode::Socket {
ref inner,
ref mut lock_state,
} => {
let guard = match lock_state {
InodeValFilePollGuardSocketLocking::Locking(locking) => {
match locking.as_mut().poll(cx) {
Poll::Ready(guard) => {
*lock_state = InodeValFilePollGuardSocketLocking::Locked(guard);
match lock_state {
InodeValFilePollGuardSocketLocking::Locked(guard) => guard,
_ => unreachable!(),
}
}
Poll::Pending => return Poll::Pending,
}
}
InodeValFilePollGuardSocketLocking::Locked(guard) => guard,
};
let res = guard.poll_write_ready(cx).map_err(net_error_into_io_err);
// drop the lock so we don't block things
drop(guard);
*lock_state = InodeValFilePollGuardSocketLocking::Locking(Box::pin(
inner.clone().write_owned(),
));
res
InodeValFilePollGuardMode::Socket { ref inner } => {
let mut guard = inner.write().unwrap();
guard.poll_write_ready(cx).map_err(net_error_into_io_err)
}
};
if let Some(s) = has_close.as_ref() {
if has_close {
poll_result = match poll_result {
Poll::Ready(Err(err))
if err.kind() == std::io::ErrorKind::ConnectionAborted
@@ -475,11 +373,11 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
|| err.kind() == std::io::ErrorKind::NotConnected
|| err.kind() == std::io::ErrorKind::UnexpectedEof =>
{
ret.push(Event {
userdata: s.userdata,
return Poll::Ready(Event {
userdata: self.subscription.userdata,
error: Errno::Success,
type_: s.type_,
u: match s.type_ {
type_: self.subscription.type_,
u: match self.subscription.type_ {
Eventtype::FdRead | Eventtype::FdWrite => EventUnion {
fd_readwrite: EventFdReadwrite {
nbytes: 0,
@@ -493,7 +391,6 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
Eventtype::Clock => EventUnion { clock: 0 },
},
});
Poll::Pending
}
a => a,
};
@@ -507,11 +404,11 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
0
}
};
ret.push(Event {
userdata: s.userdata,
return Poll::Ready(Event {
userdata: self.subscription.userdata,
error,
type_: s.type_,
u: match s.type_ {
type_: self.subscription.type_,
u: match self.subscription.type_ {
Eventtype::FdRead | Eventtype::FdWrite => EventUnion {
fd_readwrite: EventFdReadwrite {
nbytes: bytes_available as u64,
@@ -527,11 +424,7 @@ impl<'a> Future for InodeValFilePollGuardJoin<'a> {
});
}
}
if !ret.is_empty() {
Poll::Ready(ret)
} else {
Poll::Pending
}
Poll::Pending
}
}
@@ -556,11 +449,13 @@ impl InodeValFileReadGuard {
pub fn into_poll_guard(
self,
fd: u32,
subscriptions: HashMap<PollEventSet, Subscription>,
peb: PollEventSet,
subscription: Subscription,
) -> InodeValFilePollGuard {
InodeValFilePollGuard {
fd,
subscriptions,
peb,
subscription,
mode: InodeValFilePollGuardMode::File(self.file),
}
}

View File

@@ -515,7 +515,9 @@ impl WasiFs {
if path.starts_with("./") {
let current_dir = self.current_dir.lock().unwrap();
path = format!("{}{}", current_dir.as_str(), &path[1..]);
path = path.replace("//", "/");
if path.contains("//") {
path = path.replace("//", "/");
}
}
path
}

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@ use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::{Arc, Mutex, RwLock},
task::Waker,
};
use bytes::{Bytes, BytesMut};
@@ -88,7 +89,7 @@ struct WasiThreadState {
is_main: bool,
pid: WasiProcessId,
id: WasiThreadId,
signals: Mutex<(Vec<Signal>, tokio::sync::broadcast::Sender<()>)>,
signals: Mutex<(Vec<Signal>, Vec<Waker>)>,
stack: Mutex<ThreadStack>,
finished: Arc<TaskJoinHandle>,
@@ -113,7 +114,7 @@ impl WasiThread {
pid,
id,
finished,
signals: Mutex::new((Vec::new(), tokio::sync::broadcast::channel(1).0)),
signals: Mutex::new((Vec::new(), Vec::new())),
stack: Mutex::new(ThreadStack::default()),
_task_count_guard: guard,
}),
@@ -136,7 +137,7 @@ impl WasiThread {
}
// TODO: this should be private, access should go through utility methods.
pub fn signals(&self) -> &Mutex<(Vec<Signal>, tokio::sync::broadcast::Sender<()>)> {
pub fn signals(&self) -> &Mutex<(Vec<Signal>, Vec<Waker>)> {
&self.state.signals
}
@@ -162,7 +163,7 @@ impl WasiThread {
if !guard.0.contains(&signal) {
guard.0.push(signal);
}
let _ = guard.1.send(());
guard.1.drain(..).for_each(|w| w.wake());
}
/// Returns all the signals that are waiting to be processed
@@ -177,18 +178,41 @@ impl WasiThread {
}
/// Returns all the signals that are waiting to be processed
pub fn pop_signals_or_subscribe(
&self,
) -> Result<Vec<Signal>, tokio::sync::broadcast::Receiver<()>> {
pub fn pop_signals_or_subscribe(&self, waker: &Waker) -> Option<Vec<Signal>> {
let mut guard = self.state.signals.lock().unwrap();
let mut ret = Vec::new();
std::mem::swap(&mut ret, &mut guard.0);
match ret.is_empty() {
true => Err(guard.1.subscribe()),
false => Ok(ret),
true => {
if guard.1.iter().any(|w| w.will_wake(waker)) == false {
guard.1.push(waker.clone());
}
None
}
false => Some(ret),
}
}
/// Returns all the signals that are waiting to be processed
pub fn has_signals_or_subscribe(&self, waker: &Waker) -> bool {
let mut guard = self.state.signals.lock().unwrap();
let has_signals = !guard.0.is_empty();
if has_signals == false {
if guard.1.iter().any(|w| w.will_wake(waker)) == false {
guard.1.push(waker.clone());
}
}
has_signals
}
/// Returns all the signals that are waiting to be processed
pub fn pop_signals(&self) -> Vec<Signal> {
let mut guard = self.state.signals.lock().unwrap();
let mut ret = Vec::new();
std::mem::swap(&mut ret, &mut guard.0);
ret
}
/// Adds a stack snapshot and removes dead ones
pub fn add_snapshot(
&self,

View File

@@ -2,7 +2,7 @@
#[cfg(feature = "sys-thread")]
pub mod tokio;
use std::pin::Pin;
use std::{pin::Pin, time::Duration};
use ::tokio::runtime::Runtime;
use futures::Future;
@@ -10,7 +10,7 @@ use wasmer::{vm::VMMemory, MemoryType, Module, Store};
#[cfg(feature = "sys")]
use wasmer_types::MemoryStyle;
use crate::{os::task::thread::WasiThreadError, WasiCallingId};
use crate::os::task::thread::WasiThreadError;
#[derive(Debug)]
pub struct SpawnedMemory {
@@ -28,16 +28,13 @@ pub enum SpawnType {
}
/// An implementation of task management
#[async_trait::async_trait]
#[allow(unused_variables)]
pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static {
/// Invokes whenever a WASM thread goes idle. In some runtimes (like singlethreaded
/// execution environments) they will need to do asynchronous work whenever the main
/// thread goes idle and this is the place to hook for that.
fn sleep_now(
&self,
_id: WasiCallingId,
ms: u128,
) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;
async fn sleep_now(&self, time: Duration);
/// Starts an asynchronous task that will run on a shared worker pool
/// This task must not block the execution or it could cause a deadlock
@@ -82,19 +79,15 @@ pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static {
#[derive(Clone, Debug)]
pub struct StubTaskManager;
#[async_trait::async_trait]
impl VirtualTaskManager for StubTaskManager {
#[allow(unused_variables)]
fn sleep_now(
&self,
id: WasiCallingId,
ms: u128,
) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
if ms == 0 {
async fn sleep_now(&self, time: Duration) {
if time == Duration::ZERO {
std::thread::yield_now();
} else {
std::thread::sleep(std::time::Duration::from_millis(ms as u64));
std::thread::sleep(time);
}
Box::pin(async move {})
}
#[allow(unused_variables)]

View File

@@ -1,11 +1,11 @@
use std::pin::Pin;
use std::{pin::Pin, time::Duration};
use futures::Future;
#[cfg(feature = "sys-thread")]
use tokio::runtime::{Builder, Runtime};
use wasmer::{vm::VMMemory, Module, Store};
use crate::{os::task::thread::WasiThreadError, WasiCallingId};
use crate::os::task::thread::WasiThreadError;
use super::{SpawnType, VirtualTaskManager};
@@ -39,20 +39,15 @@ impl<'g> Drop for TokioRuntimeGuard<'g> {
fn drop(&mut self) {}
}
#[async_trait::async_trait]
impl VirtualTaskManager for TokioTaskManager {
/// See [`VirtualTaskManager::sleep_now`].
fn sleep_now(
&self,
_id: WasiCallingId,
ms: u128,
) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
Box::pin(async move {
if ms == 0 {
tokio::task::yield_now().await;
} else {
tokio::time::sleep(std::time::Duration::from_millis(ms as u64)).await;
}
})
async fn sleep_now(&self, time: Duration) {
if time == Duration::ZERO {
tokio::task::yield_now().await;
} else {
tokio::time::sleep(time).await;
}
}
/// See [`VirtualTaskManager::task_shared`].

View File

@@ -338,20 +338,17 @@ impl WasiEnv {
// differently
let env = ctx.data();
if !env.inner().signal_set {
if let Ok(signals) = env.thread.pop_signals_or_subscribe() {
let signal_cnt = signals.len();
for sig in signals {
if sig == Signal::Sigint || sig == Signal::Sigquit || sig == Signal::Sigkill {
env.thread.terminate(Errno::Intr as u32);
return Err(WasiError::Exit(Errno::Intr as u32));
} else {
trace!("wasi[{}]::signal-ignored: {:?}", env.pid(), sig);
}
let signals = env.thread.pop_signals();
let signal_cnt = signals.len();
for sig in signals {
if sig == Signal::Sigint || sig == Signal::Sigquit || sig == Signal::Sigkill {
env.thread.terminate(Errno::Intr as u32);
return Err(WasiError::Exit(Errno::Intr as u32));
} else {
trace!("wasi[{}]::signal-ignored: {:?}", env.pid(), sig);
}
return Ok(Ok(signal_cnt > 0));
} else {
return Ok(Ok(false));
}
return Ok(Ok(signal_cnt > 0));
}
// Check for forced exit
@@ -381,67 +378,80 @@ impl WasiEnv {
// Check for any signals that we need to trigger
// (but only if a signal handler is registered)
if let Some(_) = env.inner().signal.as_ref() {
let signals = env.thread.pop_signals();
Ok(Ok(Self::process_signals_internal(ctx, signals)?))
} else {
Ok(Ok(false))
}
}
pub fn process_signals_internal(
ctx: &mut FunctionEnvMut<'_, Self>,
mut signals: Vec<Signal>,
) -> Result<bool, WasiError> {
let env = ctx.data();
if let Some(handler) = env.inner().signal.clone() {
if let Ok(mut signals) = env.thread.pop_signals_or_subscribe() {
// We might also have signals that trigger on timers
let mut now = 0;
let has_signal_interval = {
let mut any = false;
let inner = env.process.inner.read().unwrap();
if !inner.signal_intervals.is_empty() {
now = platform_clock_time_get(Snapshot0Clockid::Monotonic, 1_000_000)
.unwrap() as u128;
for signal in inner.signal_intervals.values() {
let elapsed = now - signal.last_signal;
if elapsed >= signal.interval.as_nanos() {
any = true;
break;
}
}
}
any
};
if has_signal_interval {
let mut inner = env.process.inner.write().unwrap();
for signal in inner.signal_intervals.values_mut() {
// We might also have signals that trigger on timers
let mut now = 0;
let has_signal_interval = {
let mut any = false;
let inner = env.process.inner.read().unwrap();
if !inner.signal_intervals.is_empty() {
now = platform_clock_time_get(Snapshot0Clockid::Monotonic, 1_000_000).unwrap()
as u128;
for signal in inner.signal_intervals.values() {
let elapsed = now - signal.last_signal;
if elapsed >= signal.interval.as_nanos() {
signal.last_signal = now;
signals.push(signal.signal);
any = true;
break;
}
}
}
any
};
if has_signal_interval {
let mut inner = env.process.inner.write().unwrap();
for signal in inner.signal_intervals.values_mut() {
let elapsed = now - signal.last_signal;
if elapsed >= signal.interval.as_nanos() {
signal.last_signal = now;
signals.push(signal.signal);
}
}
}
for signal in signals {
tracing::trace!(
"wasi[{}]::processing-signal: {:?}",
ctx.data().pid(),
signal
);
if let Err(err) = handler.call(ctx, signal as i32) {
match err.downcast::<WasiError>() {
Ok(wasi_err) => {
warn!(
"wasi[{}]::signal handler wasi error - {}",
ctx.data().pid(),
wasi_err
);
return Err(wasi_err);
}
Err(runtime_err) => {
warn!(
"wasi[{}]::signal handler runtime error - {}",
ctx.data().pid(),
runtime_err
);
return Err(WasiError::Exit(Errno::Intr as ExitCode));
}
for signal in signals {
tracing::trace!(
"wasi[{}]::processing-signal: {:?}",
ctx.data().pid(),
signal
);
if let Err(err) = handler.call(ctx, signal as i32) {
match err.downcast::<WasiError>() {
Ok(wasi_err) => {
warn!(
"wasi[{}]::signal handler wasi error - {}",
ctx.data().pid(),
wasi_err
);
return Err(wasi_err);
}
Err(runtime_err) => {
warn!(
"wasi[{}]::signal handler runtime error - {}",
ctx.data().pid(),
runtime_err
);
return Err(WasiError::Exit(Errno::Intr as ExitCode));
}
}
}
}
Ok(true)
} else {
Ok(false)
}
Ok(Ok(true))
}
/// Returns an exit code if the thread or process has been forced to exit

View File

@@ -25,7 +25,7 @@ use std::{
cell::RefCell,
collections::HashMap,
path::Path,
sync::{atomic::AtomicU32, Arc, Mutex, MutexGuard, RwLock},
sync::{Arc, Mutex, MutexGuard, RwLock},
task::Waker,
time::Duration,
};
@@ -102,9 +102,7 @@ impl WasiState {
}
pub(crate) fn fs_new_open_options(&self) -> OpenOptions {
OpenOptions::new(Box::new(WasiStateOpener {
root_fs: self.fs.root_fs.clone(),
}))
self.fs.root_fs.new_open_options()
}
}
@@ -114,7 +112,7 @@ struct WasiStateOpener {
impl FileOpener for WasiStateOpener {
fn open(
&mut self,
&self,
path: &Path,
conf: &wasmer_vfs::OpenOptionsConfig,
) -> wasmer_vfs::Result<Box<dyn VirtualFile + Send + Sync + 'static>> {
@@ -155,8 +153,7 @@ pub(crate) struct WasiStateThreading {
/// CPU efficient manner
#[derive(Debug)]
pub struct WasiFutex {
pub(crate) refcnt: AtomicU32,
pub(crate) waker: tokio::sync::broadcast::Sender<()>,
pub(crate) wakers: Vec<Waker>,
}
#[derive(Debug)]
@@ -250,7 +247,7 @@ pub struct WasiState {
// TODO: review allow...
#[allow(dead_code)]
pub(crate) threading: RwLock<WasiStateThreading>,
pub(crate) futexs: RwLock<HashMap<u64, WasiFutex>>,
pub(crate) futexs: Mutex<HashMap<u64, WasiFutex>>,
pub(crate) clock_offset: Mutex<HashMap<Snapshot0Clockid, i64>>,
pub(crate) bus: WasiBusState,
pub args: Vec<String>,

View File

@@ -6,8 +6,12 @@ use wasmer_wasi_types::wasi::{
};
use crate::{
mem_error_to_wasi, os::task::thread::WasiThread, syscalls, syscalls::types, Memory32,
MemorySize, WasiEnv, WasiError,
mem_error_to_wasi,
os::task::thread::WasiThread,
state::{PollEventBuilder, PollEventSet},
syscalls,
syscalls::types,
Memory32, MemorySize, WasiEnv, WasiError,
};
/// Wrapper around `syscalls::fd_filestat_get` with extra logic to handle the size
@@ -141,7 +145,11 @@ pub fn poll_oneoff(
let in_origs = wasi_try_mem_ok!(in_.slice(&memory, nsubscriptions));
let in_origs = wasi_try_mem_ok!(in_origs.read_to_vec());
for in_orig in in_origs {
subscriptions.push(Into::<Subscription>::into(in_orig));
subscriptions.push((
None,
PollEventSet::default(),
Into::<Subscription>::into(in_orig),
));
}
// make the call

View File

@@ -19,12 +19,14 @@ pub mod windows;
pub mod wasi;
pub mod wasix;
use bytes::{Buf, BufMut};
use futures::Future;
pub use wasi::*;
pub use wasix::*;
pub mod legacy;
use std::mem::MaybeUninit;
pub(crate) use std::{
borrow::{Borrow, Cow},
cell::RefCell,
@@ -158,6 +160,60 @@ pub(crate) fn write_bytes<T: Write, M: MemorySize>(
result
}
pub(crate) fn copy_to_slice<M: MemorySize>(
memory: &MemoryView,
iovs_arr_cell: WasmSlice<__wasi_ciovec_t<M>>,
mut write_loc: &mut [MaybeUninit<u8>],
) -> Result<usize, Errno> {
let mut bytes_written = 0usize;
for iov in iovs_arr_cell.iter() {
let iov_inner = iov.read().map_err(mem_error_to_wasi)?;
let amt = from_offset::<M>(iov_inner.buf_len)?;
let (left, right) = write_loc.split_at_mut(amt);
let bytes = WasmPtr::<u8, M>::new(iov_inner.buf)
.slice(memory, iov_inner.buf_len)
.map_err(mem_error_to_wasi)?;
if amt != bytes.copy_to_slice(left).map_err(mem_error_to_wasi)? {
return Err(Errno::Fault);
}
write_loc = right;
bytes_written += amt;
}
Ok(bytes_written)
}
pub(crate) fn copy_from_slice<M: MemorySize>(
mut read_loc: &[u8],
memory: &MemoryView,
iovs_arr: WasmSlice<__wasi_iovec_t<M>>,
) -> Result<usize, Errno> {
let mut bytes_read = 0usize;
for iov in iovs_arr.iter() {
let iov_inner = iov.read().map_err(mem_error_to_wasi)?;
let to_read = from_offset::<M>(iov_inner.buf_len)?;
let to_read = to_read.min(read_loc.len());
if to_read == 0 {
break;
}
let (left, right) = read_loc.split_at(to_read);
let buf = WasmPtr::<u8, M>::new(iov_inner.buf)
.slice(memory, to_read.try_into().map_err(|_| Errno::Overflow)?)
.map_err(mem_error_to_wasi)?;
buf.write_slice(left).map_err(mem_error_to_wasi)?;
read_loc = right;
bytes_read += to_read;
}
Ok(bytes_read)
}
pub(crate) fn read_bytes<T: Read, M: MemorySize>(
mut reader: T,
memory: &MemoryView,
@@ -167,7 +223,7 @@ pub(crate) fn read_bytes<T: Read, M: MemorySize>(
// We allocate the raw_bytes first once instead of
// N times in the loop.
let mut raw_bytes: Vec<u8> = vec![0; 1024];
let mut raw_bytes: Vec<u8> = vec![0; 10240];
for iov in iovs_arr.iter() {
let iov_inner = iov.read().map_err(mem_error_to_wasi)?;
@@ -221,35 +277,17 @@ where
return Err(WasiError::Exit(exit_code));
}
// Fast path (inline synchronous)
let pinned_work = {
let _guard = env.tasks.runtime_enter();
let waker = WasiDummyWaker.into_waker();
let mut cx = Context::from_waker(&waker);
let mut pinned_work = Box::pin(work);
if let Poll::Ready(i) = pinned_work.as_mut().poll(&mut cx) {
return Ok(i);
}
pinned_work
};
// Slow path (will may put the thread to sleep)
//let mut env = ctx.data();
let tasks = env.tasks.clone();
// Create the timeout
let mut nonblocking = false;
if timeout == Some(Duration::ZERO) {
nonblocking = true;
}
let timeout = {
let tasks_inner = tasks.clone();
let tasks_inner = env.tasks.clone();
async move {
if let Some(timeout) = timeout {
if !nonblocking {
tasks_inner
.sleep_now(current_caller_id(), timeout.as_millis())
.await
tasks_inner.sleep_now(timeout).await
} else {
InfiniteSleep::default().await
}
@@ -259,51 +297,148 @@ where
}
};
let mut signaler = {
let signals = env.thread.signals().lock().unwrap();
let signaler = signals.1.subscribe();
if !signals.0.is_empty() {
drop(signals);
match WasiEnv::process_signals(ctx)? {
Err(err) => return Ok(Err(err)),
Ok(processed) if processed => return Ok(Err(Errno::Intr)),
Ok(_) => {}
// This poller will process any signals when the main working function is idle
struct WorkWithSignalPoller<'a, 'b, Fut, T>
where
Fut: Future<Output = Result<T, Errno>>,
{
ctx: &'a mut FunctionEnvMut<'b, WasiEnv>,
pinned_work: Pin<Box<Fut>>,
}
impl<'a, 'b, Fut, T> Future for WorkWithSignalPoller<'a, 'b, Fut, T>
where
Fut: Future<Output = Result<T, Errno>>,
{
type Output = Result<Fut::Output, WasiError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Poll::Ready(res) = Pin::new(&mut self.pinned_work).poll(cx) {
return Poll::Ready(Ok(res));
}
env = ctx.data();
if let Some(exit_code) = self.ctx.data().should_exit() {
return Poll::Ready(Err(WasiError::Exit(exit_code)));
}
if let Some(signals) = self.ctx.data().thread.pop_signals_or_subscribe(cx.waker()) {
if let Err(err) = WasiEnv::process_signals_internal(self.ctx, signals) {
return Poll::Ready(Err(err));
}
return Poll::Ready(Ok(Err(Errno::Intr)));
}
Poll::Pending
}
signaler
};
}
// Define the work function
let work = async move {
let tasks = env.tasks.clone();
let mut pinned_work = Box::pin(work);
let work = async {
Ok(tokio::select! {
// The main work we are doing
ret = pinned_work => ret,
// If a signaler is triggered then we interrupt the main process
_ = signaler.recv() => {
WasiEnv::process_signals(ctx)?;
Err(Errno::Intr)
},
res = WorkWithSignalPoller { ctx, pinned_work } => res?,
// Optional timeout
_ = timeout => Err(Errno::Timedout),
})
};
// If we are in nonblocking mode then we register a fake waker
// and poll then return immediately with a timeout if nothing happened
// Fast path
if nonblocking {
let waker = WasiDummyWaker.into_waker();
let mut cx = Context::from_waker(&waker);
let _guard = tasks.runtime_enter();
let mut pinned_work = Box::pin(work);
if let Poll::Ready(res) = pinned_work.as_mut().poll(&mut cx) {
res
} else {
Ok(Err(Errno::Again))
return res;
}
} else {
// Block on the work and process process
tasks.block_on(work)
return Ok(Err(Errno::Again));
}
// Slow path, block on the work and process process
tasks.block_on(work)
}
/// Asyncify takes the current thread and blocks on the async runtime associated with it
/// thus allowed for asynchronous operations to execute. It has built in functionality
/// to (optionally) timeout the IO, force exit the process, callback signals and pump
/// synchronous IO engine
pub(crate) fn __asyncify_light<'a, T, Fut>(
env: &'a WasiEnv,
timeout: Option<Duration>,
work: Fut,
) -> Result<Result<T, Errno>, WasiError>
where
T: 'static,
Fut: std::future::Future<Output = Result<T, Errno>>,
{
// Create the timeout
let mut nonblocking = false;
if timeout == Some(Duration::ZERO) {
nonblocking = true;
}
let timeout = {
async {
if let Some(timeout) = timeout {
if !nonblocking {
env.tasks.sleep_now(timeout).await
} else {
InfiniteSleep::default().await
}
} else {
InfiniteSleep::default().await
}
}
};
// This poller will process any signals when the main working function is idle
struct WorkWithSignalPoller<'a, Fut, T>
where
Fut: Future<Output = Result<T, Errno>>,
{
env: &'a WasiEnv,
pinned_work: Pin<Box<Fut>>,
}
impl<'a, Fut, T> Future for WorkWithSignalPoller<'a, Fut, T>
where
Fut: Future<Output = Result<T, Errno>>,
{
type Output = Result<Fut::Output, WasiError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Poll::Ready(res) = Pin::new(&mut self.pinned_work).poll(cx) {
return Poll::Ready(Ok(res));
}
if let Some(exit_code) = self.env.should_exit() {
return Poll::Ready(Err(WasiError::Exit(exit_code)));
}
if let Some(signals) = self.env.thread.pop_signals_or_subscribe(cx.waker()) {
return Poll::Ready(Ok(Err(Errno::Intr)));
}
Poll::Pending
}
}
// Define the work function
let mut pinned_work = Box::pin(work);
let work = async move {
Ok(tokio::select! {
// The main work we are doing
res = WorkWithSignalPoller { env, pinned_work } => res?,
// Optional timeout
_ = timeout => Err(Errno::Timedout),
})
};
// Fast path
if nonblocking {
let waker = WasiDummyWaker.into_waker();
let mut cx = Context::from_waker(&waker);
let _guard = env.tasks.runtime_enter();
let mut pinned_work = Box::pin(work);
if let Poll::Ready(res) = pinned_work.as_mut().poll(&mut cx) {
return res;
}
return Ok(Err(Errno::Again));
}
// Slow path, block on the work and process process
env.tasks.block_on(work)
}
// This should be compiled away, it will simply wait forever however its never
@@ -318,22 +453,18 @@ impl std::future::Future for InfiniteSleep {
}
}
/// Performs an immuatble operation on the socket while running in an asynchronous runtime
/// Performs an immutable operation on the socket while running in an asynchronous runtime
/// This has built in signal support
pub(crate) fn __sock_actor<T, F, Fut>(
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
pub(crate) fn __sock_asyncify<'a, T, F, Fut>(
env: &'a WasiEnv,
sock: WasiFd,
rights: Rights,
actor: F,
) -> Result<T, Errno>
where
T: 'static,
F: FnOnce(crate::net::socket::InodeSocket) -> Fut + 'static,
F: FnOnce(crate::net::socket::InodeSocket, Fd) -> Fut,
Fut: std::future::Future<Output = Result<T, Errno>>,
{
let env = ctx.data();
let tasks = env.tasks.clone();
let state = env.state.clone();
let inodes = state.inodes.clone();
@@ -356,7 +487,7 @@ where
drop(guard);
// Start the work using the socket
actor(socket)
actor(socket, fd_entry)
}
_ => {
return Err(Errno::Notsock);
@@ -365,21 +496,20 @@ where
};
// Block on the work and process it
tasks.block_on(work)
env.tasks.block_on(work)
}
/// Performs mutable work on a socket under an asynchronous runtime with
/// built in signal processing
pub(crate) fn __sock_actor_mut<'a, T, F, Fut>(
ctx: &'a mut FunctionEnvMut<'_, WasiEnv>,
pub(crate) fn __sock_asyncify_mut<T, F, Fut>(
ctx: &'_ mut FunctionEnvMut<'_, WasiEnv>,
sock: WasiFd,
rights: Rights,
actor: F,
) -> Result<T, Errno>
where
T: 'static,
F: FnOnce(crate::net::socket::InodeSocket) -> Fut,
Fut: std::future::Future<Output = Result<T, Errno>> + 'a,
F: FnOnce(crate::net::socket::InodeSocket, Fd) -> Fut,
Fut: std::future::Future<Output = Result<T, Errno>>,
{
let env = ctx.data();
let tasks = env.tasks.clone();
@@ -392,27 +522,109 @@ where
return Err(Errno::Access);
}
let tasks = env.tasks.clone();
{
let inode_idx = fd_entry.inode;
let inodes_guard = inodes.read().unwrap();
let inode = &inodes_guard.arena[inode_idx];
let mut guard = inode.write();
match guard.deref_mut() {
Kind::Socket { socket } => {
// Clone the socket and release the lock
let socket = socket.clone();
drop(guard);
drop(inodes_guard);
let inode_idx = fd_entry.inode;
let inodes_guard = inodes.read().unwrap();
let inode = &inodes_guard.arena[inode_idx];
let mut guard = inode.write();
match guard.deref_mut() {
Kind::Socket { socket } => {
// Clone the socket and release the lock
let socket = socket.clone();
drop(guard);
drop(inodes_guard);
// Start the work using the socket
let work = actor(socket);
// Start the work using the socket
let work = actor(socket, fd_entry);
// Block on the work and process it
tasks.block_on(work)
}
_ => Err(Errno::Notsock),
// Block on the work and process it
tasks.block_on(work)
}
_ => Err(Errno::Notsock),
}
}
/// Performs an immutable operation on the socket while running in an asynchronous runtime
/// This has built in signal support
pub(crate) fn __sock_actor<T, F>(
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
sock: WasiFd,
rights: Rights,
actor: F,
) -> Result<T, Errno>
where
T: 'static,
F: FnOnce(crate::net::socket::InodeSocket, Fd) -> Result<T, Errno>,
{
let env = ctx.data();
let tasks = env.tasks.clone();
let state = env.state.clone();
let inodes = state.inodes.clone();
let fd_entry = state.fs.get_fd(sock)?;
if !rights.is_empty() && !fd_entry.rights.contains(rights) {
return Err(Errno::Access);
}
let inodes_guard = inodes.read().unwrap();
let inode_idx = fd_entry.inode;
let inode = &inodes_guard.arena[inode_idx];
let tasks = env.tasks.clone();
let mut guard = inode.read();
match guard.deref() {
Kind::Socket { socket } => {
// Clone the socket and release the lock
let socket = socket.clone();
drop(guard);
// Start the work using the socket
actor(socket, fd_entry)
}
_ => {
return Err(Errno::Notsock);
}
}
}
/// Performs mutable work on a socket under an asynchronous runtime with
/// built in signal processing
pub(crate) fn __sock_actor_mut<'a, T, F>(
ctx: &'a mut FunctionEnvMut<'_, WasiEnv>,
sock: WasiFd,
rights: Rights,
actor: F,
) -> Result<T, Errno>
where
T: 'static,
F: FnOnce(crate::net::socket::InodeSocket, Fd) -> Result<T, Errno>,
{
let env = ctx.data();
let tasks = env.tasks.clone();
let state = env.state.clone();
let inodes = state.inodes.clone();
let fd_entry = state.fs.get_fd(sock)?;
if !rights.is_empty() && !fd_entry.rights.contains(rights) {
return Err(Errno::Access);
}
let inode_idx = fd_entry.inode;
let inodes_guard = inodes.read().unwrap();
let inode = &inodes_guard.arena[inode_idx];
let mut guard = inode.write();
match guard.deref_mut() {
Kind::Socket { socket } => {
// Clone the socket and release the lock
let socket = socket.clone();
drop(guard);
drop(inodes_guard);
// Start the work using the socket
actor(socket, fd_entry)
}
_ => Err(Errno::Notsock),
}
}

View File

@@ -36,7 +36,7 @@ pub fn fd_close(mut ctx: FunctionEnvMut<'_, WasiEnv>, fd: WasiFd) -> Result<Errn
drop(inodes);
__asyncify(&mut ctx, None, async move {
socket.close().await.map(|()| Errno::Success)
socket.close().map(|()| Errno::Success)
})?
.unwrap_or_else(|a| a)
}

View File

@@ -38,26 +38,6 @@ pub fn fd_fdstat_set_flags(
);
return Ok(Errno::Access);
}
let mut guard = inodes.arena[inode].write();
if let Kind::Socket { socket } = guard.deref_mut() {
let nonblocking = flags.contains(Fdflags::NONBLOCK);
debug!(
"wasi[{}:{}]::socket(fd={}) nonblocking={}",
ctx.data().pid(),
ctx.data().tid(),
fd,
nonblocking
);
let socket = socket.clone();
drop(guard);
drop(fd_map);
drop(inodes);
wasi_try_ok!(__asyncify(&mut ctx, None, async move {
socket.set_nonblocking(nonblocking).await
})?)
}
}
let env = ctx.data();

View File

@@ -123,8 +123,8 @@ fn fd_read_internal<M: MemorySize>(
return Ok(Errno::Access);
}
let is_non_blocking = fd_entry.flags.contains(Fdflags::NONBLOCK);
let inode_idx = fd_entry.inode;
let fd_flags = fd_entry.flags;
let max_size = {
let memory = env.memory_view(&ctx);
@@ -152,7 +152,7 @@ fn fd_read_internal<M: MemorySize>(
let data = wasi_try_ok!(__asyncify(
&mut ctx,
if is_non_blocking {
if fd_flags.contains(Fdflags::NONBLOCK) {
Some(Duration::ZERO)
} else {
None
@@ -206,14 +206,30 @@ fn fd_read_internal<M: MemorySize>(
drop(guard);
drop(inodes);
let tasks = env.tasks.clone();
let res = __asyncify(
&mut ctx,
if is_non_blocking {
if fd_flags.contains(Fdflags::NONBLOCK) {
Some(Duration::ZERO)
} else {
None
},
async move { socket.recv(max_size).await },
async {
let mut buf = Vec::with_capacity(max_size);
unsafe {
buf.set_len(max_size);
}
socket
.recv(tasks.deref(), &mut buf, fd_flags)
.await
.map(|amt| {
unsafe {
buf.set_len(amt);
}
let buf: Vec<u8> = unsafe { std::mem::transmute(buf) };
buf
})
},
)?
.map_err(|err| match err {
Errno::Timedout => Errno::Again,
@@ -244,7 +260,7 @@ fn fd_read_internal<M: MemorySize>(
let data = wasi_try_ok!(__asyncify(
&mut ctx,
if is_non_blocking {
if fd_flags.contains(Fdflags::NONBLOCK) {
Some(Duration::ZERO)
} else {
None
@@ -317,7 +333,7 @@ fn fd_read_internal<M: MemorySize>(
}
// If its none blocking then exit
if is_non_blocking {
if fd_flags.contains(Fdflags::NONBLOCK) {
return Ok(Errno::Again);
}

View File

@@ -112,7 +112,7 @@ fn fd_write_internal<M: MemorySize>(
return Ok(Errno::Access);
}
let is_non_blocking = fd_entry.flags.contains(Fdflags::NONBLOCK);
let fd_flags = fd_entry.flags;
let inode_idx = fd_entry.inode;
let (bytes_written, can_update_cursor) = {
@@ -138,12 +138,12 @@ fn fd_write_internal<M: MemorySize>(
let written = wasi_try_ok!(__asyncify(
&mut ctx,
if is_non_blocking {
if fd_entry.flags.contains(Fdflags::NONBLOCK) {
Some(Duration::ZERO)
} else {
None
},
async move {
async {
let mut handle = handle.write().unwrap();
if !is_stdio {
handle
@@ -179,8 +179,9 @@ fn fd_write_internal<M: MemorySize>(
let mut buf = Vec::with_capacity(buf_len);
wasi_try_ok!(write_bytes(&mut buf, &memory, iovs_arr));
let tasks = env.tasks.clone();
let written = wasi_try_ok!(__asyncify(&mut ctx, None, async move {
socket.send(buf).await
socket.send(tasks.deref(), &buf, fd_flags).await
})?);
(written, false)
}
@@ -208,12 +209,13 @@ fn fd_write_internal<M: MemorySize>(
immediate,
..
} => {
let mut val = 0u64.to_ne_bytes();
let written = wasi_try_ok!(write_bytes(&mut val[..], &memory, iovs_arr));
let mut val: [MaybeUninit<u8>; 8] =
unsafe { MaybeUninit::uninit().assume_init() };
let written = wasi_try_ok!(copy_to_slice(&memory, iovs_arr, &mut val[..]));
if written != val.len() {
return Ok(Errno::Inval);
}
let val = u64::from_ne_bytes(val);
let val = u64::from_ne_bytes(unsafe { std::mem::transmute(val) });
counter.fetch_add(val, Ordering::AcqRel);
{

View File

@@ -3,6 +3,7 @@ use wasmer_wasi_types::wasi::SubscriptionClock;
use super::*;
use crate::{
fs::{InodeValFilePollGuard, InodeValFilePollGuardJoin},
state::PollEventSet,
syscalls::*,
};
@@ -30,11 +31,11 @@ pub fn poll_oneoff<M: MemorySize>(
let mut env = ctx.data();
let mut memory = env.memory_view(&ctx);
let mut subscriptions = Vec::new();
let subscription_array = wasi_try_mem_ok!(in_.slice(&memory, nsubscriptions));
let mut subscriptions = Vec::with_capacity(subscription_array.len() as usize);
for sub in subscription_array.iter() {
let s = wasi_try_mem_ok!(sub.read());
subscriptions.push(s);
subscriptions.push((None, PollEventSet::default(), s));
}
// Poll and receive all the events that triggered
@@ -70,6 +71,7 @@ pub fn poll_oneoff<M: MemorySize>(
struct PollBatch<'a> {
pid: WasiProcessId,
tid: WasiThreadId,
evts: Vec<Event>,
joins: Vec<InodeValFilePollGuardJoin<'a>>,
}
impl<'a> PollBatch<'a> {
@@ -77,6 +79,7 @@ impl<'a> PollBatch<'a> {
Self {
pid,
tid,
evts: Vec::new(),
joins: fds.iter_mut().map(InodeValFilePollGuardJoin::new).collect(),
}
}
@@ -94,24 +97,22 @@ impl<'a> Future for PollBatch<'a> {
let mut guard = Pin::new(join);
match guard.poll(cx) {
Poll::Pending => {}
Poll::Ready(mut res) => {
for evt in res.iter() {
tracing::trace!(
"wasi[{}:{}]::poll_oneoff triggered_fd (fd={}, userdata={}, type={:?})",
pid,
tid,
fd,
evt.userdata,
evt.type_,
);
}
evts.append(&mut res);
Poll::Ready(evt) => {
tracing::trace!(
"wasi[{}:{}]::poll_oneoff triggered_fd (fd={}, userdata={}, type={:?})",
pid,
tid,
fd,
evt.userdata,
evt.type_,
);
evts.push(evt);
done = true;
}
}
}
if done {
if !evts.is_empty() {
return Poll::Ready(Ok(evts));
}
@@ -133,7 +134,7 @@ impl<'a> Future for PollBatch<'a> {
/// The number of events seen
pub(crate) fn poll_oneoff_internal(
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
subs: Vec<Subscription>,
mut subs: Vec<(Option<WasiFd>, PollEventSet, Subscription)>,
) -> Result<Result<Vec<Event>, Errno>, WasiError> {
let pid = ctx.data().pid();
let tid = ctx.data().tid();
@@ -150,16 +151,17 @@ pub(crate) fn poll_oneoff_internal(
// These are used when we capture what clocks (timeouts) are being
// subscribed too
let mut clock_subs: Vec<(SubscriptionClock, u64)> = vec![];
let clock_cnt = subs
.iter()
.filter(|a| a.2.type_ == Eventtype::Clock)
.count();
let mut clock_subs: Vec<(SubscriptionClock, u64)> = Vec::with_capacity(subs.len());
let mut time_to_sleep = None;
// First we extract all the subscriptions into an array so that they
// can be processed
let mut memory = env.memory_view(&ctx);
let mut subscriptions = HashMap::new();
for s in subs {
let mut peb = PollEventBuilder::new();
let mut in_events = HashMap::new();
for (fd, peb, s) in subs.iter_mut() {
let fd = match s.type_ {
Eventtype::FdRead => {
let file_descriptor = unsafe { s.data.fd_readwrite.file_descriptor };
@@ -175,7 +177,8 @@ pub(crate) fn poll_oneoff_internal(
}
}
}
in_events.insert(peb.add(PollEvent::PollIn).build(), s);
*fd = Some(file_descriptor);
*peb = *peb | (PollEvent::PollIn as PollEventSet);
file_descriptor
}
Eventtype::FdWrite => {
@@ -192,7 +195,8 @@ pub(crate) fn poll_oneoff_internal(
}
}
}
in_events.insert(peb.add(PollEvent::PollOut).build(), s);
*fd = Some(file_descriptor);
*peb = *peb | (PollEvent::PollOut as PollEventSet);
file_descriptor
}
Eventtype::Clock => {
@@ -232,11 +236,6 @@ pub(crate) fn poll_oneoff_internal(
}
}
};
let entry = subscriptions
.entry(fd)
.or_insert_with(HashMap::<state::PollEventSet, Subscription>::default);
entry.extend(in_events.into_iter());
}
let mut events_seen: u32 = 0;
@@ -250,56 +249,58 @@ pub(crate) fn poll_oneoff_internal(
// and open a read lock on them all
let inodes = state.inodes.clone();
let inodes = inodes.read().unwrap();
let mut fd_guards = vec![];
let mut fd_guards = Vec::with_capacity(subs.len());
#[allow(clippy::significant_drop_in_scrutinee)]
for (fd, in_events) in subscriptions {
let wasi_file_ref = match fd {
__WASI_STDERR_FILENO => {
wasi_try_ok_ok!(inodes
.stderr(&state.fs.fd_map)
.map(|g| g.into_poll_guard(fd, in_events))
.map_err(fs_error_into_wasi_err))
}
__WASI_STDIN_FILENO => {
wasi_try_ok_ok!(inodes
.stdin(&state.fs.fd_map)
.map(|g| g.into_poll_guard(fd, in_events))
.map_err(fs_error_into_wasi_err))
}
__WASI_STDOUT_FILENO => {
wasi_try_ok_ok!(inodes
.stdout(&state.fs.fd_map)
.map(|g| g.into_poll_guard(fd, in_events))
.map_err(fs_error_into_wasi_err))
}
_ => {
let fd_entry = wasi_try_ok_ok!(state.fs.get_fd(fd));
if !fd_entry.rights.contains(Rights::POLL_FD_READWRITE) {
return Ok(Err(Errno::Access));
for (fd, peb, s) in subs {
if let Some(fd) = fd {
let wasi_file_ref = match fd {
__WASI_STDERR_FILENO => {
wasi_try_ok_ok!(inodes
.stderr(&state.fs.fd_map)
.map(|g| g.into_poll_guard(fd, peb, s))
.map_err(fs_error_into_wasi_err))
}
let inode = fd_entry.inode;
__WASI_STDIN_FILENO => {
wasi_try_ok_ok!(inodes
.stdin(&state.fs.fd_map)
.map(|g| g.into_poll_guard(fd, peb, s))
.map_err(fs_error_into_wasi_err))
}
__WASI_STDOUT_FILENO => {
wasi_try_ok_ok!(inodes
.stdout(&state.fs.fd_map)
.map(|g| g.into_poll_guard(fd, peb, s))
.map_err(fs_error_into_wasi_err))
}
_ => {
let fd_entry = wasi_try_ok_ok!(state.fs.get_fd(fd));
if !fd_entry.rights.contains(Rights::POLL_FD_READWRITE) {
return Ok(Err(Errno::Access));
}
let inode = fd_entry.inode;
{
let guard = inodes.arena[inode].read();
if let Some(guard) =
crate::fs::InodeValFilePollGuard::new(fd, guard.deref(), in_events)
{
guard
} else {
return Ok(Err(Errno::Badf));
let guard = inodes.arena[inode].read();
if let Some(guard) =
crate::fs::InodeValFilePollGuard::new(fd, peb, s, guard.deref())
{
guard
} else {
return Ok(Err(Errno::Badf));
}
}
}
}
};
tracing::trace!(
"wasi[{}:{}]::poll_oneoff wait_for_fd={} type={:?}",
pid,
tid,
fd,
wasi_file_ref
);
fd_guards.push(wasi_file_ref);
};
tracing::trace!(
"wasi[{}:{}]::poll_oneoff wait_for_fd={} type={:?}",
pid,
tid,
fd,
wasi_file_ref
);
fd_guards.push(wasi_file_ref);
}
}
fd_guards

View File

@@ -1,6 +1,62 @@
use std::task::Waker;
use super::*;
use crate::syscalls::*;
struct FutexPoller<'a, M>
where
M: MemorySize,
{
env: &'a WasiEnv,
view: MemoryView<'a>,
futex_idx: u64,
futex_ptr: WasmPtr<u32, M>,
expected: u32,
}
impl<'a, M> Future for FutexPoller<'a, M>
where
M: MemorySize,
{
type Output = Result<(), Errno>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let waker = cx.waker();
let mut guard = self.env.state.futexs.lock().unwrap();
{
let val = match self.futex_ptr.read(&self.view) {
Ok(a) => a,
Err(err) => return Poll::Ready(Err(mem_error_to_wasi(err))),
};
if val != self.expected {
return Poll::Ready(Ok(()));
}
}
let futex = guard
.entry(self.futex_idx)
.or_insert_with(|| WasiFutex { wakers: vec![] });
if futex.wakers.iter().any(|w| w.will_wake(waker)) == false {
futex.wakers.push(waker.clone());
}
Poll::Pending
}
}
impl<'a, M> Drop for FutexPoller<'a, M>
where
M: MemorySize,
{
fn drop(&mut self) {
let futex = {
let mut guard = self.env.state.futexs.lock().unwrap();
guard.remove(&self.futex_idx)
};
if let Some(futex) = futex {
futex.wakers.into_iter().for_each(|w| w.wake());
}
}
}
/// Wait for a futex_wake operation to wake us.
/// Returns with EINVAL if the futex doesn't hold the expected value.
/// Returns false on timeout, and true in all other cases.
@@ -29,7 +85,7 @@ pub fn futex_wait<M: MemorySize>(
let mut env = ctx.data();
let state = env.state.clone();
let pointer: u64 = wasi_try_ok!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow));
let futex_idx: u64 = wasi_try_ok!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow));
// Determine the timeout
let timeout = {
@@ -37,74 +93,36 @@ pub fn futex_wait<M: MemorySize>(
wasi_try_mem_ok!(timeout.read(&memory))
};
let timeout = match timeout.tag {
OptionTag::Some => Some(timeout.u as u128),
OptionTag::Some => Some(Duration::from_nanos(timeout.u as u64)),
_ => None,
};
// Loop until we either hit a yield error or the futex is woken
let mut woken = Bool::False;
let start = platform_clock_time_get(Snapshot0Clockid::Monotonic, 1).unwrap() as u128;
loop {
// Register the waiting futex (if its not already registered)
let mut rx = {
use std::collections::hash_map::Entry;
let mut guard = state.futexs.write().unwrap();
guard.entry(pointer).or_insert_with(|| WasiFutex {
refcnt: AtomicU32::new(1),
waker: tokio::sync::broadcast::channel(1).0,
});
let futex = guard.get_mut(&pointer).unwrap();
// Create a poller which will register ourselves against
// this futex event and check when it has changed
let view = env.memory_view(&ctx);
let poller = FutexPoller {
env,
view,
futex_idx,
futex_ptr,
expected,
};
// If the value of the memory is no longer the expected value
// then terminate from the loop (we do this under a futex lock
// so that its protected)
let rx = futex.waker.subscribe();
{
let view = env.memory_view(&ctx);
let val = wasi_try_mem_ok!(futex_ptr.read(&view));
if val != expected {
woken = Bool::True;
break;
}
}
rx
};
// Wait for the futex to trigger or a timeout to occur
let res = __asyncify_light(env, timeout, poller)?;
// Check if we have timed out
let mut sub_timeout = None;
if let Some(timeout) = timeout.as_ref() {
let now = platform_clock_time_get(Snapshot0Clockid::Monotonic, 1).unwrap() as u128;
let delta = now.saturating_sub(start);
if delta >= *timeout {
break;
}
let remaining = *timeout - delta;
sub_timeout = Some(Duration::from_nanos(remaining as u64));
// Process it and return the result
let mut ret = Errno::Success;
let woken = match res {
Err(Errno::Timedout) => Bool::False,
Err(err) => {
ret = err;
Bool::True
}
//sub_timeout.replace(sub_timeout.map(|a| a.min(Duration::from_millis(10))).unwrap_or(Duration::from_millis(10)));
// Now wait for it to be triggered
__asyncify(&mut ctx, sub_timeout, async move {
rx.recv().await.ok();
Ok(())
})?;
env = ctx.data();
}
// Drop the reference count to the futex (and remove it if the refcnt hits zero)
{
let mut guard = state.futexs.write().unwrap();
if guard
.get(&pointer)
.map(|futex| futex.refcnt.fetch_sub(1, Ordering::AcqRel) == 1)
.unwrap_or(false)
{
guard.remove(&pointer);
}
}
Ok(_) => Bool::True,
};
let memory = env.memory_view(&ctx);
wasi_try_mem_ok!(ret_woken.write(&memory, woken));
Ok(Errno::Success)
let mut env = ctx.data();
wasi_try_mem_ok!(ret_woken.write(&memory, Bool::False));
Ok(ret)
}

View File

@@ -20,10 +20,19 @@ pub fn futex_wake<M: MemorySize>(
let pointer: u64 = wasi_try!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow));
let mut woken = false;
let mut guard = state.futexs.read().unwrap();
if let Some(futex) = guard.get(&pointer) {
woken = futex.waker.receiver_count() > 0;
let _ = futex.waker.send(());
let woken = {
let mut guard = state.futexs.lock().unwrap();
if let Some(futex) = guard.get_mut(&pointer) {
futex.wakers.pop().map(|w| w.wake());
if futex.wakers.is_empty() {
guard.remove(&pointer);
}
true
} else {
false
}
};
if woken {
trace!(
%woken,
"wasi[{}:{}]::futex_wake(offset={})",

View File

@@ -18,20 +18,26 @@ pub fn futex_wake_all<M: MemorySize>(
let pointer: u64 = wasi_try!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow));
let mut woken = false;
let mut guard = state.futexs.read().unwrap();
if let Some(futex) = guard.get(&pointer) {
woken = futex.waker.receiver_count() > 0;
let _ = futex.waker.send(());
let woken = {
let mut guard = state.futexs.lock().unwrap();
if let Some(futex) = guard.remove(&pointer) {
futex.wakers.into_iter().for_each(|w| w.wake());
true
} else {
false
}
};
if woken {
trace!(
%woken,
"wasi[{}:{}]::futex_wake_all(offset={})",
"wasi[{}:{}]::futex_wake(offset={})",
ctx.data().pid(),
ctx.data().tid(),
futex_ptr.offset()
);
} else {
trace!(
"wasi[{}:{}]::futex_wake_all(offset={}) - nothing waiting",
"wasi[{}:{}]::futex_wake(offset={}) - nothing waiting",
ctx.data().pid(),
ctx.data().tid(),
futex_ptr.offset()

View File

@@ -20,9 +20,8 @@ pub fn port_addr_add<M: MemorySize>(
let memory = env.memory_view(&ctx);
let cidr = wasi_try_ok!(crate::net::read_cidr(&memory, ip));
let net = env.net();
wasi_try_ok!(__asyncify(&mut ctx, None, async move {
wasi_try_ok!(__asyncify(&mut ctx, None, async {
net.ip_add(cidr.ip, cidr.prefix)
.await
.map_err(net_error_into_wasi_err)
})?);
Ok(Errno::Success)

View File

@@ -11,8 +11,8 @@ pub fn port_addr_clear(mut ctx: FunctionEnvMut<'_, WasiEnv>) -> Result<Errno, Wa
);
let env = ctx.data();
let net = env.net();
wasi_try_ok!(__asyncify(&mut ctx, None, async move {
net.ip_clear().await.map_err(net_error_into_wasi_err)
wasi_try_ok!(__asyncify(&mut ctx, None, async {
net.ip_clear().map_err(net_error_into_wasi_err)
})?);
Ok(Errno::Success)
}

View File

@@ -30,8 +30,8 @@ pub fn port_addr_list<M: MemorySize>(
let max_addrs: u64 = wasi_try_ok!(max_addrs.try_into().map_err(|_| Errno::Overflow));
let net = env.net();
let addrs = wasi_try_ok!(__asyncify(&mut ctx, None, async move {
net.ip_list().await.map_err(net_error_into_wasi_err)
let addrs = wasi_try_ok!(__asyncify(&mut ctx, None, async {
net.ip_list().map_err(net_error_into_wasi_err)
})?);
let env = ctx.data();
let memory = env.memory_view(&ctx);

View File

@@ -20,8 +20,8 @@ pub fn port_addr_remove<M: MemorySize>(
let memory = env.memory_view(&ctx);
let ip = wasi_try_ok!(crate::net::read_ip(&memory, ip));
let net = env.net();
wasi_try_ok!(__asyncify(&mut ctx, None, async move {
net.ip_remove(ip).await.map_err(net_error_into_wasi_err)
wasi_try_ok!(__asyncify(&mut ctx, None, async {
net.ip_remove(ip).map_err(net_error_into_wasi_err)
})?);
Ok(Errno::Success)
}

View File

@@ -21,8 +21,8 @@ pub fn port_gateway_set<M: MemorySize>(
let ip = wasi_try_ok!(crate::net::read_ip(&memory, ip));
let net = env.net();
wasi_try_ok!(__asyncify(&mut ctx, None, async move {
net.gateway_set(ip).await.map_err(net_error_into_wasi_err)
wasi_try_ok!(__asyncify(&mut ctx, None, async {
net.gateway_set(ip).map_err(net_error_into_wasi_err)
})?);
Ok(Errno::Success)
}

View File

@@ -12,8 +12,8 @@ pub fn port_mac<M: MemorySize>(
let mut memory = env.memory_view(&ctx);
let net = env.net();
let mac = wasi_try_ok!(__asyncify(&mut ctx, None, async move {
net.mac().await.map_err(net_error_into_wasi_err)
let mac = wasi_try_ok!(__asyncify(&mut ctx, None, async {
net.mac().map_err(net_error_into_wasi_err)
})?);
let env = ctx.data();
let memory = env.memory_view(&ctx);

View File

@@ -33,9 +33,8 @@ pub fn port_route_add<M: MemorySize>(
};
let net = env.net();
wasi_try_ok!(__asyncify(&mut ctx, None, async move {
wasi_try_ok!(__asyncify(&mut ctx, None, async {
net.route_add(cidr, via_router, preferred_until, expires_at)
.await
.map_err(net_error_into_wasi_err)
})?);
Ok(Errno::Success)

View File

@@ -11,8 +11,8 @@ pub fn port_route_clear(mut ctx: FunctionEnvMut<'_, WasiEnv>) -> Result<Errno, W
);
let env = ctx.data();
let net = env.net();
wasi_try_ok!(__asyncify(&mut ctx, None, async move {
net.route_clear().await.map_err(net_error_into_wasi_err)
wasi_try_ok!(__asyncify(&mut ctx, None, async {
net.route_clear().map_err(net_error_into_wasi_err)
})?);
Ok(Errno::Success)
}

View File

@@ -30,8 +30,8 @@ pub fn port_route_list<M: MemorySize>(
wasi_try_mem_ok!(routes_ptr.slice(&memory, wasi_try_ok!(to_offset::<M>(max_routes))));
let net = env.net();
let routes = wasi_try_ok!(__asyncify(&mut ctx, None, async move {
net.route_list().await.map_err(net_error_into_wasi_err)
let routes = wasi_try_ok!(__asyncify(&mut ctx, None, async {
net.route_list().map_err(net_error_into_wasi_err)
})?);
let env = ctx.data();
let memory = env.memory_view(&ctx);

View File

@@ -17,8 +17,8 @@ pub fn port_route_remove<M: MemorySize>(
let ip = wasi_try_ok!(crate::net::read_ip(&memory, ip));
let net = env.net();
wasi_try_ok!(__asyncify(&mut ctx, None, async move {
net.route_remove(ip).await.map_err(net_error_into_wasi_err)
wasi_try_ok!(__asyncify(&mut ctx, None, async {
net.route_remove(ip).map_err(net_error_into_wasi_err)
})?);
Ok(Errno::Success)

View File

@@ -280,7 +280,7 @@ pub fn proc_exec<M: MemorySize>(
let tasks_inner = tasks.clone();
tasks.block_on(Box::pin(async move {
loop {
tasks_inner.sleep_now(current_caller_id(), 5).await;
tasks_inner.sleep_now(Duration::from_millis(5)).await;
if let Some(exit_code) = process.inst.exit_code() {
tx.send(exit_code).unwrap();
break;

View File

@@ -5,12 +5,5 @@ use crate::syscalls::*;
/// Yields execution of the thread
pub fn sched_yield(mut ctx: FunctionEnvMut<'_, WasiEnv>) -> Result<Errno, WasiError> {
//trace!("wasi[{}:{}]::sched_yield", ctx.data().pid(), ctx.data().tid());
let env = ctx.data();
let tasks = env.tasks.clone();
wasi_try_ok!(__asyncify(&mut ctx, None, async move {
tasks.sleep_now(current_caller_id(), 0).await;
Ok(())
})?);
wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
Ok(Errno::Success)
thread_sleep_internal(ctx, 0)
}

View File

@@ -16,7 +16,7 @@ use crate::syscalls::*;
pub fn sock_accept<M: MemorySize>(
mut ctx: FunctionEnvMut<'_, WasiEnv>,
sock: WasiFd,
fd_flags: Fdflags,
mut fd_flags: Fdflags,
ro_fd: WasmPtr<WasiFd, M>,
ro_addr: WasmPtr<__wasi_addr_port_t, M>,
) -> Result<Errno, WasiError> {
@@ -30,28 +30,44 @@ pub fn sock_accept<M: MemorySize>(
wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
let (child, addr) = wasi_try_ok!(__sock_actor(
&mut ctx,
let tasks = ctx.data().tasks.clone();
let (child, addr, fd_flags) = wasi_try_ok!(__sock_asyncify(
ctx.data(),
sock,
Rights::SOCK_ACCEPT,
move |socket| async move { socket.accept(fd_flags).await }
move |socket, fd| async move {
if fd.flags.contains(Fdflags::NONBLOCK) {
fd_flags.set(Fdflags::NONBLOCK, true);
}
socket
.accept(tasks.deref(), fd_flags)
.await
.map(|a| (a.0, a.1, fd_flags))
}
));
let env = ctx.data();
let (memory, state, mut inodes) = env.get_memory_and_wasi_state_and_inodes_mut(&ctx, 0);
let kind = Kind::Socket {
socket: InodeSocket::new(InodeSocketKind::TcpStream(child)),
socket: InodeSocket::new(InodeSocketKind::TcpStream {
socket: child,
write_timeout: None,
read_timeout: None,
}),
};
let inode =
state
.fs
.create_inode_with_default_stat(inodes.deref_mut(), kind, false, "socket".into());
let mut new_flags = Fdflags::empty();
if fd_flags.contains(Fdflags::NONBLOCK) {
new_flags.set(Fdflags::NONBLOCK, true);
}
let rights = Rights::all_socket();
let fd = wasi_try_ok!(state
.fs
.create_fd(rights, rights, Fdflags::empty(), 0, inode));
let fd = wasi_try_ok!(state.fs.create_fd(rights, rights, new_flags, 0, inode));
debug!(
"wasi[{}:{}]::sock_accept (ret=ESUCCESS, peer={})",

View File

@@ -28,7 +28,7 @@ pub fn sock_addr_local<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |socket| async move { socket.addr_local().await }
|socket, _| socket.addr_local()
));
let memory = ctx.data().memory_view(&ctx);
wasi_try!(crate::net::write_ip_port(

View File

@@ -28,7 +28,7 @@ pub fn sock_addr_peer<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |socket| async move { socket.addr_peer().await }
|socket, _| socket.addr_peer()
));
let env = ctx.data();

View File

@@ -26,11 +26,14 @@ pub fn sock_bind<M: MemorySize>(
let addr = wasi_try!(crate::net::read_ip_port(&memory, addr));
let addr = SocketAddr::new(addr.0, addr.1);
let net = env.net();
let tasks = ctx.data().tasks.clone();
wasi_try!(__sock_upgrade(
&mut ctx,
sock,
Rights::SOCK_BIND,
move |socket| async move { socket.bind(net, addr).await }
move |socket| async move { socket.bind(tasks.deref(), net.deref(), addr).await }
));
Errno::Success
}

View File

@@ -31,11 +31,13 @@ pub fn sock_connect<M: MemorySize>(
let addr = wasi_try!(crate::net::read_ip_port(&memory, addr));
let addr = SocketAddr::new(addr.0, addr.1);
let tasks = ctx.data().tasks.clone();
wasi_try!(__sock_upgrade(
&mut ctx,
sock,
Rights::SOCK_CONNECT,
move |mut socket| async move { socket.connect(net, addr).await }
move |mut socket| async move { socket.connect(tasks.deref(), net.deref(), addr, None).await }
));
Errno::Success
}

View File

@@ -28,7 +28,7 @@ pub fn sock_get_opt_flag<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |socket| async move { socket.get_opt_flag(option).await }
|socket, _| socket.get_opt_flag(option)
));
let env = ctx.data();

View File

@@ -26,16 +26,14 @@ pub fn sock_get_opt_size<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |socket| async move {
match opt {
Sockoption::RecvBufSize => socket.recv_buf_size().await.map(|a| a as Filesize),
Sockoption::SendBufSize => socket.send_buf_size().await.map(|a| a as Filesize),
Sockoption::Ttl => socket.ttl().await.map(|a| a as Filesize),
Sockoption::MulticastTtlV4 => {
socket.multicast_ttl_v4().await.map(|a| a as Filesize)
}
_ => Err(Errno::Inval),
|socket, _| match opt {
Sockoption::RecvBufSize => socket.recv_buf_size().map(|a| a as Filesize),
Sockoption::SendBufSize => socket.send_buf_size().map(|a| a as Filesize),
Sockoption::Ttl => socket.ttl().map(|a| a as Filesize),
Sockoption::MulticastTtlV4 => {
socket.multicast_ttl_v4().map(|a| a as Filesize)
}
_ => Err(Errno::Inval),
}
));

View File

@@ -1,5 +1,5 @@
use super::*;
use crate::syscalls::*;
use crate::{net::socket::TimeType, syscalls::*};
/// ### `sock_get_opt_time()`
/// Retrieve one of the times on the socket
@@ -23,11 +23,11 @@ pub fn sock_get_opt_time<M: MemorySize>(
);
let ty = match opt {
Sockoption::RecvTimeout => wasmer_vnet::TimeType::ReadTimeout,
Sockoption::SendTimeout => wasmer_vnet::TimeType::WriteTimeout,
Sockoption::ConnectTimeout => wasmer_vnet::TimeType::ConnectTimeout,
Sockoption::AcceptTimeout => wasmer_vnet::TimeType::AcceptTimeout,
Sockoption::Linger => wasmer_vnet::TimeType::Linger,
Sockoption::RecvTimeout => TimeType::ReadTimeout,
Sockoption::SendTimeout => TimeType::WriteTimeout,
Sockoption::ConnectTimeout => TimeType::ConnectTimeout,
Sockoption::AcceptTimeout => TimeType::AcceptTimeout,
Sockoption::Linger => TimeType::Linger,
_ => return Errno::Inval,
};
@@ -35,7 +35,7 @@ pub fn sock_get_opt_time<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |socket| async move { socket.opt_time(ty).await }
|socket, _| socket.opt_time(ty)
));
let env = ctx.data();

View File

@@ -30,7 +30,7 @@ pub fn sock_join_multicast_v4<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |socket| async move { socket.join_multicast_v4(multiaddr, iface).await }
|socket, _| socket.join_multicast_v4(multiaddr, iface)
));
Errno::Success
}

View File

@@ -29,7 +29,7 @@ pub fn sock_join_multicast_v6<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |socket| async move { socket.join_multicast_v6(multiaddr, iface).await }
|socket, _| socket.join_multicast_v6(multiaddr, iface)
));
Errno::Success
}

View File

@@ -30,7 +30,7 @@ pub fn sock_leave_multicast_v4<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |socket| async move { socket.leave_multicast_v4(multiaddr, iface).await }
|socket, _| socket.leave_multicast_v4(multiaddr, iface)
));
Errno::Success
}

View File

@@ -29,7 +29,7 @@ pub fn sock_leave_multicast_v6<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |mut socket| async move { socket.leave_multicast_v6(multiaddr, iface).await }
|mut socket, _| socket.leave_multicast_v6(multiaddr, iface)
));
Errno::Success
}

View File

@@ -28,11 +28,14 @@ pub fn sock_listen<M: MemorySize>(
let env = ctx.data();
let net = env.net();
let backlog: usize = wasi_try!(backlog.try_into().map_err(|_| Errno::Inval));
let tasks = ctx.data().tasks.clone();
wasi_try!(__sock_upgrade(
&mut ctx,
sock,
Rights::SOCK_LISTEN,
move |socket| async move { socket.listen(net, backlog).await }
|socket| async move { socket.listen(tasks.deref(), net.deref(), backlog).await }
));
Errno::Success
}

View File

@@ -42,13 +42,12 @@ pub fn sock_open<M: MemorySize>(
only_v6: false,
reuse_port: false,
reuse_addr: false,
nonblocking: false,
send_buf_size: None,
recv_buf_size: None,
send_timeout: None,
recv_timeout: None,
connect_timeout: None,
write_timeout: None,
read_timeout: None,
accept_timeout: None,
connect_timeout: None,
}),
},
_ => return Errno::Notsup,

View File

@@ -1,3 +1,5 @@
use std::mem::MaybeUninit;
use super::*;
use crate::syscalls::*;
@@ -26,10 +28,10 @@ pub fn sock_recv<M: MemorySize>(
wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
let mut env = ctx.data();
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(ri_data.slice(&memory, ri_data_len));
let max_size = {
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(ri_data.slice(&memory, ri_data_len));
let mut max_size = 0usize;
for iovs in iovs_arr.iter() {
let iovs = wasi_try_mem_ok!(iovs.read());
@@ -39,23 +41,55 @@ pub fn sock_recv<M: MemorySize>(
max_size
};
let data = wasi_try_ok!(__sock_actor_mut(
&mut ctx,
sock,
Rights::SOCK_RECV,
move |socket| async move { socket.recv(max_size).await },
));
env = ctx.data();
let bytes_read = {
if max_size <= 10240 {
let mut buf: [MaybeUninit<u8>; 10240] = unsafe { MaybeUninit::uninit().assume_init() };
let writer = &mut buf[..max_size];
let amt = wasi_try_ok!(__sock_asyncify(
env,
sock,
Rights::SOCK_RECV,
|socket, fd| async move { socket.recv(env.tasks.deref(), writer, fd.flags).await },
));
let memory = env.memory_view(&ctx);
if amt > 0 {
let buf: &[MaybeUninit<u8>] = &buf[..amt];
let buf: &[u8] = unsafe { std::mem::transmute(buf) };
wasi_try_ok!(copy_from_slice(buf, &memory, iovs_arr).map(|_| amt))
} else {
0
}
} else {
let data = wasi_try_ok!(__sock_asyncify(
env,
sock,
Rights::SOCK_RECV,
|socket, fd| async move {
let mut buf = Vec::with_capacity(max_size);
unsafe {
buf.set_len(max_size);
}
socket
.recv(env.tasks.deref(), &mut buf, fd.flags)
.await
.map(|amt| {
unsafe {
buf.set_len(amt);
}
let buf: Vec<u8> = unsafe { std::mem::transmute(buf) };
buf
})
},
));
let data_len = data.len();
let bytes_read = if data_len > 0 {
let mut reader = &data[..];
let iovs_arr = wasi_try_mem_ok!(ri_data.slice(&memory, ri_data_len));
wasi_try_ok!(read_bytes(reader, &memory, iovs_arr).map(|_| data_len))
} else {
0
let data_len = data.len();
if data_len > 0 {
let mut reader = &data[..];
wasi_try_ok!(read_bytes(reader, &memory, iovs_arr).map(|_| data_len))
} else {
0
}
}
};
debug!(

View File

@@ -1,3 +1,5 @@
use std::mem::MaybeUninit;
use super::*;
use crate::syscalls::*;
@@ -34,10 +36,10 @@ pub fn sock_recv_from<M: MemorySize>(
wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
let mut env = ctx.data();
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(ri_data.slice(&memory, ri_data_len));
let max_size = {
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(ri_data.slice(&memory, ri_data_len));
let mut max_size = 0usize;
for iovs in iovs_arr.iter() {
let iovs = wasi_try_mem_ok!(iovs.read());
@@ -47,23 +49,60 @@ pub fn sock_recv_from<M: MemorySize>(
max_size
};
let (data, peer) = wasi_try_ok!(__sock_actor_mut(
&mut ctx,
sock,
Rights::SOCK_RECV_FROM,
move |socket| async move { socket.recv_from(max_size).await }
));
env = ctx.data();
let (bytes_read, peer) = {
if max_size <= 10240 {
let mut buf: [MaybeUninit<u8>; 10240] = unsafe { MaybeUninit::uninit().assume_init() };
let writer = &mut buf[..max_size];
let (amt, peer) = wasi_try_ok!(__sock_asyncify(
env,
sock,
Rights::SOCK_RECV,
|socket, fd| async move { socket.recv_from(env.tasks.deref(), writer, fd.flags).await },
));
if amt > 0 {
let buf: &[MaybeUninit<u8>] = &buf[..amt];
let buf: &[u8] = unsafe { std::mem::transmute(buf) };
wasi_try_ok!(copy_from_slice(buf, &memory, iovs_arr).map(|_| (amt, peer)))
} else {
(amt, peer)
}
} else {
let (data, peer) = wasi_try_ok!(__sock_asyncify(
env,
sock,
Rights::SOCK_RECV_FROM,
|socket, fd| async move {
let mut buf = Vec::with_capacity(max_size);
unsafe {
buf.set_len(max_size);
}
socket
.recv_from(env.tasks.deref(), &mut buf, fd.flags)
.await
.map(|(amt, addr)| {
unsafe {
buf.set_len(amt);
}
let buf: Vec<u8> = unsafe { std::mem::transmute(buf) };
(buf, addr)
})
}
));
let data_len = data.len();
if data_len > 0 {
let mut reader = &data[..];
wasi_try_ok!(read_bytes(reader, &memory, iovs_arr).map(|_| (data_len, peer)))
} else {
(0, peer)
}
}
};
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(ri_data.slice(&memory, ri_data_len));
wasi_try_ok!(write_ip_port(&memory, ro_addr, peer.ip(), peer.port()));
let data_len = data.len();
let mut reader = &data[..];
let bytes_read = wasi_try_ok!(read_bytes(reader, &memory, iovs_arr).map(|_| data_len));
let bytes_read: M::Offset = wasi_try_ok!(bytes_read.try_into().map_err(|_| Errno::Overflow));
wasi_try_mem_ok!(ro_flags.write(&memory, 0));
wasi_try_mem_ok!(ro_data_len.write(&memory, bytes_read));

View File

@@ -1,3 +1,5 @@
use std::mem::MaybeUninit;
use super::*;
use crate::syscalls::*;
@@ -22,14 +24,12 @@ pub fn sock_send<M: MemorySize>(
si_flags: SiFlags,
ret_data_len: WasmPtr<M::Offset, M>,
) -> Result<Errno, WasiError> {
wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
let mut env = ctx.data();
let env = ctx.data();
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(si_data.slice(&memory, si_data_len));
let runtime = env.runtime.clone();
let buf_len: M::Offset = {
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(si_data.slice(&memory, si_data_len));
iovs_arr
.iter()
.filter_map(|a| a.read().ok())
@@ -45,24 +45,38 @@ pub fn sock_send<M: MemorySize>(
si_flags
);
let buf_len: usize = wasi_try_ok!(buf_len.try_into().map_err(|_| Errno::Inval));
let mut buf = Vec::with_capacity(buf_len);
{
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(si_data.slice(&memory, si_data_len));
wasi_try_ok!(write_bytes(&mut buf, &memory, iovs_arr));
}
let bytes_written = wasi_try_ok!(__sock_actor_mut(
&mut ctx,
sock,
Rights::SOCK_SEND,
move |socket| async move { socket.send(buf).await },
));
env = ctx.data();
let bytes_written = {
if buf_len <= 10240 {
let mut buf: [MaybeUninit<u8>; 10240] = unsafe { MaybeUninit::uninit().assume_init() };
let writer = &mut buf[..buf_len];
let written = wasi_try_ok!(copy_to_slice(&memory, iovs_arr, writer));
let reader = &buf[..written];
let reader: &[u8] = unsafe { std::mem::transmute(reader) };
wasi_try_ok!(__sock_asyncify(
env,
sock,
Rights::SOCK_SEND,
|socket, fd| async move { socket.send(env.tasks.deref(), reader, fd.flags).await },
))
} else {
let mut buf = Vec::with_capacity(buf_len);
wasi_try_ok!(write_bytes(&mut buf, &memory, iovs_arr));
let reader = &buf;
wasi_try_ok!(__sock_asyncify(
env,
sock,
Rights::SOCK_SEND,
|socket, fd| async move { socket.send(env.tasks.deref(), reader, fd.flags).await },
))
}
};
let bytes_written: M::Offset =
wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
let memory = env.memory_view(&ctx);
wasi_try_mem_ok!(ret_data_len.write(&memory, bytes_written));
Ok(Errno::Success)

View File

@@ -53,6 +53,8 @@ pub fn sock_send_file<M: MemorySize>(
count -= sub_count;
let fd_entry = wasi_try_ok!(state.fs.get_fd(in_fd));
let fd_flags = fd_entry.flags;
let data = {
let inodes = env.state.inodes.clone();
match in_fd {
@@ -116,13 +118,22 @@ pub fn sock_send_file<M: MemorySize>(
drop(guard);
drop(inodes);
let data =
wasi_try_ok!(__asyncify(&mut ctx, None, async move {
socket
.recv(sub_count as usize)
.await
.map(|a| a.to_vec())
})?);
let data = wasi_try_ok!(__asyncify(&mut ctx, None, async {
let mut buf = Vec::with_capacity(sub_count as usize);
unsafe {
buf.set_len(sub_count as usize);
}
socket.recv(tasks.deref(), &mut buf, fd_flags).await.map(
|amt| {
unsafe {
buf.set_len(amt);
}
let buf: Vec<u8> =
unsafe { std::mem::transmute(buf) };
buf
},
)
})?);
env = ctx.data();
data
}
@@ -177,11 +188,12 @@ pub fn sock_send_file<M: MemorySize>(
};
// Write it down to the socket
let bytes_written = wasi_try_ok!(__sock_actor_mut(
let tasks = ctx.data().tasks.clone();
let bytes_written = wasi_try_ok!(__sock_asyncify_mut(
&mut ctx,
sock,
Rights::SOCK_SEND,
move |socket| async move { socket.send(data).await },
|socket, fd| async move { socket.send(tasks.deref(), &data, fd.flags).await },
));
env = ctx.data();

View File

@@ -30,14 +30,11 @@ pub fn sock_send_to<M: MemorySize>(
ctx.data().tid(),
sock
);
wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);
let mut env = ctx.data();
let env = ctx.data();
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(si_data.slice(&memory, si_data_len));
let buf_len: M::Offset = {
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(si_data.slice(&memory, si_data_len));
iovs_arr
.iter()
.filter_map(|a| a.read().ok())
@@ -45,30 +42,51 @@ pub fn sock_send_to<M: MemorySize>(
.sum()
};
let buf_len: usize = wasi_try_ok!(buf_len.try_into().map_err(|_| Errno::Inval));
let mut buf = Vec::with_capacity(buf_len);
{
let memory = env.memory_view(&ctx);
let iovs_arr = wasi_try_mem_ok!(si_data.slice(&memory, si_data_len));
wasi_try_ok!(write_bytes(&mut buf, &memory, iovs_arr));
}
let (addr_ip, addr_port) = {
let memory = env.memory_view(&ctx);
wasi_try_ok!(read_ip_port(&memory, addr))
};
let addr = SocketAddr::new(addr_ip, addr_port);
let bytes_written = wasi_try_ok!(__sock_actor_mut(
&mut ctx,
sock,
Rights::SOCK_SEND_TO,
move |socket| async move { socket.send_to::<M>(buf, addr).await },
));
env = ctx.data();
let bytes_written = {
if buf_len <= 10240 {
let mut buf: [MaybeUninit<u8>; 10240] = unsafe { MaybeUninit::uninit().assume_init() };
let writer = &mut buf[..buf_len];
let written = wasi_try_ok!(copy_to_slice(&memory, iovs_arr, writer));
let reader = &buf[..written];
let reader: &[u8] = unsafe { std::mem::transmute(reader) };
wasi_try_ok!(__sock_asyncify(
env,
sock,
Rights::SOCK_SEND,
|socket, fd| async move {
socket
.send_to::<M>(env.tasks.deref(), reader, addr, fd.flags)
.await
},
))
} else {
let mut buf = Vec::with_capacity(buf_len);
wasi_try_ok!(write_bytes(&mut buf, &memory, iovs_arr));
let reader = &buf;
wasi_try_ok!(__sock_asyncify(
env,
sock,
Rights::SOCK_SEND_TO,
|socket, fd| async move {
socket
.send_to::<M>(env.tasks.deref(), reader, addr, fd.flags)
.await
},
))
}
};
let bytes_written: M::Offset =
wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
let memory = env.memory_view(&ctx);
wasi_try_mem_ok!(ret_data_len.write(&memory, bytes_written as M::Offset));
Ok(Errno::Success)

View File

@@ -36,7 +36,7 @@ pub fn sock_set_opt_flag(
&mut ctx,
sock,
Rights::empty(),
move |mut socket| async move { socket.set_opt_flag(option, flag).await }
|mut socket, _| socket.set_opt_flag(option, flag)
));
Errno::Success
}

View File

@@ -1,5 +1,5 @@
use super::*;
use crate::syscalls::*;
use crate::{net::socket::TimeType, syscalls::*};
/// ### `sock_set_opt_size()
/// Set size of particular option for this socket
@@ -25,11 +25,11 @@ pub fn sock_set_opt_size(
);
let ty = match opt {
Sockoption::RecvTimeout => wasmer_vnet::TimeType::ReadTimeout,
Sockoption::SendTimeout => wasmer_vnet::TimeType::WriteTimeout,
Sockoption::ConnectTimeout => wasmer_vnet::TimeType::ConnectTimeout,
Sockoption::AcceptTimeout => wasmer_vnet::TimeType::AcceptTimeout,
Sockoption::Linger => wasmer_vnet::TimeType::Linger,
Sockoption::RecvTimeout => TimeType::ReadTimeout,
Sockoption::SendTimeout => TimeType::WriteTimeout,
Sockoption::ConnectTimeout => TimeType::ConnectTimeout,
Sockoption::AcceptTimeout => TimeType::AcceptTimeout,
Sockoption::Linger => TimeType::Linger,
_ => return Errno::Inval,
};
@@ -38,14 +38,12 @@ pub fn sock_set_opt_size(
&mut ctx,
sock,
Rights::empty(),
move |mut socket| async move {
match opt {
Sockoption::RecvBufSize => socket.set_recv_buf_size(size as usize).await,
Sockoption::SendBufSize => socket.set_send_buf_size(size as usize).await,
Sockoption::Ttl => socket.set_ttl(size as u32).await,
Sockoption::MulticastTtlV4 => socket.set_multicast_ttl_v4(size as u32).await,
_ => Err(Errno::Inval),
}
|mut socket, _| match opt {
Sockoption::RecvBufSize => socket.set_recv_buf_size(size as usize),
Sockoption::SendBufSize => socket.set_send_buf_size(size as usize),
Sockoption::Ttl => socket.set_ttl(size as u32),
Sockoption::MulticastTtlV4 => socket.set_multicast_ttl_v4(size as u32),
_ => Err(Errno::Inval),
}
));
Errno::Success

View File

@@ -1,5 +1,5 @@
use super::*;
use crate::syscalls::*;
use crate::{net::socket::TimeType, syscalls::*};
/// ### `sock_set_opt_time()`
/// Sets one of the times the socket
@@ -33,11 +33,11 @@ pub fn sock_set_opt_time<M: MemorySize>(
};
let ty = match opt {
Sockoption::RecvTimeout => wasmer_vnet::TimeType::ReadTimeout,
Sockoption::SendTimeout => wasmer_vnet::TimeType::WriteTimeout,
Sockoption::ConnectTimeout => wasmer_vnet::TimeType::ConnectTimeout,
Sockoption::AcceptTimeout => wasmer_vnet::TimeType::AcceptTimeout,
Sockoption::Linger => wasmer_vnet::TimeType::Linger,
Sockoption::RecvTimeout => TimeType::ReadTimeout,
Sockoption::SendTimeout => TimeType::WriteTimeout,
Sockoption::ConnectTimeout => TimeType::ConnectTimeout,
Sockoption::AcceptTimeout => TimeType::AcceptTimeout,
Sockoption::Linger => TimeType::Linger,
_ => return Errno::Inval,
};
@@ -46,7 +46,7 @@ pub fn sock_set_opt_time<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |socket| async move { socket.set_opt_time(ty, time).await }
|socket, _| socket.set_opt_time(ty, time)
));
Errno::Success
}

View File

@@ -28,7 +28,7 @@ pub fn sock_shutdown(mut ctx: FunctionEnvMut<'_, WasiEnv>, sock: WasiFd, how: Sd
&mut ctx,
sock,
Rights::SOCK_SHUTDOWN,
move |mut socket| async move { socket.shutdown(how).await }
|mut socket, _| socket.shutdown(how)
));
Errno::Success

View File

@@ -19,7 +19,7 @@ pub fn sock_status<M: MemorySize>(
&mut ctx,
sock,
Rights::empty(),
move |socket| async move { socket.status().await }
|socket, _| socket.status()
));
use crate::net::socket::WasiSocketStatus;

View File

@@ -10,6 +10,13 @@ use crate::syscalls::*;
pub fn thread_sleep(
mut ctx: FunctionEnvMut<'_, WasiEnv>,
duration: Timestamp,
) -> Result<Errno, WasiError> {
thread_sleep_internal(ctx, duration)
}
pub(crate) fn thread_sleep_internal(
mut ctx: FunctionEnvMut<'_, WasiEnv>,
duration: Timestamp,
) -> Result<Errno, WasiError> {
/*
trace!(