[pbs-devel] [PATCH proxmox-backup v2 2/4] api: verify: determine the number of threads to use with {read, verify}-threads

Christian Ebner c.ebner at proxmox.com
Fri Nov 7 10:31:56 CET 2025


Some small issues inline, see below.

On 11/6/25 5:13 PM, Nicolas Frey wrote:
> use previously introduced {read,verify}-threads in API, where default
> values match the ones of the schema definition.
> 
> Signed-off-by: Nicolas Frey <n.frey at proxmox.com>
> ---
>   src/api2/admin/datastore.rs    | 18 +++++++++++++++---
>   src/api2/backup/environment.rs |  2 +-
>   src/backup/verify.rs           | 19 ++++++++++++++++---
>   src/server/verify_job.rs       |  7 ++++++-
>   4 files changed, 38 insertions(+), 8 deletions(-)
> 
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 6e269ef9..fde4c247 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -45,7 +45,8 @@ use pbs_api_types::{
>       BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
>       IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT,
>       PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
> -    PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
> +    PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, READ_THREADS_SCHEMA, UPID, UPID_SCHEMA,
> +    VERIFICATION_OUTDATED_AFTER_SCHEMA, VERIFY_THREADS_SCHEMA,
>   };
>   use pbs_client::pxar::{create_tar, create_zip};
>   use pbs_config::CachedUserInfo;
> @@ -675,6 +676,14 @@ pub async fn status(
>                   schema: NS_MAX_DEPTH_SCHEMA,
>                   optional: true,
>               },
> +            "read-threads": {
> +                schema: READ_THREADS_SCHEMA,
> +                optional: true,
> +            },
> +            "verify-threads": {
> +                schema: VERIFY_THREADS_SCHEMA,
> +                optional: true,
> +            },
>           },
>       },
>       returns: {
> @@ -688,7 +697,7 @@ pub async fn status(
>   )]
>   /// Verify backups.
>   ///
> -/// This function can verify a single backup snapshot, all backup from a backup group,
> +/// This function can verify a single backup snapshot, all backups from a backup group,

nit: this is an unrelated change, please put this into a dedicated commit

>   /// or all backups in the datastore.
>   #[allow(clippy::too_many_arguments)]
>   pub fn verify(
> @@ -700,6 +709,8 @@ pub fn verify(
>       ignore_verified: Option<bool>,
>       outdated_after: Option<i64>,
>       max_depth: Option<usize>,
> +    read_threads: Option<usize>,
> +    verify_threads: Option<usize>,
>       rpcenv: &mut dyn RpcEnvironment,
>   ) -> Result<Value, Error> {
>       let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> @@ -779,7 +790,8 @@ pub fn verify(
>           auth_id.to_string(),
>           to_stdout,
>           move |worker| {
> -            let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
> +            let verify_worker =
> +                VerifyWorker::new(worker.clone(), datastore, read_threads, verify_threads)?;
>               let failed_dirs = if let Some(backup_dir) = backup_dir {
>                   let mut res = Vec::new();
>                   if !verify_worker.verify_backup_dir(
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index 0faf6c8e..06696c78 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -795,7 +795,7 @@ impl BackupEnvironment {
>               move |worker| {
>                   worker.log_message("Automatically verifying newly added snapshot");
>   
> -                let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
> +                let verify_worker = VerifyWorker::new(worker.clone(), datastore, None, None)?;
>                   if !verify_worker.verify_backup_dir_with_lock(
>                       &backup_dir,
>                       worker.upid().clone(),
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index 910a3ed5..f3cbe4d6 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -32,6 +32,8 @@ pub struct VerifyWorker {
>       verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>       corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>       backend: DatastoreBackend,
> +    read_threads: usize,
> +    verify_threads: usize,
>   }
>   
>   struct IndexVerifyState {
> @@ -44,10 +46,13 @@ struct IndexVerifyState {
>   }
>   
>   impl VerifyWorker {
> -    /// Creates a new VerifyWorker for a given task worker and datastore.
> +    /// Creates a new VerifyWorker for a given task worker and datastore. \
> +    /// Default values for read_threads: 1, verify_threads: 4

Is this backslash intentional? Also, I don't think we should explicitly 
document the default values here.
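
Something along these lines would do (just a sketch, feel free to adapt),
given that the commit message already states the defaults match the schema
definition:

    /// Creates a new VerifyWorker for a given task worker and datastore.
    ///
    /// If `read_threads` or `verify_threads` are not set, the schema
    /// defaults are used.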

>       pub fn new(
>           worker: Arc<dyn WorkerTaskContext>,
>           datastore: Arc<DataStore>,
> +        read_threads: Option<usize>,
> +        verify_threads: Option<usize>,
>       ) -> Result<Self, Error> {
>           let backend = datastore.backend()?;
>           Ok(Self {
> @@ -58,6 +63,8 @@ impl VerifyWorker {
>               // start with 64 chunks since we assume there are few corrupt ones
>               corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
>               backend,
> +            read_threads: read_threads.unwrap_or(1),
> +            verify_threads: verify_threads.unwrap_or(4),
>           })
>       }
>   
> @@ -101,7 +108,7 @@ impl VerifyWorker {
>               verified_chunks: Arc::clone(&self.verified_chunks),
>           });
>   
> -        let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> +        let decoder_pool = ParallelHandler::new("verify chunk decoder", self.verify_threads, {
>               let verify_state = Arc::clone(&verify_state);
>               move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>                   let chunk_crypt_mode = match chunk.crypt_mode() {
> @@ -163,7 +170,13 @@ impl VerifyWorker {
>               .datastore
>               .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>   
> -        let reader_pool = ParallelHandler::new("read chunks", 4, {
> +        log::info!(
> +            "  using {} read and {} verify thread(s)",
> +            self.read_threads,
> +            self.verify_threads,
> +        );
> +
> +        let reader_pool = ParallelHandler::new("read chunks", self.read_threads, {
>               let decoder_pool = decoder_pool.channel();
>               let verify_state = Arc::clone(&verify_state);
>               let backend = self.backend.clone();
> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
> index c8792174..e0b03155 100644
> --- a/src/server/verify_job.rs
> +++ b/src/server/verify_job.rs
> @@ -41,7 +41,12 @@ pub fn do_verification_job(
>                   None => Default::default(),
>               };
>   
> -            let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
> +            let verify_worker = VerifyWorker::new(
> +                worker.clone(),
> +                datastore,
> +                verification_job.read_threads,
> +                verification_job.verify_threads,
> +            )?;
>               let result = verify_worker.verify_all_backups(
>                   worker.upid(),
>                   ns,
