[pbs-devel] [PATCH v3 proxmox-backup 47/58] client: pxar: add look-ahead caching

Christian Ebner c.ebner at proxmox.com
Thu Mar 28 13:36:56 CET 2024


Implements the methods to cache entries in a look-ahead cache and
flush them to the archive, either by re-using and injecting the
payload chunks from the previous backup snapshot and storing the
reference to them, or by re-encoding the chunks.

When walking the file system tree, check for each entry if it is
re-usable, meaning that the metadata did not change and the payload
chunks can be re-indexed instead of re-encoding the whole data.
Since the amount of payload data might be small compared to the
actual chunk size, the decision whether to re-use or re-encode is
postponed as long as the accumulated re-usable payload stays below a
threshold value and the chunks are contiguous.
In this case, put the entry's file handle and metadata on the cache,
enable caching mode and continue with the next entry.
Re-usable chunk digests and sizes as well as reference offsets to the
start of regular file payloads within the payload stream are kept in
memory, to be injected for re-usable file entries.
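
To illustrate the bookkeeping, here is a minimal, self-contained
sketch of mapping a payload byte range onto the chunks of the previous
snapshot's payload index. The types and the lookup_range() helper are
hypothetical stand-ins for the lookup_dynamic_entries() call used in
the patch, not its actual API:

    use std::ops::Range;

    #[derive(Clone)]
    struct ChunkEntry {
        end: u64,  // cumulative end offset of this chunk in the payload stream
        size: u64, // chunk size in bytes
    }

    // Return the chunks covering `range`, plus the unused padding before the
    // range in the first chunk and after the range in the last chunk.
    fn lookup_range(chunks: &[ChunkEntry], range: Range<u64>) -> (Vec<ChunkEntry>, u64, u64) {
        let mut covering = Vec::new();
        let mut start_padding = 0;
        let mut end_padding = 0;
        for chunk in chunks {
            let chunk_start = chunk.end - chunk.size;
            if chunk.end <= range.start {
                continue; // chunk lies entirely before the requested range
            }
            if chunk_start >= range.end {
                break; // chunk lies entirely after the requested range
            }
            if covering.is_empty() {
                start_padding = range.start - chunk_start;
            }
            end_padding = chunk.end.saturating_sub(range.end);
            covering.push(chunk.clone());
        }
        (covering, start_padding, end_padding)
    }

    fn main() {
        // Two chunks of 100 and 150 bytes; a file payload occupies bytes 80..180.
        let chunks = vec![
            ChunkEntry { end: 100, size: 100 },
            ChunkEntry { end: 250, size: 150 },
        ];
        let (covering, start_pad, end_pad) = lookup_range(&chunks, 80..180);
        assert_eq!(covering.len(), 2);
        assert_eq!((start_pad, end_pad), (80, 70));
    }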

If the threshold value for re-use is reached, the chunks are injected
into the payload stream and the references with the corresponding
offsets are encoded in the metadata stream.
If however a non-reusable (because changed) entry is encountered
before the threshold is reached, the entries on the cache are flushed
to the archive by re-encoding them, and the memorized chunks and
payload reference offsets are discarded.
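
For clarity, a simplified sketch of the threshold decision; Decision,
LookAheadState, REUSE_THRESHOLD and suggest() are made-up names for
illustration only, the patch relies on ReusedChunks::suggested() for
this:

    enum Decision {
        Reuse,     // inject the re-usable chunks and reference them
        Reencode,  // flush the cache by re-encoding the entries
        CheckNext, // postpone the decision and cache the next entry as well
    }

    struct LookAheadState {
        reusable_payload: u64, // accumulated payload size of cached entries
        contiguous: bool,      // cached entries map to one contiguous chunk range
    }

    const REUSE_THRESHOLD: u64 = 4 * 1024 * 1024; // assumed value for illustration

    fn suggest(state: &LookAheadState) -> Decision {
        if !state.contiguous {
            Decision::Reencode
        } else if state.reusable_payload >= REUSE_THRESHOLD {
            Decision::Reuse
        } else {
            Decision::CheckNext
        }
    }

    fn main() {
        let state = LookAheadState { reusable_payload: 1 << 20, contiguous: true };
        assert!(matches!(suggest(&state), Decision::CheckNext));
    }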

Since multiple files might be contained within a single chunk, chunk
deduplication is also performed when the re-use threshold is reached,
by keeping back the last chunk of the memorized list so that following
files may re-use it as well. It is assured that this chunk is injected
into the stream even if the following lookups lead to a cache clear
and re-encoding.
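
A rough sketch of that keep-back logic with hypothetical types (the
actual implementation lives in flush_reused_chunks() and
clear_cached_chunks() below):

    #[derive(Clone)]
    struct Chunk {
        size: u64,
    }

    #[derive(Default)]
    struct ReusedState {
        chunks: Vec<(u64, Chunk)>, // (padding, chunk)
        must_flush_first: bool,
    }

    // Inject all chunks except the last one, which is re-inserted as the base
    // for following entries and marked to be flushed even on a cache clear.
    fn flush(state: &mut ReusedState, keep_back_last: bool) -> Vec<Chunk> {
        let last = if keep_back_last { state.chunks.pop() } else { None };
        let injected: Vec<Chunk> = std::mem::take(&mut state.chunks)
            .into_iter()
            .map(|(_padding, chunk)| chunk)
            .collect();
        if let Some((padding, chunk)) = last {
            state.must_flush_first = true;
            state.chunks.push((padding, chunk));
        }
        injected
    }

    fn main() {
        let mut state = ReusedState::default();
        state.chunks.push((0, Chunk { size: 4096 }));
        state.chunks.push((512, Chunk { size: 8192 }));

        let injected = flush(&mut state, true);
        assert_eq!(injected.len(), 1);       // first chunk injected
        assert_eq!(injected[0].size, 4096);
        assert!(state.must_flush_first);     // last chunk kept back, flushed later
        assert_eq!(state.chunks.len(), 1);
    }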

Directory boundaries are cached as well, and written as part of the
encoding when flushing.
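
The following sketch (hypothetical CachedItem type, simplified from
the patch's CacheEntry::{RegEntry, DirEntry, DirEnd}) shows how
replaying the cached items in order reproduces the directory
boundaries during the flush:

    enum CachedItem {
        File(String),
        DirStart(String),
        DirEnd,
    }

    // Replay cached items in order so directory boundaries end up in the
    // metadata stream exactly where they were encountered during the walk.
    fn flush(items: Vec<CachedItem>) {
        for item in items {
            match item {
                CachedItem::DirStart(name) => println!("create_directory({name})"),
                CachedItem::File(name) => println!("encode_file({name})"),
                CachedItem::DirEnd => println!("finish_directory()"),
            }
        }
    }

    fn main() {
        flush(vec![
            CachedItem::DirStart("subdir".into()),
            CachedItem::File("unchanged.dat".into()),
            CachedItem::DirEnd,
        ]);
    }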

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 2:
- completely reworked
- strongly reduced duplicate code

 pbs-client/src/pxar/create.rs | 259 ++++++++++++++++++++++++++++++++++
 1 file changed, 259 insertions(+)

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index c64084a74..07fa17ec4 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
 use std::ffi::{CStr, CString, OsStr};
 use std::fmt;
 use std::io::{self, Read};
+use std::mem::size_of;
 use std::ops::Range;
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
@@ -23,6 +24,7 @@ use pxar::accessor::aio::{Accessor, Directory};
 use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
 use pxar::{EntryKind, Metadata};
 
+use proxmox_human_byte::HumanByte;
 use proxmox_io::vec;
 use proxmox_lang::c_str;
 use proxmox_sys::fs::{self, acl, xattr};
@@ -32,6 +34,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
 use pbs_datastore::dynamic_index::{DynamicIndexReader, LocalDynamicReadAt};
 
 use crate::inject_reused_chunks::InjectChunks;
+use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData};
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
@@ -274,6 +277,12 @@ struct Archiver {
     reused_chunks: ReusedChunks,
     previous_payload_index: Option<DynamicIndexReader>,
     forced_boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
+    cached_entries: Vec<CacheEntry>,
+    caching_enabled: bool,
+    total_injected_size: u64,
+    total_injected_count: u64,
+    partial_chunks_count: u64,
+    total_reused_payload_size: u64,
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -377,6 +386,12 @@ where
         reused_chunks: ReusedChunks::new(),
         previous_payload_index,
         forced_boundaries,
+        cached_entries: Vec::new(),
+        caching_enabled: false,
+        total_injected_size: 0,
+        total_injected_count: 0,
+        partial_chunks_count: 0,
+        total_reused_payload_size: 0,
     };
 
     archiver
@@ -879,6 +894,250 @@ impl Archiver {
         }
     }
 
+    async fn cache_or_flush_entries<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+        c_file_name: &CStr,
+        stat: &FileStat,
+        fd: OwnedFd,
+        metadata: &Metadata,
+    ) -> Result<(), Error> {
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+        let reusable = if let Some(accessor) = previous_metadata_accessor {
+            self.is_reusable_entry(accessor, file_name, stat, metadata)
+                .await?
+        } else {
+            None
+        };
+
+        let file_size = stat.st_size as u64;
+        if let Some(start_offset) = reusable {
+            if let Some(ref ref_payload_index) = self.previous_payload_index {
+                let payload_size = file_size + size_of::<pxar::format::Header>() as u64;
+                let end_offset = start_offset + payload_size;
+                let (indices, start_padding, end_padding) =
+                    lookup_dynamic_entries(ref_payload_index, start_offset..end_offset)?;
+
+                let boundary = encoder.payload_position()?;
+                let offset =
+                    self.reused_chunks
+                        .insert(indices, boundary, start_padding, end_padding);
+
+                self.caching_enabled = true;
+                let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+                    fd,
+                    c_file_name.into(),
+                    *stat,
+                    metadata.clone(),
+                    offset,
+                ));
+                self.cached_entries.push(cache_entry);
+
+                match self.reused_chunks.suggested() {
+                    Suggested::Reuse => self.flush_cached_to_archive(encoder, true, true).await?,
+                    Suggested::Reencode => {
+                        self.flush_cached_to_archive(encoder, false, true).await?
+                    }
+                    Suggested::CheckNext => {}
+                }
+
+                return Ok(());
+            }
+        }
+
+        self.flush_cached_to_archive(encoder, false, true).await?;
+        self.add_entry(encoder, previous_metadata_accessor, fd.as_raw_fd(), c_file_name, stat)
+            .await
+    }
+
+    async fn flush_cached_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        reuse_chunks: bool,
+        keep_back_last_chunk: bool,
+    ) -> Result<(), Error> {
+        let entries = std::mem::take(&mut self.cached_entries);
+
+        if !reuse_chunks {
+            self.clear_cached_chunks(encoder)?;
+        }
+
+        for entry in entries {
+            match entry {
+                CacheEntry::RegEntry(CacheEntryData {
+                    fd,
+                    c_file_name,
+                    stat,
+                    metadata,
+                    payload_offset,
+                }) => {
+                    self.add_entry_to_archive(
+                        encoder,
+                        &mut None,
+                        &c_file_name,
+                        &stat,
+                        fd,
+                        &metadata,
+                        reuse_chunks,
+                        Some(payload_offset),
+                    )
+                    .await?
+                }
+                CacheEntry::DirEntry(CacheEntryData {
+                    c_file_name,
+                    metadata,
+                    ..
+                }) => {
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().start_directory(&c_file_name)?;
+                    }
+                    let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
+                    encoder.create_directory(dir_name, &metadata).await?;
+                }
+                CacheEntry::DirEnd => {
+                    encoder.finish().await?;
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().end_directory()?;
+                    }
+                }
+            }
+        }
+
+        self.caching_enabled = false;
+
+        if reuse_chunks {
+            self.flush_reused_chunks(encoder, keep_back_last_chunk)?;
+        }
+
+        Ok(())
+    }
+
+    fn flush_reused_chunks<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        keep_back_last_chunk: bool,
+    ) -> Result<(), Error> {
+        let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
+
+        // Do not inject the last reused chunk directly, but keep it as base for further entries
+        // to reduce chunk duplication. Needs to be flushed even on cache clear!
+        let last_chunk = if keep_back_last_chunk {
+            reused_chunks.chunks.pop()
+        } else {
+            None
+        };
+
+        let mut injection_boundary = reused_chunks.start_boundary();
+        let payload_writer_position = encoder.payload_position()?.raw();
+
+        if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
+            bail!(
+                "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
+                injection_boundary.raw(),
+            );
+        }
+
+        for chunks in reused_chunks.chunks.chunks(128) {
+            let mut chunk_list = Vec::with_capacity(128);
+            let mut size = PayloadOffset::default();
+            for (padding, chunk) in chunks.iter() {
+                log::debug!(
+                    "Injecting chunk with {} padding (chunk size {})",
+                    HumanByte::from(*padding),
+                    HumanByte::from(chunk.size()),
+                );
+                self.total_injected_size += chunk.size();
+                self.total_injected_count += 1;
+                if *padding > 0 {
+                    self.partial_chunks_count += 1;
+                }
+                size = size.add(chunk.size());
+                chunk_list.push(chunk.clone());
+            }
+
+            let inject_chunks = InjectChunks {
+                boundary: injection_boundary.raw(),
+                chunks: chunk_list,
+                size: size.raw() as usize,
+            };
+
+            if let Some(boundary) = self.forced_boundaries.as_mut() {
+                let mut boundary = boundary.lock().unwrap();
+                boundary.push_back(inject_chunks);
+            } else {
+                bail!("missing injection queue");
+            };
+
+            injection_boundary = injection_boundary.add(size.raw());
+            encoder.advance(size)?;
+        }
+
+        if let Some((padding, chunk)) = last_chunk {
+            // Make sure that we flush this chunk even on clear calls
+            self.reused_chunks.must_flush_first = true;
+            let _offset = self
+                .reused_chunks
+                .insert(vec![chunk], injection_boundary, padding, 0);
+        }
+
+        Ok(())
+    }
+
+    fn clear_cached_chunks<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+    ) -> Result<(), Error> {
+        let reused_chunks = std::mem::take(&mut self.reused_chunks);
+
+        if !reused_chunks.must_flush_first {
+            return Ok(());
+        }
+
+        // First chunk was kept back to avoid duplication but needs to be injected
+        let injection_boundary = reused_chunks.start_boundary();
+        let payload_writer_position = encoder.payload_position()?.raw();
+
+        if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
+            bail!(
+                "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
+                injection_boundary.raw()
+            );
+        }
+
+        if let Some((padding, chunk)) = reused_chunks.chunks.first() {
+            let size = PayloadOffset::default().add(chunk.size());
+            log::debug!(
+                "Injecting chunk with {} padding (chunk size {})",
+                HumanByte::from(*padding),
+                HumanByte::from(chunk.size()),
+            );
+            let inject_chunks = InjectChunks {
+                boundary: injection_boundary.raw(),
+                chunks: vec![chunk.clone()],
+                size: size.raw() as usize,
+            };
+
+            self.total_injected_size += size.raw();
+            self.total_injected_count += 1;
+            if *padding > 0 {
+                self.partial_chunks_count += 1;
+            }
+
+            if let Some(boundary) = self.forced_boundaries.as_mut() {
+                let mut boundary = boundary.lock().unwrap();
+                boundary.push_back(inject_chunks);
+            } else {
+                bail!("missing injection queue");
+            };
+            encoder.advance(size)?;
+        } else {
+            bail!("missing first chunk");
+        }
+
+        Ok(())
+    }
+
     async fn add_directory<T: SeqWrite + Send>(
         &mut self,
         encoder: &mut Encoder<'_, T>,
-- 
2.39.2
