[pdm-devel] [PATCH proxmox-datacenter-manager v2 28/28] metric collection: use JoinSet instead of joining from handles in a Vec

Lukas Wagner l.wagner at proxmox.com
Fri Feb 14 14:06:53 CET 2025


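This lets us process finished tasks in the order they finish, not in the
order they were spawned.

Since a JoinSet does not keep track of which remote a task belongs to,
fetch_single_remote now returns the remote's name together with its
status. It also no longer returns a Result, so the separate error arm in
the result handling is dropped.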

Suggested-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---

Notes:
    New in v2.

 .../src/metric_collection/collection_task.rs  | 25 ++++++++-----------
 1 file changed, 10 insertions(+), 15 deletions(-)

diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs
index 5c6e2762..4e8aa627 100644
--- a/server/src/metric_collection/collection_task.rs
+++ b/server/src/metric_collection/collection_task.rs
@@ -10,6 +10,7 @@ use tokio::{
         mpsc::{Receiver, Sender},
         oneshot, OwnedSemaphorePermit, Semaphore,
     },
+    task::JoinSet,
     time::{Interval, MissedTickBehavior},
 };
 
@@ -267,7 +268,8 @@ impl MetricCollectionTask {
         let semaphore = Arc::new(Semaphore::new(
             self.settings.max_concurrent_connections_or_default(),
         ));
-        let mut handles = Vec::new();
+
+        let mut handles = JoinSet::new();
         let now = proxmox_time::epoch_i64();
 
         for remote_name in remotes_to_fetch {
@@ -290,30 +292,23 @@ impl MetricCollectionTask {
 
             if let Some(remote) = remote_config.get(remote_name).cloned() {
                 log::debug!("fetching remote '{}'", remote.id);
-                let handle = tokio::spawn(Self::fetch_single_remote(
+                handles.spawn(Self::fetch_single_remote(
                     self.settings.clone(),
                     remote,
                     status,
                     self.metric_data_tx.clone(),
                     permit,
                 ));
-
-                handles.push((remote_name.clone(), handle));
             }
         }
 
-        for (remote_name, handle) in handles {
-            let res = handle.await;
-
+        while let Some(res) = handles.join_next().await {
             match res {
-                Ok(Ok(ts)) => {
-                    self.state.set_status(remote_name, ts);
+                Ok((name, status)) => {
+                    self.state.set_status(name, status);
                 }
-                Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
                 Err(err) => {
-                    log::error!(
-                        "join error for metric collection task for remote {remote_name}: {err}"
-                    )
+                    log::error!("join error for metric collection task for remote: {err}")
                 }
             }
         }
@@ -362,7 +357,7 @@ impl MetricCollectionTask {
         mut status: RemoteStatus,
         sender: Sender<RrdStoreRequest>,
         _permit: OwnedSemaphorePermit,
-    ) -> Result<RemoteStatus, Error> {
+    ) -> (String, RemoteStatus) {
         Self::sleep_for_random_millis(
             settings.min_connection_delay_or_default(),
             settings.max_connection_delay_or_default(),
@@ -436,7 +431,7 @@ impl MetricCollectionTask {
             }
         }
 
-        Ok(status)
+        (remote.id, status)
     }
 }
 
-- 
2.39.5





More information about the pdm-devel mailing list