Small journal logic + lint fixes

This commit is contained in:
Arshia Ghafoori
2025-03-11 18:26:56 +04:00
parent 7446c20dba
commit be78dca2b1
12 changed files with 113 additions and 16 deletions

View File

@@ -1,3 +1,5 @@
use std::fmt::Display;
use super::*;
/// Various triggers that will cause the runtime to take snapshot
@@ -81,3 +83,28 @@ impl FromStr for SnapshotTrigger {
})
}
}
impl Display for SnapshotTrigger {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
Self::Bootstrap => "bootstrap",
Self::Explicit => "explicit",
Self::FirstEnviron => "first-environ",
Self::FirstListen => "first-listen",
Self::FirstSigint => "first-sigint",
Self::FirstStdin => "first-stdin",
Self::Idle => "idle",
Self::NonDeterministicCall => "non-deterministic-call",
Self::PeriodicInterval => "periodic-interval",
Self::Sigalrm => "sigalrm",
Self::Sigint => "sigint",
Self::Sigtstp => "sigtstp",
Self::Sigstop => "sigstop",
Self::Transaction => "transaction",
}
)
}
}

View File

@@ -607,6 +607,21 @@ impl WasiProcess {
self.wait_for_checkpoint_finish()
}
/// Takes a snapshot of the process and shuts it down after the snapshot
/// is taken.
///
/// Note: If you ignore the returned future the checkpoint will still
/// occur but it will execute asynchronously
pub fn snapshot_and_stop(
&self,
trigger: SnapshotTrigger,
) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
let mut guard = self.inner.0.lock().unwrap();
guard.stop_running_after_checkpoint = true;
guard.checkpoint = WasiProcessCheckpoint::Snapshot { trigger };
self.wait_for_checkpoint_finish()
}
/// Takes a snapshot of the process
///
/// Note: If you ignore the returned future the checkpoint will still

View File

@@ -14,7 +14,7 @@ use webc::metadata::annotations::Wasi as WasiAnnotation;
use crate::{
bin_factory::BinaryPackage,
capabilities::Capabilities,
journal::{DynJournal, DynReadableJournal, SnapshotTrigger},
journal::{self, DynJournal, DynReadableJournal, SnapshotTrigger},
WasiEnvBuilder,
};
@@ -97,6 +97,23 @@ impl CommonWasiOptions {
builder.add_imports(&self.additional_imports);
#[cfg(feature = "journal")]
{
for journal in &self.read_only_journals {
builder.add_read_only_journal(journal.clone());
}
for journal in &self.writable_journals {
builder.add_writable_journal(journal.clone());
}
for trigger in &self.snapshot_on {
builder.add_snapshot_trigger(trigger.clone());
}
if let Some(interval) = self.snapshot_interval {
builder.with_snapshot_interval(interval);
}
builder.with_stop_running_after_snapshot(self.stop_running_after_snapshot);
}
Ok(())
}

View File

@@ -1022,6 +1022,7 @@ impl WasiEnvBuilder {
extra_tracing: true,
#[cfg(feature = "journal")]
snapshot_on: self.snapshot_on,
#[cfg(feature = "journal")]
stop_running_after_snapshot: self.stop_running_after_snapshot,
skip_stdio_during_bootstrap: self.skip_stdio_during_bootstrap,
additional_imports: self.additional_imports,

View File

@@ -295,6 +295,7 @@ impl WasiEnvInit {
extra_tracing: false,
#[cfg(feature = "journal")]
snapshot_on: self.snapshot_on.clone(),
#[cfg(feature = "journal")]
stop_running_after_snapshot: self.stop_running_after_snapshot,
skip_stdio_during_bootstrap: self.skip_stdio_during_bootstrap,
additional_imports: self.additional_imports.clone(),

View File

@@ -11,15 +11,23 @@ impl<'a, 'c> JournalSyscallPlayer<'a, 'c> {
) -> Result<(), WasiRuntimeError> {
tracing::trace!(%fd, %offset, "Replay journal - FdWrite");
if self.stdout_fds.contains(&fd) {
self.stdout
.as_mut()
.map(|x| x.push((offset, data, is_64bit)));
if let Some(x) = self.stdout.as_mut() {
x.push(JournalStdIoWrite {
offset,
data,
is_64bit,
});
}
return Ok(());
}
if self.stderr_fds.contains(&fd) {
self.stderr
.as_mut()
.map(|x| x.push((offset, data, is_64bit)));
if let Some(x) = self.stdout.as_mut() {
x.push(JournalStdIoWrite {
offset,
data,
is_64bit,
});
}
return Ok(());
}

View File

@@ -22,7 +22,7 @@ mod update_memory;
use crate::journal::JournalEffector;
use crate::syscalls::anyhow_err_to_runtime_err;
use crate::syscalls::JournalSyscallPlayer;
use crate::syscalls::{JournalStdIoWrite, JournalSyscallPlayer};
use crate::RewindState;
use crate::WasiRuntimeError;
use crate::WasiThreadId;

View File

@@ -8,11 +8,15 @@ impl<'a, 'c> JournalSyscallPlayer<'a, 'c> {
tracing::trace!("Replay journal - ClearEthereal");
self.spawn_threads.clear();
self.stdout.as_mut().map(|x| x.clear());
if let Some(x) = self.stdout.as_mut() {
x.clear();
}
self.stdout_fds.clear();
self.stdout_fds.insert(1 as WasiFd);
self.stderr.as_mut().map(|x| x.clear());
if let Some(x) = self.stderr.as_mut() {
x.clear();
}
self.stderr_fds.clear();
self.stderr_fds.insert(2 as WasiFd);

View File

@@ -27,6 +27,12 @@ use std::{collections::BTreeMap, ops::Range};
use super::*;
pub struct JournalStdIoWrite<'a> {
pub offset: u64,
pub data: Cow<'a, [u8]>,
pub is_64bit: bool,
}
pub struct JournalSyscallPlayer<'a, 'c> {
pub ctx: FunctionEnvMut<'c, WasiEnv>,
pub bootstrapping: bool,
@@ -45,8 +51,8 @@ pub struct JournalSyscallPlayer<'a, 'c> {
pub differ_memory: Vec<(Range<u64>, Cow<'a, [u8]>)>,
// We capture the stdout and stderr while we replay
pub stdout: Option<Vec<(u64, Cow<'a, [u8]>, bool)>>,
pub stderr: Option<Vec<(u64, Cow<'a, [u8]>, bool)>>,
pub stdout: Option<Vec<JournalStdIoWrite<'a>>>,
pub stderr: Option<Vec<JournalStdIoWrite<'a>>>,
pub stdout_fds: HashSet<u32>,
pub stderr_fds: HashSet<u32>,
}

View File

@@ -36,7 +36,12 @@ pub unsafe fn restore_snapshot(
// Now output the stdout and stderr
if let Some(stdout) = runner.stdout {
tracing::trace!("replaying stdout");
for (offset, data, is_64bit) in stdout {
for JournalStdIoWrite {
offset,
data,
is_64bit,
} in stdout
{
if is_64bit {
JournalEffector::apply_fd_write::<Memory64>(&mut runner.ctx, 1, offset, data)
} else {
@@ -48,7 +53,12 @@ pub unsafe fn restore_snapshot(
if let Some(stderr) = runner.stderr {
tracing::trace!("replaying stderr");
for (offset, data, is_64bit) in stderr {
for JournalStdIoWrite {
offset,
data,
is_64bit,
} in stderr
{
if is_64bit {
JournalEffector::apply_fd_write::<Memory64>(&mut runner.ctx, 2, offset, data)
} else {

View File

@@ -42,7 +42,12 @@ pub(crate) fn fd_renumber_internal(
let (_, mut state) = unsafe { env.get_memory_and_wasi_state(&ctx, 0) };
if state.fs.get_fd(to).is_ok() {
wasi_try_ok!(__asyncify_light(env, None, state.fs.flush(to))?);
match __asyncify_light(env, None, state.fs.flush(to))? {
Ok(_) | Err(Errno::Isdir) | Err(Errno::Io) | Err(Errno::Access) => {}
Err(e) => {
return Ok(e);
}
}
wasi_try_ok!(state.fs.close_fd(to));
}

View File

@@ -6,6 +6,9 @@ use crate::syscalls::*;
pub fn proc_snapshot<M: MemorySize>(
mut ctx: FunctionEnvMut<'_, WasiEnv>,
) -> Result<Errno, WasiError> {
wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::Explicit)?);
// If we have an Explicit trigger, process that...
ctx = wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::Explicit)?);
// ... if not, we may still have an external request for a snapshot, so do that as well
ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
Ok(Errno::Success)
}