[pbs-devel] [PATCH v4 proxmox-backup 57/58] datastore: chunker: implement chunker for payload stream

Christian Ebner c.ebner at proxmox.com
Mon Apr 29 14:11:01 CEST 2024


Implement the Chunker trait for a dedicated payload stream chunker,
which extends the regular chunker by the option to suggest boundaries
to be used over the hash based boundaries whenever possible.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 pbs-datastore/src/chunker.rs | 74 ++++++++++++++++++++++++++++++++++++
 pbs-datastore/src/lib.rs     |  2 +-
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/pbs-datastore/src/chunker.rs b/pbs-datastore/src/chunker.rs
index 119b88a03..6c97252e0 100644
--- a/pbs-datastore/src/chunker.rs
+++ b/pbs-datastore/src/chunker.rs
@@ -1,3 +1,5 @@
+use std::sync::mpsc::Receiver;
+
 /// Note: window size 32 or 64, is faster because we can
 /// speedup modulo operations, but always computes hash 0
 /// for constant data streams .. 0,0,0,0,0,0
@@ -45,6 +47,16 @@ pub struct ChunkerImpl {
     window: [u8; CA_CHUNKER_WINDOW_SIZE],
 }
 
+/// Sliding window chunker (Buzhash) with boundary suggestions
+///
+/// Wraps a [`ChunkerImpl`] and prefers chunking at boundaries received via `suggested_boundaries`
+/// over hash based ones where possible, for better alignment with file payload boundaries.
+pub struct PayloadChunker {
+    chunker: ChunkerImpl, // hash based chunker, used whenever no suggestion applies
+    current_suggested: Option<u64>, // suggestion currently being evaluated, if any
+    suggested_boundaries: Receiver<u64>, // channel delivering further suggestions
+}
+
 const BUZHASH_TABLE: [u32; 256] = [
     0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801,
     0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c, 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494,
@@ -214,6 +226,68 @@ impl Chunker for ChunkerImpl {
     }
 }
 
+impl PayloadChunker {
+    /// Create a new PayloadChunker instance, which produces an average
+    /// chunk size of `chunk_size_avg` (needs to be a power of two), if no
+    /// suggested boundaries are provided.
+    /// Suggested boundaries are used instead, whenever the resulting chunk
+    /// size is within the min - max range of the wrapped chunker.
+    pub fn new(chunk_size_avg: usize, suggested_boundaries: Receiver<u64>) -> Self {
+        Self {
+            chunker: ChunkerImpl::new(chunk_size_avg),
+            current_suggested: None,
+            suggested_boundaries,
+        }
+    }
+}
+
+impl Chunker for PayloadChunker {
+    fn scan(&mut self, data: &[u8], ctx: &Context) -> usize {
+        let pos = ctx.total - data.len() as u64;
+
+        loop {
+            if let Some(boundary) = self.current_suggested {
+                if boundary < ctx.base + pos {
+                    // boundary lies before the start of data: ignore passed boundaries
+                    self.current_suggested = None;
+                    continue;
+                }
+
+                if boundary >= ctx.base + ctx.total {
+                    // boundary in future, keep it and use the hash based scan for now
+                    return self.chunker.scan(data, ctx);
+                }
+
+                let chunk_size = (boundary - ctx.base) as usize;
+                if chunk_size < self.chunker.chunk_size_min {
+                    // chunk too small, ignore boundary
+                    self.current_suggested = None;
+                    continue;
+                }
+
+                if chunk_size <= self.chunker.chunk_size_max {
+                    log::debug!("Chunk at suggested boundary: {boundary}, {chunk_size}");
+                    self.current_suggested = None;
+                    // although we ignore the output, consume the data with the chunker
+                    let _ignore = self.chunker.scan(data, ctx);
+                    // calculate boundary relative to start of given data buffer
+                    return chunk_size - pos as usize;
+                }
+
+                // chunk too big, the suggestion is kept for now
+                // scan for hash based chunk boundary instead
+                return self.chunker.scan(data, ctx);
+            }
+
+            if let Ok(boundary) = self.suggested_boundaries.try_recv() { // non-blocking
+                self.current_suggested = Some(boundary);
+            } else {
+                return self.chunker.scan(data, ctx); // nothing received: hash based scan
+            }
+        }
+    }
+}
+
 #[test]
 fn test_chunker1() {
     let mut buffer = Vec::new();
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index 24429626c..3e4aa34c2 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -196,7 +196,7 @@ pub use backup_info::{BackupDir, BackupGroup, BackupInfo};
 pub use checksum_reader::ChecksumReader;
 pub use checksum_writer::ChecksumWriter;
 pub use chunk_store::ChunkStore;
-pub use chunker::{Chunker, ChunkerImpl};
+pub use chunker::{Chunker, ChunkerImpl, PayloadChunker};
 pub use crypt_reader::CryptReader;
 pub use crypt_writer::CryptWriter;
 pub use data_blob::DataBlob;
-- 
2.39.2





More information about the pbs-devel mailing list