[pbs-devel] [PATCH proxmox-backup v9 17/46] verify: implement chunk verification for stores with s3 backend

Hannes Laimer h.laimer at proxmox.com
Mon Jul 21 15:35:31 CEST 2025


On Sat Jul 19, 2025 at 2:50 PM CEST, Christian Ebner wrote:
> For datastores backed by an S3 compatible object store, rather than
> reading the chunks to be verified from the local filesystem, fetch
> them via the s3 client from the configured bucket.
>

Could we somehow take advantage of the hash S3 provides for objects[1]?
We can't use our own hashes though, so I'm not sure if or how we could...

[1] https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
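
To illustrate, something along these lines is what I had in mind. To be
clear, head_object() and the sha256_checksum field are made up here, the
current s3 client doesn't expose anything like that, and the checksum S3
stores would cover the uploaded blob bytes rather than our chunk digest,
so we'd also have to record the blob's sha256 at upload time:

    // Hypothetical sketch only: head_object() and `sha256_checksum` do not
    // exist in the proxmox s3 client, and `expected_blob_sha256` would have
    // to be recorded when the chunk is uploaded.
    fn chunk_intact_on_backend(
        s3_client: &S3Client,
        digest: &[u8; 32],
        expected_blob_sha256: &[u8; 32],
    ) -> Result<bool, anyhow::Error> {
        let object_key = pbs_datastore::s3::object_key_from_digest(digest)?;
        // HEAD instead of GET: only metadata is transferred, not the chunk payload
        let head = proxmox_async::runtime::block_on(s3_client.head_object(object_key))?;
        match head.and_then(|metadata| metadata.sha256_checksum) {
            // checksum stored by S3 matches what we uploaded -> no bit rot on the backend
            Some(remote) => Ok(remote.as_slice() == expected_blob_sha256),
            // no checksum available -> caller falls back to fetching and decoding the chunk
            None => Ok(false),
        }
    }

That would only tell us the object wasn't corrupted after upload, so it
couldn't replace decoding and verifying against the chunk digest itself.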

> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 8:
> - refactored corrupt chunk marking into more compact methods
>
>  src/backup/verify.rs | 118 +++++++++++++++++++++++++++++++++++++------
>  1 file changed, 103 insertions(+), 15 deletions(-)
>
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index dea10f618..9a2148b56 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex};
>  use std::time::Instant;
>  
>  use anyhow::{bail, Error};
> +use http_body_util::BodyExt;
>  use tracing::{error, info, warn};
>  
>  use proxmox_worker_task::WorkerTaskContext;
> @@ -15,7 +16,7 @@ use pbs_api_types::{
>      UPID,
>  };
>  use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
> -use pbs_datastore::index::IndexFile;
> +use pbs_datastore::index::{ChunkReadInfo, IndexFile};
>  use pbs_datastore::manifest::{BackupManifest, FileInfo};
>  use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
>  
> @@ -89,6 +90,38 @@ impl VerifyWorker {
>              }
>          }
>  
> +        if let Ok(DatastoreBackend::S3(s3_client)) = datastore.backend() {
> +            let suffix = format!(".{}.bad", counter);
> +            let target_key =
> +                match pbs_datastore::s3::object_key_from_digest_with_suffix(digest, &suffix) {
> +                    Ok(target_key) => target_key,
> +                    Err(err) => {
> +                        info!("could not generate target key for corrupted chunk {path:?} - {err}");
> +                        return;
> +                    }
> +                };
> +            let object_key = match pbs_datastore::s3::object_key_from_digest(digest) {
> +                Ok(object_key) => object_key,
> +                Err(err) => {
> +                    info!("could not generate object key for corrupted chunk {path:?} - {err}");
> +                    return;
> +                }
> +            };
> +            if proxmox_async::runtime::block_on(
> +                s3_client.copy_object(object_key.clone(), target_key),
> +            )
> +            .is_ok()
> +            {
> +                if proxmox_async::runtime::block_on(s3_client.delete_object(object_key)).is_err() {
> +                    info!("failed to delete corrupt chunk on s3 backend: {digest_str}");
> +                }
> +            } else {
> +                info!("failed to copy corrupt chunk on s3 backend: {digest_str}");
> +            }
> +        } else {
> +            info!("failed to get s3 backend while trying to rename bad chunk: {digest_str}");
> +        }
> +
>          match std::fs::rename(&path, &new_path) {
>              Ok(_) => {
>                  info!("corrupted chunk renamed to {:?}", &new_path);
> @@ -189,20 +222,13 @@ impl VerifyWorker {
>                  continue; // already verified or marked corrupt
>              }
>  
> -            match self.datastore.load_chunk(&info.digest) {
> -                Err(err) => {
> -                    self.corrupt_chunks.lock().unwrap().insert(info.digest);
> -                    error!("can't verify chunk, load failed - {err}");
> -                    errors.fetch_add(1, Ordering::SeqCst);
> -                    Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
> -                }
> -                Ok(chunk) => {
> -                    let size = info.size();
> -                    read_bytes += chunk.raw_size();
> -                    decoder_pool.send((chunk, info.digest, size))?;
> -                    decoded_bytes += size;
> -                }
> -            }
> +            self.verify_chunk_by_backend(
> +                &info,
> +                &mut read_bytes,
> +                &mut decoded_bytes,
> +                Arc::clone(&errors),
> +                &decoder_pool,
> +            )?;
>          }
>  
>          decoder_pool.complete()?;
> @@ -228,6 +254,68 @@ impl VerifyWorker {
>          Ok(())
>      }
>  
> +    fn verify_chunk_by_backend(
> +        &self,
> +        info: &ChunkReadInfo,
> +        read_bytes: &mut u64,
> +        decoded_bytes: &mut u64,
> +        errors: Arc<AtomicUsize>,
> +        decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
> +    ) -> Result<(), Error> {
> +        match &self.backend {
> +            DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
> +                Err(err) => self.add_corrupt_chunk(
> +                    info.digest,
> +                    errors,
> +                    &format!("can't verify chunk, load failed - {err}"),
> +                ),
> +                Ok(chunk) => {
> +                    let size = info.size();
> +                    *read_bytes += chunk.raw_size();
> +                    decoder_pool.send((chunk, info.digest, size))?;
> +                    *decoded_bytes += size;
> +                }
> +            },
> +            DatastoreBackend::S3(s3_client) => {
> +                let object_key = pbs_datastore::s3::object_key_from_digest(&info.digest)?;
> +                match proxmox_async::runtime::block_on(s3_client.get_object(object_key)) {
> +                    Ok(Some(response)) => {
> +                        let bytes = proxmox_async::runtime::block_on(response.content.collect())?
> +                            .to_bytes();
> +                        let chunk = DataBlob::from_raw(bytes.to_vec())?;
> +                        let size = info.size();
> +                        *read_bytes += chunk.raw_size();
> +                        decoder_pool.send((chunk, info.digest, size))?;
> +                        *decoded_bytes += size;
> +                    }
> +                    Ok(None) => self.add_corrupt_chunk(
> +                        info.digest,
> +                        errors,
> +                        &format!(
> +                            "can't verify missing chunk with digest {}",
> +                            hex::encode(info.digest)
> +                        ),
> +                    ),
> +                    Err(err) => self.add_corrupt_chunk(
> +                        info.digest,
> +                        errors,
> +                        &format!("can't verify chunk, load failed - {err}"),
> +                    ),
> +                }
> +            }
> +        }
> +        Ok(())
> +    }
> +
> +    fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
> +        // Panic on poisoned mutex
> +        let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
> +        corrupt_chunks.insert(digest);
> +        error!(message);
> +        errors.fetch_add(1, Ordering::SeqCst);
> +        Self::rename_corrupted_chunk(self.datastore.clone(), &digest);
> +    }
> +
>      fn verify_fixed_index(&self, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
>          let mut path = backup_dir.relative_path();
>          path.push(&info.filename);
