[pdm-devel] [PATCH proxmox-datacenter-manager 07/25] metric collection: rework metric poll task

Wolfgang Bumiller w.bumiller at proxmox.com
Wed Feb 12 16:57:37 CET 2025


Looks good.
One improvement/follow-up suggestion (JoinSet), some nitpicking &
potential refactoring requests...

On Tue, Feb 11, 2025 at 01:05:23PM +0100, Lukas Wagner wrote:
> Metric collection is mostly limited by network latency, not disk IO or
> CPU. This means we should be able to get significant speedups by polling
> remotes concurrently by spawning tokio workers.
> 
> To avoid load/IO spikes, we limit the number of concurrent connections
> using a semaphore.
> 
> Each concurrent task which fetches a single remote waits a random amount
> of time before actually connecting to the remote. The upper and lower
> bounds for this random delay can be configured in the metric collection
> settings. The main aim of this mechanism is to reduce load spikes.
> 
> Furthermore, each time when the main collection interval timer
> fires, a random delay is introduced before actually starting the
> collection process.
> This is useful due to how we set up the timer; we configure
> it to fire at aligned points in time. For instance, if the
> collection interval is set to 60s, the timer fires at
> minute boundaries. Using this random offset, we can avoid
> triggering at the same time as other timers (cron, systemd).
> 
> Furthermore, we add a mechanism to trigger metric collection manually.
> This is achieved by using `tokio::select!` on both the timer's `tick`
> method and the `recv` method of an `mpsc` which is used to send control
> messages to the metric collection task.
> 
> For better code structure, the collection task is split into a separate
> module.
> 
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> ---
>  Cargo.toml                                    |   1 +
>  server/Cargo.toml                             |   1 +
>  server/src/bin/proxmox-datacenter-api.rs      |   2 +-
>  .../src/metric_collection/collection_task.rs  | 317 ++++++++++++++++++
>  server/src/metric_collection/mod.rs           | 128 +++----
>  5 files changed, 366 insertions(+), 83 deletions(-)
>  create mode 100644 server/src/metric_collection/collection_task.rs
> 
> diff --git a/Cargo.toml b/Cargo.toml
> index 4f3b1d0..7dd60a5 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -108,6 +108,7 @@ once_cell = "1.3.1"
>  openssl = "0.10.40"
>  percent-encoding = "2.1"
>  pin-project-lite = "0.2"
> +rand = "0.8"
>  regex = "1.5.5"
>  serde = { version = "1.0", features = ["derive"] }
>  serde_cbor = "0.11.1"
> diff --git a/server/Cargo.toml b/server/Cargo.toml
> index 7b0058e..c8308f2 100644
> --- a/server/Cargo.toml
> +++ b/server/Cargo.toml
> @@ -24,6 +24,7 @@ nix.workspace = true
>  once_cell.workspace = true
>  openssl.workspace = true
>  percent-encoding.workspace = true
> +rand.workspace = true
>  serde.workspace = true
>  serde_json.workspace = true
>  syslog.workspace = true
> diff --git a/server/src/bin/proxmox-datacenter-api.rs b/server/src/bin/proxmox-datacenter-api.rs
> index a79094d..6e85e52 100644
> --- a/server/src/bin/proxmox-datacenter-api.rs
> +++ b/server/src/bin/proxmox-datacenter-api.rs
> @@ -286,7 +286,7 @@ async fn run(debug: bool) -> Result<(), Error> {
>      });
>  
>      start_task_scheduler();
> -    metric_collection::start_task();
> +    metric_collection::start_task()?;
>      resource_cache::start_task();
>  
>      server.await?;
> diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs
> new file mode 100644
> index 0000000..8420039
> --- /dev/null
> +++ b/server/src/metric_collection/collection_task.rs
> @@ -0,0 +1,317 @@
> +use std::{collections::HashMap, sync::Arc, time::Duration};
> +
> +use anyhow::Error;
> +use rand::Rng;
> +use tokio::{
> +    sync::{
> +        mpsc::{Receiver, Sender},
> +        OwnedSemaphorePermit, Semaphore,
> +    },
> +    time::Interval,
> +};
> +
> +use proxmox_section_config::typed::SectionConfigData;
> +
> +use pdm_api_types::{
> +    remotes::{Remote, RemoteType},
> +    CollectionSettings,
> +};
> +use pdm_config::metric_collection::COLLECTION_SETTINGS_TYPE;
> +
> +use crate::{connection, task_utils};
> +
> +use super::rrd_task::RrdStoreRequest;
> +
> +/// Control messages for the metric collection task.
> +pub(super) enum ControlMsg {
> +    CollectSingleRemote(String),
> +    CollectAllRemotes,
> +}
> +
> +/// Task which periodically collects metrics from all remotes and stores
> +/// them in the local metrics database.
> +pub(super) struct MetricCollectionTask {
> +    most_recent_timestamps: HashMap<String, i64>,
> +    settings: CollectionSettings,
> +    metric_data_tx: Sender<RrdStoreRequest>,
> +    control_message_rx: Receiver<ControlMsg>,
> +}
> +
> +impl MetricCollectionTask {
> +    /// Create a new metric collection task.
> +    pub(super) fn new(
> +        metric_data_tx: Sender<RrdStoreRequest>,
> +        control_message_rx: Receiver<ControlMsg>,
> +    ) -> Result<Self, Error> {
> +        let settings = Self::get_settings_or_default();
> +
> +        Ok(Self {
> +            most_recent_timestamps: HashMap::new(),
> +            settings,
> +            metric_data_tx,
> +            control_message_rx,
> +        })
> +    }
> +
> +    /// Run the metric collection task.
> +    ///
> +    /// This function does never return.
> +    #[tracing::instrument(skip_all, name = "metric_collection_task")]
> +    pub(super) async fn run(&mut self) {
> +        let mut timer = Self::setup_timer(self.settings.collection_interval_or_default());
> +
> +        log::debug!(
> +            "metric collection starting up. collection interval set to {} seconds",

^ nit: using punctuation in the middle but continuing in lower case and
not ending with punctuation 🤪

> +            self.settings.collection_interval_or_default()
> +        );
> +
> +        loop {
> +            let old_settings = self.settings.clone();
> +            tokio::select! {
> +                _ = timer.tick() => {
> +                    // Reload settings in case they have changed in the meanwhile
> +                    self.settings = Self::get_settings_or_default();
> +
> +                    log::debug!("starting metric collection from all remotes - triggered by timer");

Not sure if it's worth moving this into a `self.on_tick()` but...

> +                    self.sleep_for_random_interval_offset().await;
> +
> +                    if let Some(remotes) = Self::load_remote_config() {
> +                        let to_fetch = remotes.order.as_slice();
> +                        self.fetch_remotes(&remotes, to_fetch).await;
> +                    }
> +                }
> +
> +                val = self.control_message_rx.recv() => {
> +                    // Reload settings in case they have changed in the meanwhile
> +                    self.settings = Self::get_settings_or_default();
> +                    match val {

...but I think this part should be factored out into a
`self.handle_control_message(msg).await`

It gets indented quite deeply and it just makes sense IMO :)

> +                        Some(ControlMsg::CollectSingleRemote(remote)) => {
> +                            if let Some(remotes) = Self::load_remote_config() {

^ Also this...

> +                                log::debug!("starting metric collection for remote '{remote}'- triggered by control message");
> +                                self.fetch_remotes(&remotes, &[remote]).await;
> +                            }
> +                        }
> +                        Some(ControlMsg::CollectAllRemotes) => {
> +                            if let Some(remotes) = Self::load_remote_config() {

... is the same as this, so if this is factored out into a separate
function, it could just early-out via a let-else there, as we don't need
to call it when `val` is `None`, and then it's quite compact.
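
To make the let-else idea concrete, here is a minimal sketch in plain std
Rust, not the actual patch code. `RemoteConfig` and
`remotes_for_control_message` are made-up names, the closure stands in for
`Self::load_remote_config()`, and the real method would presumably be
`async` and call `self.fetch_remotes(...)` instead of returning a list:

```rust
/// Control messages, mirroring the enum from the patch.
#[derive(Debug)]
pub enum ControlMsg {
    CollectSingleRemote(String),
    CollectAllRemotes,
}

/// Hypothetical stand-in for the parsed remotes config; only the ordered
/// list of remote names matters for this sketch.
pub struct RemoteConfig {
    pub order: Vec<String>,
}

/// Decide which remotes to fetch for a received control message.
///
/// `msg` is what `recv()` returned. Returns `None` if there is nothing to do.
pub fn remotes_for_control_message(
    msg: Option<ControlMsg>,
    load_remote_config: impl FnOnce() -> Option<RemoteConfig>,
) -> Option<Vec<String>> {
    // Early-out: the channel is closed, so the config is never loaded.
    let Some(msg) = msg else {
        return None;
    };

    // Loaded once for both message variants.
    let Some(remotes) = load_remote_config() else {
        return None;
    };

    Some(match msg {
        ControlMsg::CollectSingleRemote(remote) => vec![remote],
        ControlMsg::CollectAllRemotes => remotes.order,
    })
}
```

With this shape the `select!` arm shrinks to a single call, and the two
`load_remote_config()` call sites collapse into one.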

> +                                log::debug!("starting metric collection from all remotes - triggered by control message");
> +                                self.fetch_remotes(&remotes, &remotes.order).await;
> +                            }
> +                        }
> +                        _ => {},
> +                    }
> +                }
> +            }
> +
> +            let interval = self.settings.collection_interval_or_default();
> +
> +            if old_settings.collection_interval_or_default() != interval {
> +                log::info!(
> +                    "metric collection interval changed to {} seconds, reloading timer",
> +                    interval
> +                );
> +                timer = Self::setup_timer(interval);
> +            }
> +        }
> +    }
> +
> +    async fn sleep_for_random_interval_offset(&self) {
> +        let mut min = self.settings.min_interval_offset_or_default();
> +        let max = self.settings.max_interval_offset_or_default();
> +
> +        if min > max {
> +            log::warn!(
> +                "min-interval-offset is larger than max-interval-offset ({min} > {max}) - \
> +                capping it to max-interval-offset ({max})"
> +            );
> +            min = max;
> +        }
> +
> +        let jitter = {
> +            let mut rng = rand::thread_rng();
> +            rng.gen_range(min..=max)
> +        };
> +        tokio::time::sleep(Duration::from_secs(jitter)).await;
> +    }
> +

↑ and ↓ are crying out for a common fn with the variable name, min and
max as parameters ;-)
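
Roughly what I have in mind, as a std-only sketch: `delay_range` is a
made-up name, `eprintln!` stands in for `log::warn!`, and the random draw
and the sleep are left to the two callers since they differ in unit
(seconds vs. milliseconds):

```rust
use std::ops::RangeInclusive;

/// Build the range to draw a random delay from, capping `min` at `max`.
///
/// `name` is the base name of the setting pair (e.g. "interval-offset") and
/// is only used in the warning.
pub fn delay_range(name: &str, min: u64, max: u64) -> RangeInclusive<u64> {
    let min = if min > max {
        // Stand-in for `log::warn!` to keep the sketch dependency-free.
        eprintln!(
            "min-{name} is larger than max-{name} ({min} > {max}) - \
             capping it to max-{name} ({max})"
        );
        max
    } else {
        min
    };

    min..=max
}
```

Each caller would then only do something like
`rng.gen_range(delay_range("interval-offset", min, max))` and wrap the
result in `Duration::from_secs` or `Duration::from_millis`.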

> +    async fn sleep_for_random_connection_delay(settings: &CollectionSettings) {
> +        let mut min = settings.min_connection_delay_or_default();
> +        let max = settings.max_connection_delay_or_default();
> +
> +        if min > max {
> +            log::warn!(
> +                "min-collection-delay is larger than max-collection-delay ({min} > {max}) - \
> +                capping it to max-collection-delay ({max})"
> +            );
> +            min = max;
> +        }
> +
> +        let jitter = {
> +            let mut rng = rand::thread_rng();
> +            rng.gen_range(min..=max)
> +        };
> +
> +        tokio::time::sleep(Duration::from_millis(jitter)).await;
> +    }
> +
> +    fn get_settings_or_default() -> CollectionSettings {
> +        // This function is a bit odd, but not sure if there is a much nicer
> +        // way to do it. We want to fall back to defaults if
> +        //   - the config file does not exist (no errors logged)
> +        //   - if section type is wrong or the config failed to parse (log errors)

It gets a bit shorter if the `_impl` returns a result+option.
The match below can just `return settings` in the first case, and have
an `Ok(None) => (),`, the `Err` only has the log line and the default is
moved to after the match.

Alternatively, a `default()` helper could also shorten it:
`Ok(default())` in the `_impl` and
`get_settings_impl().unwrap_or_else(|err| { log(err); default() })` is
also shorter :-)
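
A std-only sketch of the result+option variant, under the assumption that
the `_impl` is changed to return `Result<Option<_>, _>`. `Settings`,
`default_settings` and `settings_or_default` are made-up names, the error
type is simplified to `String`, and `eprintln!` stands in for
`log::error!`:

```rust
/// Hypothetical, trimmed-down stand-in for `CollectionSettings`.
#[derive(Debug, Default, PartialEq)]
pub struct Settings {
    pub id: String,
    pub collection_interval: Option<u64>,
}

/// The single place that knows what the fallback looks like.
fn default_settings() -> Settings {
    Settings {
        id: "default".into(),
        ..Default::default()
    }
}

/// Resolve the lookup result to usable settings.
///
/// `Ok(None)` (no "default" section) falls back silently, `Err` is logged
/// first and then falls back as well.
pub fn settings_or_default(lookup: Result<Option<Settings>, String>) -> Settings {
    match lookup {
        Ok(Some(settings)) => return settings,
        Ok(None) => (),
        Err(err) => eprintln!(
            "could not read metric collection settings: {err} - falling back to default config"
        ),
    }

    default_settings()
}
```

The default is then spelled out exactly once, after the match.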

> +
> +        fn get_settings_impl() -> Result<CollectionSettings, Error> {
> +            let (config, _) = pdm_config::metric_collection::config()?;
> +
> +            let all_sections: Vec<CollectionSettings> =
> +                config.convert_to_typed_array(COLLECTION_SETTINGS_TYPE)?;
> +
> +            for section in all_sections {
> +                if section.id == "default" {
> +                    return Ok(section);
> +                }
> +            }
> +
> +            Ok(CollectionSettings {
> +                id: "default".into(),
> +                ..Default::default()
> +            })
> +        }
> +
> +        match get_settings_impl() {
> +            Ok(settings) => settings,
> +            Err(err) => {
> +                log::error!("could not read metric collection settings: {err} - falling back to default config");
> +
> +                CollectionSettings {
> +                    id: "default".into(),
> +                    ..Default::default()
> +                }
> +            }
> +        }
> +    }
> +
> +    /// Set up a [`tokio::time::Interval`] instance with the provided interval.
> +    /// The timer will be aligned, e.g. an interval of `60` will let the timer
> +    /// fire at minute boundaries.
> +    fn setup_timer(interval: u64) -> Interval {
> +        let mut timer = tokio::time::interval(Duration::from_secs(interval));
> +        let first_run = task_utils::next_aligned_instant(interval).into();
> +        timer.reset_at(first_run);
> +
> +        timer
> +    }
> +
> +    /// Convenience helper to load `remote.cfg`, logging the error
> +    /// and returning `None` if the config could not be read.
> +    fn load_remote_config() -> Option<SectionConfigData<Remote>> {
> +        match pdm_config::remotes::config() {
> +            Ok((remotes, _)) => Some(remotes),
> +            Err(e) => {
> +                log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
> +                None
> +            }
> +        }
> +    }
> +
> +    /// Fetch metric data from a provided list of remotes concurrently.
> +    /// The maximum number of concurrent connections is determined by
> +    /// `max_concurrent_connections` in the [`CollectionSettings`]
> +    /// instance in `self`.
> +    async fn fetch_remotes(
> +        &mut self,
> +        config: &SectionConfigData<Remote>,
> +        remotes_to_fetch: &[String],
> +    ) {
> +        let semaphore = Arc::new(Semaphore::new(
> +            self.settings.max_concurrent_connections_or_default(),
> +        ));
> +        let mut handles = Vec::new();

Not sure how "robust" the spawning is wrt. ordering - and this doesn't
need to happen in this patch (unless it's easy), but a follow-up could
probably change this into a tokio `JoinSet`, that way the await-loop
below can use `.join_next()`, so that for example if an early task
happens to end up finishing last, the loop still collects the remaining
logs in a more "temporally accurate" way...
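
To illustrate only the ordering aspect, here is a dependency-free analogue:
std threads and an `mpsc` channel stand in for tokio tasks and
`JoinSet::join_next()`, so this is not the tokio code itself.
`collect_in_completion_order` is a made-up name and the delays merely
simulate how long a remote takes to answer:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Run one worker per `(name, delay)` pair and return the names in the
/// order in which the workers finished, not the order they were spawned in.
pub fn collect_in_completion_order(jobs: Vec<(String, Duration)>) -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    for (name, delay) in jobs {
        let tx = tx.clone();
        thread::spawn(move || {
            // Simulated fetch of a remote.
            thread::sleep(delay);
            // Report back as soon as this worker is done.
            let _ = tx.send(name);
        });
    }

    // Drop the original sender so the receiver ends once all workers are done.
    drop(tx);

    // Results are consumed as they become available, comparable to looping
    // over `join_next()` on a `JoinSet`.
    rx.into_iter().collect()
}
```

With the current `Vec` of handles, a slow first remote delays the log
output of every remote spawned after it; consuming results in completion
order avoids that.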

> +
> +        for remote_name in remotes_to_fetch {
> +            let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0);
> +
> +            // unwrap is okay here, acquire_* will only fail if `close` has been
> +            // called on the semaphore.
> +            let permit = semaphore.clone().acquire_owned().await.unwrap();

^ Nit: where possible, I prefer `Arc::clone(&ptr)` just for the sake of
not accidentally `.clone()`ing something large after a later refactor
where the clones aren't in the patch context lines.
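
A tiny std-only illustration of the difference; `LargeState` and `share`
are made-up names for this example:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for something that is expensive to copy.
#[derive(Clone)]
pub struct LargeState {
    pub samples: Vec<u64>,
}

/// Hand out another handle to the same allocation.
///
/// `Arc::clone(shared)` only compiles while `shared` is an `Arc`. If a later
/// refactor turns the parameter into a plain `&LargeState`, this line becomes
/// a compile error, whereas `shared.clone()` would keep compiling and
/// silently deep-copy the data.
pub fn share(shared: &Arc<LargeState>) -> Arc<LargeState> {
    Arc::clone(shared)
}
```

For the semaphore this would read
`Arc::clone(&semaphore).acquire_owned().await`.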

> +
> +            if let Some(remote) = config.get(remote_name).cloned() {
> +                log::debug!("fetching remote '{}'", remote.id);
> +                let handle = tokio::spawn(Self::fetch_single_remote(
> +                    self.settings.clone(),
> +                    remote,
> +                    start_time,
> +                    self.metric_data_tx.clone(),
> +                    permit,
> +                ));
> +
> +                handles.push((remote_name.clone(), handle));
> +            }
> +        }
> +
> +        for (remote_name, handle) in handles {
> +            let res = handle.await;
> +
> +            match res {
> +                Ok(Ok(ts)) => {
> +                    self.most_recent_timestamps
> +                        .insert(remote_name.to_string(), ts);
> +                }
> +                Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
> +                Err(err) => {
> +                    log::error!(
> +                        "join error for metric collection task for remote {remote_name}: {err}"
> +                    )
> +                }
> +            }
> +        }
> +    }
> +
> +    /// Fetch a single remote.
> +    #[tracing::instrument(skip_all, fields(remote = remote.id), name = "metric_collection_task")]
> +    async fn fetch_single_remote(
> +        settings: CollectionSettings,
> +        remote: Remote,
> +        start_time: i64,
> +        sender: Sender<RrdStoreRequest>,
> +        _permit: OwnedSemaphorePermit,
> +    ) -> Result<i64, Error> {
> +        Self::sleep_for_random_connection_delay(&settings).await;
> +
> +        let most_recent_timestamp = match remote.ty {
> +            RemoteType::Pve => {
> +                let client = connection::make_pve_client(&remote)?;
> +                let metrics = client
> +                    .cluster_metrics_export(Some(true), Some(false), Some(start_time))
> +                    .await?;
> +
> +                let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
> +
> +                sender
> +                    .send(RrdStoreRequest::Pve {
> +                        remote: remote.id.clone(),
> +                        metrics,
> +                    })
> +                    .await?;
> +
> +                most_recent
> +            }
> +            RemoteType::Pbs => {
> +                let client = connection::make_pbs_client(&remote)?;
> +                let metrics = client.metrics(Some(true), Some(start_time)).await?;
> +
> +                let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
> +
> +                sender
> +                    .send(RrdStoreRequest::Pbs {
> +                        remote: remote.id.clone(),
> +                        metrics,
> +                    })
> +                    .await?;
> +
> +                most_recent
> +            }
> +        };
> +
> +        Ok(most_recent_timestamp)
> +    }
> +}
> diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
> index 06ade5f..0ab6678 100644
> --- a/server/src/metric_collection/mod.rs
> +++ b/server/src/metric_collection/mod.rs
> @@ -1,20 +1,17 @@
> -use std::collections::HashMap;
>  use std::pin::pin;
> +use std::sync::OnceLock;
>  
> -use anyhow::Error;
> +use anyhow::{bail, Error};
>  use tokio::sync::mpsc::{self, Sender};
>  
> -use pdm_api_types::remotes::RemoteType;
> -
> -use crate::{connection, task_utils};
> -
> +mod collection_task;
>  pub mod rrd_cache;
>  mod rrd_task;
>  pub mod top_entities;
>  
> -use rrd_task::RrdStoreRequest;
> +use collection_task::{ControlMsg, MetricCollectionTask};
>  
> -const COLLECTION_INTERVAL: u64 = 60;
> +static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new();
>  
>  /// Initialize the RRD cache
>  pub fn init() -> Result<(), Error> {
> @@ -24,89 +21,56 @@ pub fn init() -> Result<(), Error> {
>  }
>  
>  /// Start the metric collection task.
> -pub fn start_task() {
> -    let (tx, rx) = mpsc::channel(128);
> +pub fn start_task() -> Result<(), Error> {
> +    let (metric_data_tx, metric_data_rx) = mpsc::channel(128);
> +
> +    let (trigger_collection_tx, trigger_collection_rx) = mpsc::channel(128);
> +    if CONTROL_MESSAGE_TX.set(trigger_collection_tx).is_err() {
> +        bail!("control message sender already set");
> +    }
>  
>      tokio::spawn(async move {
> -        let task_scheduler = pin!(metric_collection_task(tx));
> +        let metric_collection_task_future = pin!(async move {
> +            let mut task =
> +                MetricCollectionTask::new(metric_data_tx, trigger_collection_rx).unwrap();
> +            task.run().await;

^ nit: could drop the let binding (then it IMO linewraps a bit nicer,
too)

> +        });
> +
>          let abort_future = pin!(proxmox_daemon::shutdown_future());
> -        futures::future::select(task_scheduler, abort_future).await;
> +        futures::future::select(metric_collection_task_future, abort_future).await;
>      });
>  
>      tokio::spawn(async move {
> -        let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
> +        let rrd_task_future = pin!(rrd_task::store_in_rrd_task(metric_data_rx));
>          let abort_future = pin!(proxmox_daemon::shutdown_future());
> -        futures::future::select(task_scheduler, abort_future).await;
> +        futures::future::select(rrd_task_future, abort_future).await;
>      });
> +
> +    Ok(())
>  }
>  
> -async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
> -    let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
> -
> -    loop {
> -        let delay_target = task_utils::next_aligned_instant(COLLECTION_INTERVAL);
> -        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
> -
> -        let remotes = match pdm_config::remotes::config() {
> -            Ok((remotes, _)) => remotes,
> -            Err(e) => {
> -                log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
> -                continue;
> -            }
> -        };
> -
> -        for (remote_name, remote) in &remotes.sections {
> -            let start_time = *most_recent_timestamps.get(remote_name).unwrap_or(&0);
> -            let remote_name_clone = remote_name.clone();
> -
> -            let res = async {
> -                let most_recent_timestamp = match remote.ty {
> -                    RemoteType::Pve => {
> -                        let client = connection::make_pve_client(remote)?;
> -                        let metrics = client
> -                            .cluster_metrics_export(Some(true), Some(false), Some(start_time))
> -                            .await?;
> -
> -                        let most_recent =
> -                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
> -
> -                        sender
> -                            .send(RrdStoreRequest::Pve {
> -                                remote: remote_name_clone,
> -                                metrics,
> -                            })
> -                            .await?;
> -
> -                        Ok::<i64, Error>(most_recent)
> -                    }
> -                    RemoteType::Pbs => {
> -                        let client = connection::make_pbs_client(remote)?;
> -                        let metrics = client.metrics(Some(true), Some(start_time)).await?;
> -
> -                        let most_recent =
> -                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
> -
> -                        sender
> -                            .send(RrdStoreRequest::Pbs {
> -                                remote: remote_name_clone,
> -                                metrics,
> -                            })
> -                            .await?;
> -
> -                        Ok::<i64, Error>(most_recent)
> -                    }
> -                }?;
> -
> -                Ok::<i64, Error>(most_recent_timestamp)
> -            }
> -            .await;
> -
> -            match res {
> -                Ok(ts) => {
> -                    most_recent_timestamps.insert(remote_name.to_string(), ts);
> -                }
> -                Err(err) => log::error!("failed to collect metrics for {remote_name}: {err}"),
> -            }
> -        }
> +/// Schedule metric collection for a given remote as soon as possible.
> +///
> +/// Has no effect if the tx end of the channel has not been initialized yet.
> +/// Returns an error if the mpsc channel has been closed already.
> +pub async fn trigger_metric_collection_for_remote(remote: &str) -> Result<(), Error> {
> +    if let Some(sender) = CONTROL_MESSAGE_TX.get() {

^ nit: This is the normal case (given that `CONTROL_MESSAGE_TX` gets
initialized in `start_task()` on startup), so IMO the function signature
could already be `String` to allow the caller to move it (or `impl
Into<String>` for this to be implicit) - both callers could currently
move it ;-)
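
A small std-only sketch of the `impl Into<String>` variant;
`single_remote_msg` is a made-up helper that only shows the parameter type,
the real function would keep sending the message over the channel:

```rust
/// Control messages, mirroring the enum from the patch.
#[derive(Debug, PartialEq)]
pub enum ControlMsg {
    CollectSingleRemote(String),
    CollectAllRemotes,
}

/// Build the control message for a single remote.
///
/// Callers that already own a `String` move it in without a copy, while
/// callers holding a `&str` still work and pay for the allocation here.
pub fn single_remote_msg(remote: impl Into<String>) -> ControlMsg {
    ControlMsg::CollectSingleRemote(remote.into())
}
```

Both `single_remote_msg(name)` with an owned `String` and
`single_remote_msg("pve-1")` compile unchanged.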

> +        sender
> +            .send(ControlMsg::CollectSingleRemote(remote.into()))
> +            .await?;
>      }
> +
> +    Ok(())
> +}
> +
> +/// Schedule metric collection for all remotes as soon as possible.
> +///
> +/// Has no effect if the tx end of the channel has not been initialized yet.
> +/// Returns an error if the mpsc channel has been closed already.
> +pub async fn trigger_metric_collection() -> Result<(), Error> {
> +    if let Some(sender) = CONTROL_MESSAGE_TX.get() {
> +        sender.send(ControlMsg::CollectAllRemotes).await?;
> +    }
> +
> +    Ok(())
>  }
> -- 
> 2.39.5



