- , anyhow::Error>> + '_ {
+ let stream = get_app_logs(client, name, owner, tag, start, end, watch);
+
+ stream.map(|res| {
+ let mut logs = Vec::new();
+ let mut hasher = HashSet::new();
+ let mut page = res?;
+
+ // Prevent duplicates.
+ // TODO: don't clone the message, just hash it.
+ page.retain(|log| hasher.insert((log.message.clone(), log.timestamp.round() as i128)));
+
+ logs.extend(page);
+
+ Ok(logs)
+ })
+}
+
+/// Convert a [`OffsetDateTime`] to a unix timestamp that the WAPM backend
+/// understands.
+fn unix_timestamp(ts: OffsetDateTime) -> f64 {
+ let nanos_per_second = 1_000_000_000;
+ let timestamp = ts.unix_timestamp_nanos();
+ let nanos = timestamp % nanos_per_second;
+ let secs = timestamp / nanos_per_second;
+
+ (secs as f64) + (nanos as f64 / nanos_per_second as f64)
+}
diff --git a/lib/backend-api/src/stream.rs b/lib/backend-api/src/stream.rs
new file mode 100644
index 000000000..c2a8f8eb1
--- /dev/null
+++ b/lib/backend-api/src/stream.rs
@@ -0,0 +1,119 @@
+use std::{collections::VecDeque, task::Poll};
+
+use futures::{
+ future::{BoxFuture, OptionFuture},
+ Future,
+};
+
+use super::WasmerClient;
+
+type PaginationFuture = BoxFuture<'static, Result<(Vec, Option
), anyhow::Error>>;
+
+pub trait PaginatedQuery {
+ type Vars;
+ type Paginator;
+ type Item;
+
+ fn query(
+ &self,
+ client: WasmerClient,
+ paginator: Option,
+ ) -> PaginationFuture;
+}
+
+pin_project_lite::pin_project! {
+ pub struct QueryStream {
+ query: Q,
+
+ client: WasmerClient,
+ page: usize,
+ paginator: Option,
+ finished: bool,
+ items: VecDeque,
+
+ #[pin]
+ fut: OptionFuture>,
+ }
+}
+
+impl QueryStream {
+ pub fn new(query: Q, client: WasmerClient) -> Self {
+ Self {
+ query,
+ client,
+ page: 0,
+ finished: false,
+ paginator: None,
+ items: VecDeque::new(),
+ fut: None.into(),
+ }
+ }
+}
+
+impl futures::Stream for QueryStream {
+ type Item = Result;
+
+ fn poll_next(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll