[pdm-devel] [PATCH proxmox-datacenter-manager v2 28/28] metric collection: use JoinSet instead of joining from handles in a Vec
Lukas Wagner
l.wagner at proxmox.com
Fri Feb 14 14:06:53 CET 2025
This lets us process finished tasks in the order they finish, not in the
order they were spawned.
Suggested-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
Notes:
New in v2.
.../src/metric_collection/collection_task.rs | 25 ++++++++-----------
1 file changed, 10 insertions(+), 15 deletions(-)
diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs
index 5c6e2762..4e8aa627 100644
--- a/server/src/metric_collection/collection_task.rs
+++ b/server/src/metric_collection/collection_task.rs
@@ -10,6 +10,7 @@ use tokio::{
mpsc::{Receiver, Sender},
oneshot, OwnedSemaphorePermit, Semaphore,
},
+ task::JoinSet,
time::{Interval, MissedTickBehavior},
};
@@ -267,7 +268,8 @@ impl MetricCollectionTask {
let semaphore = Arc::new(Semaphore::new(
self.settings.max_concurrent_connections_or_default(),
));
- let mut handles = Vec::new();
+
+ let mut handles = JoinSet::new();
let now = proxmox_time::epoch_i64();
for remote_name in remotes_to_fetch {
@@ -290,30 +292,23 @@ impl MetricCollectionTask {
if let Some(remote) = remote_config.get(remote_name).cloned() {
log::debug!("fetching remote '{}'", remote.id);
- let handle = tokio::spawn(Self::fetch_single_remote(
+ handles.spawn(Self::fetch_single_remote(
self.settings.clone(),
remote,
status,
self.metric_data_tx.clone(),
permit,
));
-
- handles.push((remote_name.clone(), handle));
}
}
- for (remote_name, handle) in handles {
- let res = handle.await;
-
+ while let Some(res) = handles.join_next().await {
match res {
- Ok(Ok(ts)) => {
- self.state.set_status(remote_name, ts);
+ Ok((name, status)) => {
+ self.state.set_status(name, status);
}
- Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
Err(err) => {
- log::error!(
- "join error for metric collection task for remote {remote_name}: {err}"
- )
+ log::error!("join error for metric collection task for remote: {err}")
}
}
}
@@ -362,7 +357,7 @@ impl MetricCollectionTask {
mut status: RemoteStatus,
sender: Sender<RrdStoreRequest>,
_permit: OwnedSemaphorePermit,
- ) -> Result<RemoteStatus, Error> {
+ ) -> (String, RemoteStatus) {
Self::sleep_for_random_millis(
settings.min_connection_delay_or_default(),
settings.max_connection_delay_or_default(),
@@ -436,7 +431,7 @@ impl MetricCollectionTask {
}
}
- Ok(status)
+ (remote.id, status)
}
}
--
2.39.5
More information about the pdm-devel
mailing list