[pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use

Christian Ebner c.ebner at proxmox.com
Thu Nov 6 10:32:58 CET 2025


On 11/6/25 10:22 AM, Nicolas Frey wrote:
> On 11/6/25 10:08 AM, Christian Ebner wrote:
>> Please add a short commit message describing what the worker threads
>> cover, e.g. that this parameter controls the number of reader and
>> chunk verification threads.
>>
>> What tripped me over just now:
>> Is this intentionally not increasing the number of chunk verification
>> threads? Or was that overlooked? From the name of the parameter I
>> suspected this to act on both, reading and verifying. If this is not
>> the case, maybe the parameter should get renamed to a more telling
>> `parallel-chunk-readers` instead?
> 
> I wasn't sure if the number of threads for verification should be
> controlled via this as well, as the original patch only added a new
> thread pool for reading, whereas the verification pool was already
> implemented.
> I pointed this out in the cover letter, though it might have been
> better to put this here too:
> 
> The number of `worker-threads` only controls the thread pool for
> reading, though if it makes sense to reuse this for the verification
> pool, it could be adjusted to do so too.
> 
> I think it makes sense to use it to control the number of threads of
> both. Thanks for the feedback, I'll adjust it along with the other
> proposed changes in a v2!

Well, that was just an uninformed assumption from my side when reading 
the parameter name (and I did not re-read the cover letter today after 
having looked at this quickly yesterday, sorry for that).

But maybe you can also evaluate if it actually makes sense to control 
both by the same parameter, or if it only makes sense to e.g. increase 
the number of verification tasks (no point for that if the IO remains 
the bottleneck), or if it would make sense to have either 2 parameters 
or couple them by some proportionality constant.

>   >
>> further comment inline
>> On 11/5/25 4:51 PM, Nicolas Frey wrote:
>>> Signed-off-by: Nicolas Frey <n.frey at proxmox.com>
>>> ---
>>>    src/api2/admin/datastore.rs    | 13 +++++++++++--
>>>    src/api2/backup/environment.rs |  2 +-
>>>    src/backup/verify.rs           |  5 ++++-
>>>    src/server/verify_job.rs       |  3 ++-
>>>    4 files changed, 18 insertions(+), 5 deletions(-)
>>>
>>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
>>> index d192ee39..69a09081 100644
>>> --- a/src/api2/admin/datastore.rs
>>> +++ b/src/api2/admin/datastore.rs
>>> @@ -677,6 +677,14 @@ pub async fn status(
>>>                    schema: NS_MAX_DEPTH_SCHEMA,
>>>                    optional: true,
>>>                },
>>> +            "worker-threads": {
>>> +                description: "Set the number of worker threads to
>>> use for the job",
>>> +                type: Integer,
>>> +                optional: true,
>>> +                minimum: 1,
>>> +                maximum: 32,
>>> +                default: 1,
>>> +            },
>>
>> As mentioned on the pbs-api-types patch, this should reuse the same
>> schema as (will be) defined there, so this does not be to be re-
>> defined and stays in sync.
>>
>>>            },
>>>        },
>>>        returns: {
>>> @@ -690,7 +698,7 @@ pub async fn status(
>>>    )]
>>>    /// Verify backups.
>>>    ///
>>> -/// This function can verify a single backup snapshot, all backup
>>> from a backup group,
>>> +/// This function can verify a single backup snapshot, all backups
>>> from a backup group,
>>>    /// or all backups in the datastore.
>>>    #[allow(clippy::too_many_arguments)]
>>>    pub fn verify(
>>> @@ -702,6 +710,7 @@ pub fn verify(
>>>        ignore_verified: Option<bool>,
>>>        outdated_after: Option<i64>,
>>>        max_depth: Option<usize>,
>>> +    worker_threads: Option<usize>,
>>
>> this could be a plain `usize` already, so it does not need to be
>> unwrapped for each parallel worker instantiation. The unwrapping and
>> setting to default can already happen in the constructor.
>>
>>>        rpcenv: &mut dyn RpcEnvironment,
>>>    ) -> Result<Value, Error> {
>>>        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>>> @@ -781,7 +790,7 @@ pub fn verify(
>>>            auth_id.to_string(),
>>>            to_stdout,
>>>            move |worker| {
>>> -            let verify_worker = VerifyWorker::new(worker.clone(),
>>> datastore)?;
>>> +            let verify_worker = VerifyWorker::new(worker.clone(),
>>> datastore, worker_threads)?;
>>>                let failed_dirs = if let Some(backup_dir) = backup_dir {
>>>                    let mut res = Vec::new();
>>>                    if !verify_worker.verify_backup_dir(
>>> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/
>>> environment.rs
>>> index 0e8eab1b..5e6a73b9 100644
>>> --- a/src/api2/backup/environment.rs
>>> +++ b/src/api2/backup/environment.rs
>>> @@ -812,7 +812,7 @@ impl BackupEnvironment {
>>>                move |worker| {
>>>                    worker.log_message("Automatically verifying newly
>>> added snapshot");
>>>    -                let verify_worker =
>>> VerifyWorker::new(worker.clone(), datastore)?;
>>> +                let verify_worker =
>>> VerifyWorker::new(worker.clone(), datastore, None)?;
>>>                    if !verify_worker.verify_backup_dir_with_lock(
>>>                        &backup_dir,
>>>                        worker.upid().clone(),
>>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
>>> index 7f91f38c..e11dba8e 100644
>>> --- a/src/backup/verify.rs
>>> +++ b/src/backup/verify.rs
>>> @@ -32,6 +32,7 @@ pub struct VerifyWorker {
>>>        verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>>        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>>        backend: DatastoreBackend,
>>> +    worker_threads: Option<usize>,
>>
>> ... plain `usize` here
>>
>>>    }
>>>      impl VerifyWorker {
>>> @@ -39,6 +40,7 @@ impl VerifyWorker {
>>>        pub fn new(
>>>            worker: Arc<dyn WorkerTaskContext>,
>>>            datastore: Arc<DataStore>,
>>> +        worker_threads: Option<usize>,
>>>        ) -> Result<Self, Error> {
>>>            let backend = datastore.backend()?;
>>>            Ok(Self {
>>> @@ -49,6 +51,7 @@ impl VerifyWorker {
>>>                // start with 64 chunks since we assume there are few
>>> corrupt ones
>>>                corrupt_chunks:
>>> Arc::new(Mutex::new(HashSet::with_capacity(64))),
>>>                backend,
>>> +            worker_threads,
>>
>> unwrap_or(4) here... or even define a constant for the default value,
>> although if it is placed here, it will only occur once.
>>
>>>            })
>>>        }
>>>    @@ -220,7 +223,7 @@ impl VerifyWorker {
>>>                .datastore
>>>                .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>>>    -        let reader_pool = ParallelHandler::new("read chunks", 4, {
>>> +        let reader_pool = ParallelHandler::new("read chunks",
>>> self.worker_threads.unwrap_or(4), {
>>>                let decoder_pool = decoder_pool.channel();
>>>                let datastore = Arc::clone(&self.datastore);
>>>                let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>>> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
>>> index c8792174..9d790b07 100644
>>> --- a/src/server/verify_job.rs
>>> +++ b/src/server/verify_job.rs
>>> @@ -41,7 +41,8 @@ pub fn do_verification_job(
>>>                    None => Default::default(),
>>>                };
>>>    -            let verify_worker = VerifyWorker::new(worker.clone(),
>>> datastore)?;
>>> +            let verify_worker =
>>> +                VerifyWorker::new(worker.clone(), datastore,
>>> verification_job.worker_threads)?;
>>>                let result = verify_worker.verify_all_backups(
>>>                    worker.upid(),
>>>                    ns,
>>
> 





More information about the pbs-devel mailing list