[pbs-devel] [PATCH v8 proxmox-backup 50/69] fix #3174: client: pxar: enable caching and meta comparison
Fabian Grünbichler
f.gruenbichler at proxmox.com
Tue Jun 4 13:50:36 CEST 2024
On May 28, 2024 11:42 am, Christian Ebner wrote:
> When walking the file system tree, check for each entry if it is
> reusable, meaning that the metadata did not change and the payload
> chunks can be reindexed instead of reencoding the whole data.
>
> If the metadata matched, the range of the dynamic index entries for
> that file are looked up in the previous payload data index.
> Use the range and possible padding introduced by partial reuse of
> chunks to decide whether to reuse the dynamic entries and encode
> the file payloads as payload reference right away or cache the entry
> for now and keep looking ahead.
>
> If however a non-reusable (because changed) entry is encountered
> before the padding threshold is reached, the entries on the cache are
> flushed to the archive by reencoding them, resetting the cached state.
>
> Reusable chunk digests and size as well as reference offsets to the
> start of regular files payloads within the payload stream are injected
> into the backup stream by sending them to the chunker via a dedicated
> channel, forcing a chunk boundary and inserting the chunks.
>
> If the threshold value for reuse is reached, the chunks are injected
> in the payload stream and the references with the corresponding
> offsets encoded in the metadata stream.
>
> Since multiple files might be contained within a single chunk, it is
> assured that the deduplication of chunks is performed, by keeping back
> the last chunk, so following files might as well reuse that same
> chunk without double indexing it. It is assured that this chunk is
> injected in the stream also in case that the following lookups lead to
> a cache clear and reencoding.
>
> Directory boundaries are cached as well, and written as part of the
> encoding when flushing.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
>
> changes since version 6:
> - use PxarLookaheadCache and its provided methods
> - refactoring removing some unnecessary methods to improve readability
>
> pbs-client/src/pxar/create.rs | 387 +++++++++++++++++++++++++++++++---
> 1 file changed, 360 insertions(+), 27 deletions(-)
>
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 04c89b453..f044dd1e6 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -21,9 +21,10 @@ use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
> use proxmox_sys::error::SysError;
> use pxar::accessor::aio::{Accessor, Directory};
> use pxar::accessor::ReadAt;
> -use pxar::encoder::{LinkOffset, SeqWrite};
> +use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
> use pxar::{EntryKind, Metadata, PxarVariant};
>
> +use proxmox_human_byte::HumanByte;
> use proxmox_io::vec;
> use proxmox_lang::c_str;
> use proxmox_sys::fs::{self, acl, xattr};
> @@ -33,10 +34,13 @@ use pbs_datastore::dynamic_index::DynamicIndexReader;
> use pbs_datastore::index::IndexFile;
>
> use crate::inject_reused_chunks::InjectChunks;
> +use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData, PxarLookaheadCache};
> use crate::pxar::metadata::errno_is_unsupported;
> use crate::pxar::tools::assert_single_path_component;
> use crate::pxar::Flags;
>
> +const CHUNK_PADDING_THRESHOLD: f64 = 0.1;
> +
> /// Pxar options for creating a pxar archive/stream
> #[derive(Default)]
> pub struct PxarCreateOptions {
> @@ -154,6 +158,7 @@ struct Archiver {
> skip_e2big_xattr: bool,
> forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
> previous_payload_index: Option<DynamicIndexReader>,
> + cache: PxarLookaheadCache,
> }
>
> type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -207,6 +212,7 @@ where
> set.insert(stat.st_dev);
> }
>
> + let metadata_mode = options.previous_ref.is_some() && writers.archive.payload().is_some();
> let mut encoder = Encoder::new(writers.archive, &metadata).await?;
>
> let mut patterns = options.patterns;
> @@ -245,11 +251,19 @@ where
> skip_e2big_xattr: options.skip_e2big_xattr,
> forced_boundaries,
> previous_payload_index,
> + cache: PxarLookaheadCache::new(None),
> };
>
> archiver
> .archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
> .await?;
> +
> + if metadata_mode {
> + archiver
> + .flush_cached_reusing_if_below_threshold(&mut encoder, false)
> + .await?;
> + }
> +
> encoder.finish().await?;
> encoder.close().await?;
>
> @@ -307,7 +321,10 @@ impl Archiver {
> for file_entry in file_list {
> let file_name = file_entry.name.to_bytes();
>
> - if is_root && file_name == b".pxarexclude-cli" {
> + if is_root
> + && file_name == b".pxarexclude-cli"
> + && previous_metadata_accessor.is_none()
> + {
> self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
> .await?;
> continue;
> @@ -610,8 +627,6 @@ impl Archiver {
> c_file_name: &CStr,
> stat: &FileStat,
> ) -> Result<(), Error> {
> - use pxar::format::mode;
> -
> let file_mode = stat.st_mode & libc::S_IFMT;
> let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
> OFlag::empty()
> @@ -649,6 +664,127 @@ impl Archiver {
> self.skip_e2big_xattr,
> )?;
>
> + if self.previous_payload_index.is_none() {
> + return self
> + .add_entry_to_archive(encoder, &mut None, c_file_name, stat, fd, &metadata, None)
> + .await;
> + }
> +
> + // Avoid having to many open file handles in cached entries
> + if self.cache.is_full() {
> + log::debug!("Max cache size reached, reuse cached entries");
> + self.flush_cached_reusing_if_below_threshold(encoder, true)
> + .await?;
> + }
> +
> + if metadata.is_regular_file() {
> + if stat.st_nlink > 1 {
> + let link_info = HardLinkInfo {
> + st_dev: stat.st_dev,
> + st_ino: stat.st_ino,
> + };
> + if self.cache.contains_hardlink(&link_info) {
> + // This hardlink has been seen by the lookahead cache already, put it on the cache
> + // with a dummy offset and continue without lookup and chunk injection.
> + // On flushing or re-encoding, the logic there will store the actual hardlink with
> + // offset.
> + if !self.cache.caching_enabled() {
> + // have regular file, get directory path
> + let mut path = self.path.clone();
> + path.pop();
> + self.cache.update_start_path(path);
> + }
> + self.cache.insert(
> + fd,
> + c_file_name.into(),
> + *stat,
> + metadata.clone(),
> + PayloadOffset::default(),
> + );
see comment on patch introducing the cache helper
> + return Ok(());
> + } else {
> + // mark this hardlink as seen by the lookahead cache
> + self.cache.insert_hardlink(link_info);
> + }
> + }
> +
> + let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> + if let Some(payload_range) = self
> + .is_reusable_entry(previous_metadata, file_name, &metadata)
> + .await?
> + {
> + if !self.cache.try_extend_range(payload_range.clone()) {
> + log::debug!("Cache range has hole, new range: {payload_range:?}");
> + self.flush_cached_reusing_if_below_threshold(encoder, true)
> + .await?;
> + // range has to be set after flushing of cached entries, which resets the range
> + self.cache.update_range(payload_range.clone());
> + }
> +
> + // offset relative to start of current range, does not include possible padding of
> + // actual chunks, which needs to be added before encoding the payload reference
> + let offset =
> + PayloadOffset::default().add(payload_range.start - self.cache.range().start);
> + log::debug!("Offset relative to range start: {offset:?}");
> +
> + if !self.cache.caching_enabled() {
> + // have regular file, get directory path
> + let mut path = self.path.clone();
> + path.pop();
> + self.cache.update_start_path(path);
> + }
> + self.cache
> + .insert(fd, c_file_name.into(), *stat, metadata.clone(), offset);
see comment on patch introducing the cache helper
> + return Ok(());
> + }
> + } else if self.cache.caching_enabled() {
> + self.cache.insert(
> + fd.try_clone()?,
> + c_file_name.into(),
> + *stat,
> + metadata.clone(),
> + PayloadOffset::default(),
> + );
> +
see comment on patch introducing the cache helper
> + if metadata.is_dir() {
> + self.add_directory(
> + encoder,
> + previous_metadata,
> + Dir::from_fd(fd.into_raw_fd())?,
> + c_file_name,
> + &metadata,
> + stat,
> + )
> + .await?;
> + }
> + return Ok(());
> + }
> +
> + self.encode_entries_to_archive(encoder, None).await?;
> + self.add_entry_to_archive(
> + encoder,
> + previous_metadata,
> + c_file_name,
> + stat,
> + fd,
> + &metadata,
> + None,
> + )
> + .await
> + }
> +
> + async fn add_entry_to_archive<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + previous_metadata: &mut Option<Directory<MetadataArchiveReader>>,
> + c_file_name: &CStr,
> + stat: &FileStat,
> + fd: OwnedFd,
> + metadata: &Metadata,
> + payload_offset: Option<PayloadOffset>,
> + ) -> Result<(), Error> {
> + use pxar::format::mode;
> +
> let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> match metadata.file_type() {
> mode::IFREG => {
> @@ -677,9 +813,14 @@ impl Archiver {
> .add_file(c_file_name, file_size, stat.st_mtime)?;
> }
>
> - let offset: LinkOffset = self
> - .add_regular_file(encoder, fd, file_name, &metadata, file_size)
> - .await?;
> + let offset: LinkOffset = if let Some(payload_offset) = payload_offset {
> + encoder
> + .add_payload_ref(metadata, file_name, file_size, payload_offset)
> + .await?
> + } else {
> + self.add_regular_file(encoder, fd, file_name, metadata, file_size)
> + .await?
> + };
>
> if stat.st_nlink > 1 {
> self.hardlinks
> @@ -690,50 +831,43 @@ impl Archiver {
> }
> mode::IFDIR => {
> let dir = Dir::from_fd(fd.into_raw_fd())?;
> - self.add_directory(
> - encoder,
> - previous_metadata,
> - dir,
> - c_file_name,
> - &metadata,
> - stat,
> - )
> - .await
> + self.add_directory(encoder, previous_metadata, dir, c_file_name, metadata, stat)
> + .await
> }
> mode::IFSOCK => {
> if let Some(ref catalog) = self.catalog {
> catalog.lock().unwrap().add_socket(c_file_name)?;
> }
>
> - Ok(encoder.add_socket(&metadata, file_name).await?)
> + Ok(encoder.add_socket(metadata, file_name).await?)
> }
> mode::IFIFO => {
> if let Some(ref catalog) = self.catalog {
> catalog.lock().unwrap().add_fifo(c_file_name)?;
> }
>
> - Ok(encoder.add_fifo(&metadata, file_name).await?)
> + Ok(encoder.add_fifo(metadata, file_name).await?)
> }
> mode::IFLNK => {
> if let Some(ref catalog) = self.catalog {
> catalog.lock().unwrap().add_symlink(c_file_name)?;
> }
>
> - self.add_symlink(encoder, fd, file_name, &metadata).await
> + self.add_symlink(encoder, fd, file_name, metadata).await
> }
> mode::IFBLK => {
> if let Some(ref catalog) = self.catalog {
> catalog.lock().unwrap().add_block_device(c_file_name)?;
> }
>
> - self.add_device(encoder, file_name, &metadata, stat).await
> + self.add_device(encoder, file_name, metadata, stat).await
> }
> mode::IFCHR => {
> if let Some(ref catalog) = self.catalog {
> catalog.lock().unwrap().add_char_device(c_file_name)?;
> }
>
> - self.add_device(encoder, file_name, &metadata, stat).await
> + self.add_device(encoder, file_name, metadata, stat).await
> }
> other => bail!(
> "encountered unknown file type: 0x{:x} (0o{:o})",
> @@ -743,6 +877,199 @@ impl Archiver {
> }
> }
>
> + async fn flush_cached_reusing_if_below_threshold<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + keep_last_chunk: bool,
> + ) -> Result<(), Error> {
> + if self.cache.range().is_empty() {
> + // only non regular file entries (e.g. directories) in cache, allows to do regular encoding
> + self.encode_entries_to_archive(encoder, None).await?;
> + return Ok(());
> + }
> +
> + // Take ownership of previous last chunk, only update where it must be injected
> + let mut prev_last_chunk = self.cache.take_last_chunk();
doesn't need to be mut
> + if let Some(ref ref_payload_index) = self.previous_payload_index {
and should probably be moved in here, since it doesn't make sense without a previous payload index?
> + let (mut indices, start_padding, end_padding) =
> + lookup_dynamic_entries(ref_payload_index, self.cache.range().clone())?;
if lookup_dynamic_entries would take a &Range
> + let mut padding = start_padding + end_padding;
> + let range = self.cache.range();
this could be moved up, and the clone above can be dropped
> + let total_size = (range.end - range.start) + padding;
> +
> + // take into account used bytes of kept back chunk for padding
> + if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk.as_mut()) {
this should then be first instead of first_mut, and as_ref instead of
as_mut
> + if last.digest() == first.digest() {
> + // Update padding used for threshold calculation only
> + let used = last.size() - last.padding;
> + padding -= used;
> + }
> + }
> +
> + let ratio = padding as f64 / total_size as f64;
> +
> + // do not reuse chunks if introduced padding higher than threshold
> + // opt for re-encoding in that case
> + if ratio > CHUNK_PADDING_THRESHOLD {
> + log::debug!(
> + "Padding ratio: {ratio} > {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
> + HumanByte::from(padding),
> + HumanByte::from(total_size),
> + indices.len(),
> + );
> + self.cache.update_last_chunk(prev_last_chunk);
> + self.encode_entries_to_archive(encoder, None).await?;
> + } else {
> + log::debug!(
> + "Padding ratio: {ratio} < {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
> + HumanByte::from(padding),
> + HumanByte::from(total_size),
> + indices.len(),
> + );
> +
> + // check for cases where kept back last is not equal first chunk because the range
> + // end aligned with a chunk boundary, and the chunks therefore needs to be injected
> + if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk) {
> + if last.digest() != first.digest() {
> + // make sure to inject previous last chunk before encoding entries
> + self.inject_chunks_at_current_payload_position(encoder, &[last])?;
> + } else {
> + let used = last.size() - last.padding;
> + first.padding -= used;
> + }
the else is (basically) the same as above, maybe you originally meant to
unify them? you could save the digest cmp result as well, and then just
have an `if foo { inject }` here?
> + }
> +
> + let base_offset = Some(encoder.payload_position()?.add(start_padding));
> + self.encode_entries_to_archive(encoder, base_offset).await?;
> +
> + if keep_last_chunk {
> + self.cache.update_last_chunk(indices.pop());
> + }
> +
> + self.inject_chunks_at_current_payload_position(encoder, indices.as_slice())?;
> + }
> +
> + Ok(())
> + } else {
> + bail!("cannot reuse chunks without previous index reader");
> + }
> + }
> +
> + // Take ownership of cached entries and encode them to the archive
> + // Encode with reused payload chunks when base offset is some, reencode otherwise
> + async fn encode_entries_to_archive<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + base_offset: Option<PayloadOffset>,
> + ) -> Result<(), Error> {
> + if let Some(prev) = self.cache.take_last_chunk() {
> + // make sure to inject previous last chunk before encoding entries
> + self.inject_chunks_at_current_payload_position(encoder, &[prev])?;
> + }
> +
> + let old_path = self.path.clone();
> + self.path = self.cache.start_path().clone();
> +
> + // take ownership of cached entries and reset caching state
> + let entries = self.cache.take_and_reset();
see comment on patch introducing the cache helper
> + log::debug!(
> + "Got {} cache entries to encode: reuse is {}",
> + entries.len(),
> + base_offset.is_some()
> + );
> +
> + for entry in entries {
> + match entry {
> + CacheEntry::RegEntry(CacheEntryData {
> + fd,
> + c_file_name,
> + stat,
> + metadata,
> + payload_offset,
> + }) => {
> + let file_name = OsStr::from_bytes(c_file_name.to_bytes());
> + self.path.push(file_name);
> + self.add_entry_to_archive(
> + encoder,
> + &mut None,
> + &c_file_name,
> + &stat,
> + fd,
> + &metadata,
> + base_offset.map(|base_offset| payload_offset.add(base_offset.raw())),
> + )
> + .await?;
> + self.path.pop();
> + }
> + CacheEntry::DirEntry(CacheEntryData {
> + c_file_name,
> + metadata,
> + ..
> + }) => {
> + let file_name = OsStr::from_bytes(c_file_name.to_bytes());
> + self.path.push(file_name);
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().start_directory(&c_file_name)?;
> + }
> + let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
> + encoder.create_directory(dir_name, &metadata).await?;
> + }
> + CacheEntry::DirEnd => {
> + encoder.finish().await?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().end_directory()?;
> + }
> + self.path.pop();
> + }
> + }
> + }
> +
> + self.path = old_path;
> +
> + Ok(())
> + }
> +
> + fn inject_chunks_at_current_payload_position<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + reused_chunks: &[ReusableDynamicEntry],
this actually consumes the chunks/reusable entries, so it should
probably take the Vec, and not a slice?
> + ) -> Result<(), Error> {
> + let mut injection_boundary = encoder.payload_position()?;
> +
> + for chunks in reused_chunks.chunks(128) {
> + let mut chunk_list = Vec::with_capacity(128);
even though we still can't get away without copying here I think(?), but
this could be
let chunks = chunks.to_vec();
? it should never be worse than pushing single elements, but in the best
case it's better optimized (but maybe I am wrong? ;))
then we could drop the mut
> + let mut size = PayloadOffset::default();
> +
> + for chunk in chunks.iter() {
> + log::debug!(
> + "Injecting chunk with {} padding (chunk size {})",
> + HumanByte::from(chunk.padding),
> + HumanByte::from(chunk.size()),
> + );
> + size = size.add(chunk.size());
> + chunk_list.push(chunk.clone());
and drop the push
> + }
> +
> + let inject_chunks = InjectChunks {
> + boundary: injection_boundary.raw(),
> + chunks: chunk_list,
and drop the variable name here
although I wonder whether we could have done all this
calculation/logging earlier already via lookup_dynamic_entries? might be
an optimization for the future, to avoid iterating twice when using
to_vec
> + size: size.raw() as usize,
> + };
> +
> + if let Some(sender) = self.forced_boundaries.as_mut() {
> + sender.send(inject_chunks)?;
> + } else {
> + bail!("missing injection queue");
> + };
> +
> + injection_boundary = injection_boundary.add(size.raw());
> + log::debug!("Advance payload position by: {size:?}");
> + encoder.advance(size)?;
> + }
> +
> + Ok(())
> + }
> +
> async fn add_directory<T: SeqWrite + Send>(
> &mut self,
> encoder: &mut Encoder<'_, T>,
> @@ -754,10 +1081,12 @@ impl Archiver {
> ) -> Result<(), Error> {
> let dir_name = OsStr::from_bytes(c_dir_name.to_bytes());
>
> - if let Some(ref catalog) = self.catalog {
> - catalog.lock().unwrap().start_directory(c_dir_name)?;
> + if !self.cache.caching_enabled() {
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().start_directory(c_dir_name)?;
> + }
> + encoder.create_directory(dir_name, metadata).await?;
> }
> - encoder.create_directory(dir_name, metadata).await?;
>
> let old_fs_magic = self.fs_magic;
> let old_fs_feature_flags = self.fs_feature_flags;
> @@ -797,9 +1126,13 @@ impl Archiver {
> self.fs_feature_flags = old_fs_feature_flags;
> self.current_st_dev = old_st_dev;
>
> - encoder.finish().await?;
> - if let Some(ref catalog) = self.catalog {
> - catalog.lock().unwrap().end_directory()?;
> + if !self.cache.caching_enabled() {
> + encoder.finish().await?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().end_directory()?;
> + }
> + } else {
> + self.cache.insert_dir_end();
> }
>
> result
> --
> 2.39.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
More information about the pbs-devel
mailing list