[pdm-devel] [PATCH proxmox-datacenter-manager v2 15/28] metric collection: wrap rrd_cache::Cache in a struct

Lukas Wagner l.wagner at proxmox.com
Fri Feb 14 14:06:40 CET 2025


All helper functions for storing/retrieving RRD data take the cache
instance as their first parameter. This suggests that they should be
methods on a struct.

This commit wraps the foreign `proxmox_rrd::Cache` type in a new type,
turns the `init` function into a `new` constructor, and converts all
other helpers into methods of the new type.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 server/src/api/rrd_common.rs                 |   9 +-
 server/src/metric_collection/mod.rs          |   6 +-
 server/src/metric_collection/rrd_cache.rs    | 191 +++++++++----------
 server/src/metric_collection/rrd_task.rs     |  21 +-
 server/src/metric_collection/top_entities.rs |   3 +-
 5 files changed, 113 insertions(+), 117 deletions(-)

diff --git a/server/src/api/rrd_common.rs b/server/src/api/rrd_common.rs
index d9ed017a..28868bc1 100644
--- a/server/src/api/rrd_common.rs
+++ b/server/src/api/rrd_common.rs
@@ -26,11 +26,10 @@ pub fn create_datapoints_from_rrd<T: DataPoint>(
     let cache = rrd_cache::get_cache();
 
     for name in T::fields() {
-        let (start, resolution, data) =
-            match rrd_cache::extract_data(&cache, basedir, name, timeframe, mode)? {
-                Some(data) => data.into(),
-                None => continue,
-            };
+        let (start, resolution, data) = match cache.extract_data(basedir, name, timeframe, mode)? {
+            Some(data) => data.into(),
+            None => continue,
+        };
 
         if let Some(expected_resolution) = last_resolution {
             if resolution != expected_resolution {
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 509d4f88..5b6c14d2 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -1,4 +1,5 @@
 use std::pin::pin;
+use std::sync::Arc;
 use std::sync::OnceLock;
 
 use anyhow::{bail, Error};
@@ -13,6 +14,7 @@ mod state;
 pub mod top_entities;
 
 use collection_task::{ControlMsg, MetricCollectionTask};
+use rrd_cache::RrdCache;
 
 static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new();
 
@@ -24,8 +26,8 @@ pub fn init() -> Result<(), Error> {
     let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
     let dir_options = CreateOptions::new().owner(api_uid).group(api_gid);
 
-    let cache = rrd_cache::init(rrd_cache::RRD_CACHE_BASEDIR, dir_options, file_options)?;
-    rrd_cache::set_cache(cache)?;
+    let cache = RrdCache::new(rrd_cache::RRD_CACHE_BASEDIR, dir_options, file_options)?;
+    rrd_cache::set_cache(Arc::new(cache))?;
 
     Ok(())
 }
diff --git a/server/src/metric_collection/rrd_cache.rs b/server/src/metric_collection/rrd_cache.rs
index 70c91ca6..e8994143 100644
--- a/server/src/metric_collection/rrd_cache.rs
+++ b/server/src/metric_collection/rrd_cache.rs
@@ -29,14 +29,14 @@ pub(super) const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb");
 // hard time to come up with a 'static reference, so we just
 // wrap the cache in an `Arc` for now, solving the
 // lifetime problem via refcounting.
-static RRD_CACHE: OnceCell<Arc<Cache>> = OnceCell::new();
+static RRD_CACHE: OnceCell<Arc<RrdCache>> = OnceCell::new();
 
 /// Get the RRD cache instance
-pub fn get_cache() -> Arc<Cache> {
+pub fn get_cache() -> Arc<RrdCache> {
     RRD_CACHE.get().cloned().expect("rrd cache not initialized")
 }
 
-pub fn set_cache(cache: Arc<Cache>) -> Result<(), Error> {
+pub fn set_cache(cache: Arc<RrdCache>) -> Result<(), Error> {
     RRD_CACHE
         .set(cache)
         .map_err(|_| format_err!("RRD cache already initialized!"))?;
@@ -44,107 +44,106 @@ pub fn set_cache(cache: Arc<Cache>) -> Result<(), Error> {
     Ok(())
 }
 
-/// Initialize the RRD cache instance
-///
-/// Note: Only a single process must do this (proxmox-datacenter-api)
-pub fn init<P: AsRef<Path>>(
-    base_path: P,
-    dir_options: CreateOptions,
-    file_options: CreateOptions,
-) -> Result<Arc<Cache>, Error> {
-    let apply_interval = 30.0 * 60.0; // 30 minutes
-
-    let cache = Cache::new(
-        base_path,
-        Some(file_options),
-        Some(dir_options),
-        apply_interval,
-        load_callback,
-        create_callback,
-    )?;
-
-    cache.apply_journal()?;
-
-    Ok(Arc::new(cache))
+/// Wrapper around `proxmox_rrd::Cache` to accommodate helper methods.
+pub struct RrdCache {
+    cache: Cache,
 }
 
-fn load_callback(path: &Path, _rel_path: &str) -> Option<Database> {
-    match Database::load(path, true) {
-        Ok(rrd) => Some(rrd),
-        Err(err) => {
-            if err.kind() != std::io::ErrorKind::NotFound {
-                log::warn!("overwriting RRD file {path:?}, because of load error: {err}",);
+impl RrdCache {
+    /// Create a new RrdCache instance. Only a single process (proxmox-datacenter-api) may do this.
+    pub fn new<P: AsRef<Path>>(
+        base_path: P,
+        dir_options: CreateOptions,
+        file_options: CreateOptions,
+    ) -> Result<Self, Error> {
+        let apply_interval = 30.0 * 60.0; // 30 minutes
+
+        let cache = Cache::new(
+            base_path,
+            Some(file_options),
+            Some(dir_options),
+            apply_interval,
+            Self::load_callback,
+            Self::create_callback,
+        )?;
+
+        cache.apply_journal()?;
+
+        Ok(Self { cache })
+    }
+
+    fn load_callback(path: &Path, _rel_path: &str) -> Option<Database> {
+        match Database::load(path, true) {
+            Ok(rrd) => Some(rrd),
+            Err(err) => {
+                if err.kind() != std::io::ErrorKind::NotFound {
+                    log::warn!("overwriting RRD file {path:?}, because of load error: {err}",);
+                }
+                None
             }
-            None
         }
     }
-}
-
-fn create_callback(dst: DataSourceType) -> Database {
-    let rra_list = vec![
-        // 1 min * 1440 => 1 day
-        Archive::new(AggregationFn::Average, 60, 1440),
-        Archive::new(AggregationFn::Maximum, 60, 1440),
-        // 30 min * 1440 => 30 days ~ 1 month
-        Archive::new(AggregationFn::Average, 30 * 60, 1440),
-        Archive::new(AggregationFn::Maximum, 30 * 60, 1440),
-        // 6 h * 1440 => 360 days ~ 1 year
-        Archive::new(AggregationFn::Average, 6 * 3600, 1440),
-        Archive::new(AggregationFn::Maximum, 6 * 3600, 1440),
-        // 1 week * 570 => 10 years
-        Archive::new(AggregationFn::Average, 7 * 86400, 570),
-        Archive::new(AggregationFn::Maximum, 7 * 86400, 570),
-    ];
-
-    Database::new(dst, rra_list)
-}
 
-/// Extracts data for the specified time frame from RRD cache
-pub fn extract_data(
-    rrd_cache: &Cache,
-    basedir: &str,
-    name: &str,
-    timeframe: RrdTimeframe,
-    mode: RrdMode,
-) -> Result<Option<proxmox_rrd::Entry>, Error> {
-    let end = proxmox_time::epoch_f64() as u64;
-
-    let (start, resolution) = match timeframe {
-        RrdTimeframe::Hour => (end - 3600, 60),
-        RrdTimeframe::Day => (end - 3600 * 24, 60),
-        RrdTimeframe::Week => (end - 3600 * 24 * 7, 30 * 60),
-        RrdTimeframe::Month => (end - 3600 * 24 * 30, 30 * 60),
-        RrdTimeframe::Year => (end - 3600 * 24 * 365, 6 * 60 * 60),
-        RrdTimeframe::Decade => (end - 10 * 3600 * 24 * 366, 7 * 86400),
-    };
-
-    let cf = match mode {
-        RrdMode::Max => AggregationFn::Maximum,
-        RrdMode::Average => AggregationFn::Average,
-    };
-
-    rrd_cache.extract_cached_data(basedir, name, cf, resolution, Some(start), Some(end))
-}
+    fn create_callback(dst: DataSourceType) -> Database {
+        let rra_list = vec![
+            // 1 min * 1440 => 1 day
+            Archive::new(AggregationFn::Average, 60, 1440),
+            Archive::new(AggregationFn::Maximum, 60, 1440),
+            // 30 min * 1440 => 30 days ~ 1 month
+            Archive::new(AggregationFn::Average, 30 * 60, 1440),
+            Archive::new(AggregationFn::Maximum, 30 * 60, 1440),
+            // 6 h * 1440 => 360 days ~ 1 year
+            Archive::new(AggregationFn::Average, 6 * 3600, 1440),
+            Archive::new(AggregationFn::Maximum, 6 * 3600, 1440),
+            // 1 week * 570 => 10 years
+            Archive::new(AggregationFn::Average, 7 * 86400, 570),
+            Archive::new(AggregationFn::Maximum, 7 * 86400, 570),
+        ];
+
+        Database::new(dst, rra_list)
+    }
 
-/// Sync/Flush the RRD journal
-pub fn sync_journal() {
-    let rrd_cache = get_cache();
-    if let Err(err) = rrd_cache.sync_journal() {
-        log::error!("rrd_sync_journal failed - {err}");
+    /// Extracts data for the specified time frame from RRD cache
+    pub fn extract_data(
+        &self,
+        basedir: &str,
+        name: &str,
+        timeframe: RrdTimeframe,
+        mode: RrdMode,
+    ) -> Result<Option<proxmox_rrd::Entry>, Error> {
+        let end = proxmox_time::epoch_f64() as u64;
+
+        let (start, resolution) = match timeframe {
+            RrdTimeframe::Hour => (end - 3600, 60),
+            RrdTimeframe::Day => (end - 3600 * 24, 60),
+            RrdTimeframe::Week => (end - 3600 * 24 * 7, 30 * 60),
+            RrdTimeframe::Month => (end - 3600 * 24 * 30, 30 * 60),
+            RrdTimeframe::Year => (end - 3600 * 24 * 365, 6 * 60 * 60),
+            RrdTimeframe::Decade => (end - 10 * 3600 * 24 * 366, 7 * 86400),
+        };
+
+        let cf = match mode {
+            RrdMode::Max => AggregationFn::Maximum,
+            RrdMode::Average => AggregationFn::Average,
+        };
+
+        self.cache
+            .extract_cached_data(basedir, name, cf, resolution, Some(start), Some(end))
     }
-}
 
-/// Update RRD Gauge values
-pub fn update_value(
-    rrd_cache: &Cache,
-    name: &str,
-    value: f64,
-    timestamp: i64,
-    datasource_type: DataSourceType,
-) {
-    if let Err(err) =
-        rrd_cache.update_value_ignore_old(name, timestamp as f64, value, datasource_type)
-    {
-        log::error!("rrd::update_value '{name}' failed - {err}");
+    /// Update an RRD value of the given data source type; errors are logged, not returned
+    pub fn update_value(
+        &self,
+        name: &str,
+        value: f64,
+        timestamp: i64,
+        datasource_type: DataSourceType,
+    ) {
+        if let Err(err) =
+            self.cache
+                .update_value_ignore_old(name, timestamp as f64, value, datasource_type)
+        {
+            log::error!("rrd::update_value '{name}' failed - {err}");
+        }
     }
 }
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
index 27327a29..c2a41d30 100644
--- a/server/src/metric_collection/rrd_task.rs
+++ b/server/src/metric_collection/rrd_task.rs
@@ -3,12 +3,12 @@ use std::sync::Arc;
 use anyhow::Error;
 use tokio::sync::{mpsc::Receiver, oneshot};
 
-use proxmox_rrd::{rrd::DataSourceType, Cache};
+use proxmox_rrd::rrd::DataSourceType;
 
 use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
 use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
 
-use super::rrd_cache;
+use super::rrd_cache::RrdCache;
 
 /// Store request for the RRD task.
 pub(super) enum RrdStoreRequest {
@@ -41,7 +41,7 @@ pub(super) struct RrdStoreResult {
 /// Task which stores received metrics in the RRD. Metric data is fed into
 /// this task via a MPSC channel.
 pub(super) async fn store_in_rrd_task(
-    cache: Arc<Cache>,
+    cache: Arc<RrdCache>,
     mut receiver: Receiver<RrdStoreRequest>,
 ) -> Result<(), Error> {
     while let Some(msg) = receiver.recv().await {
@@ -95,7 +95,7 @@ pub(super) async fn store_in_rrd_task(
     Ok(())
 }
 
-fn store_metric_pve(cache: &Cache, remote_name: &str, data_point: &ClusterMetricsData) {
+fn store_metric_pve(cache: &RrdCache, remote_name: &str, data_point: &ClusterMetricsData) {
     let name = format!(
         "pve/{remote_name}/{id}/{metric}",
         id = data_point.id,
@@ -108,8 +108,7 @@ fn store_metric_pve(cache: &Cache, remote_name: &str, data_point: &ClusterMetric
         ClusterMetricsDataType::Derive => DataSourceType::Derive,
     };
 
-    rrd_cache::update_value(
-        cache,
+    cache.update_value(
         &name,
         data_point.value,
         data_point.timestamp,
@@ -117,7 +116,7 @@ fn store_metric_pve(cache: &Cache, remote_name: &str, data_point: &ClusterMetric
     );
 }
 
-fn store_metric_pbs(cache: &Cache, remote_name: &str, data_point: &MetricDataPoint) {
+fn store_metric_pbs(cache: &RrdCache, remote_name: &str, data_point: &MetricDataPoint) {
     let name = format!(
         "pbs/{remote_name}/{id}/{metric}",
         id = data_point.id,
@@ -130,8 +129,7 @@ fn store_metric_pbs(cache: &Cache, remote_name: &str, data_point: &MetricDataPoi
         MetricDataType::Derive => DataSourceType::Derive,
     };
 
-    rrd_cache::update_value(
-        cache,
+    cache.update_value(
         &name,
         data_point.value,
         data_point.timestamp,
@@ -156,7 +154,7 @@ mod tests {
         // Arrange
         let dir = NamedTempDir::new()?;
         let options = get_create_options().perm(nix::sys::stat::Mode::from_bits_truncate(0o700));
-        let cache = rrd_cache::init(&dir.path(), options.clone(), options.clone())?;
+        let cache = Arc::new(RrdCache::new(dir.path(), options.clone(), options.clone())?);
 
         let (tx, rx) = tokio::sync::mpsc::channel(10);
         let task = store_in_rrd_task(Arc::clone(&cache), rx);
@@ -215,8 +213,7 @@ mod tests {
 
         // There is some race condition in proxmox_rrd, in some rare cases
         // extract_data does not return any data directly after writing.
-        if let Some(data) = rrd_cache::extract_data(
-            &cache,
+        if let Some(data) = cache.extract_data(
             "pve/some-remote/node/some-node",
             "cpu_current",
             RrdTimeframe::Hour,
diff --git a/server/src/metric_collection/top_entities.rs b/server/src/metric_collection/top_entities.rs
index f54ee72f..31e36c34 100644
--- a/server/src/metric_collection/top_entities.rs
+++ b/server/src/metric_collection/top_entities.rs
@@ -123,8 +123,7 @@ fn get_entity(
 ) -> Option<(f64, TopEntity)> {
     let cache = rrd_cache::get_cache();
 
-    if let Ok(Some(values)) = rrd_cache::extract_data(
-        &cache,
+    if let Ok(Some(values)) = cache.extract_data(
         &name,
         metric,
         timeframe,
-- 
2.39.5





More information about the pdm-devel mailing list