[pbs-devel] [PATCH v3 proxmox-backup 47/58] client: pxar: add look-ahead caching
Fabian Grünbichler
f.gruenbichler at proxmox.com
Fri Apr 5 10:33:54 CEST 2024
Quoting Christian Ebner (2024-03-28 13:36:56)
> Implements the methods to cache entries in a look-ahead cache and
> flush the entries to archive, either by re-using and injecting the
> payload chunks from the previous backup snapshot and storing the
> reference to it, or by re-encoding the chunks.
>
> When walking the file system tree, check for each entry if it is
> re-usable, meaning that the metadata did not change and the payload
> chunks can be re-indexed instead of re-encoding the whole data.
> Since the ammount of payload data might be small as compared to the
s/ammount/amount/
> actual chunk size, a decision whether to re-use or re-encode is
> postponed if the reused payload does not fall below a threshold value,
> but the chunks where continuous.
> In this case, put the entry's file handle an metadata on the cache and
s/an/and/
> enable caching mode, and continue with the next entry.
> Reusable chunk digests and size as well as reference offsets to the
> start of regular files payloads within the payload stream are stored in
> memory, to be injected for re-usable file entries.
>
> If the threshold value for re-use is reached, the chunks are injected
> in the payload stream and the references with the corresponding offsets
> encoded in the metadata stream.
> If however a non-reusable (because changed) entry is encountered before
> the threshold is reached, the entries on the cache are flushed to the
> archive by re-encoding them, the memorized chunks and payload reference
> offsets are discarted.
s/discarted/discarded/
>
> Since multiple files might be contained within a single chunk, it is
> assured that the deduplication of chunks is performed also when the
> reuse threshold is reached, by keeping back the last chunk in the
> memorized list, so following files might as well rei-use that chunk.
s/rei/re/
> It is assured that this chunk is however injected in the stream also in
> case that the following lookups lead to a cache clear and re-encoding.
>
> Directory boundaries are cached as well, and written as part of the
> encoding when flushing.
thanks for adding a high-level description!
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 2:
> - completely reworked
> - strongly reduced duplicate code
>
> pbs-client/src/pxar/create.rs | 259 ++++++++++++++++++++++++++++++++++
> 1 file changed, 259 insertions(+)
>
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index c64084a74..07fa17ec4 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
> use std::ffi::{CStr, CString, OsStr};
> use std::fmt;
> use std::io::{self, Read};
> +use std::mem::size_of;
> use std::ops::Range;
> use std::os::unix::ffi::OsStrExt;
> use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
> @@ -23,6 +24,7 @@ use pxar::accessor::aio::{Accessor, Directory};
> use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
> use pxar::{EntryKind, Metadata};
>
> +use proxmox_human_byte::HumanByte;
> use proxmox_io::vec;
> use proxmox_lang::c_str;
> use proxmox_sys::fs::{self, acl, xattr};
> @@ -32,6 +34,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
> use pbs_datastore::dynamic_index::{DynamicIndexReader, LocalDynamicReadAt};
>
> use crate::inject_reused_chunks::InjectChunks;
> +use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData};
> use crate::pxar::metadata::errno_is_unsupported;
> use crate::pxar::tools::assert_single_path_component;
> use crate::pxar::Flags;
> @@ -274,6 +277,12 @@ struct Archiver {
> reused_chunks: ReusedChunks,
> previous_payload_index: Option<DynamicIndexReader>,
> forced_boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
> + cached_entries: Vec<CacheEntry>,
> + caching_enabled: bool,
> + total_injected_size: u64,
> + total_injected_count: u64,
> + partial_chunks_count: u64,
> + total_reused_payload_size: u64,
> }
>
> type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -377,6 +386,12 @@ where
> reused_chunks: ReusedChunks::new(),
> previous_payload_index,
> forced_boundaries,
> + cached_entries: Vec::new(),
> + caching_enabled: false,
> + total_injected_size: 0,
> + total_injected_count: 0,
> + partial_chunks_count: 0,
> + total_reused_payload_size: 0,
> };
>
> archiver
> @@ -879,6 +894,250 @@ impl Archiver {
> }
> }
>
> + async fn cache_or_flush_entries<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
> + c_file_name: &CStr,
> + stat: &FileStat,
> + fd: OwnedFd,
> + metadata: &Metadata,
> + ) -> Result<(), Error> {
> + let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> + let reusable = if let Some(accessor) = previous_metadata_accessor {
> + self.is_reusable_entry(accessor, file_name, stat, metadata)
> + .await?
> + } else {
> + None
> + };
> +
> + let file_size = stat.st_size as u64;
couldn't we get this via is_reusable?
> + if let Some(start_offset) = reusable {
> + if let Some(ref ref_payload_index) = self.previous_payload_index {
> + let payload_size = file_size + size_of::<pxar::format::Header>() as u64;
or better yet, get this here directly ;)
> + let end_offset = start_offset + payload_size;
or better yet, this one here ;)
> + let (indices, start_padding, end_padding) =
> + lookup_dynamic_entries(ref_payload_index, start_offset..end_offset)?;
or better yet, just return the Range in the payload archive? :)
> +
> + let boundary = encoder.payload_position()?;
> + let offset =
> + self.reused_chunks
> + .insert(indices, boundary, start_padding, end_padding);
> +
> + self.caching_enabled = true;
> + let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
> + fd,
> + c_file_name.into(),
> + *stat,
> + metadata.clone(),
> + offset,
> + ));
> + self.cached_entries.push(cache_entry);
> +
> + match self.reused_chunks.suggested() {
> + Suggested::Reuse => self.flush_cached_to_archive(encoder, true, true).await?,
> + Suggested::Reencode => {
> + self.flush_cached_to_archive(encoder, false, true).await?
> + }
> + Suggested::CheckNext => {}
> + }
> +
> + return Ok(());
> + }
> + }
> +
> + self.flush_cached_to_archive(encoder, false, true).await?;
> + self.add_entry(encoder, previous_metadata_accessor, fd.as_raw_fd(), c_file_name, stat)
> + .await
this part here is where I think we mishandle some edge cases, as mentioned in
the ReusedChunks patch comments.. even keeping back the last chunk doesn't save
us from losing some re-usable files sometimes..
> + }
> +
> + async fn flush_cached_to_archive<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + reuse_chunks: bool,
> + keep_back_last_chunk: bool,
> + ) -> Result<(), Error> {
> + let entries = std::mem::take(&mut self.cached_entries);
> +
> + if !reuse_chunks {
> + self.clear_cached_chunks(encoder)?;
> + }
> +
> + for entry in entries {
> + match entry {
> + CacheEntry::RegEntry(CacheEntryData {
> + fd,
> + c_file_name,
> + stat,
> + metadata,
> + payload_offset,
> + }) => {
> + self.add_entry_to_archive(
> + encoder,
> + &mut None,
> + &c_file_name,
> + &stat,
> + fd,
> + &metadata,
> + reuse_chunks,
> + Some(payload_offset),
> + )
> + .await?
> + }
> + CacheEntry::DirEntry(CacheEntryData {
> + c_file_name,
> + metadata,
> + ..
> + }) => {
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().start_directory(&c_file_name)?;
> + }
> + let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
> + encoder.create_directory(dir_name, &metadata).await?;
> + }
> + CacheEntry::DirEnd => {
> + encoder.finish().await?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().end_directory()?;
> + }
> + }
> + }
> + }
> +
> + self.caching_enabled = false;
> +
> + if reuse_chunks {
> + self.flush_reused_chunks(encoder, keep_back_last_chunk)?;
> + }
> +
> + Ok(())
> + }
> +
> + fn flush_reused_chunks<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + keep_back_last_chunk: bool,
> + ) -> Result<(), Error> {
> + let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
> +
> + // Do not inject the last reused chunk directly, but keep it as base for further entries
> + // to reduce chunk duplication. Needs to be flushed even on cache clear!
> + let last_chunk = if keep_back_last_chunk {
> + reused_chunks.chunks.pop()
> + } else {
> + None
> + };
> +
> + let mut injection_boundary = reused_chunks.start_boundary();
> + let payload_writer_position = encoder.payload_position()?.raw();
> +
> + if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
> + bail!(
> + "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
> + injection_boundary.raw(),
> + );
> + }
> +
> + for chunks in reused_chunks.chunks.chunks(128) {
> + let mut chunk_list = Vec::with_capacity(128);
> + let mut size = PayloadOffset::default();
> + for (padding, chunk) in chunks.iter() {
> + log::debug!(
> + "Injecting chunk with {} padding (chunk size {})",
> + HumanByte::from(*padding),
> + HumanByte::from(chunk.size()),
> + );
> + self.total_injected_size += chunk.size();
> + self.total_injected_count += 1;
> + if *padding > 0 {
> + self.partial_chunks_count += 1;
> + }
> + size = size.add(chunk.size());
> + chunk_list.push(chunk.clone());
> + }
> +
> + let inject_chunks = InjectChunks {
> + boundary: injection_boundary.raw(),
> + chunks: chunk_list,
> + size: size.raw() as usize,
> + };
> +
> + if let Some(boundary) = self.forced_boundaries.as_mut() {
> + let mut boundary = boundary.lock().unwrap();
> + boundary.push_back(inject_chunks);
> + } else {
> + bail!("missing injection queue");
> + };
> +
> + injection_boundary = injection_boundary.add(size.raw());
> + encoder.advance(size)?;
> + }
> +
> + if let Some((padding, chunk)) = last_chunk {
> + // Make sure that we flush this chunk even on clear calls
> + self.reused_chunks.must_flush_first = true;
might make sense to rename this one to "must_flush_first_chunk", else the other
call sites might be interpreted as "must flush (all) chunks first"
> + let _offset = self
> + .reused_chunks
> + .insert(vec![chunk], injection_boundary, padding, 0);
> + }
> +
> + Ok(())
> + }
> +
> + fn clear_cached_chunks<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + ) -> Result<(), Error> {
> + let reused_chunks = std::mem::take(&mut self.reused_chunks);
> +
> + if !reused_chunks.must_flush_first {
> + return Ok(());
> + }
> +
> + // First chunk was kept back to avoid duplication but needs to be injected
> + let injection_boundary = reused_chunks.start_boundary();
> + let payload_writer_position = encoder.payload_position()?.raw();
> +
> + if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
> + bail!(
> + "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
> + injection_boundary.raw()
> + );
> + }
> +
> + if let Some((padding, chunk)) = reused_chunks.chunks.first() {
> + let size = PayloadOffset::default().add(chunk.size());
> + log::debug!(
> + "Injecting chunk with {} padding (chunk size {})",
> + HumanByte::from(*padding),
> + HumanByte::from(chunk.size()),
> + );
> + let inject_chunks = InjectChunks {
> + boundary: injection_boundary.raw(),
> + chunks: vec![chunk.clone()],
> + size: size.raw() as usize,
> + };
> +
> + self.total_injected_size += size.raw();
> + self.total_injected_count += 1;
> + if *padding > 0 {
> + self.partial_chunks_count += 1;
> + }
> +
> + if let Some(boundary) = self.forced_boundaries.as_mut() {
> + let mut boundary = boundary.lock().unwrap();
> + boundary.push_back(inject_chunks);
> + } else {
> + bail!("missing injection queue");
> + };
> + encoder.advance(size)?;
this part is basically the loop in flush_reused_chunks and could be de-duplicated in some fashion..
> + } else {
> + bail!("missing first chunk");
> + }
> +
> + Ok(())
> + }
> +
> async fn add_directory<T: SeqWrite + Send>(
> &mut self,
> encoder: &mut Encoder<'_, T>,
> --
> 2.39.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
More information about the pbs-devel
mailing list