Fix WASI pipe to properly store read bytes in temp_buffer

This commit is contained in:
Felix Schütt
2022-10-17 13:51:10 +02:00
parent f6fa5b7cde
commit 05d74ea3fb
2 changed files with 80 additions and 39 deletions

View File

@@ -58,6 +58,7 @@ pub struct wasi_pipe_t {
data: Option<Box<Arc<Mutex<WasiPipeDataWithDestructor>>>>,
}
#[derive(Debug)]
struct WasiPipeDataWithDestructor {
data: Vec<c_char>,
// Buffer of already-read data that is being read into,
@@ -71,12 +72,17 @@ impl WasiPipeDataWithDestructor {
&mut self,
read_cb: WasiConsoleIoReadCallback,
max_read: Option<usize>,
) -> io::Result<Vec<u8>> {
) -> io::Result<usize> {
const BLOCK_SIZE: usize = 1024;
let mut final_buf = Vec::new();
let max_read = max_read.unwrap_or(usize::MAX);
let max_to_read = max_read.unwrap_or(usize::MAX);
let max_read = max_to_read.saturating_sub(self.temp_buffer.len());
if max_read == 0 {
// there are n bytes being available to read in the temp_buffer
return Ok(max_to_read);
}
let mut cur_read = 0;
// Read bytes until either EOF is reached or max_read bytes are reached
@@ -107,15 +113,28 @@ impl WasiPipeDataWithDestructor {
));
}
if result == 0 {
let result = result as usize;
if result == 0 || result > temp_buffer.len() {
break; // EOF
}
cur_read += temp_buffer.len();
final_buf.append(&mut temp_buffer);
cur_read += result;
final_buf.extend_from_slice(&temp_buffer[..result]);
}
Ok(final_buf)
let final_buf_len = final_buf.len();
// store the bytes in temp_buffer
self.temp_buffer.extend_from_slice(&final_buf);
// temp_buffer.len() can be smaller than max_read in case we
// encounter EOF earlier than expected
assert!(self.temp_buffer.len() <= max_read);
// return how many bytes were just read
//
// caller has to clear temp_buffer to advance actual reading
Ok(final_buf_len)
}
}
@@ -161,26 +180,11 @@ impl io::Read for wasi_pipe_t {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let self_read = self.read;
let mut data = self.get_data_mut("read")?;
// fill up buf by draining temp_buffer first, then read more bytes
let bytes_to_read = data.temp_buffer.len().min(buf.len());
let mut temp_buffer_drained: Vec<_> = data.temp_buffer.drain(..bytes_to_read).collect();
assert!(temp_buffer_drained.len() <= buf.len());
// If temp_buffer is exhausted, try reading the remaining bytes from the pipe
let mut bytes_read = bytes_to_read;
if buf.len() >= temp_buffer_drained.len() {
let secondary_bytes_to_read = data.temp_buffer.len().min(buf.len());
data.read_buffer(self_read, Some(secondary_bytes_to_read))?;
temp_buffer_drained
.append(&mut data.temp_buffer.drain(..secondary_bytes_to_read).collect());
bytes_read += secondary_bytes_to_read;
}
assert_eq!(buf.len(), temp_buffer_drained.len());
buf.clone_from_slice(&temp_buffer_drained);
Ok(bytes_read)
let _ = data.read_buffer(self_read, Some(buf.len()))?;
let bytes_to_read = buf.len().min(data.temp_buffer.len());
let bytes_read = data.temp_buffer.drain(..bytes_to_read).collect::<Vec<_>>();
buf[..bytes_read.len()].clone_from_slice(&bytes_read);
Ok(bytes_to_read)
}
}
@@ -278,7 +282,7 @@ impl VirtualFile for wasi_pipe_t {
fn bytes_available_read(&self) -> Result<Option<usize>, FsError> {
let self_read = self.read;
let mut data = self.get_data_mut("bytes_available_read")?;
data.read_buffer(self_read, None)?;
let _ = data.read_buffer(self_read, None)?;
Ok(Some(data.temp_buffer.len()))
}
fn bytes_available_write(&self) -> Result<Option<usize>, FsError> {
@@ -507,6 +511,36 @@ pub unsafe extern "C" fn wasi_pipe_flush(ptr: *mut wasi_pipe_t) -> i64 {
}
}
#[test]
fn test_wasi_pipe_with_destructor() {
let mut wasi_pipe_t_ptr = std::ptr::null_mut();
let second_wasi_pipe_t_ptr = unsafe { wasi_pipe_new(&mut wasi_pipe_t_ptr) };
let wasi_pipe_t_ptr = unsafe { &mut *wasi_pipe_t_ptr };
let second_wasi_pipe_t_ptr = unsafe { &mut *second_wasi_pipe_t_ptr };
let data = b"hello".into_iter().map(|v| *v as i8).collect::<Vec<_>>();
let result = unsafe { wasi_pipe_write_bytes(wasi_pipe_t_ptr, data.as_ptr(), data.len()) };
assert_eq!(result, 5);
let bytes_avail = wasi_pipe_t_ptr.bytes_available_read();
assert_eq!(bytes_avail, Ok(Some(0)));
let bytes_avail2 = second_wasi_pipe_t_ptr.bytes_available_read();
assert_eq!(bytes_avail2, Ok(Some(5)));
let mut read_str_ptr = std::ptr::null_mut();
let result = unsafe { wasi_pipe_read_str(second_wasi_pipe_t_ptr, &mut read_str_ptr) };
assert_eq!(result, 6); // hello\0
let buf_slice = unsafe { std::slice::from_raw_parts_mut(read_str_ptr, result as usize) };
assert_eq!(buf_slice[..5], data);
unsafe {
wasi_pipe_delete_str(read_str_ptr);
}
unsafe { wasi_pipe_delete(wasi_pipe_t_ptr) };
unsafe { wasi_pipe_delete(second_wasi_pipe_t_ptr) };
}
#[no_mangle]
pub unsafe extern "C" fn wasi_pipe_read_bytes(
ptr: *const wasi_pipe_t,
@@ -553,12 +587,13 @@ unsafe fn wasi_pipe_read_bytes_internal(ptr: *const wasi_pipe_t, buf: &mut Vec<u
}
}
let len = target.len() as i64;
*buf = target;
0
len
}
#[no_mangle]
pub unsafe extern "C" fn wasi_pipe_read_str(ptr: *const wasi_pipe_t, buf: *mut *mut c_char) -> i64 {
pub unsafe extern "C" fn wasi_pipe_read_str(ptr: *const wasi_pipe_t, buf: &mut *mut c_char) -> i64 {
use std::ffi::CString;
let mut target = Vec::new();