[pdm-devel] [PATCH proxmox-datacenter-manager 07/25] metric collection: rework metric poll task
Wolfgang Bumiller
w.bumiller at proxmox.com
Wed Feb 12 16:57:37 CET 2025
Looks good.
One improvement/follow-up suggestion (JoinSet), some nitpicking &
potential refactoring requests...
On Tue, Feb 11, 2025 at 01:05:23PM +0100, Lukas Wagner wrote:
> Metric collection is mostly limited by network latency, not disk IO or
> CPU. This means we should be able to get significant speedups by polling
> remotes concurrently in spawned tokio tasks.
>
> To avoid load/IO spikes, we limit the number of concurrent connections
> using a semaphore.
>
> Each concurrent task which fetches a single remote waits a random amount
> of time before actually connecting to the remote. The upper and lower
> bounds for this random delay can be configured in the metric collection
> settings. The main aim of this mechanism is to reduce load spikes.
>
> Furthermore, each time the main collection interval timer
> fires, a random delay is introduced before actually starting the
> collection process.
> This is useful due to how we set up the timer; we configure
> it to fire at aligned points in time. For instance, if the
> collection interval is set to 60s, the timer fires at
> minute boundaries. Using this random offset, we can avoid
> triggering at the same time as other timers (cron, systemd).
>
> Additionally, we add a mechanism to trigger metric collection manually.
> This is achieved by using `tokio::select!` on both the timer's `tick`
> method and the `recv` method of an `mpsc` channel which is used to send
> control messages to the metric collection task.
>
> For better code structure, the collection task is split into a separate
> module.
>
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> ---
> Cargo.toml | 1 +
> server/Cargo.toml | 1 +
> server/src/bin/proxmox-datacenter-api.rs | 2 +-
> .../src/metric_collection/collection_task.rs | 317 ++++++++++++++++++
> server/src/metric_collection/mod.rs | 128 +++----
> 5 files changed, 366 insertions(+), 83 deletions(-)
> create mode 100644 server/src/metric_collection/collection_task.rs
>
> diff --git a/Cargo.toml b/Cargo.toml
> index 4f3b1d0..7dd60a5 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -108,6 +108,7 @@ once_cell = "1.3.1"
> openssl = "0.10.40"
> percent-encoding = "2.1"
> pin-project-lite = "0.2"
> +rand = "0.8"
> regex = "1.5.5"
> serde = { version = "1.0", features = ["derive"] }
> serde_cbor = "0.11.1"
> diff --git a/server/Cargo.toml b/server/Cargo.toml
> index 7b0058e..c8308f2 100644
> --- a/server/Cargo.toml
> +++ b/server/Cargo.toml
> @@ -24,6 +24,7 @@ nix.workspace = true
> once_cell.workspace = true
> openssl.workspace = true
> percent-encoding.workspace = true
> +rand.workspace = true
> serde.workspace = true
> serde_json.workspace = true
> syslog.workspace = true
> diff --git a/server/src/bin/proxmox-datacenter-api.rs b/server/src/bin/proxmox-datacenter-api.rs
> index a79094d..6e85e52 100644
> --- a/server/src/bin/proxmox-datacenter-api.rs
> +++ b/server/src/bin/proxmox-datacenter-api.rs
> @@ -286,7 +286,7 @@ async fn run(debug: bool) -> Result<(), Error> {
> });
>
> start_task_scheduler();
> - metric_collection::start_task();
> + metric_collection::start_task()?;
> resource_cache::start_task();
>
> server.await?;
> diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs
> new file mode 100644
> index 0000000..8420039
> --- /dev/null
> +++ b/server/src/metric_collection/collection_task.rs
> @@ -0,0 +1,317 @@
> +use std::{collections::HashMap, sync::Arc, time::Duration};
> +
> +use anyhow::Error;
> +use rand::Rng;
> +use tokio::{
> + sync::{
> + mpsc::{Receiver, Sender},
> + OwnedSemaphorePermit, Semaphore,
> + },
> + time::Interval,
> +};
> +
> +use proxmox_section_config::typed::SectionConfigData;
> +
> +use pdm_api_types::{
> + remotes::{Remote, RemoteType},
> + CollectionSettings,
> +};
> +use pdm_config::metric_collection::COLLECTION_SETTINGS_TYPE;
> +
> +use crate::{connection, task_utils};
> +
> +use super::rrd_task::RrdStoreRequest;
> +
> +/// Control messages for the metric collection task.
> +pub(super) enum ControlMsg {
> + CollectSingleRemote(String),
> + CollectAllRemotes,
> +}
> +
> +/// Task which periodically collects metrics from all remotes and stores
> +/// them in the local metrics database.
> +pub(super) struct MetricCollectionTask {
> + most_recent_timestamps: HashMap<String, i64>,
> + settings: CollectionSettings,
> + metric_data_tx: Sender<RrdStoreRequest>,
> + control_message_rx: Receiver<ControlMsg>,
> +}
> +
> +impl MetricCollectionTask {
> + /// Create a new metric collection task.
> + pub(super) fn new(
> + metric_data_tx: Sender<RrdStoreRequest>,
> + control_message_rx: Receiver<ControlMsg>,
> + ) -> Result<Self, Error> {
> + let settings = Self::get_settings_or_default();
> +
> + Ok(Self {
> + most_recent_timestamps: HashMap::new(),
> + settings,
> + metric_data_tx,
> + control_message_rx,
> + })
> + }
> +
> + /// Run the metric collection task.
> + ///
> + /// This function never returns.
> + #[tracing::instrument(skip_all, name = "metric_collection_task")]
> + pub(super) async fn run(&mut self) {
> + let mut timer = Self::setup_timer(self.settings.collection_interval_or_default());
> +
> + log::debug!(
> + "metric collection starting up. collection interval set to {} seconds",
^ nit: using punctuation in the middle but continuing in lower case and
not ending with punctuation 🤪
> + self.settings.collection_interval_or_default()
> + );
> +
> + loop {
> + let old_settings = self.settings.clone();
> + tokio::select! {
> + _ = timer.tick() => {
> + // Reload settings in case they have changed in the meanwhile
> + self.settings = Self::get_settings_or_default();
> +
> + log::debug!("starting metric collection from all remotes - triggered by timer");
Not sure if it's worth moving this into a `self.on_tick()` but...
> + self.sleep_for_random_interval_offset().await;
> +
> + if let Some(remotes) = Self::load_remote_config() {
> + let to_fetch = remotes.order.as_slice();
> + self.fetch_remotes(&remotes, to_fetch).await;
> + }
> + }
> +
> + val = self.control_message_rx.recv() => {
> + // Reload settings in case they have changed in the meanwhile
> + self.settings = Self::get_settings_or_default();
> + match val {
...but I think this part should be factored out into a
`self.handle_control_message(msg).await`
It gets indented quite deeply and it just makes sense IMO :)
> + Some(ControlMsg::CollectSingleRemote(remote)) => {
> + if let Some(remotes) = Self::load_remote_config() {
^ Also this...
> + log::debug!("starting metric collection for remote '{remote}'- triggered by control message");
> + self.fetch_remotes(&remotes, &[remote]).await;
> + }
> + }
> + Some(ControlMsg::CollectAllRemotes) => {
> + if let Some(remotes) = Self::load_remote_config() {
... is the same as this, so if this is factored out into a separate
function, it could just early-out via a let-else there, as we don't need
to call it when `val` is `None`, and then it's quite compact.
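Roughly something like this (untested sketch, the name is just a
suggestion):

    async fn handle_control_message(&mut self, msg: ControlMsg) {
        // early-out if remote.cfg cannot be read, load_remote_config()
        // already logs the error
        let Some(remotes) = Self::load_remote_config() else {
            return;
        };

        match msg {
            ControlMsg::CollectSingleRemote(remote) => {
                log::debug!("starting metric collection for remote '{remote}' - triggered by control message");
                self.fetch_remotes(&remotes, &[remote]).await;
            }
            ControlMsg::CollectAllRemotes => {
                log::debug!("starting metric collection from all remotes - triggered by control message");
                self.fetch_remotes(&remotes, &remotes.order).await;
            }
        }
    }

and the select arm shrinks to:

            val = self.control_message_rx.recv() => {
                // Reload settings in case they have changed in the meanwhile
                self.settings = Self::get_settings_or_default();

                if let Some(msg) = val {
                    self.handle_control_message(msg).await;
                }
            }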
> + log::debug!("starting metric collection from all remotes - triggered by control message");
> + self.fetch_remotes(&remotes, &remotes.order).await;
> + }
> + }
> + _ => {},
> + }
> + }
> + }
> +
> + let interval = self.settings.collection_interval_or_default();
> +
> + if old_settings.collection_interval_or_default() != interval {
> + log::info!(
> + "metric collection interval changed to {} seconds, reloading timer",
> + interval
> + );
> + timer = Self::setup_timer(interval);
> + }
> + }
> + }
> +
> + async fn sleep_for_random_interval_offset(&self) {
> + let mut min = self.settings.min_interval_offset_or_default();
> + let max = self.settings.max_interval_offset_or_default();
> +
> + if min > max {
> + log::warn!(
> + "min-interval-offset is larger than max-interval-offset ({min} > {max}) - \
> + capping it to max-interval-offset ({max})"
> + );
> + min = max;
> + }
> +
> + let jitter = {
> + let mut rng = rand::thread_rng();
> + rng.gen_range(min..=max)
> + };
> + tokio::time::sleep(Duration::from_secs(jitter)).await;
> + }
> +
↑ and ↓ are crying out for a common fn with the variable name, min and
max as parameters ;-)
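Something along these lines perhaps (untested, the helper name and the
`setting` parameter used for the log message are just suggestions):

    fn random_delay(setting: &str, mut min: u64, max: u64) -> u64 {
        if min > max {
            log::warn!(
                "min-{setting} is larger than max-{setting} ({min} > {max}) - \
                capping it to max-{setting} ({max})"
            );
            min = max;
        }

        let mut rng = rand::thread_rng();
        rng.gen_range(min..=max)
    }

    async fn sleep_for_random_interval_offset(&self) {
        let jitter = Self::random_delay(
            "interval-offset",
            self.settings.min_interval_offset_or_default(),
            self.settings.max_interval_offset_or_default(),
        );
        tokio::time::sleep(Duration::from_secs(jitter)).await;
    }

with `sleep_for_random_connection_delay` doing the same with
"connection-delay" and `Duration::from_millis`.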
> + async fn sleep_for_random_connection_delay(settings: &CollectionSettings) {
> + let mut min = settings.min_connection_delay_or_default();
> + let max = settings.max_connection_delay_or_default();
> +
> + if min > max {
> + log::warn!(
> + "min-collection-delay is larger than max-collection-delay ({min} > {max}) - \
> + capping it to max-collection-delay ({max})"
> + );
> + min = max;
> + }
> +
> + let jitter = {
> + let mut rng = rand::thread_rng();
> + rng.gen_range(min..=max)
> + };
> +
> + tokio::time::sleep(Duration::from_millis(jitter)).await;
> + }
> +
> + fn get_settings_or_default() -> CollectionSettings {
> + // This function is a bit odd, but not sure if there is a much nicer
> + // way to do it. We want to fall back to defaults if
> + // - the config file does not exist (no errors logged)
> + // - if section type is wrong or the config failed to parse (log errors)
It gets a bit shorter if the `_impl` returns a result + option:
the match below can just `return settings` in the first case, have an
`Ok(None) => (),` arm, the `Err` arm only keeps the log line, and the
default construction moves to after the match.
Alternatively a `default()` helper could also shorten it
(`Ok(default())` in the `_impl`, and
`get_settings_impl().unwrap_or_else(|err| { log(err); default() })` is
also shorter :-)
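i.e. roughly (untested, `default_settings` is just a suggested name;
`get_settings_impl` keeps its current body, only its final fallback
becomes `Ok(Self::default_settings())`):

    fn default_settings() -> CollectionSettings {
        CollectionSettings {
            id: "default".into(),
            ..Default::default()
        }
    }

    fn get_settings_or_default() -> CollectionSettings {
        Self::get_settings_impl().unwrap_or_else(|err| {
            log::error!(
                "could not read metric collection settings: {err} - falling back to default config"
            );
            Self::default_settings()
        })
    }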
> +
> + fn get_settings_impl() -> Result<CollectionSettings, Error> {
> + let (config, _) = pdm_config::metric_collection::config()?;
> +
> + let all_sections: Vec<CollectionSettings> =
> + config.convert_to_typed_array(COLLECTION_SETTINGS_TYPE)?;
> +
> + for section in all_sections {
> + if section.id == "default" {
> + return Ok(section);
> + }
> + }
> +
> + Ok(CollectionSettings {
> + id: "default".into(),
> + ..Default::default()
> + })
> + }
> +
> + match get_settings_impl() {
> + Ok(settings) => settings,
> + Err(err) => {
> + log::error!("could not read metric collection settings: {err} - falling back to default config");
> +
> + CollectionSettings {
> + id: "default".into(),
> + ..Default::default()
> + }
> + }
> + }
> + }
> +
> + /// Set up a [`tokio::time::Interval`] instance with the provided interval.
> + /// The timer will be aligned, e.g. an interval of `60` will let the timer
> + /// fire at minute boundaries.
> + fn setup_timer(interval: u64) -> Interval {
> + let mut timer = tokio::time::interval(Duration::from_secs(interval));
> + let first_run = task_utils::next_aligned_instant(interval).into();
> + timer.reset_at(first_run);
> +
> + timer
> + }
> +
> + /// Convenience helper to load `remote.cfg`, logging the error
> + /// and returning `None` if the config could not be read.
> + fn load_remote_config() -> Option<SectionConfigData<Remote>> {
> + match pdm_config::remotes::config() {
> + Ok((remotes, _)) => Some(remotes),
> + Err(e) => {
> + log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
> + None
> + }
> + }
> + }
> +
> + /// Fetch metric data from a provided list of remotes concurrently.
> + /// The maximum number of concurrent connections is determined by
> + /// `max_concurrent_connections` in the [`CollectionSettings`]
> + /// instance in `self`.
> + async fn fetch_remotes(
> + &mut self,
> + config: &SectionConfigData<Remote>,
> + remotes_to_fetch: &[String],
> + ) {
> + let semaphore = Arc::new(Semaphore::new(
> + self.settings.max_concurrent_connections_or_default(),
> + ));
> + let mut handles = Vec::new();
Not sure how "robust" the spawning is wrt. ordering - and this doesn't
need to happen in this patch (unless it's easy), but a follow-up could
probably change this into a tokio `JoinSet`, that way the await-loop
below can use `.join_next()`, so that for example if an early task
happens to end up finishing last, the loop still collects the remaining
logs in a more "temporally accurate" way...
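Untested, but roughly how I'd imagine the `JoinSet` version:

    async fn fetch_remotes(
        &mut self,
        config: &SectionConfigData<Remote>,
        remotes_to_fetch: &[String],
    ) {
        let semaphore = Arc::new(Semaphore::new(
            self.settings.max_concurrent_connections_or_default(),
        ));
        let mut tasks = tokio::task::JoinSet::new();

        for remote_name in remotes_to_fetch {
            let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0);

            // unwrap is okay here, acquire_* will only fail if `close` has been
            // called on the semaphore.
            let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();

            if let Some(remote) = config.get(remote_name).cloned() {
                log::debug!("fetching remote '{}'", remote.id);
                let remote_name = remote_name.clone();
                let settings = self.settings.clone();
                let sender = self.metric_data_tx.clone();

                // keep the remote name next to the result, since join_next()
                // yields tasks in completion order, not spawn order
                tasks.spawn(async move {
                    let res =
                        Self::fetch_single_remote(settings, remote, start_time, sender, permit)
                            .await;
                    (remote_name, res)
                });
            }
        }

        while let Some(joined) = tasks.join_next().await {
            match joined {
                Ok((remote_name, Ok(ts))) => {
                    self.most_recent_timestamps.insert(remote_name, ts);
                }
                Ok((remote_name, Err(err))) => {
                    log::error!("failed to collect metrics for {remote_name}: {err}")
                }
                Err(err) => {
                    log::error!("join error for metric collection task: {err}")
                }
            }
        }
    }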
> +
> + for remote_name in remotes_to_fetch {
> + let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0);
> +
> + // unwrap is okay here, acquire_* will only fail if `close` has been
> + // called on the semaphore.
> + let permit = semaphore.clone().acquire_owned().await.unwrap();
^ Nit: where possible, I prefer `Arc::clone(&ptr)`, just for the sake of
not accidentally `.clone()`ing something large after a later refactor
where the clones aren't in the patch context lines.
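i.e.:

    let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();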
> +
> + if let Some(remote) = config.get(remote_name).cloned() {
> + log::debug!("fetching remote '{}'", remote.id);
> + let handle = tokio::spawn(Self::fetch_single_remote(
> + self.settings.clone(),
> + remote,
> + start_time,
> + self.metric_data_tx.clone(),
> + permit,
> + ));
> +
> + handles.push((remote_name.clone(), handle));
> + }
> + }
> +
> + for (remote_name, handle) in handles {
> + let res = handle.await;
> +
> + match res {
> + Ok(Ok(ts)) => {
> + self.most_recent_timestamps
> + .insert(remote_name.to_string(), ts);
> + }
> + Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
> + Err(err) => {
> + log::error!(
> + "join error for metric collection task for remote {remote_name}: {err}"
> + )
> + }
> + }
> + }
> + }
> +
> + /// Fetch a single remote.
> + #[tracing::instrument(skip_all, fields(remote = remote.id), name = "metric_collection_task")]
> + async fn fetch_single_remote(
> + settings: CollectionSettings,
> + remote: Remote,
> + start_time: i64,
> + sender: Sender<RrdStoreRequest>,
> + _permit: OwnedSemaphorePermit,
> + ) -> Result<i64, Error> {
> + Self::sleep_for_random_connection_delay(&settings).await;
> +
> + let most_recent_timestamp = match remote.ty {
> + RemoteType::Pve => {
> + let client = connection::make_pve_client(&remote)?;
> + let metrics = client
> + .cluster_metrics_export(Some(true), Some(false), Some(start_time))
> + .await?;
> +
> + let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
> +
> + sender
> + .send(RrdStoreRequest::Pve {
> + remote: remote.id.clone(),
> + metrics,
> + })
> + .await?;
> +
> + most_recent
> + }
> + RemoteType::Pbs => {
> + let client = connection::make_pbs_client(&remote)?;
> + let metrics = client.metrics(Some(true), Some(start_time)).await?;
> +
> + let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
> +
> + sender
> + .send(RrdStoreRequest::Pbs {
> + remote: remote.id.clone(),
> + metrics,
> + })
> + .await?;
> +
> + most_recent
> + }
> + };
> +
> + Ok(most_recent_timestamp)
> + }
> +}
> diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
> index 06ade5f..0ab6678 100644
> --- a/server/src/metric_collection/mod.rs
> +++ b/server/src/metric_collection/mod.rs
> @@ -1,20 +1,17 @@
> -use std::collections::HashMap;
> use std::pin::pin;
> +use std::sync::OnceLock;
>
> -use anyhow::Error;
> +use anyhow::{bail, Error};
> use tokio::sync::mpsc::{self, Sender};
>
> -use pdm_api_types::remotes::RemoteType;
> -
> -use crate::{connection, task_utils};
> -
> +mod collection_task;
> pub mod rrd_cache;
> mod rrd_task;
> pub mod top_entities;
>
> -use rrd_task::RrdStoreRequest;
> +use collection_task::{ControlMsg, MetricCollectionTask};
>
> -const COLLECTION_INTERVAL: u64 = 60;
> +static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new();
>
> /// Initialize the RRD cache
> pub fn init() -> Result<(), Error> {
> @@ -24,89 +21,56 @@ pub fn init() -> Result<(), Error> {
> }
>
> /// Start the metric collection task.
> -pub fn start_task() {
> - let (tx, rx) = mpsc::channel(128);
> +pub fn start_task() -> Result<(), Error> {
> + let (metric_data_tx, metric_data_rx) = mpsc::channel(128);
> +
> + let (trigger_collection_tx, trigger_collection_rx) = mpsc::channel(128);
> + if CONTROL_MESSAGE_TX.set(trigger_collection_tx).is_err() {
> + bail!("control message sender alread set");
> + }
>
> tokio::spawn(async move {
> - let task_scheduler = pin!(metric_collection_task(tx));
> + let metric_collection_task_future = pin!(async move {
> + let mut task =
> + MetricCollectionTask::new(metric_data_tx, trigger_collection_rx).unwrap();
> + task.run().await;
^ nit: could drop the let binding (then it IMO linewraps a bit nicer,
too)
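e.g.:

        let metric_collection_task_future = pin!(async move {
            MetricCollectionTask::new(metric_data_tx, trigger_collection_rx)
                .unwrap()
                .run()
                .await;
        });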
> + });
> +
> let abort_future = pin!(proxmox_daemon::shutdown_future());
> - futures::future::select(task_scheduler, abort_future).await;
> + futures::future::select(metric_collection_task_future, abort_future).await;
> });
>
> tokio::spawn(async move {
> - let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
> + let rrd_task_future = pin!(rrd_task::store_in_rrd_task(metric_data_rx));
> let abort_future = pin!(proxmox_daemon::shutdown_future());
> - futures::future::select(task_scheduler, abort_future).await;
> + futures::future::select(rrd_task_future, abort_future).await;
> });
> +
> + Ok(())
> }
>
> -async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
> - let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
> -
> - loop {
> - let delay_target = task_utils::next_aligned_instant(COLLECTION_INTERVAL);
> - tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
> -
> - let remotes = match pdm_config::remotes::config() {
> - Ok((remotes, _)) => remotes,
> - Err(e) => {
> - log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
> - continue;
> - }
> - };
> -
> - for (remote_name, remote) in &remotes.sections {
> - let start_time = *most_recent_timestamps.get(remote_name).unwrap_or(&0);
> - let remote_name_clone = remote_name.clone();
> -
> - let res = async {
> - let most_recent_timestamp = match remote.ty {
> - RemoteType::Pve => {
> - let client = connection::make_pve_client(remote)?;
> - let metrics = client
> - .cluster_metrics_export(Some(true), Some(false), Some(start_time))
> - .await?;
> -
> - let most_recent =
> - metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
> -
> - sender
> - .send(RrdStoreRequest::Pve {
> - remote: remote_name_clone,
> - metrics,
> - })
> - .await?;
> -
> - Ok::<i64, Error>(most_recent)
> - }
> - RemoteType::Pbs => {
> - let client = connection::make_pbs_client(remote)?;
> - let metrics = client.metrics(Some(true), Some(start_time)).await?;
> -
> - let most_recent =
> - metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
> -
> - sender
> - .send(RrdStoreRequest::Pbs {
> - remote: remote_name_clone,
> - metrics,
> - })
> - .await?;
> -
> - Ok::<i64, Error>(most_recent)
> - }
> - }?;
> -
> - Ok::<i64, Error>(most_recent_timestamp)
> - }
> - .await;
> -
> - match res {
> - Ok(ts) => {
> - most_recent_timestamps.insert(remote_name.to_string(), ts);
> - }
> - Err(err) => log::error!("failed to collect metrics for {remote_name}: {err}"),
> - }
> - }
> +/// Schedule metric collection for a given remote as soon as possible.
> +///
> +/// Has no effect if the tx end of the channel has not been initialized yet.
> +/// Returns an error if the mpsc channel has been closed already.
> +pub async fn trigger_metric_collection_for_remote(remote: &str) -> Result<(), Error> {
> + if let Some(sender) = CONTROL_MESSAGE_TX.get() {
^ nit: This is the normal case (given that `CONTROL_MESSAGE_TX` gets
initialized in `start_task()` on startup), so IMO the function signature
could already be `String` to allow the caller to move it (or `impl
Into<String>` to make this implicit) - both callers could currently
move it ;-)
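i.e.:

    pub async fn trigger_metric_collection_for_remote(
        remote: impl Into<String>,
    ) -> Result<(), Error> {
        if let Some(sender) = CONTROL_MESSAGE_TX.get() {
            sender
                .send(ControlMsg::CollectSingleRemote(remote.into()))
                .await?;
        }

        Ok(())
    }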
> + sender
> + .send(ControlMsg::CollectSingleRemote(remote.into()))
> + .await?;
> }
> +
> + Ok(())
> +}
> +
> +/// Schedule metric collection for all remotes as soon as possible.
> +///
> +/// Has no effect if the tx end of the channel has not been initialized yet.
> +/// Returns an error if the mpsc channel has been closed already.
> +pub async fn trigger_metric_collection() -> Result<(), Error> {
> + if let Some(sender) = CONTROL_MESSAGE_TX.get() {
> + sender.send(ControlMsg::CollectAllRemotes).await?;
> + }
> +
> + Ok(())
> }
> --
> 2.39.5