[pbs-devel] [PATCH proxmox-backup v9 17/46] verify: implement chunk verification for stores with s3 backend
Christian Ebner
c.ebner at proxmox.com
Sat Jul 19 14:50:06 CEST 2025
For datastores backed by an S3-compatible object store, fetch the
chunks to be verified via the S3 client from the configured bucket,
rather than reading them from the local filesystem.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 8:
- refactored corrupt chunk marking into more compact methods (see the sketches below)
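
In short, the new verification path for chunks on S3 backed stores boils
down to the following happy path. This is a condensed sketch, not the
literal patch (error handling trimmed, names as in the hunks below):

    // Fetch the chunk object via the S3 client, re-parse it as a DataBlob
    // and hand it to the existing decoder pool, which verifies CRC and
    // digest exactly as for chunks read from the local filesystem.
    let object_key = pbs_datastore::s3::object_key_from_digest(&info.digest)?;
    match proxmox_async::runtime::block_on(s3_client.get_object(object_key))? {
        Some(response) => {
            let bytes =
                proxmox_async::runtime::block_on(response.content.collect())?.to_bytes();
            let chunk = DataBlob::from_raw(bytes.to_vec())?;
            decoder_pool.send((chunk, info.digest, info.size()))?;
        }
        None => bail!("chunk {} missing on s3 backend", hex::encode(info.digest)),
    }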
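
Marking a corrupt chunk needs one extra step on S3: the object store has
no rename, so the bad chunk is moved aside by a server-side copy to a
".<counter>.bad" key, followed by deletion of the original. A minimal
standalone sketch of that flow, assuming the client methods used in the
first hunk (the helper and its signature are hypothetical, for
illustration only):

    // Hypothetical helper mirroring the first hunk: mark a corrupt chunk
    // on the object store by copying it to a ".<counter>.bad" key and
    // deleting the original, since S3 offers no rename primitive.
    fn rename_corrupt_chunk_s3(
        s3_client: &S3Client, // client type behind DatastoreBackend::S3
        digest: &[u8; 32],
        counter: u32, // ".N.bad" suffix counter, as picked by the caller
    ) -> Result<(), Error> {
        let suffix = format!(".{counter}.bad");
        let bad_key = pbs_datastore::s3::object_key_from_digest_with_suffix(digest, &suffix)?;
        let object_key = pbs_datastore::s3::object_key_from_digest(digest)?;
        proxmox_async::runtime::block_on(s3_client.copy_object(object_key.clone(), bad_key))?;
        proxmox_async::runtime::block_on(s3_client.delete_object(object_key))?;
        Ok(())
    }

Copy-then-delete is not atomic, so a failure in between can leave both
the original and the ".bad" object around; the patch therefore only logs
such failures instead of aborting the verification task.
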
src/backup/verify.rs | 126 ++++++++++++++++++++++++++++++++++++++------
1 file changed, 111 insertions(+), 15 deletions(-)
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index dea10f618..9a2148b56 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex};
use std::time::Instant;
use anyhow::{bail, Error};
+use http_body_util::BodyExt;
use tracing::{error, info, warn};
use proxmox_worker_task::WorkerTaskContext;
@@ -15,7 +16,7 @@ use pbs_api_types::{
    UPID,
};
use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
-use pbs_datastore::index::IndexFile;
+use pbs_datastore::index::{ChunkReadInfo, IndexFile};
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
@@ -89,6 +90,46 @@ impl VerifyWorker {
            }
        }

+        match datastore.backend() {
+            Ok(DatastoreBackend::S3(s3_client)) => {
+                let suffix = format!(".{}.bad", counter);
+                let target_key = match pbs_datastore::s3::object_key_from_digest_with_suffix(
+                    digest, &suffix,
+                ) {
+                    Ok(target_key) => target_key,
+                    Err(err) => {
+                        info!("could not generate target key for corrupted chunk {path:?} - {err}");
+                        return;
+                    }
+                };
+                let object_key = match pbs_datastore::s3::object_key_from_digest(digest) {
+                    Ok(object_key) => object_key,
+                    Err(err) => {
+                        info!("could not generate object key for corrupted chunk {path:?} - {err}");
+                        return;
+                    }
+                };
+                // S3 has no rename operation, emulate it by a server side
+                // copy to the ".bad" key, followed by deleting the original.
+                if proxmox_async::runtime::block_on(
+                    s3_client.copy_object(object_key.clone(), target_key),
+                )
+                .is_ok()
+                {
+                    if proxmox_async::runtime::block_on(s3_client.delete_object(object_key))
+                        .is_err()
+                    {
+                        info!("failed to delete corrupt chunk on s3 backend: {digest_str}");
+                    }
+                } else {
+                    info!("failed to copy corrupt chunk on s3 backend: {digest_str}");
+                }
+            }
+            // the local rename below fully covers filesystem backends
+            Ok(DatastoreBackend::Filesystem) => (),
+            Err(err) => info!("failed to get backend for bad chunk rename: {digest_str} - {err}"),
+        }
+
        match std::fs::rename(&path, &new_path) {
            Ok(_) => {
                info!("corrupted chunk renamed to {:?}", &new_path);
@@ -189,20 +222,13 @@ impl VerifyWorker {
                continue; // already verified or marked corrupt
            }

-            match self.datastore.load_chunk(&info.digest) {
-                Err(err) => {
-                    self.corrupt_chunks.lock().unwrap().insert(info.digest);
-                    error!("can't verify chunk, load failed - {err}");
-                    errors.fetch_add(1, Ordering::SeqCst);
-                    Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
-                }
-                Ok(chunk) => {
-                    let size = info.size();
-                    read_bytes += chunk.raw_size();
-                    decoder_pool.send((chunk, info.digest, size))?;
-                    decoded_bytes += size;
-                }
-            }
+            self.verify_chunk_by_backend(
+                &info,
+                &mut read_bytes,
+                &mut decoded_bytes,
+                Arc::clone(&errors),
+                &decoder_pool,
+            )?;
        }

        decoder_pool.complete()?;
@@ -228,6 +254,68 @@ impl VerifyWorker {
        Ok(())
    }

+    fn verify_chunk_by_backend(
+        &self,
+        info: &ChunkReadInfo,
+        read_bytes: &mut u64,
+        decoded_bytes: &mut u64,
+        errors: Arc<AtomicUsize>,
+        decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
+    ) -> Result<(), Error> {
+        match &self.backend {
+            DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
+                Err(err) => self.add_corrupt_chunk(
+                    info.digest,
+                    errors,
+                    &format!("can't verify chunk, load failed - {err}"),
+                ),
+                Ok(chunk) => {
+                    let size = info.size();
+                    *read_bytes += chunk.raw_size();
+                    decoder_pool.send((chunk, info.digest, size))?;
+                    *decoded_bytes += size;
+                }
+            },
+            DatastoreBackend::S3(s3_client) => {
+                let object_key = pbs_datastore::s3::object_key_from_digest(&info.digest)?;
+                match proxmox_async::runtime::block_on(s3_client.get_object(object_key)) {
+                    Ok(Some(response)) => {
+                        let bytes = proxmox_async::runtime::block_on(response.content.collect())?
+                            .to_bytes();
+                        let chunk = DataBlob::from_raw(bytes.to_vec())?;
+                        let size = info.size();
+                        *read_bytes += chunk.raw_size();
+                        decoder_pool.send((chunk, info.digest, size))?;
+                        *decoded_bytes += size;
+                    }
+                    Ok(None) => self.add_corrupt_chunk(
+                        info.digest,
+                        errors,
+                        &format!(
+                            "can't verify missing chunk with digest {}",
+                            hex::encode(info.digest)
+                        ),
+                    ),
+                    Err(err) => self.add_corrupt_chunk(
+                        info.digest,
+                        errors,
+                        &format!("can't verify chunk, load failed - {err}"),
+                    ),
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
+        // Panic on poisoned mutex
+        let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
+        corrupt_chunks.insert(digest);
+        error!(message);
+        errors.fetch_add(1, Ordering::SeqCst);
+        Self::rename_corrupted_chunk(self.datastore.clone(), &digest);
+    }
+
    fn verify_fixed_index(&self, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
        let mut path = backup_dir.relative_path();
        path.push(&info.filename);
--
2.47.2