mirror of
https://github.com/mii443/wasmer.git
synced 2025-12-07 21:28:21 +00:00
chore: Fix a whole bunch of clippy lints
Mostly just cosmetic stuff. On non-trivial change: changed WasiPipe to have a wrapper subtype for the reader, which includes the channel and the buffer.
This commit is contained in:
@@ -40,7 +40,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a listener thats used to receive BUS commands
|
/// Creates a listener thats used to receive BUS commands
|
||||||
fn listen<'a>(&'a self) -> Result<&'a dyn VirtualBusListener> {
|
fn listen(&self) -> Result<&'_ dyn VirtualBusListener> {
|
||||||
Err(VirtualBusError::Unsupported)
|
Err(VirtualBusError::Unsupported)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -576,7 +576,7 @@ pub struct ExitedProcess {
|
|||||||
|
|
||||||
impl VirtualBusProcess for ExitedProcess {
|
impl VirtualBusProcess for ExitedProcess {
|
||||||
fn exit_code(&self) -> Option<ExitCode> {
|
fn exit_code(&self) -> Option<ExitCode> {
|
||||||
Some(self.exit_code.clone())
|
Some(self.exit_code)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
|
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
|
||||||
|
|||||||
@@ -105,8 +105,7 @@ impl crate::FileSystem for FileSystem {
|
|||||||
let _ = fs_extra::remove_items(&[from]);
|
let _ = fs_extra::remove_items(&[from]);
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
let e: Result<()> = fs::copy(from, to).map(|_| ()).map_err(Into::into);
|
fs::copy(from, to).map(|_| ()).map_err(FsError::from)?;
|
||||||
let _ = e?;
|
|
||||||
fs::remove_file(from).map(|_| ()).map_err(Into::into)
|
fs::remove_file(from).map(|_| ()).map_err(Into::into)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -393,7 +392,7 @@ impl VirtualFile for File {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn set_len(&mut self, new_size: u64) -> Result<()> {
|
fn set_len(&mut self, new_size: u64) -> Result<()> {
|
||||||
fs::File::set_len(&mut self.inner_std, new_size).map_err(Into::into)
|
fs::File::set_len(&self.inner_std, new_size).map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn unlink(&mut self) -> Result<()> {
|
fn unlink(&mut self) -> Result<()> {
|
||||||
|
|||||||
@@ -396,9 +396,9 @@ impl From<io::Error> for FsError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Into<io::Error> for FsError {
|
impl From<FsError> for io::Error {
|
||||||
fn into(self) -> io::Error {
|
fn from(val: FsError) -> Self {
|
||||||
let kind = match self {
|
let kind = match val {
|
||||||
FsError::AddressInUse => io::ErrorKind::AddrInUse,
|
FsError::AddressInUse => io::ErrorKind::AddrInUse,
|
||||||
FsError::AddressNotAvailable => io::ErrorKind::AddrNotAvailable,
|
FsError::AddressNotAvailable => io::ErrorKind::AddrNotAvailable,
|
||||||
FsError::AlreadyExists => io::ErrorKind::AlreadyExists,
|
FsError::AlreadyExists => io::ErrorKind::AlreadyExists,
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ pub(super) struct FileHandle {
|
|||||||
impl Clone for FileHandle {
|
impl Clone for FileHandle {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
inode: self.inode.clone(),
|
inode: self.inode,
|
||||||
filesystem: self.filesystem.clone(),
|
filesystem: self.filesystem.clone(),
|
||||||
readable: self.readable,
|
readable: self.readable,
|
||||||
writable: self.writable,
|
writable: self.writable,
|
||||||
@@ -93,7 +93,7 @@ impl FileHandle {
|
|||||||
.as_mut()
|
.as_mut()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.as_mut()
|
.as_mut()
|
||||||
.map_err(|err| err.clone())?
|
.map_err(|err| *err)?
|
||||||
.as_mut())
|
.as_mut())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -152,7 +152,7 @@ impl VirtualFile for FileHandle {
|
|||||||
Some(Node::ReadOnlyFile(node)) => node.file.len().try_into().unwrap_or(0),
|
Some(Node::ReadOnlyFile(node)) => node.file.len().try_into().unwrap_or(0),
|
||||||
Some(Node::CustomFile(node)) => {
|
Some(Node::CustomFile(node)) => {
|
||||||
let file = node.file.lock().unwrap();
|
let file = node.file.lock().unwrap();
|
||||||
file.size().try_into().unwrap_or(0)
|
file.size()
|
||||||
}
|
}
|
||||||
Some(Node::ArcFile(node)) => match self.arc_file.as_ref() {
|
Some(Node::ArcFile(node)) => match self.arc_file.as_ref() {
|
||||||
Some(file) => file.as_ref().map(|file| file.size()).unwrap_or(0),
|
Some(file) => file.as_ref().map(|file| file.size()).unwrap_or(0),
|
||||||
@@ -313,20 +313,16 @@ impl VirtualFile for FileHandle {
|
|||||||
let file = Pin::new(file);
|
let file = Pin::new(file);
|
||||||
file.poll_read_ready(cx)
|
file.poll_read_ready(cx)
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => Poll::Ready(Err(io::Error::new(
|
||||||
return Poll::Ready(Err(io::Error::new(
|
io::ErrorKind::NotFound,
|
||||||
io::ErrorKind::NotFound,
|
format!("inode `{}` doesn't match a file", self.inode),
|
||||||
format!("inode `{}` doesn't match a file", self.inode),
|
))),
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => Poll::Ready(Err(io::Error::new(
|
||||||
return Poll::Ready(Err(io::Error::new(
|
io::ErrorKind::NotFound,
|
||||||
io::ErrorKind::NotFound,
|
format!("inode `{}` doesn't match a file", self.inode),
|
||||||
format!("inode `{}` doesn't match a file", self.inode),
|
))),
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -362,20 +358,16 @@ impl VirtualFile for FileHandle {
|
|||||||
let file = Pin::new(file);
|
let file = Pin::new(file);
|
||||||
file.poll_read_ready(cx)
|
file.poll_read_ready(cx)
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => Poll::Ready(Err(io::Error::new(
|
||||||
return Poll::Ready(Err(io::Error::new(
|
io::ErrorKind::NotFound,
|
||||||
io::ErrorKind::NotFound,
|
format!("inode `{}` doesn't match a file", self.inode),
|
||||||
format!("inode `{}` doesn't match a file", self.inode),
|
))),
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => Poll::Ready(Err(io::Error::new(
|
||||||
return Poll::Ready(Err(io::Error::new(
|
io::ErrorKind::NotFound,
|
||||||
io::ErrorKind::NotFound,
|
format!("inode `{}` doesn't match a file", self.inode),
|
||||||
format!("inode `{}` doesn't match a file", self.inode),
|
))),
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -796,7 +788,7 @@ impl AsyncWrite for FileHandle {
|
|||||||
Poll::Pending => return Poll::Pending,
|
Poll::Pending => return Poll::Pending,
|
||||||
};
|
};
|
||||||
cursor += bytes_written as u64;
|
cursor += bytes_written as u64;
|
||||||
node.metadata.len = guard.size().try_into().unwrap();
|
node.metadata.len = guard.size();
|
||||||
bytes_written
|
bytes_written
|
||||||
}
|
}
|
||||||
Some(Node::ArcFile(_)) => {
|
Some(Node::ArcFile(_)) => {
|
||||||
|
|||||||
@@ -46,29 +46,42 @@ impl FileSystem {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let _ = crate::FileSystem::create_dir(self, next.as_path());
|
let _ = crate::FileSystem::create_dir(self, next.as_path());
|
||||||
if let Ok(dir) = other.read_dir(next.as_path()) {
|
|
||||||
for sub_dir in dir.into_iter() {
|
let dir = match other.read_dir(next.as_path()) {
|
||||||
if let Ok(sub_dir) = sub_dir {
|
Ok(dir) => dir,
|
||||||
match sub_dir.file_type() {
|
Err(_) => {
|
||||||
Ok(t) if t.is_dir() => {
|
// TODO: propagate errors (except NotFound)
|
||||||
remaining.push_back(sub_dir.path());
|
continue;
|
||||||
}
|
}
|
||||||
Ok(t) if t.is_file() => {
|
};
|
||||||
if sub_dir.file_name().to_string_lossy().starts_with(".wh.") {
|
|
||||||
let rm = next.to_string_lossy();
|
for sub_dir_res in dir {
|
||||||
let rm = &rm[".wh.".len()..];
|
let sub_dir = match sub_dir_res {
|
||||||
let rm = PathBuf::from(rm);
|
Ok(sub_dir) => sub_dir,
|
||||||
let _ = crate::FileSystem::remove_dir(self, rm.as_path());
|
Err(_) => {
|
||||||
let _ = crate::FileSystem::remove_file(self, rm.as_path());
|
// TODO: propagate errors (except NotFound)
|
||||||
continue;
|
continue;
|
||||||
}
|
|
||||||
let _ = self
|
|
||||||
.new_open_options_ext()
|
|
||||||
.insert_arc_file(sub_dir.path(), other.clone());
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match sub_dir.file_type() {
|
||||||
|
Ok(t) if t.is_dir() => {
|
||||||
|
remaining.push_back(sub_dir.path());
|
||||||
|
}
|
||||||
|
Ok(t) if t.is_file() => {
|
||||||
|
if sub_dir.file_name().to_string_lossy().starts_with(".wh.") {
|
||||||
|
let rm = next.to_string_lossy();
|
||||||
|
let rm = &rm[".wh.".len()..];
|
||||||
|
let rm = PathBuf::from(rm);
|
||||||
|
let _ = crate::FileSystem::remove_dir(self, rm.as_path());
|
||||||
|
let _ = crate::FileSystem::remove_file(self, rm.as_path());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let _ = self
|
||||||
|
.new_open_options_ext()
|
||||||
|
.insert_arc_file(sub_dir.path(), other.clone());
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,11 +20,17 @@ pub struct WasiPipe {
|
|||||||
tx: Arc<Mutex<mpsc::UnboundedSender<Vec<u8>>>>,
|
tx: Arc<Mutex<mpsc::UnboundedSender<Vec<u8>>>>,
|
||||||
/// Receives bytes from the pipe
|
/// Receives bytes from the pipe
|
||||||
/// Also, buffers the last read message from the pipe while its being consumed
|
/// Also, buffers the last read message from the pipe while its being consumed
|
||||||
rx: Arc<Mutex<(mpsc::UnboundedReceiver<Vec<u8>>, Option<Bytes>)>>,
|
rx: Arc<Mutex<PipeReceiver>>,
|
||||||
/// Whether the pipe should block or not block to wait for stdin reads
|
/// Whether the pipe should block or not block to wait for stdin reads
|
||||||
block: bool,
|
block: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct PipeReceiver {
|
||||||
|
chan: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||||
|
buffer: Option<Bytes>,
|
||||||
|
}
|
||||||
|
|
||||||
impl WasiPipe {
|
impl WasiPipe {
|
||||||
pub fn channel() -> (WasiPipe, WasiPipe) {
|
pub fn channel() -> (WasiPipe, WasiPipe) {
|
||||||
let pair = WasiBidirectionalPipePair::new();
|
let pair = WasiBidirectionalPipePair::new();
|
||||||
@@ -131,13 +137,19 @@ impl WasiBidirectionalPipePair {
|
|||||||
|
|
||||||
let pipe1 = WasiPipe {
|
let pipe1 = WasiPipe {
|
||||||
tx: Arc::new(Mutex::new(tx1)),
|
tx: Arc::new(Mutex::new(tx1)),
|
||||||
rx: Arc::new(Mutex::new((rx2, None))),
|
rx: Arc::new(Mutex::new(PipeReceiver {
|
||||||
|
chan: rx2,
|
||||||
|
buffer: None,
|
||||||
|
})),
|
||||||
block: true,
|
block: true,
|
||||||
};
|
};
|
||||||
|
|
||||||
let pipe2 = WasiPipe {
|
let pipe2 = WasiPipe {
|
||||||
tx: Arc::new(Mutex::new(tx2)),
|
tx: Arc::new(Mutex::new(tx2)),
|
||||||
rx: Arc::new(Mutex::new((rx1, None))),
|
rx: Arc::new(Mutex::new(PipeReceiver {
|
||||||
|
chan: rx1,
|
||||||
|
buffer: None,
|
||||||
|
})),
|
||||||
block: true,
|
block: true,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -180,7 +192,10 @@ impl WasiPipe {
|
|||||||
pub fn close(&self) {
|
pub fn close(&self) {
|
||||||
let (mut null_tx, _) = mpsc::unbounded_channel();
|
let (mut null_tx, _) = mpsc::unbounded_channel();
|
||||||
let (_, null_rx) = mpsc::unbounded_channel();
|
let (_, null_rx) = mpsc::unbounded_channel();
|
||||||
let mut null_rx = (null_rx, None);
|
let mut null_rx = PipeReceiver {
|
||||||
|
chan: null_rx,
|
||||||
|
buffer: None,
|
||||||
|
};
|
||||||
{
|
{
|
||||||
let mut guard = self.rx.lock().unwrap();
|
let mut guard = self.rx.lock().unwrap();
|
||||||
std::mem::swap(guard.deref_mut(), &mut null_rx);
|
std::mem::swap(guard.deref_mut(), &mut null_rx);
|
||||||
@@ -205,7 +220,7 @@ impl Read for WasiPipe {
|
|||||||
let mut rx = self.rx.lock().unwrap();
|
let mut rx = self.rx.lock().unwrap();
|
||||||
loop {
|
loop {
|
||||||
{
|
{
|
||||||
if let Some(read_buffer) = rx.1.as_mut() {
|
if let Some(read_buffer) = rx.buffer.as_mut() {
|
||||||
let buf_len = read_buffer.len();
|
let buf_len = read_buffer.len();
|
||||||
if buf_len > 0 {
|
if buf_len > 0 {
|
||||||
let mut read = buf_len.min(max_size);
|
let mut read = buf_len.min(max_size);
|
||||||
@@ -218,13 +233,13 @@ impl Read for WasiPipe {
|
|||||||
}
|
}
|
||||||
let data = {
|
let data = {
|
||||||
match self.block {
|
match self.block {
|
||||||
true => match rx.0.blocking_recv() {
|
true => match rx.chan.blocking_recv() {
|
||||||
Some(a) => a,
|
Some(a) => a,
|
||||||
None => {
|
None => {
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
false => match rx.0.try_recv() {
|
false => match rx.chan.try_recv() {
|
||||||
Ok(a) => a,
|
Ok(a) => a,
|
||||||
Err(TryRecvError::Empty) => {
|
Err(TryRecvError::Empty) => {
|
||||||
return Err(Into::<io::Error>::into(io::ErrorKind::WouldBlock));
|
return Err(Into::<io::Error>::into(io::ErrorKind::WouldBlock));
|
||||||
@@ -235,7 +250,7 @@ impl Read for WasiPipe {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
rx.1.replace(Bytes::from(data));
|
rx.buffer.replace(Bytes::from(data));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -293,7 +308,7 @@ impl AsyncRead for WasiPipe {
|
|||||||
let mut rx = self.rx.lock().unwrap();
|
let mut rx = self.rx.lock().unwrap();
|
||||||
loop {
|
loop {
|
||||||
{
|
{
|
||||||
if let Some(inner_buf) = rx.1.as_mut() {
|
if let Some(inner_buf) = rx.buffer.as_mut() {
|
||||||
let buf_len = inner_buf.len();
|
let buf_len = inner_buf.len();
|
||||||
if buf_len > 0 {
|
if buf_len > 0 {
|
||||||
let read = buf_len.min(buf.remaining());
|
let read = buf_len.min(buf.remaining());
|
||||||
@@ -304,7 +319,7 @@ impl AsyncRead for WasiPipe {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
let mut rx = Pin::new(rx.deref_mut());
|
let mut rx = Pin::new(rx.deref_mut());
|
||||||
let data = match rx.0.poll_recv(cx) {
|
let data = match rx.chan.poll_recv(cx) {
|
||||||
Poll::Ready(Some(a)) => a,
|
Poll::Ready(Some(a)) => a,
|
||||||
Poll::Ready(None) => return Poll::Ready(Ok(())),
|
Poll::Ready(None) => return Poll::Ready(Ok(())),
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
@@ -317,7 +332,7 @@ impl AsyncRead for WasiPipe {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
rx.1.replace(Bytes::from(data));
|
rx.buffer.replace(Bytes::from(data));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -359,7 +374,7 @@ impl VirtualFile for WasiPipe {
|
|||||||
fn is_open(&self) -> bool {
|
fn is_open(&self) -> bool {
|
||||||
self.tx
|
self.tx
|
||||||
.try_lock()
|
.try_lock()
|
||||||
.map(|a| a.is_closed() == false)
|
.map(|a| !a.is_closed())
|
||||||
.unwrap_or_else(|_| true)
|
.unwrap_or_else(|_| true)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -368,7 +383,7 @@ impl VirtualFile for WasiPipe {
|
|||||||
let mut rx = self.rx.lock().unwrap();
|
let mut rx = self.rx.lock().unwrap();
|
||||||
loop {
|
loop {
|
||||||
{
|
{
|
||||||
if let Some(inner_buf) = rx.1.as_mut() {
|
if let Some(inner_buf) = rx.buffer.as_mut() {
|
||||||
let buf_len = inner_buf.len();
|
let buf_len = inner_buf.len();
|
||||||
if buf_len > 0 {
|
if buf_len > 0 {
|
||||||
return Poll::Ready(Ok(buf_len));
|
return Poll::Ready(Ok(buf_len));
|
||||||
@@ -376,7 +391,7 @@ impl VirtualFile for WasiPipe {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut pinned_rx = Pin::new(&mut rx.0);
|
let mut pinned_rx = Pin::new(&mut rx.chan);
|
||||||
let data = match pinned_rx.poll_recv(cx) {
|
let data = match pinned_rx.poll_recv(cx) {
|
||||||
Poll::Ready(Some(a)) => a,
|
Poll::Ready(Some(a)) => a,
|
||||||
Poll::Ready(None) => return Poll::Ready(Ok(0)),
|
Poll::Ready(None) => return Poll::Ready(Ok(0)),
|
||||||
@@ -390,7 +405,7 @@ impl VirtualFile for WasiPipe {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
rx.1.replace(Bytes::from(data));
|
rx.buffer.replace(Bytes::from(data));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -154,7 +154,7 @@ impl UnionFileSystem {
|
|||||||
path3.push('/')
|
path3.push('/')
|
||||||
}
|
}
|
||||||
if path2.ends_with('/') {
|
if path2.ends_with('/') {
|
||||||
path2 = (&path2[..(path2.len() - 1)]).to_string();
|
path2 = (path2[..(path2.len() - 1)]).to_string();
|
||||||
}
|
}
|
||||||
|
|
||||||
self.mounts
|
self.mounts
|
||||||
@@ -275,7 +275,7 @@ impl FileSystem for UnionFileSystem {
|
|||||||
let to = to.to_string_lossy();
|
let to = to.to_string_lossy();
|
||||||
for (path, mount) in filter_mounts(&self.mounts, from.as_ref()) {
|
for (path, mount) in filter_mounts(&self.mounts, from.as_ref()) {
|
||||||
let mut to = if to.starts_with(mount.path.as_str()) {
|
let mut to = if to.starts_with(mount.path.as_str()) {
|
||||||
(&to[mount.path.len()..]).to_string()
|
(to[mount.path.len()..]).to_string()
|
||||||
} else {
|
} else {
|
||||||
ret_error = FsError::UnknownError;
|
ret_error = FsError::UnknownError;
|
||||||
continue;
|
continue;
|
||||||
|
|||||||
@@ -163,17 +163,22 @@ primitives! {
|
|||||||
f32 f64
|
f32 f64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::unused_unit)]
|
||||||
macro_rules! tuples {
|
macro_rules! tuples {
|
||||||
($(($($t:ident)*))*) => ($(
|
($(($($t:ident)*))*) => ($(
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
impl <$($t:Endian,)*> Endian for ($($t,)*) {
|
impl <$($t:Endian,)*> Endian for ($($t,)*) {
|
||||||
|
#[allow(clippy::unused_unit)]
|
||||||
fn into_le(self) -> Self {
|
fn into_le(self) -> Self {
|
||||||
let ($($t,)*) = self;
|
let ($($t,)*) = self;
|
||||||
|
// Needed for single element "tuples".
|
||||||
($($t.into_le(),)*)
|
($($t.into_le(),)*)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::unused_unit)]
|
||||||
fn from_le(self) -> Self {
|
fn from_le(self) -> Self {
|
||||||
let ($($t,)*) = self;
|
let ($($t,)*) = self;
|
||||||
|
// Needed for single element "tuples".
|
||||||
($($t.from_le(),)*)
|
($($t.from_le(),)*)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -261,16 +261,16 @@ impl VirtualTcpSocket for LocalTcpStream {
|
|||||||
fn set_opt_time(&mut self, ty: TimeType, timeout: Option<Duration>) -> Result<()> {
|
fn set_opt_time(&mut self, ty: TimeType, timeout: Option<Duration>) -> Result<()> {
|
||||||
match ty {
|
match ty {
|
||||||
TimeType::ReadTimeout => {
|
TimeType::ReadTimeout => {
|
||||||
self.read_timeout = timeout.clone();
|
self.read_timeout = timeout;
|
||||||
}
|
}
|
||||||
TimeType::WriteTimeout => {
|
TimeType::WriteTimeout => {
|
||||||
self.write_timeout = timeout.clone();
|
self.write_timeout = timeout;
|
||||||
}
|
}
|
||||||
TimeType::ConnectTimeout => {
|
TimeType::ConnectTimeout => {
|
||||||
self.connect_timeout = timeout;
|
self.connect_timeout = timeout;
|
||||||
}
|
}
|
||||||
TimeType::Linger => {
|
TimeType::Linger => {
|
||||||
self.linger_timeout = timeout.clone();
|
self.linger_timeout = timeout;
|
||||||
}
|
}
|
||||||
_ => return Err(NetworkError::InvalidInput),
|
_ => return Err(NetworkError::InvalidInput),
|
||||||
}
|
}
|
||||||
@@ -336,7 +336,7 @@ impl LocalTcpStream {
|
|||||||
) -> Result<SocketReceive> {
|
) -> Result<SocketReceive> {
|
||||||
if nonblocking {
|
if nonblocking {
|
||||||
let max_buf_size = 8192;
|
let max_buf_size = 8192;
|
||||||
let mut buf = Vec::with_capacity(max_buf_size);
|
let mut buf = vec![0u8; max_buf_size];
|
||||||
unsafe {
|
unsafe {
|
||||||
buf.set_len(max_buf_size);
|
buf.set_len(max_buf_size);
|
||||||
}
|
}
|
||||||
@@ -345,7 +345,7 @@ impl LocalTcpStream {
|
|||||||
let mut cx = Context::from_waker(&waker);
|
let mut cx = Context::from_waker(&waker);
|
||||||
let stream = Pin::new(stream);
|
let stream = Pin::new(stream);
|
||||||
let mut read_buf = tokio::io::ReadBuf::new(&mut buf);
|
let mut read_buf = tokio::io::ReadBuf::new(&mut buf);
|
||||||
return match stream.poll_read(&mut cx, &mut read_buf) {
|
match stream.poll_read(&mut cx, &mut read_buf) {
|
||||||
Poll::Ready(Ok(read)) => {
|
Poll::Ready(Ok(read)) => {
|
||||||
let read = read_buf.remaining();
|
let read = read_buf.remaining();
|
||||||
unsafe {
|
unsafe {
|
||||||
@@ -362,7 +362,7 @@ impl LocalTcpStream {
|
|||||||
}
|
}
|
||||||
Poll::Ready(Err(err)) => Err(io_err_into_net_error(err)),
|
Poll::Ready(Err(err)) => Err(io_err_into_net_error(err)),
|
||||||
Poll::Pending => Err(NetworkError::WouldBlock),
|
Poll::Pending => Err(NetworkError::WouldBlock),
|
||||||
};
|
}
|
||||||
} else {
|
} else {
|
||||||
Self::recv_now(stream, timeout).await
|
Self::recv_now(stream, timeout).await
|
||||||
}
|
}
|
||||||
@@ -428,7 +428,7 @@ impl VirtualConnectedSocket for LocalTcpStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
let timeout = self.write_timeout.clone();
|
let timeout = self.write_timeout;
|
||||||
let work = async move {
|
let work = async move {
|
||||||
match timeout {
|
match timeout {
|
||||||
Some(timeout) => tokio::time::timeout(timeout, self.stream.write_all(&data[..]))
|
Some(timeout) => tokio::time::timeout(timeout, self.stream.write_all(&data[..]))
|
||||||
@@ -461,7 +461,7 @@ impl VirtualConnectedSocket for LocalTcpStream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
let timeout = self.write_timeout.clone();
|
let timeout = self.write_timeout;
|
||||||
let work = async move {
|
let work = async move {
|
||||||
match timeout {
|
match timeout {
|
||||||
Some(timeout) => tokio::time::timeout(timeout, self.stream.flush())
|
Some(timeout) => tokio::time::timeout(timeout, self.stream.flush())
|
||||||
@@ -479,12 +479,7 @@ impl VirtualConnectedSocket for LocalTcpStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn recv(&mut self) -> Result<SocketReceive> {
|
async fn recv(&mut self) -> Result<SocketReceive> {
|
||||||
Self::recv_now_ext(
|
Self::recv_now_ext(self.nonblocking, &mut self.stream, self.read_timeout).await
|
||||||
self.nonblocking,
|
|
||||||
&mut self.stream,
|
|
||||||
self.read_timeout.clone(),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_recv(&mut self) -> Result<Option<SocketReceive>> {
|
fn try_recv(&mut self) -> Result<Option<SocketReceive>> {
|
||||||
|
|||||||
Reference in New Issue
Block a user