[pdm-devel] [PATCH proxmox-datacenter-manager v3 26/26] metric collection: use JoinSet instead of joining from handles in a Vec

Lukas Wagner l.wagner at proxmox.com
Wed Apr 16 14:56:42 CEST 2025


This lets us process finished tasks in the order they finish, not in the
order they were spawned.

Suggested-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
Reviewed-by: Maximiliano Sandoval <m.sandoval at proxmox.com>
---

Notes:
    New in v2.

 .../src/metric_collection/collection_task.rs  | 25 ++++++++-----------
 1 file changed, 10 insertions(+), 15 deletions(-)

diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs
index f36bad48..469b518c 100644
--- a/server/src/metric_collection/collection_task.rs
+++ b/server/src/metric_collection/collection_task.rs
@@ -9,6 +9,7 @@ use tokio::{
         mpsc::{Receiver, Sender},
         oneshot, OwnedSemaphorePermit, Semaphore,
     },
+    task::JoinSet,
     time::{Interval, MissedTickBehavior},
 };
 
@@ -239,7 +240,8 @@ impl MetricCollectionTask {
         remotes_to_fetch: &[String],
     ) {
         let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_CONNECTIONS));
-        let mut handles = Vec::new();
+        let mut handles = JoinSet::new();
+
         let now = proxmox_time::epoch_i64();
 
         for remote_name in remotes_to_fetch {
@@ -262,29 +264,22 @@ impl MetricCollectionTask {
 
             if let Some(remote) = remote_config.get(remote_name).cloned() {
                 log::debug!("fetching remote '{}'", remote.id);
-                let handle = tokio::spawn(Self::fetch_single_remote(
+                handles.spawn(Self::fetch_single_remote(
                     remote,
                     status,
                     self.metric_data_tx.clone(),
                     permit,
                 ));
-
-                handles.push((remote_name.clone(), handle));
             }
         }
 
-        for (remote_name, handle) in handles {
-            let res = handle.await;
-
+        while let Some(res) = handles.join_next().await {
             match res {
-                Ok(Ok(ts)) => {
-                    self.state.set_status(remote_name, ts);
+                Ok((name, status)) => {
+                    self.state.set_status(name, status);
                 }
-                Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
                 Err(err) => {
-                    log::error!(
-                        "join error for metric collection task for remote {remote_name}: {err}"
-                    )
+                    log::error!("join error for metric collection task for remote: {err}")
                 }
             }
         }
@@ -332,7 +327,7 @@ impl MetricCollectionTask {
         mut status: RemoteStatus,
         sender: Sender<RrdStoreRequest>,
         _permit: OwnedSemaphorePermit,
-    ) -> Result<RemoteStatus, Error> {
+    ) -> (String, RemoteStatus) {
         let (result_tx, result_rx) = oneshot::channel();
 
         let now = proxmox_time::epoch_i64();
@@ -400,7 +395,7 @@ impl MetricCollectionTask {
             }
         }
 
-        Ok(status)
+        (remote.id, status)
     }
 }
 
-- 
2.39.5





More information about the pdm-devel mailing list