[pbs-devel] [PATCH v8 proxmox-backup 50/69] fix #3174: client: pxar: enable caching and meta comparison

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Jun 4 13:50:36 CEST 2024


On May 28, 2024 11:42 am, Christian Ebner wrote:
> When walking the file system tree, check for each entry whether it is
> reusable, meaning that the metadata did not change and the payload
> chunks can be re-indexed instead of re-encoding the whole data.
> 
> If the metadata matches, the range of the dynamic index entries for
> that file is looked up in the previous payload data index.
> The range and the possible padding introduced by partial reuse of
> chunks are used to decide whether to reuse the dynamic entries and
> encode the file payload as a payload reference right away, or to
> cache the entry for now and keep looking ahead.
> 
> If, however, a non-reusable (because changed) entry is encountered
> before the padding threshold is reached, the cached entries are
> flushed to the archive by re-encoding them, resetting the cached state.
> 
> Reusable chunk digests and sizes, as well as reference offsets to the
> start of regular file payloads within the payload stream, are injected
> into the backup stream by sending them to the chunker via a dedicated
> channel, forcing a chunk boundary and inserting the chunks.
> 
> If the threshold for reuse is met, the chunks are injected into the
> payload stream and the references, with their corresponding offsets,
> are encoded in the metadata stream.
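
to make the reuse decision concrete, a condensed sketch of the threshold
check implemented further below (with CHUNK_PADDING_THRESHOLD = 0.1 as in
this patch):

    let total_size = (range.end - range.start) + padding;
    let ratio = padding as f64 / total_size as f64;
    // reuse the chunks only if at most ~10% of the reused bytes would be padding
    if ratio > CHUNK_PADDING_THRESHOLD {
        // re-encode the cached entries instead
    }

e.g. a 4 MiB range with 256 KiB of padding gives a ratio of roughly 0.06
and would be reused.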
> 
> Since multiple files might be contained within a single chunk, chunk
> deduplication is ensured by keeping back the last chunk, so that
> following files can reuse the same chunk without indexing it twice.
> This chunk is injected into the stream even if the following lookups
> lead to a cache clear and re-encoding.
> 
> Directory boundaries are cached as well, and written as part of the
> encoding when flushing.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
> 
> changes since version 6:
> - use PxarLookaheadCache and its provided methods
> - refactoring removing some unnecessary methods to improve readability
> 
>  pbs-client/src/pxar/create.rs | 387 +++++++++++++++++++++++++++++++---
>  1 file changed, 360 insertions(+), 27 deletions(-)
> 
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 04c89b453..f044dd1e6 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -21,9 +21,10 @@ use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
>  use proxmox_sys::error::SysError;
>  use pxar::accessor::aio::{Accessor, Directory};
>  use pxar::accessor::ReadAt;
> -use pxar::encoder::{LinkOffset, SeqWrite};
> +use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
>  use pxar::{EntryKind, Metadata, PxarVariant};
>  
> +use proxmox_human_byte::HumanByte;
>  use proxmox_io::vec;
>  use proxmox_lang::c_str;
>  use proxmox_sys::fs::{self, acl, xattr};
> @@ -33,10 +34,13 @@ use pbs_datastore::dynamic_index::DynamicIndexReader;
>  use pbs_datastore::index::IndexFile;
>  
>  use crate::inject_reused_chunks::InjectChunks;
> +use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData, PxarLookaheadCache};
>  use crate::pxar::metadata::errno_is_unsupported;
>  use crate::pxar::tools::assert_single_path_component;
>  use crate::pxar::Flags;
>  
> +const CHUNK_PADDING_THRESHOLD: f64 = 0.1;
> +
>  /// Pxar options for creating a pxar archive/stream
>  #[derive(Default)]
>  pub struct PxarCreateOptions {
> @@ -154,6 +158,7 @@ struct Archiver {
>      skip_e2big_xattr: bool,
>      forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
>      previous_payload_index: Option<DynamicIndexReader>,
> +    cache: PxarLookaheadCache,
>  }
>  
>  type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -207,6 +212,7 @@ where
>          set.insert(stat.st_dev);
>      }
>  
> +    let metadata_mode = options.previous_ref.is_some() && writers.archive.payload().is_some();
>      let mut encoder = Encoder::new(writers.archive, &metadata).await?;
>  
>      let mut patterns = options.patterns;
> @@ -245,11 +251,19 @@ where
>          skip_e2big_xattr: options.skip_e2big_xattr,
>          forced_boundaries,
>          previous_payload_index,
> +        cache: PxarLookaheadCache::new(None),
>      };
>  
>      archiver
>          .archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
>          .await?;
> +
> +    if metadata_mode {
> +        archiver
> +            .flush_cached_reusing_if_below_threshold(&mut encoder, false)
> +            .await?;
> +    }
> +
>      encoder.finish().await?;
>      encoder.close().await?;
>  
> @@ -307,7 +321,10 @@ impl Archiver {
>              for file_entry in file_list {
>                  let file_name = file_entry.name.to_bytes();
>  
> -                if is_root && file_name == b".pxarexclude-cli" {
> +                if is_root
> +                    && file_name == b".pxarexclude-cli"
> +                    && previous_metadata_accessor.is_none()
> +                {
>                      self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
>                          .await?;
>                      continue;
> @@ -610,8 +627,6 @@ impl Archiver {
>          c_file_name: &CStr,
>          stat: &FileStat,
>      ) -> Result<(), Error> {
> -        use pxar::format::mode;
> -
>          let file_mode = stat.st_mode & libc::S_IFMT;
>          let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
>              OFlag::empty()
> @@ -649,6 +664,127 @@ impl Archiver {
>              self.skip_e2big_xattr,
>          )?;
>  
> +        if self.previous_payload_index.is_none() {
> +            return self
> +                .add_entry_to_archive(encoder, &mut None, c_file_name, stat, fd, &metadata, None)
> +                .await;
> +        }
> +
> +        // Avoid having too many open file handles in cached entries
> +        if self.cache.is_full() {
> +            log::debug!("Max cache size reached, reuse cached entries");
> +            self.flush_cached_reusing_if_below_threshold(encoder, true)
> +                .await?;
> +        }
> +
> +        if metadata.is_regular_file() {
> +            if stat.st_nlink > 1 {
> +                let link_info = HardLinkInfo {
> +                    st_dev: stat.st_dev,
> +                    st_ino: stat.st_ino,
> +                };
> +                if self.cache.contains_hardlink(&link_info) {
> +                    // This hardlink has been seen by the lookahead cache already, put it on the cache
> +                    // with a dummy offset and continue without lookup and chunk injection.
> +                    // On flushing or re-encoding, the logic there will store the actual hardlink with
> +                    // offset.
> +                    if !self.cache.caching_enabled() {
> +                        // have regular file, get directory path
> +                        let mut path = self.path.clone();
> +                        path.pop();
> +                        self.cache.update_start_path(path);
> +                    }
> +                    self.cache.insert(
> +                        fd,
> +                        c_file_name.into(),
> +                        *stat,
> +                        metadata.clone(),
> +                        PayloadOffset::default(),
> +                    );

see comment on patch introducing the cache helper

> +                    return Ok(());
> +                } else {
> +                    // mark this hardlink as seen by the lookahead cache
> +                    self.cache.insert_hardlink(link_info);
> +                }
> +            }
> +
> +            let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> +            if let Some(payload_range) = self
> +                .is_reusable_entry(previous_metadata, file_name, &metadata)
> +                .await?
> +            {
> +                if !self.cache.try_extend_range(payload_range.clone()) {
> +                    log::debug!("Cache range has hole, new range: {payload_range:?}");
> +                    self.flush_cached_reusing_if_below_threshold(encoder, true)
> +                        .await?;
> +                    // range has to be set after flushing of cached entries, which resets the range
> +                    self.cache.update_range(payload_range.clone());
> +                }
> +
> +                // offset relative to start of current range, does not include possible padding of
> +                // actual chunks, which needs to be added before encoding the payload reference
> +                let offset =
> +                    PayloadOffset::default().add(payload_range.start - self.cache.range().start);
> +                log::debug!("Offset relative to range start: {offset:?}");
> +
> +                if !self.cache.caching_enabled() {
> +                    // have regular file, get directory path
> +                    let mut path = self.path.clone();
> +                    path.pop();
> +                    self.cache.update_start_path(path);
> +                }
> +                self.cache
> +                    .insert(fd, c_file_name.into(), *stat, metadata.clone(), offset);

see comment on patch introducing the cache helper

> +                return Ok(());
> +            }
> +        } else if self.cache.caching_enabled() {
> +            self.cache.insert(
> +                fd.try_clone()?,
> +                c_file_name.into(),
> +                *stat,
> +                metadata.clone(),
> +                PayloadOffset::default(),
> +            );
> +

see comment on patch introducing the cache helper

> +            if metadata.is_dir() {
> +                self.add_directory(
> +                    encoder,
> +                    previous_metadata,
> +                    Dir::from_fd(fd.into_raw_fd())?,
> +                    c_file_name,
> +                    &metadata,
> +                    stat,
> +                )
> +                .await?;
> +            }
> +            return Ok(());
> +        }
> +
> +        self.encode_entries_to_archive(encoder, None).await?;
> +        self.add_entry_to_archive(
> +            encoder,
> +            previous_metadata,
> +            c_file_name,
> +            stat,
> +            fd,
> +            &metadata,
> +            None,
> +        )
> +        .await
> +    }
> +
> +    async fn add_entry_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        previous_metadata: &mut Option<Directory<MetadataArchiveReader>>,
> +        c_file_name: &CStr,
> +        stat: &FileStat,
> +        fd: OwnedFd,
> +        metadata: &Metadata,
> +        payload_offset: Option<PayloadOffset>,
> +    ) -> Result<(), Error> {
> +        use pxar::format::mode;
> +
>          let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
>          match metadata.file_type() {
>              mode::IFREG => {
> @@ -677,9 +813,14 @@ impl Archiver {
>                          .add_file(c_file_name, file_size, stat.st_mtime)?;
>                  }
>  
> -                let offset: LinkOffset = self
> -                    .add_regular_file(encoder, fd, file_name, &metadata, file_size)
> -                    .await?;
> +                let offset: LinkOffset = if let Some(payload_offset) = payload_offset {
> +                    encoder
> +                        .add_payload_ref(metadata, file_name, file_size, payload_offset)
> +                        .await?
> +                } else {
> +                    self.add_regular_file(encoder, fd, file_name, metadata, file_size)
> +                        .await?
> +                };
>  
>                  if stat.st_nlink > 1 {
>                      self.hardlinks
> @@ -690,50 +831,43 @@ impl Archiver {
>              }
>              mode::IFDIR => {
>                  let dir = Dir::from_fd(fd.into_raw_fd())?;
> -                self.add_directory(
> -                    encoder,
> -                    previous_metadata,
> -                    dir,
> -                    c_file_name,
> -                    &metadata,
> -                    stat,
> -                )
> -                .await
> +                self.add_directory(encoder, previous_metadata, dir, c_file_name, metadata, stat)
> +                    .await
>              }
>              mode::IFSOCK => {
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().add_socket(c_file_name)?;
>                  }
>  
> -                Ok(encoder.add_socket(&metadata, file_name).await?)
> +                Ok(encoder.add_socket(metadata, file_name).await?)
>              }
>              mode::IFIFO => {
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().add_fifo(c_file_name)?;
>                  }
>  
> -                Ok(encoder.add_fifo(&metadata, file_name).await?)
> +                Ok(encoder.add_fifo(metadata, file_name).await?)
>              }
>              mode::IFLNK => {
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().add_symlink(c_file_name)?;
>                  }
>  
> -                self.add_symlink(encoder, fd, file_name, &metadata).await
> +                self.add_symlink(encoder, fd, file_name, metadata).await
>              }
>              mode::IFBLK => {
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().add_block_device(c_file_name)?;
>                  }
>  
> -                self.add_device(encoder, file_name, &metadata, stat).await
> +                self.add_device(encoder, file_name, metadata, stat).await
>              }
>              mode::IFCHR => {
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().add_char_device(c_file_name)?;
>                  }
>  
> -                self.add_device(encoder, file_name, &metadata, stat).await
> +                self.add_device(encoder, file_name, metadata, stat).await
>              }
>              other => bail!(
>                  "encountered unknown file type: 0x{:x} (0o{:o})",
> @@ -743,6 +877,199 @@ impl Archiver {
>          }
>      }
>  
> +    async fn flush_cached_reusing_if_below_threshold<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        keep_last_chunk: bool,
> +    ) -> Result<(), Error> {
> +        if self.cache.range().is_empty() {
> +            // only non-regular file entries (e.g. directories) in cache, allows regular encoding
> +            self.encode_entries_to_archive(encoder, None).await?;
> +            return Ok(());
> +        }
> +
> +        // Take ownership of previous last chunk, only update where it must be injected
> +        let mut prev_last_chunk = self.cache.take_last_chunk();

doesn't need to be mut

> +        if let Some(ref ref_payload_index) = self.previous_payload_index {

and should probably be moved in here, since it doesn't make sense without a previous payload index?
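
i.e., a rough sketch (the trailing else with the bail would stay as-is):

    if let Some(ref ref_payload_index) = self.previous_payload_index {
        let prev_last_chunk = self.cache.take_last_chunk();
        ...
    } else {
        bail!("cannot reuse chunks without previous index reader");
    }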

> +            let (mut indices, start_padding, end_padding) =
> +                lookup_dynamic_entries(ref_payload_index, self.cache.range().clone())?;

if lookup_dynamic_entries took a &Range instead

> +            let mut padding = start_padding + end_padding;
> +            let range = self.cache.range();

this could be moved up, and the clone above can be dropped
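
i.e., assuming cache.range() hands out a &Range<u64>:

    let range = self.cache.range();
    let (mut indices, start_padding, end_padding) =
        lookup_dynamic_entries(ref_payload_index, range)?;
    let mut padding = start_padding + end_padding;
    let total_size = (range.end - range.start) + padding;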

> +            let total_size = (range.end - range.start) + padding;
> +
> +            // take into account used bytes of kept back chunk for padding
> +            if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk.as_mut()) {

this should then be first instead of first_mut, and as_ref instead of
as_mut
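
i.e.:

    if let (Some(first), Some(last)) = (indices.first(), prev_last_chunk.as_ref()) {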

> +                if last.digest() == first.digest() {
> +                    // Update padding used for threshold calculation only
> +                    let used = last.size() - last.padding;
> +                    padding -= used;
> +                }
> +            }
> +
> +            let ratio = padding as f64 / total_size as f64;
> +
> +            // do not reuse chunks if introduced padding higher than threshold
> +            // opt for re-encoding in that case
> +            if ratio > CHUNK_PADDING_THRESHOLD {
> +                log::debug!(
> +                    "Padding ratio: {ratio} > {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
> +                    HumanByte::from(padding),
> +                    HumanByte::from(total_size),
> +                    indices.len(),
> +                );
> +                self.cache.update_last_chunk(prev_last_chunk);
> +                self.encode_entries_to_archive(encoder, None).await?;
> +            } else {
> +                log::debug!(
> +                    "Padding ratio: {ratio} < {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
> +                    HumanByte::from(padding),
> +                    HumanByte::from(total_size),
> +                    indices.len(),
> +                );
> +
> +                // check for cases where the kept-back last chunk does not equal the first chunk
> +                // because the range end aligned with a chunk boundary, so it needs to be injected
> +                if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk) {
> +                    if last.digest() != first.digest() {
> +                        // make sure to inject previous last chunk before encoding entries
> +                        self.inject_chunks_at_current_payload_position(encoder, &[last])?;
> +                    } else {
> +                        let used = last.size() - last.padding;
> +                        first.padding -= used;
> +                    }

the else branch is (basically) the same as above, maybe you originally
meant to unify them? you could save the digest comparison result as well,
and then just have an `if foo { inject }` here?
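
a rough sketch of the unified variant (first_mut would then be needed
after all, since the reuse path adjusts first.padding; doing that up front
should be harmless in the re-encode path, where indices get dropped):

    let mut needs_inject = false;
    if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk.as_ref()) {
        if last.digest() == first.digest() {
            let used = last.size() - last.padding;
            padding -= used; // for the threshold calculation
            first.padding -= used;
        } else {
            needs_inject = true;
        }
    }

    // ... threshold check as below ...

    if needs_inject {
        if let Some(last) = prev_last_chunk {
            // range end aligned with a chunk boundary, inject the kept-back chunk
            self.inject_chunks_at_current_payload_position(encoder, &[last])?;
        }
    }

(`needs_inject` just being a placeholder name, of course)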

> +                }
> +
> +                let base_offset = Some(encoder.payload_position()?.add(start_padding));
> +                self.encode_entries_to_archive(encoder, base_offset).await?;
> +
> +                if keep_last_chunk {
> +                    self.cache.update_last_chunk(indices.pop());
> +                }
> +
> +                self.inject_chunks_at_current_payload_position(encoder, indices.as_slice())?;
> +            }
> +
> +            Ok(())
> +        } else {
> +            bail!("cannot reuse chunks without previous index reader");
> +        }
> +    }
> +
> +    // Take ownership of cached entries and encode them to the archive
> +    // Encode with reused payload chunks when base offset is some, reencode otherwise
> +    async fn encode_entries_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        base_offset: Option<PayloadOffset>,
> +    ) -> Result<(), Error> {
> +        if let Some(prev) = self.cache.take_last_chunk() {
> +            // make sure to inject previous last chunk before encoding entries
> +            self.inject_chunks_at_current_payload_position(encoder, &[prev])?;
> +        }
> +
> +        let old_path = self.path.clone();
> +        self.path = self.cache.start_path().clone();
> +
> +        // take ownership of cached entries and reset caching state
> +        let entries = self.cache.take_and_reset();

see comment on patch introducing the cache helper

> +        log::debug!(
> +            "Got {} cache entries to encode: reuse is {}",
> +            entries.len(),
> +            base_offset.is_some()
> +        );
> +
> +        for entry in entries {
> +            match entry {
> +                CacheEntry::RegEntry(CacheEntryData {
> +                    fd,
> +                    c_file_name,
> +                    stat,
> +                    metadata,
> +                    payload_offset,
> +                }) => {
> +                    let file_name = OsStr::from_bytes(c_file_name.to_bytes());
> +                    self.path.push(file_name);
> +                    self.add_entry_to_archive(
> +                        encoder,
> +                        &mut None,
> +                        &c_file_name,
> +                        &stat,
> +                        fd,
> +                        &metadata,
> +                        base_offset.map(|base_offset| payload_offset.add(base_offset.raw())),
> +                    )
> +                    .await?;
> +                    self.path.pop();
> +                }
> +                CacheEntry::DirEntry(CacheEntryData {
> +                    c_file_name,
> +                    metadata,
> +                    ..
> +                }) => {
> +                    let file_name = OsStr::from_bytes(c_file_name.to_bytes());
> +                    self.path.push(file_name);
> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog.lock().unwrap().start_directory(&c_file_name)?;
> +                    }
> +                    let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
> +                    encoder.create_directory(dir_name, &metadata).await?;
> +                }
> +                CacheEntry::DirEnd => {
> +                    encoder.finish().await?;
> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog.lock().unwrap().end_directory()?;
> +                    }
> +                    self.path.pop();
> +                }
> +            }
> +        }
> +
> +        self.path = old_path;
> +
> +        Ok(())
> +    }
> +
> +    fn inject_chunks_at_current_payload_position<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        reused_chunks: &[ReusableDynamicEntry],

this actually consumes the chunks/reusable entries, so it should
probably take the Vec, and not a slice?
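
i.e.:

    fn inject_chunks_at_current_payload_position<T: SeqWrite + Send>(
        &mut self,
        encoder: &mut Encoder<'_, T>,
        reused_chunks: Vec<ReusableDynamicEntry>,
    ) -> Result<(), Error> {

both call sites above could hand over owned values anyway (`indices`
instead of `indices.as_slice()`, and `vec![last]` instead of `&[last]`).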

> +    ) -> Result<(), Error> {
> +        let mut injection_boundary = encoder.payload_position()?;
> +
> +        for chunks in reused_chunks.chunks(128) {
> +            let mut chunk_list = Vec::with_capacity(128);

even though we still can't get away without copying here, I think(?), this
could be

let chunks = chunks.to_vec();

it should never be worse than pushing single elements, and in the best
case it's better optimized (but maybe I am wrong? ;))

then we could drop the mut
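
the loop body would then only need to accumulate the size:

    let chunks = chunks.to_vec();
    let mut size = PayloadOffset::default();
    for chunk in chunks.iter() {
        // debug logging as before
        size = size.add(chunk.size());
    }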

> +            let mut size = PayloadOffset::default();
> +
> +            for chunk in chunks.iter() {
> +                log::debug!(
> +                    "Injecting chunk with {} padding (chunk size {})",
> +                    HumanByte::from(chunk.padding),
> +                    HumanByte::from(chunk.size()),
> +                );
> +                size = size.add(chunk.size());
> +                chunk_list.push(chunk.clone());

and drop the push

> +            }
> +
> +            let inject_chunks = InjectChunks {
> +                boundary: injection_boundary.raw(),
> +                chunks: chunk_list,

and drop the variable name here

although I wonder whether we could have done all this
calculation/logging earlier already via lookup_dynamic_entries? might be
an optimization for the future, to avoid iterating twice when using
to_vec
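
with the shadowed `chunks` from above, field init shorthand then also
works here:

    let inject_chunks = InjectChunks {
        boundary: injection_boundary.raw(),
        chunks,
        size: size.raw() as usize,
    };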

> +                size: size.raw() as usize,
> +            };
> +
> +            if let Some(sender) = self.forced_boundaries.as_mut() {
> +                sender.send(inject_chunks)?;
> +            } else {
> +                bail!("missing injection queue");
> +            };
> +
> +            injection_boundary = injection_boundary.add(size.raw());
> +            log::debug!("Advance payload position by: {size:?}");
> +            encoder.advance(size)?;
> +        }
> +
> +        Ok(())
> +    }
> +
>      async fn add_directory<T: SeqWrite + Send>(
>          &mut self,
>          encoder: &mut Encoder<'_, T>,
> @@ -754,10 +1081,12 @@ impl Archiver {
>      ) -> Result<(), Error> {
>          let dir_name = OsStr::from_bytes(c_dir_name.to_bytes());
>  
> -        if let Some(ref catalog) = self.catalog {
> -            catalog.lock().unwrap().start_directory(c_dir_name)?;
> +        if !self.cache.caching_enabled() {
> +            if let Some(ref catalog) = self.catalog {
> +                catalog.lock().unwrap().start_directory(c_dir_name)?;
> +            }
> +            encoder.create_directory(dir_name, metadata).await?;
>          }
> -        encoder.create_directory(dir_name, metadata).await?;
>  
>          let old_fs_magic = self.fs_magic;
>          let old_fs_feature_flags = self.fs_feature_flags;
> @@ -797,9 +1126,13 @@ impl Archiver {
>          self.fs_feature_flags = old_fs_feature_flags;
>          self.current_st_dev = old_st_dev;
>  
> -        encoder.finish().await?;
> -        if let Some(ref catalog) = self.catalog {
> -            catalog.lock().unwrap().end_directory()?;
> +        if !self.cache.caching_enabled() {
> +            encoder.finish().await?;
> +            if let Some(ref catalog) = self.catalog {
> +                catalog.lock().unwrap().end_directory()?;
> +            }
> +        } else {
> +            self.cache.insert_dir_end();
>          }
>  
>          result
> -- 
> 2.39.2
> 