[pdm-devel] [PATCH proxmox-datacenter-manager 1/2] server: add helper for fetching results from one node per remote
Stefan Hanreich
s.hanreich at proxmox.com
Fri Aug 29 14:33:28 CEST 2025
There is already a helper for fetching results from all remotes and
all their nodes. In some cases (e.g. SDN) it is sufficient to fetch
the result of the API calls once from any node. Add a second helper
that executes the API requests only once per remote on any node,
instead of on all nodes. Currently this takes the first node returned
from the PVE API, but in the future could be extended to re-try on a
different node if the first node is not available or choose a random
node instead of hitting the first node all the time.
Signed-off-by: Stefan Hanreich <s.hanreich at proxmox.com>
---
server/src/parallel_fetcher.rs | 133 ++++++++++++++++++++++++++++++++-
1 file changed, 132 insertions(+), 1 deletion(-)
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index 3fad68a..e55cc25 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -6,7 +6,7 @@ use std::{
time::{Duration, Instant},
};
-use anyhow::Error;
+use anyhow::{anyhow, Error};
use pdm_api_types::remotes::{Remote, RemoteType};
use pve_api_types::ClusterNodeIndexResponse;
use tokio::{
@@ -103,6 +103,107 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
results
}
+ pub async fn do_for_all_remotes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+ where
+ A: Iterator<Item = Remote>,
+ F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
+ Ft: Future<Output = Result<T, Error>> + Send + 'static,
+ T: Send + Debug + 'static,
+ {
+ let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
+
+ let mut node_join_set = JoinSet::new();
+ let mut results = FetchResults::default();
+
+ for remote in remotes {
+ // TODO: currently the fetch helpers require a per_remote permit, but we are only making one
+ // request per remote, so fake it here until we find a better solution to handle both
+ // cases in the helpers
+ let per_remote_semaphore = Arc::new(Semaphore::new(1));
+ let total_connections_semaphore = total_connections_semaphore.clone();
+
+ let remote_id = remote.id.clone();
+ let context = self.context.clone();
+ let func = func.clone();
+
+ match remote.ty {
+ RemoteType::Pve => {
+ let nodes = match Self::fetch_pve_node_list(&remote).await {
+ Ok(nodes) => nodes,
+ Err(error) => {
+ results.remote_results.insert(remote.id.clone(), Err(error));
+ continue;
+ }
+ };
+
+ let Some(first_node) = nodes.into_iter().next() else {
+ results.remote_results.insert(remote.id.clone(), Err(anyhow!("no node returned for remote {}", remote.id)));
+ continue;
+ };
+
+ node_join_set.spawn(async move {
+ let permit = total_connections_semaphore.acquire_owned().await.unwrap();
+
+ let per_remote_permit =
+ per_remote_semaphore.clone().acquire_owned().await.unwrap();
+
+ (
+ remote_id,
+ Self::fetch_pve_node(
+ func,
+ context,
+ remote,
+ first_node.node,
+ permit,
+ per_remote_permit,
+ )
+ .await,
+ )
+ });
+ }
+ RemoteType::Pbs => {
+ node_join_set.spawn(async move {
+ let permit = total_connections_semaphore.acquire_owned().await.unwrap();
+
+ let per_remote_permit =
+ per_remote_semaphore.clone().acquire_owned().await.unwrap();
+
+ (
+ remote_id,
+ Self::fetch_pbs_node(func, context, remote, permit, per_remote_permit)
+ .await,
+ )
+ });
+ }
+ }
+ }
+
+ while let Some(a) = node_join_set.join_next().await {
+ match a {
+ Ok((remote_id, (node_id, node_result))) => {
+ let mut node_results = HashMap::new();
+ node_results.insert(node_id, node_result);
+
+ let remote_result = RemoteResult { node_results };
+
+ if results
+ .remote_results
+ .insert(remote_id, Ok(remote_result))
+ .is_some()
+ {
+ // should never happen, but log for good measure if it actually does
+ log::warn!("made multiple requests for a remote!");
+ }
+ }
+ Err(err) => {
+ log::error!("join error when waiting for future: {err}")
+ }
+ }
+ }
+
+ results
+ }
+
async fn fetch_remote<F, Ft, T>(
remote: Remote,
context: C,
@@ -205,6 +306,36 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
(remote.id, Ok(per_remote_results))
}
+ async fn fetch_pve_node_list(remote: &Remote) -> Result<Vec<ClusterNodeIndexResponse>, Error> {
+ let client = connection::make_pve_client(remote)?;
+ Ok(client.list_nodes().await?)
+ }
+
+ async fn fetch_pbs_node<F, Ft, T>(
+ func: F,
+ context: C,
+ remote: Remote,
+ permit: OwnedSemaphorePermit,
+ per_remote_connections_permit: OwnedSemaphorePermit,
+ ) -> (String, Result<NodeResults<T>, Error>)
+ where
+ F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
+ Ft: Future<Output = Result<T, Error>> + Send + 'static,
+ T: Send + Debug + 'static,
+ {
+ // implementation is currently the same for PVE / PBS, except that PVE requires a node
+ // name, which we can hardcode to localhost for PBS
+ Self::fetch_pve_node(
+ func,
+ context,
+ remote,
+ "localhost".to_string(),
+ permit,
+ per_remote_connections_permit,
+ )
+ .await
+ }
+
async fn fetch_pve_node<F, Ft, T>(
func: F,
context: C,
--
2.47.2
More information about the pdm-devel
mailing list