[pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use
Nicolas Frey
n.frey at proxmox.com
Thu Nov 6 12:22:18 CET 2025
On 11/6/25 10:32 AM, Christian Ebner wrote:
> On 11/6/25 10:22 AM, Nicolas Frey wrote:
>> On 11/6/25 10:08 AM, Christian Ebner wrote:
>>> Please add a short commit message describing what the worker threads
>>> cover, e.g. that this parameter controls the number of reader and
>>> chunk verification threads.
>>>
>>> What tripped me over just now:
>>> Is this intentionally not increasing the number of chunk verification
>>> threads? Or was that overlooked? From the name of the parameter I
>>> suspected this to act on both, reading and verifying. If this is not
>>> the case, maybe the parameter should get renamed to a more telling
>>> `parallel-chunk-readers` instead?
>>
>> I wasn't sure if the number of threads for verification should be
>> controlled via this as well, as the original patch only added a new
>> thread pool for reading, whereas the verification pool was already
>> implemented.
>> I pointed this out in the cover letter, though it might have been
>> better to put this here too:
>>
>> The number of `worker-threads` only controls the thread pool for
>> reading, though if it makes sense to reuse this for the verification
>> pool, it could be adjusted to do so too.
>>
>> I think it makes sense to use it to control the number of threads of
>> both. Thanks for the feedback, I'll adjust it along with the other
>> proposed changes in a v2!
>
> Well, that was just an uninformed assumption from my side when reading
> the parameter name (and I did not re-read the cover letter today after
> having looked at this quickly yesterday, sorry for that).
That makes sense; the parameter name does not accurately describe the
function it serves here anyway, so it should have been named a bit
better.
>
> But maybe you can also evaluate if it actually makes sense to control
> both by the same parameter, or if it only makes sense to e.g. increase
> the number of verification tasks (no point for that if the IO remains
> the bottleneck), or if it would make sense to have either 2 parameters
> or couple them by some proportionality constant.
>
I had an idea along the lines of:

self.worker_threads.saturating_mul(2).clamp(4, 32)

though the proportionality factor should be tested to determine what
is actually sensible here, and of course be documented accordingly.
I also think a minimum of 4 threads for verification makes sense: with
the default of 1 worker thread, this behaves much like it did before
the read thread pool was added (i.e. 1 thread for reading, 4 threads
for verification) and scales up from there. The thread count should
also be clamped to a maximum of 32 to respect the constraints of the
schema, which likewise states 32 as the maximum.
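To make that concrete, a minimal sketch (the factor 2 and the bounds
are assumptions that would still need benchmarking):

    // derive both pool sizes from the single `worker-threads` value
    fn pool_sizes(worker_threads: usize) -> (usize, usize) {
        // reader pool uses the parameter as-is (schema: 1..=32, default 1)
        let read_threads = worker_threads;
        // verification pool scales with it: twice the readers, but at
        // least 4 (the previous fixed value) and at most 32 (schema max)
        let verify_threads = worker_threads.saturating_mul(2).clamp(4, 32);
        (read_threads, verify_threads)
    }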
What do you think?
>>>
>>> further comment inline
>>> On 11/5/25 4:51 PM, Nicolas Frey wrote:
>>>> Signed-off-by: Nicolas Frey <n.frey at proxmox.com>
>>>> ---
>>>> src/api2/admin/datastore.rs | 13 +++++++++++--
>>>> src/api2/backup/environment.rs | 2 +-
>>>> src/backup/verify.rs | 5 ++++-
>>>> src/server/verify_job.rs | 3 ++-
>>>> 4 files changed, 18 insertions(+), 5 deletions(-)
>>>>
>>>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
>>>> index d192ee39..69a09081 100644
>>>> --- a/src/api2/admin/datastore.rs
>>>> +++ b/src/api2/admin/datastore.rs
>>>> @@ -677,6 +677,14 @@ pub async fn status(
>>>> schema: NS_MAX_DEPTH_SCHEMA,
>>>> optional: true,
>>>> },
>>>> + "worker-threads": {
>>>> + description: "Set the number of worker threads to use for the job",
>>>> + type: Integer,
>>>> + optional: true,
>>>> + minimum: 1,
>>>> + maximum: 32,
>>>> + default: 1,
>>>> + },
>>>
>>> As mentioned on the pbs-api-types patch, this should reuse the same
>>> schema as (will be) defined there, so it does not need to be
>>> re-defined and stays in sync.
>>>
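Ack, a shared schema constant in pbs-api-types along these lines should
work (sketch, WORKER_THREADS_SCHEMA is a placeholder name and its exact
location is up for discussion):

    use proxmox_schema::{IntegerSchema, Schema};

    pub const WORKER_THREADS_SCHEMA: Schema =
        IntegerSchema::new("Number of worker threads to use for the job.")
            .minimum(1)
            .maximum(32)
            .default(1)
            .schema();

The api macro entry here would then shrink to referencing it:

    "worker-threads": {
        schema: WORKER_THREADS_SCHEMA,
        optional: true,
    },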
>>>> },
>>>> },
>>>> returns: {
>>>> @@ -690,7 +698,7 @@ pub async fn status(
>>>> )]
>>>> /// Verify backups.
>>>> ///
>>>> -/// This function can verify a single backup snapshot, all backup from a backup group,
>>>> +/// This function can verify a single backup snapshot, all backups from a backup group,
>>>> /// or all backups in the datastore.
>>>> #[allow(clippy::too_many_arguments)]
>>>> pub fn verify(
>>>> @@ -702,6 +710,7 @@ pub fn verify(
>>>> ignore_verified: Option<bool>,
>>>> outdated_after: Option<i64>,
>>>> max_depth: Option<usize>,
>>>> + worker_threads: Option<usize>,
>>>
>>> this could be a plain `usize` already, so it does not need to be
>>> unwrapped for each parallel worker instantiation. The unwrapping and
>>> setting to default can already happen in the constructor.
>>>
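Right, the field can then be a plain usize, with the default resolved
once in the constructor, roughly like this (sketch,
DEFAULT_WORKER_THREADS is a placeholder name; whether the fallback
should be the schema default of 1 or the old fixed reader count is
still open):

    // default used when internal callers pass `None`
    const DEFAULT_WORKER_THREADS: usize = 1;

    pub fn new(
        worker: Arc<dyn WorkerTaskContext>,
        datastore: Arc<DataStore>,
        worker_threads: Option<usize>,
    ) -> Result<Self, Error> {
        let backend = datastore.backend()?;
        Ok(Self {
            worker,
            datastore,
            // ... other fields as before ...
            backend,
            // resolve the Option once here, so call sites like the
            // reader pool can use `self.worker_threads` directly
            worker_threads: worker_threads.unwrap_or(DEFAULT_WORKER_THREADS),
        })
    }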
>>>> rpcenv: &mut dyn RpcEnvironment,
>>>> ) -> Result<Value, Error> {
>>>> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>>>> @@ -781,7 +790,7 @@ pub fn verify(
>>>> auth_id.to_string(),
>>>> to_stdout,
>>>> move |worker| {
>>>> - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
>>>> + let verify_worker = VerifyWorker::new(worker.clone(), datastore, worker_threads)?;
>>>> let failed_dirs = if let Some(backup_dir) = backup_dir {
>>>> let mut res = Vec::new();
>>>> if !verify_worker.verify_backup_dir(
>>>> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
>>>> index 0e8eab1b..5e6a73b9 100644
>>>> --- a/src/api2/backup/environment.rs
>>>> +++ b/src/api2/backup/environment.rs
>>>> @@ -812,7 +812,7 @@ impl BackupEnvironment {
>>>> move |worker| {
>>>> worker.log_message("Automatically verifying newly
>>>> added snapshot");
>>>> - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
>>>> + let verify_worker = VerifyWorker::new(worker.clone(), datastore, None)?;
>>>> if !verify_worker.verify_backup_dir_with_lock(
>>>> &backup_dir,
>>>> worker.upid().clone(),
>>>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
>>>> index 7f91f38c..e11dba8e 100644
>>>> --- a/src/backup/verify.rs
>>>> +++ b/src/backup/verify.rs
>>>> @@ -32,6 +32,7 @@ pub struct VerifyWorker {
>>>> verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>>> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>>> backend: DatastoreBackend,
>>>> + worker_threads: Option<usize>,
>>>
>>> ... plain `usize` here
>>>
>>>> }
>>>> impl VerifyWorker {
>>>> @@ -39,6 +40,7 @@ impl VerifyWorker {
>>>> pub fn new(
>>>> worker: Arc<dyn WorkerTaskContext>,
>>>> datastore: Arc<DataStore>,
>>>> + worker_threads: Option<usize>,
>>>> ) -> Result<Self, Error> {
>>>> let backend = datastore.backend()?;
>>>> Ok(Self {
>>>> @@ -49,6 +51,7 @@ impl VerifyWorker {
>>>> // start with 64 chunks since we assume there are few corrupt ones
>>>> corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
>>>> backend,
>>>> + worker_threads,
>>>
>>> unwrap_or(4) here... or even define a constant for the default value,
>>> although if it is placed here, it will only occur once.
>>>
>>>> })
>>>> }
>>>> @@ -220,7 +223,7 @@ impl VerifyWorker {
>>>> .datastore
>>>> .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>>>> - let reader_pool = ParallelHandler::new("read chunks",
>>>> 4, {
>>>> + let reader_pool = ParallelHandler::new("read chunks",
>>>> self.worker_threads.unwrap_or(4), {
>>>> let decoder_pool = decoder_pool.channel();
>>>> let datastore = Arc::clone(&self.datastore);
>>>> let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>>>> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
>>>> index c8792174..9d790b07 100644
>>>> --- a/src/server/verify_job.rs
>>>> +++ b/src/server/verify_job.rs
>>>> @@ -41,7 +41,8 @@ pub fn do_verification_job(
>>>> None => Default::default(),
>>>> };
>>>> - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
>>>> + let verify_worker =
>>>> + VerifyWorker::new(worker.clone(), datastore, verification_job.worker_threads)?;
>>>> let result = verify_worker.verify_all_backups(
>>>> worker.upid(),
>>>> ns,
>>>
>>
>