[pbs-devel] [PATCH v3 proxmox-backup 48/58] fix #3174: client: pxar: enable caching and meta comparison
Christian Ebner
c.ebner at proxmox.com
Thu Mar 28 13:36:57 CET 2024
Add the final glue logic to enable the look-ahead caching and
metadata comparison introduced in the preparatory patches.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 2:
- use the reused chunk padding to make the final decision on whether
entries with unchanged metadata should be re-used or re-encoded
pbs-client/src/pxar/create.rs | 194 +++++++++++++++++++++++++++-------
1 file changed, 156 insertions(+), 38 deletions(-)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 07fa17ec4..f103127c4 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -40,6 +40,7 @@ use crate::pxar::tools::assert_single_path_component;
use crate::pxar::Flags;
const CHUNK_PADDING_THRESHOLD: f64 = 0.1;
+const MAX_CACHE_SIZE: usize = 512;
#[derive(Default)]
struct ReusedChunks {
@@ -397,6 +398,12 @@ where
archiver
.archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
.await?;
+
+ // Re-encode all the remaining cached entries
+ archiver
+ .flush_cached_to_archive(&mut encoder, false, false)
+ .await?;
+
encoder.finish().await?;
encoder.close().await?;
@@ -454,7 +461,10 @@ impl Archiver {
for file_entry in file_list {
let file_name = file_entry.name.to_bytes();
- if is_root && file_name == b".pxarexclude-cli" {
+ if is_root
+ && file_name == b".pxarexclude-cli"
+ && previous_metadata_accessor.is_none()
+ {
self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
.await?;
continue;
@@ -472,6 +482,11 @@ impl Archiver {
.await
.map_err(|err| self.wrap_err(err))?;
}
+
+ if self.caching_enabled && !is_root {
+ self.cached_entries.push(CacheEntry::DirEnd);
+ }
+
self.path = old_path;
self.entry_counter = entry_counter;
self.patterns.truncate(old_patterns_count);
@@ -752,8 +767,6 @@ impl Archiver {
c_file_name: &CStr,
stat: &FileStat,
) -> Result<(), Error> {
- use pxar::format::mode;
-
let file_mode = stat.st_mode & libc::S_IFMT;
let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
OFlag::empty()
@@ -791,6 +804,97 @@ impl Archiver {
self.skip_e2big_xattr,
)?;
+ if self.previous_payload_index.is_none() {
+ return self
+ .add_entry_to_archive(
+ encoder,
+ previous_metadata,
+ c_file_name,
+ stat,
+ fd,
+ &metadata,
+ false,
+ None,
+ )
+ .await;
+ }
+
+ // Avoid having too many open file handles in cached entries
+ if self.cached_entries.len() > MAX_CACHE_SIZE {
+ self.flush_cached_to_archive(encoder, true, true).await?;
+ }
+
+ if metadata.is_regular_file() {
+ self.cache_or_flush_entries(
+ encoder,
+ previous_metadata,
+ c_file_name,
+ stat,
+ fd,
+ &metadata,
+ )
+ .await
+ } else if self.caching_enabled {
+ if stat.st_mode & libc::S_IFMT == libc::S_IFDIR {
+ let fd_clone = fd.try_clone()?;
+ let cache_entry = CacheEntry::DirEntry(CacheEntryData::new(
+ fd,
+ c_file_name.into(),
+ *stat,
+ metadata.clone(),
+ PayloadOffset::default(),
+ ));
+ self.cached_entries.push(cache_entry);
+
+ let dir = Dir::from_fd(fd_clone.into_raw_fd())?;
+ self.add_directory(
+ encoder,
+ previous_metadata,
+ dir,
+ c_file_name,
+ &metadata,
+ stat,
+ )
+ .await?;
+ } else {
+ let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+ fd,
+ c_file_name.into(),
+ *stat,
+ metadata,
+ PayloadOffset::default(),
+ ));
+ self.cached_entries.push(cache_entry);
+ }
+ Ok(())
+ } else {
+ self.add_entry_to_archive(
+ encoder,
+ previous_metadata,
+ c_file_name,
+ stat,
+ fd,
+ &metadata,
+ false,
+ None,
+ )
+ .await
+ }
+ }
+
+ async fn add_entry_to_archive<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ previous_metadata: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+ c_file_name: &CStr,
+ stat: &FileStat,
+ fd: OwnedFd,
+ metadata: &Metadata,
+ flush_reused: bool,
+ payload_offset: Option<PayloadOffset>,
+ ) -> Result<(), Error> {
+ use pxar::format::mode;
+
let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
match metadata.file_type() {
mode::IFREG => {
@@ -819,72 +923,64 @@ impl Archiver {
.add_file(c_file_name, file_size, stat.st_mtime)?;
}
- let offset: LinkOffset = self
- .add_regular_file(encoder, fd, file_name, &metadata, file_size)
- .await?;
+ if flush_reused {
+ self.total_reused_payload_size +=
+ file_size + size_of::<pxar::format::Header>() as u64;
+ encoder
+ .add_payload_ref(metadata, file_name, file_size, payload_offset.unwrap())
+ .await?;
+ } else {
+ let offset: LinkOffset = self
+ .add_regular_file(encoder, fd, file_name, metadata, file_size)
+ .await?;
- if stat.st_nlink > 1 {
- self.hardlinks
- .insert(link_info, (self.path.clone(), offset));
+ if stat.st_nlink > 1 {
+ self.hardlinks
+ .insert(link_info, (self.path.clone(), offset));
+ }
}
Ok(())
}
mode::IFDIR => {
let dir = Dir::from_fd(fd.into_raw_fd())?;
-
- if let Some(ref catalog) = self.catalog {
- catalog.lock().unwrap().start_directory(c_file_name)?;
- }
- let result = self
- .add_directory(
- encoder,
- previous_metadata,
- dir,
- c_file_name,
- &metadata,
- stat,
- )
- .await;
- if let Some(ref catalog) = self.catalog {
- catalog.lock().unwrap().end_directory()?;
- }
- result
+ self.add_directory(encoder, previous_metadata, dir, c_file_name, metadata, stat)
+ .await
}
mode::IFSOCK => {
if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().add_socket(c_file_name)?;
}
- Ok(encoder.add_socket(&metadata, file_name).await?)
+ Ok(encoder.add_socket(metadata, file_name).await?)
}
mode::IFIFO => {
if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().add_fifo(c_file_name)?;
}
- Ok(encoder.add_fifo(&metadata, file_name).await?)
+ Ok(encoder.add_fifo(metadata, file_name).await?)
}
mode::IFLNK => {
if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().add_symlink(c_file_name)?;
}
- self.add_symlink(encoder, fd, file_name, &metadata).await
+ self.add_symlink(encoder, fd, file_name, metadata).await
}
mode::IFBLK => {
if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().add_block_device(c_file_name)?;
}
- self.add_device(encoder, file_name, &metadata, stat).await
+ self.add_device(encoder, file_name, metadata, stat).await
}
mode::IFCHR => {
if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().add_char_device(c_file_name)?;
}
- self.add_device(encoder, file_name, &metadata, stat).await
+ self.add_device(encoder, file_name, metadata, stat).await
}
other => bail!(
"encountered unknown file type: 0x{:x} (0o{:o})",
@@ -947,8 +1043,17 @@ impl Archiver {
}
self.flush_cached_to_archive(encoder, false, true).await?;
- self.add_entry(encoder, previous_metadata_accessor, fd.as_raw_fd(), c_file_name, stat)
- .await
+ self.add_entry_to_archive(
+ encoder,
+ previous_metadata_accessor,
+ c_file_name,
+ stat,
+ fd,
+ metadata,
+ false,
+ None,
+ )
+ .await
}
async fn flush_cached_to_archive<T: SeqWrite + Send>(
@@ -1143,13 +1248,18 @@ impl Archiver {
encoder: &mut Encoder<'_, T>,
previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
dir: Dir,
- dir_name: &CStr,
+ c_dir_name: &CStr,
metadata: &Metadata,
stat: &FileStat,
) -> Result<(), Error> {
- let dir_name = OsStr::from_bytes(dir_name.to_bytes());
+ let dir_name = OsStr::from_bytes(c_dir_name.to_bytes());
- encoder.create_directory(dir_name, metadata).await?;
+ if !self.caching_enabled {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().start_directory(c_dir_name)?;
+ }
+ encoder.create_directory(dir_name, metadata).await?;
+ }
let old_fs_magic = self.fs_magic;
let old_fs_feature_flags = self.fs_feature_flags;
@@ -1189,7 +1299,15 @@ impl Archiver {
self.fs_feature_flags = old_fs_feature_flags;
self.current_st_dev = old_st_dev;
- encoder.finish().await?;
+ if !self.caching_enabled {
+ encoder.finish().await?;
+ if let Some(ref catalog) = self.catalog {
+ if !self.caching_enabled {
+ catalog.lock().unwrap().end_directory()?;
+ }
+ }
+ }
+
result
}
--
2.39.2
More information about the pbs-devel
mailing list