[pdm-devel] [PATCH proxmox-datacenter-manager v2 06/28] metric collection: save metric data to RRD in separate task

Lukas Wagner l.wagner at proxmox.com
Fri Feb 14 14:06:31 CET 2025


This is a preparation for concurrent metric collection. While the RRD
cache appears to be safe for concurrent access (it uses an RwLock to
protect the data), delegating all RRD writes to a separate tokio task
makes it IMO easier to reason about and understand. Furthermore, it
decouples the 'metric fetching' and 'metric storing' parts, making it
much easier to write tests for both parts independently.

For better code separation, this also splits out the new task into
a new submodule.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 server/src/metric_collection/mod.rs      | 102 +++++++----------------
 server/src/metric_collection/rrd_task.rs |  96 +++++++++++++++++++++
 2 files changed, 128 insertions(+), 70 deletions(-)
 create mode 100644 server/src/metric_collection/rrd_task.rs

diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 5540f937..06ade5f0 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -2,18 +2,18 @@ use std::collections::HashMap;
 use std::pin::pin;
 
 use anyhow::Error;
-
-use pbs_api_types::{MetricDataPoint, MetricDataType};
-use proxmox_rrd::rrd::DataSourceType;
+use tokio::sync::mpsc::{self, Sender};
 
 use pdm_api_types::remotes::RemoteType;
-use pve_api_types::{ClusterMetricsData, ClusterMetricsDataType};
 
 use crate::{connection, task_utils};
 
 pub mod rrd_cache;
+mod rrd_task;
 pub mod top_entities;
 
+use rrd_task::RrdStoreRequest;
+
 const COLLECTION_INTERVAL: u64 = 60;
 
 /// Initialize the RRD cache
@@ -25,14 +25,22 @@ pub fn init() -> Result<(), Error> {
 
 /// Start the metric collection task.
 pub fn start_task() {
+    let (tx, rx) = mpsc::channel(128);
+
+    tokio::spawn(async move {
+        let task_scheduler = pin!(metric_collection_task(tx));
+        let abort_future = pin!(proxmox_daemon::shutdown_future());
+        futures::future::select(task_scheduler, abort_future).await;
+    });
+
     tokio::spawn(async move {
-        let task_scheduler = pin!(metric_collection_task());
+        let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
         let abort_future = pin!(proxmox_daemon::shutdown_future());
         futures::future::select(task_scheduler, abort_future).await;
     });
 }
 
-async fn metric_collection_task() -> Result<(), Error> {
+async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
     let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
 
     loop {
@@ -59,37 +67,33 @@ async fn metric_collection_task() -> Result<(), Error> {
                             .cluster_metrics_export(Some(true), Some(false), Some(start_time))
                             .await?;
 
-                        //// Involves some blocking file IO
-                        tokio::task::spawn_blocking(move || {
-                            let mut most_recent_timestamp = 0;
+                        let most_recent =
+                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
 
-                            for data_point in metrics.data {
-                                most_recent_timestamp =
-                                    most_recent_timestamp.max(data_point.timestamp);
-                                store_metric_pve(&remote_name_clone, &data_point);
-                            }
+                        sender
+                            .send(RrdStoreRequest::Pve {
+                                remote: remote_name_clone,
+                                metrics,
+                            })
+                            .await?;
 
-                            most_recent_timestamp
-                        })
-                        .await
+                        Ok::<i64, Error>(most_recent)
                     }
                     RemoteType::Pbs => {
                         let client = connection::make_pbs_client(remote)?;
                         let metrics = client.metrics(Some(true), Some(start_time)).await?;
 
-                        // Involves some blocking file IO
-                        tokio::task::spawn_blocking(move || {
-                            let mut most_recent_timestamp = 0;
+                        let most_recent =
+                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
 
-                            for data_point in metrics.data {
-                                most_recent_timestamp =
-                                    most_recent_timestamp.max(data_point.timestamp);
-                                store_metric_pbs(&remote_name_clone, &data_point);
-                            }
+                        sender
+                            .send(RrdStoreRequest::Pbs {
+                                remote: remote_name_clone,
+                                metrics,
+                            })
+                            .await?;
 
-                            most_recent_timestamp
-                        })
-                        .await
+                        Ok::<i64, Error>(most_recent)
                     }
                 }?;
 
@@ -106,45 +110,3 @@ async fn metric_collection_task() -> Result<(), Error> {
         }
     }
 }
-
-fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
-    let name = format!(
-        "pve/{remote_name}/{id}/{metric}",
-        id = data_point.id,
-        metric = data_point.metric,
-    );
-
-    let data_source_type = match data_point.ty {
-        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
-        ClusterMetricsDataType::Counter => DataSourceType::Counter,
-        ClusterMetricsDataType::Derive => DataSourceType::Derive,
-    };
-
-    rrd_cache::update_value(
-        &name,
-        data_point.value,
-        data_point.timestamp,
-        data_source_type,
-    );
-}
-
-fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
-    let name = format!(
-        "pbs/{remote_name}/{id}/{metric}",
-        id = data_point.id,
-        metric = data_point.metric,
-    );
-
-    let data_source_type = match data_point.ty {
-        MetricDataType::Gauge => DataSourceType::Gauge,
-        MetricDataType::Counter => DataSourceType::Counter,
-        MetricDataType::Derive => DataSourceType::Derive,
-    };
-
-    rrd_cache::update_value(
-        &name,
-        data_point.value,
-        data_point.timestamp,
-        data_source_type,
-    );
-}
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
new file mode 100644
index 00000000..a72945df
--- /dev/null
+++ b/server/src/metric_collection/rrd_task.rs
@@ -0,0 +1,96 @@
+use anyhow::Error;
+use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
+use proxmox_rrd::rrd::DataSourceType;
+use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
+use tokio::sync::mpsc::Receiver;
+
+use super::rrd_cache;
+
+/// Request sent over the channel to the RRD task, asking it to store a batch of metrics.
+pub(super) enum RrdStoreRequest {
+    /// Store metrics fetched from a PVE remote.
+    Pve {
+        /// Name of the remote; used in the RRD name (`pve/{remote}/{id}/{metric}`).
+        remote: String,
+        /// Metric data points to store.
+        metrics: ClusterMetrics,
+    },
+    /// Store metrics fetched from a PBS remote.
+    Pbs {
+        /// Name of the remote; used in the RRD name (`pbs/{remote}/{id}/{metric}`).
+        remote: String,
+        /// Metric data points to store.
+        metrics: Metrics,
+    },
+}
+
+/// Task which stores received metrics in the RRD. Metric data is fed into
+/// this task via an MPSC channel; it returns `Ok(())` once `recv` yields `None`.
+pub(super) async fn store_in_rrd_task(
+    mut receiver: Receiver<RrdStoreRequest>,
+) -> Result<(), Error> {
+    while let Some(msg) = receiver.recv().await {
+        // Storing involves some blocking file IO, so it is run via `spawn_blocking`
+        let res = tokio::task::spawn_blocking(move || match msg {
+            RrdStoreRequest::Pve { remote, metrics } => {
+                for data_point in metrics.data {
+                    store_metric_pve(&remote, &data_point);
+                }
+            }
+            RrdStoreRequest::Pbs { remote, metrics } => {
+                for data_point in metrics.data {
+                    store_metric_pbs(&remote, &data_point);
+                }
+            }
+        })
+        .await;
+        // A failed blocking task is only logged; the loop keeps processing requests.
+        if let Err(err) = res {
+            log::error!("error in rrd task when attempting to save metrics: {err}");
+        }
+    }
+
+    Ok(())
+}
+
+fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
+    let name = format!(
+        "pve/{remote_name}/{id}/{metric}",
+        id = data_point.id,
+        metric = data_point.metric,
+    );
+    // Translate the PVE metric type into the corresponding RRD data source type.
+    let data_source_type = match data_point.ty {
+        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
+        ClusterMetricsDataType::Counter => DataSourceType::Counter,
+        ClusterMetricsDataType::Derive => DataSourceType::Derive,
+    };
+    // Pass the data point to `rrd_cache::update_value` under the name built above.
+    rrd_cache::update_value(
+        &name,
+        data_point.value,
+        data_point.timestamp,
+        data_source_type,
+    );
+}
+
+fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
+    let name = format!(
+        "pbs/{remote_name}/{id}/{metric}",
+        id = data_point.id,
+        metric = data_point.metric,
+    );
+    // Translate the PBS metric type into the corresponding RRD data source type.
+    let data_source_type = match data_point.ty {
+        MetricDataType::Gauge => DataSourceType::Gauge,
+        MetricDataType::Counter => DataSourceType::Counter,
+        MetricDataType::Derive => DataSourceType::Derive,
+    };
+    // Pass the data point to `rrd_cache::update_value` under the name built above.
+    rrd_cache::update_value(
+        &name,
+        data_point.value,
+        data_point.timestamp,
+        data_source_type,
+    );
+}
-- 
2.39.5





More information about the pdm-devel mailing list