[pbs-devel] [RFC PATCH 2/4] fix #3786: server/datastore: add deep sync parameter to pull sync jobs

Stefan Sterz s.sterz at proxmox.com
Wed Jun 15 10:20:38 CEST 2022


Signed-off-by: Stefan Sterz <s.sterz at proxmox.com>
---
 pbs-datastore/src/backup_info.rs | 22 +++++++++++++++++++++-
 src/server/pull.rs               | 28 ++++++++++++++++++----------
 2 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/pbs-datastore/src/backup_info.rs b/pbs-datastore/src/backup_info.rs
index 10320a35..89461c66 100644
--- a/pbs-datastore/src/backup_info.rs
+++ b/pbs-datastore/src/backup_info.rs
@@ -9,7 +9,8 @@ use anyhow::{bail, format_err, Error};
 use proxmox_sys::fs::{lock_dir_noblock, replace_file, CreateOptions};
 
 use pbs_api_types::{
-    Authid, BackupNamespace, BackupType, GroupFilter, BACKUP_DATE_REGEX, BACKUP_FILE_REGEX,
+    Authid, BackupNamespace, BackupType, GroupFilter, SnapshotVerifyState, VerifyState,
+    BACKUP_DATE_REGEX, BACKUP_FILE_REGEX,
 };
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
 
@@ -544,6 +545,25 @@ impl BackupDir {
 
         Ok(())
     }
+
+    /// Returns `true` if the last verification of this snapshot failed, and `false` otherwise.
+    ///
+    /// Note that a snapshot that has never been verified also returns `false`.
+    pub fn is_corrupt(&self) -> bool {
+        let mut to_return = false;
+
+        let _ = self.update_manifest(|m| {
+            let verify = m.unprotected["verify_state"].clone();
+
+            if let Ok(verify) = serde_json::from_value::<SnapshotVerifyState>(verify) {
+                if verify.state == VerifyState::Failed {
+                    to_return = true;
+                }
+            }
+        });
+
+        to_return
+    }
 }
 
 impl AsRef<pbs_api_types::BackupNamespace> for BackupDir {
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 6778c66b..767b394c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -57,7 +57,7 @@ pub struct PullParameters {
     /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
     max_depth: Option<usize>,
     /// Whether to re-sync corrupted snapshots
-    _deep_sync: bool,
+    deep_sync: bool,
     /// Filters for reducing the pull scope
     group_filter: Option<Vec<GroupFilter>>,
     /// Rate limits for all transfers from `remote`
@@ -111,7 +111,7 @@ impl PullParameters {
             owner,
             remove_vanished,
             max_depth,
-            _deep_sync: deep_sync,
+            deep_sync,
             group_filter,
             limit,
         })
@@ -371,6 +371,7 @@ async fn pull_snapshot(
     worker: &WorkerTask,
     reader: Arc<BackupReader>,
     snapshot: &pbs_datastore::BackupDir,
+    params: &PullParameters,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let mut manifest_name = snapshot.full_path();
@@ -437,7 +438,10 @@ async fn pull_snapshot(
         let mut path = snapshot.full_path();
         path.push(&item.filename);
 
-        if path.exists() {
+        // if a snapshot failed verification, the index file stays the same, but it points
+        // to at least one corrupted chunk. hence, skip this check if the last verification
+        // job failed and we are running a deep sync.
+        if !(params.deep_sync && snapshot.is_corrupt()) && path.exists() {
             match archive_type(&item.filename)? {
                 ArchiveType::DynamicIndex => {
                     let index = DynamicIndexReader::open(&path)?;
@@ -513,6 +517,7 @@ async fn pull_snapshot_from(
     worker: &WorkerTask,
     reader: Arc<BackupReader>,
     snapshot: &pbs_datastore::BackupDir,
+    params: &PullParameters,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let (_path, is_new, _snap_lock) = snapshot
@@ -522,7 +527,7 @@ async fn pull_snapshot_from(
     if is_new {
         task_log!(worker, "sync snapshot {}", snapshot.dir());
 
-        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
+        if let Err(err) = pull_snapshot(worker, reader, snapshot, params, downloaded_chunks).await {
             if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                 snapshot.backup_ns(),
                 snapshot.as_ref(),
@@ -535,7 +540,7 @@ async fn pull_snapshot_from(
         task_log!(worker, "sync snapshot {} done", snapshot.dir());
     } else {
         task_log!(worker, "re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
+        pull_snapshot(worker, reader, snapshot, params, downloaded_chunks).await?;
         task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
     }
 
@@ -666,10 +671,12 @@ async fn pull_group(
 
         remote_snapshots.insert(snapshot.time);
 
-        if let Some(last_sync_time) = last_sync {
-            if last_sync_time > snapshot.time {
-                skip_info.update(snapshot.time);
-                continue;
+        if !params.deep_sync {
+            if let Some(last_sync_time) = last_sync {
+                if last_sync_time > snapshot.time {
+                    skip_info.update(snapshot.time);
+                    continue;
+                }
             }
         }
 
@@ -699,7 +706,8 @@ async fn pull_group(
 
         let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
 
-        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+        let result =
+            pull_snapshot_from(worker, reader, &snapshot, params, downloaded_chunks.clone()).await;
 
         progress.done_snapshots = pos as u64 + 1;
         task_log!(worker, "percentage done: {}", progress);
-- 
2.30.2






More information about the pbs-devel mailing list