[pbs-devel] [PATCH proxmox-backup v8 17/45] verify: implement chunk verification for stores with s3 backend

Christian Ebner c.ebner at proxmox.com
Fri Jul 18 13:45:30 CEST 2025


On 7/18/25 10:56 AM, Lukas Wagner wrote:
> On  2025-07-15 14:53, Christian Ebner wrote:
>> For datastores backed by an S3 compatible object store, rather than
>> reading the chunks to be verified from the local filesystem, fetch
>> them via the s3 client from the configured bucket.
>>
>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>> ---
>> changes since version 7:
>> - no changes
>>
>>   src/backup/verify.rs | 89 ++++++++++++++++++++++++++++++++++++++------
>>   1 file changed, 77 insertions(+), 12 deletions(-)
>>
>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
>> index dea10f618..3a4a1d0d5 100644
>> --- a/src/backup/verify.rs
>> +++ b/src/backup/verify.rs
>> @@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex};
>>   use std::time::Instant;
>>   
>>   use anyhow::{bail, Error};
>> +use http_body_util::BodyExt;
>>   use tracing::{error, info, warn};
>>   
>>   use proxmox_worker_task::WorkerTaskContext;
>> @@ -89,6 +90,38 @@ impl VerifyWorker {
>>               }
>>           }
>>   
>> +        if let Ok(DatastoreBackend::S3(s3_client)) = datastore.backend() {
>> +            let suffix = format!(".{}.bad", counter);
>> +            let target_key =
>> +                match pbs_datastore::s3::object_key_from_digest_with_suffix(digest, &suffix) {
>> +                    Ok(target_key) => target_key,
>> +                    Err(err) => {
>> +                        info!("could not generate target key for corrupted chunk {path:?} - {err}");
>> +                        return;
>> +                    }
>> +                };
>> +            let object_key = match pbs_datastore::s3::object_key_from_digest(digest) {
>> +                Ok(object_key) => object_key,
>> +                Err(err) => {
>> +                    info!("could not generate object key for corrupted chunk {path:?} - {err}");
>> +                    return;
>> +                }
>> +            };
>> +            if proxmox_async::runtime::block_on(
>> +                s3_client.copy_object(object_key.clone(), target_key),
>> +            )
>> +            .is_ok()
>> +            {
>> +                if proxmox_async::runtime::block_on(s3_client.delete_object(object_key)).is_err() {
>> +                    info!("failed to delete corrupt chunk on s3 backend: {digest_str}");
>> +                }
>> +            } else {
>> +                info!("failed to copy corrupt chunk on s3 backend: {digest_str}");
>> +            }
>> +        } else {
>> +            info!("failed to get s3 backend while trying to rename bad chunk: {digest_str}");
>> +        }
>> +
>>           match std::fs::rename(&path, &new_path) {
>>               Ok(_) => {
>>                   info!("corrupted chunk renamed to {:?}", &new_path);
>> @@ -189,18 +222,50 @@ impl VerifyWorker {
>>                   continue; // already verified or marked corrupt
>>               }
>>   
>> -            match self.datastore.load_chunk(&info.digest) {
>> -                Err(err) => {
>> -                    self.corrupt_chunks.lock().unwrap().insert(info.digest);
>> -                    error!("can't verify chunk, load failed - {err}");
>> -                    errors.fetch_add(1, Ordering::SeqCst);
>> -                    Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
>> -                }
>> -                Ok(chunk) => {
>> -                    let size = info.size();
>> -                    read_bytes += chunk.raw_size();
>> -                    decoder_pool.send((chunk, info.digest, size))?;
>> -                    decoded_bytes += size;
>> +            match &self.backend {
> 
> The whole method becomes uncomfortably large, maybe move the entire match &self.backend into a new method?

Okay, this took me a bit given all the required interdependencies here, but 
now this is all placed in a verify_chunk_by_backend()

> 
>> +                DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
>> +                    Err(err) => {
>> +                        self.corrupt_chunks.lock().unwrap().insert(info.digest);
> 
> Maybe add a new method self.add_corrupt_chunk
> 
> fn add_corrupt_chunk(&mut self, chunk: ...) {
>      // Panic on poisoned mutex
>      let mut chunks = self.corrupt_chunks.lock().unwrap();
> 
>      chunks.insert(chunk);
> }
> 
> or the like

and this was moved into a dedicated helper as suggested, but with the error 
counting and message included.

> 
>> +                        error!("can't verify chunk, load failed - {err}");
>> +                        errors.fetch_add(1, Ordering::SeqCst);
>> +                        Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
>> +                    }
>> +                    Ok(chunk) => {
>> +                        let size = info.size();
>> +                        read_bytes += chunk.raw_size();
>> +                        decoder_pool.send((chunk, info.digest, size))?;
>> +                        decoded_bytes += size;
>> +                    }
>> +                },
>> +                DatastoreBackend::S3(s3_client) => {
>> +                    let object_key = pbs_datastore::s3::object_key_from_digest(&info.digest)?;
>> +                    match proxmox_async::runtime::block_on(s3_client.get_object(object_key)) {
>> +                        Ok(Some(response)) => {
>> +                            let bytes =
>> +                                proxmox_async::runtime::block_on(response.content.collect())?
>> +                                    .to_bytes();
>> +                            let chunk = DataBlob::from_raw(bytes.to_vec())?;
>> +                            let size = info.size();
>> +                            read_bytes += chunk.raw_size();
>> +                            decoder_pool.send((chunk, info.digest, size))?;
>> +                            decoded_bytes += size;
>> +                        }
>> +                        Ok(None) => {
>> +                            self.corrupt_chunks.lock().unwrap().insert(info.digest);
>> +                            error!(
>> +                                "can't verify missing chunk with digest {}",
>> +                                hex::encode(info.digest)
>> +                            );
>> +                            errors.fetch_add(1, Ordering::SeqCst);
>> +                            Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
>> +                        }
>> +                        Err(err) => {
>> +                            self.corrupt_chunks.lock().unwrap().insert(info.digest);
>> +                            error!("can't verify chunk, load failed - {err}");
>> +                            errors.fetch_add(1, Ordering::SeqCst);
>> +                            Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
>> +                        }
>> +                    }
>>                   }
>>               }
>>           }
> 





More information about the pbs-devel mailing list