[pbs-devel] [PATCH proxmox-backup v5 25/46] verify: implement chunk verification for stores with s3 backend

Christian Ebner c.ebner at proxmox.com
Thu Jul 3 15:18:16 CEST 2025


For datastores backed by an S3-compatible object store, rather than
reading the chunks to be verified from the local filesystem, fetch
them via the s3 client from the configured bucket.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 src/backup/verify.rs | 75 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 63 insertions(+), 12 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index dea10f618..ac3bee765 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -1,10 +1,12 @@
 use pbs_config::BackupLockGuard;
+use pbs_s3_client::RelS3ObjectKey;
 use std::collections::HashSet;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::Instant;
 
 use anyhow::{bail, Error};
+use http_body_util::BodyExt;
 use tracing::{error, info, warn};
 
 use proxmox_worker_task::WorkerTaskContext;
@@ -89,6 +91,23 @@ impl VerifyWorker {
             }
         }
 
+        if let Ok(DatastoreBackend::S3(s3_client)) = datastore.backend() {
+            let suffix = format!(".{}.bad", counter);
+            let target_key = RelS3ObjectKey::from_digest_with_suffix(digest, &suffix);
+            if proxmox_async::runtime::block_on(s3_client.copy_object(digest.into(), target_key))
+                .is_ok()
+            {
+                if proxmox_async::runtime::block_on(s3_client.delete_object(digest.into())).is_err()
+                {
+                    info!("failed to delete corrupt chunk on s3 backend: {digest_str}");
+                }
+            } else {
+                info!("failed to copy corrupt chunk on s3 backend: {digest_str}");
+            }
+        } else {
+            info!("failed to get s3 backend while trying to rename bad chunk: {digest_str}");
+        }
+
         match std::fs::rename(&path, &new_path) {
             Ok(_) => {
                 info!("corrupted chunk renamed to {:?}", &new_path);
@@ -189,18 +208,50 @@ impl VerifyWorker {
                 continue; // already verified or marked corrupt
             }
 
-            match self.datastore.load_chunk(&info.digest) {
-                Err(err) => {
-                    self.corrupt_chunks.lock().unwrap().insert(info.digest);
-                    error!("can't verify chunk, load failed - {err}");
-                    errors.fetch_add(1, Ordering::SeqCst);
-                    Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
-                }
-                Ok(chunk) => {
-                    let size = info.size();
-                    read_bytes += chunk.raw_size();
-                    decoder_pool.send((chunk, info.digest, size))?;
-                    decoded_bytes += size;
+            match &self.backend {
+                DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
+                    Err(err) => {
+                        self.corrupt_chunks.lock().unwrap().insert(info.digest);
+                        error!("can't verify chunk, load failed - {err}");
+                        errors.fetch_add(1, Ordering::SeqCst);
+                        Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
+                    }
+                    Ok(chunk) => {
+                        let size = info.size();
+                        read_bytes += chunk.raw_size();
+                        decoder_pool.send((chunk, info.digest, size))?;
+                        decoded_bytes += size;
+                    }
+                },
+                DatastoreBackend::S3(s3_client) => {
+                    match proxmox_async::runtime::block_on(s3_client.get_object(info.digest.into()))
+                    {
+                        Ok(Some(response)) => {
+                            let bytes =
+                                proxmox_async::runtime::block_on(response.content.collect())?
+                                    .to_bytes();
+                            let chunk = DataBlob::from_raw(bytes.to_vec())?;
+                            let size = info.size();
+                            read_bytes += chunk.raw_size();
+                            decoder_pool.send((chunk, info.digest, size))?;
+                            decoded_bytes += size;
+                        }
+                        Ok(None) => {
+                            self.corrupt_chunks.lock().unwrap().insert(info.digest);
+                            error!(
+                                "can't verify missing chunk with digest {}",
+                                hex::encode(info.digest)
+                            );
+                            errors.fetch_add(1, Ordering::SeqCst);
+                            Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
+                        }
+                        Err(err) => {
+                            self.corrupt_chunks.lock().unwrap().insert(info.digest);
+                            error!("can't verify chunk, load failed - {err}");
+                            errors.fetch_add(1, Ordering::SeqCst);
+                            Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
+                        }
+                    }
                 }
             }
         }
-- 
2.47.2





More information about the pbs-devel mailing list