[pbs-devel] [RFC v2 proxmox-backup 34/36] fix #3174: client: pxar: enable caching and meta comparison

Fabian Grünbichler f.gruenbichler at proxmox.com
Wed Mar 13 12:12:00 CET 2024


On March 5, 2024 10:27 am, Christian Ebner wrote:
> Add the final glue logic to enable the look-ahead caching and
> metadata comparison introduced in the preparatory patches.

I have to say the call stacks here are not getting easier to follow with
all the intermingled caching_enabled logic...

create_archive
-> archive_dir_contents
--> loop over files -> add_entry
---> add_entry_to_archive or flush cache and cache or
add_entry_to_archive
-> flush_cached_to_archive
-> encoder.finish

maybe it does get a bit disentangled or easier if
add_entry/flush_entry_to_archive are merged?

> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 1:
> - fix pxar exclude cli entry caching
> 
>  pbs-client/src/pxar/create.rs | 121 +++++++++++++++++++++++++++++++---
>  1 file changed, 113 insertions(+), 8 deletions(-)
> 
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index b2ce898f..bb4597bc 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -32,10 +32,14 @@ use pbs_datastore::dynamic_index::{
>  };
>  
>  use crate::inject_reused_chunks::InjectChunks;
> +use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData};
>  use crate::pxar::metadata::errno_is_unsupported;
>  use crate::pxar::tools::assert_single_path_component;
>  use crate::pxar::Flags;
>  
> +const MAX_CACHE_SIZE: usize = 512;
> +const CACHED_PAYLOAD_THRESHOLD: u64 = 2 * 1024 * 1024;
> +
>  #[derive(Default)]
>  struct ReusedChunks {
>      start_boundary: PayloadOffset,
> @@ -253,6 +257,9 @@ struct Archiver {
>      reused_chunks: ReusedChunks,
>      previous_payload_index: Option<DynamicIndexReader>,
>      forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
> +    cached_entries: Vec<CacheEntry>,
> +    caching_enabled: bool,
> +    cached_payload_size: u64,

you can probably already guess ;) this should be combined/refactored
into some common "re-use enabled" struct.

>  }
>  
>  type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -335,16 +342,32 @@ where
>          reused_chunks: ReusedChunks::new(),
>          previous_payload_index,
>          forced_boundaries,
> +        cached_entries: Vec::new(),
> +        caching_enabled: false,
> +        cached_payload_size: 0,
>      };
>  
>      archiver
>          .archive_dir_contents(&mut encoder, accessor, source_dir, true)
>          .await?;
> +
> +    if let Some(last) = archiver.cached_entries.pop() {
> +        match last {
> +            // do not close final directory, this is done by the caller
> +            CacheEntry::DirEnd => {}
> +            _ => archiver.cached_entries.push(last),
> +        }
> +    }

    // do not close final directory, this is done by the caller
    if let Some(CacheEntry::DirEnd) = archiver.cached_entries.last() {
        archiver.cached_entries.pop();
    }
 
should do the same but a bit cheaper / easier to read.

but - "caller" is kind of misleading here, right? because it's not the
caller of `create_archive` that handles the top-level DirEnd, it's the
call to `encoder.finish()` right below.

it kinda seems like it would be an error to end up here with some other
last cached element? if so, then maybe it would make sense to make this
an invariant instead:

match archiver.cached_entries.pop() {
    Some(CacheEntry::DirEnd) | None => {} // OK
    Some(entry) => bail!(
        "Finished creating archive with cache, but last cache element is \
         {entry:?} instead of top-level directory end marker."
    ),
}

OTOH, it's archive_dir_contents itself that adds that entry, and it
knows whether it is called for the top-level dir or not, so it could
just skip adding it in the first place in the root case?

> +
> +    archiver
> +        .flush_cached_to_archive(&mut encoder, true, false)
> +        .await?;
> +
>      encoder.finish().await?;
>      Ok(())
>  }
>  
> -struct FileListEntry {
> +pub(crate) struct FileListEntry {
>      name: CString,
>      path: PathBuf,
>      stat: FileStat,
> @@ -396,8 +419,15 @@ impl Archiver {
>                  let file_name = file_entry.name.to_bytes();
>  
>                  if is_root && file_name == b".pxarexclude-cli" {
> -                    self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
> -                        .await?;
> +                    if self.caching_enabled {
> +                        self.cached_entries.push(CacheEntry::PxarExcludeCliEntry(
> +                            file_entry,
> +                            old_patterns_count,
> +                        ));
> +                    } else {
> +                        self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
> +                            .await?;
> +                    }
>                      continue;
>                  }
>  
> @@ -413,6 +443,11 @@ impl Archiver {
>                  .await
>                  .map_err(|err| self.wrap_err(err))?;
>              }
> +
> +            if self.caching_enabled {
> +                self.cached_entries.push(CacheEntry::DirEnd);
> +            }
> +
>              self.path = old_path;
>              self.entry_counter = entry_counter;
>              self.patterns.truncate(old_patterns_count);
> @@ -693,8 +728,6 @@ impl Archiver {
>          c_file_name: &CStr,
>          stat: &FileStat,
>      ) -> Result<(), Error> {
> -        use pxar::format::mode;
> -
>          let file_mode = stat.st_mode & libc::S_IFMT;
>          let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
>              OFlag::empty()
> @@ -732,6 +765,71 @@ impl Archiver {
>              self.skip_e2big_xattr,
>          )?;
>  
> +        if self.previous_payload_index.is_none() {
> +            return self
> +                .add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
> +                .await;
> +        }
> +
> +        // Avoid having to many open file handles in cached entries
> +        if self.cached_entries.len() > MAX_CACHE_SIZE {
> +            self.flush_cached_to_archive(encoder, false, true).await?;
> +        }
> +
> +        if metadata.is_regular_file() {
> +            self.cache_or_flush_entries(encoder, accessor, c_file_name, stat, fd, &metadata)
> +                .await
> +        } else {
> +            if self.caching_enabled {
> +                if stat.st_mode & libc::S_IFMT == libc::S_IFDIR {
> +                    let fd_clone = fd.try_clone()?;
> +                    let cache_entry = CacheEntry::DirEntry(CacheEntryData::new(
> +                        fd,
> +                        c_file_name.into(),
> +                        stat.clone(),
> +                        metadata.clone(),
> +                        PayloadOffset::default(),
> +                    ));
> +                    self.cached_entries.push(cache_entry);
> +
> +                    let dir = Dir::from_fd(fd_clone.into_raw_fd())?;
> +                    self.add_directory(encoder, accessor, dir, c_file_name, &metadata, stat)
> +                        .await?;
> +
> +                    if let Some(ref catalog) = self.catalog {
> +                        if !self.caching_enabled {
> +                            catalog.lock().unwrap().end_directory()?;
> +                        }
> +                    }
> +                } else {
> +                    let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
> +                        fd,
> +                        c_file_name.into(),
> +                        stat.clone(),
> +                        metadata,
> +                        PayloadOffset::default(),
> +                    ));
> +                    self.cached_entries.push(cache_entry);
> +                }
> +                Ok(())
> +            } else {
> +                self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
> +                    .await
> +            }
> +        }
> +    }
> +
> +    async fn add_entry_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
> +        c_file_name: &CStr,
> +        stat: &FileStat,
> +        fd: OwnedFd,
> +        metadata: &Metadata,
> +    ) -> Result<(), Error> {
> +        use pxar::format::mode;
> +
>          let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
>          match metadata.file_type() {
>              mode::IFREG => {
> @@ -781,7 +879,9 @@ impl Archiver {
>                      .add_directory(encoder, accessor, dir, c_file_name, &metadata, stat)
>                      .await;
>                  if let Some(ref catalog) = self.catalog {
> -                    catalog.lock().unwrap().end_directory()?;
> +                    if !self.caching_enabled {
> +                        catalog.lock().unwrap().end_directory()?;
> +                    }
>                  }
>                  result
>              }
> @@ -1132,7 +1232,9 @@ impl Archiver {
>      ) -> Result<(), Error> {
>          let dir_name = OsStr::from_bytes(dir_name.to_bytes());
>  
> -        encoder.create_directory(dir_name, metadata).await?;
> +        if !self.caching_enabled {
> +            encoder.create_directory(dir_name, metadata).await?;
> +        }
>  
>          let old_fs_magic = self.fs_magic;
>          let old_fs_feature_flags = self.fs_feature_flags;
> @@ -1172,7 +1274,10 @@ impl Archiver {
>          self.fs_feature_flags = old_fs_feature_flags;
>          self.current_st_dev = old_st_dev;
>  
> -        encoder.finish().await?;
> +        if !self.caching_enabled {
> +            encoder.finish().await?;
> +        }
> +
>          result
>      }
>  
> -- 
> 2.39.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




More information about the pbs-devel mailing list