[pbs-devel] [PATCH v6 proxmox-backup 20/29] fix #3174: archiver: reuse files with unchanged metadata

Christian Ebner c.ebner at proxmox.com
Thu Jan 25 14:25:59 CET 2024


During pxar archive encoding, check regular files against the metadata
stored in the previous backup snapshot's catalog, if present.

Files with unchanged metadata and a file size above a given threshold
are not re-encoded. Instead, their entries are encoded as appendix
references in the pxar archive and the chunks containing the file
payload are appended to the archive's appendix section.
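
For illustration, the reuse decision boils down to the check sketched
below. This is a minimal, self-contained sketch with hypothetical
stand-in types and a hypothetical threshold constant, not the actual
code; the real check lives in reuse_if_metadata_unchanged() in the
diff that follows:

    // Hypothetical cut-off; mirrors the MAX_FILE_SIZE constant added below.
    const REUSE_THRESHOLD: u64 = 1024;

    // Hypothetical stand-in for the file metadata recorded in the
    // previous snapshot's catalog.
    struct PrevCatalogEntry {
        size: u64,
        ctime: i64,
    }

    // Hypothetical stand-in for the current stat() result of the file.
    struct CurrentStat {
        size: u64,
        ctime: i64,
    }

    // A file payload is only reused if a previous catalog entry exists,
    // the file is larger than the threshold, and size and ctime match.
    fn can_reuse(prev: Option<&PrevCatalogEntry>, cur: &CurrentStat) -> bool {
        match prev {
            Some(prev) => {
                cur.size > REUSE_THRESHOLD
                    && prev.size == cur.size
                    && prev.ctime == cur.ctime
            }
            None => false,
        }
    }

    fn main() {
        let prev = PrevCatalogEntry { size: 4096, ctime: 1_700_000_000 };
        let cur = CurrentStat { size: 4096, ctime: 1_700_000_000 };
        // Unchanged metadata and above threshold: the archiver can emit an
        // appendix reference instead of re-reading and re-chunking the payload.
        assert!(can_reuse(Some(&prev), &cur));
    }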

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
Changes since v5:
- fix formatting using `cargo fmt`

 pbs-client/src/pxar/create.rs                 | 312 +++++++++++++++++-
 proxmox-backup-client/src/main.rs             |   8 +-
 .../src/proxmox_restore_daemon/api.rs         |   1 +
 pxar-bin/src/main.rs                          |   1 +
 src/tape/file_formats/snapshot_archive.rs     |   2 +-
 5 files changed, 303 insertions(+), 21 deletions(-)

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 9980c393..a8e70651 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -17,21 +17,26 @@ use nix::sys::stat::{FileStat, Mode};
 
 use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
 use proxmox_sys::error::SysError;
-use pxar::encoder::{LinkOffset, SeqWrite};
+use pxar::encoder::{AppendixRefOffset, LinkOffset, SeqWrite};
 use pxar::Metadata;
 
 use proxmox_io::vec;
 use proxmox_lang::c_str;
 use proxmox_sys::fs::{self, acl, xattr};
 
-use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader};
-use pbs_datastore::dynamic_index::{DynamicEntry, DynamicIndexReader};
+use pbs_datastore::catalog::{
+    AppendixStartOffset, BackupCatalogWriter, CatalogReader, CatalogV2Extension, DirEntry,
+    DirEntryAttribute,
+};
+use pbs_datastore::dynamic_index::{AppendableDynamicEntry, DynamicIndexReader};
 
 use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
 
+const MAX_FILE_SIZE: u64 = 1024;
+
 /// Pxar options for creating a pxar archive/stream
 #[derive(Default)]
 pub struct PxarCreateOptions {
@@ -45,6 +50,8 @@ pub struct PxarCreateOptions {
     pub skip_lost_and_found: bool,
     /// Reference state for partial backups
     pub previous_ref: Option<PxarPrevRef>,
+    /// Catalog archive name
+    pub archive_name: Option<CString>,
 }
 
 /// Contains statefull information of previous backups snapshots for partial backups
@@ -127,6 +134,93 @@ struct HardLinkInfo {
     st_ino: u64,
 }
 
+struct Appendix {
+    total: AppendixRefOffset,
+    chunks: Vec<AppendableDynamicEntry>,
+}
+
+impl Appendix {
+    fn new() -> Self {
+        Self {
+            total: AppendixRefOffset::default(),
+            chunks: Vec::new(),
+        }
+    }
+
+    fn is_empty(&self) -> bool {
+        self.chunks.is_empty()
+    }
+
+    fn insert(
+        &mut self,
+        indices: Vec<AppendableDynamicEntry>,
+        start_padding: u64,
+    ) -> AppendixRefOffset {
+        if let Some(offset) = self.digest_sequence_contained(&indices) {
+            AppendixRefOffset::default().add(offset + start_padding)
+        } else if let Some(offset) = self.last_digest_matched(&indices) {
+            for chunk in indices.into_iter().skip(1) {
+                self.total = self.total.add(chunk.size());
+                self.chunks.push(chunk);
+            }
+            AppendixRefOffset::default().add(offset + start_padding)
+        } else {
+            let offset = self.total;
+            for chunk in indices.into_iter() {
+                self.total = self.total.add(chunk.size());
+                self.chunks.push(chunk);
+            }
+            offset.add(start_padding)
+        }
+    }
+
+    fn digest_sequence_contained(&self, indices: &[AppendableDynamicEntry]) -> Option<u64> {
+        let digest = if let Some(first) = indices.first() {
+            first.digest()
+        } else {
+            return None;
+        };
+
+        let mut offset = 0;
+        let mut iter = self.chunks.iter();
+        while let Some(position) = iter.position(|e| {
+            offset += e.size();
+            e.digest() == digest
+        }) {
+            if indices.len() + position > self.chunks.len() {
+                return None;
+            }
+
+            for (ind, chunk) in indices.iter().skip(1).enumerate() {
+                if chunk.digest() != self.chunks[ind + position].digest() {
+                    return None;
+                }
+            }
+
+            offset -= self.chunks[position].size();
+            return Some(offset);
+        }
+
+        None
+    }
+
+    fn last_digest_matched(&self, indices: &[AppendableDynamicEntry]) -> Option<u64> {
+        let digest = if let Some(first) = indices.first() {
+            first.digest()
+        } else {
+            return None;
+        };
+
+        if let Some(last) = self.chunks.last() {
+            if last.digest() == digest {
+                return Some(self.total.raw() - last.size());
+            }
+        }
+
+        None
+    }
+}
+
 struct Archiver {
     feature_flags: Flags,
     fs_feature_flags: Flags,
@@ -144,7 +238,8 @@ struct Archiver {
     file_copy_buffer: Vec<u8>,
     previous_ref: Option<PxarPrevRef>,
     forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
-    inject: (usize, Vec<DynamicEntry>),
+    appendix: Appendix,
+    prev_appendix: Option<AppendixStartOffset>,
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -155,7 +250,7 @@ pub async fn create_archive<T, F>(
     feature_flags: Flags,
     callback: F,
     catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
-    options: PxarCreateOptions,
+    mut options: PxarCreateOptions,
     forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
 ) -> Result<(), Error>
 where
@@ -200,6 +295,22 @@ where
         )?);
     }
 
+    let (appendix_start, prev_cat_parent) = if let Some(ref mut prev_ref) = options.previous_ref {
+        let entry = prev_ref
+            .catalog
+            .lookup_recursive(prev_ref.archive_name.as_bytes())?;
+        let parent = match entry.attr {
+            DirEntryAttribute::Archive { .. } => Some(entry),
+            _ => None,
+        };
+        let appendix_start = prev_ref
+            .catalog
+            .appendix_offset(prev_ref.archive_name.as_bytes())?;
+        (appendix_start, parent)
+    } else {
+        (None, None)
+    };
+
     let mut archiver = Archiver {
         feature_flags,
         fs_feature_flags,
@@ -216,13 +327,42 @@ where
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
         previous_ref: options.previous_ref,
         forced_boundaries,
-        inject: (0, Vec::new()),
+        appendix: Appendix::new(),
+        prev_appendix: appendix_start,
     };
 
+    if let Some(ref mut catalog) = archiver.catalog {
+        if let Some(ref archive_name) = options.archive_name {
+            catalog
+                .lock()
+                .unwrap()
+                .start_archive(archive_name.as_c_str())?;
+        }
+    }
+
     archiver
-        .archive_dir_contents(&mut encoder, source_dir, true)
+        .archive_dir_contents(&mut encoder, source_dir, prev_cat_parent.as_ref(), true)
         .await?;
-    encoder.finish().await?;
+
+    if archiver.appendix.is_empty() {
+        encoder.finish(None).await?;
+        if let Some(ref mut catalog) = archiver.catalog {
+            if options.archive_name.is_some() {
+                catalog.lock().unwrap().end_archive(None)?;
+            }
+        }
+    } else {
+        let (appendix_offset, appendix_size) = archiver.add_appendix(&mut encoder).await?;
+        encoder
+            .finish(Some((appendix_offset, appendix_size)))
+            .await?;
+        if let Some(catalog) = archiver.catalog {
+            if options.archive_name.is_some() {
+                catalog.lock().unwrap().end_archive(Some(appendix_offset))?;
+            }
+        }
+    }
+
     Ok(())
 }
 
@@ -251,6 +391,7 @@ impl Archiver {
         &'a mut self,
         encoder: &'a mut Encoder<'b, T>,
         mut dir: Dir,
+        prev_cat_parent: Option<&'a DirEntry>,
         is_root: bool,
     ) -> BoxFuture<'a, Result<(), Error>> {
         async move {
@@ -273,6 +414,13 @@ impl Archiver {
 
             let old_path = std::mem::take(&mut self.path);
 
+            let mut prev_catalog_dir_entries = Vec::new();
+            if let Some(ref mut prev_ref) = self.previous_ref {
+                if let Some(ref parent) = prev_cat_parent {
+                    prev_catalog_dir_entries = prev_ref.catalog.read_dir(parent)?;
+                }
+            }
+
             for file_entry in file_list {
                 let file_name = file_entry.name.to_bytes();
 
@@ -284,9 +432,15 @@ impl Archiver {
 
                 (self.callback)(&file_entry.path)?;
                 self.path = file_entry.path;
-                self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
-                    .await
-                    .map_err(|err| self.wrap_err(err))?;
+                self.add_entry(
+                    encoder,
+                    dir_fd,
+                    &file_entry.name,
+                    &file_entry.stat,
+                    &prev_catalog_dir_entries,
+                )
+                .await
+                .map_err(|err| self.wrap_err(err))?;
             }
             self.path = old_path;
             self.entry_counter = entry_counter;
@@ -533,12 +687,119 @@ impl Archiver {
         Ok(())
     }
 
+    async fn add_appendix<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+    ) -> Result<(pxar::encoder::AppendixStartOffset, AppendixRefOffset), Error> {
+        let total = self.appendix.total;
+        let appendix_offset = encoder.add_appendix(total).await?;
+        let mut boundaries = self.forced_boundaries.lock().unwrap();
+        let mut position = unsafe { encoder.position_add(0) };
+
+        // Inject reused chunks in batches of 128 to not exceed the upload post request size limit
+        for chunks in self.appendix.chunks.chunks(128) {
+            let size = chunks
+                .iter()
+                .fold(0, |sum, chunk| sum + chunk.size() as usize);
+            let inject_chunks = InjectChunks {
+                boundary: position,
+                chunks: chunks.to_vec(),
+                size,
+            };
+            boundaries.push_back(inject_chunks);
+            position = unsafe { encoder.position_add(size as u64) };
+        }
+
+        Ok((appendix_offset, total))
+    }
+
+    async fn reuse_if_metadata_unchanged<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        c_file_name: &CStr,
+        metadata: &Metadata,
+        stat: &FileStat,
+        catalog_entries: &[DirEntry],
+    ) -> Result<bool, Error> {
+        let prev_ref = match self.previous_ref {
+            None => return Ok(false),
+            Some(ref mut prev_ref) => prev_ref,
+        };
+
+        let catalog_entry = catalog_entries
+            .iter()
+            .find(|entry| entry.name == c_file_name.to_bytes());
+
+        let (size, ctime, start_offset) = match catalog_entry {
+            Some(&DirEntry {
+                attr:
+                    DirEntryAttribute::File {
+                        size,
+                        mtime: _,
+                        extension: Some(CatalogV2Extension { ctime, file_offset }),
+                    },
+                ..
+            }) => (size, ctime, file_offset.raw()),
+            Some(&DirEntry {
+                attr:
+                    DirEntryAttribute::AppendixRef {
+                        size,
+                        mtime: _,
+                        ctime,
+                        appendix_ref_offset,
+                    },
+                ..
+            }) => (
+                size,
+                ctime,
+                self.prev_appendix.unwrap().raw() + appendix_ref_offset.raw(),
+            ),
+            // The entry type found in the catalog is not a regular file,
+            // do not reuse entry but rather encode it again.
+            _ => return Ok(false),
+        };
+
+        let file_size = stat.st_size as u64;
+        if ctime != stat.st_ctime || size != file_size {
+            return Ok(false);
+        }
+
+        // Since ctime is unchanged, use the current metadata to calculate the encoded size and
+        // thereby the end offset of the file payload in the reference archive.
+        // This is required to find the existing indexes to be included in the new index file.
+        let mut bytes = pxar::encoder::encoded_size(c_file_name, metadata).await?;
+        // payload header size
+        bytes += std::mem::size_of::<pxar::format::Header>() as u64;
+
+        let end_offset = start_offset + bytes + file_size;
+        let (indices, start_padding, end_padding) =
+            prev_ref.index.indices(start_offset, end_offset)?;
+
+        let appendix_ref_offset = self.appendix.insert(indices, start_padding);
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+        self.add_appendix_ref(encoder, file_name, appendix_ref_offset, file_size)
+            .await?;
+
+        if let Some(ref catalog) = self.catalog {
+            catalog.lock().unwrap().add_appendix_ref(
+                c_file_name,
+                file_size,
+                stat.st_mtime,
+                stat.st_ctime,
+                appendix_ref_offset,
+            )?;
+        }
+
+        Ok(true)
+    }
+
     async fn add_entry<T: SeqWrite + Send>(
         &mut self,
         encoder: &mut Encoder<'_, T>,
         parent: RawFd,
         c_file_name: &CStr,
         stat: &FileStat,
+        prev_cat_entries: &[DirEntry],
     ) -> Result<(), Error> {
         use pxar::format::mode;
 
@@ -599,6 +860,20 @@ impl Archiver {
                 }
 
                 let file_size = stat.st_size as u64;
+                if file_size > MAX_FILE_SIZE
+                    && self
+                        .reuse_if_metadata_unchanged(
+                            encoder,
+                            c_file_name,
+                            &metadata,
+                            stat,
+                            prev_cat_entries,
+                        )
+                        .await?
+                {
+                    return Ok(());
+                }
+
                 let offset: LinkOffset = self
                     .add_regular_file(encoder, fd, file_name, &metadata, file_size)
                     .await?;
@@ -626,8 +901,15 @@ impl Archiver {
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().start_directory(c_file_name)?;
                 }
+                let prev_cat_entry = prev_cat_entries.iter().find(|entry| match entry {
+                    DirEntry {
+                        name,
+                        attr: DirEntryAttribute::Directory { .. },
+                    } => name == c_file_name.to_bytes(),
+                    _ => false,
+                });
                 let result = self
-                    .add_directory(encoder, dir, c_file_name, &metadata, stat)
+                    .add_directory(encoder, dir, c_file_name, &metadata, stat, prev_cat_entry)
                     .await;
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().end_directory()?;
@@ -684,6 +966,7 @@ impl Archiver {
         dir_name: &CStr,
         metadata: &Metadata,
         stat: &FileStat,
+        prev_cat_parent: Option<&DirEntry>,
     ) -> Result<(), Error> {
         let dir_name = OsStr::from_bytes(dir_name.to_bytes());
 
@@ -710,14 +993,15 @@ impl Archiver {
             log::info!("skipping mount point: {:?}", self.path);
             Ok(())
         } else {
-            self.archive_dir_contents(&mut encoder, dir, false).await
+            self.archive_dir_contents(&mut encoder, dir, prev_cat_parent, false)
+                .await
         };
 
         self.fs_magic = old_fs_magic;
         self.fs_feature_flags = old_fs_feature_flags;
         self.current_st_dev = old_st_dev;
 
-        encoder.finish().await?;
+        encoder.finish(None).await?;
         result
     }
 
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 442616b7..a087b43b 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -48,7 +48,7 @@ use pbs_client::{
     FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
     BACKUP_SOURCE_SCHEMA,
 };
-use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
+use pbs_datastore::catalog::{CatalogReader, CatalogWriter};
 use pbs_datastore::chunk_store::verify_chunk_size;
 use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader};
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -1005,10 +1005,6 @@ async fn create_backup(
                 let catalog = catalog.as_ref().unwrap();
 
                 log_file("directory", &filename, &target);
-                catalog
-                    .lock()
-                    .unwrap()
-                    .start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
 
                 let pxar_options = pbs_client::pxar::PxarCreateOptions {
                     device_set: devices.clone(),
@@ -1016,6 +1012,7 @@ async fn create_backup(
                     entries_max: entries_max as usize,
                     skip_lost_and_found,
                     previous_ref: None,
+                    archive_name: Some(std::ffi::CString::new(target.as_str())?),
                 };
 
                 let upload_options = UploadOptions {
@@ -1036,7 +1033,6 @@ async fn create_backup(
                 )
                 .await?;
                 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
-                catalog.lock().unwrap().end_directory()?;
             }
             (BackupSpecificationType::IMAGE, false) => {
                 log_file("image", &filename, &target);
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index 5eff673e..734fa976 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -359,6 +359,7 @@ fn extract(
                         patterns,
                         skip_lost_and_found: false,
                         previous_ref: None,
+                        archive_name: None,
                     };
 
                     let pxar_writer = TokioWriter::new(writer);
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index c019f3e4..6d365115 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -371,6 +371,7 @@ async fn create_archive(
         patterns,
         skip_lost_and_found: false,
         previous_ref: None,
+        archive_name: None,
     };
 
     let writer = pxar::encoder::sync::StandardWriter::new(writer);
diff --git a/src/tape/file_formats/snapshot_archive.rs b/src/tape/file_formats/snapshot_archive.rs
index abeefbd3..686eddc1 100644
--- a/src/tape/file_formats/snapshot_archive.rs
+++ b/src/tape/file_formats/snapshot_archive.rs
@@ -91,7 +91,7 @@ pub fn tape_write_snapshot_archive<'a>(
                 proxmox_lang::io_bail!("file '{}' shrunk while reading", filename);
             }
         }
-        encoder.finish()?;
+        encoder.finish(None)?;
         Ok(())
     });
 
-- 
2.39.2