[pbs-devel] [PATCH v6 proxmox-backup 47/65] fix #3174: client: pxar: enable caching and meta comparison

Dominik Csapak d.csapak at proxmox.com
Wed May 22 16:45:22 CEST 2024


high level comment:

this patch does many things and feels like it could be broken up more clearly

e.g. putting the catalog manipulation into add_directory could be its own patch

some other comments inline:

On 5/14/24 12:34, Christian Ebner wrote:
> When walking the file system tree, check for each entry if it is
> reusable, meaning that the metadata did not change and the payload
> chunks can be reindexed instead of reencoding the whole data.
> 
> If the metadata matched, the range of the dynamic index entries for
> that file are looked up in the previous payload data index.
> Use the range and possible padding introduced by partial reuse of
> chunks to decide whether to reuse the dynamic entries and encode
> the file payloads as payload reference right away or cache the entry
> for now and keep looking ahead.
> 
> If however a non-reusable (because changed) entry is encountered
> before the padding threshold is reached, the entries on the cache are
> flushed to the archive by reencoding them, resetting the cached state.
> 
> Reusable chunk digests and size as well as reference offsets to the
> start of regular files payloads within the payload stream are injected
> into the backup stream by sending them to the chunker via a dedicated
> channel, forcing a chunk boundary and inserting the chunks.
> 
> If the threshold value for reuse is reached, the chunks are injected
> in the payload stream and the references with the corresponding
> offsets encoded in the metadata stream.
> 
> Since multiple files might be contained within a single chunk, it is
> assured that the deduplication of chunks is performed, by keeping back
> the last chunk, so following files might as well reuse that same
> chunk without double indexing it.  It is assured that this chunk is
> injected in the stream also in case that the following lookups lead to
> a cache clear and reencoding.
> 
> Directory boundaries are cached as well, and written as part of the
> encoding when flushing.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
>   pbs-client/src/pxar/create.rs | 494 +++++++++++++++++++++++++++++++---
>   1 file changed, 460 insertions(+), 34 deletions(-)
> 
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 7e6402de5..b2932c973 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -21,9 +21,10 @@ use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
>   use proxmox_sys::error::SysError;
>   use pxar::accessor::aio::{Accessor, Directory};
>   use pxar::accessor::ReadAt;
> -use pxar::encoder::{LinkOffset, SeqWrite};
> +use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
>   use pxar::{EntryKind, Metadata};
>   
> +use proxmox_human_byte::HumanByte;
>   use proxmox_io::vec;
>   use proxmox_lang::c_str;
>   use proxmox_sys::fs::{self, acl, xattr};
> @@ -33,10 +34,14 @@ use pbs_datastore::dynamic_index::DynamicIndexReader;
>   use pbs_datastore::index::IndexFile;
>   
>   use crate::inject_reused_chunks::InjectChunks;
> +use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData};
>   use crate::pxar::metadata::errno_is_unsupported;
>   use crate::pxar::tools::assert_single_path_component;
>   use crate::pxar::Flags;
>   
> +const CHUNK_PADDING_THRESHOLD: f64 = 0.1;
> +const MAX_CACHE_SIZE: usize = 512;
> +
>   /// Pxar options for creating a pxar archive/stream
>   #[derive(Default)]
>   pub struct PxarCreateOptions {
> @@ -154,6 +159,11 @@ struct Archiver {
>       skip_e2big_xattr: bool,
>       forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
>       previous_payload_index: Option<DynamicIndexReader>,
> +    cached_entries: Vec<CacheEntry>,
> +    cached_hardlinks: HashSet<HardLinkInfo>,
> +    cached_range: Range<u64>,
> +    cached_last_chunk: Option<ReusableDynamicEntry>,
> +    caching_enabled: bool,

could this be better if those were in an optional struct?

e.g.

cache: Option<PxarCreateCache>,

not sure if it makes the code easier to read or not...

>   }
>   
>   type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -213,6 +223,8 @@ where
>           set.insert(stat.st_dev);
>       }
>   
> +    let metadata_mode = options.previous_ref.is_some() && writers.payload_writer.is_some();
> +
>       let mut encoder = Encoder::new(
>           &mut writers.writer,
>           &metadata,
> @@ -256,11 +268,23 @@ where
>           skip_e2big_xattr: options.skip_e2big_xattr,
>           forced_boundaries,
>           previous_payload_index,
> +        cached_entries: Vec::new(),
> +        cached_range: Range::default(),
> +        cached_last_chunk: None,
> +        cached_hardlinks: HashSet::new(),
> +        caching_enabled: false,
>       };
>   
>       archiver
>           .archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
>           .await?;
> +
> +    if metadata_mode {
> +        archiver
> +            .flush_cached_reusing_if_below_threshold(&mut encoder, false)
> +            .await?;
> +    }
> +
>       encoder.finish().await?;
>       encoder.close().await?;
>   
> @@ -318,7 +342,10 @@ impl Archiver {
>               for file_entry in file_list {
>                   let file_name = file_entry.name.to_bytes();
>   
> -                if is_root && file_name == b".pxarexclude-cli" {
> +                if is_root
> +                    && file_name == b".pxarexclude-cli"
> +                    && previous_metadata_accessor.is_none()
> +                {

IIUC we don't encode a '.pxarexclude-cli' when there is a previous ref?
does that really make sense?

>                       self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
>                           .await?;
>                       continue;
> @@ -336,6 +363,7 @@ impl Archiver {
>                   .await
>                   .map_err(|err| self.wrap_err(err))?;
>               }
> +
>               self.path = old_path;
>               self.entry_counter = entry_counter;
>               self.patterns.truncate(old_patterns_count);
> @@ -618,8 +646,6 @@ impl Archiver {
>           c_file_name: &CStr,
>           stat: &FileStat,
>       ) -> Result<(), Error> {
> -        use pxar::format::mode;
> -
>           let file_mode = stat.st_mode & libc::S_IFMT;
>           let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
>               OFlag::empty()
> @@ -657,6 +683,96 @@ impl Archiver {
>               self.skip_e2big_xattr,
>           )?;
>   
> +        if self.previous_payload_index.is_none() {
> +            return self
> +                .add_entry_to_archive(
> +                    encoder,
> +                    previous_metadata,
> +                    c_file_name,
> +                    stat,
> +                    fd,
> +                    &metadata,
> +                    None,
> +                )
> +                .await;
> +        }
> +
> +        // Avoid having to many open file handles in cached entries
> +        if self.cached_entries.len() > MAX_CACHE_SIZE {
> +            log::debug!("Max cache size reached, reuse cached entries");
> +            self.flush_cached_reusing_if_below_threshold(encoder, true)
> +                .await?;
> +        }
> +
> +        if metadata.is_regular_file() {
> +            self.cache_or_flush_entries(
> +                encoder,
> +                previous_metadata,
> +                c_file_name,
> +                stat,
> +                fd,
> +                &metadata,
> +            )
> +            .await
> +        } else if self.caching_enabled {
> +            if stat.st_mode & libc::S_IFMT == libc::S_IFDIR {
> +                let fd_clone = fd.try_clone()?;
> +                let cache_entry = CacheEntry::DirEntry(CacheEntryData::new(
> +                    fd,
> +                    c_file_name.into(),
> +                    *stat,
> +                    metadata.clone(),
> +                    PayloadOffset::default(),
> +                ));
> +                self.cached_entries.push(cache_entry);
> +
> +                let dir = Dir::from_fd(fd_clone.into_raw_fd())?;
> +                self.add_directory(
> +                    encoder,
> +                    previous_metadata,
> +                    dir,
> +                    c_file_name,
> +                    &metadata,
> +                    stat,
> +                )
> +                .await?;
> +            } else {
> +                let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
> +                    fd,
> +                    c_file_name.into(),
> +                    *stat,
> +                    metadata,
> +                    PayloadOffset::default(),
> +                ));
> +                self.cached_entries.push(cache_entry);
> +            }
> +            Ok(())
> +        } else {
> +            self.add_entry_to_archive(
> +                encoder,
> +                previous_metadata,
> +                c_file_name,
> +                stat,
> +                fd,
> +                &metadata,
> +                None,
> +            )
> +            .await
> +        }

a few questions here:

'caching_enabled' is IMHO a bit misleading as this is only used for non file entries?

personally i find the split handling of caching vs flushing here and in cache_or_flush_entries
a bit confusing:

hardlinks/files are handled there, while dirs and all other filetypes are handled here.
why?

i'd rather have all types handled there, which would make the code here much more readable,
and having the code for all types there should also not be that ugly

> +    }
> +
> +    async fn add_entry_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        previous_metadata: &mut Option<Directory<MetadataArchiveReader>>,
> +        c_file_name: &CStr,
> +        stat: &FileStat,
> +        fd: OwnedFd,
> +        metadata: &Metadata,
> +        payload_offset: Option<PayloadOffset>,
> +    ) -> Result<(), Error> {
> +        use pxar::format::mode;
> +
>           let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
>           match metadata.file_type() {
>               mode::IFREG => {
> @@ -685,9 +801,14 @@ impl Archiver {
>                           .add_file(c_file_name, file_size, stat.st_mtime)?;
>                   }
>   
> -                let offset: LinkOffset = self
> -                    .add_regular_file(encoder, fd, file_name, &metadata, file_size)
> -                    .await?;
> +                let offset: LinkOffset = if let Some(payload_offset) = payload_offset {
> +                    encoder
> +                        .add_payload_ref(metadata, file_name, file_size, payload_offset)
> +                        .await?
> +                } else {
> +                    self.add_regular_file(encoder, fd, file_name, metadata, file_size)
> +                        .await?
> +                };
>   
>                   if stat.st_nlink > 1 {
>                       self.hardlinks
> @@ -698,59 +819,43 @@ impl Archiver {
>               }
>               mode::IFDIR => {
>                   let dir = Dir::from_fd(fd.into_raw_fd())?;
> -
> -                if let Some(ref catalog) = self.catalog {
> -                    catalog.lock().unwrap().start_directory(c_file_name)?;
> -                }
> -                let result = self
> -                    .add_directory(
> -                        encoder,
> -                        previous_metadata,
> -                        dir,
> -                        c_file_name,
> -                        &metadata,
> -                        stat,
> -                    )
> -                    .await;
> -                if let Some(ref catalog) = self.catalog {
> -                    catalog.lock().unwrap().end_directory()?;
> -                }
> -                result
> +                self.add_directory(encoder, previous_metadata, dir, c_file_name, metadata, stat)
> +                    .await
>               }
>               mode::IFSOCK => {
>                   if let Some(ref catalog) = self.catalog {
>                       catalog.lock().unwrap().add_socket(c_file_name)?;
>                   }
>   
> -                Ok(encoder.add_socket(&metadata, file_name).await?)
> +                Ok(encoder.add_socket(metadata, file_name).await?)
>               }
>               mode::IFIFO => {
>                   if let Some(ref catalog) = self.catalog {
>                       catalog.lock().unwrap().add_fifo(c_file_name)?;
>                   }
>   
> -                Ok(encoder.add_fifo(&metadata, file_name).await?)
> +                Ok(encoder.add_fifo(metadata, file_name).await?)
>               }
>               mode::IFLNK => {
>                   if let Some(ref catalog) = self.catalog {
>                       catalog.lock().unwrap().add_symlink(c_file_name)?;
>                   }
>   
> -                self.add_symlink(encoder, fd, file_name, &metadata).await
> +                self.add_symlink(encoder, fd, file_name, metadata).await
>               }
>               mode::IFBLK => {
>                   if let Some(ref catalog) = self.catalog {
>                       catalog.lock().unwrap().add_block_device(c_file_name)?;
>                   }
>   
> -                self.add_device(encoder, file_name, &metadata, stat).await
> +                self.add_device(encoder, file_name, metadata, stat).await
>               }
>               mode::IFCHR => {
>                   if let Some(ref catalog) = self.catalog {
>                       catalog.lock().unwrap().add_char_device(c_file_name)?;
>                   }
>   
> -                self.add_device(encoder, file_name, &metadata, stat).await
> +                self.add_device(encoder, file_name, metadata, stat).await
>               }
>               other => bail!(
>                   "encountered unknown file type: 0x{:x} (0o{:o})",
> @@ -760,18 +865,329 @@ impl Archiver {
>           }
>       }
>   
> +    async fn cache_or_flush_entries<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        previous_metadata_accessor: &mut Option<Directory<MetadataArchiveReader>>,
> +        c_file_name: &CStr,
> +        stat: &FileStat,
> +        fd: OwnedFd,
> +        metadata: &Metadata,
> +    ) -> Result<(), Error> {
> +        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> +        let reusable = if let Some(accessor) = previous_metadata_accessor {
> +            self.is_reusable_entry(accessor, file_name, metadata)
> +                .await?
> +        } else {
> +            None
> +        };

this part could be moved below the hardlink handling, no?
because here we define 'reusable', but only need it further below

> +
> +        if stat.st_nlink > 1 {
> +            let link_info = HardLinkInfo {
> +                st_dev: stat.st_dev,
> +                st_ino: stat.st_ino,
> +            };
> +            if self.cached_hardlinks.contains(&link_info) {
> +                // This hardlink has been seen by the lookahead cache already, put it on the cache
> +                // with a dummy offset and continue without lookup and chunk injection.
> +                // On flushing or re-encoding, the logic there will store the actual hardlink with
> +                // offset.
> +                self.caching_enabled = true;
> +                let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
> +                    fd,
> +                    c_file_name.into(),
> +                    *stat,
> +                    metadata.clone(),
> +                    PayloadOffset::default(),
> +                ));
> +                self.cached_entries.push(cache_entry);
> +                return Ok(());
> +            } else {
> +                // mark this hardlink as seen by the lookahead cache
> +                self.cached_hardlinks.insert(link_info);
> +            }
> +        }
> +
> +        if let Some(payload_range) = reusable {

namely here.

also, if we moved the check whether previous_metadata_accessor 'is_some()'
into 'is_reusable_entry', we could inline the variable altogether,
making this function a bit shorter and easier to read

> +            // check for range continuation in payload archive
> +            if self.cached_range.end == 0 {
> +                // initialize first range to start and end with start of new range
> +                self.cached_range.start = payload_range.start;
> +                self.cached_range.end = payload_range.start;
> +            }
> +
> +            if self.cached_range.end == payload_range.start {
> +                self.cached_range.end = payload_range.end;
> +                log::debug!(
> +                    "Cache range continuation, new range: {:?}",
> +                    self.cached_range
> +                );
> +            } else {
> +                log::debug!("Cache range has hole, new range: {payload_range:?}");
> +                self.flush_cached_reusing_if_below_threshold(encoder, true)
> +                    .await?;
> +                // range has to be set after flushing of cached entries, which resets the range
> +                self.cached_range = payload_range.clone();
> +            }
> +
> +            // offset relative to start of current range, does not include possible padding of
> +            // actual chunks, which needs to be added before encoding the payload reference
> +            let offset =
> +                PayloadOffset::default().add(payload_range.start - self.cached_range.start);
> +            log::debug!("Offset relative to range start: {offset:?}");
> +
> +            self.caching_enabled = true;
> +            self.cached_entries
> +                .push(CacheEntry::RegEntry(CacheEntryData::new(
> +                    fd,
> +                    c_file_name.into(),
> +                    *stat,
> +                    metadata.clone(),
> +                    offset,
> +                )));
> +
> +            return Ok(());
> +        }
> +
> +        self.flush_cached_reencoding(encoder).await?;
> +        self.add_entry_to_archive(
> +            encoder,
> +            previous_metadata_accessor,
> +            c_file_name,
> +            stat,
> +            fd,
> +            metadata,
> +            None,
> +        )
> +        .await
> +    }
> +
> +    async fn flush_cached_reusing_if_below_threshold<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        keep_last_chunk: bool,
> +    ) -> Result<(), Error> {
> +        let mut prev_last_chunk = self.cached_last_chunk.take();
> +
> +        if self.cached_range.is_empty() {
> +            if let Some(prev) = prev_last_chunk {
> +                // make sure to inject previous last
> +                self.inject_chunks_at_current_payload_position(encoder, vec![prev].as_slice())?;
> +            }
> +            // only non regular file entries (directories) in cache, allows to do regular encoding
> +            self.encode_entries_to_archive(encoder, None).await?;
> +            return Ok(());
> +        }
> +
> +        if let Some(ref ref_payload_index) = self.previous_payload_index {
> +            let (mut indices, start_padding, end_padding) =
> +                lookup_dynamic_entries(ref_payload_index, self.cached_range.clone())?;
> +            let mut padding = start_padding + end_padding;
> +            let total_size = (self.cached_range.end - self.cached_range.start) + padding;
> +
> +            // take into account used bytes of kept back chunk for padding
> +            if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk.as_mut()) {
> +                if last.digest() == first.digest() {
> +                    // Update padding used for threshold calculation only
> +                    let used = last.size() - last.padding;
> +                    padding -= used;
> +                }
> +            }
> +
> +            let ratio = padding as f64 / total_size as f64;
> +
> +            if ratio > CHUNK_PADDING_THRESHOLD {
> +                log::debug!(
> +                    "Padding ratio: {ratio} > {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
> +                    HumanByte::from(padding),
> +                    HumanByte::from(total_size),
> +                    indices.len(),
> +                );
> +                // do not reuse chunks if introduced padding higher than threshold
> +                // opt for re-encoding in that case
> +                if let Some(prev) = prev_last_chunk {
> +                    // make sure to inject previous last
> +                    self.inject_chunks_at_current_payload_position(encoder, vec![prev].as_slice())?;

this can be written more directly as `&[prev]`, no need to allocate a vector for it

> +                }
> +                self.encode_entries_to_archive(encoder, None).await?;
> +            } else {
> +                log::debug!(
> +                    "Padding ratio: {ratio} < {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
> +                    HumanByte::from(padding),
> +                    HumanByte::from(total_size),
> +                    indices.len(),
> +                );
> +
> +                // check for cases where kept back last is not equal first chunk because the range
> +                // end aligned with a chunk boundary, and the chunks therefore needs to be injected
> +                if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk) {
> +                    if last.digest() != first.digest() {
> +                        // make sure to inject previous last
> +                        self.inject_chunks_at_current_payload_position(
> +                            encoder,
> +                            vec![last].as_slice(),

same here

> +                        )?;
> +                    } else {
> +                        let used = last.size() - last.padding;
> +                        first.padding -= used;
> +                    }
> +                }
> +
> +                let base_offset = Some(encoder.payload_position()?.add(start_padding));
> +                self.encode_entries_to_archive(encoder, base_offset).await?;
> +
> +                if keep_last_chunk {
> +                    self.cached_last_chunk = indices.pop();
> +                }
> +
> +                self.inject_chunks_at_current_payload_position(encoder, indices.as_slice())?;
> +            }
> +
> +            // clear range while keeping end for possible continuation if entries have been flushed
> +            // because the max cache size was reached
> +            self.cached_range = self.cached_range.end..self.cached_range.end;
> +            self.caching_enabled = false;
> +
> +            Ok(())
> +        } else {
> +            bail!("cannot reuse chunks without previous index reader");
> +        }
> +    }
> +
> +    // Clear the cache and reencode all cached entries
> +    // Make sure to inject a possibly kept back chunk from a previous chunk continuation attempt
> +    async fn flush_cached_reencoding<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +    ) -> Result<(), Error> {
> +        if let Some(prev) = self.cached_last_chunk.take() {
> +            // make sure to inject previous last
> +            self.inject_chunks_at_current_payload_position(encoder, vec![prev].as_slice())?;

and here too

> +        }
> +
> +        self.encode_entries_to_archive(encoder, None).await?;
> +
> +        self.cached_range = self.cached_range.end..self.cached_range.end;
> +        self.caching_enabled = false;
> +        Ok(())
> +    }
> +
> +    // Take ownership of cached entries and encode them to the archive
> +    // Encode with reused payload chunks when base offset is some, reencode otherwise
> +    async fn encode_entries_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        base_offset: Option<PayloadOffset>,
> +    ) -> Result<(), Error> {
> +        // take ownership of cached entries, leaving new empty cache behind
> +        let entries = std::mem::take(&mut self.cached_entries);
> +        log::debug!(
> +            "Got {} cache entries to encode: reuse is {}",
> +            entries.len(),
> +            base_offset.is_some()
> +        );
> +
> +        for entry in entries {
> +            match entry {
> +                CacheEntry::RegEntry(CacheEntryData {
> +                    fd,
> +                    c_file_name,
> +                    stat,
> +                    metadata,
> +                    payload_offset,
> +                }) => {
> +                    self.add_entry_to_archive(
> +                        encoder,
> +                        &mut None,
> +                        &c_file_name,
> +                        &stat,
> +                        fd,
> +                        &metadata,
> +                        base_offset.map(|base_offset| payload_offset.add(base_offset.raw())),
> +                    )
> +                    .await?
> +                }
> +                CacheEntry::DirEntry(CacheEntryData {
> +                    c_file_name,
> +                    metadata,
> +                    ..
> +                }) => {
> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog.lock().unwrap().start_directory(&c_file_name)?;
> +                    }
> +                    let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
> +                    encoder.create_directory(dir_name, &metadata).await?;
> +                }
> +                CacheEntry::DirEnd => {
> +                    encoder.finish().await?;
> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog.lock().unwrap().end_directory()?;
> +                    }
> +                }
> +            }
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn inject_chunks_at_current_payload_position<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        reused_chunks: &[ReusableDynamicEntry],
> +    ) -> Result<(), Error> {
> +        let mut injection_boundary = encoder.payload_position()?;
> +
> +        for chunks in reused_chunks.chunks(128) {
> +            let mut chunk_list = Vec::with_capacity(128);
> +            let mut size = PayloadOffset::default();
> +
> +            for chunk in chunks.iter() {
> +                log::debug!(
> +                    "Injecting chunk with {} padding (chunk size {})",
> +                    HumanByte::from(chunk.padding),
> +                    HumanByte::from(chunk.size()),
> +                );
> +                size = size.add(chunk.size());
> +                chunk_list.push(chunk.clone());
> +            }
> +
> +            let inject_chunks = InjectChunks {
> +                boundary: injection_boundary.raw(),
> +                chunks: chunk_list,
> +                size: size.raw() as usize,
> +            };
> +
> +            if let Some(sender) = self.forced_boundaries.as_mut() {
> +                sender.send(inject_chunks)?;
> +            } else {
> +                bail!("missing injection queue");
> +            };
> +
> +            injection_boundary = injection_boundary.add(size.raw());
> +            log::debug!("Advance payload position by: {size:?}");
> +            encoder.advance(size)?;
> +        }
> +
> +        Ok(())
> +    }
> +
>       async fn add_directory<T: SeqWrite + Send>(
>           &mut self,
>           encoder: &mut Encoder<'_, T>,
>           previous_metadata_accessor: &mut Option<Directory<MetadataArchiveReader>>,
>           dir: Dir,
> -        dir_name: &CStr,
> +        c_dir_name: &CStr,
>           metadata: &Metadata,
>           stat: &FileStat,
>       ) -> Result<(), Error> {
> -        let dir_name = OsStr::from_bytes(dir_name.to_bytes());
> +        let dir_name = OsStr::from_bytes(c_dir_name.to_bytes());
>   
> -        encoder.create_directory(dir_name, metadata).await?;
> +        if !self.caching_enabled {
> +            if let Some(ref catalog) = self.catalog {
> +                catalog.lock().unwrap().start_directory(c_dir_name)?;
> +            }
> +            encoder.create_directory(dir_name, metadata).await?;
> +        }
>   
>           let old_fs_magic = self.fs_magic;
>           let old_fs_feature_flags = self.fs_feature_flags;
> @@ -811,7 +1227,17 @@ impl Archiver {
>           self.fs_feature_flags = old_fs_feature_flags;
>           self.current_st_dev = old_st_dev;
>   
> -        encoder.finish().await?;
> +        if !self.caching_enabled {
> +            encoder.finish().await?;
> +            if let Some(ref catalog) = self.catalog {
> +                if !self.caching_enabled {
> +                    catalog.lock().unwrap().end_directory()?;
> +                }
> +            }
> +        } else {
> +            self.cached_entries.push(CacheEntry::DirEnd);
> +        }
> +
>           result
>       }
>   





More information about the pbs-devel mailing list