[pbs-devel] [PATCH proxmox-backup] proxmox-rrd: define and implement new ZSTD compressed format

Dietmar Maurer dietmar at proxmox.com
Fri Oct 8 12:33:57 CEST 2021


This further reduces the amount of data written.
---
 proxmox-rrd/Cargo.toml   |  1 +
 proxmox-rrd/src/cache.rs | 38 ++++++++++++++++---------------
 proxmox-rrd/src/rrd.rs   | 49 ++++++++++++++++++++++++++++++++++++----
 3 files changed, 65 insertions(+), 23 deletions(-)

diff --git a/proxmox-rrd/Cargo.toml b/proxmox-rrd/Cargo.toml
index 1c75b2e1..b095bd6f 100644
--- a/proxmox-rrd/Cargo.toml
+++ b/proxmox-rrd/Cargo.toml
@@ -10,6 +10,7 @@ anyhow = "1.0"
 bitflags = "1.2.1"
 log = "0.4"
 nix = "0.19.1"
+zstd = { version = "0.6", features = [ "bindgen" ] }
 
 proxmox = { version = "0.13.5", features = ["api-macro"] }
 
diff --git a/proxmox-rrd/src/cache.rs b/proxmox-rrd/src/cache.rs
index 57cbb394..bf075161 100644
--- a/proxmox-rrd/src/cache.rs
+++ b/proxmox-rrd/src/cache.rs
@@ -125,6 +125,21 @@ impl RRDCache {
         self.apply_journal_locked(&mut state)
     }
 
+    fn load_rrd(path: &Path, dst: DST) -> RRD {
+        match RRD::load(path) {
+            Ok(mut rrd) => {
+                rrd.set_compression(true);
+                rrd
+            }
+            Err(err) => {
+                if err.kind() != std::io::ErrorKind::NotFound {
+                    log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
+                }
+                RRD::new(dst, true)
+            },
+        }
+    }
+
     fn apply_journal_locked(&self, state: &mut RRDCacheState) -> Result<(), Error> {
 
         log::info!("applying rrd journal");
@@ -173,15 +188,8 @@ impl RRDCache {
                 path.push(&entry.rel_path);
                 create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
 
-                let mut rrd = match RRD::load(&path) {
-                    Ok(rrd) => rrd,
-                    Err(err) => {
-                        if err.kind() != std::io::ErrorKind::NotFound {
-                            log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
-                        }
-                        RRD::new(entry.dst)
-                    },
-                };
+                let mut rrd = Self::load_rrd(&path, entry.dst);
+
                 if entry.time > get_last_update(&entry.rel_path, &rrd) {
                     rrd.update(entry.time, entry.value);
                 }
@@ -240,16 +248,10 @@ impl RRDCache {
             let mut path = self.basedir.clone();
             path.push(rel_path);
             create_path(path.parent().unwrap(), Some(self.dir_options.clone()), Some(self.dir_options.clone()))?;
-            let mut rrd = match RRD::load(&path) {
-                Ok(rrd) => rrd,
-                Err(err) => {
-                    if err.kind() != std::io::ErrorKind::NotFound {
-                        log::warn!("overwriting RRD file {:?}, because of load error: {}", path, err);
-                    }
-                    RRD::new(dst)
-                },
-            };
+
+            let mut rrd = Self::load_rrd(&path, dst);
             rrd.update(now, value);
+
             state.rrd_map.insert(rel_path.into(), rrd);
         }
 
diff --git a/proxmox-rrd/src/rrd.rs b/proxmox-rrd/src/rrd.rs
index 026498ed..f2558646 100644
--- a/proxmox-rrd/src/rrd.rs
+++ b/proxmox-rrd/src/rrd.rs
@@ -17,6 +17,10 @@ pub const RRD_DATA_ENTRIES: usize = 70;
 // openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8];
 pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] =  [206, 46, 26, 212, 172, 158, 5, 186];
 
+/// Proxmox RRD file magic number for zstd compressed RRD files
+// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0 (zstd compressed)")[0..8];
+pub const PROXMOX_ZSTD_RRD_MAGIC_1_0: [u8; 8] =  [84, 61, 44, 56, 140, 132, 5, 11];
+
 use crate::DST;
 
 bitflags!{
@@ -198,14 +202,14 @@ pub struct RRD {
 impl RRD {
 
     /// Create a new empty instance
-    pub fn new(dst: DST) -> Self {
+    pub fn new(dst: DST, compress: bool) -> Self {
         let flags = match dst {
             DST::Gauge => RRAFlags::DST_GAUGE,
             DST::Derive => RRAFlags::DST_DERIVE,
         };
 
         Self {
-            magic: PROXMOX_RRD_MAGIC_1_0,
+            magic: if compress { PROXMOX_ZSTD_RRD_MAGIC_1_0 } else { PROXMOX_RRD_MAGIC_1_0 },
             hour_avg: RRA::new(
                 flags | RRAFlags::CF_AVERAGE,
                 RRDTimeFrameResolution::Hour as u64,
@@ -301,8 +305,28 @@ impl RRD {
     }
 
     /// Create instance from raw data, testing data len and magic number
-    pub fn from_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
+    pub fn from_raw(raw: &[u8]) -> Result<Self, std::io::Error> {
+
+        if raw.len() < 8 {
+            let msg = format!("not an rrd file - file is too small ({})", raw.len());
+             return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+        }
+
+        if raw[0..8] == PROXMOX_ZSTD_RRD_MAGIC_1_0 {
+            let mut data = Vec::new();
+            data.extend(&PROXMOX_ZSTD_RRD_MAGIC_1_0);
+            data.extend(zstd::block::decompress(&raw[8..], std::mem::size_of::<RRD>())?);
+            Self::from_uncompressed_raw(&data)
+        } else {
+            Self::from_uncompressed_raw(raw)
+        }
+
+    }
+
+    fn from_uncompressed_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
+
         let expected_len = std::mem::size_of::<RRD>();
+
         if raw.len() != expected_len {
             let msg = format!("wrong data size ({} != {})", raw.len(), expected_len);
             return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
@@ -314,7 +338,7 @@ impl RRD {
             raw.read_exact(rrd_slice)?;
         }
 
-        if rrd.magic != PROXMOX_RRD_MAGIC_1_0 {
+        if !(rrd.magic == PROXMOX_RRD_MAGIC_1_0 || rrd.magic == PROXMOX_ZSTD_RRD_MAGIC_1_0) {
             let msg = "wrong magic number".to_string();
             return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
         }
@@ -322,6 +346,13 @@ impl RRD {
         Ok(rrd)
     }
 
+    /// Set compression flag
+    ///
+    /// This just changes the magic number. Actual compression is done in [Self::save].
+    pub fn set_compression(&mut self, compress: bool) {
+        self.magic = if compress { PROXMOX_ZSTD_RRD_MAGIC_1_0 } else { PROXMOX_RRD_MAGIC_1_0 };
+    }
+
     /// Load data from a file
     pub fn load(path: &Path) -> Result<Self, std::io::Error> {
         let raw = std::fs::read(path)?;
@@ -333,7 +364,15 @@ impl RRD {
         let rrd_slice = unsafe {
             std::slice::from_raw_parts(self as *const _ as *const u8, std::mem::size_of::<RRD>())
         };
-        replace_file(filename, rrd_slice, options)
+
+        if rrd_slice[0..8] == PROXMOX_ZSTD_RRD_MAGIC_1_0  {
+            let mut data = Vec::new();
+            data.extend(&PROXMOX_ZSTD_RRD_MAGIC_1_0);
+            data.extend(zstd::block::compress(&rrd_slice[8..], 1)?);
+            replace_file(filename, &data, options)
+        } else {
+            replace_file(filename, rrd_slice, options)
+        }
     }
 
     pub fn last_update(&self) -> f64 {
-- 
2.30.2





More information about the pbs-devel mailing list