[pbs-devel] [PATCH proxmox-backup v4 4/6] backup-proxy: decouple stats gathering from rrd update

Dominik Csapak d.csapak at proxmox.com
Mon Jan 17 11:48:23 CET 2022


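Collect the host and disk statistics into plain structs (HostStats,
DiskStat) first, and update the RRD from those structs in a separate
step. That way the gathered stats can be reused.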

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
 src/bin/proxmox-backup-proxy.rs | 213 +++++++++++++++++++++-----------
 1 file changed, 141 insertions(+), 72 deletions(-)

diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 30b1d94c..fd2120ee 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -17,8 +17,11 @@ use tokio_stream::wrappers::ReceiverStream;
 use serde_json::{json, Value};
 use http::{Method, HeaderMap};
 
-use proxmox_sys::linux::socket::set_tcp_keepalive;
-use proxmox_sys::fs::CreateOptions;
+use proxmox_sys::linux::{
+    procfs::{ProcFsStat, ProcFsMemInfo, ProcFsNetDev, Loadavg},
+    socket::set_tcp_keepalive
+};
+use proxmox_sys::fs::{CreateOptions, FileSystemInformation};
 use proxmox_lang::try_block;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
 use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
@@ -44,6 +47,7 @@ use proxmox_backup::{
             Job,
         },
     },
+    tools::disks::BlockDevStat,
 };
 
 use pbs_buildcfg::configdir;
@@ -931,9 +935,24 @@ async fn run_stat_generator() {
     loop {
         let delay_target = Instant::now() +  Duration::from_secs(10);
 
-        generate_host_stats().await;
+        let stats = match tokio::task::spawn_blocking(|| {
+            let hoststats = collect_host_stats_sync();
+            let (hostdisk, datastores) = collect_disk_stats_sync();
+            Arc::new((hoststats, hostdisk, datastores))
+        }).await {
+            Ok(res) => res,
+            Err(err) => {
+                log::error!("collecting host stats paniced: {}", err);
+                tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+                continue;
+            }
+        };
+
+        let rrd_future = tokio::task::spawn_blocking(move || {
+            rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
+            rrd_sync_journal();
+        });
 
-        rrd_sync_journal();
 
         tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
 
@@ -941,86 +960,147 @@ async fn run_stat_generator() {
 
 }
 
-async fn generate_host_stats() {
-    match tokio::task::spawn_blocking(generate_host_stats_sync).await {
-        Ok(()) => (),
-        Err(err) => log::error!("generate_host_stats paniced: {}", err),
-    }
+struct HostStats {
+    proc: Option<ProcFsStat>,
+    meminfo: Option<ProcFsMemInfo>,
+    net: Option<Vec<ProcFsNetDev>>,
+    load: Option<Loadavg>,
+}
+
+struct DiskStat {
+    name: String,
+    usage: Option<FileSystemInformation>,
+    dev: Option<BlockDevStat>,
 }
 
-fn generate_host_stats_sync() {
+fn collect_host_stats_sync() -> HostStats {
     use proxmox_sys::linux::procfs::{
         read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
 
-    match read_proc_stat() {
-        Ok(stat) => {
-            rrd_update_gauge("host/cpu", stat.cpu);
-            rrd_update_gauge("host/iowait", stat.iowait_percent);
-        }
+    let proc = match read_proc_stat() {
+        Ok(stat) => Some(stat),
         Err(err) => {
             eprintln!("read_proc_stat failed - {}", err);
+            None
         }
-    }
+    };
 
-    match read_meminfo() {
-        Ok(meminfo) => {
-            rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
-            rrd_update_gauge("host/memused", meminfo.memused as f64);
-            rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
-            rrd_update_gauge("host/swapused", meminfo.swapused as f64);
-        }
+    let meminfo = match read_meminfo() {
+        Ok(stat) => Some(stat),
         Err(err) => {
             eprintln!("read_meminfo failed - {}", err);
+            None
         }
-    }
+    };
 
-    match read_proc_net_dev() {
-        Ok(netdev) => {
-            use pbs_config::network::is_physical_nic;
-            let mut netin = 0;
-            let mut netout = 0;
-            for item in netdev {
-                if !is_physical_nic(&item.device) { continue; }
-                netin += item.receive;
-                netout += item.send;
-            }
-            rrd_update_derive("host/netin", netin as f64);
-            rrd_update_derive("host/netout", netout as f64);
-        }
+    let net = match read_proc_net_dev() {
+        Ok(netdev) => Some(netdev),
         Err(err) => {
             eprintln!("read_prox_net_dev failed - {}", err);
+            None
         }
-    }
+    };
 
-    match read_loadavg() {
-        Ok(loadavg) => {
-            rrd_update_gauge("host/loadavg", loadavg.0 as f64);
-        }
+    let load = match read_loadavg() {
+        Ok(loadavg) => Some(loadavg),
         Err(err) => {
             eprintln!("read_loadavg failed - {}", err);
+            None
         }
+    };
+
+    HostStats {
+        proc,
+        meminfo,
+        net,
+        load,
     }
+}
 
+fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
     let disk_manager = DiskManage::new();
 
-    gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
+    let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
 
+    let mut datastores = Vec::new();
     match pbs_config::datastore::config() {
         Ok((config, _)) => {
             let datastore_list: Vec<DataStoreConfig> =
                 config.convert_to_typed_array("datastore").unwrap_or_default();
 
             for config in datastore_list {
-
-                let rrd_prefix = format!("datastore/{}", config.name);
                 let path = std::path::Path::new(&config.path);
-                gather_disk_stats(disk_manager.clone(), path, &rrd_prefix);
+                datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
             }
         }
         Err(err) => {
             eprintln!("read datastore config failed - {}", err);
         }
     }
+
+    (root, datastores)
+}
+
+fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
+    if let Some(stat) = &host.proc {
+        rrd_update_gauge("host/cpu", stat.cpu);
+        rrd_update_gauge("host/iowait", stat.iowait_percent);
+    }
+
+    if let Some(meminfo) = &host.meminfo {
+        rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+        rrd_update_gauge("host/memused", meminfo.memused as f64);
+        rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+        rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+    }
+
+    if let Some(netdev) = &host.net {
+            use pbs_config::network::is_physical_nic;
+            let mut netin = 0;
+            let mut netout = 0;
+            for item in netdev {
+                if !is_physical_nic(&item.device) { continue; }
+                netin += item.receive;
+                netout += item.send;
+            }
+            rrd_update_derive("host/netin", netin as f64);
+            rrd_update_derive("host/netout", netout as f64);
+    }
+
+    if let Some(loadavg) = &host.load {
+        rrd_update_gauge("host/loadavg", loadavg.0 as f64);
+    }
+
+    rrd_update_disk_stat(&hostdisk, "host");
+
+    for stat in datastores {
+        let rrd_prefix = format!("datastore/{}", stat.name);
+        rrd_update_disk_stat(&stat, &rrd_prefix);
+    }
+}
+
+fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
+    if let Some(status) = &disk.usage {
+        let rrd_key = format!("{}/total", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.total as f64);
+        let rrd_key = format!("{}/used", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.used as f64);
+    }
+
+    if let Some(stat) = &disk.dev {
+        let rrd_key = format!("{}/read_ios", rrd_prefix);
+        rrd_update_derive(&rrd_key, stat.read_ios as f64);
+        let rrd_key = format!("{}/read_bytes", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
+
+        let rrd_key = format!("{}/write_ios", rrd_prefix);
+        rrd_update_derive(&rrd_key, stat.write_ios as f64);
+        let rrd_key = format!("{}/write_bytes", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
+
+        let rrd_key = format!("{}/io_ticks", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
+    }
 }
 
 fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
@@ -1053,22 +1133,17 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
     next <= now
 }
 
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
-
-    match proxmox_sys::fs::fs_info(path) {
-        Ok(status) => {
-            let rrd_key = format!("{}/total", rrd_prefix);
-            rrd_update_gauge(&rrd_key, status.total as f64);
-            let rrd_key = format!("{}/used", rrd_prefix);
-            rrd_update_gauge(&rrd_key, status.used as f64);
-        }
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
+    let usage = match proxmox_sys::fs::fs_info(path) {
+        Ok(status) => Some(status),
         Err(err) => {
             eprintln!("read fs info on {:?} failed - {}", path, err);
+            None
         }
-    }
+    };
 
-    match disk_manager.find_mounted_device(path) {
-        Ok(None) => {},
+    let dev = match disk_manager.find_mounted_device(path) {
+        Ok(None) => None,
         Ok(Some((fs_type, device, source))) => {
             let mut device_stat = None;
             match (fs_type.as_str(), source) {
@@ -1090,24 +1165,18 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
                     }
                 }
             }
-            if let Some(stat) = device_stat {
-                let rrd_key = format!("{}/read_ios", rrd_prefix);
-                rrd_update_derive(&rrd_key, stat.read_ios as f64);
-                let rrd_key = format!("{}/read_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
-
-                let rrd_key = format!("{}/write_ios", rrd_prefix);
-                rrd_update_derive(&rrd_key, stat.write_ios as f64);
-                let rrd_key = format!("{}/write_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
-
-                let rrd_key = format!("{}/io_ticks", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
-            }
+            device_stat
         }
         Err(err) => {
             eprintln!("find_mounted_device failed - {}", err);
+            None
         }
+    };
+
+    DiskStat {
+        name: name.to_string(),
+        usage,
+        dev,
     }
 }
 
-- 
2.30.2






More information about the pbs-devel mailing list