[pbs-devel] [PATCH proxmox-backup v8 17/45] verify: implement chunk verification for stores with s3 backend

Lukas Wagner l.wagner at proxmox.com
Fri Jul 18 10:56:25 CEST 2025


On 2025-07-15 14:53, Christian Ebner wrote:
> For datastores backed by an S3 compatible object store, rather than
> reading the chunks to be verified from the local filesystem, fetch
> them via the s3 client from the configured bucket.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
> 
>  src/backup/verify.rs | 89 ++++++++++++++++++++++++++++++++++++++------
>  1 file changed, 77 insertions(+), 12 deletions(-)
> 
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index dea10f618..3a4a1d0d5 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex};
>  use std::time::Instant;
>  
>  use anyhow::{bail, Error};
> +use http_body_util::BodyExt;
>  use tracing::{error, info, warn};
>  
>  use proxmox_worker_task::WorkerTaskContext;
> @@ -89,6 +90,38 @@ impl VerifyWorker {
>              }
>          }
>  
> +        if let Ok(DatastoreBackend::S3(s3_client)) = datastore.backend() {
> +            let suffix = format!(".{}.bad", counter);
> +            let target_key =
> +                match pbs_datastore::s3::object_key_from_digest_with_suffix(digest, &suffix) {
> +                    Ok(target_key) => target_key,
> +                    Err(err) => {
> +                        info!("could not generate target key for corrupted chunk {path:?} - {err}");
> +                        return;
> +                    }
> +                };
> +            let object_key = match pbs_datastore::s3::object_key_from_digest(digest) {
> +                Ok(object_key) => object_key,
> +                Err(err) => {
> +                    info!("could not generate object key for corrupted chunk {path:?} - {err}");
> +                    return;
> +                }
> +            };
> +            if proxmox_async::runtime::block_on(
> +                s3_client.copy_object(object_key.clone(), target_key),
> +            )
> +            .is_ok()
> +            {
> +                if proxmox_async::runtime::block_on(s3_client.delete_object(object_key)).is_err() {
> +                    info!("failed to delete corrupt chunk on s3 backend: {digest_str}");
> +                }
> +            } else {
> +                info!("failed to copy corrupt chunk on s3 backend: {digest_str}");
> +            }
> +        } else {
> +            info!("failed to get s3 backend while trying to rename bad chunk: {digest_str}");
> +        }
> +
>          match std::fs::rename(&path, &new_path) {
>              Ok(_) => {
>                  info!("corrupted chunk renamed to {:?}", &new_path);
> @@ -189,18 +222,50 @@ impl VerifyWorker {
>                  continue; // already verified or marked corrupt
>              }
>  
> -            match self.datastore.load_chunk(&info.digest) {
> -                Err(err) => {
> -                    self.corrupt_chunks.lock().unwrap().insert(info.digest);
> -                    error!("can't verify chunk, load failed - {err}");
> -                    errors.fetch_add(1, Ordering::SeqCst);
> -                    Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
> -                }
> -                Ok(chunk) => {
> -                    let size = info.size();
> -                    read_bytes += chunk.raw_size();
> -                    decoder_pool.send((chunk, info.digest, size))?;
> -                    decoded_bytes += size;
> +            match &self.backend {

The whole method becomes uncomfortably large; maybe move the entire match on &self.backend into a new method?
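
Something along these lines, perhaps (a rough, uncompiled sketch; method
name and exact error handling are of course up to you):

fn load_chunk_from_backend(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
    match &self.backend {
        // local datastore: read the chunk from the filesystem as before
        DatastoreBackend::Filesystem => self.datastore.load_chunk(digest),
        // s3 backend: fetch the chunk via the s3 client
        DatastoreBackend::S3(s3_client) => {
            let object_key = pbs_datastore::s3::object_key_from_digest(digest)?;
            match proxmox_async::runtime::block_on(s3_client.get_object(object_key))? {
                Some(response) => {
                    let bytes =
                        proxmox_async::runtime::block_on(response.content.collect())?.to_bytes();
                    DataBlob::from_raw(bytes.to_vec())
                }
                None => bail!("missing chunk with digest {}", hex::encode(digest)),
            }
        }
    }
}

verify_index_chunks would then only need a single match on the returned
Result, like in the filesystem-only case before this patch.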

> +                DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
> +                    Err(err) => {
> +                        self.corrupt_chunks.lock().unwrap().insert(info.digest);

Maybe add a new method self.add_corrupt_chunk:

fn add_corrupt_chunk(&self, digest: [u8; 32]) {
    // Panic on poisoned mutex
    let mut chunks = self.corrupt_chunks.lock().unwrap();

    chunks.insert(digest);
}

or the like
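
The call sites would then shrink to e.g.

self.add_corrupt_chunk(info.digest);

which also keeps the lock handling in one place.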

> +                        error!("can't verify chunk, load failed - {err}");
> +                        errors.fetch_add(1, Ordering::SeqCst);
> +                        Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
> +                    }
> +                    Ok(chunk) => {
> +                        let size = info.size();
> +                        read_bytes += chunk.raw_size();
> +                        decoder_pool.send((chunk, info.digest, size))?;
> +                        decoded_bytes += size;
> +                    }
> +                },
> +                DatastoreBackend::S3(s3_client) => {
> +                    let object_key = pbs_datastore::s3::object_key_from_digest(&info.digest)?;
> +                    match proxmox_async::runtime::block_on(s3_client.get_object(object_key)) {
> +                        Ok(Some(response)) => {
> +                            let bytes =
> +                                proxmox_async::runtime::block_on(response.content.collect())?
> +                                    .to_bytes();
> +                            let chunk = DataBlob::from_raw(bytes.to_vec())?;
> +                            let size = info.size();
> +                            read_bytes += chunk.raw_size();
> +                            decoder_pool.send((chunk, info.digest, size))?;
> +                            decoded_bytes += size;
> +                        }
> +                        Ok(None) => {
> +                            self.corrupt_chunks.lock().unwrap().insert(info.digest);
> +                            error!(
> +                                "can't verify missing chunk with digest {}",
> +                                hex::encode(info.digest)
> +                            );
> +                            errors.fetch_add(1, Ordering::SeqCst);
> +                            Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
> +                        }
> +                        Err(err) => {
> +                            self.corrupt_chunks.lock().unwrap().insert(info.digest);
> +                            error!("can't verify chunk, load failed - {err}");
> +                            errors.fetch_add(1, Ordering::SeqCst);
> +                            Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
> +                        }
> +                    }
>                  }
>              }
>          }

-- 
- Lukas