[pbs-devel] [PATCH proxmox-backup v10 17/46] verify: implement chunk verification for stores with s3 backend

Christian Ebner c.ebner at proxmox.com
Mon Jul 21 18:44:38 CEST 2025


For datastores backed by an S3 compatible object store, rather than
reading the chunks to be verified from the local filesystem, fetch
them via the s3 client from the configured bucket.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 9:
- fix chunk rename inconsistency if chunk could not be renamed on s3
  backend

 src/backup/verify.rs | 122 +++++++++++++++++++++++++++++++++++++------
 1 file changed, 107 insertions(+), 15 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index dea10f618..b1452f267 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex};
 use std::time::Instant;
 
 use anyhow::{bail, Error};
+use http_body_util::BodyExt;
 use tracing::{error, info, warn};
 
 use proxmox_worker_task::WorkerTaskContext;
@@ -15,7 +16,7 @@ use pbs_api_types::{
     UPID,
 };
 use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
-use pbs_datastore::index::IndexFile;
+use pbs_datastore::index::{ChunkReadInfo, IndexFile};
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
 
@@ -89,6 +90,42 @@ impl VerifyWorker {
             }
         }
 
+        if let Ok(DatastoreBackend::S3(s3_client)) = datastore.backend() {
+            let suffix = format!(".{}.bad", counter);
+            let target_key =
+                match pbs_datastore::s3::object_key_from_digest_with_suffix(digest, &suffix) {
+                    Ok(target_key) => target_key,
+                    Err(err) => {
+                        info!("could not generate target key for corrupted chunk {path:?} - {err}");
+                        return;
+                    }
+                };
+            let object_key = match pbs_datastore::s3::object_key_from_digest(digest) {
+                Ok(object_key) => object_key,
+                Err(err) => {
+                    info!("could not generate object key for corrupted chunk {path:?} - {err}");
+                    return;
+                }
+            };
+            if proxmox_async::runtime::block_on(
+                s3_client.copy_object(object_key.clone(), target_key),
+            )
+            .is_ok()
+            {
+                if proxmox_async::runtime::block_on(s3_client.delete_object(object_key)).is_err() {
+                    info!("failed to delete corrupt chunk on s3 backend: {digest_str}");
+                }
+            } else {
+                info!("failed to copy corrupt chunk on s3 backend: {digest_str}");
+                // Early return to leave the potentially locally cached chunk in the same state as
+                // on the object store. Verification might have failed because of connection issue
+                // after all.
+                return;
+            }
+        } else {
+            info!("failed to get s3 backend while trying to rename bad chunk: {digest_str}");
+        }
+
         match std::fs::rename(&path, &new_path) {
             Ok(_) => {
                 info!("corrupted chunk renamed to {:?}", &new_path);
@@ -189,20 +226,13 @@ impl VerifyWorker {
                 continue; // already verified or marked corrupt
             }
 
-            match self.datastore.load_chunk(&info.digest) {
-                Err(err) => {
-                    self.corrupt_chunks.lock().unwrap().insert(info.digest);
-                    error!("can't verify chunk, load failed - {err}");
-                    errors.fetch_add(1, Ordering::SeqCst);
-                    Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
-                }
-                Ok(chunk) => {
-                    let size = info.size();
-                    read_bytes += chunk.raw_size();
-                    decoder_pool.send((chunk, info.digest, size))?;
-                    decoded_bytes += size;
-                }
-            }
+            self.verify_chunk_by_backend(
+                &info,
+                &mut read_bytes,
+                &mut decoded_bytes,
+                Arc::clone(&errors),
+                &decoder_pool,
+            )?;
         }
 
         decoder_pool.complete()?;
@@ -228,6 +258,68 @@ impl VerifyWorker {
         Ok(())
     }
 
+    fn verify_chunk_by_backend(
+        &self,
+        info: &ChunkReadInfo,
+        read_bytes: &mut u64,
+        decoded_bytes: &mut u64,
+        errors: Arc<AtomicUsize>,
+        decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
+    ) -> Result<(), Error> {
+        match &self.backend {
+            DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
+                Err(err) => self.add_corrupt_chunk(
+                    info.digest,
+                    errors,
+                    &format!("can't verify chunk, load failed - {err}"),
+                ),
+                Ok(chunk) => {
+                    let size = info.size();
+                    *read_bytes += chunk.raw_size();
+                    decoder_pool.send((chunk, info.digest, size))?;
+                    *decoded_bytes += size;
+                }
+            },
+            DatastoreBackend::S3(s3_client) => {
+                let object_key = pbs_datastore::s3::object_key_from_digest(&info.digest)?;
+                match proxmox_async::runtime::block_on(s3_client.get_object(object_key)) {
+                    Ok(Some(response)) => {
+                        let bytes = proxmox_async::runtime::block_on(response.content.collect())?
+                            .to_bytes();
+                        let chunk = DataBlob::from_raw(bytes.to_vec())?;
+                        let size = info.size();
+                        *read_bytes += chunk.raw_size();
+                        decoder_pool.send((chunk, info.digest, size))?;
+                        *decoded_bytes += size;
+                    }
+                    Ok(None) => self.add_corrupt_chunk(
+                        info.digest,
+                        errors,
+                        &format!(
+                            "can't verify missing chunk with digest {}",
+                            hex::encode(info.digest)
+                        ),
+                    ),
+                    Err(err) => self.add_corrupt_chunk(
+                        info.digest,
+                        errors,
+                        &format!("can't verify chunk, load failed - {err}"),
+                    ),
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
+        // Panic on poisoned mutex
+        let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
+        corrupt_chunks.insert(digest);
+        error!(message);
+        errors.fetch_add(1, Ordering::SeqCst);
+        Self::rename_corrupted_chunk(self.datastore.clone(), &digest);
+    }
+
     fn verify_fixed_index(&self, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
         let mut path = backup_dir.relative_path();
         path.push(&info.filename);
-- 
2.47.2





More information about the pbs-devel mailing list