[pbs-devel] [RFC proxmox-backup 33/36] client: pxar: add look-ahead caching
Christian Ebner
c.ebner at proxmox.com
Wed Feb 28 15:02:23 CET 2024
Implements the methods to cache entries in a look-ahead cache and to
flush the cached entries to the archive, either by re-using and
injecting the payload chunks from the previous backup snapshot and
storing references to them, or by re-encoding the chunks.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
pbs-client/src/pxar/create.rs | 284 ++++++++++++++++++++++++++++++++++
1 file changed, 284 insertions(+)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 39864483..fbcccb2e 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -828,6 +828,290 @@ impl Archiver {
}
}
+    /// Decide whether a file entry is queued in the look-ahead cache or written to the archive.
+    ///
+    /// When `accessor` is set, `is_reusable_entry` is consulted. If it yields a start offset and
+    /// a previous payload index is available, the chunk indices covering the file are inserted
+    /// into `reused_chunks`. The entry is then either appended to `cached_entries` or, once
+    /// `cached_payload_size` plus the file size reaches `CACHED_PAYLOAD_THRESHOLD`, the cache is
+    /// flushed with chunk re-use and the entry is added as a payload reference right away.
+    ///
+    /// In every other case the cache is flushed without chunk re-use and the entry is handed to
+    /// `add_entry_to_archive`.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error returned by the reusability check, the payload index lookup, the
+    /// encoder, the catalog or the flush helpers.
+    async fn cache_or_flush_entries<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+        c_file_name: &CStr,
+        stat: &FileStat,
+        fd: OwnedFd,
+        metadata: &Metadata,
+    ) -> Result<(), Error> {
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+        // Without an accessor there is nothing to compare against, so nothing is reusable.
+        let reusable = if let Some(accessor) = accessor {
+            self.is_reusable_entry(accessor, file_name, stat, metadata)
+                .await?
+        } else {
+            None
+        };
+
+        let file_size = stat.st_size as u64;
+        if let Some(start_offset) = reusable {
+            if let Some(ref ref_payload_index) = self.previous_payload_index {
+                let end_offset = start_offset + file_size;
+                let (indices, start_padding, _end_padding) =
+                    ref_payload_index.indices(start_offset, end_offset)?;
+
+                // Insert chunks into reused_chunks, getting the correct payload offset
+                let boundary = encoder.payload_position()?;
+                let offset = self.reused_chunks.insert(indices, boundary, start_padding);
+
+                if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
+                    // Threshold reached: flush everything queued so far with chunk re-use, then
+                    // reference the current file's payload directly instead of caching it.
+                    self.flush_cached_to_archive(encoder, true).await?;
+
+                    encoder
+                        .add_payload_ref(metadata, file_name, file_size, offset)
+                        .await?;
+
+                    if let Some(ref catalog) = self.catalog {
+                        catalog
+                            .lock()
+                            .unwrap()
+                            .add_file(c_file_name, file_size, stat.st_mtime)?;
+                    }
+                } else {
+                    log::debug!("lookahead-cache: {file_name:?}");
+                    self.caching_enabled = true;
+                    self.cached_payload_size += file_size;
+                    let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+                        fd,
+                        c_file_name.into(),
+                        stat.clone(),
+                        metadata.clone(),
+                        offset,
+                    ));
+                    self.cached_entries.push(cache_entry);
+                }
+
+                return Ok(());
+            }
+            log::debug!("re-encode: {file_name:?} no previous payload index.");
+        }
+
+        // Not reusable (or no index to look chunks up in): write out whatever is cached without
+        // chunk re-use, then encode this entry through the regular path.
+        self.flush_cached_to_archive(encoder, false).await?;
+        self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, metadata)
+            .await
+    }
+
+    /// Write every entry held in the look-ahead cache to the archive and reset the cache state.
+    ///
+    /// With `reuse_chunks` set, `flush_reused_chunks` is run first; otherwise
+    /// `clear_cached_chunks` is. Afterwards the queued entries are taken out of `cached_entries`
+    /// and written in their original order; `reuse_chunks` is forwarded for regular entries.
+    async fn flush_cached_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        reuse_chunks: bool,
+    ) -> Result<(), Error> {
+        match reuse_chunks {
+            true => self.flush_reused_chunks(encoder)?,
+            false => self.clear_cached_chunks(encoder)?,
+        }
+
+        // Move the queue out of `self` so the entries can be consumed while `self` is borrowed
+        // mutably by the flush helpers below.
+        let pending = std::mem::take(&mut self.cached_entries);
+        self.cached_payload_size = 0;
+        self.caching_enabled = false;
+
+        for cached in pending {
+            match cached {
+                CacheEntry::DirEntry(data) => {
+                    self.flush_directory_to_archive(encoder, data).await?;
+                }
+                CacheEntry::RegEntry(data) => {
+                    self.flush_entry_to_archive(encoder, data, reuse_chunks)
+                        .await?;
+                }
+                CacheEntry::DirEnd => {
+                    // Close the directory in the encoder first, then in the catalog.
+                    encoder.finish().await?;
+                    if let Some(catalog) = &self.catalog {
+                        catalog.lock().unwrap().end_directory()?;
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Queue the collected reused chunks for injection and advance the encoder accordingly.
+    ///
+    /// All chunks except the last one are pushed to `forced_boundaries` in batches of at most
+    /// 128. The last chunk is re-inserted into a fresh `reused_chunks` with `must_flush_first`
+    /// set, so that it is kept around for possible follow-up entries.
+    fn flush_reused_chunks<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+    ) -> Result<(), Error> {
+        let mut pending = std::mem::take(&mut self.reused_chunks);
+
+        // Do not inject the trailing chunk directly, but keep it around for possible followups.
+        // It needs to be flushed in any case!
+        let held_back = pending.chunks.pop();
+
+        let mut injection_boundary = pending.start_boundary();
+        for batch in pending.chunks.chunks(128) {
+            let batch_size: u64 = batch.iter().map(|chunk| chunk.size()).sum();
+            let injection = InjectChunks {
+                boundary: injection_boundary.raw(),
+                chunks: batch.to_vec(),
+                size: batch_size as usize,
+            };
+            let mut queue = self.forced_boundaries.lock().unwrap();
+            queue.push_back(injection);
+            injection_boundary = injection_boundary.add(batch_size);
+            encoder.advance(batch_size)?;
+        }
+
+        if let Some(chunk) = held_back {
+            let _ = self.reused_chunks.insert(vec![chunk], injection_boundary, 0);
+            // Make sure that we flush this chunk even on clear calls
+            self.reused_chunks.must_flush_first = true;
+        }
+
+        Ok(())
+    }
+
+    /// Drop the collected reused chunks, injecting only a chunk that was held back earlier.
+    ///
+    /// `reused_chunks` is reset in any case. If `must_flush_first` was set on it, its first
+    /// chunk is pushed to `forced_boundaries` and the encoder is advanced by that chunk's size.
+    ///
+    /// # Errors
+    ///
+    /// Fails with "missing first chunk" if `must_flush_first` is set but no chunk is present,
+    /// or if advancing the encoder fails.
+    fn clear_cached_chunks<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+    ) -> Result<(), Error> {
+        let dropped = std::mem::take(&mut self.reused_chunks);
+
+        if dropped.must_flush_first {
+            // First chunk was kept back to avoid duplication but needs to be injected
+            let injection_boundary = dropped.start_boundary();
+            let chunk = match dropped.chunks.first() {
+                Some(chunk) => chunk,
+                None => bail!("missing first chunk"),
+            };
+            let size = chunk.size();
+            let injection = InjectChunks {
+                boundary: injection_boundary.raw(),
+                chunks: vec![chunk.clone()],
+                size: size as usize,
+            };
+            let mut queue = self.forced_boundaries.lock().unwrap();
+            queue.push_back(injection);
+            encoder.advance(size)?;
+        }
+
+        Ok(())
+    }
+
+    /// Write a cached directory entry to the archive.
+    ///
+    /// Registers the directory start in the catalog (if one is configured) and then creates the
+    /// directory in the encoder, using the name and metadata stored in `entry_data`.
+    async fn flush_directory_to_archive<'a, 'b, T: SeqWrite + Send>(
+        &'a mut self,
+        encoder: &'a mut Encoder<'b, T>,
+        entry_data: CacheEntryData,
+    ) -> Result<(), Error> {
+        let c_file_name = entry_data.c_file_name;
+        let metadata = entry_data.metadata;
+
+        if let Some(catalog) = &self.catalog {
+            catalog.lock().unwrap().start_directory(&c_file_name)?;
+        }
+
+        let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
+        encoder.create_directory(dir_name, &metadata).await?;
+
+        Ok(())
+    }
+
+    /// Write a cached non-directory entry to the archive, dispatching on its file type.
+    ///
+    /// For every supported type the entry is first registered in the catalog (if one is
+    /// configured) and then written through the encoder. Regular files with more than one link
+    /// that are already present in `hardlinks` are stored as hardlinks. Otherwise they are
+    /// stored as a payload reference at the cached `payload_offset` when `reuse_chunks` is set,
+    /// or re-encoded via `add_regular_file` when it is not.
+    ///
+    /// # Errors
+    ///
+    /// Fails on an unknown file type and propagates catalog and encoder errors.
+    async fn flush_entry_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        entry_data: CacheEntryData,
+        reuse_chunks: bool,
+    ) -> Result<(), Error> {
+        use pxar::format::mode;
+
+        let CacheEntryData {
+            fd,
+            c_file_name,
+            stat,
+            metadata,
+            payload_offset,
+        } = entry_data;
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+        let multiple_links = stat.st_nlink > 1;
+
+        match metadata.file_type() {
+            mode::IFREG => {
+                let hardlink_key = HardLinkInfo {
+                    st_dev: stat.st_dev,
+                    st_ino: stat.st_ino,
+                };
+
+                // An already archived inode is stored as a hardlink to the first occurrence.
+                if multiple_links {
+                    if let Some((target, target_offset)) = self.hardlinks.get(&hardlink_key) {
+                        if let Some(catalog) = &self.catalog {
+                            catalog.lock().unwrap().add_hardlink(&c_file_name)?;
+                        }
+                        encoder
+                            .add_hardlink(file_name, target, *target_offset)
+                            .await?;
+                        return Ok(());
+                    }
+                }
+
+                let size = stat.st_size as u64;
+                if let Some(catalog) = &self.catalog {
+                    catalog
+                        .lock()
+                        .unwrap()
+                        .add_file(&c_file_name, size, stat.st_mtime)?;
+                }
+
+                if !reuse_chunks {
+                    let offset: LinkOffset = self
+                        .add_regular_file(encoder, fd, file_name, &metadata, size)
+                        .await?;
+
+                    // Remember the freshly encoded file as target for later hardlinks.
+                    if multiple_links {
+                        self.hardlinks
+                            .insert(hardlink_key, (self.path.clone(), offset));
+                    }
+                } else {
+                    encoder
+                        .add_payload_ref(&metadata, file_name, size, payload_offset)
+                        .await?;
+                }
+            }
+            mode::IFSOCK => {
+                if let Some(catalog) = &self.catalog {
+                    catalog.lock().unwrap().add_socket(&c_file_name)?;
+                }
+                encoder.add_socket(&metadata, file_name).await?;
+            }
+            mode::IFIFO => {
+                if let Some(catalog) = &self.catalog {
+                    catalog.lock().unwrap().add_fifo(&c_file_name)?;
+                }
+                encoder.add_fifo(&metadata, file_name).await?;
+            }
+            mode::IFLNK => {
+                if let Some(catalog) = &self.catalog {
+                    catalog.lock().unwrap().add_symlink(&c_file_name)?;
+                }
+                self.add_symlink(encoder, fd, file_name, &metadata).await?;
+            }
+            mode::IFBLK => {
+                if let Some(catalog) = &self.catalog {
+                    catalog.lock().unwrap().add_block_device(&c_file_name)?;
+                }
+                self.add_device(encoder, file_name, &metadata, &stat)
+                    .await?;
+            }
+            mode::IFCHR => {
+                if let Some(catalog) = &self.catalog {
+                    catalog.lock().unwrap().add_char_device(&c_file_name)?;
+                }
+                self.add_device(encoder, file_name, &metadata, &stat)
+                    .await?;
+            }
+            other => bail!(
+                "encountered unknown file type: 0x{:x} (0o{:o})",
+                other,
+                other
+            ),
+        }
+
+        Ok(())
+    }
+
async fn add_directory<T: SeqWrite + Send>(
&mut self,
encoder: &mut Encoder<'_, T>,
--
2.39.2
More information about the pbs-devel
mailing list