Merge pull request #5468 from wasmerio/fix/journal-fixes

Journal fixes to enable PHP+proc_snapshot
Arshia001
2025-03-11 22:45:51 +03:30
committed by GitHub
29 changed files with 738 additions and 190 deletions

View File

@ -161,6 +161,18 @@
}
]
},
"mode": {
"description": "The method to use to generate the instaboot snapshot for the instance.",
"default": null,
"anyOf": [
{
"$ref": "#/definitions/InstabootSnapshotModeV1"
},
{
"type": "null"
}
]
},
"requests": {
"description": "HTTP requests to perform during startup snapshot creation. Apps can perform all the appropriate warmup logic in these requests.\n\nNOTE: if no requests are configured, then a single HTTP request to '/' will be performed instead.",
"type": "array",
@ -195,6 +207,17 @@
"type": "null"
}
]
},
"runtime": {
"description": "Runtime settings.",
"anyOf": [
{
"$ref": "#/definitions/AppConfigCapabilityRuntimeV1"
},
{
"type": "null"
}
]
}
},
"additionalProperties": true
@ -212,6 +235,26 @@
}
}
},
"AppConfigCapabilityRuntimeV1": {
"description": "Runtime capability settings.",
"type": "object",
"properties": {
"async_threads": {
"description": "Whether to enable asynchronous threads/deep sleeping.",
"type": [
"boolean",
"null"
]
},
"engine": {
"description": "Engine to use for an instance, e.g. wasmer_cranelift, wasmer_llvm, etc.",
"type": [
"string",
"null"
]
}
}
},
"AppScalingConfigV1": {
"type": "object",
"properties": {
@ -543,6 +586,34 @@
}
}
},
"InstabootSnapshotModeV1": {
"description": "How will an instance be bootstrapped?",
"oneOf": [
{
"description": "Start the instance without any snapshot triggers. Once the requests are done, use [`snapshot_and_stop`](wasmer_wasix::WasiProcess::snapshot_and_stop) to capture a snapshot and shut the instance down.",
"type": "string",
"enum": [
"bootstrap"
]
},
{
"description": "Explicitly enable the given snapshot triggers before starting the instance. The instance's process will have its stop_running_after_checkpoint flag set, so the first snapshot will cause the instance to shut down.",
"type": "object",
"required": [
"triggers"
],
"properties": {
"triggers": {
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false
}
]
},
"Job": {
"description": "Job configuration.",
"type": "object",

View File

@ -428,19 +428,26 @@ impl Run {
for trigger in self.wasi.snapshot_on.iter().cloned() {
config.add_snapshot_trigger(trigger);
}
if self.wasi.snapshot_on.is_empty() && !self.wasi.journals.is_empty() {
if self.wasi.snapshot_on.is_empty() && !self.wasi.writable_journals.is_empty() {
config.add_default_snapshot_triggers();
}
if let Some(period) = self.wasi.snapshot_interval {
if self.wasi.journals.is_empty() {
if self.wasi.writable_journals.is_empty() {
return Err(anyhow::format_err!(
"If you specify a snapshot interval then you must also specify a journal file"
"If you specify a snapshot interval then you must also specify a writable journal file"
));
}
config.with_snapshot_interval(Duration::from_millis(period));
}
for journal in self.wasi.build_journals()? {
config.add_journal(journal);
if self.wasi.stop_after_snapshot {
config.with_stop_running_after_snapshot(true);
}
let (r, w) = self.wasi.build_journals()?;
for journal in r {
config.add_read_only_journal(journal);
}
for journal in w {
config.add_writable_journal(journal);
}
}
@ -537,20 +544,28 @@ impl Run {
for trigger in self.wasi.snapshot_on.iter().cloned() {
runner.with_snapshot_trigger(trigger);
}
if self.wasi.snapshot_on.is_empty() && !self.wasi.journals.is_empty() {
if self.wasi.snapshot_on.is_empty() && !self.wasi.writable_journals.is_empty() {
runner.with_default_snapshot_triggers();
}
if let Some(period) = self.wasi.snapshot_interval {
if self.wasi.journals.is_empty() {
if self.wasi.writable_journals.is_empty() {
return Err(anyhow::format_err!(
"If you specify a snapshot interval then you must also specify a journal file"
"If you specify a snapshot interval then you must also specify a writable journal file"
));
}
runner.with_snapshot_interval(Duration::from_millis(period));
}
for journal in self.wasi.build_journals()? {
runner.with_journal(journal);
if self.wasi.stop_after_snapshot {
runner.with_stop_running_after_snapshot(true);
}
let (r, w) = self.wasi.build_journals()?;
for journal in r {
runner.with_read_only_journal(journal);
}
for journal in w {
runner.with_writable_journal(journal);
}
runner.with_skip_stdio_during_bootstrap(self.wasi.skip_stdio_during_bootstrap);
}
Ok(runner)
@ -567,7 +582,7 @@ impl Run {
let program_name = wasm_path.display().to_string();
let runner = self.build_wasi_runner(&runtime)?;
runner.run_wasm(runtime, &program_name, module)
runner.run_wasm(runtime, &program_name, module, module_hash)
}
#[allow(unused_variables)]
@ -1047,6 +1062,25 @@ impl<R: wasmer_wasix::Runtime + Send + Sync> wasmer_wasix::Runtime for Monitorin
fn tty(&self) -> Option<&(dyn wasmer_wasix::os::TtyBridge + Send + Sync)> {
self.runtime.tty()
}
#[cfg(feature = "journal")]
fn read_only_journals<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Arc<wasmer_wasix::journal::DynReadableJournal>> + 'a> {
self.runtime.read_only_journals()
}
#[cfg(feature = "journal")]
fn writable_journals<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Arc<wasmer_wasix::journal::DynJournal>> + 'a> {
self.runtime.writable_journals()
}
#[cfg(feature = "journal")]
fn active_journal(&self) -> Option<&'_ wasmer_wasix::journal::DynJournal> {
self.runtime.active_journal()
}
}
#[derive(Debug)]

View File

@ -23,7 +23,7 @@ use wasmer_wasix::{
capabilities::Capabilities,
default_fs_backing, get_wasi_versions,
http::HttpClient,
journal::{CompactingLogFileJournal, DynJournal},
journal::{CompactingLogFileJournal, DynJournal, DynReadableJournal},
os::{tty_sys::SysTty, TtyBridge},
rewind_ext,
runners::MAPPED_CURRENT_DIR_DEFAULT_PATH,
@ -135,6 +135,15 @@ pub struct Wasi {
#[clap(long = "enable-cpu-backoff")]
pub enable_cpu_backoff: Option<u64>,
/// Specifies one or more journal files that Wasmer will use to restore
/// the state of the WASM process as it executes.
///
/// The state of the WASM process and its sandbox will be reapplied using
/// the journals in the order that you specify here.
#[cfg(feature = "journal")]
#[clap(long = "journal")]
pub read_only_journals: Vec<PathBuf>,
/// Specifies one or more journal files that Wasmer will use to restore
/// and save the state of the WASM process as it executes.
///
@ -145,8 +154,8 @@ pub struct Wasi {
/// and opened for read and write. New journal events will be written to this
/// file
#[cfg(feature = "journal")]
#[clap(long = "journal")]
pub journals: Vec<PathBuf>,
#[clap(long = "journal-writable")]
pub writable_journals: Vec<PathBuf>,
/// Flag that indicates if the journal will be automatically compacted
/// as it fills up and when the process exits
@ -189,6 +198,17 @@ pub struct Wasi {
#[clap(long = "snapshot-period")]
pub snapshot_interval: Option<u64>,
/// If specified, the runtime will stop executing the WASM module after the first snapshot
/// is taken.
#[cfg(feature = "journal")]
#[clap(long = "stop-after-snapshot")]
pub stop_after_snapshot: bool,
/// Skip writes to stdout and stderr when replaying journal events to bootstrap a module.
#[cfg(feature = "journal")]
#[clap(long = "skip-journal-stdio")]
pub skip_stdio_during_bootstrap: bool,
/// Allow instances to send http requests.
///
/// Access to domains is granted by default.
@ -397,18 +417,40 @@ impl Wasi {
if let Some(interval) = self.snapshot_interval {
builder.with_snapshot_interval(std::time::Duration::from_millis(interval));
}
for journal in self.build_journals()? {
builder.add_journal(journal);
if self.stop_after_snapshot {
builder.with_stop_running_after_snapshot(true);
}
let (r, w) = self.build_journals()?;
for journal in r {
builder.add_read_only_journal(journal);
}
for journal in w {
builder.add_writable_journal(journal);
}
builder.with_skip_stdio_during_bootstrap(self.skip_stdio_during_bootstrap);
}
Ok(builder)
}
#[cfg(feature = "journal")]
pub fn build_journals(&self) -> anyhow::Result<Vec<Arc<DynJournal>>> {
let mut ret = Vec::new();
for journal in self.journals.clone() {
#[allow(clippy::type_complexity)]
pub fn build_journals(
&self,
) -> anyhow::Result<(Vec<Arc<DynReadableJournal>>, Vec<Arc<DynJournal>>)> {
let mut readable = Vec::new();
for journal in self.read_only_journals.clone() {
if matches!(std::fs::metadata(&journal), Err(e) if e.kind() == std::io::ErrorKind::NotFound)
{
bail!("Read-only journal file does not exist: {journal:?}");
}
readable
.push(Arc::new(LogFileJournal::new_readonly(journal)?) as Arc<DynReadableJournal>);
}
let mut writable = Vec::new();
for journal in self.writable_journals.clone() {
if self.enable_compaction {
let mut journal = CompactingLogFileJournal::new(journal)?;
if !self.without_compact_on_drop {
@ -417,12 +459,12 @@ impl Wasi {
if self.with_compact_on_growth.is_normal() && self.with_compact_on_growth != 0f32 {
journal = journal.with_compact_on_factor_size(self.with_compact_on_growth);
}
ret.push(Arc::new(journal) as Arc<DynJournal>);
writable.push(Arc::new(journal) as Arc<DynJournal>);
} else {
ret.push(Arc::new(LogFileJournal::new(journal)?));
writable.push(Arc::new(LogFileJournal::new(journal)?));
}
}
Ok(ret)
Ok((readable, writable))
}
#[cfg(not(feature = "journal"))]
@ -605,8 +647,14 @@ impl Wasi {
}
#[cfg(feature = "journal")]
for journal in self.build_journals()? {
rt.add_journal(journal);
{
let (r, w) = self.build_journals()?;
for journal in r {
rt.add_read_only_journal(journal);
}
for journal in w {
rt.add_writable_journal(journal);
}
}
if !self.no_tty {

View File

@ -198,6 +198,10 @@ pub struct AppConfigCapabilityMapV1 {
#[serde(skip_serializing_if = "Option::is_none")]
pub memory: Option<AppConfigCapabilityMemoryV1>,
/// Runtime settings.
#[serde(skip_serializing_if = "Option::is_none")]
pub runtime: Option<AppConfigCapabilityRuntimeV1>,
/// Enables app bootstrapping with startup snapshots.
#[serde(skip_serializing_if = "Option::is_none")]
pub instaboot: Option<AppConfigCapabilityInstaBootV1>,
@ -227,6 +231,19 @@ pub struct AppConfigCapabilityMemoryV1 {
pub limit: Option<ByteSize>,
}
/// Runtime capability settings.
#[derive(
serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
)]
pub struct AppConfigCapabilityRuntimeV1 {
/// Engine to use for an instance, e.g. wasmer_cranelift, wasmer_llvm, etc.
#[serde(skip_serializing_if = "Option::is_none")]
pub engine: Option<String>,
/// Whether to enable asynchronous threads/deep sleeping.
#[serde(skip_serializing_if = "Option::is_none")]
pub async_threads: Option<bool>,
}
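For illustration, a hedged sketch of how a fragment matching this struct deserializes with serde_json (the values are examples, not defaults, and the type is assumed to be in scope):

// A minimal sketch, assuming `AppConfigCapabilityRuntimeV1` (above) and the
// `serde_json` crate are available; values are illustrative.
fn parse_runtime_capability() -> serde_json::Result<()> {
    let cfg: AppConfigCapabilityRuntimeV1 =
        serde_json::from_str(r#"{ "engine": "wasmer_llvm", "async_threads": true }"#)?;
    assert_eq!(cfg.engine.as_deref(), Some("wasmer_llvm"));
    assert_eq!(cfg.async_threads, Some(true));
    Ok(())
}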
/// Enables accelerated instance boot times with startup snapshots.
///
/// How it works:
@ -242,6 +259,10 @@ pub struct AppConfigCapabilityMemoryV1 {
serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
)]
pub struct AppConfigCapabilityInstaBootV1 {
/// The method to use to generate the instaboot snapshot for the instance.
#[serde(default)]
pub mode: Option<InstabootSnapshotModeV1>,
/// HTTP requests to perform during startup snapshot creation.
/// Apps can perform all the appropriate warmup logic in these requests.
///
@ -260,6 +281,33 @@ pub struct AppConfigCapabilityInstaBootV1 {
pub max_age: Option<PrettyDuration>,
}
/// How will an instance be bootstrapped?
#[derive(
serde::Serialize,
serde::Deserialize,
PartialEq,
Eq,
Hash,
Clone,
Debug,
schemars::JsonSchema,
Default,
)]
#[serde(rename_all = "snake_case")]
pub enum InstabootSnapshotModeV1 {
/// Start the instance without any snapshot triggers. Once the requests are done,
/// use [`snapshot_and_stop`](wasmer_wasix::WasiProcess::snapshot_and_stop) to
/// capture a snapshot and shut the instance down.
#[default]
Bootstrap,
/// Explicitly enable the given snapshot triggers before starting the instance.
/// The instance's process will have its stop_running_after_checkpoint flag set,
/// so the first snapshot will cause the instance to shut down.
// FIXME: make this strongly typed
Triggers(Vec<String>),
}
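With `rename_all = "snake_case"` and serde's default (externally tagged) enum representation, the two variants round-trip to the shapes described by the JSON schema earlier in this diff; a hedged sketch, with an illustrative trigger name:

// A minimal sketch, assuming `InstabootSnapshotModeV1` (above) and `serde_json`
// are in scope; "first-listen" is an illustrative trigger value.
fn parse_instaboot_mode() -> serde_json::Result<()> {
    // The bare string form selects the default bootstrap behaviour.
    let bootstrap: InstabootSnapshotModeV1 = serde_json::from_str(r#""bootstrap""#)?;
    assert_eq!(bootstrap, InstabootSnapshotModeV1::Bootstrap);

    // The object form enables explicit snapshot triggers.
    let triggers: InstabootSnapshotModeV1 =
        serde_json::from_str(r#"{ "triggers": ["first-listen"] }"#)?;
    assert_eq!(
        triggers,
        InstabootSnapshotModeV1::Triggers(vec!["first-listen".to_string()])
    );
    Ok(())
}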
/// App redirect configuration.
#[derive(
serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,

View File

@ -61,7 +61,7 @@ impl From<Range<u64>> for MemoryRange {
/// on the final deterministic outcome of the entire log.
///
/// By grouping events into subevents it makes it possible to ignore an
/// entire subgroup of events which are superseeded by a later event. For
/// entire subgroup of events which are superseded by a later event. For
/// example, all the events involved in creating a file are irrelevant if
/// that file is later deleted.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
@ -230,6 +230,15 @@ impl State {
filter.build_split(writer, reader)
}
fn insert_new_sub_events_empty(&mut self) -> SubGroupIndex {
let lookup = SubGroupIndex(self.descriptor_seed);
self.descriptor_seed += 1;
self.sub_events.entry(lookup).or_default();
lookup
}
fn insert_new_sub_events(&mut self, event_index: usize) -> SubGroupIndex {
let lookup = SubGroupIndex(self.descriptor_seed);
self.descriptor_seed += 1;
@ -299,6 +308,10 @@ impl State {
self.stdio_descriptors.clear();
self.suspect_descriptors.clear();
self.thread_map.clear();
for i in 0..=2 {
let lookup = self.insert_new_sub_events_empty();
self.stdio_descriptors.insert(i, lookup);
}
}
}
@ -335,7 +348,7 @@ impl CompactingJournal {
J: Journal,
{
let (tx, rx) = inner.split();
let state = State {
let mut state = State {
inner_tx: tx,
inner_rx: rx.as_restarted()?,
tty: None,
@ -364,6 +377,11 @@ impl CompactingJournal {
delta_list: None,
event_index: 0,
};
// stdio FDs are always created for a process initially, fill them out here
for i in 0..=2 {
let lookup = state.insert_new_sub_events_empty();
state.stdio_descriptors.insert(i, lookup);
}
Ok(Self {
tx: CompactingJournalTx {
state: Arc::new(Mutex::new(state)),
@ -584,12 +602,6 @@ impl WritableJournal for CompactingJournalTx {
state.keep_descriptors.insert(*fd, lookup);
}
// If its stdio then we need to create the descriptor if its not there already
if *fd <= 3 && !state.stdio_descriptors.contains_key(fd) {
let lookup = state.insert_new_sub_events(event_index);
state.stdio_descriptors.insert(*fd, lookup);
}
// Update the state
if let Some(state) = state
.find_sub_events(fd)
@ -608,19 +620,11 @@ impl WritableJournal for CompactingJournalTx {
}
}
}
// Seeks to a particular position within
// We keep non-mutable events for file descriptors that are suspect
JournalEntry::FileDescriptorSeekV1 { fd, .. }
| JournalEntry::FileDescriptorSetFdFlagsV1 { fd, .. }
| JournalEntry::FileDescriptorSetFlagsV1 { fd, .. } => {
// If its stdio then we need to create the descriptor if its not there already
if *fd <= 3 && !state.stdio_descriptors.contains_key(fd) {
let lookup = state.insert_new_sub_events(event_index);
state.stdio_descriptors.insert(*fd, lookup);
}
state.find_sub_events_and_append(fd, event_index);
}
// We keep non-mutable events for file descriptors that are suspect
JournalEntry::SocketBindV1 { fd, .. }
| JournalEntry::FileDescriptorSetFlagsV1 { fd, .. }
| JournalEntry::SocketBindV1 { fd, .. }
| JournalEntry::SocketSendFileV1 { socket_fd: fd, .. }
| JournalEntry::SocketSendToV1 { fd, .. }
| JournalEntry::SocketSendV1 { fd, .. }
@ -669,36 +673,50 @@ impl WritableJournal for CompactingJournalTx {
} => {
if let Some(lookup) = state.suspect_descriptors.get(original_fd).cloned() {
state.suspect_descriptors.insert(*copied_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.keep_descriptors.get(original_fd).cloned() {
state.keep_descriptors.insert(*copied_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.stdio_descriptors.get(original_fd).cloned() {
state.stdio_descriptors.insert(*copied_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.open_pipes.get(original_fd).cloned() {
state.open_pipes.insert(*copied_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.open_sockets.get(original_fd).cloned() {
state.open_sockets.insert(*copied_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.accepted_sockets.get(original_fd).cloned() {
state.accepted_sockets.insert(*copied_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.event_descriptors.get(original_fd).cloned() {
state.event_descriptors.insert(*copied_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
}
}
// Renumbered file descriptors will retain their suspect status
JournalEntry::RenumberFileDescriptorV1 { old_fd, new_fd } => {
if let Some(lookup) = state.suspect_descriptors.remove(old_fd) {
state.suspect_descriptors.insert(*new_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.keep_descriptors.remove(old_fd) {
state.keep_descriptors.insert(*new_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.stdio_descriptors.remove(old_fd) {
state.stdio_descriptors.insert(*new_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.open_pipes.remove(old_fd) {
state.open_pipes.insert(*new_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.open_sockets.remove(old_fd) {
state.open_sockets.insert(*new_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.accepted_sockets.remove(old_fd) {
state.accepted_sockets.insert(*new_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
} else if let Some(lookup) = state.event_descriptors.remove(old_fd) {
state.event_descriptors.insert(*new_fd, lookup);
state.append_to_sub_events(&lookup, event_index);
}
}
// Creating a new directory only needs to be done once

View File

@ -100,7 +100,7 @@ impl<'a> fmt::Display for JournalEntry<'a> {
..
} => write!(
f,
"thread-update (id={}, call-stack.len={}, mem-stack.len={}, store-size={}",
"thread-update (id={}, call-stack.len={}, mem-stack.len={}, store-size={})",
id,
call_stack.len(),
memory_stack.len(),
@ -287,7 +287,7 @@ impl<'a> fmt::Display for JournalEntry<'a> {
write!(f, "sock-send-to (fd={}, data.len={}, addr={})", fd, data.len(), addr)
}
JournalEntry::SocketSendV1 { fd, data, .. } => {
write!(f, "sock-send (fd={}, data.len={}", fd, data.len())
write!(f, "sock-send (fd={}, data.len={})", fd, data.len())
}
JournalEntry::SocketSetOptFlagV1 { fd, opt, flag } => {
write!(f, "sock-set-opt (fd={fd}, opt={opt:?}, flag={flag})")

View File

@ -95,7 +95,9 @@ pub trait ReadableJournal: std::fmt::Debug {
/// a WASM process at a point in time and saves it so that it can be restored.
/// It also allows for the restoration of that state at a later moment
#[allow(unused_variables)]
pub trait Journal: WritableJournal + ReadableJournal + std::fmt::Debug {
pub trait Journal:
WritableJournal + ReadableJournal + AsDynReadableJournal + std::fmt::Debug
{
/// Splits the journal into a read and write side
fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>);
}
@ -103,3 +105,14 @@ pub trait Journal: WritableJournal + ReadableJournal + std::fmt::Debug {
pub type DynJournal = dyn Journal + Send + Sync;
pub type DynWritableJournal = dyn WritableJournal + Send + Sync;
pub type DynReadableJournal = dyn ReadableJournal + Send + Sync;
/// A bit of manual up-casting support
pub trait AsDynReadableJournal {
fn as_dyn_readable_journal(&self) -> &DynReadableJournal;
}
impl<T: Journal + Send + Sync + 'static> AsDynReadableJournal for T {
fn as_dyn_readable_journal(&self) -> &DynReadableJournal {
self
}
}
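The blanket impl above lets a full `Journal` be handed to code that only needs the read half. A minimal sketch of the up-cast, assuming `ReadableJournal::read()` returns `anyhow::Result<Option<_>>` as it is used in `restore_snapshot` later in this diff, and that the module paths below are correct:

use wasmer_wasix::journal::{AsDynReadableJournal, DynJournal, DynReadableJournal};

// Counts the entries of any readable journal.
fn count_entries(journal: &DynReadableJournal) -> anyhow::Result<u64> {
    let mut n = 0;
    while journal.read()?.is_some() {
        n += 1;
    }
    Ok(n)
}

// Trait objects cannot be coerced between unrelated traits, so a writable
// journal is up-cast to its read-only view through the helper trait.
fn count_entries_of_writable(journal: &DynJournal) -> anyhow::Result<u64> {
    count_entries(journal.as_dyn_readable_journal())
}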

View File

@ -1,3 +1,5 @@
use std::fmt::Display;
use super::*;
/// Various triggers that will cause the runtime to take a snapshot
@ -81,3 +83,28 @@ impl FromStr for SnapshotTrigger {
})
}
}
impl Display for SnapshotTrigger {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
Self::Bootstrap => "bootstrap",
Self::Explicit => "explicit",
Self::FirstEnviron => "first-environ",
Self::FirstListen => "first-listen",
Self::FirstSigint => "first-sigint",
Self::FirstStdin => "first-stdin",
Self::Idle => "idle",
Self::NonDeterministicCall => "non-deterministic-call",
Self::PeriodicInterval => "periodic-interval",
Self::Sigalrm => "sigalrm",
Self::Sigint => "sigint",
Self::Sigtstp => "sigtstp",
Self::Sigstop => "sigstop",
Self::Transaction => "transaction",
}
)
}
}

View File

@ -83,6 +83,7 @@ const STDIN_DEFAULT_RIGHTS: Rights = {
| Rights::FD_SYNC.bits()
| Rights::FD_ADVISE.bits()
| Rights::FD_FILESTAT_GET.bits()
| Rights::FD_FDSTAT_SET_FLAGS.bits()
| Rights::POLL_FD_READWRITE.bits(),
)
};
@ -95,6 +96,7 @@ const STDOUT_DEFAULT_RIGHTS: Rights = {
| Rights::FD_WRITE.bits()
| Rights::FD_ADVISE.bits()
| Rights::FD_FILESTAT_GET.bits()
| Rights::FD_FDSTAT_SET_FLAGS.bits()
| Rights::POLL_FD_READWRITE.bits(),
)
};

View File

@ -33,6 +33,7 @@ impl JournalEffector {
)
})?;
if ret_fd != copied_fd {
let ret = crate::syscalls::fd_renumber_internal(ctx, ret_fd, copied_fd);
if !matches!(ret, Ok(Errno::Success)) {
bail!(
@ -42,23 +43,6 @@ impl JournalEffector {
ret.unwrap_or(Errno::Unknown)
);
}
let ret = crate::syscalls::fd_fdflags_set_internal(
ctx,
copied_fd,
if cloexec {
Fdflagsext::CLOEXEC
} else {
Fdflagsext::empty()
},
);
if !matches!(ret, Ok(Errno::Success)) {
bail!(
"journal restore error: failed renumber file descriptor after duplicate (from={}, to={}) - {}",
ret_fd,
copied_fd,
ret.unwrap_or(Errno::Unknown)
);
}
Ok(())

View File

@ -12,6 +12,7 @@ use std::{
sync::{atomic::AtomicBool, Arc, Mutex},
};
use futures::future::Either;
use linked_hash_set::LinkedHashSet;
use tokio::sync::{mpsc, RwLock};
#[allow(unused_imports, dead_code)]
@ -229,7 +230,7 @@ impl Console {
.prepare_webc_env(
prog,
&wasi_opts,
Some(&pkg),
Either::Left(&pkg),
self.runtime.clone(),
Some(root_fs),
)

View File

@ -171,6 +171,9 @@ pub struct WasiProcessInner {
/// If true then the journaling will be disabled after the
/// next snapshot is taken
pub disable_journaling_after_checkpoint: bool,
/// If true then the process will stop running after the
/// next snapshot is taken
pub stop_running_after_checkpoint: bool,
/// List of situations that the process will checkpoint on
#[cfg(feature = "journal")]
pub snapshot_on: HashSet<SnapshotTrigger>,
@ -309,6 +312,14 @@ impl WasiProcessInner {
ctx.data().thread.set_checkpointing(false);
trace!("checkpoint finished");
if guard.stop_running_after_checkpoint {
trace!("will stop running now");
// Need to stop recording journal events so we don't also record the
// thread and process exit events
ctx.data_mut().enable_journal = false;
return OnCalledAction::Finish;
}
// Rewind the stack and carry on
return match rewind_ext::<M>(
&mut ctx,
@ -427,6 +438,7 @@ impl WasiProcess {
#[cfg(feature = "journal")]
snapshot_memory_hash: Default::default(),
disable_journaling_after_checkpoint: false,
stop_running_after_checkpoint: false,
backoff: WasiProcessCpuBackoff::new(max_cpu_backoff_time, max_cpu_cool_off_time),
}),
Condvar::new(),
@ -595,6 +607,21 @@ impl WasiProcess {
self.wait_for_checkpoint_finish()
}
/// Takes a snapshot of the process and shuts it down after the snapshot
/// is taken.
///
/// Note: If you ignore the returned future the checkpoint will still
/// occur but it will execute asynchronously
pub fn snapshot_and_stop(
&self,
trigger: SnapshotTrigger,
) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + Sync>> {
let mut guard = self.inner.0.lock().unwrap();
guard.stop_running_after_checkpoint = true;
guard.checkpoint = WasiProcessCheckpoint::Snapshot { trigger };
self.wait_for_checkpoint_finish()
}
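A hedged usage sketch of the new method (the async caller and the `process` handle are assumed, and the `journal` feature must be enabled); this is the call the instaboot `bootstrap` mode description above refers to:

use wasmer_wasix::{journal::SnapshotTrigger, WasiProcess};

// Ask the process to snapshot itself and shut down, then wait for the
// checkpoint to complete before continuing.
async fn snapshot_then_shutdown(process: &WasiProcess) {
    process.snapshot_and_stop(SnapshotTrigger::Explicit).await;
}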
/// Takes a snapshot of the process
///
/// Note: If you ignore the returned future the checkpoint will still
@ -614,6 +641,12 @@ impl WasiProcess {
guard.disable_journaling_after_checkpoint = true;
}
/// Stop running once a checkpoint is taken
pub fn stop_running_after_checkpoint(&self) {
let mut guard = self.inner.0.lock().unwrap();
guard.stop_running_after_checkpoint = true;
}
/// Wait for the checkpoint process to finish
#[cfg(not(feature = "journal"))]
pub fn wait_for_checkpoint(

View File

@ -74,9 +74,7 @@ impl crate::runners::Runner for DcgiRunner {
// file system changes as it is unable to run the main function more than
// once due to limitations in the runtime
let journals = runtime
.journals()
.clone()
.into_iter()
.writable_journals()
.map(|journal| {
let journal = FilteredJournalBuilder::new()
.with_ignore_memory(true)
@ -89,7 +87,7 @@ impl crate::runners::Runner for DcgiRunner {
Arc::new(journal) as Arc<DynJournal>
})
.collect::<Vec<_>>();
let runtime = OverriddenRuntime::new(runtime).with_journals(journals);
let runtime = OverriddenRuntime::new(runtime).with_writable_journals(journals);
let runtime = Arc::new(runtime) as Arc<DynRuntime>;
//We now pass the runtime to the handlers
@ -191,26 +189,45 @@ impl Config {
self.inner.capabilities()
}
#[cfg(feature = "journal")]
pub fn add_snapshot_trigger(&mut self, on: crate::journal::SnapshotTrigger) {
self.inner.add_snapshot_trigger(on);
}
#[cfg(feature = "journal")]
pub fn add_default_snapshot_triggers(&mut self) -> &mut Self {
self.inner.add_default_snapshot_triggers();
self
}
#[cfg(feature = "journal")]
pub fn has_snapshot_trigger(&self, on: crate::journal::SnapshotTrigger) -> bool {
self.inner.has_snapshot_trigger(on)
}
#[cfg(feature = "journal")]
pub fn with_snapshot_interval(&mut self, period: std::time::Duration) -> &mut Self {
self.inner.with_snapshot_interval(period);
self
}
pub fn add_journal(&mut self, journal: Arc<crate::journal::DynJournal>) -> &mut Self {
self.inner.add_journal(journal);
#[cfg(feature = "journal")]
pub fn with_stop_running_after_snapshot(&mut self, stop_running: bool) {
self.inner.with_stop_running_after_snapshot(stop_running);
}
#[cfg(feature = "journal")]
pub fn add_read_only_journal(
&mut self,
journal: Arc<crate::journal::DynReadableJournal>,
) -> &mut Self {
self.inner.add_read_only_journal(journal);
self
}
#[cfg(feature = "journal")]
pub fn add_writable_journal(&mut self, journal: Arc<crate::journal::DynJournal>) -> &mut Self {
self.inner.add_writable_journal(journal);
self
}
}

View File

@ -58,17 +58,14 @@ impl DProxyInstanceFactory {
// DProxy is able to resume execution of the stateful workload using memory
// snapshots hence the journals it stores are complete journals
let journals = runtime
.journals()
.clone()
.into_iter()
.writable_journals()
.map(|journal| {
let tx = journal.clone();
let rx = journal.as_restarted()?;
let combined = RecombinedJournal::new(tx, rx);
let combined = RecombinedJournal::new(journal, rx);
anyhow::Result::Ok(Arc::new(combined) as Arc<DynJournal>)
})
.collect::<anyhow::Result<Vec<_>>>()?;
let mut runtime = OverriddenRuntime::new(runtime).with_journals(journals);
let mut runtime = OverriddenRuntime::new(runtime).with_writable_journals(journals);
// We attach a composite networking to the runtime which includes a loopback
// networking implementation connected to a socket manager

View File

@ -3,15 +3,17 @@
use std::{path::PathBuf, sync::Arc};
use anyhow::{Context, Error};
use futures::future::Either;
use tracing::Instrument;
use virtual_fs::{ArcBoxFile, FileSystem, TmpFileSystem, VirtualFile};
use wasmer::{Extern, Module};
use wasmer_types::ModuleHash;
use webc::metadata::{annotations::Wasi, Command};
use crate::{
bin_factory::BinaryPackage,
capabilities::Capabilities,
journal::{DynJournal, SnapshotTrigger},
journal::{DynJournal, DynReadableJournal, SnapshotTrigger},
runners::{wasi_common::CommonWasiOptions, MappedDirectory, MountedDirectory},
runtime::task_manager::VirtualTaskManagerExt,
Runtime, WasiEnvBuilder, WasiError, WasiRuntimeError,
@ -168,11 +170,13 @@ impl WasiRunner {
self
}
#[cfg(feature = "journal")]
pub fn with_snapshot_trigger(&mut self, on: SnapshotTrigger) -> &mut Self {
self.wasi.snapshot_on.push(on);
self
}
#[cfg(feature = "journal")]
pub fn with_default_snapshot_triggers(&mut self) -> &mut Self {
for on in crate::journal::DEFAULT_SNAPSHOT_TRIGGERS {
if !self.has_snapshot_trigger(on) {
@ -182,10 +186,12 @@ impl WasiRunner {
self
}
#[cfg(feature = "journal")]
pub fn has_snapshot_trigger(&self, on: SnapshotTrigger) -> bool {
self.wasi.snapshot_on.iter().any(|t| *t == on)
}
#[cfg(feature = "journal")]
pub fn with_snapshot_interval(&mut self, period: std::time::Duration) -> &mut Self {
if !self.has_snapshot_trigger(SnapshotTrigger::PeriodicInterval) {
self.with_snapshot_trigger(SnapshotTrigger::PeriodicInterval);
@ -194,8 +200,26 @@ impl WasiRunner {
self
}
pub fn with_journal(&mut self, journal: Arc<DynJournal>) -> &mut Self {
self.wasi.journals.push(journal);
#[cfg(feature = "journal")]
pub fn with_stop_running_after_snapshot(&mut self, stop_running: bool) -> &mut Self {
self.wasi.stop_running_after_snapshot = stop_running;
self
}
#[cfg(feature = "journal")]
pub fn with_read_only_journal(&mut self, journal: Arc<DynReadableJournal>) -> &mut Self {
self.wasi.read_only_journals.push(journal);
self
}
#[cfg(feature = "journal")]
pub fn with_writable_journal(&mut self, journal: Arc<DynJournal>) -> &mut Self {
self.wasi.writable_journals.push(journal);
self
}
pub fn with_skip_stdio_during_bootstrap(&mut self, skip: bool) -> &mut Self {
self.wasi.skip_stdio_during_bootstrap = skip;
self
}
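Putting the new knobs together, a hedged sketch of configuring a runner with the read-only/writable journal split (the constructor, import paths, and variable names are assumptions; requires the `journal` feature):

use std::sync::Arc;
use wasmer_wasix::journal::{DynJournal, DynReadableJournal};
use wasmer_wasix::runners::wasi::WasiRunner;

// Assumes a `WasiRunner::new()` constructor; the journal handles would be
// built elsewhere, e.g. with `LogFileJournal` as in the CLI code above.
fn configure_runner(
    ro_journal: Arc<DynReadableJournal>,
    rw_journal: Arc<DynJournal>,
) -> WasiRunner {
    let mut runner = WasiRunner::new();
    runner
        .with_read_only_journal(ro_journal)
        .with_writable_journal(rw_journal)
        .with_stop_running_after_snapshot(true)
        .with_skip_stdio_during_bootstrap(true);
    runner
}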
@ -246,19 +270,23 @@ impl WasiRunner {
&self,
program_name: &str,
wasi: &Wasi,
pkg: Option<&BinaryPackage>,
pkg_or_module_hash: Either<&BinaryPackage, ModuleHash>,
runtime: Arc<dyn Runtime + Send + Sync>,
root_fs: Option<TmpFileSystem>,
) -> Result<WasiEnvBuilder, anyhow::Error> {
let mut builder = WasiEnvBuilder::new(program_name).runtime(runtime);
let container_fs = if let Some(pkg) = pkg {
let container_fs = match pkg_or_module_hash {
Either::Left(pkg) => {
builder.add_webc(pkg.clone());
builder.set_module_hash(pkg.hash());
builder.include_packages(pkg.package_ids.clone());
Some(Arc::clone(&pkg.webc_fs))
} else {
}
Either::Right(hash) => {
builder.set_module_hash(hash);
None
}
};
if self.wasi.is_home_mapped {
@ -294,11 +322,17 @@ impl WasiRunner {
runtime: Arc<dyn Runtime + Send + Sync>,
program_name: &str,
module: Module,
module_hash: ModuleHash,
) -> Result<(), Error> {
let wasi = webc::metadata::annotations::Wasi::new(program_name);
let mut builder =
self.prepare_webc_env(program_name, &wasi, None, runtime.clone(), None)?;
let mut builder = self.prepare_webc_env(
program_name,
&wasi,
Either::Right(module_hash),
runtime.clone(),
None,
)?;
#[cfg(feature = "ctrlc")]
{
@ -307,28 +341,34 @@ impl WasiRunner {
#[cfg(feature = "journal")]
{
for journal in self.wasi.journals.clone() {
builder.add_journal(journal);
for journal in self.wasi.read_only_journals.iter().cloned() {
builder.add_read_only_journal(journal);
}
for journal in self.wasi.writable_journals.iter().cloned() {
builder.add_writable_journal(journal);
}
if !self.wasi.snapshot_on.is_empty() {
for trigger in self.wasi.snapshot_on.iter().cloned() {
builder.add_snapshot_trigger(trigger);
}
} else if !self.wasi.journals.is_empty() {
} else if !self.wasi.writable_journals.is_empty() {
for on in crate::journal::DEFAULT_SNAPSHOT_TRIGGERS {
builder.add_snapshot_trigger(on);
}
}
if let Some(period) = self.wasi.snapshot_interval {
if self.wasi.journals.is_empty() {
if self.wasi.writable_journals.is_empty() {
return Err(anyhow::format_err!(
"If you specify a snapshot interval then you must also specify a journal file"
"If you specify a snapshot interval then you must also specify a writable journal file"
));
}
builder.with_snapshot_interval(period);
}
builder.with_stop_running_after_snapshot(self.wasi.stop_running_after_snapshot);
builder.with_skip_stdio_during_bootstrap(self.wasi.skip_stdio_during_bootstrap);
}
let env = builder.build()?;
@ -402,33 +442,44 @@ impl crate::runners::Runner for WasiRunner {
#[allow(unused_mut)]
let mut builder = self
.prepare_webc_env(exec_name, &wasi, Some(pkg), Arc::clone(&runtime), None)
.prepare_webc_env(
exec_name,
&wasi,
Either::Left(pkg),
Arc::clone(&runtime),
None,
)
.context("Unable to prepare the WASI environment")?;
#[cfg(feature = "journal")]
{
for journal in self.wasi.journals.clone() {
builder.add_journal(journal);
for journal in self.wasi.read_only_journals.iter().cloned() {
builder.add_read_only_journal(journal);
}
for journal in self.wasi.writable_journals.iter().cloned() {
builder.add_writable_journal(journal);
}
if !self.wasi.snapshot_on.is_empty() {
for trigger in self.wasi.snapshot_on.iter().cloned() {
builder.add_snapshot_trigger(trigger);
}
} else if !self.wasi.journals.is_empty() {
} else if !self.wasi.writable_journals.is_empty() {
for on in crate::journal::DEFAULT_SNAPSHOT_TRIGGERS {
builder.add_snapshot_trigger(on);
}
}
if let Some(period) = self.wasi.snapshot_interval {
if self.wasi.journals.is_empty() {
if self.wasi.writable_journals.is_empty() {
return Err(anyhow::format_err!(
"If you specify a snapshot interval then you must also specify a journal file"
));
}
builder.with_snapshot_interval(period);
}
builder.with_stop_running_after_snapshot(self.wasi.stop_running_after_snapshot);
}
let env = builder.build()?;
@ -516,6 +567,8 @@ mod tests {
async fn test_volume_mount_without_webcs() {
use std::sync::Arc;
use crate::utils::xxhash_random;
let root_fs = virtual_fs::RootFileSystemBuilder::new().build();
let tokrt = tokio::runtime::Handle::current();
@ -537,7 +590,13 @@ mod tests {
let rt = crate::PluggableRuntime::new(tm);
let envb = envb
.prepare_webc_env("test", &annotations, None, Arc::new(rt), Some(root_fs))
.prepare_webc_env(
"test",
&annotations,
Either::Right(xxhash_random()),
Arc::new(rt),
Some(root_fs),
)
.unwrap();
let init = envb.build_init().unwrap();
@ -587,7 +646,7 @@ mod tests {
.prepare_webc_env(
"test",
&annotations,
Some(&binpkg),
Either::Left(&binpkg),
Arc::new(rt),
Some(root_fs),
)

View File

@ -14,7 +14,7 @@ use webc::metadata::annotations::Wasi as WasiAnnotation;
use crate::{
bin_factory::BinaryPackage,
capabilities::Capabilities,
journal::{DynJournal, SnapshotTrigger},
journal::{DynJournal, DynReadableJournal, SnapshotTrigger},
WasiEnvBuilder,
};
@ -40,9 +40,12 @@ pub(crate) struct CommonWasiOptions {
pub(crate) is_tmp_mapped: bool,
pub(crate) injected_packages: Vec<BinaryPackage>,
pub(crate) capabilities: Capabilities,
pub(crate) journals: Vec<Arc<DynJournal>>,
pub(crate) read_only_journals: Vec<Arc<DynReadableJournal>>,
pub(crate) writable_journals: Vec<Arc<DynJournal>>,
pub(crate) snapshot_on: Vec<SnapshotTrigger>,
pub(crate) snapshot_interval: Option<std::time::Duration>,
pub(crate) stop_running_after_snapshot: bool,
pub(crate) skip_stdio_during_bootstrap: bool,
pub(crate) current_dir: Option<PathBuf>,
pub(crate) additional_imports: Imports,
}
@ -94,6 +97,23 @@ impl CommonWasiOptions {
builder.add_imports(&self.additional_imports);
#[cfg(feature = "journal")]
{
for journal in &self.read_only_journals {
builder.add_read_only_journal(journal.clone());
}
for journal in &self.writable_journals {
builder.add_writable_journal(journal.clone());
}
for trigger in &self.snapshot_on {
builder.add_snapshot_trigger(*trigger);
}
if let Some(interval) = self.snapshot_interval {
builder.with_snapshot_interval(interval);
}
builder.with_stop_running_after_snapshot(self.stop_running_after_snapshot);
}
Ok(())
}

View File

@ -325,8 +325,22 @@ impl Config {
}
#[cfg(feature = "journal")]
pub fn add_journal(&mut self, journal: Arc<crate::journal::DynJournal>) -> &mut Self {
self.wasi.journals.push(journal);
pub fn with_stop_running_after_snapshot(&mut self, stop_running: bool) {
self.wasi.stop_running_after_snapshot = stop_running;
}
#[cfg(feature = "journal")]
pub fn add_read_only_journal(
&mut self,
journal: Arc<crate::journal::DynReadableJournal>,
) -> &mut Self {
self.wasi.read_only_journals.push(journal);
self
}
#[cfg(feature = "journal")]
pub fn add_writable_journal(&mut self, journal: Arc<crate::journal::DynJournal>) -> &mut Self {
self.wasi.writable_journals.push(journal);
self
}
}

View File

@ -19,7 +19,7 @@ use wasmer::{Module, RuntimeError};
use wasmer_wasix_types::wasi::ExitCode;
#[cfg(feature = "journal")]
use crate::journal::DynJournal;
use crate::journal::{DynJournal, DynReadableJournal};
use crate::{
bin_factory::BinaryPackageCommand,
http::{DynHttpClient, HttpClient},
@ -143,11 +143,17 @@ where
/// for multiple reasons however the most common is a panic within the process
fn on_taint(&self, _reason: TaintReason) {}
/// The list of journals which will be used to restore the state of the
/// The list of all read-only journals which will be used to restore the state of the
/// runtime at a particular point in time
#[cfg(feature = "journal")]
fn journals(&self) -> &'_ Vec<Arc<DynJournal>> {
&EMPTY_JOURNAL_LIST
fn read_only_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynReadableJournal>> + 'a> {
Box::new(std::iter::empty())
}
/// The list of writable journals which will be appended to
#[cfg(feature = "journal")]
fn writable_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynJournal>> + 'a> {
Box::new(std::iter::empty())
}
/// The snapshot capturer takes and restores snapshots of the WASM process at specific
@ -160,9 +166,6 @@ where
pub type DynRuntime = dyn Runtime + Send + Sync;
#[cfg(feature = "journal")]
static EMPTY_JOURNAL_LIST: Vec<Arc<DynJournal>> = Vec::new();
/// Load a WebAssembly module, trying to use a pre-compiled version if possible.
///
// This function exists to provide a reusable baseline implementation for
@ -239,7 +242,9 @@ pub struct PluggableRuntime {
pub module_cache: Arc<dyn ModuleCache + Send + Sync>,
pub tty: Option<Arc<dyn TtyBridge + Send + Sync>>,
#[cfg(feature = "journal")]
pub journals: Vec<Arc<DynJournal>>,
pub read_only_journals: Vec<Arc<DynReadableJournal>>,
#[cfg(feature = "journal")]
pub writable_journals: Vec<Arc<DynJournal>>,
}
impl PluggableRuntime {
@ -275,7 +280,9 @@ impl PluggableRuntime {
package_loader: Arc::new(loader),
module_cache: Arc::new(module_cache::in_memory()),
#[cfg(feature = "journal")]
journals: Vec::new(),
read_only_journals: Vec::new(),
#[cfg(feature = "journal")]
writable_journals: Vec::new(),
}
}
@ -327,8 +334,14 @@ impl PluggableRuntime {
}
#[cfg(feature = "journal")]
pub fn add_journal(&mut self, journal: Arc<DynJournal>) -> &mut Self {
self.journals.push(journal);
pub fn add_read_only_journal(&mut self, journal: Arc<DynReadableJournal>) -> &mut Self {
self.read_only_journals.push(journal);
self
}
#[cfg(feature = "journal")]
pub fn add_writable_journal(&mut self, journal: Arc<DynJournal>) -> &mut Self {
self.writable_journals.push(journal);
self
}
}
@ -374,13 +387,18 @@ impl Runtime for PluggableRuntime {
}
#[cfg(feature = "journal")]
fn journals(&self) -> &'_ Vec<Arc<DynJournal>> {
&self.journals
fn read_only_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynReadableJournal>> + 'a> {
Box::new(self.read_only_journals.iter().cloned())
}
#[cfg(feature = "journal")]
fn writable_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynJournal>> + 'a> {
Box::new(self.writable_journals.iter().cloned())
}
#[cfg(feature = "journal")]
fn active_journal(&self) -> Option<&DynJournal> {
self.journals.iter().last().map(|a| a.as_ref())
self.writable_journals.iter().last().map(|a| a.as_ref())
}
}
@ -398,7 +416,9 @@ pub struct OverriddenRuntime {
module_cache: Option<Arc<dyn ModuleCache + Send + Sync>>,
tty: Option<Arc<dyn TtyBridge + Send + Sync>>,
#[cfg(feature = "journal")]
journals: Option<Vec<Arc<DynJournal>>>,
pub read_only_journals: Option<Vec<Arc<DynReadableJournal>>>,
#[cfg(feature = "journal")]
pub writable_journals: Option<Vec<Arc<DynJournal>>>,
}
impl OverriddenRuntime {
@ -414,7 +434,9 @@ impl OverriddenRuntime {
module_cache: None,
tty: None,
#[cfg(feature = "journal")]
journals: None,
read_only_journals: None,
#[cfg(feature = "journal")]
writable_journals: None,
}
}
@ -456,15 +478,20 @@ impl OverriddenRuntime {
self
}
#[cfg(feature = "journal")]
pub fn with_tty(mut self, tty: Arc<dyn TtyBridge + Send + Sync>) -> Self {
self.tty.replace(tty);
self
}
#[cfg(feature = "journal")]
pub fn with_journals(mut self, journals: Vec<Arc<DynJournal>>) -> Self {
self.journals.replace(journals);
pub fn with_read_only_journals(mut self, journals: Vec<Arc<DynReadableJournal>>) -> Self {
self.read_only_journals.replace(journals);
self
}
#[cfg(feature = "journal")]
pub fn with_writable_journals(mut self, journals: Vec<Arc<DynJournal>>) -> Self {
self.writable_journals.replace(journals);
self
}
}
@ -543,17 +570,26 @@ impl Runtime for OverriddenRuntime {
}
#[cfg(feature = "journal")]
fn journals(&self) -> &'_ Vec<Arc<DynJournal>> {
if let Some(journals) = self.journals.as_ref() {
journals
fn read_only_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynReadableJournal>> + 'a> {
if let Some(journals) = self.read_only_journals.as_ref() {
Box::new(journals.iter().cloned())
} else {
self.inner.journals()
self.inner.read_only_journals()
}
}
#[cfg(feature = "journal")]
fn writable_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynJournal>> + 'a> {
if let Some(journals) = self.writable_journals.as_ref() {
Box::new(journals.iter().cloned())
} else {
self.inner.writable_journals()
}
}
#[cfg(feature = "journal")]
fn active_journal(&self) -> Option<&'_ DynJournal> {
if let Some(journals) = self.journals.as_ref() {
if let Some(journals) = self.writable_journals.as_ref() {
journals.iter().last().map(|a| a.as_ref())
} else {
self.inner.active_journal()

View File

@ -13,7 +13,7 @@ use wasmer::{AsStoreMut, Extern, Imports, Instance, Module, Store};
use wasmer_config::package::PackageId;
#[cfg(feature = "journal")]
use crate::journal::{DynJournal, SnapshotTrigger};
use crate::journal::{DynJournal, DynReadableJournal, SnapshotTrigger};
use crate::{
bin_factory::{BinFactory, BinaryPackage},
capabilities::Capabilities,
@ -92,7 +92,15 @@ pub struct WasiEnvBuilder {
pub(super) snapshot_interval: Option<std::time::Duration>,
#[cfg(feature = "journal")]
pub(super) journals: Vec<Arc<DynJournal>>,
pub(super) stop_running_after_snapshot: bool,
#[cfg(feature = "journal")]
pub(super) read_only_journals: Vec<Arc<DynReadableJournal>>,
#[cfg(feature = "journal")]
pub(super) writable_journals: Vec<Arc<DynJournal>>,
pub(super) skip_stdio_during_bootstrap: bool,
#[cfg(feature = "ctrlc")]
pub(super) attach_ctrl_c: bool,
@ -610,6 +618,16 @@ impl WasiEnvBuilder {
Ok(self)
}
/// Specifies one or more journal files that Wasmer will use to restore
/// the state of the WASM process.
///
/// The state of the WASM process and its sandbox will be reapplied using
/// the journals in the order that you specify here.
#[cfg(feature = "journal")]
pub fn add_read_only_journal(&mut self, journal: Arc<DynReadableJournal>) {
self.read_only_journals.push(journal);
}
/// Specifies one or more journal files that Wasmer will use to restore
/// the state of the WASM process.
///
@ -620,8 +638,8 @@ impl WasiEnvBuilder {
/// and opened for read and write. New journal events will be written to this
/// file
#[cfg(feature = "journal")]
pub fn add_journal(&mut self, journal: Arc<DynJournal>) {
self.journals.push(journal);
pub fn add_writable_journal(&mut self, journal: Arc<DynJournal>) {
self.writable_journals.push(journal);
}
pub fn get_current_dir(&mut self) -> Option<PathBuf> {
@ -740,6 +758,15 @@ impl WasiEnvBuilder {
self.snapshot_interval.replace(interval);
}
#[cfg(feature = "journal")]
pub fn with_stop_running_after_snapshot(&mut self, stop_running: bool) {
self.stop_running_after_snapshot = stop_running;
}
pub fn with_skip_stdio_during_bootstrap(&mut self, skip: bool) {
self.skip_stdio_during_bootstrap = skip;
}
/// Add an item to the list of importable items provided to the instance.
pub fn import(
mut self,
@ -945,8 +972,12 @@ impl WasiEnvBuilder {
#[allow(unused_mut)]
let mut runtime = crate::runtime::PluggableRuntime::new(Arc::new(crate::runtime::task_manager::tokio::TokioTaskManager::default()));
#[cfg(feature = "journal")]
for journal in self.journals.clone() {
runtime.add_journal(journal);
for journal in self.read_only_journals.clone() {
runtime.add_read_only_journal(journal);
}
#[cfg(feature = "journal")]
for journal in self.writable_journals.clone() {
runtime.add_writable_journal(journal);
}
Arc::new(runtime)
}
@ -983,13 +1014,17 @@ impl WasiEnvBuilder {
process: None,
thread: None,
#[cfg(feature = "journal")]
call_initialize: self.journals.is_empty(),
call_initialize: self.read_only_journals.is_empty()
&& self.writable_journals.is_empty(),
#[cfg(not(feature = "journal"))]
call_initialize: true,
can_deep_sleep: false,
extra_tracing: true,
#[cfg(feature = "journal")]
snapshot_on: self.snapshot_on,
#[cfg(feature = "journal")]
stop_running_after_snapshot: self.stop_running_after_snapshot,
skip_stdio_during_bootstrap: self.skip_stdio_during_bootstrap,
additional_imports: self.additional_imports,
};

View File

@ -249,6 +249,13 @@ pub struct WasiEnvInit {
/// Indicates triggers that will cause a snapshot to be taken
#[cfg(feature = "journal")]
pub snapshot_on: Vec<SnapshotTrigger>,
/// Stop running after the first snapshot is taken
#[cfg(feature = "journal")]
pub stop_running_after_snapshot: bool,
/// Skip writes to stdout and stderr when bootstrapping from a journal
pub skip_stdio_during_bootstrap: bool,
}
impl WasiEnvInit {
@ -288,6 +295,9 @@ impl WasiEnvInit {
extra_tracing: false,
#[cfg(feature = "journal")]
snapshot_on: self.snapshot_on.clone(),
#[cfg(feature = "journal")]
stop_running_after_snapshot: self.stop_running_after_snapshot,
skip_stdio_during_bootstrap: self.skip_stdio_during_bootstrap,
additional_imports: self.additional_imports.clone(),
}
}
@ -334,6 +344,9 @@ pub struct WasiEnv {
/// (and hence it should not record new events)
pub replaying_journal: bool,
/// Should stdio be skipped when bootstrapping this module from an existing journal?
pub skip_stdio_during_bootstrap: bool,
/// Flag that indicates the cleanup of the environment is to be disabled
/// (this is normally used so that the instance can be reused later on)
pub(crate) disable_fs_cleanup: bool,
@ -370,6 +383,7 @@ impl Clone for WasiEnv {
enable_journal: self.enable_journal,
enable_exponential_cpu_backoff: self.enable_exponential_cpu_backoff,
replaying_journal: self.replaying_journal,
skip_stdio_during_bootstrap: self.skip_stdio_during_bootstrap,
disable_fs_cleanup: self.disable_fs_cleanup,
}
}
@ -410,6 +424,7 @@ impl WasiEnv {
enable_journal: self.enable_journal,
enable_exponential_cpu_backoff: self.enable_exponential_cpu_backoff,
replaying_journal: false,
skip_stdio_during_bootstrap: self.skip_stdio_during_bootstrap,
disable_fs_cleanup: self.disable_fs_cleanup,
};
Ok((new_env, handle))
@ -511,7 +526,9 @@ impl WasiEnv {
#[cfg(feature = "journal")]
{
process.inner.0.lock().unwrap().snapshot_on = init.snapshot_on.into_iter().collect();
let mut guard = process.inner.0.lock().unwrap();
guard.snapshot_on = init.snapshot_on.into_iter().collect();
guard.stop_running_after_checkpoint = init.stop_running_after_snapshot;
}
let layout = WasiMemoryLayout::default();
@ -536,6 +553,7 @@ impl WasiEnv {
#[cfg(not(feature = "journal"))]
enable_journal: false,
replaying_journal: false,
skip_stdio_during_bootstrap: init.skip_stdio_during_bootstrap,
enable_deep_sleep: init.capabilities.threading.enable_asynchronous_threading,
enable_exponential_cpu_backoff: init
.capabilities

View File

@ -340,14 +340,40 @@ impl WasiFunctionEnv {
{
// If there are journals we need to restore then do so (this will
// prevent the initialization function from running)
let restore_journals = self.data(&store).runtime.journals().clone();
if !restore_journals.is_empty() {
let restore_ro_journals = self
.data(&store)
.runtime
.read_only_journals()
.collect::<Vec<_>>();
let restore_w_journals = self
.data(&store)
.runtime
.writable_journals()
.collect::<Vec<_>>();
if !restore_ro_journals.is_empty() || !restore_w_journals.is_empty() {
tracing::trace!("replaying journal=true");
self.data_mut(&mut store).replaying_journal = true;
for journal in restore_journals {
for journal in restore_ro_journals {
let ctx = self.env.clone().into_mut(&mut store);
let rewind = match restore_snapshot(ctx, journal, true) {
let rewind = match restore_snapshot(ctx, journal.as_ref(), true) {
Ok(r) => r,
Err(err) => {
tracing::trace!("replaying journal=false (err={:?})", err);
self.data_mut(&mut store).replaying_journal = false;
return Err(err);
}
};
rewind_state = rewind.map(|rewind| (rewind, RewindResultType::RewindRestart));
}
for journal in restore_w_journals {
let ctx = self.env.clone().into_mut(&mut store);
let rewind = match restore_snapshot(
ctx,
journal.as_ref().as_dyn_readable_journal(),
true,
) {
Ok(r) => r,
Err(err) => {
tracing::trace!("replaying journal=false (err={:?})", err);

View File

@ -11,11 +11,23 @@ impl<'a, 'c> JournalSyscallPlayer<'a, 'c> {
) -> Result<(), WasiRuntimeError> {
tracing::trace!(%fd, %offset, "Replay journal - FdWrite");
if self.stdout_fds.contains(&fd) {
self.stdout.push((offset, data, is_64bit));
if let Some(x) = self.stdout.as_mut() {
x.push(JournalStdIoWrite {
offset,
data,
is_64bit,
});
}
return Ok(());
}
if self.stderr_fds.contains(&fd) {
self.stderr.push((offset, data, is_64bit));
if let Some(x) = self.stderr.as_mut() {
x.push(JournalStdIoWrite {
offset,
data,
is_64bit,
});
}
return Ok(());
}

View File

@ -22,7 +22,7 @@ mod update_memory;
use crate::journal::JournalEffector;
use crate::syscalls::anyhow_err_to_runtime_err;
use crate::syscalls::JournalSyscallPlayer;
use crate::syscalls::{JournalStdIoWrite, JournalSyscallPlayer};
use crate::RewindState;
use crate::WasiRuntimeError;
use crate::WasiThreadId;

View File

@ -7,12 +7,19 @@ impl<'a, 'c> JournalSyscallPlayer<'a, 'c> {
) {
tracing::trace!("Replay journal - ClearEthereal");
self.spawn_threads.clear();
self.stdout.clear();
self.stderr.clear();
if let Some(x) = self.stdout.as_mut() {
x.clear();
}
self.stdout_fds.clear();
self.stderr_fds.clear();
self.stdout_fds.insert(1 as WasiFd);
if let Some(x) = self.stderr.as_mut() {
x.clear();
}
self.stderr_fds.clear();
self.stderr_fds.insert(2 as WasiFd);
differ_ethereal.iter_mut().for_each(|e| e.clear());
self.staged_differ_memory.clear();
}

View File

@ -27,6 +27,12 @@ use std::{collections::BTreeMap, ops::Range};
use super::*;
pub struct JournalStdIoWrite<'a> {
pub offset: u64,
pub data: Cow<'a, [u8]>,
pub is_64bit: bool,
}
pub struct JournalSyscallPlayer<'a, 'c> {
pub ctx: FunctionEnvMut<'c, WasiEnv>,
pub bootstrapping: bool,
@ -45,16 +51,18 @@ pub struct JournalSyscallPlayer<'a, 'c> {
pub differ_memory: Vec<(Range<u64>, Cow<'a, [u8]>)>,
// We capture the stdout and stderr while we replay
pub stdout: Vec<(u64, Cow<'a, [u8]>, bool)>,
pub stderr: Vec<(u64, Cow<'a, [u8]>, bool)>,
pub stdout: Option<Vec<JournalStdIoWrite<'a>>>,
pub stderr: Option<Vec<JournalStdIoWrite<'a>>>,
pub stdout_fds: HashSet<u32>,
pub stderr_fds: HashSet<u32>,
}
impl<'a, 'c> JournalSyscallPlayer<'a, 'c> {
pub fn new(mut ctx: FunctionEnvMut<'c, WasiEnv>, bootstrapping: bool) -> Self {
let cur_module_hash: Box<[u8]> = Box::from(ctx.data().process.module_hash.as_bytes());
let mut ret = JournalSyscallPlayer {
let env = ctx.data();
let keep_stdio = !env.skip_stdio_during_bootstrap;
let cur_module_hash: Box<[u8]> = Box::from(env.process.module_hash.as_bytes());
JournalSyscallPlayer {
ctx,
bootstrapping,
cur_module_hash,
@ -64,17 +72,12 @@ impl<'a, 'c> JournalSyscallPlayer<'a, 'c> {
spawn_threads: Default::default(),
staged_differ_memory: Default::default(),
differ_memory: Default::default(),
stdout: Default::default(),
stderr: Default::default(),
stdout_fds: Default::default(),
stderr_fds: Default::default(),
// We capture stdout and stderr while we replay
stdout_fds: [1 as WasiFd].into(),
stderr_fds: [2 as WasiFd].into(),
stdout: keep_stdio.then(Default::default),
stderr: keep_stdio.then(Default::default),
real_fd: Default::default(),
};
// We capture the stdout and stderr while we replay
ret.stdout_fds.insert(1 as WasiFd);
ret.stderr_fds.insert(2 as WasiFd);
ret
}
}
}

View File

@ -8,7 +8,7 @@ use super::*;
#[tracing::instrument(skip_all)]
pub unsafe fn restore_snapshot(
mut ctx: FunctionEnvMut<'_, WasiEnv>,
journal: Arc<DynJournal>,
journal: &DynReadableJournal,
bootstrapping: bool,
) -> Result<Option<RewindState>, WasiRuntimeError> {
use std::{collections::BTreeMap, ops::Range};
@ -22,7 +22,7 @@ pub unsafe fn restore_snapshot(
let mut ethereal_events = Vec::new();
while let Some(next) = journal.read().map_err(anyhow_err_to_runtime_err)? {
tracing::trace!(event=?next, "restoring event");
runner.play_event(next.into_inner(), Some(&mut ethereal_events));
runner.play_event(next.into_inner(), Some(&mut ethereal_events))?;
}
// Check for events that are orphaned
@ -30,9 +30,18 @@ pub unsafe fn restore_snapshot(
tracing::trace!("Orphaned ethereal events - {:?}", evt);
}
// FIXME: if the stdout/stderr FDs were closed as a result of replaying the journal,
// this breaks. A potential fix would be to only close those two FDs afterwards; so
// a `JournalSyscallPlayer::should_close_stdout: bool` or similar.
// Now output the stdout and stderr
if let Some(stdout) = runner.stdout {
tracing::trace!("replaying stdout");
for (offset, data, is_64bit) in runner.stdout {
for JournalStdIoWrite {
offset,
data,
is_64bit,
} in stdout
{
if is_64bit {
JournalEffector::apply_fd_write::<Memory64>(&mut runner.ctx, 1, offset, data)
} else {
@ -40,9 +49,16 @@ pub unsafe fn restore_snapshot(
}
.map_err(anyhow_err_to_runtime_err)?;
}
}
tracing::trace!("replaying stdout");
for (offset, data, is_64bit) in runner.stderr {
if let Some(stderr) = runner.stderr {
tracing::trace!("replaying stderr");
for JournalStdIoWrite {
offset,
data,
is_64bit,
} in stderr
{
if is_64bit {
JournalEffector::apply_fd_write::<Memory64>(&mut runner.ctx, 2, offset, data)
} else {
@ -50,6 +66,7 @@ pub unsafe fn restore_snapshot(
}
.map_err(anyhow_err_to_runtime_err)?;
}
}
// Apply the memory changes (if this is in bootstrapping mode we differed them)
for (region, data) in runner.differ_memory {

View File

@ -124,7 +124,7 @@ use crate::{
fs_error_into_wasi_err, virtual_file_type_to_wasi_file_type, Fd, FdInner, InodeVal, Kind,
MAX_SYMLINKS,
},
journal::{DynJournal, JournalEffector},
journal::{DynJournal, DynReadableJournal, DynWritableJournal, JournalEffector},
os::task::{
process::{MaybeCheckpointResult, WasiProcessCheckpoint},
thread::{RewindResult, RewindResultType},

View File

@ -42,7 +42,12 @@ pub(crate) fn fd_renumber_internal(
let (_, mut state) = unsafe { env.get_memory_and_wasi_state(&ctx, 0) };
if state.fs.get_fd(to).is_ok() {
wasi_try_ok!(__asyncify_light(env, None, state.fs.flush(to))?);
match __asyncify_light(env, None, state.fs.flush(to))? {
Ok(_) | Err(Errno::Isdir) | Err(Errno::Io) | Err(Errno::Access) => {}
Err(e) => {
return Ok(e);
}
}
wasi_try_ok!(state.fs.close_fd(to));
}

View File

@ -6,6 +6,9 @@ use crate::syscalls::*;
pub fn proc_snapshot<M: MemorySize>(
mut ctx: FunctionEnvMut<'_, WasiEnv>,
) -> Result<Errno, WasiError> {
wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::Explicit)?);
// If we have an Explicit trigger, process that...
ctx = wasi_try_ok!(maybe_snapshot_once::<M>(ctx, SnapshotTrigger::Explicit)?);
// ... if not, we may still have an external request for a snapshot, so do that as well
ctx = wasi_try_ok!(maybe_snapshot::<M>(ctx)?);
Ok(Errno::Success)
}