[pbs-devel] [PATCH proxmox-backup v11 17/46] verify: implement chunk verification for stores with s3 backend
Christian Ebner
c.ebner at proxmox.com
Tue Jul 22 12:10:37 CEST 2025
For datastores backed by an S3 compatible object store, rather than
reading the chunks to be verified from the local filesystem, fetch
them via the s3 client from the configured bucket.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
Reviewed-by: Lukas Wagner <l.wagner at proxmox.com>
Reviewed-by: Hannes Laimer <h.laimer at proxmox.com>
---
changes since version 10:
- no changes
src/backup/verify.rs | 122 +++++++++++++++++++++++++++++++++++++------
1 file changed, 107 insertions(+), 15 deletions(-)
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index dea10f618..b1452f267 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex};
use std::time::Instant;
use anyhow::{bail, Error};
+use http_body_util::BodyExt;
use tracing::{error, info, warn};
use proxmox_worker_task::WorkerTaskContext;
@@ -15,7 +16,7 @@ use pbs_api_types::{
UPID,
};
use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
-use pbs_datastore::index::IndexFile;
+use pbs_datastore::index::{ChunkReadInfo, IndexFile};
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
@@ -89,6 +90,42 @@ impl VerifyWorker {
}
}
+ if let Ok(DatastoreBackend::S3(s3_client)) = datastore.backend() {
+ let suffix = format!(".{}.bad", counter);
+ let target_key =
+ match pbs_datastore::s3::object_key_from_digest_with_suffix(digest, &suffix) {
+ Ok(target_key) => target_key,
+ Err(err) => {
+ info!("could not generate target key for corrupted chunk {path:?} - {err}");
+ return;
+ }
+ };
+ let object_key = match pbs_datastore::s3::object_key_from_digest(digest) {
+ Ok(object_key) => object_key,
+ Err(err) => {
+ info!("could not generate object key for corrupted chunk {path:?} - {err}");
+ return;
+ }
+ };
+ if proxmox_async::runtime::block_on(
+ s3_client.copy_object(object_key.clone(), target_key),
+ )
+ .is_ok()
+ {
+ if proxmox_async::runtime::block_on(s3_client.delete_object(object_key)).is_err() {
+ info!("failed to delete corrupt chunk on s3 backend: {digest_str}");
+ }
+ } else {
+ info!("failed to copy corrupt chunk on s3 backend: {digest_str}");
+ // Early return to leave the potentially locally cached chunk in the same state as
+ // on the object store. Verification might have failed because of connection issue
+ // after all.
+ return;
+ }
+ } else {
+ info!("failed to get s3 backend while trying to rename bad chunk: {digest_str}");
+ }
+
match std::fs::rename(&path, &new_path) {
Ok(_) => {
info!("corrupted chunk renamed to {:?}", &new_path);
@@ -189,20 +226,13 @@ impl VerifyWorker {
continue; // already verified or marked corrupt
}
- match self.datastore.load_chunk(&info.digest) {
- Err(err) => {
- self.corrupt_chunks.lock().unwrap().insert(info.digest);
- error!("can't verify chunk, load failed - {err}");
- errors.fetch_add(1, Ordering::SeqCst);
- Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
- }
- Ok(chunk) => {
- let size = info.size();
- read_bytes += chunk.raw_size();
- decoder_pool.send((chunk, info.digest, size))?;
- decoded_bytes += size;
- }
- }
+ self.verify_chunk_by_backend(
+ &info,
+ &mut read_bytes,
+ &mut decoded_bytes,
+ Arc::clone(&errors),
+ &decoder_pool,
+ )?;
}
decoder_pool.complete()?;
@@ -228,6 +258,68 @@ impl VerifyWorker {
Ok(())
}
+ fn verify_chunk_by_backend(
+ &self,
+ info: &ChunkReadInfo,
+ read_bytes: &mut u64,
+ decoded_bytes: &mut u64,
+ errors: Arc<AtomicUsize>,
+ decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
+ ) -> Result<(), Error> {
+ match &self.backend {
+ DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
+ Err(err) => self.add_corrupt_chunk(
+ info.digest,
+ errors,
+ &format!("can't verify chunk, load failed - {err}"),
+ ),
+ Ok(chunk) => {
+ let size = info.size();
+ *read_bytes += chunk.raw_size();
+ decoder_pool.send((chunk, info.digest, size))?;
+ *decoded_bytes += size;
+ }
+ },
+ DatastoreBackend::S3(s3_client) => {
+ let object_key = pbs_datastore::s3::object_key_from_digest(&info.digest)?;
+ match proxmox_async::runtime::block_on(s3_client.get_object(object_key)) {
+ Ok(Some(response)) => {
+ let bytes = proxmox_async::runtime::block_on(response.content.collect())?
+ .to_bytes();
+ let chunk = DataBlob::from_raw(bytes.to_vec())?;
+ let size = info.size();
+ *read_bytes += chunk.raw_size();
+ decoder_pool.send((chunk, info.digest, size))?;
+ *decoded_bytes += size;
+ }
+ Ok(None) => self.add_corrupt_chunk(
+ info.digest,
+ errors,
+ &format!(
+ "can't verify missing chunk with digest {}",
+ hex::encode(info.digest)
+ ),
+ ),
+ Err(err) => self.add_corrupt_chunk(
+ info.digest,
+ errors,
+ &format!("can't verify chunk, load failed - {err}"),
+ ),
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
+ // Panic on poisoned mutex
+ let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
+ corrupt_chunks.insert(digest);
+ error!(message);
+ errors.fetch_add(1, Ordering::SeqCst);
+ Self::rename_corrupted_chunk(self.datastore.clone(), &digest);
+ }
+
fn verify_fixed_index(&self, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
path.push(&info.filename);
--
2.47.2
More information about the pbs-devel
mailing list