[pdm-devel] [PATCH proxmox-datacenter-manager v2 2/3] server: add helper for fetching from multiple remotes at once
Lukas Wagner
l.wagner at proxmox.com
Fri Aug 29 16:10:27 CEST 2025
There is already a helper for fetching results from all remotes and all
their nodes. In some cases (e.g. SDN) it is sufficient to fetch the
result of the API calls once from any node. Add a second helper that
executes the API requests only once per remote on any node, instead of
on all nodes.
Originally-by: Stefan Hanreich <s.hanreich at proxmox.com>
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
Notes:
Changes since Stefan's RFC:
- always use 'localhost' node, instead of getting a node list first and using
the first node
- Some minor changes to make the code nicer/shorter
server/src/parallel_fetcher.rs | 93 ++++++++++++++++++++++++++--------
1 file changed, 71 insertions(+), 22 deletions(-)
diff --git a/server/src/parallel_fetcher.rs b/server/src/parallel_fetcher.rs
index e4a5c106..58ca1f55 100644
--- a/server/src/parallel_fetcher.rs
+++ b/server/src/parallel_fetcher.rs
@@ -162,13 +162,13 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
let node_name = node.node.clone();
let context_clone = context.clone();
- nodes_join_set.spawn(Self::fetch_pve_node(
+ nodes_join_set.spawn(Self::fetch_node(
func_clone,
context_clone,
remote_clone,
node_name,
permit,
- per_remote_connections_permit,
+ Some(per_remote_connections_permit),
));
}
@@ -186,39 +186,33 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
}
}
RemoteType::Pbs => {
- let node = "localhost".to_string();
-
- let now = Instant::now();
- let result = func(context, remote.clone(), node.clone()).await;
- let api_response_time = now.elapsed();
+ let (nodename, result) = Self::fetch_node(
+ func,
+ context,
+ remote.clone(),
+ "localhost".into(),
+ permit.unwrap(), // Always set to `Some` at this point
+ None,
+ )
+ .await;
match result {
- Ok(data) => {
- per_remote_results.node_results.insert(
- node,
- Ok(NodeResults {
- data,
- api_response_time,
- }),
- );
- }
- Err(err) => {
- per_remote_results.node_results.insert(node, Err(err));
- }
- }
+ Ok(a) => per_remote_results.node_results.insert(nodename, Ok(a)),
+ Err(err) => per_remote_results.node_results.insert(nodename, Err(err)),
+ };
}
}
(remote.id, Ok(per_remote_results))
}
- async fn fetch_pve_node<F, Ft, T>(
+ async fn fetch_node<F, Ft, T>(
func: F,
context: C,
remote: Remote,
node: String,
_permit: OwnedSemaphorePermit,
- _per_remote_connections_permit: OwnedSemaphorePermit,
+ _per_remote_connections_permit: Option<OwnedSemaphorePermit>,
) -> (String, Result<NodeResults<T>, Error>)
where
F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
@@ -240,4 +234,59 @@ impl<C: Clone + Send + 'static> ParallelFetcher<C> {
Err(err) => (node, Err(err)),
}
}
+
+ pub async fn do_for_all_remotes<A, F, T, Ft>(self, remotes: A, func: F) -> FetchResults<T>
+ where
+ A: Iterator<Item = Remote>,
+ F: Fn(C, Remote, String) -> Ft + Clone + Send + 'static,
+ Ft: Future<Output = Result<T, Error>> + Send + 'static,
+ T: Send + Debug + 'static,
+ {
+ let total_connections_semaphore = Arc::new(Semaphore::new(self.max_connections));
+
+ let mut node_join_set = JoinSet::new();
+ let mut results = FetchResults::default();
+
+ for remote in remotes {
+ let total_connections_semaphore = total_connections_semaphore.clone();
+
+ let remote_id = remote.id.clone();
+ let context = self.context.clone();
+ let func = func.clone();
+
+ node_join_set.spawn(async move {
+ let permit = total_connections_semaphore.acquire_owned().await.unwrap();
+
+ (
+ remote_id,
+ Self::fetch_node(func, context, remote, "localhost".into(), permit, None).await,
+ )
+ });
+ }
+
+ while let Some(a) = node_join_set.join_next().await {
+ match a {
+ Ok((remote_id, (node_id, node_result))) => {
+ let mut node_results = HashMap::new();
+ node_results.insert(node_id, node_result);
+
+ let remote_result = RemoteResult { node_results };
+
+ if results
+ .remote_results
+ .insert(remote_id, Ok(remote_result))
+ .is_some()
+ {
+ // should never happen, but log for good measure if it actually does
+ log::warn!("made multiple requests for a remote!");
+ }
+ }
+ Err(err) => {
+ log::error!("join error when waiting for future: {err}")
+ }
+ }
+ }
+
+ results
+ }
}
--
2.47.2
More information about the pdm-devel
mailing list