[pbs-devel] [PATCH proxmox-backup 01/15] proxmox-rrd: use a journal to reduce amount of bytes written

Dietmar Maurer dietmar at proxmox.com
Wed Oct 13 10:24:38 CEST 2021


Instead of writing every RRD file on each update, append updates to an
on-disk journal (rrd.journal) and apply the journal to the individual
RRD files every 30 minutes. The journal is also applied on startup
(proxmox-backup-api), so pending updates survive a restart.
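
Each journal line records one update as

    <epoch>:<value>:<dst>:<rel_path>

where <dst> is 0 for DST::Gauge and 1 for DST::Derive (see
parse_journal_line/append_journal_entry below). A CPU gauge update
could look like this (values are illustrative only):

    1634112278.123:0.25:0:host/cpu

On apply, the journal is replayed into the in-memory RRD map, all RRDs
are saved to disk, and the journal is truncated only if every save
succeeded.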
---
 proxmox-rrd/Cargo.toml          |   1 +
 proxmox-rrd/src/cache.rs        | 223 ++++++++++++++++++++++++++++----
 proxmox-rrd/src/lib.rs          |   6 +-
 proxmox-rrd/src/rrd.rs          |  30 +++++
 src/bin/proxmox-backup-api.rs   |   2 +-
 src/bin/proxmox-backup-proxy.rs |  54 ++++----
 src/lib.rs                      |   5 +-
 7 files changed, 261 insertions(+), 60 deletions(-)
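
For reviewers, a rough usage sketch of the reworked API (mirroring
src/lib.rs and the proxy/api binaries; the base directory and sample
values are placeholders, error handling is simplified):

    use anyhow::Error;
    use proxmox_rrd::{RRDCache, DST};

    fn rrd_cache_example() -> Result<(), Error> {
        // open/create the cache; None falls back to default CreateOptions
        let cache = RRDCache::new(
            "/tmp/rrdb-example", // placeholder base directory
            None,                // file CreateOptions
            None,                // dir CreateOptions
            30.0 * 60.0,         // apply_interval: flush the journal every 30 minutes
        )?;

        // replay any pending journal entries (done once at daemon startup)
        cache.apply_journal()?;

        // each update now only appends one line to rrd.journal instead of
        // rewriting the whole RRD file
        cache.update_value("host/cpu", 0.25, DST::Gauge)?;
        cache.update_value("host/netin", 1024.0, DST::Derive)?;

        Ok(())
    }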

diff --git a/proxmox-rrd/Cargo.toml b/proxmox-rrd/Cargo.toml
index 7225be8e..c66344ac 100644
--- a/proxmox-rrd/Cargo.toml
+++ b/proxmox-rrd/Cargo.toml
@@ -9,6 +9,7 @@ description = "Simple RRD database implementation."
 anyhow = "1.0"
 bitflags = "1.2.1"
 log = "0.4"
+nix = "0.19.1"
 
 proxmox = { version = "0.14.0" }
 proxmox-time = "1"
diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs
index fe28aeda..7c56e047 100644
--- a/proxmox-rrd/src/cache.rs
+++ b/proxmox-rrd/src/cache.rs
@@ -1,24 +1,46 @@
+use std::fs::File;
 use std::path::{Path, PathBuf};
 use std::collections::HashMap;
-use std::sync::{RwLock};
+use std::sync::RwLock;
+use std::io::Write;
+use std::io::{BufRead, BufReader};
+use std::os::unix::io::AsRawFd;
 
-use anyhow::{format_err, Error};
+use anyhow::{format_err, bail, Error};
+use nix::fcntl::OFlag;
 
-use proxmox::tools::fs::{create_path, CreateOptions};
+use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions};
 
 use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution};
 
 use crate::{DST, rrd::RRD};
 
+const RRD_JOURNAL_NAME: &str = "rrd.journal";
+
 /// RRD cache - keep RRD data in RAM, but write updates to disk
 ///
 /// This cache is designed to run as single instance (no concurrent
 /// access from other processes).
 pub struct RRDCache {
+    apply_interval: f64,
     basedir: PathBuf,
     file_options: CreateOptions,
     dir_options: CreateOptions,
-    cache: RwLock<HashMap<String, RRD>>,
+    state: RwLock<RRDCacheState>,
+}
+
+// shared state behind RwLock
+struct RRDCacheState {
+    rrd_map: HashMap<String, RRD>,
+    journal: File,
+    last_journal_flush: f64,
+}
+
+struct JournalEntry {
+    time: f64,
+    value: f64,
+    dst: DST,
+    rel_path: String,
 }
 
 impl RRDCache {
@@ -28,21 +50,166 @@ impl RRDCache {
         basedir: P,
         file_options: Option<CreateOptions>,
         dir_options: Option<CreateOptions>,
-    ) -> Self {
+        apply_interval: f64,
+    ) -> Result<Self, Error> {
         let basedir = basedir.as_ref().to_owned();
-        Self {
+
+        let file_options = file_options.unwrap_or_else(|| CreateOptions::new());
+        let dir_options = dir_options.unwrap_or_else(|| CreateOptions::new());
+
+        create_path(&basedir, Some(dir_options.clone()), Some(dir_options.clone()))
+            .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
+
+        let mut journal_path = basedir.clone();
+        journal_path.push(RRD_JOURNAL_NAME);
+
+        let flags = OFlag::O_CLOEXEC|OFlag::O_WRONLY|OFlag::O_APPEND;
+        let journal = atomic_open_or_create_file(&journal_path, flags, &[], file_options.clone())?;
+
+        let state = RRDCacheState {
+            journal,
+            rrd_map: HashMap::new(),
+            last_journal_flush: 0.0,
+        };
+
+        Ok(Self {
             basedir,
-            file_options: file_options.unwrap_or_else(|| CreateOptions::new()),
-            dir_options: dir_options.unwrap_or_else(|| CreateOptions::new()),
-            cache: RwLock::new(HashMap::new()),
+            file_options,
+            dir_options,
+            apply_interval,
+            state: RwLock::new(state),
+        })
+    }
+
+    fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
+
+        let line = line.trim();
+
+        let parts: Vec<&str> = line.splitn(4, ':').collect();
+        if parts.len() != 4 {
+            bail!("wrong number of components");
         }
+
+        let time: f64 = parts[0].parse()
+            .map_err(|_| format_err!("unable to parse time"))?;
+        let value: f64 = parts[1].parse()
+            .map_err(|_| format_err!("unable to parse value"))?;
+        let dst: u8 = parts[2].parse()
+            .map_err(|_| format_err!("unable to parse data source type"))?;
+
+        let dst = match dst {
+            0 => DST::Gauge,
+            1 => DST::Derive,
+            _ => bail!("got strange value for data source type '{}'", dst),
+        };
+
+        let rel_path = parts[3].to_string();
+
+        Ok(JournalEntry { time, value, dst, rel_path })
+    }
+
+    fn append_journal_entry(
+        state: &mut RRDCacheState,
+        time: f64,
+        value: f64,
+        dst: DST,
+        rel_path: &str,
+    ) -> Result<(), Error> {
+        let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
+        state.journal.write_all(journal_entry.as_bytes())?;
+        Ok(())
     }
 
-    /// Create rrdd stat dir with correct permission
-    pub fn create_rrdb_dir(&self) -> Result<(), Error> {
+    pub fn apply_journal(&self) -> Result<(), Error> {
+        let mut state = self.state.write().unwrap(); // block writers
+        self.apply_journal_locked(&mut state)
+    }
 
-        create_path(&self.basedir, Some(self.dir_options.clone()), Some(self.dir_options.clone()))
-            .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
+    fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> {
+
+        log::info!("applying rrd journal");
+
+        state.last_journal_flush = proxmox_time::epoch_f64();
+
+        let mut journal_path = self.basedir.clone();
+        journal_path.push(RRD_JOURNAL_NAME);
+
+        let flags = OFlag::O_CLOEXEC|OFlag::O_RDONLY;
+        let journal = atomic_open_or_create_file(&journal_path, flags, &[], self.file_options.clone())?;
+        let mut journal = BufReader::new(journal);
+
+        let mut last_update_map = HashMap::new();
+
+        let mut get_last_update = |rel_path: &str, rrd: &RRD| {
+            if let Some(time) = last_update_map.get(rel_path) {
+                return *time;
+            }
+            let last_update = rrd.last_update();
+            last_update_map.insert(rel_path.to_string(), last_update);
+            last_update
+        };
+
+        let mut linenr = 0;
+        loop {
+            linenr += 1;
+            let mut line = String::new();
+            let len = journal.read_line(&mut line)?;
+            if len == 0 { break; }
+
+            let entry = match Self::parse_journal_line(&line) {
+                Ok(entry) => entry,
+                Err(err) => {
+                    log::warn!("unable to parse rrd journal line {} (skip) - {}", linenr, err);
+                    continue; // skip unparsable lines
+                }
+            };
+
+            if let Some(rrd) = state.rrd_map.get_mut(&entry.rel_path) {
+                if entry.time > get_last_update(&entry.rel_path, &rrd) {
+                    rrd.update(entry.time, entry.value);
+                }
+            } else {
+                let mut path = self.basedir.clone();
+                path.push(&entry.rel_path);
+                create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
+
+                let mut rrd = match RRD::load(&path) {
+                    Ok(rrd) => rrd,
+                    Err(err) => {
+                        if err.kind() != std::io::ErrorKind::NotFound {
+                            log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
+                        }
+                        RRD::new(entry.dst)
+                    },
+                };
+                if entry.time > get_last_update(&entry.rel_path, &rrd) {
+                    rrd.update(entry.time, entry.value);
+                }
+                state.rrd_map.insert(entry.rel_path.clone(), rrd);
+            }
+        }
+
+        // save all RRDs
+
+        let mut errors = 0;
+        for (rel_path, rrd) in state.rrd_map.iter() {
+            let mut path = self.basedir.clone();
+            path.push(&rel_path);
+            if let Err(err) = rrd.save(&path, self.file_options.clone()) {
+                errors += 1;
+                log::error!("unable to save {:?}: {}", path, err);
+            }
+        }
+
+        // if everything went ok, commit the journal
+
+        if errors == 0 {
+            nix::unistd::ftruncate(state.journal.as_raw_fd(), 0)
+                .map_err(|err| format_err!("unable to truncate journal - {}", err))?;
+            log::info!("rrd journal successfully committed");
+        } else {
+            log::error!("errors during rrd flush - unable to commit rrd journal");
+        }
 
         Ok(())
     }
@@ -53,21 +220,26 @@ impl RRDCache {
         rel_path: &str,
         value: f64,
         dst: DST,
-        save: bool,
     ) -> Result<(), Error> {
 
-        let mut path = self.basedir.clone();
-        path.push(rel_path);
-
-        create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.file_options.clone()))?;
+        let mut state = self.state.write().unwrap(); // block other writers
 
-        let mut map = self.cache.write().unwrap();
         let now = proxmox_time::epoch_f64();
 
-        if let Some(rrd) = map.get_mut(rel_path) {
+        if (now - state.last_journal_flush) > self.apply_interval {
+            if let Err(err) = self.apply_journal_locked(&mut state) {
+                log::error!("apply journal failed: {}", err);
+            }
+        }
+
+        Self::append_journal_entry(&mut state, now, value, dst, rel_path)?;
+
+        if let Some(rrd) = state.rrd_map.get_mut(rel_path) {
             rrd.update(now, value);
-            if save { rrd.save(&path, self.file_options.clone())?; }
         } else {
+            let mut path = self.basedir.clone();
+            path.push(rel_path);
+            create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
             let mut rrd = match RRD::load(&path) {
                 Ok(rrd) => rrd,
                 Err(err) => {
@@ -78,10 +250,7 @@ impl RRDCache {
                 },
             };
             rrd.update(now, value);
-            if save {
-                rrd.save(&path, self.file_options.clone())?;
-            }
-            map.insert(rel_path.into(), rrd);
+            state.rrd_map.insert(rel_path.into(), rrd);
         }
 
         Ok(())
@@ -97,9 +266,9 @@ impl RRDCache {
         mode: RRDMode,
     ) -> Option<(u64, u64, Vec<Option<f64>>)> {
 
-        let map = self.cache.read().unwrap();
+        let state = self.state.read().unwrap();
 
-        match map.get(&format!("{}/{}", base, name)) {
+        match state.rrd_map.get(&format!("{}/{}", base, name)) {
             Some(rrd) => Some(rrd.extract_data(now, timeframe, mode)),
             None => None,
         }
diff --git a/proxmox-rrd/src/lib.rs b/proxmox-rrd/src/lib.rs
index 303cd55d..d83e6ffc 100644
--- a/proxmox-rrd/src/lib.rs
+++ b/proxmox-rrd/src/lib.rs
@@ -13,9 +13,11 @@ mod cache;
 pub use cache::*;
 
 /// RRD data source type
+#[repr(u8)]
+#[derive(Copy, Clone)]
 pub enum DST {
     /// Gauge values are stored unmodified.
-    Gauge,
+    Gauge = 0,
     /// Stores the difference to the previous value.
-    Derive,
+    Derive = 1,
 }
diff --git a/proxmox-rrd/src/rrd.rs b/proxmox-rrd/src/rrd.rs
index f4c08909..026498ed 100644
--- a/proxmox-rrd/src/rrd.rs
+++ b/proxmox-rrd/src/rrd.rs
@@ -336,6 +336,36 @@ impl RRD {
         replace_file(filename, rrd_slice, options)
     }
 
+    pub fn last_update(&self) -> f64 {
+
+        let mut last_update = 0.0;
+
+        {
+            let mut check_last_update = |rra: &RRA| {
+                if rra.last_update > last_update {
+                    last_update = rra.last_update;
+                }
+            };
+
+            check_last_update(&self.hour_avg);
+            check_last_update(&self.hour_max);
+
+            check_last_update(&self.day_avg);
+            check_last_update(&self.day_max);
+
+            check_last_update(&self.week_avg);
+            check_last_update(&self.week_max);
+
+            check_last_update(&self.month_avg);
+            check_last_update(&self.month_max);
+
+            check_last_update(&self.year_avg);
+            check_last_update(&self.year_max);
+        }
+
+        last_update
+    }
+
     /// Update the value (in memory)
     ///
     /// Note: This does not call [Self::save].
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index c98fba25..3c963a77 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -74,7 +74,7 @@ async fn run() -> Result<(), Error> {
 
     proxmox_backup::server::create_run_dir()?;
 
-    RRD_CACHE.create_rrdb_dir()?;
+    RRD_CACHE.apply_journal()?;
 
     proxmox_backup::server::jobstate::create_jobstate_dir()?;
     proxmox_backup::tape::create_tape_status_dir()?;
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 4bdb8ce9..8da98ff8 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -889,14 +889,10 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
 
 async fn run_stat_generator() {
 
-    let mut count = 0;
     loop {
-        count += 1;
-        let save = if count >= 6 { count = 0; true } else { false };
-
         let delay_target = Instant::now() +  Duration::from_secs(10);
 
-        generate_host_stats(save).await;
+        generate_host_stats().await;
 
         tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
 
@@ -904,19 +900,19 @@ async fn run_stat_generator() {
 
 }
 
-fn rrd_update_gauge(name: &str, value: f64, save: bool) {
-    if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge, save) {
+fn rrd_update_gauge(name: &str, value: f64) {
+    if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge) {
         eprintln!("rrd::update_value '{}' failed - {}", name, err);
     }
 }
 
-fn rrd_update_derive(name: &str, value: f64, save: bool) {
-    if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive, save) {
+fn rrd_update_derive(name: &str, value: f64) {
+    if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive) {
         eprintln!("rrd::update_value '{}' failed - {}", name, err);
     }
 }
 
-async fn generate_host_stats(save: bool) {
+async fn generate_host_stats() {
     use proxmox::sys::linux::procfs::{
         read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
 
@@ -924,8 +920,8 @@ async fn generate_host_stats(save: bool) {
 
         match read_proc_stat() {
             Ok(stat) => {
-                rrd_update_gauge("host/cpu", stat.cpu, save);
-                rrd_update_gauge("host/iowait", stat.iowait_percent, save);
+                rrd_update_gauge("host/cpu", stat.cpu);
+                rrd_update_gauge("host/iowait", stat.iowait_percent);
             }
             Err(err) => {
                 eprintln!("read_proc_stat failed - {}", err);
@@ -934,10 +930,10 @@ async fn generate_host_stats(save: bool) {
 
         match read_meminfo() {
             Ok(meminfo) => {
-                rrd_update_gauge("host/memtotal", meminfo.memtotal as f64, save);
-                rrd_update_gauge("host/memused", meminfo.memused as f64, save);
-                rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64, save);
-                rrd_update_gauge("host/swapused", meminfo.swapused as f64, save);
+                rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+                rrd_update_gauge("host/memused", meminfo.memused as f64);
+                rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+                rrd_update_gauge("host/swapused", meminfo.swapused as f64);
             }
             Err(err) => {
                 eprintln!("read_meminfo failed - {}", err);
@@ -954,8 +950,8 @@ async fn generate_host_stats(save: bool) {
                     netin += item.receive;
                     netout += item.send;
                 }
-                rrd_update_derive("host/netin", netin as f64, save);
-                rrd_update_derive("host/netout", netout as f64, save);
+                rrd_update_derive("host/netin", netin as f64);
+                rrd_update_derive("host/netout", netout as f64);
             }
             Err(err) => {
                 eprintln!("read_prox_net_dev failed - {}", err);
@@ -964,7 +960,7 @@ async fn generate_host_stats(save: bool) {
 
         match read_loadavg() {
             Ok(loadavg) => {
-                rrd_update_gauge("host/loadavg", loadavg.0 as f64, save);
+                rrd_update_gauge("host/loadavg", loadavg.0 as f64);
             }
             Err(err) => {
                 eprintln!("read_loadavg failed - {}", err);
@@ -973,7 +969,7 @@ async fn generate_host_stats(save: bool) {
 
         let disk_manager = DiskManage::new();
 
-        gather_disk_stats(disk_manager.clone(), Path::new("/"), "host", save);
+        gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
 
         match pbs_config::datastore::config() {
             Ok((config, _)) => {
@@ -984,7 +980,7 @@ async fn generate_host_stats(save: bool) {
 
                     let rrd_prefix = format!("datastore/{}", config.name);
                     let path = std::path::Path::new(&config.path);
-                    gather_disk_stats(disk_manager.clone(), path, &rrd_prefix, save);
+                    gather_disk_stats(disk_manager.clone(), path, &rrd_prefix);
                 }
             }
             Err(err) => {
@@ -1025,14 +1021,14 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
     next <= now
 }
 
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str, save: bool) {
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
 
     match proxmox_backup::tools::disks::disk_usage(path) {
         Ok(status) => {
             let rrd_key = format!("{}/total", rrd_prefix);
-            rrd_update_gauge(&rrd_key, status.total as f64, save);
+            rrd_update_gauge(&rrd_key, status.total as f64);
             let rrd_key = format!("{}/used", rrd_prefix);
-            rrd_update_gauge(&rrd_key, status.used as f64, save);
+            rrd_update_gauge(&rrd_key, status.used as f64);
         }
         Err(err) => {
             eprintln!("read disk_usage on {:?} failed - {}", path, err);
@@ -1064,17 +1060,17 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
             }
             if let Some(stat) = device_stat {
                 let rrd_key = format!("{}/read_ios", rrd_prefix);
-                rrd_update_derive(&rrd_key, stat.read_ios as f64, save);
+                rrd_update_derive(&rrd_key, stat.read_ios as f64);
                 let rrd_key = format!("{}/read_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64, save);
+                rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
 
                 let rrd_key = format!("{}/write_ios", rrd_prefix);
-                rrd_update_derive(&rrd_key, stat.write_ios as f64, save);
+                rrd_update_derive(&rrd_key, stat.write_ios as f64);
                 let rrd_key = format!("{}/write_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64, save);
+                rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
 
                 let rrd_key = format!("{}/io_ticks", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0, save);
+                rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
             }
         }
         Err(err) => {
diff --git a/src/lib.rs b/src/lib.rs
index 98b6b987..5d2b4590 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -51,10 +51,13 @@ lazy_static::lazy_static!{
             .owner(backup_user.uid)
             .group(backup_user.gid);
 
+        let apply_interval = 30.0*60.0; // 30 minutes
+
         RRDCache::new(
             "/var/lib/proxmox-backup/rrdb",
             Some(file_options),
             Some(dir_options),
-        )
+            apply_interval,
+        ).unwrap()
     };
 }
-- 
2.30.2
