[pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct

Christian Ebner c.ebner at proxmox.com
Tue Nov 11 11:22:19 CET 2025


as mentioned in the previous patch, IMO this one should come first in the 
series, with the parallel handler then added on top of it.

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> Introduces a new state struct `IndexVerifyState` so that we only need
> to pass around and clone one `Arc`.
> 
> Suggested-by: Christian Ebner <c.ebner at proxmox.com>
> Signed-off-by: Nicolas Frey <n.frey at proxmox.com>
> ---
>   src/backup/verify.rs | 130 ++++++++++++++++++++++---------------------
>   1 file changed, 67 insertions(+), 63 deletions(-)
> 
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index c0ff15d4..9a20c8e1 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -34,6 +34,34 @@ pub struct VerifyWorker {
>       backend: DatastoreBackend,
>   }
>   
> +struct IndexVerifyState {
> +    read_bytes: AtomicU64,
> +    decoded_bytes: AtomicU64,
> +    errors: AtomicUsize,
> +    datastore: Arc<DataStore>,
> +    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    start_time: Instant,
> +}
> +
> +impl IndexVerifyState {
> +    fn new(
> +        datastore: &Arc<DataStore>,
> +        corrupt_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
> +        verified_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
> +    ) -> Self {
> +        Self {
> +            read_bytes: AtomicU64::new(0),
> +            decoded_bytes: AtomicU64::new(0),
> +            errors: AtomicUsize::new(0),
> +            datastore: Arc::clone(datastore),
> +            corrupt_chunks: Arc::clone(corrupt_chunks),
> +            verified_chunks: Arc::clone(verified_chunks),
> +            start_time: Instant::now(),
> +        }
> +    }
> +}
> +
>   impl VerifyWorker {
>       /// Creates a new VerifyWorker for a given task worker and datastore.
>       pub fn new(
> @@ -81,24 +109,20 @@ impl VerifyWorker {
>           index: Box<dyn IndexFile + Send>,
>           crypt_mode: CryptMode,
>       ) -> Result<(), Error> {
> -        let errors = Arc::new(AtomicUsize::new(0));
> -
> -        let start_time = Instant::now();
> -
> -        let read_bytes = Arc::new(AtomicU64::new(0));
> -        let decoded_bytes = Arc::new(AtomicU64::new(0));
> +        let verify_state = Arc::new(IndexVerifyState::new(
> +            &self.datastore,
> +            &self.corrupt_chunks,
> +            &self.verified_chunks,
> +        ));
>   
>           let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> -            let datastore = Arc::clone(&self.datastore);
> -            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> -            let verified_chunks = Arc::clone(&self.verified_chunks);
> -            let errors = Arc::clone(&errors);
> +            let verify_state = Arc::clone(&verify_state);
>               move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>                   let chunk_crypt_mode = match chunk.crypt_mode() {
>                       Err(err) => {
> -                        corrupt_chunks.lock().unwrap().insert(digest);
> +                        verify_state.corrupt_chunks.lock().unwrap().insert(digest);
>                           info!("can't verify chunk, unknown CryptMode - {err}");
> -                        errors.fetch_add(1, Ordering::SeqCst);
> +                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                           return Ok(());
>                       }
>                       Ok(mode) => mode,
> @@ -108,20 +132,20 @@ impl VerifyWorker {
>                       info!(
>                       "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
>                   );
> -                    errors.fetch_add(1, Ordering::SeqCst);
> +                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                   }
>   
>                   if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
> -                    corrupt_chunks.lock().unwrap().insert(digest);
> +                    verify_state.corrupt_chunks.lock().unwrap().insert(digest);
>                       info!("{err}");
> -                    errors.fetch_add(1, Ordering::SeqCst);
> -                    match datastore.rename_corrupt_chunk(&digest) {
> +                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
> +                    match verify_state.datastore.rename_corrupt_chunk(&digest) {
>                           Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>                           Err(err) => info!("{err}"),
>                           _ => (),
>                       }
>                   } else {
> -                    verified_chunks.lock().unwrap().insert(digest);
> +                    verify_state.verified_chunks.lock().unwrap().insert(digest);
>                   }
>   
>                   Ok(())
> @@ -134,7 +158,7 @@ impl VerifyWorker {
>               } else if self.corrupt_chunks.lock().unwrap().contains(digest) {
>                   let digest_str = hex::encode(digest);
>                   info!("chunk {digest_str} was marked as corrupt");
> -                errors.fetch_add(1, Ordering::SeqCst);
> +                verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                   true
>               } else {
>                   false
> @@ -155,27 +179,18 @@ impl VerifyWorker {
>   
>           let reader_pool = ParallelHandler::new("read chunks", 4, {
>               let decoder_pool = decoder_pool.channel();
> -            let datastore = Arc::clone(&self.datastore);
> -            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> -            let read_bytes = Arc::clone(&read_bytes);
> -            let decoded_bytes = Arc::clone(&decoded_bytes);
> -            let errors = Arc::clone(&errors);
> +            let verify_state = Arc::clone(&verify_state);
>               let backend = self.backend.clone();
>   
>               move |info: ChunkReadInfo| {
>                   Self::verify_chunk_by_backend(
>                       &backend,
> -                    Arc::clone(&datastore),
> -                    Arc::clone(&corrupt_chunks),
> -                    Arc::clone(&read_bytes),
> -                    Arc::clone(&decoded_bytes),
> -                    Arc::clone(&errors),
> +                    Arc::clone(&verify_state),
>                       &decoder_pool,
>                       &info,
>                   )
>               }
>           });
> -
>           for (pos, _) in chunk_list {
>               self.worker.check_abort()?;
>               self.worker.fail_on_shutdown()?;
> @@ -192,10 +207,10 @@ impl VerifyWorker {
>   
>           reader_pool.complete()?;
>   
> -        let elapsed = start_time.elapsed().as_secs_f64();
> +        let elapsed = verify_state.start_time.elapsed().as_secs_f64();
>   
> -        let read_bytes = read_bytes.load(Ordering::SeqCst);
> -        let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
> +        let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst);
> +        let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst);
>   
>           let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>           let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
> @@ -203,44 +218,39 @@ impl VerifyWorker {
>           let read_speed = read_bytes_mib / elapsed;
>           let decode_speed = decoded_bytes_mib / elapsed;
>   
> -        let error_count = errors.load(Ordering::SeqCst);
> +        let error_count = verify_state.errors.load(Ordering::SeqCst);
>   
>           info!(
>               "  verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)"
>           );
>   
> -        if errors.load(Ordering::SeqCst) > 0 {
> +        if verify_state.errors.load(Ordering::SeqCst) > 0 {
>               bail!("chunks could not be verified");
>           }
>   
>           Ok(())
>       }
>   
> -    #[allow(clippy::too_many_arguments)]
>       fn verify_chunk_by_backend(
>           backend: &DatastoreBackend,
> -        datastore: Arc<DataStore>,
> -        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> -        read_bytes: Arc<AtomicU64>,
> -        decoded_bytes: Arc<AtomicU64>,
> -        errors: Arc<AtomicUsize>,
> +        verify_state: Arc<IndexVerifyState>,
>           decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
>           info: &ChunkReadInfo,
>       ) -> Result<(), Error> {
>           match backend {
> -            DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
> +            DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) {
>                   Err(err) => Self::add_corrupt_chunk(
> -                    datastore,
> -                    corrupt_chunks,
> +                    verify_state,
>                       info.digest,
> -                    errors,
>                       &format!("can't verify chunk, load failed - {err}"),
>                   ),
>                   Ok(chunk) => {
>                       let size = info.size();
> -                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                    verify_state
> +                        .read_bytes
> +                        .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                       decoder_pool.send((chunk, info.digest, size))?;
> -                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
> +                    verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                   }
>               },
>               DatastoreBackend::S3(s3_client) => {
> @@ -257,28 +267,28 @@ impl VerifyWorker {
>                           match chunk_result {
>                               Ok(chunk) => {
>                                   let size = info.size();
> -                                read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                                verify_state
> +                                    .read_bytes
> +                                    .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                                   decoder_pool.send((chunk, info.digest, size))?;
> -                                decoded_bytes.fetch_add(size, Ordering::SeqCst);
> +                                verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                               }
>                               Err(err) => {
> -                                errors.fetch_add(1, Ordering::SeqCst);
> +                                verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                                   error!("can't verify chunk, load failed - {err}");
>                               }
>                           }
>                       }
>                       Ok(None) => Self::add_corrupt_chunk(
> -                        datastore,
> -                        corrupt_chunks,
> +                        verify_state,
>                           info.digest,
> -                        errors,
>                           &format!(
>                               "can't verify missing chunk with digest {}",
>                               hex::encode(info.digest)
>                           ),
>                       ),
>                       Err(err) => {
> -                        errors.fetch_add(1, Ordering::SeqCst);
> +                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                           error!("can't verify chunk, load failed - {err}");
>                       }
>                   }
> @@ -287,19 +297,13 @@ impl VerifyWorker {
>           Ok(())
>       }
>   
> -    fn add_corrupt_chunk(
> -        datastore: Arc<DataStore>,
> -        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> -        digest: [u8; 32],
> -        errors: Arc<AtomicUsize>,
> -        message: &str,
> -    ) {
> +    fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) {
>           // Panic on poisoned mutex
> -        let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
> +        let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap();
>           corrupt_chunks.insert(digest);
>           error!(message);
> -        errors.fetch_add(1, Ordering::SeqCst);
> -        match datastore.rename_corrupt_chunk(&digest) {
> +        verify_state.errors.fetch_add(1, Ordering::SeqCst);
> +        match verify_state.datastore.rename_corrupt_chunk(&digest) {
>               Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>               Err(err) => info!("{err}"),
>               _ => (),





More information about the pbs-devel mailing list