[pbs-devel] [PATCH v3 proxmox-backup 44/58] client: pxar: add previous reference to archiver

Christian Ebner c.ebner at proxmox.com
Thu Mar 28 13:36:53 CET 2024


Read the previous snapshot's manifest and check if a split archive
with the same name is given. If so, create the accessor instance to
read the previous archive entries, in order to look up and compare
the metadata of the entries, allowing to decide whether an entry is
reusable or not.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 2:
- renamed accessor to previous_metadata_accessor
- get backup reader for previous snapshot after creating the writer
  instance for the new snapshot
- adapted to only use metadata mode for all or none of the given archives

 pbs-client/src/pxar/create.rs                 | 55 ++++++++++++++++---
 proxmox-backup-client/src/main.rs             | 51 ++++++++++++++++-
 .../src/proxmox_restore_daemon/api.rs         |  1 +
 pxar-bin/src/main.rs                          |  1 +
 4 files changed, 97 insertions(+), 11 deletions(-)

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 95a91a59b..79925bba2 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -19,7 +19,7 @@ use nix::sys::stat::{FileStat, Mode};
 use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
 use pbs_datastore::index::IndexFile;
 use proxmox_sys::error::SysError;
-use pxar::accessor::aio::Accessor;
+use pxar::accessor::aio::{Accessor, Directory};
 use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
 use pxar::Metadata;
 
@@ -159,7 +159,7 @@ impl ReusedChunks {
 }
 
 /// Pxar options for creating a pxar archive/stream
-#[derive(Default, Clone)]
+#[derive(Default)]
 pub struct PxarCreateOptions {
     /// Device/mountpoint st_dev numbers that should be included. None for no limitation.
     pub device_set: Option<HashSet<u64>>,
@@ -171,6 +171,8 @@ pub struct PxarCreateOptions {
     pub skip_lost_and_found: bool,
     /// Skip xattrs of files that return E2BIG error
     pub skip_e2big_xattr: bool,
+    /// Reference state for partial backups
+    pub previous_ref: Option<PxarPrevRef>,
 }
 
 /// Statefull information of previous backups snapshots for partial backups
@@ -270,6 +272,7 @@ struct Archiver {
     file_copy_buffer: Vec<u8>,
     skip_e2big_xattr: bool,
     reused_chunks: ReusedChunks,
+    previous_payload_index: Option<DynamicIndexReader>,
     forced_boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
 }
 
@@ -346,6 +349,15 @@ where
             MatchType::Exclude,
         )?);
     }
+    let (previous_payload_index, previous_metadata_accessor) =
+        if let Some(refs) = options.previous_ref {
+            (
+                Some(refs.payload_index),
+                refs.accessor.open_root().await.ok(),
+            )
+        } else {
+            (None, None)
+        };
 
     let mut archiver = Archiver {
         feature_flags,
@@ -363,11 +375,12 @@ where
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
         skip_e2big_xattr: options.skip_e2big_xattr,
         reused_chunks: ReusedChunks::new(),
+        previous_payload_index,
         forced_boundaries,
     };
 
     archiver
-        .archive_dir_contents(&mut encoder, source_dir, true)
+        .archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
         .await?;
     encoder.finish().await?;
     encoder.close().await?;
@@ -399,6 +412,7 @@ impl Archiver {
     fn archive_dir_contents<'a, T: SeqWrite + Send>(
         &'a mut self,
         encoder: &'a mut Encoder<'_, T>,
+        mut previous_metadata_accessor: Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
         mut dir: Dir,
         is_root: bool,
     ) -> BoxFuture<'a, Result<(), Error>> {
@@ -433,9 +447,15 @@ impl Archiver {
 
                 (self.callback)(&file_entry.path)?;
                 self.path = file_entry.path;
-                self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
-                    .await
-                    .map_err(|err| self.wrap_err(err))?;
+                self.add_entry(
+                    encoder,
+                    &mut previous_metadata_accessor,
+                    dir_fd,
+                    &file_entry.name,
+                    &file_entry.stat,
+                )
+                .await
+                .map_err(|err| self.wrap_err(err))?;
             }
             self.path = old_path;
             self.entry_counter = entry_counter;
@@ -683,6 +703,7 @@ impl Archiver {
     async fn add_entry<T: SeqWrite + Send>(
         &mut self,
         encoder: &mut Encoder<'_, T>,
+        previous_metadata: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
         parent: RawFd,
         c_file_name: &CStr,
         stat: &FileStat,
@@ -772,7 +793,14 @@ impl Archiver {
                     catalog.lock().unwrap().start_directory(c_file_name)?;
                 }
                 let result = self
-                    .add_directory(encoder, dir, c_file_name, &metadata, stat)
+                    .add_directory(
+                        encoder,
+                        previous_metadata,
+                        dir,
+                        c_file_name,
+                        &metadata,
+                        stat,
+                    )
                     .await;
                 if let Some(ref catalog) = self.catalog {
                     catalog.lock().unwrap().end_directory()?;
@@ -825,6 +853,7 @@ impl Archiver {
     async fn add_directory<T: SeqWrite + Send>(
         &mut self,
         encoder: &mut Encoder<'_, T>,
+        previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
         dir: Dir,
         dir_name: &CStr,
         metadata: &Metadata,
@@ -855,7 +884,17 @@ impl Archiver {
             log::info!("skipping mount point: {:?}", self.path);
             Ok(())
         } else {
-            self.archive_dir_contents(encoder, dir, false).await
+            let mut dir_accessor = None;
+            if let Some(accessor) = previous_metadata_accessor.as_mut() {
+                if let Some(file_entry) = accessor.lookup(dir_name).await? {
+                    if file_entry.entry().is_dir() {
+                        let dir = file_entry.enter_directory().await?;
+                        dir_accessor = Some(dir);
+                    }
+                }
+            }
+            self.archive_dir_contents(encoder, dir_accessor, dir, false)
+                .await
         };
 
         self.fs_magic = old_fs_magic;
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 0b747453c..66dcaa63e 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -688,6 +688,10 @@ fn spawn_catalog_upload(
                schema: TRAFFIC_CONTROL_BURST_SCHEMA,
                optional: true,
            },
+           "change-detection-mode": {
+               schema: BACKUP_DETECTION_MODE_SPEC,
+               optional: true,
+           },
            "exclude": {
                type: Array,
                description: "List of paths or patterns for matching files to exclude.",
@@ -882,6 +886,9 @@ async fn create_backup(
 
     let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
 
+    let detection_mode = param["change-detection-mode"].as_str().unwrap_or("data");
+    let detection_mode = parse_backup_detection_mode_specification(detection_mode)?;
+
     let http_client = connect_rate_limited(&repo, rate_limit)?;
     record_repository(&repo);
 
@@ -982,7 +989,7 @@ async fn create_backup(
         None
     };
 
-    let mut manifest = BackupManifest::new(snapshot);
+    let mut manifest = BackupManifest::new(snapshot.clone());
 
     let mut catalog = None;
     let mut catalog_result_rx = None;
@@ -1029,14 +1036,13 @@ async fn create_backup(
                 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
             }
             (BackupSpecificationType::PXAR, false) => {
-                let metadata_mode = false; // Until enabled via param
-
                 let target_base = if let Some(base) = target_base.strip_suffix(".pxar") {
                     base.to_string()
                 } else {
                     bail!("unexpected suffix in target: {target_base}");
                 };
 
+                let metadata_mode = detection_mode.is_metadata();
                 let (target, payload_target) = if metadata_mode {
                     (
                         format!("{target_base}.mpxar.{extension}"),
@@ -1061,12 +1067,51 @@ async fn create_backup(
                     .unwrap()
                     .start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
 
+                let mut previous_ref = None;
+                if metadata_mode {
+                    if let Some(ref manifest) = previous_manifest {
+                        let list = api_datastore_list_snapshots(
+                            &http_client,
+                            repo.store(),
+                            &backup_ns,
+                            Some(&snapshot.group),
+                        )
+                        .await?;
+                        let mut list: Vec<SnapshotListItem> = serde_json::from_value(list)?;
+
+                        // BackupWriter::start created a new snapshot, get the one before
+                        if list.len() > 1 {
+                            list.sort_unstable_by(|a, b| b.backup.time.cmp(&a.backup.time));
+                            let backup_dir: BackupDir =
+                                (snapshot.group.clone(), list[1].backup.time).into();
+                            let backup_reader = BackupReader::start(
+                                &http_client,
+                                crypt_config.clone(),
+                                repo.store(),
+                                &backup_ns,
+                                &backup_dir,
+                                true,
+                            )
+                            .await?;
+                            previous_ref = prepare_reference(
+                                &target,
+                                manifest.clone(),
+                                &client,
+                                backup_reader.clone(),
+                                crypt_config.clone(),
+                            )
+                            .await?
+                        }
+                    }
+                }
+
                 let pxar_options = pbs_client::pxar::PxarCreateOptions {
                     device_set: devices.clone(),
                     patterns: pattern_list.clone(),
                     entries_max: entries_max as usize,
                     skip_lost_and_found,
                     skip_e2big_xattr,
+                    previous_ref,
                 };
 
                 let upload_options = UploadOptions {
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index 0883d6cda..e50cb8184 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -355,6 +355,7 @@ fn extract(
                         patterns,
                         skip_lost_and_found: false,
                         skip_e2big_xattr: false,
+                        previous_ref: None,
                     };
 
                     let pxar_writer = TokioWriter::new(writer);
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index d46c98d2b..c6d3794bb 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -358,6 +358,7 @@ async fn create_archive(
         patterns,
         skip_lost_and_found: false,
         skip_e2big_xattr: false,
+        previous_ref: None,
     };
 
     let source = PathBuf::from(source);
-- 
2.39.2





More information about the pbs-devel mailing list