[pbs-devel] [PATCH v4 proxmox-backup 46/58] fix #3174: client: pxar: enable caching and meta comparison

Christian Ebner c.ebner at proxmox.com
Mon Apr 29 14:10:50 CEST 2024


When walking the file system tree, check for each entry if it is
reusable, meaning that the metadata did not change and the payload
chunks can therefore be reindexed instead of re-encoding the whole
file data.
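
As a rough sketch with simplified stand-in types (the actual code
compares the full pxar metadata objects and gets the payload range
from the previous index), the per-entry check boils down to:

    #[derive(PartialEq)]
    struct Metadata {
        mode: u32,
        mtime: i64,
        uid: u32,
        gid: u32,
    }

    /// Return the payload range from the previous snapshot if the entry
    /// is unchanged and its chunks can be reindexed, `None` to re-encode.
    fn payload_range_if_reusable(
        previous: Option<(&Metadata, std::ops::Range<u64>)>,
        current: &Metadata,
    ) -> Option<std::ops::Range<u64>> {
        match previous {
            Some((old, range)) if old == current => Some(range),
            _ => None,
        }
    }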

If the metadata matches, the range of the dynamic index entries for
that file is looked up in the previous payload data index.
Use that range and the padding possibly introduced by partial reuse
of chunks to decide whether to reuse the dynamic entries and encode
the file payload as a payload reference right away, or to cache the
entry for now and keep looking ahead.
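
The decision itself reduces to comparing the padding ratio against a
threshold (CHUNK_PADDING_THRESHOLD below mirrors the 10% constant
added by this patch); a minimal, self-contained sketch, with
start_padding and end_padding standing in for the unused bytes of the
first and last chunk covering the range:

    const CHUNK_PADDING_THRESHOLD: f64 = 0.1;

    fn should_reuse(range_len: u64, start_padding: u64, end_padding: u64) -> bool {
        let padding = start_padding + end_padding;
        let total_size = range_len + padding;
        // reuse only if the padding introduced by partially used chunks
        // stays at or below the threshold
        (padding as f64 / total_size as f64) <= CHUNK_PADDING_THRESHOLD
    }

    fn main() {
        // 4 MiB payload, 256 KiB padding (~5.9%): reuse the chunks
        assert!(should_reuse(4 << 20, 128 << 10, 128 << 10));
        // 1 MiB payload, 2 MiB padding (~66%): re-encode instead
        assert!(!should_reuse(1 << 20, 1 << 20, 1 << 20));
    }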

If, however, a non-reusable (because changed) entry is encountered
before the padding threshold is reached, the cached entries are
flushed to the archive by re-encoding them and the cached state is
reset.

Reusable chunk digests and sizes, as well as reference offsets to
the start of regular file payloads within the payload stream, are
injected into the backup stream by sending them to the chunker via a
dedicated channel, forcing a chunk boundary and inserting the chunks.
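
Schematically, each injection message carries the boundary offset,
the chunk list and their summed size. A simplified, stand-alone
version of the InjectChunks message over a std mpsc channel (the
field layout follows the patch, everything else is illustrative):

    use std::sync::mpsc;

    struct ReusableDynamicEntry {
        digest: [u8; 32],
        size: u64,
    }

    struct InjectChunks {
        boundary: u64, // payload stream offset to force the chunk boundary at
        chunks: Vec<ReusableDynamicEntry>,
        size: usize,   // summed size of all chunks in this batch
    }

    fn main() {
        let (tx, rx) = mpsc::channel::<InjectChunks>();
        tx.send(InjectChunks {
            boundary: 0,
            chunks: vec![ReusableDynamicEntry { digest: [0u8; 32], size: 4096 }],
            size: 4096,
        })
        .expect("chunker receiver hung up");

        // the chunker side forces a boundary at `boundary` and indexes the
        // received chunks as-is instead of re-chunking the payload data
        while let Ok(inject) = rx.try_recv() {
            println!("boundary {} with {} chunk(s)", inject.boundary, inject.chunks.len());
        }
    }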

If the padding stays below the reuse threshold, the chunks are
injected into the payload stream and the references with their
corresponding offsets are encoded in the metadata stream.

Since multiple files might be contained within a single chunk,
deduplication of chunks is ensured by keeping back the last chunk, so
that subsequent files can reuse that same chunk without indexing it
twice. It is also ensured that this chunk gets injected into the
stream even if the following lookups lead to a cache clear and
re-encoding.
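
Under simplified types, the handling of the kept-back chunk mirrors
the accounting in flush_cached_reusing_if_below_threshold: if the
next range starts with the same chunk, only its padding is adjusted;
otherwise the kept-back chunk still has to be injected. A sketch:

    struct ChunkEntry {
        digest: [u8; 32],
        size: u64,
        padding: u64, // bytes of the chunk not used by any file in the range
    }

    /// Return the chunks that still need injecting before the next range.
    fn resolve_kept_back(kept: Option<ChunkEntry>, next: &mut [ChunkEntry]) -> Vec<ChunkEntry> {
        match (kept, next.first_mut()) {
            (Some(last), Some(first)) if last.digest == first.digest => {
                // same chunk continues into the next range: index it only once
                // and account for the bytes the previous range already used
                first.padding -= last.size - last.padding;
                Vec::new()
            }
            // the range ended exactly on a chunk boundary (or the cache gets
            // re-encoded): the kept-back chunk must still be injected
            (Some(last), _) => vec![last],
            (None, _) => Vec::new(),
        }
    }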

Directory boundaries are cached as well, and written as part of the
encoding when flushing.
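
The cache is thus a flat, ordered list of entries that can be
replayed on flush; its shape, loosely following the CacheEntry type
added by this patch (details simplified), is roughly:

    enum CacheEntry {
        /// regular file plus its offset relative to the cached range start
        RegEntry { name: std::ffi::CString, payload_offset: u64 },
        /// a directory was entered while caching was active
        DirEntry { name: std::ffi::CString },
        /// matching close for the most recently entered directory
        DirEnd,
    }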

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 pbs-client/src/pxar/create.rs | 492 +++++++++++++++++++++++++++++++---
 1 file changed, 458 insertions(+), 34 deletions(-)

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index bb2a4f42a..28823d196 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -20,9 +20,10 @@ use nix::sys::stat::{FileStat, Mode};
 use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
 use proxmox_sys::error::SysError;
 use pxar::accessor::aio::{Accessor, Directory};
-use pxar::encoder::{LinkOffset, SeqWrite};
+use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
 use pxar::{EntryKind, Metadata};
 
+use proxmox_human_byte::HumanByte;
 use proxmox_io::vec;
 use proxmox_lang::c_str;
 use proxmox_sys::fs::{self, acl, xattr};
@@ -32,11 +33,15 @@ use pbs_datastore::dynamic_index::{DynamicIndexReader, LocalDynamicReadAt};
 use pbs_datastore::index::IndexFile;
 
 use crate::inject_reused_chunks::InjectChunks;
+use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData};
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
 use crate::RemoteChunkReader;
 
+const CHUNK_PADDING_THRESHOLD: f64 = 0.1;
+const MAX_CACHE_SIZE: usize = 512;
+
 /// Pxar options for creating a pxar archive/stream
 #[derive(Default)]
 pub struct PxarCreateOptions {
@@ -152,6 +157,11 @@ struct Archiver {
     skip_e2big_xattr: bool,
     forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
     previous_payload_index: Option<DynamicIndexReader>,
+    cached_entries: Vec<CacheEntry>,
+    cached_hardlinks: HashSet<HardLinkInfo>,
+    cached_range: Range<u64>,
+    cached_last_chunk: Option<ReusableDynamicEntry>,
+    caching_enabled: bool,
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -211,6 +221,8 @@ where
         set.insert(stat.st_dev);
     }
 
+    let metadata_mode = options.previous_ref.is_some() && writers.payload_writer.is_some();
+
     let mut encoder = Encoder::new(
         &mut writers.writer,
         &metadata,
@@ -254,11 +266,23 @@ where
         skip_e2big_xattr: options.skip_e2big_xattr,
         forced_boundaries,
         previous_payload_index,
+        cached_entries: Vec::new(),
+        cached_range: Range::default(),
+        cached_last_chunk: None,
+        cached_hardlinks: HashSet::new(),
+        caching_enabled: false,
     };
 
     archiver
         .archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
         .await?;
+
+    if metadata_mode {
+        archiver
+            .flush_cached_reusing_if_below_threshold(&mut encoder, false)
+            .await?;
+    }
+
     encoder.finish().await?;
     encoder.close().await?;
 
@@ -316,7 +340,10 @@ impl Archiver {
             for file_entry in file_list {
                 let file_name = file_entry.name.to_bytes();
 
-                if is_root && file_name == b".pxarexclude-cli" {
+                if is_root
+                    && file_name == b".pxarexclude-cli"
+                    && previous_metadata_accessor.is_none()
+                {
                     self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
                         .await?;
                     continue;
@@ -334,6 +361,7 @@ impl Archiver {
                 .await
                 .map_err(|err| self.wrap_err(err))?;
             }
+
             self.path = old_path;
             self.entry_counter = entry_counter;
             self.patterns.truncate(old_patterns_count);
@@ -616,8 +644,6 @@ impl Archiver {
         c_file_name: &CStr,
         stat: &FileStat,
     ) -> Result<(), Error> {
-        use pxar::format::mode;
-
         let file_mode = stat.st_mode & libc::S_IFMT;
         let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
             OFlag::empty()
@@ -655,6 +681,96 @@ impl Archiver {
             self.skip_e2big_xattr,
         )?;
 
+        if self.previous_payload_index.is_none() {
+            return self
+                .add_entry_to_archive(
+                    encoder,
+                    previous_metadata,
+                    c_file_name,
+                    stat,
+                    fd,
+                    &metadata,
+                    None,
+                )
+                .await;
+        }
+
+        // Avoid having too many open file handles in cached entries
+        if self.cached_entries.len() > MAX_CACHE_SIZE {
+            log::debug!("Max cache size reached, reuse cached entries");
+            self.flush_cached_reusing_if_below_threshold(encoder, true)
+                .await?;
+        }
+
+        if metadata.is_regular_file() {
+            self.cache_or_flush_entries(
+                encoder,
+                previous_metadata,
+                c_file_name,
+                stat,
+                fd,
+                &metadata,
+            )
+            .await
+        } else if self.caching_enabled {
+            if stat.st_mode & libc::S_IFMT == libc::S_IFDIR {
+                let fd_clone = fd.try_clone()?;
+                let cache_entry = CacheEntry::DirEntry(CacheEntryData::new(
+                    fd,
+                    c_file_name.into(),
+                    *stat,
+                    metadata.clone(),
+                    PayloadOffset::default(),
+                ));
+                self.cached_entries.push(cache_entry);
+
+                let dir = Dir::from_fd(fd_clone.into_raw_fd())?;
+                self.add_directory(
+                    encoder,
+                    previous_metadata,
+                    dir,
+                    c_file_name,
+                    &metadata,
+                    stat,
+                )
+                .await?;
+            } else {
+                let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+                    fd,
+                    c_file_name.into(),
+                    *stat,
+                    metadata,
+                    PayloadOffset::default(),
+                ));
+                self.cached_entries.push(cache_entry);
+            }
+            Ok(())
+        } else {
+            self.add_entry_to_archive(
+                encoder,
+                previous_metadata,
+                c_file_name,
+                stat,
+                fd,
+                &metadata,
+                None,
+            )
+            .await
+        }
+    }
+
+    async fn add_entry_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        previous_metadata: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+        c_file_name: &CStr,
+        stat: &FileStat,
+        fd: OwnedFd,
+        metadata: &Metadata,
+        payload_offset: Option<PayloadOffset>,
+    ) -> Result<(), Error> {
+        use pxar::format::mode;
+
         let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
         match metadata.file_type() {
             mode::IFREG => {
@@ -683,9 +799,14 @@ impl Archiver {
                         .add_file(c_file_name, file_size, stat.st_mtime)?;
                 }
 
-                let offset: LinkOffset = self
-                    .add_regular_file(encoder, fd, file_name, &metadata, file_size)
-                    .await?;
+                let offset: LinkOffset = if let Some(payload_offset) = payload_offset {
+                    encoder
+                        .add_payload_ref(metadata, file_name, file_size, payload_offset)
+                        .await?
+                } else {
+                    self.add_regular_file(encoder, fd, file_name, metadata, file_size)
+                        .await?
+                };
 
                 if stat.st_nlink > 1 {
                     self.hardlinks
@@ -696,59 +817,43 @@ impl Archiver {
             }
             mode::IFDIR => {
                 let dir = Dir::from_fd(fd.into_raw_fd())?;
-
-                if let Some(ref catalog) = self.catalog {
-                    catalog.lock().unwrap().start_directory(c_file_name)?;
-                }
-                let result = self
-                    .add_directory(
-                        encoder,
-                        previous_metadata,
-                        dir,
-                        c_file_name,
-                        &metadata,
-                        stat,
-                    )
-                    .await;
-                if let Some(ref catalog) = self.catalog {
-                    catalog.lock().unwrap().end_directory()?;
-                }
-                result
+                self.add_directory(encoder, previous_metadata, dir, c_file_name, metadata, stat)
+                    .await
             }
             mode::IFSOCK => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_socket(c_file_name)?;
                 }
 
-                Ok(encoder.add_socket(&metadata, file_name).await?)
+                Ok(encoder.add_socket(metadata, file_name).await?)
             }
             mode::IFIFO => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_fifo(c_file_name)?;
                 }
 
-                Ok(encoder.add_fifo(&metadata, file_name).await?)
+                Ok(encoder.add_fifo(metadata, file_name).await?)
             }
             mode::IFLNK => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_symlink(c_file_name)?;
                 }
 
-                self.add_symlink(encoder, fd, file_name, &metadata).await
+                self.add_symlink(encoder, fd, file_name, metadata).await
             }
             mode::IFBLK => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_block_device(c_file_name)?;
                 }
 
-                self.add_device(encoder, file_name, &metadata, stat).await
+                self.add_device(encoder, file_name, metadata, stat).await
             }
             mode::IFCHR => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_char_device(c_file_name)?;
                 }
 
-                self.add_device(encoder, file_name, &metadata, stat).await
+                self.add_device(encoder, file_name, metadata, stat).await
             }
             other => bail!(
                 "encountered unknown file type: 0x{:x} (0o{:o})",
@@ -758,18 +863,327 @@ impl Archiver {
         }
     }
 
+    async fn cache_or_flush_entries<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+        c_file_name: &CStr,
+        stat: &FileStat,
+        fd: OwnedFd,
+        metadata: &Metadata,
+    ) -> Result<(), Error> {
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+        let reusable = if let Some(accessor) = previous_metadata_accessor {
+            self.is_reusable_entry(accessor, file_name, metadata)
+                .await?
+        } else {
+            None
+        };
+
+        if stat.st_nlink > 1 {
+            let link_info = HardLinkInfo {
+                st_dev: stat.st_dev,
+                st_ino: stat.st_ino,
+            };
+            if self.cached_hardlinks.contains(&link_info) {
+                // This hardlink has been seen by the lookahead cache already; put it on the
+                // cache with a dummy offset and continue without lookup and chunk injection.
+                // On flushing or re-encoding, the logic there will store the hardlink with
+                // its actual offset.
+                self.caching_enabled = true;
+                let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+                    fd,
+                    c_file_name.into(),
+                    *stat,
+                    metadata.clone(),
+                    PayloadOffset::default(),
+                ));
+                self.cached_entries.push(cache_entry);
+                return Ok(());
+            } else {
+                // mark this hardlink as seen by the lookahead cache
+                self.cached_hardlinks.insert(link_info);
+            }
+        }
+
+        if let Some(payload_range) = reusable {
+            // check for range continuation in payload archive
+            if self.cached_range.end == 0 {
+                // initialize first range to start and end with start of new range
+                self.cached_range.start = payload_range.start;
+                self.cached_range.end = payload_range.start;
+            }
+
+            if self.cached_range.end == payload_range.start {
+                self.cached_range.end = payload_range.end;
+                log::debug!(
+                    "Cache range continuation, new range: {:?}",
+                    self.cached_range
+                );
+            } else {
+                log::debug!("Cache range has hole, new range: {payload_range:?}");
+                self.flush_cached_reusing_if_below_threshold(encoder, true)
+                    .await?;
+                // range has to be set after flushing of cached entries, which resets the range
+                self.cached_range = payload_range.clone();
+            }
+
+            // offset relative to start of current range, does not include possible padding of
+            // actual chunks, which needs to be added before encoding the payload reference
+            let offset =
+                PayloadOffset::default().add(payload_range.start - self.cached_range.start);
+            log::debug!("Offset relative to range start: {offset:?}");
+
+            self.caching_enabled = true;
+            self.cached_entries
+                .push(CacheEntry::RegEntry(CacheEntryData::new(
+                    fd,
+                    c_file_name.into(),
+                    *stat,
+                    metadata.clone(),
+                    offset,
+                )));
+
+            return Ok(());
+        }
+
+        self.flush_cached_reencoding(encoder).await?;
+        self.add_entry_to_archive(
+            encoder,
+            previous_metadata_accessor,
+            c_file_name,
+            stat,
+            fd,
+            metadata,
+            None,
+        )
+        .await
+    }
+
+    async fn flush_cached_reusing_if_below_threshold<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        keep_last_chunk: bool,
+    ) -> Result<(), Error> {
+        let mut prev_last_chunk = self.cached_last_chunk.take();
+
+        if self.cached_range.is_empty() {
+            if let Some(prev) = prev_last_chunk {
+                // make sure to inject previous last
+                self.inject_chunks_at_current_payload_position(encoder, vec![prev].as_slice())?;
+            }
+            // only non-regular file entries (directories) in the cache, allows regular encoding
+            self.encode_entries_to_archive(encoder, None).await?;
+            return Ok(());
+        }
+
+        if let Some(ref ref_payload_index) = self.previous_payload_index {
+            let (mut indices, start_padding, end_padding) =
+                lookup_dynamic_entries(ref_payload_index, self.cached_range.clone())?;
+            let mut padding = start_padding + end_padding;
+            let total_size = (self.cached_range.end - self.cached_range.start) + padding;
+
+            // take into account used bytes of kept back chunk for padding
+            if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk.as_mut()) {
+                if last.digest() == first.digest() {
+                    // Update padding used for threshold calculation only
+                    let used = last.size() - last.padding;
+                    padding -= used;
+                }
+            }
+
+            let ratio = padding as f64 / total_size as f64;
+
+            if ratio > CHUNK_PADDING_THRESHOLD {
+                log::debug!(
+                    "Padding ratio: {ratio} > {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
+                    HumanByte::from(padding),
+                    HumanByte::from(total_size),
+                    indices.len(),
+                );
+                // do not reuse chunks if introduced padding higher than threshold
+                // opt for re-encoding in that case
+                if let Some(prev) = prev_last_chunk {
+                    // make sure to inject previous last
+                    self.inject_chunks_at_current_payload_position(encoder, vec![prev].as_slice())?;
+                }
+                self.encode_entries_to_archive(encoder, None).await?;
+            } else {
+                log::debug!(
+                    "Padding ratio: {ratio} < {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
+                    HumanByte::from(padding),
+                    HumanByte::from(total_size),
+                    indices.len(),
+                );
+                let base_offset = Some(encoder.payload_position()?.add(start_padding));
+                self.encode_entries_to_archive(encoder, base_offset).await?;
+
+                if keep_last_chunk {
+                    self.cached_last_chunk = indices.pop();
+                }
+
+                // check for cases where the kept-back last chunk does not equal the first chunk
+                // because the range end aligned with a chunk boundary; it then needs to be injected
+                if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk) {
+                    if last.digest() != first.digest() {
+                        // make sure to inject previous last
+                        self.inject_chunks_at_current_payload_position(
+                            encoder,
+                            vec![last].as_slice(),
+                        )?;
+                    } else {
+                        let used = last.size() - last.padding;
+                        first.padding -= used;
+                    }
+                }
+                self.inject_chunks_at_current_payload_position(encoder, indices.as_slice())?;
+            }
+
+            // clear range while keeping end for possible continuation if entries have been flushed
+            // because the max cache size was reached
+            self.cached_range = self.cached_range.end..self.cached_range.end;
+            self.caching_enabled = false;
+
+            Ok(())
+        } else {
+            bail!("cannot reuse chunks without previous index reader");
+        }
+    }
+
+    // Clear the cache and reencode all cached entries
+    // Make sure to inject a possibly kept back chunk from a previous chunk continuation attempt
+    async fn flush_cached_reencoding<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+    ) -> Result<(), Error> {
+        if let Some(prev) = self.cached_last_chunk.take() {
+            // make sure to inject previous last
+            self.inject_chunks_at_current_payload_position(encoder, vec![prev].as_slice())?;
+        }
+
+        self.encode_entries_to_archive(encoder, None).await?;
+
+        self.cached_range = self.cached_range.end..self.cached_range.end;
+        self.caching_enabled = false;
+        Ok(())
+    }
+
+    // Take ownership of cached entries and encode them to the archive
+    // Encode with reused payload chunks when base offset is some, reencode otherwise
+    async fn encode_entries_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        base_offset: Option<PayloadOffset>,
+    ) -> Result<(), Error> {
+        // take ownership of cached entries, leaving new empty cache behind
+        let entries = std::mem::take(&mut self.cached_entries);
+        log::debug!(
+            "Got {} cache entries to encode: reuse is {}",
+            entries.len(),
+            base_offset.is_some()
+        );
+
+        for entry in entries {
+            match entry {
+                CacheEntry::RegEntry(CacheEntryData {
+                    fd,
+                    c_file_name,
+                    stat,
+                    metadata,
+                    payload_offset,
+                }) => {
+                    self.add_entry_to_archive(
+                        encoder,
+                        &mut None,
+                        &c_file_name,
+                        &stat,
+                        fd,
+                        &metadata,
+                        base_offset.map(|base_offset| payload_offset.add(base_offset.raw())),
+                    )
+                    .await?
+                }
+                CacheEntry::DirEntry(CacheEntryData {
+                    c_file_name,
+                    metadata,
+                    ..
+                }) => {
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().start_directory(&c_file_name)?;
+                    }
+                    let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
+                    encoder.create_directory(dir_name, &metadata).await?;
+                }
+                CacheEntry::DirEnd => {
+                    encoder.finish().await?;
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().end_directory()?;
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    fn inject_chunks_at_current_payload_position<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        reused_chunks: &[ReusableDynamicEntry],
+    ) -> Result<(), Error> {
+        let mut injection_boundary = encoder.payload_position()?;
+
+        for chunks in reused_chunks.chunks(128) {
+            let mut chunk_list = Vec::with_capacity(128);
+            let mut size = PayloadOffset::default();
+
+            for chunk in chunks.iter() {
+                log::debug!(
+                    "Injecting chunk with {} padding (chunk size {})",
+                    HumanByte::from(chunk.padding),
+                    HumanByte::from(chunk.size()),
+                );
+                size = size.add(chunk.size());
+                chunk_list.push(chunk.clone());
+            }
+
+            let inject_chunks = InjectChunks {
+                boundary: injection_boundary.raw(),
+                chunks: chunk_list,
+                size: size.raw() as usize,
+            };
+
+            if let Some(sender) = self.forced_boundaries.as_mut() {
+                sender.send(inject_chunks)?;
+            } else {
+                bail!("missing injection queue");
+            };
+
+            injection_boundary = injection_boundary.add(size.raw());
+            log::debug!("Advance payload position by: {size:?}");
+            encoder.advance(size)?;
+        }
+
+        Ok(())
+    }
+
     async fn add_directory<T: SeqWrite + Send>(
         &mut self,
         encoder: &mut Encoder<'_, T>,
         previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
         dir: Dir,
-        dir_name: &CStr,
+        c_dir_name: &CStr,
         metadata: &Metadata,
         stat: &FileStat,
     ) -> Result<(), Error> {
-        let dir_name = OsStr::from_bytes(dir_name.to_bytes());
+        let dir_name = OsStr::from_bytes(c_dir_name.to_bytes());
 
-        encoder.create_directory(dir_name, metadata).await?;
+        if !self.caching_enabled {
+            if let Some(ref catalog) = self.catalog {
+                catalog.lock().unwrap().start_directory(c_dir_name)?;
+            }
+            encoder.create_directory(dir_name, metadata).await?;
+        }
 
         let old_fs_magic = self.fs_magic;
         let old_fs_feature_flags = self.fs_feature_flags;
@@ -809,7 +1223,17 @@ impl Archiver {
         self.fs_feature_flags = old_fs_feature_flags;
         self.current_st_dev = old_st_dev;
 
-        encoder.finish().await?;
+        if !self.caching_enabled {
+            encoder.finish().await?;
+            if let Some(ref catalog) = self.catalog {
+                if !self.caching_enabled {
+                    catalog.lock().unwrap().end_directory()?;
+                }
+            }
+        } else {
+            self.cached_entries.push(CacheEntry::DirEnd);
+        }
+
         result
     }
 
-- 
2.39.2