add pagination to all queries

This commit is contained in:
M.Amin Rayej
2024-02-15 05:57:40 +03:30
parent 5aba707549
commit 7f91ded06f
3 changed files with 121 additions and 63 deletions

View File

@@ -1,9 +1,9 @@
use std::{collections::HashSet, time::Duration}; use std::{collections::HashSet, pin::Pin, time::Duration};
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use cynic::{MutationBuilder, QueryBuilder}; use cynic::{MutationBuilder, QueryBuilder};
use edge_schema::schema::{NetworkTokenV1, WebcIdent}; use edge_schema::schema::{NetworkTokenV1, WebcIdent};
use futures::StreamExt; use futures::{Stream, StreamExt};
use time::OffsetDateTime; use time::OffsetDateTime;
use tracing::Instrument; use tracing::Instrument;
use url::Url; use url::Url;
@@ -11,8 +11,9 @@ use url::Url;
use crate::{ use crate::{
types::{ types::{
self, CreateNamespaceVars, DeployApp, DeployAppConnection, DeployAppVersion, self, CreateNamespaceVars, DeployApp, DeployAppConnection, DeployAppVersion,
DeployAppVersionConnection, GetDeployAppAndVersion, GetDeployAppVersionsVars, DeployAppVersionConnection, GetCurrentUserWithAppsVars, GetDeployAppAndVersion,
GetNamespaceAppsVars, Log, PackageVersionConnection, PublishDeployAppVars, GetDeployAppVersionsVars, GetNamespaceAppsVars, Log, PackageVersionConnection,
PublishDeployAppVars,
}, },
GraphQLApiFailure, WasmerClient, GraphQLApiFailure, WasmerClient,
}; };
@@ -380,38 +381,45 @@ pub async fn get_app_version_by_id_with_app(
/// List all apps that are accessible by the current user. /// List all apps that are accessible by the current user.
/// ///
/// NOTE: this will only include the first pages and does not provide pagination. /// NOTE: this will only include the first pages and does not provide pagination.
pub async fn user_apps(client: &WasmerClient) -> Result<Vec<types::DeployApp>, anyhow::Error> { pub async fn user_apps(
let user = client client: &WasmerClient,
.run_graphql(types::GetCurrentUserWithApps::build(())) ) -> impl futures::Stream<Item = Result<Vec<types::DeployApp>, anyhow::Error>> + '_ {
.await? futures::stream::try_unfold(None, move |cursor| async move {
.viewer let user = client
.context("not logged in")?; .run_graphql(types::GetCurrentUserWithApps::build(
GetCurrentUserWithAppsVars { after: cursor },
))
.await?
.viewer
.context("not logged in")?;
let apps = user let apps: Vec<_> = user
.apps .apps
.edges .edges
.into_iter() .into_iter()
.flatten() .flatten()
.filter_map(|x| x.node) .filter_map(|x| x.node)
.collect(); .collect();
Ok(apps) let cursor = user.apps.page_info.end_cursor;
if apps.is_empty() {
Ok(None)
} else {
Ok(Some((apps, cursor)))
}
})
} }
/// List all apps that are accessible by the current user. /// List all apps that are accessible by the current user.
///
/// NOTE: this does not currently do full pagination properly.
// TODO(theduke): fix pagination
pub async fn user_accessible_apps( pub async fn user_accessible_apps(
client: &WasmerClient, client: &WasmerClient,
) -> Result<Vec<types::DeployApp>, anyhow::Error> { ) -> Result<
let mut apps = Vec::new(); impl futures::Stream<Item = Result<Vec<types::DeployApp>, anyhow::Error>> + '_,
anyhow::Error,
// Get user apps. > {
let apps: Pin<Box<dyn Stream<Item = Result<Vec<DeployApp>, anyhow::Error>> + Send + Sync>> =
let user_apps = user_apps(client).await?; Box::pin(user_apps(client).await);
apps.extend(user_apps);
// Get all aps in user-accessible namespaces. // Get all aps in user-accessible namespaces.
let namespace_res = client let namespace_res = client
@@ -429,18 +437,16 @@ pub async fn user_accessible_apps(
.map(|node| node.name.clone()) .map(|node| node.name.clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
for namespace in namespace_names { let mut all_apps = vec![apps];
let out = client for ns in namespace_names {
.run_graphql(types::GetNamespaceApps::build(GetNamespaceAppsVars { let apps: Pin<Box<dyn Stream<Item = Result<Vec<DeployApp>, anyhow::Error>> + Send + Sync>> =
name: namespace.to_string(), Box::pin(namespace_apps(client, ns).await);
}))
.await?;
if let Some(ns) = out.get_namespace { all_apps.push(apps);
let ns_apps = ns.apps.edges.into_iter().flatten().filter_map(|x| x.node);
apps.extend(ns_apps);
}
} }
let apps = futures::stream::select_all(all_apps);
Ok(apps) Ok(apps)
} }
@@ -449,27 +455,38 @@ pub async fn user_accessible_apps(
/// NOTE: only retrieves the first page and does not do pagination. /// NOTE: only retrieves the first page and does not do pagination.
pub async fn namespace_apps( pub async fn namespace_apps(
client: &WasmerClient, client: &WasmerClient,
namespace: &str, namespace: String,
) -> Result<Vec<types::DeployApp>, anyhow::Error> { ) -> impl futures::Stream<Item = Result<Vec<types::DeployApp>, anyhow::Error>> + '_ {
let res = client let namespace = namespace.clone();
.run_graphql(types::GetNamespaceApps::build(GetNamespaceAppsVars {
name: namespace.to_string(),
}))
.await?;
let ns = res futures::stream::try_unfold((None, namespace), move |(cursor, namespace)| async move {
.get_namespace let res = client
.with_context(|| format!("failed to get namespace '{}'", namespace))?; .run_graphql(types::GetNamespaceApps::build(GetNamespaceAppsVars {
name: namespace.to_string(),
after: cursor,
}))
.await?;
let apps = ns let ns = res
.apps .get_namespace
.edges .with_context(|| format!("failed to get namespace '{}'", namespace))?;
.into_iter()
.flatten()
.filter_map(|x| x.node)
.collect();
Ok(apps) let apps: Vec<_> = ns
.apps
.edges
.into_iter()
.flatten()
.filter_map(|x| x.node)
.collect();
let cursor = ns.apps.page_info.end_cursor;
if apps.is_empty() {
Ok(None)
} else {
Ok(Some((apps, (cursor, namespace))))
}
})
} }
/// Publish a new app (version). /// Publish a new app (version).

View File

@@ -180,17 +180,24 @@ mod queries {
pub get_deploy_app: Option<DeployApp>, pub get_deploy_app: Option<DeployApp>,
} }
#[derive(cynic::QueryVariables, Debug)]
pub struct GetCurrentUserWithAppsVars {
pub after: Option<String>,
}
#[derive(cynic::QueryFragment, Debug)] #[derive(cynic::QueryFragment, Debug)]
#[cynic(graphql_type = "Query")] #[cynic(graphql_type = "Query", variables = "GetCurrentUserWithAppsVars")]
pub struct GetCurrentUserWithApps { pub struct GetCurrentUserWithApps {
pub viewer: Option<UserWithApps>, pub viewer: Option<UserWithApps>,
} }
#[derive(cynic::QueryFragment, Debug)] #[derive(cynic::QueryFragment, Debug)]
#[cynic(graphql_type = "User")] #[cynic(graphql_type = "User")]
#[cynic(variables = "GetCurrentUserWithAppsVars")]
pub struct UserWithApps { pub struct UserWithApps {
pub id: cynic::Id, pub id: cynic::Id,
pub username: String, pub username: String,
#[arguments(after: $after)]
pub apps: DeployAppConnection, pub apps: DeployAppConnection,
} }
@@ -537,6 +544,7 @@ mod queries {
#[derive(cynic::QueryVariables, Debug)] #[derive(cynic::QueryVariables, Debug)]
pub struct GetNamespaceAppsVars { pub struct GetNamespaceAppsVars {
pub name: String, pub name: String,
pub after: Option<String>,
} }
#[derive(cynic::QueryFragment, Debug)] #[derive(cynic::QueryFragment, Debug)]
@@ -548,9 +556,11 @@ mod queries {
#[derive(cynic::QueryFragment, Debug)] #[derive(cynic::QueryFragment, Debug)]
#[cynic(graphql_type = "Namespace")] #[cynic(graphql_type = "Namespace")]
#[cynic(variables = "GetNamespaceAppsVars")]
pub struct NamespaceWithApps { pub struct NamespaceWithApps {
pub id: cynic::Id, pub id: cynic::Id,
pub name: String, pub name: String,
#[arguments(after: $after)]
pub apps: DeployAppConnection, pub apps: DeployAppConnection,
} }

View File

@@ -1,5 +1,10 @@
//! List Edge apps. //! List Edge apps.
use std::pin::Pin;
use futures::{Stream, StreamExt};
use wasmer_api::types::DeployApp;
use crate::{ use crate::{
commands::AsyncCliCommand, commands::AsyncCliCommand,
opts::{ApiOpts, ListFormatOpts}, opts::{ApiOpts, ListFormatOpts},
@@ -23,6 +28,10 @@ pub struct CmdAppList {
/// directly owned by the user and apps in namespaces the user can access. /// directly owned by the user and apps in namespaces the user can access.
#[clap(short = 'a', long)] #[clap(short = 'a', long)]
all: bool, all: bool,
/// Maximum number of apps to display
#[clap(long, default_value = "1000")]
max: usize,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@@ -32,15 +41,37 @@ impl AsyncCliCommand for CmdAppList {
async fn run_async(self) -> Result<(), anyhow::Error> { async fn run_async(self) -> Result<(), anyhow::Error> {
let client = self.api.client()?; let client = self.api.client()?;
let apps = if let Some(ns) = self.namespace { let apps_stream: Pin<
wasmer_api::query::namespace_apps(&client, &ns).await? Box<dyn Stream<Item = Result<Vec<DeployApp>, anyhow::Error>> + Send + Sync>,
> = if let Some(ns) = self.namespace.clone() {
Box::pin(wasmer_api::query::namespace_apps(&client, ns).await)
} else if self.all { } else if self.all {
wasmer_api::query::user_accessible_apps(&client).await? Box::pin(wasmer_api::query::user_accessible_apps(&client).await?)
} else { } else {
wasmer_api::query::user_apps(&client).await? Box::pin(wasmer_api::query::user_apps(&client).await)
}; };
println!("{}", self.fmt.format.render(&apps)); let mut apps_stream = std::pin::pin!(apps_stream);
let mut rem = self.max;
let mut display_apps = vec![];
while let Some(apps) = apps_stream.next().await {
let mut apps = apps?;
let limit = std::cmp::min(apps.len(), rem);
if limit == 0 {
break;
}
display_apps.extend(apps.drain(..limit));
rem -= limit;
}
println!("{}", self.fmt.format.render(&display_apps));
Ok(()) Ok(())
} }