Merge branch 'master' into add-log-streams-options

This commit is contained in:
M.Amin Rayej
2024-02-22 01:57:02 +03:30
52 changed files with 578 additions and 522 deletions

View File

@@ -1,9 +1,9 @@
use std::{collections::HashSet, time::Duration};
use std::{collections::HashSet, pin::Pin, time::Duration};
use anyhow::{bail, Context};
use cynic::{MutationBuilder, QueryBuilder};
use edge_schema::schema::{NetworkTokenV1, WebcIdent};
use futures::StreamExt;
use futures::{Stream, StreamExt};
use time::OffsetDateTime;
use tracing::Instrument;
use url::Url;
@@ -11,8 +11,9 @@ use url::Url;
use crate::{
types::{
self, CreateNamespaceVars, DeployApp, DeployAppConnection, DeployAppVersion,
DeployAppVersionConnection, GetDeployAppAndVersion, GetDeployAppVersionsVars,
GetNamespaceAppsVars, Log, LogStream, PackageVersionConnection, PublishDeployAppVars,
DeployAppVersionConnection, GetCurrentUserWithAppsVars, GetDeployAppAndVersion,
GetDeployAppVersionsVars, GetNamespaceAppsVars, Log, LogStream, PackageVersionConnection,
PublishDeployAppVars,
},
GraphQLApiFailure, WasmerClient,
};
@@ -275,17 +276,31 @@ pub async fn get_app_by_id(
client: &WasmerClient,
app_id: String,
) -> Result<DeployApp, anyhow::Error> {
client
get_app_by_id_opt(client, app_id)
.await?
.context("app not found")
}
/// Retrieve an app by its global id.
pub async fn get_app_by_id_opt(
client: &WasmerClient,
app_id: String,
) -> Result<Option<DeployApp>, anyhow::Error> {
let app_opt = client
.run_graphql(types::GetDeployAppById::build(
types::GetDeployAppByIdVars {
app_id: app_id.into(),
},
))
.await?
.app
.context("app not found")?
.into_deploy_app()
.context("app conversion failed")
.app;
if let Some(app) = app_opt {
let app = app.into_deploy_app().context("app conversion failed")?;
Ok(Some(app))
} else {
Ok(None)
}
}
/// Retrieve an app together with a specific version.
@@ -366,38 +381,45 @@ pub async fn get_app_version_by_id_with_app(
/// List all apps that are accessible by the current user.
///
/// NOTE: this will only include the first pages and does not provide pagination.
pub async fn user_apps(client: &WasmerClient) -> Result<Vec<types::DeployApp>, anyhow::Error> {
let user = client
.run_graphql(types::GetCurrentUserWithApps::build(()))
.await?
.viewer
.context("not logged in")?;
pub async fn user_apps(
client: &WasmerClient,
) -> impl futures::Stream<Item = Result<Vec<types::DeployApp>, anyhow::Error>> + '_ {
futures::stream::try_unfold(None, move |cursor| async move {
let user = client
.run_graphql(types::GetCurrentUserWithApps::build(
GetCurrentUserWithAppsVars { after: cursor },
))
.await?
.viewer
.context("not logged in")?;
let apps = user
.apps
.edges
.into_iter()
.flatten()
.filter_map(|x| x.node)
.collect();
let apps: Vec<_> = user
.apps
.edges
.into_iter()
.flatten()
.filter_map(|x| x.node)
.collect();
Ok(apps)
let cursor = user.apps.page_info.end_cursor;
if apps.is_empty() {
Ok(None)
} else {
Ok(Some((apps, cursor)))
}
})
}
/// List all apps that are accessible by the current user.
///
/// NOTE: this does not currently do full pagination properly.
// TODO(theduke): fix pagination
pub async fn user_accessible_apps(
client: &WasmerClient,
) -> Result<Vec<types::DeployApp>, anyhow::Error> {
let mut apps = Vec::new();
// Get user apps.
let user_apps = user_apps(client).await?;
apps.extend(user_apps);
) -> Result<
impl futures::Stream<Item = Result<Vec<types::DeployApp>, anyhow::Error>> + '_,
anyhow::Error,
> {
let apps: Pin<Box<dyn Stream<Item = Result<Vec<DeployApp>, anyhow::Error>> + Send + Sync>> =
Box::pin(user_apps(client).await);
// Get all aps in user-accessible namespaces.
let namespace_res = client
@@ -415,18 +437,16 @@ pub async fn user_accessible_apps(
.map(|node| node.name.clone())
.collect::<Vec<_>>();
for namespace in namespace_names {
let out = client
.run_graphql(types::GetNamespaceApps::build(GetNamespaceAppsVars {
name: namespace.to_string(),
}))
.await?;
let mut all_apps = vec![apps];
for ns in namespace_names {
let apps: Pin<Box<dyn Stream<Item = Result<Vec<DeployApp>, anyhow::Error>> + Send + Sync>> =
Box::pin(namespace_apps(client, ns).await);
if let Some(ns) = out.get_namespace {
let ns_apps = ns.apps.edges.into_iter().flatten().filter_map(|x| x.node);
apps.extend(ns_apps);
}
all_apps.push(apps);
}
let apps = futures::stream::select_all(all_apps);
Ok(apps)
}
@@ -435,27 +455,38 @@ pub async fn user_accessible_apps(
/// NOTE: only retrieves the first page and does not do pagination.
pub async fn namespace_apps(
client: &WasmerClient,
namespace: &str,
) -> Result<Vec<types::DeployApp>, anyhow::Error> {
let res = client
.run_graphql(types::GetNamespaceApps::build(GetNamespaceAppsVars {
name: namespace.to_string(),
}))
.await?;
namespace: String,
) -> impl futures::Stream<Item = Result<Vec<types::DeployApp>, anyhow::Error>> + '_ {
let namespace = namespace.clone();
let ns = res
.get_namespace
.with_context(|| format!("failed to get namespace '{}'", namespace))?;
futures::stream::try_unfold((None, namespace), move |(cursor, namespace)| async move {
let res = client
.run_graphql(types::GetNamespaceApps::build(GetNamespaceAppsVars {
name: namespace.to_string(),
after: cursor,
}))
.await?;
let apps = ns
.apps
.edges
.into_iter()
.flatten()
.filter_map(|x| x.node)
.collect();
let ns = res
.get_namespace
.with_context(|| format!("failed to get namespace '{}'", namespace))?;
Ok(apps)
let apps: Vec<_> = ns
.apps
.edges
.into_iter()
.flatten()
.filter_map(|x| x.node)
.collect();
let cursor = ns.apps.page_info.end_cursor;
if apps.is_empty() {
Ok(None)
} else {
Ok(Some((apps, (cursor, namespace))))
}
})
}
/// Publish a new app (version).
@@ -692,10 +723,7 @@ fn get_app_logs(
name: name.clone(),
owner: owner.clone(),
version: tag.clone(),
// TODO: increase pagination size
// See https://github.com/wasmerio/edge/issues/460
// first: Some(500),
first: Some(10),
first: Some(100),
starting_from: unix_timestamp(start),
until: end.map(unix_timestamp),
streams: streams.clone(),