[pbs-devel] [PATCH v3 proxmox-backup 47/58] client: pxar: add look-ahead caching

Christian Ebner c.ebner at proxmox.com
Tue Apr 9 16:53:05 CEST 2024


On 4/5/24 10:33, Fabian Grünbichler wrote:
> Quoting Christian Ebner (2024-03-28 13:36:56)
>> +    async fn cache_or_flush_entries<T: SeqWrite + Send>(
>> +        &mut self,
>> +        encoder: &mut Encoder<'_, T>,
>> +        previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
>> +        c_file_name: &CStr,
>> +        stat: &FileStat,
>> +        fd: OwnedFd,
>> +        metadata: &Metadata,
>> +    ) -> Result<(), Error> {
>> +        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
>> +        let reusable = if let Some(accessor) = previous_metadata_accessor {
>> +            self.is_reusable_entry(accessor, file_name, stat, metadata)
>> +                .await?
>> +        } else {
>> +            None
>> +        };
>> +
>> +        let file_size = stat.st_size as u64;
> 
> couldn't we get this via is_reusable?
> 
>> +        if let Some(start_offset) = reusable {
>> +            if let Some(ref ref_payload_index) = self.previous_payload_index {
>> +                let payload_size = file_size + size_of::<pxar::format::Header>() as u64;
> 
> or better yet, get this here directly ;)
> 
>> +                let end_offset = start_offset + payload_size;
> 
> or better yet, this one here ;)
> 
>> +                let (indices, start_padding, end_padding) =
>> +                    lookup_dynamic_entries(ref_payload_index, start_offset..end_offset)?;
> 
> or better yet, just return the Range in the payload archive? :)

Okay, yes. I changed `is_reusable_entry` to return the `Range<u64>` of
the payload within the payload archive. The range is calculated from the
offset and size as encoded in the archive, both of which are already
available there.
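
To illustrate the range calculation, here is a minimal sketch, not the
actual patch. `payload_range` and the local `Header` struct are
hypothetical stand-ins; the struct only mimics `pxar::format::Header`
under the assumption that it consists of two u64 fields. The arithmetic
mirrors the quoted code (`file_size + size_of::<Header>()`), with
overflow checks added.

```rust
use std::mem::size_of;
use std::ops::Range;

/// Hypothetical stand-in for `pxar::format::Header`, assumed here to
/// consist of two u64 fields (entry type and full size).
#[allow(dead_code)]
#[repr(C)]
struct Header {
    htype: u64,
    full_size: u64,
}

/// Byte range a regular file's payload occupies in the payload archive,
/// header included. Returns `None` if the arithmetic would overflow.
fn payload_range(start_offset: u64, file_size: u64) -> Option<Range<u64>> {
    let payload_size = file_size.checked_add(size_of::<Header>() as u64)?;
    let end_offset = start_offset.checked_add(payload_size)?;
    Some(start_offset..end_offset)
}
```

With a helper along these lines the caller can pass the returned range
straight to `lookup_dynamic_entries` and no longer needs `stat.st_size`
for this purpose.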

> 
>> +
>> +                let boundary = encoder.payload_position()?;
>> +                let offset =
>> +                    self.reused_chunks
>> +                        .insert(indices, boundary, start_padding, end_padding);
>> +
>> +                self.caching_enabled = true;
>> +                let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
>> +                    fd,
>> +                    c_file_name.into(),
>> +                    *stat,
>> +                    metadata.clone(),
>> +                    offset,
>> +                ));
>> +                self.cached_entries.push(cache_entry);
>> +
>> +                match self.reused_chunks.suggested() {
>> +                    Suggested::Reuse => self.flush_cached_to_archive(encoder, true, true).await?,
>> +                    Suggested::Reencode => {
>> +                        self.flush_cached_to_archive(encoder, false, true).await?
>> +                    }
>> +                    Suggested::CheckNext => {}
>> +                }
>> +
>> +                return Ok(());
>> +            }
>> +        }
>> +
>> +        self.flush_cached_to_archive(encoder, false, true).await?;
>> +        self.add_entry(encoder, previous_metadata_accessor, fd.as_raw_fd(), c_file_name, stat)
>> +            .await
> 
> this part here is where I think we mishandle some edge cases, like mentioned in
> the ReusedChunks patch comments.. even keeping back the last chunk doesn't save
> us from losing some re-usable files sometimes..

I still need to take a closer look at what exactly you mean here. The
code path itself should be fine, I think, or am I missing your point?

It is rather the match against `suggested` (now called `action`) where
the wrong decision might be made.


>> +        if let Some((padding, chunk)) = last_chunk {
>> +            // Make sure that we flush this chunk even on clear calls
>> +            self.reused_chunks.must_flush_first = true;
> 
> might make sense to rename this one to "must_flush_first_chunk", else the other
> call sites might be interpreted as "must flush (all) chunks first"
> 

Yes, updated this according to your suggestion.

>> +            let _offset = self
>> +                .reused_chunks
>> +                .insert(vec![chunk], injection_boundary, padding, 0);
>> +        }
>> +
>> +        Ok(())
>> +    }
>> +
>> +    fn clear_cached_chunks<T: SeqWrite + Send>(
>> +        &mut self,
>> +        encoder: &mut Encoder<'_, T>,
>> +    ) -> Result<(), Error> {
>> +        let reused_chunks = std::mem::take(&mut self.reused_chunks);
>> +
>> +        if !reused_chunks.must_flush_first {
>> +            return Ok(());
>> +        }
>> +
>> +        // First chunk was kept back to avoid duplication but needs to be injected
>> +        let injection_boundary = reused_chunks.start_boundary();
>> +        let payload_writer_position = encoder.payload_position()?.raw();
>> +
>> +        if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
>> +            bail!(
>> +                "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
>> +                injection_boundary.raw()
>> +            );
>> +        }
>> +
>> +        if let Some((padding, chunk)) = reused_chunks.chunks.first() {
>> +            let size = PayloadOffset::default().add(chunk.size());
>> +            log::debug!(
>> +                "Injecting chunk with {} padding (chunk size {})",
>> +                HumanByte::from(*padding),
>> +                HumanByte::from(chunk.size()),
>> +            );
>> +            let inject_chunks = InjectChunks {
>> +                boundary: injection_boundary.raw(),
>> +                chunks: vec![chunk.clone()],
>> +                size: size.raw() as usize,
>> +            };
>> +
>> +            self.total_injected_size += size.raw();
>> +            self.total_injected_count += 1;
>> +            if *padding > 0 {
>> +                self.partial_chunks_count += 1;
>> +            }
>> +
>> +            if let Some(boundary) = self.forced_boundaries.as_mut() {
>> +                let mut boundary = boundary.lock().unwrap();
>> +                boundary.push_back(inject_chunks);
>> +            } else {
>> +                bail!("missing injection queue");
>> +            };
>> +            encoder.advance(size)?;
> 
> this part is basically the loop in flush_reused_chunks and could be de-duplicated in some fashion..

Yes, moved the common code into an additional helper function 
`inject_chunks_at_boundary`.
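
For reference, a rough sketch of what such a shared helper could look
like. Only the name `inject_chunks_at_boundary` and the field names are
taken from the patch; the signature, the simplified `Chunk`,
`InjectChunks` and `Archiver` types, and the `String` error type are
assumptions made to keep the example self-contained.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Simplified stand-in for a reusable chunk index entry (assumption).
#[derive(Clone, Debug, PartialEq)]
struct Chunk {
    digest: [u8; 32],
    size: u64,
}

/// Simplified stand-in for the injection request queued for the chunker.
#[derive(Debug, PartialEq)]
struct InjectChunks {
    boundary: u64,
    chunks: Vec<Chunk>,
    size: usize,
}

/// Only the archiver state touched by the injection code path.
#[derive(Default)]
struct Archiver {
    forced_boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
    total_injected_size: u64,
    total_injected_count: u64,
    partial_chunks_count: u64,
}

impl Archiver {
    /// Queue `chunks` (pairs of padding and chunk) for injection at
    /// `boundary`, update the statistics, and return the injected size so
    /// that the caller can advance the encoder's payload position by it.
    fn inject_chunks_at_boundary(
        &mut self,
        boundary: u64,
        chunks: &[(u64, Chunk)],
    ) -> Result<u64, String> {
        if chunks.is_empty() {
            return Ok(0);
        }
        let queue = self
            .forced_boundaries
            .as_ref()
            .ok_or_else(|| String::from("missing injection queue"))?;

        let size: u64 = chunks.iter().map(|(_, chunk)| chunk.size).sum();
        // Chunks with non-zero padding are only partially referenced.
        let partial = chunks.iter().filter(|(padding, _)| *padding > 0).count();

        self.total_injected_size += size;
        self.total_injected_count += chunks.len() as u64;
        self.partial_chunks_count += partial as u64;

        queue
            .lock()
            .map_err(|_| String::from("injection queue lock poisoned"))?
            .push_back(InjectChunks {
                boundary,
                chunks: chunks.iter().map(|(_, chunk)| chunk.clone()).collect(),
                size: size as usize,
            });
        Ok(size)
    }
}
```

Both `clear_cached_chunks` and the loop in `flush_reused_chunks` could
then call the helper and follow up with `encoder.advance(..)` using the
returned size.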




