[pdm-devel] [PATCH proxmox-datacenter-manager v2 08/28] metric collection: persist state after metric collection

Lukas Wagner l.wagner at proxmox.com
Fri Feb 14 14:06:33 CET 2025


The metric collection has to maintain some state; at the moment, this
is only the most recent timestamp from all received metric data points.
We use this as a cut-off time when requesting metrics from the remote, as
everything *older* than that should already be stored in the database.

Up until now, the state was only stored in memory, which means that it
was lost when the daemon restarted.

With the changes in this commit, the metric collection system
loads/saves its state from/to a file at
/var/lib/proxmox-datacenter-manager/metric-collection-state.json

The following data points are stored for every remote:
- most-recent-datapoint:
  Timestamp of the most recent datapoint, used as a cut-off when
  fetching new metrics
- last-collection:
  Timestamp of the last *successful* metric collection for a remote,
  based on PDM's local clock.
- error:
  String representation of any error that occurred in the last collection
  attempt

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---

Notes:
    Changes since v1:
      - use .inspect_err(..).unwrap_or_default() instead of a manual match
        in `MetricCollectionState::new`

 .../src/metric_collection/collection_task.rs  | 135 ++++++++++++------
 server/src/metric_collection/mod.rs           |   1 +
 server/src/metric_collection/rrd_task.rs      |  60 ++++++--
 server/src/metric_collection/state.rs         | 126 ++++++++++++++++
 4 files changed, 269 insertions(+), 53 deletions(-)
 create mode 100644 server/src/metric_collection/state.rs

diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs
index b55c8e92..f985bd3e 100644
--- a/server/src/metric_collection/collection_task.rs
+++ b/server/src/metric_collection/collection_task.rs
@@ -1,16 +1,17 @@
-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{sync::Arc, time::Duration};
 
 use anyhow::Error;
 use rand::Rng;
 use tokio::{
     sync::{
         mpsc::{Receiver, Sender},
-        OwnedSemaphorePermit, Semaphore,
+        oneshot, OwnedSemaphorePermit, Semaphore,
     },
     time::Interval,
 };
 
 use proxmox_section_config::typed::SectionConfigData;
+use proxmox_sys::fs::CreateOptions;
 
 use pdm_api_types::{
     remotes::{Remote, RemoteType},
@@ -20,7 +21,16 @@ use pdm_config::metric_collection::COLLECTION_SETTINGS_TYPE;
 
 use crate::{connection, task_utils};
 
-use super::rrd_task::RrdStoreRequest;
+use super::{
+    rrd_task::{RrdStoreRequest, RrdStoreResult},
+    state::{MetricCollectionState, RemoteStatus},
+};
+
+/// Path of the JSON file the metric collection state is persisted to (in PDM's state dir).
+const METRIC_COLLECTION_STATE_FILE: &str = concat!(
+    pdm_buildcfg::PDM_STATE_DIR_M!(),
+    "/metric-collection-state.json"
+);
 
 /// Control messages for the metric collection task.
 pub(super) enum ControlMsg {
@@ -31,7 +41,7 @@ pub(super) enum ControlMsg {
 /// Task which periodically collects metrics from all remotes and stores
 /// them in the local metrics database.
 pub(super) struct MetricCollectionTask {
-    most_recent_timestamps: HashMap<String, i64>,
+    state: MetricCollectionState,
     settings: CollectionSettings,
     metric_data_tx: Sender<RrdStoreRequest>,
     control_message_rx: Receiver<ControlMsg>,
@@ -44,9 +54,10 @@ impl MetricCollectionTask {
         control_message_rx: Receiver<ControlMsg>,
     ) -> Result<Self, Error> {
         let settings = Self::get_settings_or_default();
+        let state = load_state()?;
 
         Ok(Self {
-            most_recent_timestamps: HashMap::new(),
+            state,
             settings,
             metric_data_tx,
             control_message_rx,
@@ -115,6 +126,10 @@ impl MetricCollectionTask {
                 );
                 timer = Self::setup_timer(interval);
             }
+
+            if let Err(err) = self.state.save() {
+                log::error!("could not update metric collection state: {err}");
+            }
         }
     }
 
@@ -206,7 +221,11 @@ impl MetricCollectionTask {
         let mut handles = Vec::new();
 
         for remote_name in remotes_to_fetch {
-            let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0);
+            let status = self
+                .state
+                .get_status(remote_name)
+                .cloned()
+                .unwrap_or_default();
 
             // unwrap is okay here, acquire_* will only fail if `close` has been
             // called on the semaphore.
@@ -217,7 +236,7 @@ impl MetricCollectionTask {
                 let handle = tokio::spawn(Self::fetch_single_remote(
                     self.settings.clone(),
                     remote,
-                    start_time,
+                    status,
                     self.metric_data_tx.clone(),
                     permit,
                 ));
@@ -231,8 +250,7 @@ impl MetricCollectionTask {
 
             match res {
                 Ok(Ok(ts)) => {
-                    self.most_recent_timestamps
-                        .insert(remote_name.to_string(), ts);
+                    self.state.set_status(remote_name, ts);
                 }
                 Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
                 Err(err) => {
@@ -249,52 +267,85 @@ impl MetricCollectionTask {
     async fn fetch_single_remote(
         settings: CollectionSettings,
         remote: Remote,
-        start_time: i64,
+        mut status: RemoteStatus,
         sender: Sender<RrdStoreRequest>,
         _permit: OwnedSemaphorePermit,
-    ) -> Result<i64, Error> {
+    ) -> Result<RemoteStatus, Error> {
         Self::sleep_for_random_millis(
             settings.min_connection_delay_or_default(),
             settings.max_connection_delay_or_default(),
             "connection-delay",
         )
         .await;
+        let (result_tx, result_rx) = oneshot::channel();
+
+        let now = proxmox_time::epoch_i64();
+        // Fetch from the remote, hand the metrics to the RRD task and await its result.
+        let res: Result<RrdStoreResult, Error> = async {
+            match remote.ty {
+                RemoteType::Pve => {
+                    let client = connection::make_pve_client(&remote)?;
+                    let metrics = client
+                        .cluster_metrics_export(
+                            Some(true),
+                            Some(false),
+                            Some(status.most_recent_datapoint),
+                        )
+                        .await?;
+
+                    sender
+                        .send(RrdStoreRequest::Pve {
+                            remote: remote.id.clone(),
+                            metrics,
+                            channel: result_tx,
+                        })
+                        .await?;
+                }
+                RemoteType::Pbs => {
+                    let client = connection::make_pbs_client(&remote)?;
+                    let metrics = client
+                        .metrics(Some(true), Some(status.most_recent_datapoint))
+                        .await?;
+
+                    sender
+                        .send(RrdStoreRequest::Pbs {
+                            remote: remote.id.clone(),
+                            metrics,
+                            channel: result_tx,
+                        })
+                        .await?;
+                }
+            }
 
-        let most_recent_timestamp = match remote.ty {
-            RemoteType::Pve => {
-                let client = connection::make_pve_client(&remote)?;
-                let metrics = client
-                    .cluster_metrics_export(Some(true), Some(false), Some(start_time))
-                    .await?;
-
-                let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
-
-                sender
-                    .send(RrdStoreRequest::Pve {
-                        remote: remote.id.clone(),
-                        metrics,
-                    })
-                    .await?;
+            result_rx.await.map_err(Error::from)
+        }
+        .await;
 
-                most_recent
+        match res {
+            Ok(result) => {
+                status.most_recent_datapoint = result.most_recent_timestamp;
+                status.last_collection = Some(now);
+                status.error = None;
+            }
+            Err(err) => {
+                status.error = Some(err.to_string());
+                log::error!("could not fetch metrics from '{}': {err}", remote.id);
             }
-            RemoteType::Pbs => {
-                let client = connection::make_pbs_client(&remote)?;
-                let metrics = client.metrics(Some(true), Some(start_time)).await?;
+        }
 
-                let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
+        Ok(status)
+    }
+}
 
-                sender
-                    .send(RrdStoreRequest::Pbs {
-                        remote: remote.id.clone(),
-                        metrics,
-                    })
-                    .await?;
+/// Load the metric collection state, using the API user/group as the state file's owner.
+pub(super) fn load_state() -> Result<MetricCollectionState, Error> {
+    let api_uid = pdm_config::api_user()?.uid;
+    let api_gid = pdm_config::api_group()?.gid;
 
-                most_recent
-            }
-        };
+    let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
 
-        Ok(most_recent_timestamp)
-    }
+    Ok(MetricCollectionState::new(
+        METRIC_COLLECTION_STATE_FILE.into(),
+        file_options,
+    ))
 }
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 9b203615..9cd60455 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -7,6 +7,7 @@ use tokio::sync::mpsc::{self, Sender};
 mod collection_task;
 pub mod rrd_cache;
 mod rrd_task;
+mod state;
 pub mod top_entities;
 
 use collection_task::{ControlMsg, MetricCollectionTask};
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
index a72945df..1c618f54 100644
--- a/server/src/metric_collection/rrd_task.rs
+++ b/server/src/metric_collection/rrd_task.rs
@@ -1,8 +1,10 @@
 use anyhow::Error;
-use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
+use tokio::sync::{mpsc::Receiver, oneshot};
+
 use proxmox_rrd::rrd::DataSourceType;
+
+use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
 use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
-use tokio::sync::mpsc::Receiver;
 
 use super::rrd_cache;
 
@@ -14,6 +16,8 @@ pub(super) enum RrdStoreRequest {
         remote: String,
         /// Metric data.
         metrics: ClusterMetrics,
+        /// Oneshot channel to return the [`RrdStoreResult`].
+        channel: oneshot::Sender<RrdStoreResult>,
     },
     /// Store PBS metrics.
     Pbs {
@@ -21,9 +25,17 @@ pub(super) enum RrdStoreRequest {
         remote: String,
         /// Metric data.
         metrics: Metrics,
+        /// Oneshot channel to return the [`RrdStoreResult`].
+        channel: oneshot::Sender<RrdStoreResult>,
     },
 }
 
+/// Result of processing a [`RrdStoreRequest`], sent back via the request's oneshot channel.
+pub(super) struct RrdStoreResult {
+    /// Most recent timestamp of any stored metric datapoint (UNIX epoch), or 0 if none.
+    pub(super) most_recent_timestamp: i64,
+}
+
 /// Task which stores received metrics in the RRD. Metric data is fed into
 /// this task via a MPSC channel.
 pub(super) async fn store_in_rrd_task(
@@ -31,17 +43,43 @@ pub(super) async fn store_in_rrd_task(
 ) -> Result<(), Error> {
     while let Some(msg) = receiver.recv().await {
         // Involves some blocking file IO
-        let res = tokio::task::spawn_blocking(move || match msg {
-            RrdStoreRequest::Pve { remote, metrics } => {
-                for data_point in metrics.data {
-                    store_metric_pve(&remote, &data_point);
+        let res = tokio::task::spawn_blocking(move || {
+            let mut most_recent_timestamp = 0;
+            let channel = match msg {
+                RrdStoreRequest::Pve {
+                    remote,
+                    metrics,
+                    channel,
+                } => {
+                    for data_point in metrics.data {
+                        most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp);
+                        store_metric_pve(&remote, &data_point);
+                    }
+
+                    channel
                 }
-            }
-            RrdStoreRequest::Pbs { remote, metrics } => {
-                for data_point in metrics.data {
-                    store_metric_pbs(&remote, &data_point);
+                RrdStoreRequest::Pbs {
+                    remote,
+                    metrics,
+                    channel,
+                } => {
+                    for data_point in metrics.data {
+                        most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp);
+                        store_metric_pbs(&remote, &data_point);
+                    }
+
+                    channel
                 }
-            }
+            };
+            // Report the newest stored timestamp back to the collection task.
+            if channel
+                .send(RrdStoreResult {
+                    most_recent_timestamp,
+                })
+                .is_err()
+            {
+                log::error!("could not send RrdStoreResult to metric collection task");
+            }
         })
         .await;
 
diff --git a/server/src/metric_collection/state.rs b/server/src/metric_collection/state.rs
new file mode 100644
index 00000000..5b04ea61
--- /dev/null
+++ b/server/src/metric_collection/state.rs
@@ -0,0 +1,126 @@
+use std::{
+    collections::HashMap,
+    path::{Path, PathBuf},
+};
+
+use anyhow::Error;
+use serde::{Deserialize, Serialize};
+
+use proxmox_sys::fs::CreateOptions;
+
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// Content of the metric collection state file: per-remote status, keyed by remote name.
+struct State {
+    remote_status: HashMap<String, RemoteStatus>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// A remote's metric collection state.
+pub struct RemoteStatus {
+    /// Timestamp of the most recent datapoint received, based on the remote's clock.
+    pub most_recent_datapoint: i64,
+    /// Timestamp of the last successful metric collection, based on PDM's clock.
+    pub last_collection: Option<i64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    /// Any error that occurred during the last metric collection attempt.
+    pub error: Option<String>,
+}
+
+/// In-memory metric collection state, persisted to a JSON state file.
+pub struct MetricCollectionState {
+    /// Path of the state file.
+    path: PathBuf,
+    /// Owner, group and mode used when (re)writing the state file.
+    file_options: CreateOptions,
+    /// The current in-memory state; only written to disk by `save`.
+    state: State,
+}
+
+impl MetricCollectionState {
+    /// Initialize state by trying to load the existing statefile. If the file does not exist,
+    /// the state will be empty. If the file fails to load or parse, the state will be empty
+    /// and an error will be logged.
+    pub fn new(statefile: PathBuf, file_options: CreateOptions) -> Self {
+        let state = Self::load_or_default(&statefile)
+            .inspect_err(|err| {
+                log::error!("could not load metric collection state: {err}");
+            })
+            .unwrap_or_default();
+
+        Self {
+            path: statefile,
+            file_options,
+            state,
+        }
+    }
+
+    /// Set a remote's status, replacing any previously stored status.
+    pub fn set_status(&mut self, remote: String, remote_state: RemoteStatus) {
+        self.state.remote_status.insert(remote, remote_state);
+    }
+
+    /// Get a remote's status, or `None` if no status has been recorded for it.
+    pub fn get_status(&self, remote: &str) -> Option<&RemoteStatus> {
+        self.state.remote_status.get(remote)
+    }
+
+    /// Persist the state to the statefile as pretty-printed JSON, replacing the old file.
+    pub fn save(&self) -> Result<(), Error> {
+        let data = serde_json::to_vec_pretty(&self.state)?;
+        proxmox_sys::fs::replace_file(&self.path, &data, self.file_options.clone(), true)?;
+
+        Ok(())
+    }
+
+    fn load_or_default(path: &Path) -> Result<State, Error> {
+        let content = proxmox_sys::fs::file_read_optional_string(path)?;
+
+        if let Some(content) = content {
+            Ok(serde_json::from_str(&content)?)
+        } else {
+            Ok(Default::default())
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use proxmox_sys::fs::CreateOptions;
+
+    use super::*;
+
+    use crate::test_support::temp::NamedTempFile;
+
+    fn get_options() -> CreateOptions {
+        CreateOptions::new()
+            .owner(nix::unistd::Uid::effective())
+            .group(nix::unistd::Gid::effective())
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o600))
+    }
+    /// A status that was saved must be returned again after re-loading the state file.
+    #[test]
+    fn save_and_load() -> Result<(), Error> {
+        let file = NamedTempFile::new(get_options())?;
+        let options = get_options();
+        let mut state = MetricCollectionState::new(file.path().into(), options.clone());
+
+        state.set_status(
+            "some-remote".into(),
+            RemoteStatus {
+                most_recent_datapoint: 1234,
+                ..Default::default()
+            },
+        );
+
+        state.save()?;
+        // Re-load from disk with a fresh instance to verify the round trip.
+        let state = MetricCollectionState::new(file.path().into(), options);
+
+        let status = state.get_status("some-remote").unwrap();
+        assert_eq!(status.most_recent_datapoint, 1234);
+
+        Ok(())
+    }
+}
-- 
2.39.5





More information about the pdm-devel mailing list