[pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use

Nicolas Frey n.frey at proxmox.com
Wed Nov 5 16:51:27 CET 2025

Add an optional worker-threads parameter to the verify API and forward it
to VerifyWorker, so the number of threads used to read chunks during
verification can be configured. If the parameter is not set, the previous
hard-coded value of 4 reader threads is kept.

Signed-off-by: Nicolas Frey <n.frey at proxmox.com>
---
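Note (illustration only, not part of this patch): the optional value is
resolved in src/backup/verify.rs via unwrap_or(4), so the previous
hard-coded reader-thread count is kept when the parameter is unset. A
minimal sketch of that resolution, using a hypothetical helper name and
assuming the schema above enforces the 1..=32 bounds:

    // sketch: resolve an optional, user-supplied thread count,
    // falling back to the previously hard-coded 4 reader threads
    fn resolve_worker_threads(requested: Option<usize>) -> usize {
        requested.unwrap_or(4)
    }

    fn main() {
        assert_eq!(resolve_worker_threads(None), 4);
        assert_eq!(resolve_worker_threads(Some(16)), 16);
    }
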
 src/api2/admin/datastore.rs    | 13 +++++++++++--
 src/api2/backup/environment.rs |  2 +-
 src/backup/verify.rs           |  5 ++++-
 src/server/verify_job.rs       |  3 ++-
 4 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index d192ee39..69a09081 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -677,6 +677,14 @@ pub async fn status(
                 schema: NS_MAX_DEPTH_SCHEMA,
                 optional: true,
             },
+            "worker-threads": {
+                description: "Set the number of worker threads to use for the job",
+                type: Integer,
+                optional: true,
+                minimum: 1,
+                maximum: 32,
+                default: 1,
+            },
         },
     },
     returns: {
@@ -690,7 +698,7 @@ pub async fn status(
 )]
 /// Verify backups.
 ///
-/// This function can verify a single backup snapshot, all backup from a backup group,
+/// This function can verify a single backup snapshot, all backups from a backup group,
 /// or all backups in the datastore.
 #[allow(clippy::too_many_arguments)]
 pub fn verify(
@@ -702,6 +710,7 @@ pub fn verify(
     ignore_verified: Option<bool>,
     outdated_after: Option<i64>,
     max_depth: Option<usize>,
+    worker_threads: Option<usize>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -781,7 +790,7 @@ pub fn verify(
         auth_id.to_string(),
         to_stdout,
         move |worker| {
-            let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
+            let verify_worker = VerifyWorker::new(worker.clone(), datastore, worker_threads)?;
             let failed_dirs = if let Some(backup_dir) = backup_dir {
                 let mut res = Vec::new();
                 if !verify_worker.verify_backup_dir(
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 0e8eab1b..5e6a73b9 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -812,7 +812,7 @@ impl BackupEnvironment {
             move |worker| {
                 worker.log_message("Automatically verifying newly added snapshot");
 
-                let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
+                let verify_worker = VerifyWorker::new(worker.clone(), datastore, None)?;
                 if !verify_worker.verify_backup_dir_with_lock(
                     &backup_dir,
                     worker.upid().clone(),
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 7f91f38c..e11dba8e 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -32,6 +32,7 @@ pub struct VerifyWorker {
     verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     backend: DatastoreBackend,
+    worker_threads: Option<usize>,
 }
 
 impl VerifyWorker {
@@ -39,6 +40,7 @@ impl VerifyWorker {
     pub fn new(
         worker: Arc<dyn WorkerTaskContext>,
         datastore: Arc<DataStore>,
+        worker_threads: Option<usize>,
     ) -> Result<Self, Error> {
         let backend = datastore.backend()?;
         Ok(Self {
@@ -49,6 +51,7 @@ impl VerifyWorker {
             // start with 64 chunks since we assume there are few corrupt ones
             corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
             backend,
+            worker_threads,
         })
     }
 
@@ -220,7 +223,7 @@ impl VerifyWorker {
             .datastore
             .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
 
-        let reader_pool = ParallelHandler::new("read chunks", 4, {
+        let reader_pool = ParallelHandler::new("read chunks", self.worker_threads.unwrap_or(4), {
             let decoder_pool = decoder_pool.channel();
             let datastore = Arc::clone(&self.datastore);
             let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index c8792174..9d790b07 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -41,7 +41,8 @@ pub fn do_verification_job(
                 None => Default::default(),
             };
 
-            let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
+            let verify_worker =
+                VerifyWorker::new(worker.clone(), datastore, verification_job.worker_threads)?;
             let result = verify_worker.verify_all_backups(
                 worker.upid(),
                 ns,
-- 
2.47.3