diff --git a/lib/backend-api/src/query.rs b/lib/backend-api/src/query.rs index af19fcaaf..ed8653ac4 100644 --- a/lib/backend-api/src/query.rs +++ b/lib/backend-api/src/query.rs @@ -1,9 +1,9 @@ -use std::{collections::HashSet, time::Duration}; +use std::{collections::HashSet, pin::Pin, time::Duration}; use anyhow::{bail, Context}; use cynic::{MutationBuilder, QueryBuilder}; use edge_schema::schema::{NetworkTokenV1, WebcIdent}; -use futures::StreamExt; +use futures::{Stream, StreamExt}; use time::OffsetDateTime; use tracing::Instrument; use url::Url; @@ -11,8 +11,9 @@ use url::Url; use crate::{ types::{ self, CreateNamespaceVars, DeployApp, DeployAppConnection, DeployAppVersion, - DeployAppVersionConnection, GetDeployAppAndVersion, GetDeployAppVersionsVars, - GetNamespaceAppsVars, Log, PackageVersionConnection, PublishDeployAppVars, + DeployAppVersionConnection, GetCurrentUserWithAppsVars, GetDeployAppAndVersion, + GetDeployAppVersionsVars, GetNamespaceAppsVars, Log, PackageVersionConnection, + PublishDeployAppVars, }, GraphQLApiFailure, WasmerClient, }; @@ -380,38 +381,45 @@ pub async fn get_app_version_by_id_with_app( /// List all apps that are accessible by the current user. /// /// NOTE: this will only include the first pages and does not provide pagination. -pub async fn user_apps(client: &WasmerClient) -> Result, anyhow::Error> { - let user = client - .run_graphql(types::GetCurrentUserWithApps::build(())) - .await? - .viewer - .context("not logged in")?; +pub async fn user_apps( + client: &WasmerClient, +) -> impl futures::Stream, anyhow::Error>> + '_ { + futures::stream::try_unfold(None, move |cursor| async move { + let user = client + .run_graphql(types::GetCurrentUserWithApps::build( + GetCurrentUserWithAppsVars { after: cursor }, + )) + .await? + .viewer + .context("not logged in")?; - let apps = user - .apps - .edges - .into_iter() - .flatten() - .filter_map(|x| x.node) - .collect(); + let apps: Vec<_> = user + .apps + .edges + .into_iter() + .flatten() + .filter_map(|x| x.node) + .collect(); - Ok(apps) + let cursor = user.apps.page_info.end_cursor; + + if apps.is_empty() { + Ok(None) + } else { + Ok(Some((apps, cursor))) + } + }) } /// List all apps that are accessible by the current user. -/// -/// NOTE: this does not currently do full pagination properly. -// TODO(theduke): fix pagination pub async fn user_accessible_apps( client: &WasmerClient, -) -> Result, anyhow::Error> { - let mut apps = Vec::new(); - - // Get user apps. - - let user_apps = user_apps(client).await?; - - apps.extend(user_apps); +) -> Result< + impl futures::Stream, anyhow::Error>> + '_, + anyhow::Error, +> { + let apps: Pin, anyhow::Error>> + Send + Sync>> = + Box::pin(user_apps(client).await); // Get all aps in user-accessible namespaces. let namespace_res = client @@ -429,18 +437,16 @@ pub async fn user_accessible_apps( .map(|node| node.name.clone()) .collect::>(); - for namespace in namespace_names { - let out = client - .run_graphql(types::GetNamespaceApps::build(GetNamespaceAppsVars { - name: namespace.to_string(), - })) - .await?; + let mut all_apps = vec![apps]; + for ns in namespace_names { + let apps: Pin, anyhow::Error>> + Send + Sync>> = + Box::pin(namespace_apps(client, ns).await); - if let Some(ns) = out.get_namespace { - let ns_apps = ns.apps.edges.into_iter().flatten().filter_map(|x| x.node); - apps.extend(ns_apps); - } + all_apps.push(apps); } + + let apps = futures::stream::select_all(all_apps); + Ok(apps) } @@ -449,27 +455,38 @@ pub async fn user_accessible_apps( /// NOTE: only retrieves the first page and does not do pagination. pub async fn namespace_apps( client: &WasmerClient, - namespace: &str, -) -> Result, anyhow::Error> { - let res = client - .run_graphql(types::GetNamespaceApps::build(GetNamespaceAppsVars { - name: namespace.to_string(), - })) - .await?; + namespace: String, +) -> impl futures::Stream, anyhow::Error>> + '_ { + let namespace = namespace.clone(); - let ns = res - .get_namespace - .with_context(|| format!("failed to get namespace '{}'", namespace))?; + futures::stream::try_unfold((None, namespace), move |(cursor, namespace)| async move { + let res = client + .run_graphql(types::GetNamespaceApps::build(GetNamespaceAppsVars { + name: namespace.to_string(), + after: cursor, + })) + .await?; - let apps = ns - .apps - .edges - .into_iter() - .flatten() - .filter_map(|x| x.node) - .collect(); + let ns = res + .get_namespace + .with_context(|| format!("failed to get namespace '{}'", namespace))?; - Ok(apps) + let apps: Vec<_> = ns + .apps + .edges + .into_iter() + .flatten() + .filter_map(|x| x.node) + .collect(); + + let cursor = ns.apps.page_info.end_cursor; + + if apps.is_empty() { + Ok(None) + } else { + Ok(Some((apps, (cursor, namespace)))) + } + }) } /// Publish a new app (version). diff --git a/lib/backend-api/src/types.rs b/lib/backend-api/src/types.rs index 62995a15f..870d258cd 100644 --- a/lib/backend-api/src/types.rs +++ b/lib/backend-api/src/types.rs @@ -180,17 +180,24 @@ mod queries { pub get_deploy_app: Option, } + #[derive(cynic::QueryVariables, Debug)] + pub struct GetCurrentUserWithAppsVars { + pub after: Option, + } + #[derive(cynic::QueryFragment, Debug)] - #[cynic(graphql_type = "Query")] + #[cynic(graphql_type = "Query", variables = "GetCurrentUserWithAppsVars")] pub struct GetCurrentUserWithApps { pub viewer: Option, } #[derive(cynic::QueryFragment, Debug)] #[cynic(graphql_type = "User")] + #[cynic(variables = "GetCurrentUserWithAppsVars")] pub struct UserWithApps { pub id: cynic::Id, pub username: String, + #[arguments(after: $after)] pub apps: DeployAppConnection, } @@ -537,6 +544,7 @@ mod queries { #[derive(cynic::QueryVariables, Debug)] pub struct GetNamespaceAppsVars { pub name: String, + pub after: Option, } #[derive(cynic::QueryFragment, Debug)] @@ -548,9 +556,11 @@ mod queries { #[derive(cynic::QueryFragment, Debug)] #[cynic(graphql_type = "Namespace")] + #[cynic(variables = "GetNamespaceAppsVars")] pub struct NamespaceWithApps { pub id: cynic::Id, pub name: String, + #[arguments(after: $after)] pub apps: DeployAppConnection, } diff --git a/lib/cli/src/commands/app/list.rs b/lib/cli/src/commands/app/list.rs index f4c6fe892..b53782085 100644 --- a/lib/cli/src/commands/app/list.rs +++ b/lib/cli/src/commands/app/list.rs @@ -1,5 +1,10 @@ //! List Edge apps. +use std::pin::Pin; + +use futures::{Stream, StreamExt}; +use wasmer_api::types::DeployApp; + use crate::{ commands::AsyncCliCommand, opts::{ApiOpts, ListFormatOpts}, @@ -23,6 +28,10 @@ pub struct CmdAppList { /// directly owned by the user and apps in namespaces the user can access. #[clap(short = 'a', long)] all: bool, + + /// Maximum number of apps to display + #[clap(long, default_value = "1000")] + max: usize, } #[async_trait::async_trait] @@ -32,15 +41,37 @@ impl AsyncCliCommand for CmdAppList { async fn run_async(self) -> Result<(), anyhow::Error> { let client = self.api.client()?; - let apps = if let Some(ns) = self.namespace { - wasmer_api::query::namespace_apps(&client, &ns).await? + let apps_stream: Pin< + Box, anyhow::Error>> + Send + Sync>, + > = if let Some(ns) = self.namespace.clone() { + Box::pin(wasmer_api::query::namespace_apps(&client, ns).await) } else if self.all { - wasmer_api::query::user_accessible_apps(&client).await? + Box::pin(wasmer_api::query::user_accessible_apps(&client).await?) } else { - wasmer_api::query::user_apps(&client).await? + Box::pin(wasmer_api::query::user_apps(&client).await) }; - println!("{}", self.fmt.format.render(&apps)); + let mut apps_stream = std::pin::pin!(apps_stream); + + let mut rem = self.max; + + let mut display_apps = vec![]; + + while let Some(apps) = apps_stream.next().await { + let mut apps = apps?; + + let limit = std::cmp::min(apps.len(), rem); + + if limit == 0 { + break; + } + + display_apps.extend(apps.drain(..limit)); + + rem -= limit; + } + + println!("{}", self.fmt.format.render(&display_apps)); Ok(()) }