[pbs-devel] [PATCH proxmox-backup 04/15] proxmox-rrd: implement new CBOR based format

Dietmar Maurer dietmar at proxmox.com
Wed Oct 13 10:24:41 CEST 2021


Storing much more data points now got get better graphs.
---
 proxmox-rrd-api-types/src/lib.rs |  23 +-
 proxmox-rrd/Cargo.toml           |   4 +
 proxmox-rrd/src/cache.rs         |  51 ++-
 proxmox-rrd/src/lib.rs           |  19 +-
 proxmox-rrd/src/rrd.rs           | 534 +++++++++++++++----------------
 proxmox-rrd/src/rrd_v1.rs        | 296 +++++++++++++++++
 src/api2/node/rrd.rs             |  39 ++-
 src/api2/status.rs               |   4 +-
 src/bin/proxmox-backup-proxy.rs  |   2 +-
 9 files changed, 641 insertions(+), 331 deletions(-)
 create mode 100644 proxmox-rrd/src/rrd_v1.rs

diff --git a/proxmox-rrd-api-types/src/lib.rs b/proxmox-rrd-api-types/src/lib.rs
index b5e62e73..32601477 100644
--- a/proxmox-rrd-api-types/src/lib.rs
+++ b/proxmox-rrd-api-types/src/lib.rs
@@ -14,19 +14,20 @@ pub enum RRDMode {
 }
 
 #[api()]
-#[repr(u64)]
 #[derive(Copy, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "lowercase")]
 /// RRD time frame resolution
 pub enum RRDTimeFrameResolution {
-    ///  1 min => last 70 minutes
-    Hour = 60,
-    /// 30 min => last 35 hours
-    Day = 60*30,
-    /// 3 hours => about 8 days
-    Week = 60*180,
-    /// 12 hours => last 35 days
-    Month = 60*720,
-    /// 1 week => last 490 days
-    Year = 60*10080,
+    /// Hour
+    Hour,
+    /// Day
+    Day,
+    /// Week
+    Week,
+    /// Month
+    Month,
+    /// Year
+    Year,
+    /// Decade (10 years)
+    Decade,
 }
diff --git a/proxmox-rrd/Cargo.toml b/proxmox-rrd/Cargo.toml
index c66344ac..b3dd02c3 100644
--- a/proxmox-rrd/Cargo.toml
+++ b/proxmox-rrd/Cargo.toml
@@ -10,8 +10,12 @@ anyhow = "1.0"
 bitflags = "1.2.1"
 log = "0.4"
 nix = "0.19.1"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+serde_cbor = "0.11.1"
 
 proxmox = { version = "0.14.0" }
 proxmox-time = "1"
+proxmox-schema = { version = "1", features = [ "api-macro" ] }
 
 proxmox-rrd-api-types = { path = "../proxmox-rrd-api-types" }
diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs
index 7c56e047..f14837fc 100644
--- a/proxmox-rrd/src/cache.rs
+++ b/proxmox-rrd/src/cache.rs
@@ -13,7 +13,7 @@ use proxmox::tools::fs::{atomic_open_or_create_file, create_path, CreateOptions}
 
 use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution};
 
-use crate::{DST, rrd::RRD};
+use crate::rrd::{DST, CF, RRD, RRA};
 
 const RRD_JOURNAL_NAME: &str = "rrd.journal";
 
@@ -81,6 +81,29 @@ impl RRDCache {
         })
     }
 
+    fn create_default_rrd(dst: DST) -> RRD {
+
+        let mut rra_list = Vec::new();
+
+        // 1min * 1440 => 1day
+        rra_list.push(RRA::new(CF::Average, 60, 1440));
+        rra_list.push(RRA::new(CF::Maximum, 60, 1440));
+
+        // 30min * 1440 => 30days = 1month
+        rra_list.push(RRA::new(CF::Average, 30*60, 1440));
+        rra_list.push(RRA::new(CF::Maximum, 30*60, 1440));
+
+        // 6h * 1440 => 360days = 1year
+        rra_list.push(RRA::new(CF::Average, 6*3600, 1440));
+        rra_list.push(RRA::new(CF::Maximum, 6*3600, 1440));
+
+        // 1week * 570 => 10years
+        rra_list.push(RRA::new(CF::Average, 7*86400, 570));
+        rra_list.push(RRA::new(CF::Maximum, 7*86400, 570));
+
+        RRD::new(dst, rra_list)
+    }
+
     fn parse_journal_line(line: &str) -> Result<JournalEntry, Error> {
 
         let line = line.trim();
@@ -179,7 +202,7 @@ impl RRDCache {
                         if err.kind() != std::io::ErrorKind::NotFound {
                             log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
                         }
-                        RRD::new(entry.dst)
+                        Self::create_default_rrd(entry.dst)
                     },
                 };
                 if entry.time > get_last_update(&entry.rel_path, &rrd) {
@@ -246,7 +269,7 @@ impl RRDCache {
                     if err.kind() != std::io::ErrorKind::NotFound {
                         log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
                     }
-                    RRD::new(dst)
+                    Self::create_default_rrd(dst)
                 },
             };
             rrd.update(now, value);
@@ -264,13 +287,29 @@ impl RRDCache {
         now: f64,
         timeframe: RRDTimeFrameResolution,
         mode: RRDMode,
-    ) -> Option<(u64, u64, Vec<Option<f64>>)> {
+    ) -> Result<Option<(u64, u64, Vec<Option<f64>>)>, Error> {
 
         let state = self.state.read().unwrap();
 
+        let cf = match mode {
+            RRDMode::Max => CF::Maximum,
+            RRDMode::Average => CF::Average,
+        };
+
+        let now = now as u64;
+
+        let (start, resolution) = match timeframe {
+            RRDTimeFrameResolution::Hour => (now - 3600, 60),
+            RRDTimeFrameResolution::Day => (now - 3600*24, 60),
+            RRDTimeFrameResolution::Week => (now - 3600*24*7, 30*60),
+            RRDTimeFrameResolution::Month => (now - 3600*24*30, 30*60),
+            RRDTimeFrameResolution::Year => (now - 3600*24*365, 6*60*60),
+            RRDTimeFrameResolution::Decade => (now - 10*3600*24*366, 7*86400),
+        };
+
         match state.rrd_map.get(&format!("{}/{}", base, name)) {
-            Some(rrd) => Some(rrd.extract_data(now, timeframe, mode)),
-            None => None,
+            Some(rrd) => Ok(Some(rrd.extract_data(start, now, cf, resolution)?)),
+            None => Ok(None),
         }
     }
 }
diff --git a/proxmox-rrd/src/lib.rs b/proxmox-rrd/src/lib.rs
index d83e6ffc..2038170d 100644
--- a/proxmox-rrd/src/lib.rs
+++ b/proxmox-rrd/src/lib.rs
@@ -1,23 +1,14 @@
-//! # Simple Round Robin Database files with fixed format
+//! # Round Robin Database files
 //!
 //! ## Features
 //!
 //! * One file stores a single data source
-//! * Small/constant file size (6008 bytes)
-//! * Stores avarage and maximum values
-//! * Stores data for different time resolution ([RRDTimeFrameResolution](proxmox_rrd_api_types::RRDTimeFrameResolution))
+//! * Stores data for different time resolution
+//! * Simple cache implementation with journal support
+
+mod rrd_v1;
 
 pub mod rrd;
 
 mod cache;
 pub use cache::*;
-
-/// RRD data source tyoe
-#[repr(u8)]
-#[derive(Copy, Clone)]
-pub enum DST {
-    /// Gauge values are stored unmodified.
-    Gauge = 0,
-    /// Stores the difference to the previous value.
-    Derive = 1,
-}
diff --git a/proxmox-rrd/src/rrd.rs b/proxmox-rrd/src/rrd.rs
index 026498ed..82fa5a3a 100644
--- a/proxmox-rrd/src/rrd.rs
+++ b/proxmox-rrd/src/rrd.rs
@@ -1,82 +1,175 @@
-//! # Round Robin Database file format
+//! # Proxmox RRD format version 2
+//!
+//! The new format uses
+//! [CBOR](https://datatracker.ietf.org/doc/html/rfc8949) as storage
+//! format. This way we can use the serde serialization framework,
+//! which make our code more flexible, much nicer and type safe.
+//!
+//! ## Features
+//!
+//! * Well defined data format [CBOR](https://datatracker.ietf.org/doc/html/rfc8949)
+//! * Plattform independent (big endian f64, hopefully a standard format?)
+//! * Arbitrary number of RRAs (dynamically changeable)
 
-use std::io::Read;
 use std::path::Path;
 
 use anyhow::{bail, Error};
-use bitflags::bitflags;
 
-use proxmox::tools::{fs::replace_file, fs::CreateOptions};
+use serde::{Serialize, Deserialize};
+
+use proxmox::tools::fs::{replace_file, CreateOptions};
+use proxmox_schema::api;
+
+use crate::rrd_v1;
+
+/// Proxmox RRD v2 file magic number
+// openssl::sha::sha256(b"Proxmox Round Robin Database file v2.0")[0..8];
+pub const PROXMOX_RRD_MAGIC_2_0: [u8; 8] = [224, 200, 228, 27, 239, 112, 122, 159];
+
+#[api()]
+#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
+#[serde(rename_all = "kebab-case")]
+/// RRD data source type
+pub enum DST {
+    /// Gauge values are stored unmodified.
+    Gauge,
+    /// Stores the difference to the previous value.
+    Derive,
+    /// Stores the difference to the previous value (like Derive), but
+    /// detect counter overflow (and ignores that value)
+    Counter,
+}
+
+#[api()]
+#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
+#[serde(rename_all = "kebab-case")]
+/// Consolidation function
+pub enum CF {
+    /// Average
+    Average,
+    /// Maximum
+    Maximum,
+    /// Minimum
+    Minimum,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct DataSource {
+    /// Data source type
+    pub dst: DST,
+    /// Last update time (epoch)
+    pub last_update: f64,
+    /// Stores the last value, used to compute differential value for
+    /// derive/counters
+    pub counter_value: f64,
+}
 
-use proxmox_rrd_api_types::{RRDMode, RRDTimeFrameResolution};
+impl DataSource {
 
-/// The number of data entries per RRA
-pub const RRD_DATA_ENTRIES: usize = 70;
+    pub fn new(dst: DST) -> Self {
+        Self {
+            dst,
+            last_update: 0.0,
+            counter_value: f64::NAN,
+        }
+    }
 
-/// Proxmox RRD file magic number
-// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8];
-pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] =  [206, 46, 26, 212, 172, 158, 5, 186];
+    fn compute_new_value(&mut self, time: f64, mut value: f64) -> Result<f64, Error> {
+        if time <= self.last_update {
+            bail!("time in past ({} < {})", time, self.last_update);
+        }
 
-use crate::DST;
+        if value.is_nan() {
+            bail!("new value is NAN");
+        }
 
-bitflags!{
-    /// Flags to specify the data soure type and consolidation function
-    pub struct RRAFlags: u64 {
-        // Data Source Types
-        const DST_GAUGE  = 1;
-        const DST_DERIVE = 2;
-        const DST_COUNTER = 4;
-        const DST_MASK   = 255; // first 8 bits
+        // derive counter value
+        let is_counter = self.dst == DST::Counter;
+
+        if is_counter || self.dst == DST::Derive {
+            let time_diff = time - self.last_update;
+
+            let diff = if self.counter_value.is_nan() {
+                0.0
+            } else if is_counter && value < 0.0 {
+                bail!("got negative value for counter");
+            } else if is_counter && value < self.counter_value {
+                // Note: We do not try automatic overflow corrections, but
+                // we update counter_value anyways, so that we can compute the diff
+                // next time.
+                self.counter_value = value;
+                bail!("conter overflow/reset detected");
+            } else {
+                value - self.counter_value
+            };
+            self.counter_value = value;
+            value = diff/time_diff;
+        }
 
-        // Consolidation Functions
-        const CF_AVERAGE = 1 << 8;
-        const CF_MAX     = 2 << 8;
-        const CF_MASK    = 255 << 8;
+        Ok(value)
     }
+
+
 }
 
-/// Round Robin Archive with [RRD_DATA_ENTRIES] data slots.
-///
-/// This data structure is used inside [RRD] and directly written to the
-/// RRD files.
-#[repr(C)]
+#[derive(Serialize, Deserialize)]
 pub struct RRA {
-    /// Defined the data soure type and consolidation function
-    pub flags: RRAFlags,
-    /// Resulution (seconds) from [RRDTimeFrameResolution]
     pub resolution: u64,
-    /// Last update time (epoch)
-    pub last_update: f64,
+    pub cf: CF,
     /// Count values computed inside this update interval
     pub last_count: u64,
-    /// Stores the last value, used to compute differential value for derive/counters
-    pub counter_value: f64,
-    /// Data slots
-    pub data: [f64; RRD_DATA_ENTRIES],
+    /// The actual data
+    pub data: Vec<f64>,
 }
 
 impl RRA {
-    fn new(flags: RRAFlags, resolution: u64) -> Self {
+
+    pub fn new(cf: CF, resolution: u64, points: usize) -> Self {
         Self {
-            flags, resolution,
-            last_update: 0.0,
+            cf,
+            resolution,
             last_count: 0,
-            counter_value: f64::NAN,
-            data: [f64::NAN; RRD_DATA_ENTRIES],
+            data: vec![f64::NAN; points],
+        }
+    }
+
+    // directly overwrite data slots
+    // the caller need to set last_update value on the DataSource manually.
+    pub(crate) fn insert_data(
+        &mut self,
+        start: u64,
+        resolution: u64,
+        data: Vec<Option<f64>>,
+    ) -> Result<(), Error> {
+        if resolution != self.resolution {
+            bail!("inser_data failed: got wrong resolution");
+        }
+        let num_entries = self.data.len() as u64;
+        let mut index = ((start/self.resolution) % num_entries) as usize;
+
+        for i in 0..data.len() {
+            if let Some(v) = data[i] {
+                self.data[index] = v;
+            }
+            index += 1;
+            if index >= self.data.len() { index = 0; }
         }
+        Ok(())
     }
 
-    fn delete_old(&mut self, time: f64) {
+    fn delete_old_slots(&mut self, time: f64, last_update: f64) {
         let epoch = time as u64;
-        let last_update = self.last_update as u64;
+        let last_update = last_update as u64;
         let reso = self.resolution;
+        let num_entries = self.data.len() as u64;
 
-        let min_time = epoch - (RRD_DATA_ENTRIES as u64)*reso;
+        let min_time = epoch - num_entries*reso;
         let min_time = (min_time/reso + 1)*reso;
-        let mut t = last_update.saturating_sub((RRD_DATA_ENTRIES as u64)*reso);
-        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
-        for _ in 0..RRD_DATA_ENTRIES {
-            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+        let mut t = last_update.saturating_sub(num_entries*reso);
+        let mut index = ((t/reso) % num_entries) as usize;
+        for _ in 0..num_entries {
+            t += reso;
+            index = (index + 1) % (num_entries as usize);
             if t < min_time {
                 self.data[index] = f64::NAN;
             } else {
@@ -85,13 +178,14 @@ impl RRA {
         }
     }
 
-    fn compute_new_value(&mut self, time: f64, value: f64) {
+    fn compute_new_value(&mut self, time: f64, last_update: f64, value: f64) {
         let epoch = time as u64;
-        let last_update = self.last_update as u64;
+        let last_update = last_update as u64;
         let reso = self.resolution;
+        let num_entries = self.data.len() as u64;
 
-        let index = ((epoch/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
-        let last_index = ((last_update/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        let index = ((epoch/reso) % num_entries) as usize;
+        let last_index = ((last_update/reso) % num_entries) as usize;
 
         if (epoch - (last_update as u64)) > reso || index != last_index {
             self.last_count = 0;
@@ -112,258 +206,111 @@ impl RRA {
             self.data[index] = value;
             self.last_count = 1;
         } else {
-            let new_value = if self.flags.contains(RRAFlags::CF_MAX) {
-                if last_value > value { last_value } else { value }
-            } else if self.flags.contains(RRAFlags::CF_AVERAGE) {
-                (last_value*(self.last_count as f64))/(new_count as f64)
-                    + value/(new_count as f64)
-            } else {
-                log::error!("rrdb update failed - unknown CF");
-                return;
+            let new_value = match self.cf {
+                CF::Maximum => if last_value > value { last_value } else { value },
+                CF::Minimum => if last_value < value { last_value } else { value },
+                CF::Average => {
+                    (last_value*(self.last_count as f64))/(new_count as f64)
+                        + value/(new_count as f64)
+                }
             };
             self.data[index] = new_value;
             self.last_count = new_count;
         }
-        self.last_update = time;
     }
 
-    // Note: This may update the state even in case of errors (see counter overflow)
-    fn update(&mut self, time: f64, mut value: f64) -> Result<(), Error> {
-
-        if time <= self.last_update {
-            bail!("time in past ({} < {})", time, self.last_update);
-        }
-
-        if value.is_nan() {
-            bail!("new value is NAN");
-        }
-
-        // derive counter value
-        if self.flags.intersects(RRAFlags::DST_DERIVE | RRAFlags::DST_COUNTER) {
-            let time_diff = time - self.last_update;
-            let is_counter = self.flags.contains(RRAFlags::DST_COUNTER);
-
-            let diff = if self.counter_value.is_nan() {
-                0.0
-            } else if is_counter && value < 0.0 {
-                bail!("got negative value for counter");
-            } else if is_counter && value < self.counter_value {
-                // Note: We do not try automatic overflow corrections, but
-                // we update counter_value anyways, so that we can compute the diff
-                // next time.
-                self.counter_value = value;
-                bail!("conter overflow/reset detected");
-            } else {
-                value - self.counter_value
-            };
-            self.counter_value = value;
-            value = diff/time_diff;
-        }
-
-        self.delete_old(time);
-        self.compute_new_value(time, value);
-
-        Ok(())
-    }
-}
-
-/// Round Robin Database file format with fixed number of [RRA]s
-#[repr(C)]
-// Note: Avoid alignment problems by using 8byte types only
-pub struct RRD {
-    /// The magic number to identify the file type
-    pub magic: [u8; 8],
-    /// Hourly data (average values)
-    pub hour_avg: RRA,
-    /// Hourly data (maximum values)
-    pub hour_max: RRA,
-    /// Dayly data (average values)
-    pub day_avg: RRA,
-    /// Dayly data (maximum values)
-    pub day_max: RRA,
-    /// Weekly data (average values)
-    pub week_avg: RRA,
-    /// Weekly data (maximum values)
-    pub week_max: RRA,
-    /// Monthly data (average values)
-    pub month_avg: RRA,
-    /// Monthly data (maximum values)
-    pub month_max: RRA,
-    /// Yearly data (average values)
-    pub year_avg: RRA,
-    /// Yearly data (maximum values)
-    pub year_max: RRA,
-}
-
-impl RRD {
-
-    /// Create a new empty instance
-    pub fn new(dst: DST) -> Self {
-        let flags = match dst {
-            DST::Gauge => RRAFlags::DST_GAUGE,
-            DST::Derive => RRAFlags::DST_DERIVE,
-        };
-
-        Self {
-            magic: PROXMOX_RRD_MAGIC_1_0,
-            hour_avg: RRA::new(
-                flags | RRAFlags::CF_AVERAGE,
-                RRDTimeFrameResolution::Hour as u64,
-            ),
-            hour_max: RRA::new(
-                flags |  RRAFlags::CF_MAX,
-                RRDTimeFrameResolution::Hour as u64,
-            ),
-            day_avg: RRA::new(
-                flags |  RRAFlags::CF_AVERAGE,
-                RRDTimeFrameResolution::Day as u64,
-            ),
-            day_max: RRA::new(
-                flags |  RRAFlags::CF_MAX,
-                RRDTimeFrameResolution::Day as u64,
-            ),
-            week_avg: RRA::new(
-                flags |  RRAFlags::CF_AVERAGE,
-                RRDTimeFrameResolution::Week as u64,
-            ),
-            week_max: RRA::new(
-                flags |  RRAFlags::CF_MAX,
-                RRDTimeFrameResolution::Week as u64,
-            ),
-            month_avg: RRA::new(
-                flags |  RRAFlags::CF_AVERAGE,
-                RRDTimeFrameResolution::Month as u64,
-            ),
-            month_max: RRA::new(
-                flags |  RRAFlags::CF_MAX,
-                RRDTimeFrameResolution::Month as u64,
-            ),
-            year_avg: RRA::new(
-                flags |  RRAFlags::CF_AVERAGE,
-                RRDTimeFrameResolution::Year as u64,
-            ),
-            year_max: RRA::new(
-                flags |  RRAFlags::CF_MAX,
-                RRDTimeFrameResolution::Year as u64,
-            ),
-        }
-    }
-
-    /// Extract data from the archive
-    pub fn extract_data(
+    fn extract_data(
         &self,
-        time: f64,
-        timeframe: RRDTimeFrameResolution,
-        mode: RRDMode,
+        start: u64,
+        end: u64,
+        last_update: f64,
     ) -> (u64, u64, Vec<Option<f64>>) {
-
-        let epoch = time as u64;
-        let reso = timeframe as u64;
-
-        let end = reso*(epoch/reso + 1);
-        let start = end - reso*(RRD_DATA_ENTRIES as u64);
+        let last_update = last_update as u64;
+        let reso = self.resolution;
+        let num_entries = self.data.len() as u64;
 
         let mut list = Vec::new();
 
-        let raa = match (mode, timeframe) {
-            (RRDMode::Average, RRDTimeFrameResolution::Hour) => &self.hour_avg,
-            (RRDMode::Max, RRDTimeFrameResolution::Hour) => &self.hour_max,
-            (RRDMode::Average, RRDTimeFrameResolution::Day) => &self.day_avg,
-            (RRDMode::Max, RRDTimeFrameResolution::Day) => &self.day_max,
-            (RRDMode::Average, RRDTimeFrameResolution::Week) => &self.week_avg,
-            (RRDMode::Max, RRDTimeFrameResolution::Week) => &self.week_max,
-            (RRDMode::Average, RRDTimeFrameResolution::Month) => &self.month_avg,
-            (RRDMode::Max, RRDTimeFrameResolution::Month) => &self.month_max,
-            (RRDMode::Average, RRDTimeFrameResolution::Year) => &self.year_avg,
-            (RRDMode::Max, RRDTimeFrameResolution::Year) => &self.year_max,
-        };
-
-        let rrd_end = reso*((raa.last_update as u64)/reso);
-        let rrd_start = rrd_end - reso*(RRD_DATA_ENTRIES as u64);
+        let rrd_end = reso*(last_update/reso);
+        let rrd_start = rrd_end.saturating_sub(reso*num_entries);
 
         let mut t = start;
-        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
-        for _ in 0..RRD_DATA_ENTRIES {
+        let mut index = ((t/reso) % num_entries) as usize;
+        for _ in 0..num_entries {
+            if t > end { break; };
             if t < rrd_start || t > rrd_end {
                 list.push(None);
             } else {
-                let value = raa.data[index];
+                let value = self.data[index];
                 if value.is_nan() {
                     list.push(None);
                 } else {
                     list.push(Some(value));
                 }
             }
-            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+            t += reso; index = (index + 1) % (num_entries as usize);
         }
 
         (start, reso, list)
     }
+}
 
-    /// Create instance from raw data, testing data len and magic number
-    pub fn from_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
-        let expected_len = std::mem::size_of::<RRD>();
-        if raw.len() != expected_len {
-            let msg = format!("wrong data size ({} != {})", raw.len(), expected_len);
-            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
-        }
+#[derive(Serialize, Deserialize)]
+pub struct RRD {
+    pub source: DataSource,
+    pub rra_list: Vec<RRA>,
+}
 
-        let mut rrd: RRD = unsafe { std::mem::zeroed() };
-        unsafe {
-            let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
-            raw.read_exact(rrd_slice)?;
-        }
+impl RRD {
 
-        if rrd.magic != PROXMOX_RRD_MAGIC_1_0 {
-            let msg = "wrong magic number".to_string();
-            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+    pub fn new(dst: DST, rra_list: Vec<RRA>) -> RRD {
+
+        let source = DataSource::new(dst);
+
+        RRD {
+            source,
+            rra_list,
         }
 
-        Ok(rrd)
     }
 
     /// Load data from a file
     pub fn load(path: &Path) -> Result<Self, std::io::Error> {
         let raw = std::fs::read(path)?;
-        Self::from_raw(&raw)
+        if raw.len() < 8 {
+            let msg = format!("not an rrd file - file is too small ({})", raw.len());
+            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+        }
+
+        if raw[0..8] == rrd_v1::PROXMOX_RRD_MAGIC_1_0 {
+            let v1 = rrd_v1::RRDv1::from_raw(&raw)?;
+            v1.to_rrd_v2()
+                .map_err(|err| {
+                    let msg = format!("unable to convert from old V1 format - {}", err);
+                    std::io::Error::new(std::io::ErrorKind::Other, msg)
+                })
+        } else if raw[0..8] == PROXMOX_RRD_MAGIC_2_0 {
+            serde_cbor::from_slice(&raw[8..])
+                .map_err(|err| {
+                    let msg = format!("unable to decode RRD file - {}", err);
+                    std::io::Error::new(std::io::ErrorKind::Other, msg)
+                })
+         } else {
+            let msg = format!("not an rrd file - unknown magic number");
+            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+        }
     }
 
     /// Store data into a file (atomic replace file)
     pub fn save(&self, filename: &Path, options: CreateOptions) -> Result<(), Error> {
-        let rrd_slice = unsafe {
-            std::slice::from_raw_parts(self as *const _ as *const u8, std::mem::size_of::<RRD>())
-        };
-        replace_file(filename, rrd_slice, options)
+        let mut data: Vec<u8> = Vec::new();
+        data.extend(&PROXMOX_RRD_MAGIC_2_0);
+        serde_cbor::to_writer(&mut data, self)?;
+        replace_file(filename, &data, options)
     }
 
     pub fn last_update(&self) -> f64 {
-
-        let mut last_update = 0.0;
-
-        {
-            let mut check_last_update = |rra: &RRA| {
-                if rra.last_update > last_update {
-                    last_update = rra.last_update;
-                }
-            };
-
-            check_last_update(&self.hour_avg);
-            check_last_update(&self.hour_max);
-
-            check_last_update(&self.day_avg);
-            check_last_update(&self.day_max);
-
-            check_last_update(&self.week_avg);
-            check_last_update(&self.week_max);
-
-            check_last_update(&self.month_avg);
-            check_last_update(&self.month_max);
-
-            check_last_update(&self.year_avg);
-            check_last_update(&self.year_max);
-        }
-
-        last_update
+        self.source.last_update
     }
 
     /// Update the value (in memory)
@@ -371,32 +318,53 @@ impl RRD {
     /// Note: This does not call [Self::save].
     pub fn update(&mut self, time: f64, value: f64) {
 
-        let mut log_error = true;
-
-        let mut update_rra = |rra: &mut RRA| {
-            if let Err(err) = rra.update(time, value) {
-                if log_error {
-                    log::error!("rrd update failed: {}", err);
-                    // we only log the first error, because it is very
-                    // likely other calls produce the same error
-                    log_error = false;
-                }
+        let value = match self.source.compute_new_value(time, value) {
+            Ok(value) => value,
+            Err(err) => {
+                log::error!("rrd update failed: {}", err);
+                return;
             }
         };
 
-        update_rra(&mut self.hour_avg);
-        update_rra(&mut self.hour_max);
-
-        update_rra(&mut self.day_avg);
-        update_rra(&mut self.day_max);
+        let last_update = self.source.last_update;
+        self.source.last_update = time;
 
-        update_rra(&mut self.week_avg);
-        update_rra(&mut self.week_max);
+        for rra in self.rra_list.iter_mut() {
+            rra.delete_old_slots(time, last_update);
+            rra.compute_new_value(time, last_update, value);
+        }
+    }
 
-        update_rra(&mut self.month_avg);
-        update_rra(&mut self.month_max);
+    /// Extract data from the archive
+    ///
+    /// This selects the RRA with specified [CF] and (minimum)
+    /// resolution, and extract data from `start` to `end`.
+    pub fn extract_data(
+        &self,
+        start: u64,
+        end: u64,
+        cf: CF,
+        resolution: u64,
+    ) -> Result<(u64, u64, Vec<Option<f64>>), Error> {
+
+        let mut rra: Option<&RRA> = None;
+        for item in self.rra_list.iter() {
+            if item.cf != cf { continue; }
+            if item.resolution > resolution { continue; }
+
+            if let Some(current) = rra {
+                if item.resolution > current.resolution {
+                    rra = Some(item);
+                }
+            } else {
+                rra = Some(item);
+            }
+        }
 
-        update_rra(&mut self.year_avg);
-        update_rra(&mut self.year_max);
+        match rra {
+            Some(rra) => Ok(rra.extract_data(start, end, self.source.last_update)),
+            None => bail!("unable to find RRA suitable ({:?}:{})", cf, resolution),
+        }
     }
+
 }
diff --git a/proxmox-rrd/src/rrd_v1.rs b/proxmox-rrd/src/rrd_v1.rs
new file mode 100644
index 00000000..919896f0
--- /dev/null
+++ b/proxmox-rrd/src/rrd_v1.rs
@@ -0,0 +1,296 @@
+use std::io::Read;
+
+use anyhow::Error;
+use bitflags::bitflags;
+
+/// The number of data entries per RRA
+pub const RRD_DATA_ENTRIES: usize = 70;
+
+/// Proxmox RRD file magic number
+// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8];
+pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] =  [206, 46, 26, 212, 172, 158, 5, 186];
+
+use crate::rrd::{RRD, RRA, CF, DST, DataSource};
+
+bitflags!{
+    /// Flags to specify the data soure type and consolidation function
+    pub struct RRAFlags: u64 {
+        // Data Source Types
+        const DST_GAUGE  = 1;
+        const DST_DERIVE = 2;
+        const DST_COUNTER = 4;
+        const DST_MASK   = 255; // first 8 bits
+
+        // Consolidation Functions
+        const CF_AVERAGE = 1 << 8;
+        const CF_MAX     = 2 << 8;
+        const CF_MASK    = 255 << 8;
+    }
+}
+
+/// Round Robin Archive with [RRD_DATA_ENTRIES] data slots.
+///
+/// This data structure is used inside [RRD] and directly written to the
+/// RRD files.
+#[repr(C)]
+pub struct RRAv1 {
+    /// Defined the data soure type and consolidation function
+    pub flags: RRAFlags,
+    /// Resulution (seconds) from [RRDTimeFrameResolution]
+    pub resolution: u64,
+    /// Last update time (epoch)
+    pub last_update: f64,
+    /// Count values computed inside this update interval
+    pub last_count: u64,
+    /// Stores the last value, used to compute differential value for derive/counters
+    pub counter_value: f64,
+    /// Data slots
+    pub data: [f64; RRD_DATA_ENTRIES],
+}
+
+impl RRAv1 {
+
+    fn extract_data(
+        &self,
+    ) -> (u64, u64, Vec<Option<f64>>) {
+        let reso = self.resolution;
+
+        let mut list = Vec::new();
+
+        let rra_end = reso*((self.last_update as u64)/reso);
+        let rra_start = rra_end - reso*(RRD_DATA_ENTRIES as u64);
+
+        let mut t = rra_start;
+        let mut index = ((t/reso) % (RRD_DATA_ENTRIES as u64)) as usize;
+        for _ in 0..RRD_DATA_ENTRIES {
+            let value = self.data[index];
+            if value.is_nan() {
+                list.push(None);
+            } else {
+                list.push(Some(value));
+            }
+
+            t += reso; index = (index + 1) % RRD_DATA_ENTRIES;
+        }
+
+        (rra_start, reso, list)
+    }
+}
+
+/// Round Robin Database file format with fixed number of [RRA]s
+#[repr(C)]
+// Note: Avoid alignment problems by using 8byte types only
+pub struct RRDv1 {
+    /// The magic number to identify the file type
+    pub magic: [u8; 8],
+    /// Hourly data (average values)
+    pub hour_avg: RRAv1,
+    /// Hourly data (maximum values)
+    pub hour_max: RRAv1,
+    /// Dayly data (average values)
+    pub day_avg: RRAv1,
+    /// Dayly data (maximum values)
+    pub day_max: RRAv1,
+    /// Weekly data (average values)
+    pub week_avg: RRAv1,
+    /// Weekly data (maximum values)
+    pub week_max: RRAv1,
+    /// Monthly data (average values)
+    pub month_avg: RRAv1,
+    /// Monthly data (maximum values)
+    pub month_max: RRAv1,
+    /// Yearly data (average values)
+    pub year_avg: RRAv1,
+    /// Yearly data (maximum values)
+    pub year_max: RRAv1,
+}
+
+impl RRDv1 {
+
+    pub fn from_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
+
+        let expected_len = std::mem::size_of::<RRDv1>();
+
+        if raw.len() != expected_len {
+            let msg = format!("wrong data size ({} != {})", raw.len(), expected_len);
+            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+        }
+
+        let mut rrd: RRDv1 = unsafe { std::mem::zeroed() };
+        unsafe {
+            let rrd_slice = std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
+            raw.read_exact(rrd_slice)?;
+        }
+
+        if rrd.magic != PROXMOX_RRD_MAGIC_1_0 {
+            let msg = "wrong magic number".to_string();
+            return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+        }
+
+        Ok(rrd)
+    }
+
+    pub fn to_rrd_v2(&self) -> Result<RRD, Error> {
+
+        let mut rra_list = Vec::new();
+
+        // old format v1:
+        //
+        // hour      1 min,   70 points
+        // day      30 min,   70 points
+        // week      3 hours, 70 points
+        // month    12 hours, 70 points
+        // year      1 week,  70 points
+        //
+        // new default for RRD v2:
+        //
+        // day      1 min,      1440 points
+        // month   30 min,      1440 points
+        // year   365 min (6h), 1440 points
+        // decade   1 week,      570 points
+
+        // Linear extrapolation
+        fn extrapolate_data(start: u64, reso: u64, factor: u64, data: Vec<Option<f64>>) -> (u64, u64, Vec<Option<f64>>) {
+
+            let mut new = Vec::new();
+
+            for i in 0..data.len() {
+                let mut next = i + 1;
+                if next >= data.len() { next = 0 };
+                let v = data[i];
+                let v1 = data[next];
+                match (v, v1) {
+                    (Some(v), Some(v1)) => {
+                        let diff = (v1 - v)/(factor as f64);
+                        for j in 0..factor {
+                            new.push(Some(v + diff*(j as f64)));
+                        }
+                    }
+                    (Some(v), None) => {
+                        new.push(Some(v));
+                        for _ in 0..factor-1 {
+                            new.push(None);
+                        }
+                    }
+                    (None, Some(v1)) => {
+                        for _ in 0..factor-1 {
+                            new.push(None);
+                        }
+                        new.push(Some(v1));
+                    }
+                    (None, None) => {
+                        for _ in 0..factor {
+                            new.push(None);
+                        }
+                    }
+                }
+            }
+
+            (start, reso/factor, new)
+        }
+
+        // Try to convert to new, higher capacity format
+
+        // compute daily average (merge old self.day_avg and self.hour_avg
+        let mut day_avg = RRA::new(CF::Average, 60, 1440);
+
+        let (start, reso, data) = self.day_avg.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 30, data);
+        day_avg.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.hour_avg.extract_data();
+        day_avg.insert_data(start, reso, data)?;
+
+        // compute daily maximum (merge old self.day_max and self.hour_max
+        let mut day_max = RRA::new(CF::Maximum, 60, 1440);
+
+        let (start, reso, data) = self.day_max.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 30, data);
+        day_max.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.hour_max.extract_data();
+        day_max.insert_data(start, reso, data)?;
+
+        // compute montly average (merge old self.month_avg,
+        // self.week_avg and self.day_avg)
+        let mut month_avg = RRA::new(CF::Average, 30*60, 1440);
+
+        let (start, reso, data) = self.month_avg.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 24, data);
+        month_avg.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.week_avg.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 6, data);
+        month_avg.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.day_avg.extract_data();
+        month_avg.insert_data(start, reso, data)?;
+
+        // compute montly maximum (merge old self.month_max,
+        // self.week_max and self.day_max)
+        let mut month_max = RRA::new(CF::Maximum, 30*60, 1440);
+
+        let (start, reso, data) = self.month_max.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 24, data);
+        month_max.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.week_max.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 6, data);
+        month_max.insert_data(start, reso, data)?;
+
+        let (start, reso, data) = self.day_max.extract_data();
+        month_max.insert_data(start, reso, data)?;
+
+        // compute yearly average (merge old self.year_avg)
+        let mut year_avg = RRA::new(CF::Average, 6*3600, 1440);
+
+        let (start, reso, data) = self.year_avg.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 28, data);
+        year_avg.insert_data(start, reso, data)?;
+
+        // compute yearly maximum (merge old self.year_avg)
+        let mut year_max = RRA::new(CF::Maximum, 6*3600, 1440);
+
+        let (start, reso, data) = self.year_max.extract_data();
+        let (start, reso, data) = extrapolate_data(start, reso, 28, data);
+        year_max.insert_data(start, reso, data)?;
+
+        // compute decade average (merge old self.year_avg)
+        let mut decade_avg = RRA::new(CF::Average, 7*86400, 570);
+        let (start, reso, data) = self.year_avg.extract_data();
+        decade_avg.insert_data(start, reso, data)?;
+
+        // compute decade maximum (merge old self.year_max)
+        let mut decade_max = RRA::new(CF::Maximum, 7*86400, 570);
+        let (start, reso, data) = self.year_max.extract_data();
+        decade_max.insert_data(start, reso, data)?;
+
+        rra_list.push(day_avg);
+        rra_list.push(day_max);
+        rra_list.push(month_avg);
+        rra_list.push(month_max);
+        rra_list.push(year_avg);
+        rra_list.push(year_max);
+        rra_list.push(decade_avg);
+        rra_list.push(decade_max);
+
+        // use values from hour_avg for source (all RRAv1 must have the same config)
+        let dst = if self.hour_avg.flags.contains(RRAFlags::DST_COUNTER) {
+            DST::Counter
+        } else if self.hour_avg.flags.contains(RRAFlags::DST_DERIVE) {
+            DST::Derive
+        } else {
+            DST::Gauge
+        };
+
+        let source = DataSource {
+            dst,
+            counter_value: f64::NAN,
+            last_update:  self.hour_avg.last_update, // IMPORTANT!
+        };
+        Ok(RRD {
+            source,
+            rra_list,
+        })
+    }
+}
diff --git a/src/api2/node/rrd.rs b/src/api2/node/rrd.rs
index b53076d7..4fe18f46 100644
--- a/src/api2/node/rrd.rs
+++ b/src/api2/node/rrd.rs
@@ -1,5 +1,6 @@
-use anyhow::Error;
+use anyhow::{bail, Error};
 use serde_json::{Value, json};
+use std::collections::BTreeMap;
 
 use proxmox_router::{Permission, Router};
 use proxmox_schema::api;
@@ -8,8 +9,6 @@ use pbs_api_types::{
     NODE_SCHEMA, RRDMode, RRDTimeFrameResolution, PRIV_SYS_AUDIT,
 };
 
-use proxmox_rrd::rrd::RRD_DATA_ENTRIES;
-
 use crate::get_rrd_cache;
 
 pub fn create_value_from_rrd(
@@ -19,32 +18,44 @@ pub fn create_value_from_rrd(
     cf: RRDMode,
 ) -> Result<Value, Error> {
 
-    let mut result = Vec::new();
+    let mut result: Vec<Value> = Vec::new();
     let now = proxmox_time::epoch_f64();
 
     let rrd_cache = get_rrd_cache()?;
 
+    let mut timemap = BTreeMap::new();
+
+    let mut last_resolution = None;
+
     for name in list {
-        let (start, reso, list) = match rrd_cache.extract_cached_data(basedir, name, now, timeframe, cf) {
+        let (start, reso, data) = match rrd_cache.extract_cached_data(basedir, name, now, timeframe, cf)? {
             Some(result) => result,
             None => continue,
         };
 
+        if let Some(expected_resolution) = last_resolution  {
+            if reso != expected_resolution {
+                bail!("got unexpected RRD resolution ({} != {})", reso, expected_resolution);
+            }
+        } else {
+            last_resolution = Some(reso);
+        }
+
         let mut t = start;
-        for index in 0..RRD_DATA_ENTRIES {
-            if result.len() <= index {
-                if let Some(value) = list[index] {
-                    result.push(json!({ "time": t, *name: value }));
-                } else {
-                    result.push(json!({ "time": t }));
-                }
-            } else if let Some(value) = list[index] {
-                result[index][name] = value.into();
+
+        for index in 0..data.len() {
+            let entry = timemap.entry(t).or_insert(json!({ "time": t }));
+            if let Some(value) = data[index] {
+                entry[*name] = value.into();
             }
             t += reso;
         }
     }
 
+    for item in timemap.values() {
+        result.push(item.clone());
+    }
+
     Ok(result.into())
 }
 
diff --git a/src/api2/status.rs b/src/api2/status.rs
index 40a9e5b4..2476fe97 100644
--- a/src/api2/status.rs
+++ b/src/api2/status.rs
@@ -134,8 +134,8 @@ pub fn datastore_status(
             RRDMode::Average,
         );
 
-        let total_res = get_rrd("total");
-        let used_res = get_rrd("used");
+        let total_res = get_rrd("total")?;
+        let used_res = get_rrd("used")?;
 
         if let (Some((start, reso, total_list)), Some((_, _, used_list))) = (total_res, used_res) {
             let mut usage_list: Vec<f64> = Vec::new();
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 0f0f6f59..2abc3eaa 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -24,7 +24,7 @@ use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
 
 use pbs_tools::{task_log, task_warn};
 use pbs_datastore::DataStore;
-use proxmox_rrd::DST;
+use proxmox_rrd::rrd::DST;
 
 use proxmox_rest_server::{
     rotate_task_log_archive, extract_cookie , AuthError, ApiConfig, RestServer, RestEnvironment,
-- 
2.30.2






More information about the pbs-devel mailing list