[pbs-devel] [PATCH v3 proxmox-backup 47/58] client: pxar: add look-ahead caching
Christian Ebner
c.ebner at proxmox.com
Tue Apr 9 16:53:05 CEST 2024
On 4/5/24 10:33, Fabian Grünbichler wrote:
> Quoting Christian Ebner (2024-03-28 13:36:56)
>> + async fn cache_or_flush_entries<T: SeqWrite + Send>(
>> + &mut self,
>> + encoder: &mut Encoder<'_, T>,
>> + previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
>> + c_file_name: &CStr,
>> + stat: &FileStat,
>> + fd: OwnedFd,
>> + metadata: &Metadata,
>> + ) -> Result<(), Error> {
>> + let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
>> + let reusable = if let Some(accessor) = previous_metadata_accessor {
>> + self.is_reusable_entry(accessor, file_name, stat, metadata)
>> + .await?
>> + } else {
>> + None
>> + };
>> +
>> + let file_size = stat.st_size as u64;
>
> couldn't we get this via is_reusable?
>
>> + if let Some(start_offset) = reusable {
>> + if let Some(ref ref_payload_index) = self.previous_payload_index {
>> + let payload_size = file_size + size_of::<pxar::format::Header>() as u64;
>
> or better yet, get this here directly ;)
>
>> + let end_offset = start_offset + payload_size;
>
> or better yet, this one here ;)
>
>> + let (indices, start_padding, end_padding) =
>> + lookup_dynamic_entries(ref_payload_index, start_offset..end_offset)?;
>
> or better yet, just return the Range in the payload archive? :)
Okay, yes: changed `is_reusable_entry` to return the `Range<u64>` of the
payload in the payload archive, calculated from the offset and size as
encoded in the archive, which are already available there.
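
For reference, this is roughly what the range calculation boils down to
(a sketch only; `PayloadRef` and its field names stand in for whatever
is encoded in the metadata archive, the header type being the one from
the pxar crate as already used in the patch):

use std::mem::size_of;
use std::ops::Range;

// Stand-in for the payload reference as encoded in the metadata
// archive; actual type and field names may differ.
struct PayloadRef {
    offset: u64,
    size: u64,
}

// The reusable range in the payload archive covers the pxar entry
// header followed by the file payload itself.
fn payload_range(payload_ref: &PayloadRef) -> Range<u64> {
    let start = payload_ref.offset;
    let end = start + payload_ref.size + size_of::<pxar::format::Header>() as u64;
    start..end
}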
>
>> +
>> + let boundary = encoder.payload_position()?;
>> + let offset =
>> + self.reused_chunks
>> + .insert(indices, boundary, start_padding, end_padding);
>> +
>> + self.caching_enabled = true;
>> + let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
>> + fd,
>> + c_file_name.into(),
>> + *stat,
>> + metadata.clone(),
>> + offset,
>> + ));
>> + self.cached_entries.push(cache_entry);
>> +
>> + match self.reused_chunks.suggested() {
>> + Suggested::Reuse => self.flush_cached_to_archive(encoder, true, true).await?,
>> + Suggested::Reencode => {
>> + self.flush_cached_to_archive(encoder, false, true).await?
>> + }
>> + Suggested::CheckNext => {}
>> + }
>> +
>> + return Ok(());
>> + }
>> + }
>> +
>> + self.flush_cached_to_archive(encoder, false, true).await?;
>> + self.add_entry(encoder, previous_metadata_accessor, fd.as_raw_fd(), c_file_name, stat)
>> + .await
>
> this part here is where I think we mishandle some edge cases, like mentioned in
> the ReusedChunks patch comments.. even keeping back the last chunk doesn't save
> us from losing some re-usable files sometimes..
I still need to have a closer look at what exactly you mean here... The
code path itself should be fine, I think, or am I missing your point?
It is rather the match against the `suggested` (now called `action`)
where the wrong decision might be made.
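
For context, the decision boils down to something like this (sketch of
the renamed enum; the three variants are the ones from the patch, the
comments describe the intended semantics):

// Renamed from `Suggested`: what to do with the currently cached
// entries once the look-ahead cache is inspected.
enum Action {
    // All cached chunks can be reused as is: flush the cache and
    // inject the reused chunks into the payload stream.
    Reuse,
    // Reuse does not pay off for this batch: flush the cache and
    // re-encode all cached entries.
    Reencode,
    // No decision possible yet: keep caching and look further ahead.
    CheckNext,
}

The problematic edge case would then be a `Reencode` decision taken too
early, throwing away entries that the next chunk might still have made
reusable.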
>> + if let Some((padding, chunk)) = last_chunk {
>> + // Make sure that we flush this chunk even on clear calls
>> + self.reused_chunks.must_flush_first = true;
>
> might make sense to rename this one to "must_flush_first_chunk", else the other
> call sites might be interpreted as "must flush (all) chunks first"
>
Yes, updated this according to your suggestion.
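
I.e. (sketch, other fields elided):

struct ReusedChunks {
    // Renamed from `must_flush_first`: only the *first* chunk, which
    // is held back to avoid duplication, must be flushed on clear
    // calls, not all of them.
    must_flush_first_chunk: bool,
    // remaining fields unchanged ...
}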
>> + let _offset = self
>> + .reused_chunks
>> + .insert(vec![chunk], injection_boundary, padding, 0);
>> + }
>> +
>> + Ok(())
>> + }
>> +
>> + fn clear_cached_chunks<T: SeqWrite + Send>(
>> + &mut self,
>> + encoder: &mut Encoder<'_, T>,
>> + ) -> Result<(), Error> {
>> + let reused_chunks = std::mem::take(&mut self.reused_chunks);
>> +
>> + if !reused_chunks.must_flush_first {
>> + return Ok(());
>> + }
>> +
>> + // First chunk was kept back to avoid duplication but needs to be injected
>> + let injection_boundary = reused_chunks.start_boundary();
>> + let payload_writer_position = encoder.payload_position()?.raw();
>> +
>> + if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
>> + bail!(
>> + "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
>> + injection_boundary.raw()
>> + );
>> + }
>> +
>> + if let Some((padding, chunk)) = reused_chunks.chunks.first() {
>> + let size = PayloadOffset::default().add(chunk.size());
>> + log::debug!(
>> + "Injecting chunk with {} padding (chunk size {})",
>> + HumanByte::from(*padding),
>> + HumanByte::from(chunk.size()),
>> + );
>> + let inject_chunks = InjectChunks {
>> + boundary: injection_boundary.raw(),
>> + chunks: vec![chunk.clone()],
>> + size: size.raw() as usize,
>> + };
>> +
>> + self.total_injected_size += size.raw();
>> + self.total_injected_count += 1;
>> + if *padding > 0 {
>> + self.partial_chunks_count += 1;
>> + }
>> +
>> + if let Some(boundary) = self.forced_boundaries.as_mut() {
>> + let mut boundary = boundary.lock().unwrap();
>> + boundary.push_back(inject_chunks);
>> + } else {
>> + bail!("missing injection queue");
>> + };
>> + encoder.advance(size)?;
>
> this part is basically the loop in flush_reused_chunks and could be de-duplicated in some fashion..
Yes, moved the common code into an additional helper function
`inject_chunks_at_boundary`.
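
Roughly along these lines (sketch; the exact signature in the next
version might differ, and `ReusableDynamicEntry` here is assumed to be
the chunk entry type used elsewhere in the series):

fn inject_chunks_at_boundary<T: SeqWrite + Send>(
    &mut self,
    encoder: &mut Encoder<'_, T>,
    injection_boundary: PayloadOffset,
    padding: u64,
    chunks: Vec<ReusableDynamicEntry>,
) -> Result<(), Error> {
    // Accumulate the total size of the chunks to be injected.
    let size = chunks
        .iter()
        .fold(PayloadOffset::default(), |acc, chunk| acc.add(chunk.size()));

    let inject_chunks = InjectChunks {
        boundary: injection_boundary.raw(),
        chunks,
        size: size.raw() as usize,
    };

    self.total_injected_size += size.raw();
    self.total_injected_count += 1;
    if padding > 0 {
        self.partial_chunks_count += 1;
    }

    // Queue the chunks for injection by the chunker ...
    if let Some(boundary) = self.forced_boundaries.as_mut() {
        let mut boundary = boundary.lock().unwrap();
        boundary.push_back(inject_chunks);
    } else {
        bail!("missing injection queue");
    }

    // ... and advance the encoder's payload position accordingly, so
    // the payload writer stays in sync with the injected data.
    encoder.advance(size)?;
    Ok(())
}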