[pdm-devel] [PATCH proxmox-datacenter-manager 13/25] metric collection: pass rrd cache instance as function parameter

Lukas Wagner l.wagner at proxmox.com
Tue Feb 11 13:05:29 CET 2025


This enables us to do dependency injection for tests by passing in the
RRD cache to use, instead of having to replace the global static
instance. The latter is always awkward: tests might run multi-threaded,
so the global instance could (or should) be a thread-local, but that is
again awkward in the tokio world, where we actually want task-local
variables. Those in turn become awkward once you spawn new tasks, since
the spawned tasks do not see the task-local variables of their parent
task. Long story short, passing in the dependency as a parameter
makes things easier.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 server/src/api/rrd_common.rs                 |  4 +-
 server/src/metric_collection/mod.rs          | 15 +++-
 server/src/metric_collection/rrd_cache.rs    | 77 ++++++++++++--------
 server/src/metric_collection/rrd_task.rs     | 16 ++--
 server/src/metric_collection/top_entities.rs |  3 +
 5 files changed, 75 insertions(+), 40 deletions(-)

diff --git a/server/src/api/rrd_common.rs b/server/src/api/rrd_common.rs
index 0d82d0c..d9ed017 100644
--- a/server/src/api/rrd_common.rs
+++ b/server/src/api/rrd_common.rs
@@ -23,9 +23,11 @@ pub fn create_datapoints_from_rrd<T: DataPoint>(
     let mut timemap = BTreeMap::new();
     let mut last_resolution = None;
 
+    let cache = rrd_cache::get_cache();
+
     for name in T::fields() {
         let (start, resolution, data) =
-            match rrd_cache::extract_data(basedir, name, timeframe, mode)? {
+            match rrd_cache::extract_data(&cache, basedir, name, timeframe, mode)? {
                 Some(data) => data.into(),
                 None => continue,
             };
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index cf3350b..e41f0c6 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -4,6 +4,8 @@ use std::sync::OnceLock;
 use anyhow::{bail, Error};
 use tokio::sync::mpsc::{self, Sender};
 
+use proxmox_sys::fs::CreateOptions;
+
 mod collection_task;
 pub mod rrd_cache;
 mod rrd_task;
@@ -16,7 +18,14 @@ static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new();
 
 /// Initialize the RRD cache
 pub fn init() -> Result<(), Error> {
-    rrd_cache::init()?;
+    let api_uid = pdm_config::api_user()?.uid;
+    let api_gid = pdm_config::api_group()?.gid;
+
+    let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
+    let dir_options = CreateOptions::new().owner(api_uid).group(api_gid);
+
+    let cache = rrd_cache::init(rrd_cache::RRD_CACHE_BASEDIR, dir_options, file_options)?;
+    rrd_cache::set_cache(cache)?;
 
     Ok(())
 }
@@ -41,8 +50,10 @@ pub fn start_task() -> Result<(), Error> {
         futures::future::select(metric_collection_task_future, abort_future).await;
     });
 
+    let cache = rrd_cache::get_cache();
+
     tokio::spawn(async move {
-        let rrd_task_future = pin!(rrd_task::store_in_rrd_task(metric_data_rx));
+        let rrd_task_future = pin!(rrd_task::store_in_rrd_task(cache, metric_data_rx));
         let abort_future = pin!(proxmox_daemon::shutdown_future());
         futures::future::select(rrd_task_future, abort_future).await;
     });
diff --git a/server/src/metric_collection/rrd_cache.rs b/server/src/metric_collection/rrd_cache.rs
index a8d72b8..424d476 100644
--- a/server/src/metric_collection/rrd_cache.rs
+++ b/server/src/metric_collection/rrd_cache.rs
@@ -5,6 +5,7 @@
 //! and update RRD data inside `proxmox-datacenter-api`.
 
 use std::path::Path;
+use std::sync::Arc;
 
 use anyhow::{format_err, Error};
 use once_cell::sync::OnceCell;
@@ -16,32 +17,45 @@ use proxmox_sys::fs::CreateOptions;
 
 use pdm_buildcfg::PDM_STATE_DIR_M;
 
-const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb");
+pub(super) const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb");
 
-static RRD_CACHE: OnceCell<Cache> = OnceCell::new();
+// This is an `Arc` because this makes it easier to do dependency injection
+// in test contexts.
+//
+// For DI in testing, we want to pass in a reference to the Cache
+// as a function parameter. In a couple of these functions we
+// spawn tokio tasks which need access to the reference, hence the
+// reference needs to be 'static. In those contexts, it is hard to
+// come up with a 'static reference, so we just
+// wrap the cache in an `Arc` for now, solving the
+// lifetime problem via refcounting.
+static RRD_CACHE: OnceCell<Arc<Cache>> = OnceCell::new();
 
 /// Get the RRD cache instance
-fn get_cache() -> Result<&'static Cache, Error> {
+pub fn get_cache() -> Arc<Cache> {
+    RRD_CACHE.get().cloned().expect("rrd cache not initialized")
+}
+
+pub fn set_cache(cache: Arc<Cache>) -> Result<(), Error> {
     RRD_CACHE
-        .get()
-        .ok_or_else(|| format_err!("RRD cache not initialized!"))
+        .set(cache)
+        .map_err(|_| format_err!("RRD cache already initialized!"))?;
+
+    Ok(())
 }
 
 /// Initialize the RRD cache instance
 ///
 /// Note: Only a single process must do this (proxmox-datacenter-api)
-pub fn init() -> Result<&'static Cache, Error> {
-    let api_uid = pdm_config::api_user()?.uid;
-    let api_gid = pdm_config::api_group()?.gid;
-
-    let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
-    let dir_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
+pub fn init<P: AsRef<Path>>(
+    base_path: P,
+    dir_options: CreateOptions,
+    file_options: CreateOptions,
+) -> Result<Arc<Cache>, Error> {
     let apply_interval = 30.0 * 60.0; // 30 minutes
 
     let cache = Cache::new(
-        RRD_CACHE_BASEDIR,
+        base_path,
         Some(file_options),
         Some(dir_options),
         apply_interval,
@@ -51,11 +65,7 @@ pub fn init() -> Result<&'static Cache, Error> {
 
     cache.apply_journal()?;
 
-    RRD_CACHE
-        .set(cache)
-        .map_err(|_| format_err!("RRD cache already initialized!"))?;
-
-    Ok(RRD_CACHE.get().unwrap())
+    Ok(Arc::new(cache))
 }
 
 fn load_callback(path: &Path, _rel_path: &str) -> Option<Database> {
@@ -91,6 +101,7 @@ fn create_callback(dst: DataSourceType) -> Database {
 
 /// Extracts data for the specified time frame from RRD cache
 pub fn extract_data(
+    rrd_cache: &Cache,
     basedir: &str,
     name: &str,
     timeframe: RrdTimeframe,
@@ -112,26 +123,28 @@ pub fn extract_data(
         RrdMode::Average => AggregationFn::Average,
     };
 
-    let rrd_cache = get_cache()?;
-
     rrd_cache.extract_cached_data(basedir, name, cf, resolution, Some(start), Some(end))
 }
 
 /// Sync/Flush the RRD journal
 pub fn sync_journal() {
-    if let Ok(rrd_cache) = get_cache() {
-        if let Err(err) = rrd_cache.sync_journal() {
-            log::error!("rrd_sync_journal failed - {err}");
-        }
+    let rrd_cache = get_cache();
+    if let Err(err) = rrd_cache.sync_journal() {
+        log::error!("rrd_sync_journal failed - {err}");
     }
 }
+
 /// Update RRD Gauge values
-pub fn update_value(name: &str, value: f64, timestamp: i64, datasource_type: DataSourceType) {
-    if let Ok(rrd_cache) = get_cache() {
-        if let Err(err) =
-            rrd_cache.update_value_ignore_old(name, timestamp as f64, value, datasource_type)
-        {
-            log::error!("rrd::update_value '{name}' failed - {err}");
-        }
+pub fn update_value(
+    rrd_cache: &Cache,
+    name: &str,
+    value: f64,
+    timestamp: i64,
+    datasource_type: DataSourceType,
+) {
+    if let Err(err) =
+        rrd_cache.update_value_ignore_old(name, timestamp as f64, value, datasource_type)
+    {
+        log::error!("rrd::update_value '{name}' failed - {err}");
     }
 }
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
index 7ee56c5..a678bb5 100644
--- a/server/src/metric_collection/rrd_task.rs
+++ b/server/src/metric_collection/rrd_task.rs
@@ -1,7 +1,9 @@
+use std::sync::Arc;
+
 use anyhow::Error;
 use tokio::sync::{mpsc::Receiver, oneshot};
 
-use proxmox_rrd::rrd::DataSourceType;
+use proxmox_rrd::{rrd::DataSourceType, Cache};
 
 use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
 use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
@@ -39,9 +41,11 @@ pub(super) struct RrdStoreResult {
 /// Task which stores received metrics in the RRD. Metric data is fed into
 /// this task via a MPSC channel.
 pub(super) async fn store_in_rrd_task(
+    cache: Arc<Cache>,
     mut receiver: Receiver<RrdStoreRequest>,
 ) -> Result<(), Error> {
     while let Some(msg) = receiver.recv().await {
+        let cache_clone = Arc::clone(&cache);
         //// Involves some blocking file IO
         tokio::task::spawn_blocking(move || {
             let mut most_recent_timestamp = 0;
@@ -53,7 +57,7 @@ pub(super) async fn store_in_rrd_task(
                 } => {
                     for data_point in metrics.data {
                         most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp);
-                        store_metric_pve(&remote, &data_point);
+                        store_metric_pve(&cache_clone, &remote, &data_point);
                     }
 
                     channel
@@ -65,7 +69,7 @@ pub(super) async fn store_in_rrd_task(
                 } => {
                     for data_point in metrics.data {
                         most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp);
-                        store_metric_pbs(&remote, &data_point);
+                        store_metric_pbs(&cache_clone, &remote, &data_point);
                     }
 
                     channel
@@ -84,7 +88,7 @@ pub(super) async fn store_in_rrd_task(
     Ok(())
 }
 
-fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
+fn store_metric_pve(cache: &Cache, remote_name: &str, data_point: &ClusterMetricsData) {
     let name = format!(
         "pve/{remote_name}/{id}/{metric}",
         id = data_point.id,
@@ -98,6 +102,7 @@ fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
     };
 
     rrd_cache::update_value(
+        cache,
         &name,
         data_point.value,
         data_point.timestamp,
@@ -105,7 +110,7 @@ fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
     );
 }
 
-fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
+fn store_metric_pbs(cache: &Cache, remote_name: &str, data_point: &MetricDataPoint) {
     let name = format!(
         "pbs/{remote_name}/{id}/{metric}",
         id = data_point.id,
@@ -119,6 +124,7 @@ fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
     };
 
     rrd_cache::update_value(
+        cache,
         &name,
         data_point.value,
         data_point.timestamp,
diff --git a/server/src/metric_collection/top_entities.rs b/server/src/metric_collection/top_entities.rs
index f8e053f..f54ee72 100644
--- a/server/src/metric_collection/top_entities.rs
+++ b/server/src/metric_collection/top_entities.rs
@@ -121,7 +121,10 @@ fn get_entity(
     name: String,
     metric: &str,
 ) -> Option<(f64, TopEntity)> {
+    let cache = rrd_cache::get_cache();
+
     if let Ok(Some(values)) = rrd_cache::extract_data(
+        &cache,
         &name,
         metric,
         timeframe,
-- 
2.39.5





More information about the pdm-devel mailing list