[pbs-devel] [PATCH proxmox-backup v2 2/4] api: verify: determine the number of threads to use with {read, verify}-threads

Christian Ebner c.ebner at proxmox.com
Fri Nov 7 10:41:32 CET 2025


On 11/7/25 10:31 AM, Christian Ebner wrote:
> some small issues inline
> 
> On 11/6/25 5:13 PM, Nicolas Frey wrote:
>> use previously introduced {read,verify}-threads in API, where default
>> values match the ones of the schema definition.
>>
>> Signed-off-by: Nicolas Frey <n.frey at proxmox.com>
>> ---
>>   src/api2/admin/datastore.rs    | 18 +++++++++++++++---
>>   src/api2/backup/environment.rs |  2 +-
>>   src/backup/verify.rs           | 19 ++++++++++++++++---
>>   src/server/verify_job.rs       |  7 ++++++-
>>   4 files changed, 38 insertions(+), 8 deletions(-)
>>
>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
>> index 6e269ef9..fde4c247 100644
>> --- a/src/api2/admin/datastore.rs
>> +++ b/src/api2/admin/datastore.rs
>> @@ -45,7 +45,8 @@ use pbs_api_types::{
>>       BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, 
>> DATASTORE_SCHEMA,
>>       IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH, 
>> NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT,
>>       PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, 
>> PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
>> -    PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, 
>> VERIFICATION_OUTDATED_AFTER_SCHEMA,
>> +    PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, READ_THREADS_SCHEMA, 
>> UPID, UPID_SCHEMA,
>> +    VERIFICATION_OUTDATED_AFTER_SCHEMA, VERIFY_THREADS_SCHEMA,
>>   };
>>   use pbs_client::pxar::{create_tar, create_zip};
>>   use pbs_config::CachedUserInfo;
>> @@ -675,6 +676,14 @@ pub async fn status(
>>                   schema: NS_MAX_DEPTH_SCHEMA,
>>                   optional: true,
>>               },
>> +            "read-threads": {
>> +                schema: READ_THREADS_SCHEMA,
>> +                optional: true,
>> +            },
>> +            "verify-threads": {
>> +                schema: VERIFY_THREADS_SCHEMA,
>> +                optional: true,
>> +            },
>>           },
>>       },
>>       returns: {
>> @@ -688,7 +697,7 @@ pub async fn status(
>>   )]
>>   /// Verify backups.
>>   ///
>> -/// This function can verify a single backup snapshot, all backup 
>> from a backup group,
>> +/// This function can verify a single backup snapshot, all backups 
>> from a backup group,
> 
> nit: this is an unrelated change, please put this into a dedicated commit
> 
>>   /// or all backups in the datastore.
>>   #[allow(clippy::too_many_arguments)]
>>   pub fn verify(
>> @@ -700,6 +709,8 @@ pub fn verify(
>>       ignore_verified: Option<bool>,
>>       outdated_after: Option<i64>,
>>       max_depth: Option<usize>,
>> +    read_threads: Option<usize>,
>> +    verify_threads: Option<usize>,
>>       rpcenv: &mut dyn RpcEnvironment,
>>   ) -> Result<Value, Error> {
>>       let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>> @@ -779,7 +790,8 @@ pub fn verify(
>>           auth_id.to_string(),
>>           to_stdout,
>>           move |worker| {
>> -            let verify_worker = VerifyWorker::new(worker.clone(), 
>> datastore)?;
>> +            let verify_worker =
>> +                VerifyWorker::new(worker.clone(), datastore, 
>> read_threads, verify_threads)?;
>>               let failed_dirs = if let Some(backup_dir) = backup_dir {
>>                   let mut res = Vec::new();
>>                   if !verify_worker.verify_backup_dir(
>> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/ 
>> environment.rs
>> index 0faf6c8e..06696c78 100644
>> --- a/src/api2/backup/environment.rs
>> +++ b/src/api2/backup/environment.rs
>> @@ -795,7 +795,7 @@ impl BackupEnvironment {
>>               move |worker| {
>>                   worker.log_message("Automatically verifying newly 
>> added snapshot");
>> -                let verify_worker = VerifyWorker::new(worker.clone(), 
>> datastore)?;
>> +                let verify_worker = VerifyWorker::new(worker.clone(), 
>> datastore, None, None)?;
>>                   if !verify_worker.verify_backup_dir_with_lock(
>>                       &backup_dir,
>>                       worker.upid().clone(),
>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
>> index 910a3ed5..f3cbe4d6 100644
>> --- a/src/backup/verify.rs
>> +++ b/src/backup/verify.rs
>> @@ -32,6 +32,8 @@ pub struct VerifyWorker {
>>       verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>       corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>       backend: DatastoreBackend,
>> +    read_threads: usize,
>> +    verify_threads: usize,
>>   }
>>   struct IndexVerifyState {
>> @@ -44,10 +46,13 @@ struct IndexVerifyState {
>>   }
>>   impl VerifyWorker {
>> -    /// Creates a new VerifyWorker for a given task worker and 
>> datastore.
>> +    /// Creates a new VerifyWorker for a given task worker and 
>> datastore. \
>> +    /// Default values for read_threads: 1, verify_threads: 4
> 
> is this backslash intentional? Also, I don't think we should explicitly 
> document the defaults here.
> 
>>       pub fn new(
>>           worker: Arc<dyn WorkerTaskContext>,
>>           datastore: Arc<DataStore>,
>> +        read_threads: Option<usize>,
>> +        verify_threads: Option<usize>,
>>       ) -> Result<Self, Error> {
>>           let backend = datastore.backend()?;
>>           Ok(Self {
>> @@ -58,6 +63,8 @@ impl VerifyWorker {
>>               // start with 64 chunks since we assume there are few 
>> corrupt ones
>>               corrupt_chunks: 
>> Arc::new(Mutex::new(HashSet::with_capacity(64))),
>>               backend,
>> +            read_threads: read_threads.unwrap_or(1),
>> +            verify_threads: verify_threads.unwrap_or(4),
>>           })
>>       }
>> @@ -101,7 +108,7 @@ impl VerifyWorker {
>>               verified_chunks: Arc::clone(&self.verified_chunks),
>>           });
>> -        let decoder_pool = ParallelHandler::new("verify chunk 
>> decoder", 4, {
>> +        let decoder_pool = ParallelHandler::new("verify chunk 
>> decoder", self.verify_threads, {
>>               let verify_state = Arc::clone(&verify_state);
>>               move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>>                   let chunk_crypt_mode = match chunk.crypt_mode() {
>> @@ -163,7 +170,13 @@ impl VerifyWorker {
>>               .datastore
>>               .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>> -        let reader_pool = ParallelHandler::new("read chunks", 4, {
>> +        log::info!(
>> +            "  using {} read and {} verify thread(s)",
>> +            self.read_threads,
>> +            self.verify_threads,
>> +        );

Also, I forgot to mention that this message now shows up for each index 
file being verified. It should be moved so that it is logged only once, 
at the start of the verification job.

>> +
>> +        let reader_pool = ParallelHandler::new("read chunks", 
>> self.read_threads, {
>>               let decoder_pool = decoder_pool.channel();
>>               let verify_state = Arc::clone(&verify_state);
>>               let backend = self.backend.clone();
>> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
>> index c8792174..e0b03155 100644
>> --- a/src/server/verify_job.rs
>> +++ b/src/server/verify_job.rs
>> @@ -41,7 +41,12 @@ pub fn do_verification_job(
>>                   None => Default::default(),
>>               };
>> -            let verify_worker = VerifyWorker::new(worker.clone(), 
>> datastore)?;
>> +            let verify_worker = VerifyWorker::new(
>> +                worker.clone(),
>> +                datastore,
>> +                verification_job.read_threads,
>> +                verification_job.verify_threads,
>> +            )?;
>>               let result = verify_worker.verify_all_backups(
>>                   worker.upid(),
>>                   ns,
> 
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 





More information about the pbs-devel mailing list