[pbs-devel] [RFC v2 proxmox-backup 28/36] client: pxar: implement store to insert chunks on caching

Christian Ebner c.ebner at proxmox.com
Tue Mar 5 10:26:55 CET 2024


In preparation for the look-ahead caching used to temporarily store
entries before encoding them in the pxar archive, which allows deciding
whether to re-use or re-encode regular file entries.

Allows inserting and storing reused chunks in the archiver,
deduplicating chunks on insertion when possible.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- s/Appendable/Reusable/ incorrect naming leftover from previous
  approach

 pbs-client/src/pxar/create.rs | 109 +++++++++++++++++++++++++++++++++-
 1 file changed, 107 insertions(+), 2 deletions(-)

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index cb0af29e..66bdbce8 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -18,7 +18,7 @@ use nix::sys::stat::{FileStat, Mode};
 use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
 use proxmox_sys::error::SysError;
 use pxar::accessor::aio::Accessor;
-use pxar::encoder::{LinkOffset, SeqWrite};
+use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
 use pxar::Metadata;
 
 use proxmox_io::vec;
@@ -27,13 +27,116 @@ use proxmox_sys::fs::{self, acl, xattr};
 
 use crate::RemoteChunkReader;
 use pbs_datastore::catalog::BackupCatalogWriter;
-use pbs_datastore::dynamic_index::{DynamicIndexReader, LocalDynamicReadAt};
+use pbs_datastore::dynamic_index::{
+    DynamicIndexReader, LocalDynamicReadAt, ReusableDynamicEntry,
+};
 
 use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
 
+#[derive(Default)]
+struct ReusedChunks {
+    start_boundary: PayloadOffset, // payload offset recorded on the first insert (see `insert`)
+    total: PayloadOffset, // accumulated size of all chunks stored in `chunks`
+    chunks: Vec<ReusableDynamicEntry>, // reused chunks kept in payload order
+    must_flush_first: bool, // NOTE(review): never set in this patch — presumably consumed by a later patch in the series; confirm
+}
+
+impl ReusedChunks {
+    fn new() -> Self { // explicit constructor; equivalent to `Self::default()` since the struct derives `Default`
+        Self {
+            start_boundary: PayloadOffset::default(),
+            total: PayloadOffset::default(),
+            chunks: Vec::new(),
+            must_flush_first: false,
+        }
+    }
+
+    fn start_boundary(&self) -> PayloadOffset { // getter: boundary recorded by the first `insert` call
+        self.start_boundary
+    }
+
+    fn is_empty(&self) -> bool { // true while no chunks have been inserted yet
+        self.chunks.is_empty()
+    }
+
+    fn insert(
+        &mut self,
+        indices: Vec<ReusableDynamicEntry>,
+        boundary: PayloadOffset,
+        start_padding: u64,
+    ) -> PayloadOffset { // store a chunk sequence, deduplicating against already-stored chunks; returns the payload offset where the sequence starts
+        if self.is_empty() {
+            self.start_boundary = boundary; // remember the boundary of the very first insert; later offsets are relative to it
+        }
+
+        if let Some(offset) = self.digest_sequence_contained(&indices) {
+            self.start_boundary.add(offset + start_padding) // whole sequence already stored: reuse it, store nothing new
+        } else if let Some(offset) = self.last_digest_matched(&indices) {
+            for chunk in indices.into_iter().skip(1) { // first chunk equals the stored tail chunk: append only the remainder
+                self.total = self.total.add(chunk.size());
+                self.chunks.push(chunk);
+            }
+            self.start_boundary.add(offset + start_padding)
+        } else {
+            let offset = self.total.raw(); // no overlap: sequence starts at the current end, append everything
+            for chunk in indices.into_iter() {
+                self.total = self.total.add(chunk.size());
+                self.chunks.push(chunk);
+            }
+            self.start_boundary.add(offset + start_padding)
+        }
+    }
+
+    fn digest_sequence_contained(&self, indices: &[ReusableDynamicEntry]) -> Option<u64> { // offset of `indices` within the stored chunks, if fully contained
+        let digest = if let Some(first) = indices.first() {
+            first.digest()
+        } else {
+            return None; // empty sequence: nothing to look for
+        };
+
+        let mut offset = 0;
+        let mut iter = self.chunks.iter();
+        while let Some(position) = iter.position(|e| {
+            offset += e.size(); // accumulate sizes up to and including the candidate chunk
+            e.digest() == digest
+        }) {
+            if indices.len() + position > self.chunks.len() {
+                return None; // sequence would run past the stored chunks
+            }
+
+            for (ind, chunk) in indices.iter().enumerate().skip(1) { // fix off-by-one: indices[ind] must match chunks[position + ind], not chunks[position + ind - 1]
+                if chunk.digest() != self.chunks[ind + position].digest() {
+                    return None;
+                }
+            }
+
+            offset -= self.chunks[position].size(); // back up to the start of the matched chunk
+            return Some(offset);
+        }
+
+        None
+    }
+
+    fn last_digest_matched(&self, indices: &[ReusableDynamicEntry]) -> Option<u64> { // start offset of the stored tail chunk, if it equals the sequence's first chunk
+        let digest = if let Some(first) = indices.first() {
+            first.digest()
+        } else {
+            return None; // empty sequence: nothing to match
+        };
+
+        if let Some(last) = self.chunks.last() {
+            if last.digest() == digest {
+                return Some(self.total.raw() - last.size()); // offset where the last stored chunk begins
+            }
+        }
+
+        None
+    }
+}
+
 /// Pxar options for creating a pxar archive/stream
 #[derive(Default, Clone)]
 pub struct PxarCreateOptions {
@@ -145,6 +248,7 @@ struct Archiver {
     hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
     file_copy_buffer: Vec<u8>,
     skip_e2big_xattr: bool,
+    reused_chunks: ReusedChunks,
     forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
 }
 
@@ -217,6 +321,7 @@ where
         hardlinks: HashMap::new(),
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
         skip_e2big_xattr: options.skip_e2big_xattr,
+        reused_chunks: ReusedChunks::new(),
         forced_boundaries,
     };
 
-- 
2.39.2





More information about the pbs-devel mailing list