[pbs-devel] [PATCH v3 proxmox-backup 47/58] client: pxar: add look-ahead caching

Fabian Grünbichler f.gruenbichler at proxmox.com
Fri Apr 5 10:33:54 CEST 2024


Quoting Christian Ebner (2024-03-28 13:36:56)
> Implements the methods to cache entries in a look-ahead cache and
> flush the entries to archive, either by re-using and injecting the
> payload chunks from the previous backup snapshot and storing the
> reference to it, or by re-encoding the chunks.
> 
> When walking the file system tree, check for each entry if it is
> re-usable, meaning that the metadata did not change and the payload
> chunks can be re-indexed instead of re-encoding the whole data.
> Since the ammount of payload data might be small as compared to the

s/ammount/amount/

> actual chunk size, a decision whether to re-use or re-encode is
> postponed if the reused payload does not fall below a threshold value,
> but the chunks were continuous.
> In this case, put the entry's file handle an metadata on the cache and

s/an/and/

> enable caching mode, and continue with the next entry.
> Reusable chunk digests and size as well as reference offsets to the
> start of regular file payloads within the payload stream are stored in
> memory, to be injected for re-usable file entries.
> 
> If the threshold value for re-use is reached, the chunks are injected
> in the payload stream and the references with the corresponding offsets
> encoded in the metadata stream.
> If however a non-reusable (because changed) entry is encountered before
> the threshold is reached, the entries on the cache are flushed to the
> archive by re-encoding them, the memorized chunks and payload reference
> offsets are discarted.

s/discarted/discarded/

> 
> Since multiple files might be contained within a single chunk, it is
> assured that the deduplication of chunks is performed also when the
> reuse threshold is reached, by keeping back the last chunk in the
> memorized list, so following files might as well rei-use that chunk.

s/rei/re/

> It is assured that this chunk is however injected in the stream also in
> case that the following lookups lead to a cache clear and re-encoding.
> 
> Directory boundaries are cached as well, and written as part of the
> encoding when flushing.

thanks for adding a high-level description!
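
for anyone following along, my rough mental model of the per-entry decision is
something like this (untested sketch; only the `Suggested` variants are taken
from the patch below, the helper and its inputs are made up, the actual logic
is in the ReusedChunks patch of this series):

enum Suggested {
    Reuse,     // reuse threshold reached: inject the memorized chunks, encode references
    Reencode,  // reuse not worthwhile (exact conditions in ReusedChunks): re-encode the cache
    CheckNext, // below the threshold but chunks still contiguous: keep caching entries
}

// hypothetical decision helper, only to illustrate the three outcomes:
fn suggest(reused_payload_size: u64, contiguous: bool, reuse_threshold: u64) -> Suggested {
    if !contiguous {
        Suggested::Reencode
    } else if reused_payload_size >= reuse_threshold {
        Suggested::Reuse
    } else {
        Suggested::CheckNext
    }
}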

> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 2:
> - completely reworked
> - strongly reduced duplicate code
> 
>  pbs-client/src/pxar/create.rs | 259 ++++++++++++++++++++++++++++++++++
>  1 file changed, 259 insertions(+)
> 
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index c64084a74..07fa17ec4 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
>  use std::ffi::{CStr, CString, OsStr};
>  use std::fmt;
>  use std::io::{self, Read};
> +use std::mem::size_of;
>  use std::ops::Range;
>  use std::os::unix::ffi::OsStrExt;
>  use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
> @@ -23,6 +24,7 @@ use pxar::accessor::aio::{Accessor, Directory};
>  use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
>  use pxar::{EntryKind, Metadata};
>  
> +use proxmox_human_byte::HumanByte;
>  use proxmox_io::vec;
>  use proxmox_lang::c_str;
>  use proxmox_sys::fs::{self, acl, xattr};
> @@ -32,6 +34,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
>  use pbs_datastore::dynamic_index::{DynamicIndexReader, LocalDynamicReadAt};
>  
>  use crate::inject_reused_chunks::InjectChunks;
> +use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData};
>  use crate::pxar::metadata::errno_is_unsupported;
>  use crate::pxar::tools::assert_single_path_component;
>  use crate::pxar::Flags;
> @@ -274,6 +277,12 @@ struct Archiver {
>      reused_chunks: ReusedChunks,
>      previous_payload_index: Option<DynamicIndexReader>,
>      forced_boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
> +    cached_entries: Vec<CacheEntry>,
> +    caching_enabled: bool,
> +    total_injected_size: u64,
> +    total_injected_count: u64,
> +    partial_chunks_count: u64,
> +    total_reused_payload_size: u64,
>  }
>  
>  type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -377,6 +386,12 @@ where
>          reused_chunks: ReusedChunks::new(),
>          previous_payload_index,
>          forced_boundaries,
> +        cached_entries: Vec::new(),
> +        caching_enabled: false,
> +        total_injected_size: 0,
> +        total_injected_count: 0,
> +        partial_chunks_count: 0,
> +        total_reused_payload_size: 0,
>      };
>  
>      archiver
> @@ -879,6 +894,250 @@ impl Archiver {
>          }
>      }
>  
> +    async fn cache_or_flush_entries<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
> +        c_file_name: &CStr,
> +        stat: &FileStat,
> +        fd: OwnedFd,
> +        metadata: &Metadata,
> +    ) -> Result<(), Error> {
> +        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> +        let reusable = if let Some(accessor) = previous_metadata_accessor {
> +            self.is_reusable_entry(accessor, file_name, stat, metadata)
> +                .await?
> +        } else {
> +            None
> +        };
> +
> +        let file_size = stat.st_size as u64;

couldn't we get this via is_reusable?

> +        if let Some(start_offset) = reusable {
> +            if let Some(ref ref_payload_index) = self.previous_payload_index {
> +                let payload_size = file_size + size_of::<pxar::format::Header>() as u64;

or better yet, get this here directly ;)

> +                let end_offset = start_offset + payload_size;

or better yet, this one here ;)

> +                let (indices, start_padding, end_padding) =
> +                    lookup_dynamic_entries(ref_payload_index, start_offset..end_offset)?;

or better yet, just return the Range in the payload archive? :)
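
i.e. something like this (untested sketch, just to illustrate what I mean: with
stat already available inside is_reusable_entry, the full payload range can be
computed and returned there instead of only the start offset):

use std::mem::size_of;
use std::ops::Range;

// hypothetical helper: the payload entry covers the pxar header plus the file
// contents, starting at the offset looked up in the previous metadata archive
fn payload_range(start_offset: u64, file_size: u64) -> Range<u64> {
    let payload_size = file_size + size_of::<pxar::format::Header>() as u64;
    start_offset..start_offset + payload_size
}

the caller here could then pass that range straight to lookup_dynamic_entries()
without recomputing file_size, payload_size and end_offset.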

> +
> +                let boundary = encoder.payload_position()?;
> +                let offset =
> +                    self.reused_chunks
> +                        .insert(indices, boundary, start_padding, end_padding);
> +
> +                self.caching_enabled = true;
> +                let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
> +                    fd,
> +                    c_file_name.into(),
> +                    *stat,
> +                    metadata.clone(),
> +                    offset,
> +                ));
> +                self.cached_entries.push(cache_entry);
> +
> +                match self.reused_chunks.suggested() {
> +                    Suggested::Reuse => self.flush_cached_to_archive(encoder, true, true).await?,
> +                    Suggested::Reencode => {
> +                        self.flush_cached_to_archive(encoder, false, true).await?
> +                    }
> +                    Suggested::CheckNext => {}
> +                }
> +
> +                return Ok(());
> +            }
> +        }
> +
> +        self.flush_cached_to_archive(encoder, false, true).await?;
> +        self.add_entry(encoder, previous_metadata_accessor, fd.as_raw_fd(), c_file_name, stat)
> +            .await

this part here is where I think we mishandle some edge cases, as mentioned in
the ReusedChunks patch comments.. even keeping back the last chunk doesn't save
us from losing some re-usable files sometimes..

> +    }
> +
> +    async fn flush_cached_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        reuse_chunks: bool,
> +        keep_back_last_chunk: bool,
> +    ) -> Result<(), Error> {
> +        let entries = std::mem::take(&mut self.cached_entries);
> +
> +        if !reuse_chunks {
> +            self.clear_cached_chunks(encoder)?;
> +        }
> +
> +        for entry in entries {
> +            match entry {
> +                CacheEntry::RegEntry(CacheEntryData {
> +                    fd,
> +                    c_file_name,
> +                    stat,
> +                    metadata,
> +                    payload_offset,
> +                }) => {
> +                    self.add_entry_to_archive(
> +                        encoder,
> +                        &mut None,
> +                        &c_file_name,
> +                        &stat,
> +                        fd,
> +                        &metadata,
> +                        reuse_chunks,
> +                        Some(payload_offset),
> +                    )
> +                    .await?
> +                }
> +                CacheEntry::DirEntry(CacheEntryData {
> +                    c_file_name,
> +                    metadata,
> +                    ..
> +                }) => {
> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog.lock().unwrap().start_directory(&c_file_name)?;
> +                    }
> +                    let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
> +                    encoder.create_directory(dir_name, &metadata).await?;
> +                }
> +                CacheEntry::DirEnd => {
> +                    encoder.finish().await?;
> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog.lock().unwrap().end_directory()?;
> +                    }
> +                }
> +            }
> +        }
> +
> +        self.caching_enabled = false;
> +
> +        if reuse_chunks {
> +            self.flush_reused_chunks(encoder, keep_back_last_chunk)?;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn flush_reused_chunks<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        keep_back_last_chunk: bool,
> +    ) -> Result<(), Error> {
> +        let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
> +
> +        // Do not inject the last reused chunk directly, but keep it as base for further entries
> +        // to reduce chunk duplication. Needs to be flushed even on cache clear!
> +        let last_chunk = if keep_back_last_chunk {
> +            reused_chunks.chunks.pop()
> +        } else {
> +            None
> +        };
> +
> +        let mut injection_boundary = reused_chunks.start_boundary();
> +        let payload_writer_position = encoder.payload_position()?.raw();
> +
> +        if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
> +            bail!(
> +                "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
> +                injection_boundary.raw(),
> +            );
> +        }
> +
> +        for chunks in reused_chunks.chunks.chunks(128) {
> +            let mut chunk_list = Vec::with_capacity(128);
> +            let mut size = PayloadOffset::default();
> +            for (padding, chunk) in chunks.iter() {
> +                log::debug!(
> +                    "Injecting chunk with {} padding (chunk size {})",
> +                    HumanByte::from(*padding),
> +                    HumanByte::from(chunk.size()),
> +                );
> +                self.total_injected_size += chunk.size();
> +                self.total_injected_count += 1;
> +                if *padding > 0 {
> +                    self.partial_chunks_count += 1;
> +                }
> +                size = size.add(chunk.size());
> +                chunk_list.push(chunk.clone());
> +            }
> +
> +            let inject_chunks = InjectChunks {
> +                boundary: injection_boundary.raw(),
> +                chunks: chunk_list,
> +                size: size.raw() as usize,
> +            };
> +
> +            if let Some(boundary) = self.forced_boundaries.as_mut() {
> +                let mut boundary = boundary.lock().unwrap();
> +                boundary.push_back(inject_chunks);
> +            } else {
> +                bail!("missing injection queue");
> +            };
> +
> +            injection_boundary = injection_boundary.add(size.raw());
> +            encoder.advance(size)?;
> +        }
> +
> +        if let Some((padding, chunk)) = last_chunk {
> +            // Make sure that we flush this chunk even on clear calls
> +            self.reused_chunks.must_flush_first = true;

might make sense to rename this one to "must_flush_first_chunk", else the other
call sites might be interpreted as "must flush (all) chunks first"

> +            let _offset = self
> +                .reused_chunks
> +                .insert(vec![chunk], injection_boundary, padding, 0);
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn clear_cached_chunks<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +    ) -> Result<(), Error> {
> +        let reused_chunks = std::mem::take(&mut self.reused_chunks);
> +
> +        if !reused_chunks.must_flush_first {
> +            return Ok(());
> +        }
> +
> +        // First chunk was kept back to avoid duplication but needs to be injected
> +        let injection_boundary = reused_chunks.start_boundary();
> +        let payload_writer_position = encoder.payload_position()?.raw();
> +
> +        if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
> +            bail!(
> +                "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
> +                injection_boundary.raw()
> +            );
> +        }
> +
> +        if let Some((padding, chunk)) = reused_chunks.chunks.first() {
> +            let size = PayloadOffset::default().add(chunk.size());
> +            log::debug!(
> +                "Injecting chunk with {} padding (chunk size {})",
> +                HumanByte::from(*padding),
> +                HumanByte::from(chunk.size()),
> +            );
> +            let inject_chunks = InjectChunks {
> +                boundary: injection_boundary.raw(),
> +                chunks: vec![chunk.clone()],
> +                size: size.raw() as usize,
> +            };
> +
> +            self.total_injected_size += size.raw();
> +            self.total_injected_count += 1;
> +            if *padding > 0 {
> +                self.partial_chunks_count += 1;
> +            }
> +
> +            if let Some(boundary) = self.forced_boundaries.as_mut() {
> +                let mut boundary = boundary.lock().unwrap();
> +                boundary.push_back(inject_chunks);
> +            } else {
> +                bail!("missing injection queue");
> +            };
> +            encoder.advance(size)?;

this part is basically the loop in flush_reused_chunks and could be de-duplicated in some fashion..
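
e.g. something like this maybe (untested sketch; `ChunkEntry` is just a stand-in
for whatever type ReusedChunks stores in its (padding, chunk) pairs):

fn inject_chunk_batch<T: SeqWrite + Send>(
    &mut self,
    encoder: &mut Encoder<'_, T>,
    injection_boundary: PayloadOffset,
    chunks: &[(u64, ChunkEntry)],
) -> Result<PayloadOffset, Error> {
    let mut size = PayloadOffset::default();
    let mut chunk_list = Vec::with_capacity(chunks.len());
    for (padding, chunk) in chunks {
        log::debug!(
            "Injecting chunk with {} padding (chunk size {})",
            HumanByte::from(*padding),
            HumanByte::from(chunk.size()),
        );
        self.total_injected_size += chunk.size();
        self.total_injected_count += 1;
        if *padding > 0 {
            self.partial_chunks_count += 1;
        }
        size = size.add(chunk.size());
        chunk_list.push(chunk.clone());
    }

    let inject_chunks = InjectChunks {
        boundary: injection_boundary.raw(),
        chunks: chunk_list,
        size: size.raw() as usize,
    };

    if let Some(boundary) = self.forced_boundaries.as_mut() {
        boundary.lock().unwrap().push_back(inject_chunks);
    } else {
        bail!("missing injection queue");
    }

    encoder.advance(size)?;
    Ok(injection_boundary.add(size.raw()))
}

flush_reused_chunks could then call this once per 128-chunk batch and
clear_cached_chunks once for the single kept-back chunk, keeping the stats
counting and debug logging in one place.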

> +        } else {
> +            bail!("missing first chunk");
> +        }
> +
> +        Ok(())
> +    }
> +
>      async fn add_directory<T: SeqWrite + Send>(
>          &mut self,
>          encoder: &mut Encoder<'_, T>,
> -- 
> 2.39.2
> 