[pbs-devel] [RFC v2 proxmox-backup 33/36] client: pxar: add look-ahead caching
Fabian Grünbichler
f.gruenbichler at proxmox.com
Tue Mar 12 15:08:49 CET 2024
On March 5, 2024 10:27 am, Christian Ebner wrote:
> Implements the methods to cache entries in a look-ahead cache and
> flush the entries to archive, either by re-using and injecting the
> payload chunks from the previous backup snapshot and storing the
> reference to it, or by re-encoding the chunks.
this is a bit terse for the amount of code below ;)
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 1:
> - fix flushing of final chunk before archive finish
> - fix formatting
> - remove unneeded log output
>
> pbs-client/src/pxar/create.rs | 293 ++++++++++++++++++++++++++++++++++
> 1 file changed, 293 insertions(+)
>
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 3b221b54..b2ce898f 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -828,6 +828,299 @@ impl Archiver {
> }
> }
>
> + async fn cache_or_flush_entries<T: SeqWrite + Send>(
this is only called if we have a previous payload
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
this must always be set for the call to make any sense, so we can
drop the Option<>.
> + c_file_name: &CStr,
> + stat: &FileStat,
> + fd: OwnedFd,
> + metadata: &Metadata,
> + ) -> Result<(), Error> {
> + let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> + let reusable = if let Some(accessor) = accessor {
> + self.is_reusable_entry(accessor, file_name, stat, metadata)
> + .await?
then this part here
> + } else {
> + None
> + };
> +
> + let file_size = stat.st_size as u64;
> + if let Some(start_offset) = reusable {
can just be inlined here
> + if let Some(ref ref_payload_index) = self.previous_payload_index {
> + let end_offset = start_offset + file_size;
> + let (indices, start_padding, _end_padding) =
> + ref_payload_index.indices(start_offset, end_offset)?;
already noted on the patch introducing `indices`: this is the only call
site, and one of the tuple members is not used.
> +
> + let boundary = encoder.payload_position()?;
> + let offset = self.reused_chunks.insert(indices, boundary, start_padding);
> +
> + if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
> + self.flush_cached_to_archive(encoder, true, true).await?;
> +
this vvvvvv
> + encoder
> + .add_payload_ref(metadata, file_name, file_size, offset)
> + .await?;
> +
> + if let Some(ref catalog) = self.catalog {
> + catalog
> + .lock()
> + .unwrap()
> + .add_file(&c_file_name, file_size, stat.st_mtime)?;
> + }
^^^^^^^
also happens in self.flush_cached_to_archive -> self.flush_entry_to_archive
since we pass reuse_chunks, so couldn't we just flush above and then continue
with the else branch below to cache this new entry, instead of
inline-adding this one and bypassing the cache?
> + } else {
> + self.caching_enabled = true;
> + self.cached_payload_size += file_size;
> + let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
> + fd,
> + c_file_name.into(),
> + stat.clone(),
> + metadata.clone(),
> + offset,
> + ));
> + self.cached_entries.push(cache_entry);
> + }
> +
> + return Ok(());
> + }
> + }
> +
> + self.flush_cached_to_archive(encoder, false, true).await?;
> + self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
> + .await
> + }
> +
> + async fn flush_cached_to_archive<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + reuse_chunks: bool,
> + keep_back_last_chunk: bool,
> + ) -> Result<(), Error> {
> + if reuse_chunks {
> + self.flush_reused_chunks(encoder, keep_back_last_chunk)?;
> + } else {
> + self.clear_cached_chunks(encoder)?;
> + }
> + let entries = std::mem::take(&mut self.cached_entries);
> +
> + self.caching_enabled = false;
> + self.cached_payload_size = 0;
> +
> + for entry in entries {
> + match entry {
> + CacheEntry::RegEntry(data) => {
> + self.flush_entry_to_archive(encoder, data, reuse_chunks)
> + .await?
> + }
> + CacheEntry::PxarExcludeCliEntry(entry, old_patterns_count) => {
> + self.encode_pxarexclude_cli(encoder, &entry.name, old_patterns_count)
> + .await?;
> + }
> + CacheEntry::DirEntry(data) => {
> + self.flush_directory_to_archive(encoder, data).await?
> + }
> + CacheEntry::DirEnd => {
> + let result = encoder.finish().await?;
nit: result is `()`, there's no point in keeping it
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().end_directory()?;
> + }
> + result
nor is there in returning it here
> + }
> + }
especially since the whole match result is discarded anyway ;)
> + }
> +
> + Ok(())
> + }
> +
> + fn flush_reused_chunks<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + keep_back_last_chunk: bool,
> + ) -> Result<(), Error> {
> + let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
> +
> + // Do not inject the last reused chunk directly, but keep it as base for further entries
> + // to reduce chunk duplication. Needs to be flushed even on cache clear!
> + let last_chunk = if keep_back_last_chunk {
> + reused_chunks.chunks.pop()
> + } else {
> + None
> + };
> +
> + let mut injection_boundary = reused_chunks.start_boundary();
> + for chunks in reused_chunks.chunks.chunks(128) {
> + let size = chunks.iter().fold(0u64, |sum, chunk| sum + chunk.size());
> + let inject_chunks = InjectChunks {
> + boundary: injection_boundary.raw(),
> + chunks: chunks.to_vec(),
> + size: size as usize,
> + };
> + let mut boundary = self.forced_boundaries.lock().unwrap();
> + boundary.push_back(inject_chunks);
> + injection_boundary = injection_boundary.add(size);
> + encoder.advance(size)?;
> + }
> +
> + if let Some(chunk) = last_chunk {
> + let _offset = self
> + .reused_chunks
> + .insert(vec![chunk], injection_boundary, 0);
> + // Make sure that we flush this chunk even on clear calls
> + self.reused_chunks.must_flush_first = true;
> + }
> +
> + Ok(())
> + }
> +
> + fn clear_cached_chunks<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + ) -> Result<(), Error> {
> + let reused_chunks = std::mem::take(&mut self.reused_chunks);
this might deserve a comment or a more explicit call ;) took me a while
to follow along..
> +
> + if !reused_chunks.must_flush_first {
> + return Ok(());
> + }
> +
> + // First chunk was kept back to avoid duplication but needs to be injected
> + let injection_boundary = reused_chunks.start_boundary();
> + if let Some(chunk) = reused_chunks.chunks.first() {
> + let size = chunk.size();
> + let inject_chunks = InjectChunks {
> + boundary: injection_boundary.raw(),
> + chunks: vec![chunk.clone()],
> + size: size as usize,
> + };
> + let mut boundary = self.forced_boundaries.lock().unwrap();
> + boundary.push_back(inject_chunks);
> + encoder.advance(size)?;
> + } else {
> + bail!("missing first chunk");
> + }
> +
> + Ok(())
> + }
> +
> + async fn flush_directory_to_archive<'a, 'b, T: SeqWrite + Send>(
> + &'a mut self,
> + encoder: &'a mut Encoder<'b, T>,
> + entry_data: CacheEntryData,
> + ) -> Result<(), Error> {
> + let CacheEntryData {
> + c_file_name,
> + metadata,
> + ..
> + } = entry_data;
> + let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
> +
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().start_directory(&c_file_name)?;
> + }
> +
> + encoder.create_directory(dir_name, &metadata).await?;
> +
> + Ok(())
> + }
> +
> + async fn flush_entry_to_archive<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + entry_data: CacheEntryData,
> + reuse_chunks: bool,
> + ) -> Result<(), Error> {
> + use pxar::format::mode;
> +
> + let CacheEntryData {
> + fd,
> + c_file_name,
> + stat,
> + metadata,
> + payload_offset,
> + } = entry_data;
> + let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
starting here this is almost 100% identical to add_entry_to_archive,
could we maybe somehow merge them?
> +
> + match metadata.file_type() {
> + mode::IFREG => {
> + let link_info = HardLinkInfo {
> + st_dev: stat.st_dev,
> + st_ino: stat.st_ino,
> + };
> +
> + if stat.st_nlink > 1 {
> + if let Some((path, offset)) = self.hardlinks.get(&link_info) {
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_hardlink(&c_file_name)?;
> + }
> + encoder.add_hardlink(file_name, path, *offset).await?;
> + return Ok(());
> + }
> + }
> +
> + let file_size = stat.st_size as u64;
> + if let Some(ref catalog) = self.catalog {
> + catalog
> + .lock()
> + .unwrap()
> + .add_file(&c_file_name, file_size, stat.st_mtime)?;
> + }
> +
> + if reuse_chunks {
> + encoder
> + .add_payload_ref(&metadata, file_name, file_size, payload_offset)
> + .await?;
> + } else {
> + let offset: LinkOffset = self
> + .add_regular_file(encoder, fd, file_name, &metadata, file_size)
> + .await?;
> +
> + if stat.st_nlink > 1 {
> + self.hardlinks
> + .insert(link_info, (self.path.clone(), offset));
> + }
> + }
> + }
> + mode::IFSOCK => {
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_socket(&c_file_name)?;
> + }
> + encoder.add_socket(&metadata, file_name).await?;
> + }
> + mode::IFIFO => {
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_fifo(&c_file_name)?;
> + }
> + encoder.add_fifo(&metadata, file_name).await?;
> + }
> + mode::IFLNK => {
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_symlink(&c_file_name)?;
> + }
> + self.add_symlink(encoder, fd, file_name, &metadata).await?;
> + }
> + mode::IFBLK => {
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_block_device(&c_file_name)?;
> + }
> + self.add_device(encoder, file_name, &metadata, &stat)
> + .await?;
> + }
> + mode::IFCHR => {
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_char_device(&c_file_name)?;
> + }
> + self.add_device(encoder, file_name, &metadata, &stat)
> + .await?;
> + }
> + other => bail!(
> + "encountered unknown file type: 0x{:x} (0o{:o})",
> + other,
> + other
> + ),
> + }
> +
> + Ok(())
> + }
> +
> async fn add_directory<T: SeqWrite + Send>(
> &mut self,
> encoder: &mut Encoder<'_, T>,
> --
> 2.39.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
More information about the pbs-devel
mailing list