[pbs-devel] [PATCH v9 proxmox-backup 44/58] datastore: chunker: add Chunker trait

Christian Ebner c.ebner at proxmox.com
Wed Jun 5 12:54:02 CEST 2024


Add the Chunker trait and rename the current Chunker struct to
ChunkerImpl, which now implements the trait. This allows using
different chunker implementations via dynamic dispatch and is in
preparation for implementing a dedicated payload chunker.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 8:
- no changes

 examples/test_chunk_size.rs        |  9 +--
 examples/test_chunk_speed.rs       |  7 ++-
 pbs-client/src/chunk_stream.rs     | 37 ++++++------
 pbs-datastore/src/chunker.rs       | 95 ++++++++++++++++++------------
 pbs-datastore/src/dynamic_index.rs |  9 +--
 pbs-datastore/src/lib.rs           |  2 +-
 6 files changed, 91 insertions(+), 68 deletions(-)

diff --git a/examples/test_chunk_size.rs b/examples/test_chunk_size.rs
index a01a5e640..2ebc22f64 100644
--- a/examples/test_chunk_size.rs
+++ b/examples/test_chunk_size.rs
@@ -5,10 +5,10 @@ extern crate proxmox_backup;
 use anyhow::Error;
 use std::io::{Read, Write};
 
-use pbs_datastore::Chunker;
+use pbs_datastore::{Chunker, ChunkerImpl};
 
 struct ChunkWriter {
-    chunker: Chunker,
+    chunker: ChunkerImpl,
     last_chunk: usize,
     chunk_offset: usize,
 
@@ -23,7 +23,7 @@ struct ChunkWriter {
 impl ChunkWriter {
     fn new(chunk_size: usize) -> Self {
         ChunkWriter {
-            chunker: Chunker::new(chunk_size),
+            chunker: ChunkerImpl::new(chunk_size),
             last_chunk: 0,
             chunk_offset: 0,
             chunk_count: 0,
@@ -69,7 +69,8 @@ impl Write for ChunkWriter {
     fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
         let chunker = &mut self.chunker;
 
-        let pos = chunker.scan(data);
+        let ctx = pbs_datastore::chunker::Context::default();
+        let pos = chunker.scan(data, &ctx);
 
         if pos > 0 {
             self.chunk_offset += pos;
diff --git a/examples/test_chunk_speed.rs b/examples/test_chunk_speed.rs
index 37e13e0de..2d79604ab 100644
--- a/examples/test_chunk_speed.rs
+++ b/examples/test_chunk_speed.rs
@@ -1,6 +1,6 @@
 extern crate proxmox_backup;
 
-use pbs_datastore::Chunker;
+use pbs_datastore::{Chunker, ChunkerImpl};
 
 fn main() {
     let mut buffer = Vec::new();
@@ -12,7 +12,7 @@ fn main() {
             buffer.push(byte);
         }
     }
-    let mut chunker = Chunker::new(64 * 1024);
+    let mut chunker = ChunkerImpl::new(64 * 1024);
 
     let count = 5;
 
@@ -23,8 +23,9 @@ fn main() {
     for _i in 0..count {
         let mut pos = 0;
         let mut _last = 0;
+        let ctx = pbs_datastore::chunker::Context::default();
         while pos < buffer.len() {
-            let k = chunker.scan(&buffer[pos..]);
+            let k = chunker.scan(&buffer[pos..], &ctx);
             if k == 0 {
                 //println!("LAST {}", pos);
                 break;
diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
index 87a018d50..84158a2c9 100644
--- a/pbs-client/src/chunk_stream.rs
+++ b/pbs-client/src/chunk_stream.rs
@@ -7,7 +7,7 @@ use bytes::BytesMut;
 use futures::ready;
 use futures::stream::{Stream, TryStream};
 
-use pbs_datastore::Chunker;
+use pbs_datastore::{Chunker, ChunkerImpl};
 
 use crate::inject_reused_chunks::InjectChunks;
 
@@ -16,7 +16,6 @@ pub struct InjectionData {
     boundaries: mpsc::Receiver<InjectChunks>,
     next_boundary: Option<InjectChunks>,
     injections: mpsc::Sender<InjectChunks>,
-    consumed: u64,
 }
 
 impl InjectionData {
@@ -28,7 +27,6 @@ impl InjectionData {
             boundaries,
             next_boundary: None,
             injections,
-            consumed: 0,
         }
     }
 }
@@ -36,19 +34,22 @@ impl InjectionData {
 /// Split input stream into dynamic sized chunks
 pub struct ChunkStream<S: Unpin> {
     input: S,
-    chunker: Chunker,
+    chunker: Box<dyn Chunker + Send>,
     buffer: BytesMut,
     scan_pos: usize,
+    consumed: u64,
     injection_data: Option<InjectionData>,
 }
 
 impl<S: Unpin> ChunkStream<S> {
     pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
+        let chunk_size = chunk_size.unwrap_or(4 * 1024 * 1024);
         Self {
             input,
-            chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
+            chunker: Box::new(ChunkerImpl::new(chunk_size)),
             buffer: BytesMut::new(),
             scan_pos: 0,
+            consumed: 0,
             injection_data,
         }
     }
@@ -68,11 +69,15 @@ where
         let this = self.get_mut();
 
         loop {
+            let ctx = pbs_datastore::chunker::Context {
+                base: this.consumed,
+                total: this.buffer.len() as u64,
+            };
+
             if let Some(InjectionData {
                 boundaries,
                 next_boundary,
                 injections,
-                consumed,
             }) = this.injection_data.as_mut()
             {
                 if next_boundary.is_none() {
@@ -84,29 +89,29 @@ where
                 if let Some(inject) = next_boundary.take() {
                     // require forced boundary, lookup next regular boundary
                     let pos = if this.scan_pos < this.buffer.len() {
-                        this.chunker.scan(&this.buffer[this.scan_pos..])
+                        this.chunker.scan(&this.buffer[this.scan_pos..], &ctx)
                     } else {
                         0
                     };
 
                     let chunk_boundary = if pos == 0 {
-                        *consumed + this.buffer.len() as u64
+                        this.consumed + this.buffer.len() as u64
                     } else {
-                        *consumed + (this.scan_pos + pos) as u64
+                        this.consumed + (this.scan_pos + pos) as u64
                     };
 
                     if inject.boundary <= chunk_boundary {
                         // forced boundary is before next boundary, force within current buffer
-                        let chunk_size = (inject.boundary - *consumed) as usize;
+                        let chunk_size = (inject.boundary - this.consumed) as usize;
                         let raw_chunk = this.buffer.split_to(chunk_size);
                         this.chunker.reset();
                         this.scan_pos = 0;
 
-                        *consumed += chunk_size as u64;
+                        this.consumed += chunk_size as u64;
 
                         // add the size of the injected chunks to consumed, so chunk stream offsets
                         // are in sync with the rest of the archive.
-                        *consumed += inject.size as u64;
+                        this.consumed += inject.size as u64;
 
                         injections.send(inject).unwrap();
 
@@ -118,7 +123,7 @@ where
                         // forced boundary is after next boundary, split off chunk from buffer
                         let chunk_size = this.scan_pos + pos;
                         let raw_chunk = this.buffer.split_to(chunk_size);
-                        *consumed += chunk_size as u64;
+                        this.consumed += chunk_size as u64;
                         this.scan_pos = 0;
 
                         return Poll::Ready(Some(Ok(raw_chunk)));
@@ -131,7 +136,7 @@ where
             }
 
             if this.scan_pos < this.buffer.len() {
-                let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
+                let boundary = this.chunker.scan(&this.buffer[this.scan_pos..], &ctx);
 
                 let chunk_size = this.scan_pos + boundary;
 
@@ -140,9 +145,7 @@ where
                 } else if chunk_size <= this.buffer.len() {
                     // found new chunk boundary inside buffer, split off chunk from buffer
                     let raw_chunk = this.buffer.split_to(chunk_size);
-                    if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
-                        *consumed += chunk_size as u64;
-                    }
+                    this.consumed += chunk_size as u64;
                     this.scan_pos = 0;
                     return Poll::Ready(Some(Ok(raw_chunk)));
                 } else {
diff --git a/pbs-datastore/src/chunker.rs b/pbs-datastore/src/chunker.rs
index 253d2cf4c..d75e63fa8 100644
--- a/pbs-datastore/src/chunker.rs
+++ b/pbs-datastore/src/chunker.rs
@@ -5,6 +5,20 @@
 /// use hash value 0 to detect a boundary.
 const CA_CHUNKER_WINDOW_SIZE: usize = 64;
 
+/// Additional context for chunker to find possible boundaries in payload streams
+#[derive(Default)]
+pub struct Context {
+    /// Already consumed bytes of the chunk stream consumer
+    pub base: u64,
+    /// Total size currently buffered
+    pub total: u64,
+}
+
+pub trait Chunker {
+    fn scan(&mut self, data: &[u8], ctx: &Context) -> usize;
+    fn reset(&mut self);
+}
+
 /// Sliding window chunker (Buzhash)
 ///
 /// This is a rewrite of *casync* chunker (cachunker.h) in rust.
@@ -15,7 +29,7 @@ const CA_CHUNKER_WINDOW_SIZE: usize = 64;
 /// Hash](https://en.wikipedia.org/wiki/Rolling_hash) article from
 /// Wikipedia.
 
-pub struct Chunker {
+pub struct ChunkerImpl {
     h: u32,
     window_size: usize,
     chunk_size: usize,
@@ -67,7 +81,7 @@ const BUZHASH_TABLE: [u32; 256] = [
     0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11,
 ];
 
-impl Chunker {
+impl ChunkerImpl {
     /// Create a new Chunker instance, which produces and average
     /// chunk size of `chunk_size_avg` (need to be a power of two). We
     /// allow variation from `chunk_size_avg/4` up to a maximum of
@@ -105,11 +119,44 @@ impl Chunker {
         }
     }
 
+    // fast implementation avoiding modulo
+    // #[inline(always)]
+    fn shall_break(&self) -> bool {
+        if self.chunk_size >= self.chunk_size_max {
+            return true;
+        }
+
+        if self.chunk_size < self.chunk_size_min {
+            return false;
+        }
+
+        //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!!
+
+        //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams
+
+        (self.h & self.break_test_mask) >= self.break_test_minimum
+    }
+
+    // This is the original implementation from casync
+    /*
+    #[inline(always)]
+    fn shall_break_orig(&self) -> bool {
+
+        if self.chunk_size >= self.chunk_size_max { return true; }
+
+        if self.chunk_size < self.chunk_size_min { return false; }
+
+        (self.h % self.discriminator) == (self.discriminator - 1)
+    }
+     */
+}
+
+impl Chunker for ChunkerImpl {
     /// Scans the specified data for a chunk border. Returns 0 if none
     /// was found (and the function should be called with more data
     /// later on), or another value indicating the position of a
     /// border.
-    pub fn scan(&mut self, data: &[u8]) -> usize {
+    fn scan(&mut self, data: &[u8], _ctx: &Context) -> usize {
         let window_len = self.window.len();
         let data_len = data.len();
 
@@ -167,42 +214,11 @@ impl Chunker {
         0
     }
 
-    pub fn reset(&mut self) {
+    fn reset(&mut self) {
         self.h = 0;
         self.chunk_size = 0;
         self.window_size = 0;
     }
-
-    // fast implementation avoiding modulo
-    // #[inline(always)]
-    fn shall_break(&self) -> bool {
-        if self.chunk_size >= self.chunk_size_max {
-            return true;
-        }
-
-        if self.chunk_size < self.chunk_size_min {
-            return false;
-        }
-
-        //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!!
-
-        //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams
-
-        (self.h & self.break_test_mask) >= self.break_test_minimum
-    }
-
-    // This is the original implementation from casync
-    /*
-    #[inline(always)]
-    fn shall_break_orig(&self) -> bool {
-
-        if self.chunk_size >= self.chunk_size_max { return true; }
-
-        if self.chunk_size < self.chunk_size_min { return false; }
-
-        (self.h % self.discriminator) == (self.discriminator - 1)
-    }
-     */
 }
 
 #[test]
@@ -215,17 +231,18 @@ fn test_chunker1() {
             buffer.push(byte);
         }
     }
-    let mut chunker = Chunker::new(64 * 1024);
+    let mut chunker = ChunkerImpl::new(64 * 1024);
 
     let mut pos = 0;
     let mut last = 0;
 
     let mut chunks1: Vec<(usize, usize)> = vec![];
     let mut chunks2: Vec<(usize, usize)> = vec![];
+    let ctx = Context::default();
 
     // test1: feed single bytes
     while pos < buffer.len() {
-        let k = chunker.scan(&buffer[pos..pos + 1]);
+        let k = chunker.scan(&buffer[pos..pos + 1], &ctx);
         pos += 1;
         if k != 0 {
             let prev = last;
@@ -235,13 +252,13 @@ fn test_chunker1() {
     }
     chunks1.push((last, buffer.len() - last));
 
-    let mut chunker = Chunker::new(64 * 1024);
+    let mut chunker = ChunkerImpl::new(64 * 1024);
 
     let mut pos = 0;
 
     // test2: feed with whole buffer
     while pos < buffer.len() {
-        let k = chunker.scan(&buffer[pos..]);
+        let k = chunker.scan(&buffer[pos..], &ctx);
         if k != 0 {
             chunks2.push((pos, k));
             pos += k;
diff --git a/pbs-datastore/src/dynamic_index.rs b/pbs-datastore/src/dynamic_index.rs
index b8047b5b1..dc9eee050 100644
--- a/pbs-datastore/src/dynamic_index.rs
+++ b/pbs-datastore/src/dynamic_index.rs
@@ -23,7 +23,7 @@ use crate::data_blob::{DataBlob, DataChunkBuilder};
 use crate::file_formats;
 use crate::index::{ChunkReadInfo, IndexFile};
 use crate::read_chunk::ReadChunk;
-use crate::Chunker;
+use crate::{Chunker, ChunkerImpl};
 
 /// Header format definition for dynamic index files (`.dixd`)
 #[repr(C)]
@@ -397,7 +397,7 @@ impl DynamicIndexWriter {
 pub struct DynamicChunkWriter {
     index: DynamicIndexWriter,
     closed: bool,
-    chunker: Chunker,
+    chunker: ChunkerImpl,
     stat: ChunkStat,
     chunk_offset: usize,
     last_chunk: usize,
@@ -409,7 +409,7 @@ impl DynamicChunkWriter {
         Self {
             index,
             closed: false,
-            chunker: Chunker::new(chunk_size),
+            chunker: ChunkerImpl::new(chunk_size),
             stat: ChunkStat::new(0),
             chunk_offset: 0,
             last_chunk: 0,
@@ -494,7 +494,8 @@ impl Write for DynamicChunkWriter {
     fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
         let chunker = &mut self.chunker;
 
-        let pos = chunker.scan(data);
+        let ctx = crate::chunker::Context::default();
+        let pos = chunker.scan(data, &ctx);
 
         if pos > 0 {
             self.chunk_buffer.extend_from_slice(&data[0..pos]);
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index 43050162f..24429626c 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -196,7 +196,7 @@ pub use backup_info::{BackupDir, BackupGroup, BackupInfo};
 pub use checksum_reader::ChecksumReader;
 pub use checksum_writer::ChecksumWriter;
 pub use chunk_store::ChunkStore;
-pub use chunker::Chunker;
+pub use chunker::{Chunker, ChunkerImpl};
 pub use crypt_reader::CryptReader;
 pub use crypt_writer::CryptWriter;
 pub use data_blob::DataBlob;
-- 
2.39.2





More information about the pbs-devel mailing list