[pbs-devel] [PATCH proxmox-backup 01/13] proxy: server: move rrd stat/metric server to separate module

Lukas Wagner l.wagner at proxmox.com
Fri Oct 11 12:51:25 CEST 2024


With the upcoming pull-metric system/metric caching, these
things should go into a sepearate module.

No functional changes intended.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 src/bin/proxmox-backup-proxy.rs     | 419 +--------------------------
 src/server/metric_collection/mod.rs | 426 ++++++++++++++++++++++++++++
 src/server/mod.rs                   |   2 +
 3 files changed, 433 insertions(+), 414 deletions(-)
 create mode 100644 src/server/metric_collection/mod.rs

diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 041f3aff..859f5b0f 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -17,10 +17,8 @@ use serde_json::{json, Value};
 
 use proxmox_lang::try_block;
 use proxmox_log::init_logger;
-use proxmox_metrics::MetricsData;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
-use proxmox_sys::fs::{CreateOptions, FileSystemInformation};
-use proxmox_sys::linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat};
+use proxmox_sys::fs::CreateOptions;
 use proxmox_sys::logrotate::LogRotate;
 
 use pbs_datastore::DataStore;
@@ -30,15 +28,11 @@ use proxmox_rest_server::{
     RestEnvironment, RestServer, WorkerTask,
 };
 
-use proxmox_backup::rrd_cache::{
-    initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge,
-};
 use proxmox_backup::{
     server::{
         auth::check_pbs_auth,
         jobstate::{self, Job},
     },
-    tools::disks::BlockDevStat,
     traffic_control_cache::{SharedRateLimit, TRAFFIC_CONTROL_CACHE},
 };
 
@@ -51,11 +45,8 @@ use pbs_api_types::{
 };
 
 use proxmox_backup::auth_helpers::*;
-use proxmox_backup::server;
-use proxmox_backup::tools::{
-    disks::{zfs_dataset_stats, DiskManage},
-    PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
-};
+use proxmox_backup::server::{self, metric_collection};
+use proxmox_backup::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
 
 use proxmox_backup::api2::pull::do_sync_job;
 use proxmox_backup::api2::tape::backup::do_tape_backup_job;
@@ -186,9 +177,7 @@ async fn run() -> Result<(), Error> {
 
     proxmox_backup::auth_helpers::setup_auth_context(false);
     proxmox_backup::server::notifications::init()?;
-
-    let rrd_cache = initialize_rrd_cache()?;
-    rrd_cache.apply_journal()?;
+    metric_collection::init()?;
 
     let mut indexpath = PathBuf::from(pbs_buildcfg::JS_DIR);
     indexpath.push("index.hbs");
@@ -356,7 +345,7 @@ async fn run() -> Result<(), Error> {
     });
 
     start_task_scheduler();
-    start_stat_generator();
+    metric_collection::start_collection_task();
     start_traffic_control_updater();
 
     server.await?;
@@ -389,14 +378,6 @@ fn make_tls_acceptor() -> Result<SslAcceptor, Error> {
     acceptor.build()
 }
 
-fn start_stat_generator() {
-    tokio::spawn(async {
-        let abort_future = pin!(proxmox_daemon::shutdown_future());
-        let future = pin!(run_stat_generator());
-        futures::future::select(future, abort_future).await;
-    });
-}
-
 fn start_task_scheduler() {
     tokio::spawn(async {
         let abort_future = pin!(proxmox_daemon::shutdown_future());
@@ -867,349 +848,6 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
     }
 }
 
-async fn run_stat_generator() {
-    loop {
-        let delay_target = Instant::now() + Duration::from_secs(10);
-
-        let stats_future = tokio::task::spawn_blocking(|| {
-            let hoststats = collect_host_stats_sync();
-            let (hostdisk, datastores) = collect_disk_stats_sync();
-            Arc::new((hoststats, hostdisk, datastores))
-        });
-        let stats = match stats_future.await {
-            Ok(res) => res,
-            Err(err) => {
-                log::error!("collecting host stats panicked: {err}");
-                tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-                continue;
-            }
-        };
-
-        let rrd_future = tokio::task::spawn_blocking({
-            let stats = Arc::clone(&stats);
-            move || {
-                rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
-                rrd_sync_journal();
-            }
-        });
-
-        let metrics_future = send_data_to_metric_servers(stats);
-
-        let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
-        if let Err(err) = rrd_res {
-            log::error!("rrd update panicked: {err}");
-        }
-        if let Err(err) = metrics_res {
-            log::error!("error during metrics sending: {err}");
-        }
-
-        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-    }
-}
-
-async fn send_data_to_metric_servers(
-    stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
-) -> Result<(), Error> {
-    let (config, _digest) = pbs_config::metrics::config()?;
-    let channel_list = get_metric_server_connections(config)?;
-
-    if channel_list.is_empty() {
-        return Ok(());
-    }
-
-    let ctime = proxmox_time::epoch_i64();
-    let nodename = proxmox_sys::nodename();
-
-    let mut values = Vec::new();
-
-    let mut cpuvalue = match &stats.0.proc {
-        Some(stat) => serde_json::to_value(stat)?,
-        None => json!({}),
-    };
-
-    if let Some(loadavg) = &stats.0.load {
-        cpuvalue["avg1"] = Value::from(loadavg.0);
-        cpuvalue["avg5"] = Value::from(loadavg.1);
-        cpuvalue["avg15"] = Value::from(loadavg.2);
-    }
-
-    values.push(Arc::new(
-        MetricsData::new("cpustat", ctime, cpuvalue)?
-            .tag("object", "host")
-            .tag("host", nodename),
-    ));
-
-    if let Some(stat) = &stats.0.meminfo {
-        values.push(Arc::new(
-            MetricsData::new("memory", ctime, stat)?
-                .tag("object", "host")
-                .tag("host", nodename),
-        ));
-    }
-
-    if let Some(netdev) = &stats.0.net {
-        for item in netdev {
-            values.push(Arc::new(
-                MetricsData::new("nics", ctime, item)?
-                    .tag("object", "host")
-                    .tag("host", nodename)
-                    .tag("instance", item.device.clone()),
-            ));
-        }
-    }
-
-    values.push(Arc::new(
-        MetricsData::new("blockstat", ctime, stats.1.to_value())?
-            .tag("object", "host")
-            .tag("host", nodename),
-    ));
-
-    for datastore in stats.2.iter() {
-        values.push(Arc::new(
-            MetricsData::new("blockstat", ctime, datastore.to_value())?
-                .tag("object", "host")
-                .tag("host", nodename)
-                .tag("datastore", datastore.name.clone()),
-        ));
-    }
-
-    // we must have a concrete functions, because the inferred lifetime from a
-    // closure is not general enough for the tokio::spawn call we are in here...
-    fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
-        &item.0
-    }
-
-    let results =
-        proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
-    for (res, name) in results
-        .into_iter()
-        .zip(channel_list.iter().map(|(_, name)| name))
-    {
-        if let Err(err) = res {
-            log::error!("error sending into channel of {name}: {err}");
-        }
-    }
-
-    futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
-        if let Err(err) = channel.join().await {
-            log::error!("error sending to metric server {name}: {err}");
-        }
-    }))
-    .await;
-
-    Ok(())
-}
-
-/// Get the metric server connections from a config
-pub fn get_metric_server_connections(
-    metric_config: proxmox_section_config::SectionConfigData,
-) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
-    let mut res = Vec::new();
-
-    for config in
-        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
-    {
-        if !config.enable {
-            continue;
-        }
-        let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
-        res.push((future, config.name));
-    }
-
-    for config in
-        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
-    {
-        if !config.enable {
-            continue;
-        }
-        let future = proxmox_metrics::influxdb_http(
-            &config.url,
-            config.organization.as_deref().unwrap_or("proxmox"),
-            config.bucket.as_deref().unwrap_or("proxmox"),
-            config.token.as_deref(),
-            config.verify_tls.unwrap_or(true),
-            config.max_body_size.unwrap_or(25_000_000),
-        )?;
-        res.push((future, config.name));
-    }
-    Ok(res)
-}
-
-struct HostStats {
-    proc: Option<ProcFsStat>,
-    meminfo: Option<ProcFsMemInfo>,
-    net: Option<Vec<ProcFsNetDev>>,
-    load: Option<Loadavg>,
-}
-
-struct DiskStat {
-    name: String,
-    usage: Option<FileSystemInformation>,
-    dev: Option<BlockDevStat>,
-}
-
-impl DiskStat {
-    fn to_value(&self) -> Value {
-        let mut value = json!({});
-        if let Some(usage) = &self.usage {
-            value["total"] = Value::from(usage.total);
-            value["used"] = Value::from(usage.used);
-            value["avail"] = Value::from(usage.available);
-        }
-
-        if let Some(dev) = &self.dev {
-            value["read_ios"] = Value::from(dev.read_ios);
-            value["read_bytes"] = Value::from(dev.read_sectors * 512);
-            value["write_ios"] = Value::from(dev.write_ios);
-            value["write_bytes"] = Value::from(dev.write_sectors * 512);
-            value["io_ticks"] = Value::from(dev.io_ticks / 1000);
-        }
-        value
-    }
-}
-
-fn collect_host_stats_sync() -> HostStats {
-    use proxmox_sys::linux::procfs::{
-        read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
-    };
-
-    let proc = match read_proc_stat() {
-        Ok(stat) => Some(stat),
-        Err(err) => {
-            eprintln!("read_proc_stat failed - {err}");
-            None
-        }
-    };
-
-    let meminfo = match read_meminfo() {
-        Ok(stat) => Some(stat),
-        Err(err) => {
-            eprintln!("read_meminfo failed - {err}");
-            None
-        }
-    };
-
-    let net = match read_proc_net_dev() {
-        Ok(netdev) => Some(netdev),
-        Err(err) => {
-            eprintln!("read_prox_net_dev failed - {err}");
-            None
-        }
-    };
-
-    let load = match read_loadavg() {
-        Ok(loadavg) => Some(loadavg),
-        Err(err) => {
-            eprintln!("read_loadavg failed - {err}");
-            None
-        }
-    };
-
-    HostStats {
-        proc,
-        meminfo,
-        net,
-        load,
-    }
-}
-
-fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
-    let disk_manager = DiskManage::new();
-
-    let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
-
-    let mut datastores = Vec::new();
-    match pbs_config::datastore::config() {
-        Ok((config, _)) => {
-            let datastore_list: Vec<DataStoreConfig> = config
-                .convert_to_typed_array("datastore")
-                .unwrap_or_default();
-
-            for config in datastore_list {
-                if config
-                    .get_maintenance_mode()
-                    .map_or(false, |mode| mode.check(Some(Operation::Read)).is_err())
-                {
-                    continue;
-                }
-                let path = std::path::Path::new(&config.path);
-                datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
-            }
-        }
-        Err(err) => {
-            eprintln!("read datastore config failed - {err}");
-        }
-    }
-
-    (root, datastores)
-}
-
-fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
-    if let Some(stat) = &host.proc {
-        rrd_update_gauge("host/cpu", stat.cpu);
-        rrd_update_gauge("host/iowait", stat.iowait_percent);
-    }
-
-    if let Some(meminfo) = &host.meminfo {
-        rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
-        rrd_update_gauge("host/memused", meminfo.memused as f64);
-        rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
-        rrd_update_gauge("host/swapused", meminfo.swapused as f64);
-    }
-
-    if let Some(netdev) = &host.net {
-        use pbs_config::network::is_physical_nic;
-        let mut netin = 0;
-        let mut netout = 0;
-        for item in netdev {
-            if !is_physical_nic(&item.device) {
-                continue;
-            }
-            netin += item.receive;
-            netout += item.send;
-        }
-        rrd_update_derive("host/netin", netin as f64);
-        rrd_update_derive("host/netout", netout as f64);
-    }
-
-    if let Some(loadavg) = &host.load {
-        rrd_update_gauge("host/loadavg", loadavg.0);
-    }
-
-    rrd_update_disk_stat(hostdisk, "host");
-
-    for stat in datastores {
-        let rrd_prefix = format!("datastore/{}", stat.name);
-        rrd_update_disk_stat(stat, &rrd_prefix);
-    }
-}
-
-fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
-    if let Some(status) = &disk.usage {
-        let rrd_key = format!("{}/total", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.total as f64);
-        let rrd_key = format!("{}/used", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.used as f64);
-        let rrd_key = format!("{}/available", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.available as f64);
-    }
-
-    if let Some(stat) = &disk.dev {
-        let rrd_key = format!("{}/read_ios", rrd_prefix);
-        rrd_update_derive(&rrd_key, stat.read_ios as f64);
-        let rrd_key = format!("{}/read_bytes", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
-
-        let rrd_key = format!("{}/write_ios", rrd_prefix);
-        rrd_update_derive(&rrd_key, stat.write_ios as f64);
-        let rrd_key = format!("{}/write_bytes", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
-
-        let rrd_key = format!("{}/io_ticks", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
-    }
-}
-
 fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
     let event: CalendarEvent = match event_str.parse() {
         Ok(event) => event,
@@ -1240,53 +878,6 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
     next <= now
 }
 
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
-    let usage = match proxmox_sys::fs::fs_info(path) {
-        Ok(status) => Some(status),
-        Err(err) => {
-            eprintln!("read fs info on {path:?} failed - {err}");
-            None
-        }
-    };
-
-    let dev = match disk_manager.find_mounted_device(path) {
-        Ok(None) => None,
-        Ok(Some((fs_type, device, source))) => {
-            let mut device_stat = None;
-            match (fs_type.as_str(), source) {
-                ("zfs", Some(source)) => match source.into_string() {
-                    Ok(dataset) => match zfs_dataset_stats(&dataset) {
-                        Ok(stat) => device_stat = Some(stat),
-                        Err(err) => eprintln!("zfs_dataset_stats({dataset:?}) failed - {err}"),
-                    },
-                    Err(source) => {
-                        eprintln!("zfs_pool_stats({source:?}) failed - invalid characters")
-                    }
-                },
-                _ => {
-                    if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
-                        match disk.read_stat() {
-                            Ok(stat) => device_stat = stat,
-                            Err(err) => eprintln!("disk.read_stat {path:?} failed - {err}"),
-                        }
-                    }
-                }
-            }
-            device_stat
-        }
-        Err(err) => {
-            eprintln!("find_mounted_device failed - {err}");
-            None
-        }
-    };
-
-    DiskStat {
-        name: name.to_string(),
-        usage,
-        dev,
-    }
-}
-
 // Rate Limiter lookup
 async fn run_traffic_control_updater() {
     loop {
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
new file mode 100644
index 00000000..e220f51a
--- /dev/null
+++ b/src/server/metric_collection/mod.rs
@@ -0,0 +1,426 @@
+use std::{
+    path::Path,
+    pin::pin,
+    sync::Arc,
+    time::{Duration, Instant},
+};
+
+use anyhow::Error;
+use pbs_api_types::{DataStoreConfig, Operation};
+use serde_json::{json, Value};
+use tokio::join;
+
+use proxmox_metrics::MetricsData;
+use proxmox_sys::{
+    fs::FileSystemInformation,
+    linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
+};
+
+use crate::{
+    rrd_cache::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge},
+    tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage},
+};
+
+pub fn init() -> Result<(), Error> {
+    let rrd_cache = initialize_rrd_cache()?;
+    rrd_cache.apply_journal()?;
+    Ok(())
+}
+
+pub fn start_collection_task() {
+    tokio::spawn(async {
+        let abort_future = pin!(proxmox_daemon::shutdown_future());
+        let future = pin!(run_stat_generator());
+        futures::future::select(future, abort_future).await;
+    });
+}
+
+async fn run_stat_generator() {
+    loop {
+        let delay_target = Instant::now() + Duration::from_secs(10);
+
+        let stats_future = tokio::task::spawn_blocking(|| {
+            let hoststats = collect_host_stats_sync();
+            let (hostdisk, datastores) = collect_disk_stats_sync();
+            Arc::new((hoststats, hostdisk, datastores))
+        });
+        let stats = match stats_future.await {
+            Ok(res) => res,
+            Err(err) => {
+                log::error!("collecting host stats panicked: {err}");
+                tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+                continue;
+            }
+        };
+
+        let rrd_future = tokio::task::spawn_blocking({
+            let stats = Arc::clone(&stats);
+            move || {
+                rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
+                rrd_sync_journal();
+            }
+        });
+
+        let metrics_future = send_data_to_metric_servers(stats);
+
+        let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
+        if let Err(err) = rrd_res {
+            log::error!("rrd update panicked: {err}");
+        }
+        if let Err(err) = metrics_res {
+            log::error!("error during metrics sending: {err}");
+        }
+
+        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+    }
+}
+
+async fn send_data_to_metric_servers(
+    stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
+) -> Result<(), Error> {
+    let (config, _digest) = pbs_config::metrics::config()?;
+    let channel_list = get_metric_server_connections(config)?;
+
+    if channel_list.is_empty() {
+        return Ok(());
+    }
+
+    let ctime = proxmox_time::epoch_i64();
+    let nodename = proxmox_sys::nodename();
+
+    let mut values = Vec::new();
+
+    let mut cpuvalue = match &stats.0.proc {
+        Some(stat) => serde_json::to_value(stat)?,
+        None => json!({}),
+    };
+
+    if let Some(loadavg) = &stats.0.load {
+        cpuvalue["avg1"] = Value::from(loadavg.0);
+        cpuvalue["avg5"] = Value::from(loadavg.1);
+        cpuvalue["avg15"] = Value::from(loadavg.2);
+    }
+
+    values.push(Arc::new(
+        MetricsData::new("cpustat", ctime, cpuvalue)?
+            .tag("object", "host")
+            .tag("host", nodename),
+    ));
+
+    if let Some(stat) = &stats.0.meminfo {
+        values.push(Arc::new(
+            MetricsData::new("memory", ctime, stat)?
+                .tag("object", "host")
+                .tag("host", nodename),
+        ));
+    }
+
+    if let Some(netdev) = &stats.0.net {
+        for item in netdev {
+            values.push(Arc::new(
+                MetricsData::new("nics", ctime, item)?
+                    .tag("object", "host")
+                    .tag("host", nodename)
+                    .tag("instance", item.device.clone()),
+            ));
+        }
+    }
+
+    values.push(Arc::new(
+        MetricsData::new("blockstat", ctime, stats.1.to_value())?
+            .tag("object", "host")
+            .tag("host", nodename),
+    ));
+
+    for datastore in stats.2.iter() {
+        values.push(Arc::new(
+            MetricsData::new("blockstat", ctime, datastore.to_value())?
+                .tag("object", "host")
+                .tag("host", nodename)
+                .tag("datastore", datastore.name.clone()),
+        ));
+    }
+
+    // we must have a concrete functions, because the inferred lifetime from a
+    // closure is not general enough for the tokio::spawn call we are in here...
+    fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
+        &item.0
+    }
+
+    let results =
+        proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
+    for (res, name) in results
+        .into_iter()
+        .zip(channel_list.iter().map(|(_, name)| name))
+    {
+        if let Err(err) = res {
+            log::error!("error sending into channel of {name}: {err}");
+        }
+    }
+
+    futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
+        if let Err(err) = channel.join().await {
+            log::error!("error sending to metric server {name}: {err}");
+        }
+    }))
+    .await;
+
+    Ok(())
+}
+
+/// Get the metric server connections from a config
+fn get_metric_server_connections(
+    metric_config: proxmox_section_config::SectionConfigData,
+) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
+    let mut res = Vec::new();
+
+    for config in
+        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
+    {
+        if !config.enable {
+            continue;
+        }
+        let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
+        res.push((future, config.name));
+    }
+
+    for config in
+        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
+    {
+        if !config.enable {
+            continue;
+        }
+        let future = proxmox_metrics::influxdb_http(
+            &config.url,
+            config.organization.as_deref().unwrap_or("proxmox"),
+            config.bucket.as_deref().unwrap_or("proxmox"),
+            config.token.as_deref(),
+            config.verify_tls.unwrap_or(true),
+            config.max_body_size.unwrap_or(25_000_000),
+        )?;
+        res.push((future, config.name));
+    }
+    Ok(res)
+}
+
+struct HostStats {
+    proc: Option<ProcFsStat>,
+    meminfo: Option<ProcFsMemInfo>,
+    net: Option<Vec<ProcFsNetDev>>,
+    load: Option<Loadavg>,
+}
+
+struct DiskStat {
+    name: String,
+    usage: Option<FileSystemInformation>,
+    dev: Option<BlockDevStat>,
+}
+
+impl DiskStat {
+    fn to_value(&self) -> Value {
+        let mut value = json!({});
+        if let Some(usage) = &self.usage {
+            value["total"] = Value::from(usage.total);
+            value["used"] = Value::from(usage.used);
+            value["avail"] = Value::from(usage.available);
+        }
+
+        if let Some(dev) = &self.dev {
+            value["read_ios"] = Value::from(dev.read_ios);
+            value["read_bytes"] = Value::from(dev.read_sectors * 512);
+            value["write_ios"] = Value::from(dev.write_ios);
+            value["write_bytes"] = Value::from(dev.write_sectors * 512);
+            value["io_ticks"] = Value::from(dev.io_ticks / 1000);
+        }
+        value
+    }
+}
+
+fn collect_host_stats_sync() -> HostStats {
+    use proxmox_sys::linux::procfs::{
+        read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
+    };
+
+    let proc = match read_proc_stat() {
+        Ok(stat) => Some(stat),
+        Err(err) => {
+            eprintln!("read_proc_stat failed - {err}");
+            None
+        }
+    };
+
+    let meminfo = match read_meminfo() {
+        Ok(stat) => Some(stat),
+        Err(err) => {
+            eprintln!("read_meminfo failed - {err}");
+            None
+        }
+    };
+
+    let net = match read_proc_net_dev() {
+        Ok(netdev) => Some(netdev),
+        Err(err) => {
+            eprintln!("read_prox_net_dev failed - {err}");
+            None
+        }
+    };
+
+    let load = match read_loadavg() {
+        Ok(loadavg) => Some(loadavg),
+        Err(err) => {
+            eprintln!("read_loadavg failed - {err}");
+            None
+        }
+    };
+
+    HostStats {
+        proc,
+        meminfo,
+        net,
+        load,
+    }
+}
+
+fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
+    let disk_manager = DiskManage::new();
+
+    let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
+
+    let mut datastores = Vec::new();
+    match pbs_config::datastore::config() {
+        Ok((config, _)) => {
+            let datastore_list: Vec<DataStoreConfig> = config
+                .convert_to_typed_array("datastore")
+                .unwrap_or_default();
+
+            for config in datastore_list {
+                if config
+                    .get_maintenance_mode()
+                    .map_or(false, |mode| mode.check(Some(Operation::Read)).is_err())
+                {
+                    continue;
+                }
+                let path = std::path::Path::new(&config.path);
+                datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
+            }
+        }
+        Err(err) => {
+            eprintln!("read datastore config failed - {err}");
+        }
+    }
+
+    (root, datastores)
+}
+
+fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
+    if let Some(stat) = &host.proc {
+        rrd_update_gauge("host/cpu", stat.cpu);
+        rrd_update_gauge("host/iowait", stat.iowait_percent);
+    }
+
+    if let Some(meminfo) = &host.meminfo {
+        rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+        rrd_update_gauge("host/memused", meminfo.memused as f64);
+        rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+        rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+    }
+
+    if let Some(netdev) = &host.net {
+        use pbs_config::network::is_physical_nic;
+        let mut netin = 0;
+        let mut netout = 0;
+        for item in netdev {
+            if !is_physical_nic(&item.device) {
+                continue;
+            }
+            netin += item.receive;
+            netout += item.send;
+        }
+        rrd_update_derive("host/netin", netin as f64);
+        rrd_update_derive("host/netout", netout as f64);
+    }
+
+    if let Some(loadavg) = &host.load {
+        rrd_update_gauge("host/loadavg", loadavg.0);
+    }
+
+    rrd_update_disk_stat(hostdisk, "host");
+
+    for stat in datastores {
+        let rrd_prefix = format!("datastore/{}", stat.name);
+        rrd_update_disk_stat(stat, &rrd_prefix);
+    }
+}
+
+fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
+    if let Some(status) = &disk.usage {
+        let rrd_key = format!("{}/total", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.total as f64);
+        let rrd_key = format!("{}/used", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.used as f64);
+        let rrd_key = format!("{}/available", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.available as f64);
+    }
+
+    if let Some(stat) = &disk.dev {
+        let rrd_key = format!("{}/read_ios", rrd_prefix);
+        rrd_update_derive(&rrd_key, stat.read_ios as f64);
+        let rrd_key = format!("{}/read_bytes", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
+
+        let rrd_key = format!("{}/write_ios", rrd_prefix);
+        rrd_update_derive(&rrd_key, stat.write_ios as f64);
+        let rrd_key = format!("{}/write_bytes", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
+
+        let rrd_key = format!("{}/io_ticks", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
+    }
+}
+
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
+    let usage = match proxmox_sys::fs::fs_info(path) {
+        Ok(status) => Some(status),
+        Err(err) => {
+            eprintln!("read fs info on {path:?} failed - {err}");
+            None
+        }
+    };
+
+    let dev = match disk_manager.find_mounted_device(path) {
+        Ok(None) => None,
+        Ok(Some((fs_type, device, source))) => {
+            let mut device_stat = None;
+            match (fs_type.as_str(), source) {
+                ("zfs", Some(source)) => match source.into_string() {
+                    Ok(dataset) => match zfs_dataset_stats(&dataset) {
+                        Ok(stat) => device_stat = Some(stat),
+                        Err(err) => eprintln!("zfs_dataset_stats({dataset:?}) failed - {err}"),
+                    },
+                    Err(source) => {
+                        eprintln!("zfs_pool_stats({source:?}) failed - invalid characters")
+                    }
+                },
+                _ => {
+                    if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
+                        match disk.read_stat() {
+                            Ok(stat) => device_stat = stat,
+                            Err(err) => eprintln!("disk.read_stat {path:?} failed - {err}"),
+                        }
+                    }
+                }
+            }
+            device_stat
+        }
+        Err(err) => {
+            eprintln!("find_mounted_device failed - {err}");
+            None
+        }
+    };
+
+    DiskStat {
+        name: name.to_string(),
+        usage,
+        dev,
+    }
+}
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 7f845e5b..ff92a821 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -33,6 +33,8 @@ pub use report::*;
 
 pub mod auth;
 
+pub mod metric_collection;
+
 pub(crate) mod pull;
 
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
-- 
2.39.5





More information about the pbs-devel mailing list