[pbs-devel] [PATCH v3 proxmox-backup 48/58] fix #3174: client: pxar: enable caching and meta comparison

Christian Ebner c.ebner at proxmox.com
Thu Mar 28 13:36:57 CET 2024


Add the final glue logic to enable the look-ahead caching and
metadata comparison introduced in the preparatory patches.
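
In rough terms, the per-entry decision wired up here boils down to the
sketch below. `EntryMeta` and `decide` are illustrative stand-ins rather
than the actual helpers from create.rs; only the cache-vs-re-encode split
mirrors this patch:

    #[derive(Clone, PartialEq)]
    struct EntryMeta {
        mtime: i64,
        size: u64,
        mode: u32,
    }

    enum Decision {
        // Metadata unchanged: keep the entry cached so its payload
        // chunks may later be reused from the previous archive.
        Cache,
        // Entry changed or not present previously: flush the cache
        // re-encoding and encode this entry as well.
        Reencode,
    }

    fn decide(previous: Option<&EntryMeta>, current: &EntryMeta) -> Decision {
        match previous {
            Some(prev) if prev == current => Decision::Cache,
            _ => Decision::Reencode,
        }
    }

    fn main() {
        let prev = EntryMeta { mtime: 1_700_000_000, size: 4096, mode: 0o100644 };
        assert!(matches!(decide(Some(&prev), &prev.clone()), Decision::Cache));

        let touched = EntryMeta { mtime: 1_700_000_100, ..prev.clone() };
        assert!(matches!(decide(Some(&prev), &touched), Decision::Reencode));
    }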

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 2:
- use reused chunk padding for the final decision whether entries with
  unchanged metadata should be reused or re-encoded
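
  For reference, the padding check amounts to something like the sketch
  below. `should_reuse_chunks`, `total_size` and `padding` are illustrative
  names; only the CHUNK_PADDING_THRESHOLD value is taken from create.rs:

    // Fraction of reused-chunk bytes that may be padding (bytes not
    // referenced by any cached entry) before falling back to re-encoding;
    // mirrors CHUNK_PADDING_THRESHOLD in pbs-client/src/pxar/create.rs.
    const CHUNK_PADDING_THRESHOLD: f64 = 0.1;

    // Sketch only: reuse the collected chunks if the padding they would
    // drag into the archive stays below the threshold.
    fn should_reuse_chunks(total_size: u64, padding: u64) -> bool {
        if total_size == 0 {
            return false;
        }
        (padding as f64) / (total_size as f64) <= CHUNK_PADDING_THRESHOLD
    }

    fn main() {
        // 256 KiB padding in 4 MiB of reused chunks (~6%): reuse.
        assert!(should_reuse_chunks(4 * 1024 * 1024, 256 * 1024));
        // 1 MiB padding in 4 MiB (25%): re-encode instead.
        assert!(!should_reuse_chunks(4 * 1024 * 1024, 1024 * 1024));
    }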

 pbs-client/src/pxar/create.rs | 194 +++++++++++++++++++++++++++-------
 1 file changed, 156 insertions(+), 38 deletions(-)

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 07fa17ec4..f103127c4 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -40,6 +40,7 @@ use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
 
 const CHUNK_PADDING_THRESHOLD: f64 = 0.1;
+const MAX_CACHE_SIZE: usize = 512;
 
 #[derive(Default)]
 struct ReusedChunks {
@@ -397,6 +398,12 @@ where
     archiver
         .archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
         .await?;
+
+    // Re-encode all the remaining cached entries
+    archiver
+        .flush_cached_to_archive(&mut encoder, false, false)
+        .await?;
+
     encoder.finish().await?;
     encoder.close().await?;
 
@@ -454,7 +461,10 @@ impl Archiver {
             for file_entry in file_list {
                 let file_name = file_entry.name.to_bytes();
 
-                if is_root && file_name == b".pxarexclude-cli" {
+                if is_root
+                    && file_name == b".pxarexclude-cli"
+                    && previous_metadata_accessor.is_none()
+                {
                     self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
                         .await?;
                     continue;
@@ -472,6 +482,11 @@ impl Archiver {
                 .await
                 .map_err(|err| self.wrap_err(err))?;
             }
+
+            if self.caching_enabled && !is_root {
+                self.cached_entries.push(CacheEntry::DirEnd);
+            }
+
             self.path = old_path;
             self.entry_counter = entry_counter;
             self.patterns.truncate(old_patterns_count);
@@ -752,8 +767,6 @@ impl Archiver {
         c_file_name: &CStr,
         stat: &FileStat,
     ) -> Result<(), Error> {
-        use pxar::format::mode;
-
         let file_mode = stat.st_mode & libc::S_IFMT;
         let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
             OFlag::empty()
@@ -791,6 +804,97 @@ impl Archiver {
             self.skip_e2big_xattr,
         )?;
 
+        if self.previous_payload_index.is_none() {
+            return self
+                .add_entry_to_archive(
+                    encoder,
+                    previous_metadata,
+                    c_file_name,
+                    stat,
+                    fd,
+                    &metadata,
+                    false,
+                    None,
+                )
+                .await;
+        }
+
+        // Avoid having too many open file handles in cached entries
+        if self.cached_entries.len() > MAX_CACHE_SIZE {
+            self.flush_cached_to_archive(encoder, true, true).await?;
+        }
+
+        if metadata.is_regular_file() {
+            self.cache_or_flush_entries(
+                encoder,
+                previous_metadata,
+                c_file_name,
+                stat,
+                fd,
+                &metadata,
+            )
+            .await
+        } else if self.caching_enabled {
+            if stat.st_mode & libc::S_IFMT == libc::S_IFDIR {
+                let fd_clone = fd.try_clone()?;
+                let cache_entry = CacheEntry::DirEntry(CacheEntryData::new(
+                    fd,
+                    c_file_name.into(),
+                    *stat,
+                    metadata.clone(),
+                    PayloadOffset::default(),
+                ));
+                self.cached_entries.push(cache_entry);
+
+                let dir = Dir::from_fd(fd_clone.into_raw_fd())?;
+                self.add_directory(
+                    encoder,
+                    previous_metadata,
+                    dir,
+                    c_file_name,
+                    &metadata,
+                    stat,
+                )
+                .await?;
+            } else {
+                let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+                    fd,
+                    c_file_name.into(),
+                    *stat,
+                    metadata,
+                    PayloadOffset::default(),
+                ));
+                self.cached_entries.push(cache_entry);
+            }
+            Ok(())
+        } else {
+            self.add_entry_to_archive(
+                encoder,
+                previous_metadata,
+                c_file_name,
+                stat,
+                fd,
+                &metadata,
+                false,
+                None,
+            )
+            .await
+        }
+    }
+
+    async fn add_entry_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        previous_metadata: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+        c_file_name: &CStr,
+        stat: &FileStat,
+        fd: OwnedFd,
+        metadata: &Metadata,
+        flush_reused: bool,
+        payload_offset: Option<PayloadOffset>,
+    ) -> Result<(), Error> {
+        use pxar::format::mode;
+
         let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
         match metadata.file_type() {
             mode::IFREG => {
@@ -819,72 +923,64 @@ impl Archiver {
                         .add_file(c_file_name, file_size, stat.st_mtime)?;
                 }
 
-                let offset: LinkOffset = self
-                    .add_regular_file(encoder, fd, file_name, &metadata, file_size)
-                    .await?;
+                if flush_reused {
+                    self.total_reused_payload_size +=
+                        file_size + size_of::<pxar::format::Header>() as u64;
+                    encoder
+                        .add_payload_ref(metadata, file_name, file_size, payload_offset.unwrap())
+                        .await?;
+                } else {
+                    let offset: LinkOffset = self
+                        .add_regular_file(encoder, fd, file_name, metadata, file_size)
+                        .await?;
 
-                if stat.st_nlink > 1 {
-                    self.hardlinks
-                        .insert(link_info, (self.path.clone(), offset));
+                    if stat.st_nlink > 1 {
+                        self.hardlinks
+                            .insert(link_info, (self.path.clone(), offset));
+                    }
                 }
 
                 Ok(())
             }
             mode::IFDIR => {
                 let dir = Dir::from_fd(fd.into_raw_fd())?;
-
-                if let Some(ref catalog) = self.catalog {
-                    catalog.lock().unwrap().start_directory(c_file_name)?;
-                }
-                let result = self
-                    .add_directory(
-                        encoder,
-                        previous_metadata,
-                        dir,
-                        c_file_name,
-                        &metadata,
-                        stat,
-                    )
-                    .await;
-                if let Some(ref catalog) = self.catalog {
-                    catalog.lock().unwrap().end_directory()?;
-                }
-                result
+                self.add_directory(encoder, previous_metadata, dir, c_file_name, metadata, stat)
+                    .await
             }
             mode::IFSOCK => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_socket(c_file_name)?;
                 }
 
-                Ok(encoder.add_socket(&metadata, file_name).await?)
+                Ok(encoder.add_socket(metadata, file_name).await?)
             }
             mode::IFIFO => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_fifo(c_file_name)?;
                 }
 
-                Ok(encoder.add_fifo(&metadata, file_name).await?)
+                Ok(encoder.add_fifo(metadata, file_name).await?)
             }
             mode::IFLNK => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_symlink(c_file_name)?;
                 }
 
-                self.add_symlink(encoder, fd, file_name, &metadata).await
+                self.add_symlink(encoder, fd, file_name, metadata).await
             }
             mode::IFBLK => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_block_device(c_file_name)?;
                 }
 
-                self.add_device(encoder, file_name, &metadata, stat).await
+                self.add_device(encoder, file_name, metadata, stat).await
             }
             mode::IFCHR => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_char_device(c_file_name)?;
                 }
 
-                self.add_device(encoder, file_name, &metadata, stat).await
+                self.add_device(encoder, file_name, metadata, stat).await
             }
             other => bail!(
                 "encountered unknown file type: 0x{:x} (0o{:o})",
@@ -947,8 +1043,17 @@ impl Archiver {
         }
 
         self.flush_cached_to_archive(encoder, false, true).await?;
-        self.add_entry(encoder, previous_metadata_accessor, fd.as_raw_fd(), c_file_name, stat)
-            .await
+        self.add_entry_to_archive(
+            encoder,
+            previous_metadata_accessor,
+            c_file_name,
+            stat,
+            fd,
+            metadata,
+            false,
+            None,
+        )
+        .await
     }
 
     async fn flush_cached_to_archive<T: SeqWrite + Send>(
@@ -1143,13 +1248,18 @@ impl Archiver {
         encoder: &mut Encoder<'_, T>,
         previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
         dir: Dir,
-        dir_name: &CStr,
+        c_dir_name: &CStr,
         metadata: &Metadata,
         stat: &FileStat,
     ) -> Result<(), Error> {
-        let dir_name = OsStr::from_bytes(dir_name.to_bytes());
+        let dir_name = OsStr::from_bytes(c_dir_name.to_bytes());
 
-        encoder.create_directory(dir_name, metadata).await?;
+        if !self.caching_enabled {
+            if let Some(ref catalog) = self.catalog {
+                catalog.lock().unwrap().start_directory(c_dir_name)?;
+            }
+            encoder.create_directory(dir_name, metadata).await?;
+        }
 
         let old_fs_magic = self.fs_magic;
         let old_fs_feature_flags = self.fs_feature_flags;
@@ -1189,7 +1299,15 @@ impl Archiver {
         self.fs_feature_flags = old_fs_feature_flags;
         self.current_st_dev = old_st_dev;
 
-        encoder.finish().await?;
+        if !self.caching_enabled {
+            encoder.finish().await?;
+            if let Some(ref catalog) = self.catalog {
+                if !self.caching_enabled {
+                    catalog.lock().unwrap().end_directory()?;
+                }
+            }
+        }
+
         result
     }
 
-- 
2.39.2