[pbs-devel] [PATCH proxmox-backup 4/6] backup-proxy: decouple stats gathering from rrd update
Dominik Csapak
d.csapak at proxmox.com
Tue Dec 14 13:24:10 CET 2021
Split collecting the host and disk stats from writing them to the RRD, so that the gathered stats can be reused.
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
src/bin/proxmox-backup-proxy.rs | 213 +++++++++++++++++++++-----------
1 file changed, 141 insertions(+), 72 deletions(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index fa79322d..2700fabf 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -17,8 +17,11 @@ use tokio_stream::wrappers::ReceiverStream;
use serde_json::{json, Value};
use http::{Method, HeaderMap};
-use proxmox_sys::linux::socket::set_tcp_keepalive;
-use proxmox_sys::fs::CreateOptions;
+use proxmox_sys::linux::{
+ procfs::{ProcFsStat, ProcFsMemInfo, ProcFsNetDev, Loadavg},
+ socket::set_tcp_keepalive
+};
+use proxmox_sys::fs::{CreateOptions, DiskUsage};
use proxmox_lang::try_block;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
@@ -44,6 +47,7 @@ use proxmox_backup::{
Job,
},
},
+ tools::disks::BlockDevStat,
};
use pbs_buildcfg::configdir;
@@ -931,9 +935,24 @@ async fn run_stat_generator() {
loop {
let delay_target = Instant::now() + Duration::from_secs(10);
- generate_host_stats().await;
+ let (hoststats, hostdisk, datastores) = match tokio::task::spawn_blocking(|| {
+ let hoststats = collect_host_stats_sync();
+ let (hostdisk, datastores) = collect_disk_stats_sync();
+ (Arc::new(hoststats), Arc::new(hostdisk), Arc::new(datastores))
+ }).await {
+ Ok(res) => res,
+ Err(err) => {
+ log::error!("collecting host stats paniced: {}", err);
+ tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+ continue;
+ }
+ };
+
+ let rrd_future = tokio::task::spawn_blocking(move || {
+ rrd_update_host_stats_sync(&hoststats, &hostdisk, &datastores);
+ rrd_sync_journal();
+ });
- rrd_sync_journal();
tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
@@ -941,86 +960,147 @@ async fn run_stat_generator() {
}
-async fn generate_host_stats() {
- match tokio::task::spawn_blocking(generate_host_stats_sync).await {
- Ok(()) => (),
- Err(err) => log::error!("generate_host_stats paniced: {}", err),
- }
+struct HostStats {
+ proc: Option<ProcFsStat>,
+ meminfo: Option<ProcFsMemInfo>,
+ net: Option<Vec<ProcFsNetDev>>,
+ load: Option<Loadavg>,
+}
+
+struct DiskStat {
+ name: String,
+ usage: Option<DiskUsage>,
+ dev: Option<BlockDevStat>,
}
-fn generate_host_stats_sync() {
+fn collect_host_stats_sync() -> HostStats {
use proxmox_sys::linux::procfs::{
read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
- match read_proc_stat() {
- Ok(stat) => {
- rrd_update_gauge("host/cpu", stat.cpu);
- rrd_update_gauge("host/iowait", stat.iowait_percent);
- }
+ let proc = match read_proc_stat() {
+ Ok(stat) => Some(stat),
Err(err) => {
eprintln!("read_proc_stat failed - {}", err);
+ None
}
- }
+ };
- match read_meminfo() {
- Ok(meminfo) => {
- rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
- rrd_update_gauge("host/memused", meminfo.memused as f64);
- rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
- rrd_update_gauge("host/swapused", meminfo.swapused as f64);
- }
+ let meminfo = match read_meminfo() {
+ Ok(stat) => Some(stat),
Err(err) => {
eprintln!("read_meminfo failed - {}", err);
+ None
}
- }
+ };
- match read_proc_net_dev() {
- Ok(netdev) => {
- use pbs_config::network::is_physical_nic;
- let mut netin = 0;
- let mut netout = 0;
- for item in netdev {
- if !is_physical_nic(&item.device) { continue; }
- netin += item.receive;
- netout += item.send;
- }
- rrd_update_derive("host/netin", netin as f64);
- rrd_update_derive("host/netout", netout as f64);
- }
+ let net = match read_proc_net_dev() {
+ Ok(netdev) => Some(netdev),
Err(err) => {
eprintln!("read_prox_net_dev failed - {}", err);
+ None
}
- }
+ };
- match read_loadavg() {
- Ok(loadavg) => {
- rrd_update_gauge("host/loadavg", loadavg.0 as f64);
- }
+ let load = match read_loadavg() {
+ Ok(loadavg) => Some(loadavg),
Err(err) => {
eprintln!("read_loadavg failed - {}", err);
+ None
}
+ };
+
+ HostStats {
+ proc,
+ meminfo,
+ net,
+ load,
}
+}
+fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
let disk_manager = DiskManage::new();
- gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
+ let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
+ let mut datastores = Vec::new();
match pbs_config::datastore::config() {
Ok((config, _)) => {
let datastore_list: Vec<DataStoreConfig> =
config.convert_to_typed_array("datastore").unwrap_or_default();
for config in datastore_list {
-
- let rrd_prefix = format!("datastore/{}", config.name);
let path = std::path::Path::new(&config.path);
- gather_disk_stats(disk_manager.clone(), path, &rrd_prefix);
+ datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
}
}
Err(err) => {
eprintln!("read datastore config failed - {}", err);
}
}
+
+ (root, datastores)
+}
+
+fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
+ if let Some(stat) = &host.proc {
+ rrd_update_gauge("host/cpu", stat.cpu);
+ rrd_update_gauge("host/iowait", stat.iowait_percent);
+ }
+
+ if let Some(meminfo) = &host.meminfo {
+ rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+ rrd_update_gauge("host/memused", meminfo.memused as f64);
+ rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+ rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+ }
+
+ if let Some(netdev) = &host.net {
+ use pbs_config::network::is_physical_nic;
+ let mut netin = 0;
+ let mut netout = 0;
+ for item in netdev {
+ if !is_physical_nic(&item.device) { continue; }
+ netin += item.receive;
+ netout += item.send;
+ }
+ rrd_update_derive("host/netin", netin as f64);
+ rrd_update_derive("host/netout", netout as f64);
+ }
+
+ if let Some(loadavg) = &host.load {
+ rrd_update_gauge("host/loadavg", loadavg.0 as f64);
+ }
+
+ rrd_update_disk_stat(&hostdisk, "host");
+
+ for stat in datastores {
+ let rrd_prefix = format!("datastore/{}", stat.name);
+ rrd_update_disk_stat(&stat, &rrd_prefix);
+ }
+}
+
+fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
+ if let Some(status) = &disk.usage {
+ let rrd_key = format!("{}/total", rrd_prefix);
+ rrd_update_gauge(&rrd_key, status.total as f64);
+ let rrd_key = format!("{}/used", rrd_prefix);
+ rrd_update_gauge(&rrd_key, status.used as f64);
+ }
+
+ if let Some(stat) = &disk.dev {
+ let rrd_key = format!("{}/read_ios", rrd_prefix);
+ rrd_update_derive(&rrd_key, stat.read_ios as f64);
+ let rrd_key = format!("{}/read_bytes", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
+
+ let rrd_key = format!("{}/write_ios", rrd_prefix);
+ rrd_update_derive(&rrd_key, stat.write_ios as f64);
+ let rrd_key = format!("{}/write_bytes", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
+
+ let rrd_key = format!("{}/io_ticks", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
+ }
}
fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
@@ -1053,22 +1133,17 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
next <= now
}
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
-
- match proxmox_sys::fs::disk_usage(path) {
- Ok(status) => {
- let rrd_key = format!("{}/total", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.total as f64);
- let rrd_key = format!("{}/used", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.used as f64);
- }
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
+ let usage = match proxmox_sys::fs::disk_usage(path) {
+ Ok(status) => Some(status),
Err(err) => {
eprintln!("read disk_usage on {:?} failed - {}", path, err);
+ None
}
- }
+ };
- match disk_manager.find_mounted_device(path) {
- Ok(None) => {},
+ let dev = match disk_manager.find_mounted_device(path) {
+ Ok(None) => None,
Ok(Some((fs_type, device, source))) => {
let mut device_stat = None;
match fs_type.as_str() {
@@ -1090,24 +1165,18 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
}
}
}
- if let Some(stat) = device_stat {
- let rrd_key = format!("{}/read_ios", rrd_prefix);
- rrd_update_derive(&rrd_key, stat.read_ios as f64);
- let rrd_key = format!("{}/read_bytes", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
-
- let rrd_key = format!("{}/write_ios", rrd_prefix);
- rrd_update_derive(&rrd_key, stat.write_ios as f64);
- let rrd_key = format!("{}/write_bytes", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
-
- let rrd_key = format!("{}/io_ticks", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
- }
+ device_stat
}
Err(err) => {
eprintln!("find_mounted_device failed - {}", err);
+ None
}
+ };
+
+ DiskStat {
+ name: name.to_string(),
+ usage,
+ dev,
}
}
--
2.30.2
More information about the pbs-devel mailing list