[pbs-devel] [PATCH proxmox-backup 13/15] verify: factor out common parameters

Fabian Grünbichler <f.gruenbichler@proxmox.com>
Mon Jan 25 14:42:58 CET 2021


all the verify methods pass along the following:
- task worker
- datastore
- corrupt and verified chunks

might as well pull that out into a common type, with the added bonus of
now having a single point of construction instead of copying the
default capacities in three different modules.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 src/api2/admin/datastore.rs    |  17 +---
 src/api2/backup/environment.rs |  10 +-
 src/backup/verify.rs           | 174 ++++++++++++++-------------------
 src/server/verify_job.rs       |   3 +-
 4 files changed, 85 insertions(+), 119 deletions(-)
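As a reviewer aid, the net effect on a call site, sketched from the verify()
hunks below (a minimal illustration rather than a verbatim excerpt; it assumes
the datastore/worker bindings from the surrounding code):

    // before: each call site constructed the shared chunk sets itself and
    // threaded them through alongside the worker and datastore
    let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
    let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
    verify_backup_dir(datastore, &backup_dir, verified_chunks, corrupt_chunks,
        worker.clone(), worker.upid().clone(), None)?;

    // after: VerifyWorker::new() is the single point of construction, so the
    // default capacities live in exactly one place
    let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
    verify_backup_dir(&verify_worker, &backup_dir, worker.upid().clone(), None)?;
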

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index ba8f3417..3d5b6af6 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -3,7 +3,6 @@
 use std::collections::HashSet;
 use std::ffi::OsStr;
 use std::os::unix::ffi::OsStrExt;
-use std::sync::{Arc, Mutex};
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
 
@@ -672,17 +671,12 @@ pub fn verify(
         auth_id.clone(),
         to_stdout,
         move |worker| {
-            let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
-            let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
-
+            let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
             let failed_dirs = if let Some(backup_dir) = backup_dir {
                 let mut res = Vec::new();
                 if !verify_backup_dir(
-                    datastore,
+                    &verify_worker,
                     &backup_dir,
-                    verified_chunks,
-                    corrupt_chunks,
-                    worker.clone(),
                     worker.upid().clone(),
                     None,
                 )? {
@@ -691,12 +685,9 @@ pub fn verify(
                 res
             } else if let Some(backup_group) = backup_group {
                 let failed_dirs = verify_backup_group(
-                    datastore,
+                    &verify_worker,
                     &backup_group,
-                    verified_chunks,
-                    corrupt_chunks,
                     &mut StoreProgress::new(1),
-                    worker.clone(),
                     worker.upid(),
                     None,
                 )?;
@@ -711,7 +702,7 @@ pub fn verify(
                     None
                 };
 
-                verify_all_backups(datastore, worker.clone(), worker.upid(), owner, None)?
+                verify_all_backups(&verify_worker, worker.upid(), owner, None)?
             };
             if !failed_dirs.is_empty() {
                 worker.log("Failed to verify the following snapshots/groups:");
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 38061816..c8f52b6e 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -1,6 +1,6 @@
 use anyhow::{bail, format_err, Error};
 use std::sync::{Arc, Mutex};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use nix::dir::Dir;
 
 use ::serde::{Serialize};
@@ -525,15 +525,11 @@ impl BackupEnvironment {
             move |worker| {
                 worker.log("Automatically verifying newly added snapshot");
 
-                let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
-                let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
 
+                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
                 if !verify_backup_dir_with_lock(
-                    datastore,
+                    &verify_worker,
                     &backup_dir,
-                    verified_chunks,
-                    corrupt_chunks,
-                    worker.clone(),
                     worker.upid().clone(),
                     None,
                     snap_lock,
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 5e4bc7fb..ac4a6c29 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -29,6 +29,29 @@ use crate::{
     tools::fs::lock_dir_noblock_shared,
 };
 
+/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
+/// already been verified or detected as corrupt.
+pub struct VerifyWorker {
+    worker: Arc<dyn TaskState + Send + Sync>,
+    datastore: Arc<DataStore>,
+    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+}
+
+impl VerifyWorker {
+    /// Creates a new VerifyWorker for a given task worker and datastore.
+    pub fn new(worker: Arc<dyn TaskState + Send + Sync>, datastore: Arc<DataStore>) -> Self {
+        Self {
+            worker,
+            datastore,
+            // start with 16k chunks == up to 64G data
+            verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16*1024))),
+            // start with 64 chunks since we assume there are few corrupt ones
+            corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
+        }
+    }
+}
+
 fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
 
     let blob = datastore.load_blob(backup_dir, &info.filename)?;
@@ -82,12 +105,9 @@ fn rename_corrupted_chunk(
 }
 
 fn verify_index_chunks(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     index: Box<dyn IndexFile + Send>,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     crypt_mode: CryptMode,
-    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let errors = Arc::new(AtomicUsize::new(0));
@@ -97,10 +117,10 @@ fn verify_index_chunks(
     let mut read_bytes = 0;
     let mut decoded_bytes = 0;
 
-    let worker2 = Arc::clone(&worker);
-    let datastore2 = Arc::clone(&datastore);
-    let corrupt_chunks2 = Arc::clone(&corrupt_chunks);
-    let verified_chunks2 = Arc::clone(&verified_chunks);
+    let worker2 = Arc::clone(&verify_worker.worker);
+    let datastore2 = Arc::clone(&verify_worker.datastore);
+    let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
+    let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
     let errors2 = Arc::clone(&errors);
 
     let decoder_pool = ParallelHandler::new(
@@ -141,29 +161,29 @@ fn verify_index_chunks(
 
     for pos in 0..index.index_count() {
 
-        worker.check_abort()?;
+        verify_worker.worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
 
         let info = index.chunk_info(pos).unwrap();
         let size = info.size();
 
-        if verified_chunks.lock().unwrap().contains(&info.digest) {
+        if verify_worker.verified_chunks.lock().unwrap().contains(&info.digest) {
             continue; // already verified
         }
 
-        if corrupt_chunks.lock().unwrap().contains(&info.digest) {
+        if verify_worker.corrupt_chunks.lock().unwrap().contains(&info.digest) {
             let digest_str = proxmox::tools::digest_to_hex(&info.digest);
-            task_log!(worker, "chunk {} was marked as corrupt", digest_str);
+            task_log!(verify_worker.worker, "chunk {} was marked as corrupt", digest_str);
             errors.fetch_add(1, Ordering::SeqCst);
             continue;
         }
 
-        match datastore.load_chunk(&info.digest) {
+        match verify_worker.datastore.load_chunk(&info.digest) {
             Err(err) => {
-                corrupt_chunks.lock().unwrap().insert(info.digest);
-                task_log!(worker, "can't verify chunk, load failed - {}", err);
+                verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
+                task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
                 errors.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
+                rename_corrupted_chunk(verify_worker.datastore.clone(), &info.digest, &verify_worker.worker);
                 continue;
             }
             Ok(chunk) => {
@@ -187,7 +207,7 @@ fn verify_index_chunks(
     let error_count = errors.load(Ordering::SeqCst);
 
     task_log!(
-        worker,
+        verify_worker.worker,
         "  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
         read_bytes_mib,
         decoded_bytes_mib,
@@ -205,18 +225,15 @@ fn verify_index_chunks(
 }
 
 fn verify_fixed_index(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
     info: &FileInfo,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let mut path = backup_dir.relative_path();
     path.push(&info.filename);
 
-    let index = datastore.open_fixed_reader(&path)?;
+    let index = verify_worker.datastore.open_fixed_reader(&path)?;
 
     let (csum, size) = index.compute_csum();
     if size != info.size {
@@ -228,28 +245,22 @@ fn verify_fixed_index(
     }
 
     verify_index_chunks(
-        datastore,
+        verify_worker,
         Box::new(index),
-        verified_chunks,
-        corrupt_chunks,
         info.chunk_crypt_mode(),
-        worker,
     )
 }
 
 fn verify_dynamic_index(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
     info: &FileInfo,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let mut path = backup_dir.relative_path();
     path.push(&info.filename);
 
-    let index = datastore.open_dynamic_reader(&path)?;
+    let index = verify_worker.datastore.open_dynamic_reader(&path)?;
 
     let (csum, size) = index.compute_csum();
     if size != info.size {
@@ -261,12 +272,9 @@ fn verify_dynamic_index(
     }
 
     verify_index_chunks(
-        datastore,
+        verify_worker,
         Box::new(index),
-        verified_chunks,
-        corrupt_chunks,
         info.chunk_crypt_mode(),
-        worker,
     )
 }
 
@@ -280,34 +288,28 @@ fn verify_dynamic_index(
 /// - Ok(false) if there were verification errors
 /// - Err(_) if task was aborted
 pub fn verify_backup_dir(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<dyn TaskState + Send + Sync>,
     upid: UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<bool, Error> {
     let snap_lock = lock_dir_noblock_shared(
-        &datastore.snapshot_path(&backup_dir),
+        &verify_worker.datastore.snapshot_path(&backup_dir),
         "snapshot",
         "locked by another operation");
     match snap_lock {
         Ok(snap_lock) => verify_backup_dir_with_lock(
-            datastore,
+            verify_worker,
             backup_dir,
-            verified_chunks,
-            corrupt_chunks,
-            worker,
             upid,
             filter,
             snap_lock
         ),
         Err(err) => {
             task_log!(
-                worker,
+                verify_worker.worker,
                 "SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
-                datastore.name(),
+                verify_worker.datastore.name(),
                 backup_dir,
                 err,
             );
@@ -318,22 +320,19 @@ pub fn verify_backup_dir(
 
 /// See verify_backup_dir
 pub fn verify_backup_dir_with_lock(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     backup_dir: &BackupDir,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<dyn TaskState + Send + Sync>,
     upid: UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
     _snap_lock: Dir,
 ) -> Result<bool, Error> {
-    let manifest = match datastore.load_manifest(&backup_dir) {
+    let manifest = match verify_worker.datastore.load_manifest(&backup_dir) {
         Ok((manifest, _)) => manifest,
         Err(err) => {
             task_log!(
-                worker,
+                verify_worker.worker,
                 "verify {}:{} - manifest load error: {}",
-                datastore.name(),
+                verify_worker.datastore.name(),
                 backup_dir,
                 err,
             );
@@ -344,54 +343,48 @@ pub fn verify_backup_dir_with_lock(
     if let Some(filter) = filter {
         if !filter(&manifest) {
             task_log!(
-                worker,
+                verify_worker.worker,
                 "SKIPPED: verify {}:{} (recently verified)",
-                datastore.name(),
+                verify_worker.datastore.name(),
                 backup_dir,
             );
             return Ok(true);
         }
     }
 
-    task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);
+    task_log!(verify_worker.worker, "verify {}:{}", verify_worker.datastore.name(), backup_dir);
 
     let mut error_count = 0;
 
     let mut verify_result = VerifyState::Ok;
     for info in manifest.files() {
         let result = proxmox::try_block!({
-            task_log!(worker, "  check {}", info.filename);
+            task_log!(verify_worker.worker, "  check {}", info.filename);
             match archive_type(&info.filename)? {
                 ArchiveType::FixedIndex =>
                     verify_fixed_index(
-                        datastore.clone(),
+                        verify_worker,
                         &backup_dir,
                         info,
-                        verified_chunks.clone(),
-                        corrupt_chunks.clone(),
-                        worker.clone(),
                     ),
                 ArchiveType::DynamicIndex =>
                     verify_dynamic_index(
-                        datastore.clone(),
+                        verify_worker,
                         &backup_dir,
                         info,
-                        verified_chunks.clone(),
-                        corrupt_chunks.clone(),
-                        worker.clone(),
                     ),
-                ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
+                ArchiveType::Blob => verify_blob(verify_worker.datastore.clone(), &backup_dir, info),
             }
         });
 
-        worker.check_abort()?;
+        verify_worker.worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
 
         if let Err(err) = result {
             task_log!(
-                worker,
+                verify_worker.worker,
                 "verify {}:{}/{} failed: {}",
-                datastore.name(),
+                verify_worker.datastore.name(),
                 backup_dir,
                 info.filename,
                 err,
@@ -407,7 +400,7 @@ pub fn verify_backup_dir_with_lock(
         upid,
     };
     let verify_state = serde_json::to_value(verify_state)?;
-    datastore.update_manifest(&backup_dir, |manifest| {
+    verify_worker.datastore.update_manifest(&backup_dir, |manifest| {
         manifest.unprotected["verify_state"] = verify_state;
     }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
 
@@ -422,24 +415,21 @@ pub fn verify_backup_dir_with_lock(
 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
 pub fn verify_backup_group(
-    datastore: Arc<DataStore>,
+    verify_worker: &VerifyWorker,
     group: &BackupGroup,
-    verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     progress: &mut StoreProgress,
-    worker: Arc<dyn TaskState + Send + Sync>,
     upid: &UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<Vec<String>, Error> {
 
     let mut errors = Vec::new();
-    let mut list = match group.list_backups(&datastore.base_path()) {
+    let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
         Ok(list) => list,
         Err(err) => {
             task_log!(
-                worker,
+                verify_worker.worker,
                 "verify group {}:{} - unable to list backups: {}",
-                datastore.name(),
+                verify_worker.datastore.name(),
                 group,
                 err,
             );
@@ -448,18 +438,15 @@ pub fn verify_backup_group(
     };
 
     let snapshot_count = list.len();
-    task_log!(worker, "verify group {}:{} ({} snapshots)", datastore.name(), group, snapshot_count);
+    task_log!(verify_worker.worker, "verify group {}:{} ({} snapshots)", verify_worker.datastore.name(), group, snapshot_count);
 
     progress.group_snapshots = snapshot_count as u64;
 
     BackupInfo::sort_list(&mut list, false); // newest first
     for (pos, info) in list.into_iter().enumerate() {
         if !verify_backup_dir(
-            datastore.clone(),
+            verify_worker,
             &info.backup_dir,
-            verified_chunks.clone(),
-            corrupt_chunks.clone(),
-            worker.clone(),
             upid.clone(),
             filter,
         )? {
@@ -467,7 +454,7 @@ pub fn verify_backup_group(
         }
         progress.done_snapshots = pos as u64 + 1;
         task_log!(
-            worker,
+            verify_worker.worker,
             "percentage done: {}",
             progress
         );
@@ -484,22 +471,22 @@ pub fn verify_backup_group(
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
 pub fn verify_all_backups(
-    datastore: Arc<DataStore>,
-    worker: Arc<dyn TaskState + Send + Sync>,
+    verify_worker: &VerifyWorker,
     upid: &UPID,
     owner: Option<Authid>,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<Vec<String>, Error> {
     let mut errors = Vec::new();
+    let worker = Arc::clone(&verify_worker.worker);
 
-    task_log!(worker, "verify datastore {}", datastore.name());
+    task_log!(worker, "verify datastore {}", verify_worker.datastore.name());
 
     if let Some(owner) = &owner {
         task_log!(worker, "limiting to backups owned by {}", owner);
     }
 
     let filter_by_owner = |group: &BackupGroup| {
-        match (datastore.get_owner(group), &owner) {
+        match (verify_worker.datastore.get_owner(group), &owner) {
             (Ok(ref group_owner), Some(owner)) => {
                 group_owner == owner
                     || (group_owner.is_token()
@@ -527,7 +514,7 @@ pub fn verify_all_backups(
         }
     };
 
-    let mut list = match BackupInfo::list_backup_groups(&datastore.base_path()) {
+    let mut list = match BackupInfo::list_backup_groups(&verify_worker.datastore.base_path()) {
         Ok(list) => list
             .into_iter()
             .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
@@ -545,12 +532,6 @@ pub fn verify_all_backups(
 
     list.sort_unstable();
 
-    // start with 16384 chunks (up to 65GB)
-    let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
-
-    // start with 64 chunks since we assume there are few corrupt ones
-    let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
-
     let group_count = list.len();
     task_log!(worker, "found {} groups", group_count);
 
@@ -562,12 +543,9 @@ pub fn verify_all_backups(
         progress.group_snapshots = 0;
 
         let mut group_errors = verify_backup_group(
-            datastore.clone(),
+            verify_worker,
             &group,
-            verified_chunks.clone(),
-            corrupt_chunks.clone(),
             &mut progress,
-            worker.clone(),
             upid,
             filter,
         )?;
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index ca6eb554..1dd8baa7 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -67,7 +67,8 @@ pub fn do_verification_job(
                 task_log!(worker,"task triggered by schedule '{}'", event_str);
             }
 
-            let result = verify_all_backups(datastore, worker.clone(), worker.upid(), None, Some(&filter));
+            let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+            let result = verify_all_backups(&verify_worker, worker.upid(), None, Some(&filter));
             let job_result = match result {
                 Ok(ref failed_dirs) if failed_dirs.is_empty() => Ok(()),
                 Ok(ref failed_dirs) => {
-- 
2.20.1