[pdm-devel] [PATCH proxmox-datacenter-manager 07/25] metric collection: rework metric poll task
Lukas Wagner
l.wagner at proxmox.com
Tue Feb 11 13:05:23 CET 2025
Metric collection is mostly limited by network latency, not disk IO or
CPU. This means we should be able to get significant speedups by polling
remotes concurrently, spawning a tokio worker task per remote.
To avoid load/IO spikes, we limit the number of concurrent connections
using a semaphore.
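Roughly, the concurrency limiting works like the following sketch
(names and the permit count are illustrative, not taken from the
actual implementation):

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    async fn poll_all(remotes: Vec<String>) {
        // Allow at most 10 concurrent connections at any point in time.
        let semaphore = Arc::new(Semaphore::new(10));
        let mut handles = Vec::new();

        for remote in remotes {
            // Wait until a permit becomes available before spawning the worker.
            let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
            handles.push(tokio::spawn(async move {
                // The permit is held for the lifetime of the task and
                // released when dropped at the end of it.
                let _permit = permit;
                // ... connect to `remote` and fetch its metrics ...
            }));
        }

        for handle in handles {
            let _ = handle.await;
        }
    }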
Each concurrent task which fetches a single remote waits a random amount
of time before actually connecting to the remote. The upper and lower
bounds for this random delay can be configured in the metric collection
settings. The main aim of this mechanism is to reduce load spikes.
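In essence, the per-task delay boils down to something like this
(a minimal sketch; in the real code the bounds come from the metric
collection settings):

    use std::time::Duration;
    use rand::Rng;

    async fn sleep_with_jitter(min_ms: u64, max_ms: u64) {
        // Pick a random delay in [min_ms, max_ms] before opening the connection.
        let jitter = rand::thread_rng().gen_range(min_ms..=max_ms);
        tokio::time::sleep(Duration::from_millis(jitter)).await;
    }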
Furthermore, each time the main collection interval timer fires, a
random delay is introduced before actually starting the collection
process.
This is useful due to how we set up the timer: we configure it to
fire at aligned points in time. For instance, if the collection
interval is set to 60s, the timer fires at minute boundaries. Using
this random offset, we can avoid triggering at the same time as
other timers (cron, systemd).
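The timer alignment itself could be sketched roughly as follows; the
real code uses task_utils::next_aligned_instant, so the wall-clock math
here is only an assumption of how such a helper behaves:

    use std::time::{Duration, SystemTime, UNIX_EPOCH};
    use tokio::time::{Instant, Interval};

    fn aligned_timer(interval_secs: u64) -> Interval {
        let mut timer = tokio::time::interval(Duration::from_secs(interval_secs));
        // Delay the first tick until the next multiple of `interval_secs`
        // on the wall clock, e.g. the next full minute for a 60s interval.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let until_next = interval_secs - (now % interval_secs);
        timer.reset_at(Instant::now() + Duration::from_secs(until_next));
        timer
    }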
Furthermore, we add a mechanism to trigger metric collection manually.
This is achieved by using `tokio::select!` on both the timer's `tick`
method and the `recv` method of an `mpsc` channel which is used to send
control messages to the metric collection task.
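The main loop then looks roughly like this (simplified sketch; settings
reload, the single-remote message and error handling are omitted):

    use tokio::sync::mpsc::Receiver;
    use tokio::time::Interval;

    enum ControlMsg {
        CollectAllRemotes,
    }

    async fn collection_loop(mut timer: Interval, mut rx: Receiver<ControlMsg>) {
        loop {
            tokio::select! {
                _ = timer.tick() => {
                    // Periodic collection, triggered by the (aligned) timer.
                }
                msg = rx.recv() => match msg {
                    Some(ControlMsg::CollectAllRemotes) => {
                        // Collection triggered manually via the control channel.
                    }
                    None => break, // all senders have been dropped
                },
            }
        }
    }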
For better code structure, the collection task is split into a separate
module.
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
Cargo.toml | 1 +
server/Cargo.toml | 1 +
server/src/bin/proxmox-datacenter-api.rs | 2 +-
.../src/metric_collection/collection_task.rs | 317 ++++++++++++++++++
server/src/metric_collection/mod.rs | 128 +++----
5 files changed, 366 insertions(+), 83 deletions(-)
create mode 100644 server/src/metric_collection/collection_task.rs
diff --git a/Cargo.toml b/Cargo.toml
index 4f3b1d0..7dd60a5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -108,6 +108,7 @@ once_cell = "1.3.1"
openssl = "0.10.40"
percent-encoding = "2.1"
pin-project-lite = "0.2"
+rand = "0.8"
regex = "1.5.5"
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11.1"
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 7b0058e..c8308f2 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -24,6 +24,7 @@ nix.workspace = true
once_cell.workspace = true
openssl.workspace = true
percent-encoding.workspace = true
+rand.workspace = true
serde.workspace = true
serde_json.workspace = true
syslog.workspace = true
diff --git a/server/src/bin/proxmox-datacenter-api.rs b/server/src/bin/proxmox-datacenter-api.rs
index a79094d..6e85e52 100644
--- a/server/src/bin/proxmox-datacenter-api.rs
+++ b/server/src/bin/proxmox-datacenter-api.rs
@@ -286,7 +286,7 @@ async fn run(debug: bool) -> Result<(), Error> {
});
start_task_scheduler();
- metric_collection::start_task();
+ metric_collection::start_task()?;
resource_cache::start_task();
server.await?;
diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs
new file mode 100644
index 0000000..8420039
--- /dev/null
+++ b/server/src/metric_collection/collection_task.rs
@@ -0,0 +1,317 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use anyhow::Error;
+use rand::Rng;
+use tokio::{
+ sync::{
+ mpsc::{Receiver, Sender},
+ OwnedSemaphorePermit, Semaphore,
+ },
+ time::Interval,
+};
+
+use proxmox_section_config::typed::SectionConfigData;
+
+use pdm_api_types::{
+ remotes::{Remote, RemoteType},
+ CollectionSettings,
+};
+use pdm_config::metric_collection::COLLECTION_SETTINGS_TYPE;
+
+use crate::{connection, task_utils};
+
+use super::rrd_task::RrdStoreRequest;
+
+/// Control messages for the metric collection task.
+pub(super) enum ControlMsg {
+ CollectSingleRemote(String),
+ CollectAllRemotes,
+}
+
+/// Task which periodically collects metrics from all remotes and stores
+/// them in the local metrics database.
+pub(super) struct MetricCollectionTask {
+ most_recent_timestamps: HashMap<String, i64>,
+ settings: CollectionSettings,
+ metric_data_tx: Sender<RrdStoreRequest>,
+ control_message_rx: Receiver<ControlMsg>,
+}
+
+impl MetricCollectionTask {
+ /// Create a new metric collection task.
+ pub(super) fn new(
+ metric_data_tx: Sender<RrdStoreRequest>,
+ control_message_rx: Receiver<ControlMsg>,
+ ) -> Result<Self, Error> {
+ let settings = Self::get_settings_or_default();
+
+ Ok(Self {
+ most_recent_timestamps: HashMap::new(),
+ settings,
+ metric_data_tx,
+ control_message_rx,
+ })
+ }
+
+ /// Run the metric collection task.
+ ///
+ /// This function never returns.
+ #[tracing::instrument(skip_all, name = "metric_collection_task")]
+ pub(super) async fn run(&mut self) {
+ let mut timer = Self::setup_timer(self.settings.collection_interval_or_default());
+
+ log::debug!(
+ "metric collection starting up. collection interval set to {} seconds",
+ self.settings.collection_interval_or_default()
+ );
+
+ loop {
+ let old_settings = self.settings.clone();
+ tokio::select! {
+ _ = timer.tick() => {
+ // Reload settings in case they have changed in the meanwhile
+ self.settings = Self::get_settings_or_default();
+
+ log::debug!("starting metric collection from all remotes - triggered by timer");
+ self.sleep_for_random_interval_offset().await;
+
+ if let Some(remotes) = Self::load_remote_config() {
+ let to_fetch = remotes.order.as_slice();
+ self.fetch_remotes(&remotes, to_fetch).await;
+ }
+ }
+
+ val = self.control_message_rx.recv() => {
+ // Reload settings in case they have changed in the meanwhile
+ self.settings = Self::get_settings_or_default();
+ match val {
+ Some(ControlMsg::CollectSingleRemote(remote)) => {
+ if let Some(remotes) = Self::load_remote_config() {
+ log::debug!("starting metric collection for remote '{remote}'- triggered by control message");
+ self.fetch_remotes(&remotes, &[remote]).await;
+ }
+ }
+ Some(ControlMsg::CollectAllRemotes) => {
+ if let Some(remotes) = Self::load_remote_config() {
+ log::debug!("starting metric collection from all remotes - triggered by control message");
+ self.fetch_remotes(&remotes, &remotes.order).await;
+ }
+ }
+ _ => {},
+ }
+ }
+ }
+
+ let interval = self.settings.collection_interval_or_default();
+
+ if old_settings.collection_interval_or_default() != interval {
+ log::info!(
+ "metric collection interval changed to {} seconds, reloading timer",
+ interval
+ );
+ timer = Self::setup_timer(interval);
+ }
+ }
+ }
+
+ async fn sleep_for_random_interval_offset(&self) {
+ let mut min = self.settings.min_interval_offset_or_default();
+ let max = self.settings.max_interval_offset_or_default();
+
+ if min > max {
+ log::warn!(
+ "min-interval-offset is larger than max-interval-offset ({min} > {max}) - \
+ capping it to max-interval-offset ({max})"
+ );
+ min = max;
+ }
+
+ let jitter = {
+ let mut rng = rand::thread_rng();
+ rng.gen_range(min..=max)
+ };
+ tokio::time::sleep(Duration::from_secs(jitter)).await;
+ }
+
+ async fn sleep_for_random_connection_delay(settings: &CollectionSettings) {
+ let mut min = settings.min_connection_delay_or_default();
+ let max = settings.max_connection_delay_or_default();
+
+ if min > max {
+ log::warn!(
+ "min-collection-delay is larger than max-collection-delay ({min} > {max}) - \
+ capping it to max-collection-delay ({max})"
+ );
+ min = max;
+ }
+
+ let jitter = {
+ let mut rng = rand::thread_rng();
+ rng.gen_range(min..=max)
+ };
+
+ tokio::time::sleep(Duration::from_millis(jitter)).await;
+ }
+
+ fn get_settings_or_default() -> CollectionSettings {
+ // This function is a bit odd, but not sure if there is a much nicer
+ // way to do it. We want to fall back to defaults if
+ // - the config file does not exist (no errors logged)
+ // - if section type is wrong or the config failed to parse (log errors)
+
+ fn get_settings_impl() -> Result<CollectionSettings, Error> {
+ let (config, _) = pdm_config::metric_collection::config()?;
+
+ let all_sections: Vec<CollectionSettings> =
+ config.convert_to_typed_array(COLLECTION_SETTINGS_TYPE)?;
+
+ for section in all_sections {
+ if section.id == "default" {
+ return Ok(section);
+ }
+ }
+
+ Ok(CollectionSettings {
+ id: "default".into(),
+ ..Default::default()
+ })
+ }
+
+ match get_settings_impl() {
+ Ok(settings) => settings,
+ Err(err) => {
+ log::error!("could not read metric collection settings: {err} - falling back to default config");
+
+ CollectionSettings {
+ id: "default".into(),
+ ..Default::default()
+ }
+ }
+ }
+ }
+
+ /// Set up a [`tokio::time::Interval`] instance with the provided interval.
+ /// The timer will be aligned, e.g. an interval of `60` will let the timer
+ /// fire at minute boundaries.
+ fn setup_timer(interval: u64) -> Interval {
+ let mut timer = tokio::time::interval(Duration::from_secs(interval));
+ let first_run = task_utils::next_aligned_instant(interval).into();
+ timer.reset_at(first_run);
+
+ timer
+ }
+
+ /// Convenience helper to load `remotes.cfg`, logging the error
+ /// and returning `None` if the config could not be read.
+ fn load_remote_config() -> Option<SectionConfigData<Remote>> {
+ match pdm_config::remotes::config() {
+ Ok((remotes, _)) => Some(remotes),
+ Err(e) => {
+ log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
+ None
+ }
+ }
+ }
+
+ /// Fetch metric data from a provided list of remotes concurrently.
+ /// The maximum number of concurrent connections is determined by
+ /// `max_concurrent_connections` in the [`CollectionSettings`]
+ /// instance in `self`.
+ async fn fetch_remotes(
+ &mut self,
+ config: &SectionConfigData<Remote>,
+ remotes_to_fetch: &[String],
+ ) {
+ let semaphore = Arc::new(Semaphore::new(
+ self.settings.max_concurrent_connections_or_default(),
+ ));
+ let mut handles = Vec::new();
+
+ for remote_name in remotes_to_fetch {
+ let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0);
+
+ // unwrap is okay here, acquire_* will only fail if `close` has been
+ // called on the semaphore.
+ let permit = semaphore.clone().acquire_owned().await.unwrap();
+
+ if let Some(remote) = config.get(remote_name).cloned() {
+ log::debug!("fetching remote '{}'", remote.id);
+ let handle = tokio::spawn(Self::fetch_single_remote(
+ self.settings.clone(),
+ remote,
+ start_time,
+ self.metric_data_tx.clone(),
+ permit,
+ ));
+
+ handles.push((remote_name.clone(), handle));
+ }
+ }
+
+ for (remote_name, handle) in handles {
+ let res = handle.await;
+
+ match res {
+ Ok(Ok(ts)) => {
+ self.most_recent_timestamps
+ .insert(remote_name.to_string(), ts);
+ }
+ Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
+ Err(err) => {
+ log::error!(
+ "join error for metric collection task for remote {remote_name}: {err}"
+ )
+ }
+ }
+ }
+ }
+
+ /// Fetch a single remote.
+ #[tracing::instrument(skip_all, fields(remote = remote.id), name = "metric_collection_task")]
+ async fn fetch_single_remote(
+ settings: CollectionSettings,
+ remote: Remote,
+ start_time: i64,
+ sender: Sender<RrdStoreRequest>,
+ _permit: OwnedSemaphorePermit,
+ ) -> Result<i64, Error> {
+ Self::sleep_for_random_connection_delay(&settings).await;
+
+ let most_recent_timestamp = match remote.ty {
+ RemoteType::Pve => {
+ let client = connection::make_pve_client(&remote)?;
+ let metrics = client
+ .cluster_metrics_export(Some(true), Some(false), Some(start_time))
+ .await?;
+
+ let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
+
+ sender
+ .send(RrdStoreRequest::Pve {
+ remote: remote.id.clone(),
+ metrics,
+ })
+ .await?;
+
+ most_recent
+ }
+ RemoteType::Pbs => {
+ let client = connection::make_pbs_client(&remote)?;
+ let metrics = client.metrics(Some(true), Some(start_time)).await?;
+
+ let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
+
+ sender
+ .send(RrdStoreRequest::Pbs {
+ remote: remote.id.clone(),
+ metrics,
+ })
+ .await?;
+
+ most_recent
+ }
+ };
+
+ Ok(most_recent_timestamp)
+ }
+}
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 06ade5f..0ab6678 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -1,20 +1,17 @@
-use std::collections::HashMap;
use std::pin::pin;
+use std::sync::OnceLock;
-use anyhow::Error;
+use anyhow::{bail, Error};
use tokio::sync::mpsc::{self, Sender};
-use pdm_api_types::remotes::RemoteType;
-
-use crate::{connection, task_utils};
-
+mod collection_task;
pub mod rrd_cache;
mod rrd_task;
pub mod top_entities;
-use rrd_task::RrdStoreRequest;
+use collection_task::{ControlMsg, MetricCollectionTask};
-const COLLECTION_INTERVAL: u64 = 60;
+static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new();
/// Initialize the RRD cache
pub fn init() -> Result<(), Error> {
@@ -24,89 +21,56 @@ pub fn init() -> Result<(), Error> {
}
/// Start the metric collection task.
-pub fn start_task() {
- let (tx, rx) = mpsc::channel(128);
+pub fn start_task() -> Result<(), Error> {
+ let (metric_data_tx, metric_data_rx) = mpsc::channel(128);
+
+ let (trigger_collection_tx, trigger_collection_rx) = mpsc::channel(128);
+ if CONTROL_MESSAGE_TX.set(trigger_collection_tx).is_err() {
+ bail!("control message sender alread set");
+ }
tokio::spawn(async move {
- let task_scheduler = pin!(metric_collection_task(tx));
+ let metric_collection_task_future = pin!(async move {
+ let mut task =
+ MetricCollectionTask::new(metric_data_tx, trigger_collection_rx).unwrap();
+ task.run().await;
+ });
+
let abort_future = pin!(proxmox_daemon::shutdown_future());
- futures::future::select(task_scheduler, abort_future).await;
+ futures::future::select(metric_collection_task_future, abort_future).await;
});
tokio::spawn(async move {
- let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
+ let rrd_task_future = pin!(rrd_task::store_in_rrd_task(metric_data_rx));
let abort_future = pin!(proxmox_daemon::shutdown_future());
- futures::future::select(task_scheduler, abort_future).await;
+ futures::future::select(rrd_task_future, abort_future).await;
});
+
+ Ok(())
}
-async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
- let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
-
- loop {
- let delay_target = task_utils::next_aligned_instant(COLLECTION_INTERVAL);
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
- let remotes = match pdm_config::remotes::config() {
- Ok((remotes, _)) => remotes,
- Err(e) => {
- log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
- continue;
- }
- };
-
- for (remote_name, remote) in &remotes.sections {
- let start_time = *most_recent_timestamps.get(remote_name).unwrap_or(&0);
- let remote_name_clone = remote_name.clone();
-
- let res = async {
- let most_recent_timestamp = match remote.ty {
- RemoteType::Pve => {
- let client = connection::make_pve_client(remote)?;
- let metrics = client
- .cluster_metrics_export(Some(true), Some(false), Some(start_time))
- .await?;
-
- let most_recent =
- metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
-
- sender
- .send(RrdStoreRequest::Pve {
- remote: remote_name_clone,
- metrics,
- })
- .await?;
-
- Ok::<i64, Error>(most_recent)
- }
- RemoteType::Pbs => {
- let client = connection::make_pbs_client(remote)?;
- let metrics = client.metrics(Some(true), Some(start_time)).await?;
-
- let most_recent =
- metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
-
- sender
- .send(RrdStoreRequest::Pbs {
- remote: remote_name_clone,
- metrics,
- })
- .await?;
-
- Ok::<i64, Error>(most_recent)
- }
- }?;
-
- Ok::<i64, Error>(most_recent_timestamp)
- }
- .await;
-
- match res {
- Ok(ts) => {
- most_recent_timestamps.insert(remote_name.to_string(), ts);
- }
- Err(err) => log::error!("failed to collect metrics for {remote_name}: {err}"),
- }
- }
+/// Schedule metric collection for a given remote as soon as possible.
+///
+/// Has no effect if the tx end of the channel has not been initialized yet.
+/// Returns an error if the mpsc channel has been closed already.
+pub async fn trigger_metric_collection_for_remote(remote: &str) -> Result<(), Error> {
+ if let Some(sender) = CONTROL_MESSAGE_TX.get() {
+ sender
+ .send(ControlMsg::CollectSingleRemote(remote.into()))
+ .await?;
}
+
+ Ok(())
+}
+
+/// Schedule metric collection for all remotes as soon as possible.
+///
+/// Has no effect if the tx end of the channel has not been initialized yet.
+/// Returns an error if the mpsc channel has been closed already.
+pub async fn trigger_metric_collection() -> Result<(), Error> {
+ if let Some(sender) = CONTROL_MESSAGE_TX.get() {
+ sender.send(ControlMsg::CollectAllRemotes).await?;
+ }
+
+ Ok(())
}
--
2.39.5