[pbs-devel] [PATCH v6 proxmox-backup 28/29] pxar: add lookahead caching to reduce chunk fragmentation

Christian Ebner c.ebner at proxmox.com
Thu Jan 25 14:26:07 CET 2024


Over multiple consecutive runs with metadata-based file change detection,
referencing existing chunks can lead to fragmentation and an increased
size of the index file.

In order to reduce this, look ahead before adding files as references
and only reference the existing chunks if excessive fragmentation can
be avoided; otherwise, re-encode the files regularly.
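
The decision can be illustrated with a minimal, self-contained sketch
(hypothetical names, not the actual Archiver code; the real logic lives
in cache_or_flush_entries() further down): reusable files are cached
until the cached payload reaches CACHED_PAYLOAD_THRESHOLD (2 MiB), at
which point the cache is flushed as appendix references; if a changed
file is hit first, the cached entries are re-encoded instead.

// Illustrative sketch only; names other than CACHED_PAYLOAD_THRESHOLD
// are made up for this example.
const CACHED_PAYLOAD_THRESHOLD: u64 = 2 * 1024 * 1024;

enum Decision {
    FlushAsReferences, // enough reusable payload cached, reference existing chunks
    CacheForNow,       // keep looking ahead before committing to references
    ReencodeCached,    // a changed file breaks the run, re-encode the cached entries
}

fn lookahead(cached_payload: u64, file_size: u64, reusable: bool) -> Decision {
    if !reusable {
        // A changed file interrupts the reusable run: referencing only the
        // few cached entries would fragment the index, so re-encode them.
        return Decision::ReencodeCached;
    }
    if cached_payload + file_size >= CACHED_PAYLOAD_THRESHOLD {
        // The reusable run is large enough that referencing existing chunks
        // outweighs the padding and fragmentation it introduces.
        Decision::FlushAsReferences
    } else {
        Decision::CacheForNow
    }
}

fn main() {
    assert!(matches!(lookahead(0, 4096, true), Decision::CacheForNow));
    assert!(matches!(
        lookahead(CACHED_PAYLOAD_THRESHOLD, 4096, true),
        Decision::FlushAsReferences
    ));
    assert!(matches!(lookahead(4096, 4096, false), Decision::ReencodeCached));
}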

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
Changes since v5:
- not present in previous version

 pbs-client/src/pxar/create.rs          | 435 ++++++++++++++++++++-----
 pbs-client/src/pxar/lookahead_cache.rs |  41 +++
 pbs-client/src/pxar/mod.rs             |   1 +
 3 files changed, 402 insertions(+), 75 deletions(-)
 create mode 100644 pbs-client/src/pxar/lookahead_cache.rs

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index a8e70651..e29401ee 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -31,11 +31,13 @@ use pbs_datastore::catalog::{
 use pbs_datastore::dynamic_index::{AppendableDynamicEntry, DynamicIndexReader};
 
 use crate::inject_reused_chunks::InjectChunks;
+use crate::pxar::lookahead_cache::{CacheEntry, CacheEntryData};
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
 
 const MAX_FILE_SIZE: u64 = 1024;
+const CACHED_PAYLOAD_THRESHOLD: u64 = 2 * 1024 * 1024;
 
 /// Pxar options for creating a pxar archive/stream
 #[derive(Default)]
@@ -240,6 +242,9 @@ struct Archiver {
     forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
     appendix: Appendix,
     prev_appendix: Option<AppendixStartOffset>,
+    cached_entries: Vec<CacheEntry>,
+    caching_enabled: bool,
+    cached_payload_size: u64,
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -329,6 +334,9 @@ where
         forced_boundaries,
         appendix: Appendix::new(),
         prev_appendix: appendix_start,
+        cached_entries: Vec::new(),
+        caching_enabled: false,
+        cached_payload_size: 0,
     };
 
     if let Some(ref mut catalog) = archiver.catalog {
@@ -344,6 +352,17 @@ where
         .archive_dir_contents(&mut encoder, source_dir, prev_cat_parent.as_ref(), true)
         .await?;
 
+    if let Some(last) = archiver.cached_entries.pop() {
+        match last {
+            CacheEntry::DirEnd => {}
+            _ => archiver.cached_entries.push(last),
+        }
+    }
+
+    archiver
+        .flush_cached_to_archive(&mut encoder, false)
+        .await?;
+
     if archiver.appendix.is_empty() {
         encoder.finish(None).await?;
         if let Some(ref mut catalog) = archiver.catalog {
@@ -442,6 +461,11 @@ impl Archiver {
                 .await
                 .map_err(|err| self.wrap_err(err))?;
             }
+
+            if self.caching_enabled {
+                self.cached_entries.push(CacheEntry::DirEnd);
+            }
+
             self.path = old_path;
             self.entry_counter = entry_counter;
             self.patterns.truncate(old_patterns_count);
@@ -694,7 +718,7 @@ impl Archiver {
         let total = self.appendix.total;
         let appendix_offset = encoder.add_appendix(total).await?;
         let mut boundaries = self.forced_boundaries.lock().unwrap();
-        let mut position = unsafe { encoder.position_add(0) };
+        let mut position = unsafe { encoder.position_add(0)? };
 
         // Inject reused chunks in patches of 128 to not exceed upload post req size limit
         for chunks in self.appendix.chunks.chunks(128) {
@@ -707,24 +731,21 @@ impl Archiver {
                 size,
             };
             boundaries.push_back(inject_chunks);
-            position = unsafe { encoder.position_add(size as u64) };
+            position = unsafe { encoder.position_add(size as u64)? };
         }
 
         Ok((appendix_offset, total))
     }
 
-    async fn reuse_if_metadata_unchanged<T: SeqWrite + Send>(
+    async fn is_reusable_entry(
         &mut self,
-        encoder: &mut Encoder<'_, T>,
         c_file_name: &CStr,
-        metadata: &Metadata,
         stat: &FileStat,
         catalog_entries: &[DirEntry],
-    ) -> Result<bool, Error> {
-        let prev_ref = match self.previous_ref {
-            None => return Ok(false),
-            Some(ref mut prev_ref) => prev_ref,
-        };
+    ) -> Result<Option<u64>, Error> {
+        if stat.st_nlink > 1 || (stat.st_size as u64) < MAX_FILE_SIZE {
+            return Ok(None);
+        }
 
         let catalog_entry = catalog_entries
             .iter()
@@ -754,43 +775,17 @@ impl Archiver {
                 ctime,
                 self.prev_appendix.unwrap().raw() + appendix_ref_offset.raw(),
             ),
-            // The entry type found in the catalog is not a regular file,
+            // Entry not found, or the entry type found in the catalog is not a regular file;
+            // do not reuse the entry but rather encode it again.
-            _ => return Ok(false),
+            _ => return Ok(None),
         };
 
         let file_size = stat.st_size as u64;
         if ctime != stat.st_ctime || size != file_size {
-            return Ok(false);
+            return Ok(None);
         }
 
-        // Since ctime is unchanged, use current metadata size to calculate size and thereby
-        // end offset for the file payload in the reference archive.
-        // This is required to find the existing indexes to be included in the new index file.
-        let mut bytes = pxar::encoder::encoded_size(c_file_name, metadata).await?;
-        // payload header size
-        bytes += std::mem::size_of::<pxar::format::Header>() as u64;
-
-        let end_offset = start_offset + bytes + file_size;
-        let (indices, start_padding, end_padding) =
-            prev_ref.index.indices(start_offset, end_offset)?;
-
-        let appendix_ref_offset = self.appendix.insert(indices, start_padding);
-        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
-        self.add_appendix_ref(encoder, file_name, appendix_ref_offset, file_size)
-            .await?;
-
-        if let Some(ref catalog) = self.catalog {
-            catalog.lock().unwrap().add_appendix_ref(
-                c_file_name,
-                file_size,
-                stat.st_mtime,
-                stat.st_ctime,
-                appendix_ref_offset,
-            )?;
-        }
-
-        Ok(true)
+        Ok(Some(start_offset))
     }
 
     async fn add_entry<T: SeqWrite + Send>(
@@ -801,8 +796,6 @@ impl Archiver {
         stat: &FileStat,
         prev_cat_entries: &[DirEntry],
     ) -> Result<(), Error> {
-        use pxar::format::mode;
-
         let file_mode = stat.st_mode & libc::S_IFMT;
         let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
             OFlag::empty()
@@ -822,6 +815,15 @@ impl Archiver {
             None => return Ok(()),
         };
 
+        let match_path = PathBuf::from("/").join(self.path.clone());
+        if self
+            .patterns
+            .matches(match_path.as_os_str().as_bytes(), stat.st_mode)?
+            == Some(MatchType::Exclude)
+        {
+            return Ok(());
+        }
+
         let metadata = get_metadata(
             fd.as_raw_fd(),
             stat,
@@ -830,16 +832,148 @@ impl Archiver {
             &mut self.fs_feature_flags,
         )?;
 
-        let match_path = PathBuf::from("/").join(self.path.clone());
-        if self
-            .patterns
-            .matches(match_path.as_os_str().as_bytes(), stat.st_mode)?
-            == Some(MatchType::Exclude)
-        {
-            return Ok(());
+        if self.previous_ref.is_none() {
+            return self
+                .add_entry_to_archive(encoder, c_file_name, stat, fd, &metadata, prev_cat_entries)
+                .await;
         }
 
+        if metadata.is_regular_file() {
+            self.cache_or_flush_entries(encoder, c_file_name, stat, fd, &metadata, prev_cat_entries)
+                .await
+        } else {
+            if self.caching_enabled {
+                if stat.st_mode & libc::S_IFMT == libc::S_IFDIR {
+                    let fd_clone = fd.try_clone()?;
+                    let cache_entry = CacheEntry::DirEntry(CacheEntryData::new(
+                        fd,
+                        c_file_name.into(),
+                        stat.clone(),
+                        metadata.clone(),
+                        Vec::new(),
+                        0,
+                    ));
+                    self.cached_entries.push(cache_entry);
+                    let prev_cat_entry = prev_cat_entries.iter().find(|entry| match entry {
+                        DirEntry {
+                            name,
+                            attr: DirEntryAttribute::Directory { .. },
+                        } => name == c_file_name.to_bytes(),
+                        _ => false,
+                    });
+
+                    let dir = Dir::from_fd(fd_clone.into_raw_fd())?;
+                    self.add_directory(encoder, dir, c_file_name, &metadata, stat, prev_cat_entry)
+                        .await?;
+                    if let Some(ref catalog) = self.catalog {
+                        if !self.caching_enabled {
+                            catalog.lock().unwrap().end_directory()?;
+                        }
+                    }
+                } else {
+                    let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+                        fd,
+                        c_file_name.into(),
+                        stat.clone(),
+                        metadata,
+                        Vec::new(),
+                        0,
+                    ));
+                    self.cached_entries.push(cache_entry);
+                }
+                Ok(())
+            } else {
+                self.add_entry_to_archive(
+                    encoder,
+                    c_file_name,
+                    stat,
+                    fd,
+                    &metadata,
+                    prev_cat_entries,
+                )
+                .await
+            }
+        }
+    }
+
+    async fn cache_or_flush_entries<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        c_file_name: &CStr,
+        stat: &FileStat,
+        fd: OwnedFd,
+        metadata: &Metadata,
+        prev_cat_entries: &[DirEntry],
+    ) -> Result<(), Error> {
+        let reusable = self
+            .is_reusable_entry(c_file_name, stat, prev_cat_entries)
+            .await?;
+
         let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+        let file_size = stat.st_size as u64;
+        if let Some(start_offset) = reusable {
+            if let Some(ref prev_ref) = self.previous_ref {
+                // Since ctime is unchanged, use the current metadata size to calculate the
+                // end offset for the file payload in the reference archive.
+                // This is required to find the existing indices to be included in the new index file.
+                let mut bytes = pxar::encoder::encoded_size(c_file_name, metadata).await?;
+                // payload header size
+                bytes += std::mem::size_of::<pxar::format::Header>() as u64;
+
+                let end_offset = start_offset + bytes + file_size;
+                let (indices, start_padding, _end_padding) =
+                    prev_ref.index.indices(start_offset, end_offset)?;
+
+                if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
+                    self.flush_cached_to_archive(encoder, true).await?;
+                    let appendix_ref_offset = self.appendix.insert(indices, start_padding);
+                    self.add_appendix_ref(encoder, file_name, appendix_ref_offset, file_size)
+                        .await?;
+
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().add_appendix_ref(
+                            &c_file_name,
+                            file_size,
+                            stat.st_mtime,
+                            stat.st_ctime,
+                            appendix_ref_offset,
+                        )?;
+                    }
+                } else {
+                    self.caching_enabled = true;
+                    self.cached_payload_size += file_size;
+                    let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+                        fd,
+                        c_file_name.into(),
+                        stat.clone(),
+                        metadata.clone(),
+                        indices,
+                        start_padding,
+                    ));
+                    self.cached_entries.push(cache_entry);
+                }
+
+                return Ok(());
+            }
+        }
+
+        self.flush_cached_to_archive(encoder, false).await?;
+        self.add_entry_to_archive(encoder, c_file_name, stat, fd, &metadata, prev_cat_entries)
+            .await
+    }
+
+    async fn add_entry_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        c_file_name: &CStr,
+        stat: &FileStat,
+        fd: OwnedFd,
+        metadata: &Metadata,
+        prev_cat_entries: &[DirEntry],
+    ) -> Result<(), Error> {
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+
+        use pxar::format::mode;
         match metadata.file_type() {
             mode::IFREG => {
                 let link_info = HardLinkInfo {
@@ -852,28 +986,12 @@ impl Archiver {
                         if let Some(ref catalog) = self.catalog {
                             catalog.lock().unwrap().add_hardlink(c_file_name)?;
                         }
-
                         encoder.add_hardlink(file_name, path, *offset).await?;
-
                         return Ok(());
                     }
                 }
 
                 let file_size = stat.st_size as u64;
-                if file_size > MAX_FILE_SIZE
-                    && self
-                        .reuse_if_metadata_unchanged(
-                            encoder,
-                            c_file_name,
-                            &metadata,
-                            stat,
-                            prev_cat_entries,
-                        )
-                        .await?
-                {
-                    return Ok(());
-                }
-
                 let offset: LinkOffset = self
                     .add_regular_file(encoder, fd, file_name, &metadata, file_size)
                     .await?;
@@ -892,7 +1010,6 @@ impl Archiver {
                     self.hardlinks
                         .insert(link_info, (self.path.clone(), offset));
                 }
-
                 Ok(())
             }
             mode::IFDIR => {
@@ -912,7 +1029,9 @@ impl Archiver {
                     .add_directory(encoder, dir, c_file_name, &metadata, stat, prev_cat_entry)
                     .await;
                 if let Some(ref catalog) = self.catalog {
-                    catalog.lock().unwrap().end_directory()?;
+                    if !self.caching_enabled {
+                        catalog.lock().unwrap().end_directory()?;
+                    }
                 }
                 result
             }
@@ -920,35 +1039,30 @@ impl Archiver {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_socket(c_file_name)?;
                 }
-
                 Ok(encoder.add_socket(&metadata, file_name).await?)
             }
             mode::IFIFO => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_fifo(c_file_name)?;
                 }
-
                 Ok(encoder.add_fifo(&metadata, file_name).await?)
             }
             mode::IFLNK => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_symlink(c_file_name)?;
                 }
-
                 self.add_symlink(encoder, fd, file_name, &metadata).await
             }
             mode::IFBLK => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_block_device(c_file_name)?;
                 }
-
                 self.add_device(encoder, file_name, &metadata, stat).await
             }
             mode::IFCHR => {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().add_char_device(c_file_name)?;
                 }
-
                 self.add_device(encoder, file_name, &metadata, stat).await
             }
             other => bail!(
@@ -959,6 +1073,172 @@ impl Archiver {
         }
     }
 
+    async fn flush_cached_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        as_appendix_refs: bool,
+    ) -> Result<(), Error> {
+        let entries = std::mem::take(&mut self.cached_entries);
+
+        self.caching_enabled = false;
+        self.cached_payload_size = 0;
+
+        for entry in entries {
+            match entry {
+                CacheEntry::RegEntry(data) => {
+                    self.flush_entry_to_archive(encoder, data, as_appendix_refs)
+                        .await?
+                }
+                CacheEntry::DirEntry(data) => {
+                    self.flush_directory_to_archive(encoder, data).await?
+                }
+                CacheEntry::DirEnd => {
+                    let result = encoder.finish(None).await?;
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().end_directory()?;
+                    }
+                    result
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn flush_directory_to_archive<'a, 'b, T: SeqWrite + Send>(
+        &'a mut self,
+        encoder: &'a mut Encoder<'b, T>,
+        entry_data: CacheEntryData,
+    ) -> Result<(), Error> {
+        let CacheEntryData {
+            c_file_name,
+            metadata,
+            ..
+        } = entry_data;
+        let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
+
+        if let Some(ref catalog) = self.catalog {
+            catalog.lock().unwrap().start_directory(&c_file_name)?;
+        }
+
+        encoder.create_directory(dir_name, &metadata).await?;
+
+        Ok(())
+    }
+
+    async fn flush_entry_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        entry_data: CacheEntryData,
+        as_appendix_refs: bool,
+    ) -> Result<(), Error> {
+        use pxar::format::mode;
+
+        let CacheEntryData {
+            fd,
+            c_file_name,
+            stat,
+            metadata,
+            indices,
+            start_padding,
+        } = entry_data;
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+
+        match metadata.file_type() {
+            mode::IFREG => {
+                let link_info = HardLinkInfo {
+                    st_dev: stat.st_dev,
+                    st_ino: stat.st_ino,
+                };
+
+                if stat.st_nlink > 1 {
+                    if let Some((path, offset)) = self.hardlinks.get(&link_info) {
+                        if let Some(ref catalog) = self.catalog {
+                            catalog.lock().unwrap().add_hardlink(&c_file_name)?;
+                        }
+                        encoder.add_hardlink(file_name, path, *offset).await?;
+                        return Ok(());
+                    }
+                }
+
+                let file_size = stat.st_size as u64;
+                if as_appendix_refs {
+                    let appendix_ref_offset = self.appendix.insert(indices, start_padding);
+                    let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+                    self.add_appendix_ref(encoder, file_name, appendix_ref_offset, file_size)
+                        .await?;
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().add_appendix_ref(
+                            &c_file_name,
+                            file_size,
+                            stat.st_mtime,
+                            stat.st_ctime,
+                            appendix_ref_offset,
+                        )?;
+                    }
+                } else {
+                    let offset: LinkOffset = self
+                        .add_regular_file(encoder, fd, file_name, &metadata, file_size)
+                        .await?;
+
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().add_file(
+                            &c_file_name,
+                            file_size,
+                            stat.st_mtime,
+                            stat.st_ctime,
+                            offset,
+                        )?;
+                    }
+
+                    if stat.st_nlink > 1 {
+                        self.hardlinks
+                            .insert(link_info, (self.path.clone(), offset));
+                    }
+                }
+            }
+            mode::IFSOCK => {
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_socket(&c_file_name)?;
+                }
+                encoder.add_socket(&metadata, file_name).await?;
+            }
+            mode::IFIFO => {
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_fifo(&c_file_name)?;
+                }
+                encoder.add_fifo(&metadata, file_name).await?;
+            }
+            mode::IFLNK => {
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_symlink(&c_file_name)?;
+                }
+                self.add_symlink(encoder, fd, file_name, &metadata).await?;
+            }
+            mode::IFBLK => {
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_block_device(&c_file_name)?;
+                }
+                self.add_device(encoder, file_name, &metadata, &stat)
+                    .await?;
+            }
+            mode::IFCHR => {
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_char_device(&c_file_name)?;
+                }
+                self.add_device(encoder, file_name, &metadata, &stat)
+                    .await?;
+            }
+            other => bail!(
+                "encountered unknown file type: 0x{:x} (0o{:o})",
+                other,
+                other
+            ),
+        }
+
+        Ok(())
+    }
+
     async fn add_directory<T: SeqWrite + Send>(
         &mut self,
         encoder: &mut Encoder<'_, T>,
@@ -970,7 +1250,9 @@ impl Archiver {
     ) -> Result<(), Error> {
         let dir_name = OsStr::from_bytes(dir_name.to_bytes());
 
-        let mut encoder = encoder.create_directory(dir_name, metadata).await?;
+        if !self.caching_enabled {
+            encoder.create_directory(dir_name, metadata).await?;
+        }
 
         let old_fs_magic = self.fs_magic;
         let old_fs_feature_flags = self.fs_feature_flags;
@@ -993,7 +1275,7 @@ impl Archiver {
             log::info!("skipping mount point: {:?}", self.path);
             Ok(())
         } else {
-            self.archive_dir_contents(&mut encoder, dir, prev_cat_parent, false)
+            self.archive_dir_contents(encoder, dir, prev_cat_parent, false)
                 .await
         };
 
@@ -1001,7 +1283,10 @@ impl Archiver {
         self.fs_feature_flags = old_fs_feature_flags;
         self.current_st_dev = old_st_dev;
 
-        encoder.finish(None).await?;
+        if !self.caching_enabled {
+            encoder.finish(None).await?;
+        }
+
         result
     }
 
diff --git a/pbs-client/src/pxar/lookahead_cache.rs b/pbs-client/src/pxar/lookahead_cache.rs
new file mode 100644
index 00000000..b664250b
--- /dev/null
+++ b/pbs-client/src/pxar/lookahead_cache.rs
@@ -0,0 +1,41 @@
+use nix::sys::stat::FileStat;
+use std::ffi::CString;
+use std::os::unix::io::OwnedFd;
+
+use pbs_datastore::dynamic_index::AppendableDynamicEntry;
+use pxar::Metadata;
+
+pub(crate) struct CacheEntryData {
+    pub(crate) fd: OwnedFd,
+    pub(crate) c_file_name: CString,
+    pub(crate) stat: FileStat,
+    pub(crate) metadata: Metadata,
+    pub(crate) indices: Vec<AppendableDynamicEntry>,
+    pub(crate) start_padding: u64,
+}
+
+impl CacheEntryData {
+    pub(crate) fn new(
+        fd: OwnedFd,
+        c_file_name: CString,
+        stat: FileStat,
+        metadata: Metadata,
+        indices: Vec<AppendableDynamicEntry>,
+        start_padding: u64,
+    ) -> Self {
+        Self {
+            fd,
+            c_file_name,
+            stat,
+            metadata,
+            indices,
+            start_padding,
+        }
+    }
+}
+
+pub(crate) enum CacheEntry {
+    RegEntry(CacheEntryData),
+    DirEntry(CacheEntryData),
+    DirEnd,
+}
diff --git a/pbs-client/src/pxar/mod.rs b/pbs-client/src/pxar/mod.rs
index 24315f5f..3729ab10 100644
--- a/pbs-client/src/pxar/mod.rs
+++ b/pbs-client/src/pxar/mod.rs
@@ -50,6 +50,7 @@
 pub(crate) mod create;
 pub(crate) mod dir_stack;
 pub(crate) mod extract;
+pub(crate) mod lookahead_cache;
 pub(crate) mod metadata;
 pub(crate) mod tools;
 
-- 
2.39.2