[pbs-devel] [PATCH proxmox-backup v2 12/13] metric collection: put metrics in a cache
Lukas Wagner
l.wagner at proxmox.com
Tue Oct 15 10:46:35 CEST 2024
Any pull-metric API endpoint can later access the cache to
retrieve metric data for a limited time (30 minutes).
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
src/server/metric_collection/mod.rs | 13 ++-
src/server/metric_collection/pull_metrics.rs | 107 ++++++++++++++++++-
2 files changed, 118 insertions(+), 2 deletions(-)
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index 3be73c22..e6e04c5b 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -72,16 +72,27 @@ async fn run_stat_generator() {
rrd::sync_journal();
}
});
+ let pull_metric_future = tokio::task::spawn_blocking({
+ let stats = Arc::clone(&stats);
+ move || {
+ pull_metrics::update_metrics(&stats.0, &stats.1, &stats.2)?;
+ Ok::<(), Error>(())
+ }
+ });
let metrics_future = metric_server::send_data_to_metric_servers(stats);
- let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
+ let (rrd_res, metrics_res, pull_metrics_res) =
+ join!(rrd_future, metrics_future, pull_metric_future);
if let Err(err) = rrd_res {
log::error!("rrd update panicked: {err}");
}
if let Err(err) = metrics_res {
log::error!("error during metrics sending: {err}");
}
+ if let Err(err) = pull_metrics_res {
+ log::error!("error caching pull-style metrics: {err}");
+ }
tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
}
diff --git a/src/server/metric_collection/pull_metrics.rs b/src/server/metric_collection/pull_metrics.rs
index 707cb27c..f4b506cf 100644
--- a/src/server/metric_collection/pull_metrics.rs
+++ b/src/server/metric_collection/pull_metrics.rs
@@ -3,11 +3,16 @@ use std::{path::Path, sync::OnceLock, time::Duration};
use anyhow::{format_err, Error};
use nix::sys::stat::Mode;
+use pbs_api_types::{
+ MetricDataPoint,
+ MetricDataType::{self, Derive, Gauge},
+};
use pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR;
use proxmox_shared_cache::SharedCache;
use proxmox_sys::fs::CreateOptions;
+use serde::{Deserialize, Serialize};
-use super::METRIC_COLLECTION_INTERVAL;
+use super::{DiskStat, HostStats, METRIC_COLLECTION_INTERVAL};
const METRIC_CACHE_TIME: Duration = Duration::from_secs(30 * 60);
const STORED_METRIC_GENERATIONS: u64 =
@@ -33,3 +38,103 @@ pub(super) fn init() -> Result<(), Error> {
Ok(())
}
+
+/// Convert `HostStats` and `DiskStat` data into universal metric data points
+/// and cache them for later retrieval by pull-style metric API endpoints.
+pub(super) fn update_metrics(
+ host: &HostStats,
+ hostdisk: &DiskStat,
+ datastores: &[DiskStat],
+) -> Result<(), Error> {
+ let mut points = MetricDataPoints::new(proxmox_time::epoch_i64());
+
+ // Using the same metric names as in PVE's new /cluster/metrics/export endpoint
+ if let Some(stat) = &host.proc {
+ points.add(Gauge, "host", "cpu_current", stat.cpu);
+ points.add(Gauge, "host", "cpu_iowait", stat.iowait_percent);
+ }
+
+ if let Some(loadavg) = &host.load {
+ points.add(Gauge, "host", "cpu_avg1", loadavg.0);
+ points.add(Gauge, "host", "cpu_avg5", loadavg.1);
+ points.add(Gauge, "host", "cpu_avg15", loadavg.2);
+ }
+
+ if let Some(meminfo) = &host.meminfo {
+ points.add(Gauge, "host", "mem_total", meminfo.memtotal as f64);
+ points.add(Gauge, "host", "mem_used", meminfo.memused as f64);
+ points.add(Gauge, "host", "swap_total", meminfo.swaptotal as f64);
+ points.add(Gauge, "host", "swap_used", meminfo.swapused as f64);
+ }
+
+ if let Some(netdev) = &host.net {
+ use pbs_config::network::is_physical_nic;
+ let mut netin = 0;
+ let mut netout = 0;
+ for item in netdev {
+ if !is_physical_nic(&item.device) {
+ continue;
+ }
+ netin += item.receive;
+ netout += item.send;
+ }
+ points.add(Derive, "host", "net_in", netin as f64);
+ points.add(Derive, "host", "net_out", netout as f64);
+ }
+
+ update_disk_metrics(&mut points, hostdisk, "host");
+
+ for stat in datastores {
+ let id = format!("datastore/{}", stat.name);
+ update_disk_metrics(&mut points, stat, &id);
+ }
+
+ get_cache()?.set(&points, Duration::from_secs(2))?;
+
+ Ok(())
+}
+
+fn get_cache() -> Result<&'static SharedCache, Error> {
+ // Not using OnceLock::get_or_init here since initialization is fallible; init() must have run first.
+ METRIC_CACHE
+ .get()
+ .ok_or_else(|| format_err!("metric cache not initialized"))
+}
+
+/// Add disk usage (gauge) and read/write throughput (derive, sectors converted to bytes) data points for `disk` under metric id `id`.
+fn update_disk_metrics(points: &mut MetricDataPoints, disk: &DiskStat, id: &str) {
+ if let Some(status) = &disk.usage {
+ points.add(Gauge, id, "disk_total", status.total as f64);
+ points.add(Gauge, id, "disk_used", status.used as f64);
+ points.add(Gauge, id, "disk_available", status.available as f64);
+ }
+
+ if let Some(stat) = &disk.dev {
+ points.add(Derive, id, "disk_read", (stat.read_sectors * 512) as f64);
+ points.add(Derive, id, "disk_write", (stat.write_sectors * 512) as f64);
+ }
+}
+
+/// Set of metric data points sharing one collection timestamp; serialized into the shared cache.
+#[derive(Serialize, Deserialize)]
+struct MetricDataPoints {
+ timestamp: i64,
+ datapoints: Vec<MetricDataPoint>,
+}
+
+impl MetricDataPoints {
+ /// Create an empty collection stamped with `timestamp` (epoch seconds).
+ fn new(timestamp: i64) -> Self {
+ Self {
+ datapoints: Vec::new(),
+ timestamp,
+ }
+ }
+
+ /// Append a data point, tagging it with this collection's shared timestamp.
+ fn add(&mut self, ty: MetricDataType, id: &str, metric: &str, value: f64) {
+ self.datapoints.push(MetricDataPoint {
+ id: id.into(),
+ metric: metric.into(),
+ timestamp: self.timestamp,
+ ty,
+ value,
+ })
+ }
+}
--
2.39.5
More information about the pbs-devel
mailing list