[pbs-devel] [PATCH proxmox-backup v9 17/46] verify: implement chunk verification for stores with s3 backend
Hannes Laimer
h.laimer at proxmox.com
Mon Jul 21 15:35:31 CEST 2025
On Sat Jul 19, 2025 at 2:50 PM CEST, Christian Ebner wrote:
> For datastores backed by an S3 compatible object store, rather than
> reading the chunks to be verified from the local filesystem, fetch
> them via the s3 client from the configured bucket.
>
Could we somehow take advantage of the hash S3 provides for objects[1]?
We can't use our hashes tough, so not if how we would...
[1] https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 8:
> - refactored corrupt chunk marking into more compact methods
>
> src/backup/verify.rs | 118 +++++++++++++++++++++++++++++++++++++------
> 1 file changed, 103 insertions(+), 15 deletions(-)
>
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index dea10f618..9a2148b56 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex};
> use std::time::Instant;
>
> use anyhow::{bail, Error};
> +use http_body_util::BodyExt;
> use tracing::{error, info, warn};
>
> use proxmox_worker_task::WorkerTaskContext;
> @@ -15,7 +16,7 @@ use pbs_api_types::{
> UPID,
> };
> use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
> -use pbs_datastore::index::IndexFile;
> +use pbs_datastore::index::{ChunkReadInfo, IndexFile};
> use pbs_datastore::manifest::{BackupManifest, FileInfo};
> use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
>
> @@ -89,6 +90,38 @@ impl VerifyWorker {
> }
> }
>
> + if let Ok(DatastoreBackend::S3(s3_client)) = datastore.backend() {
> + let suffix = format!(".{}.bad", counter);
> + let target_key =
> + match pbs_datastore::s3::object_key_from_digest_with_suffix(digest, &suffix) {
> + Ok(target_key) => target_key,
> + Err(err) => {
> + info!("could not generate target key for corrupted chunk {path:?} - {err}");
> + return;
> + }
> + };
> + let object_key = match pbs_datastore::s3::object_key_from_digest(digest) {
> + Ok(object_key) => object_key,
> + Err(err) => {
> + info!("could not generate object key for corrupted chunk {path:?} - {err}");
> + return;
> + }
> + };
> + if proxmox_async::runtime::block_on(
> + s3_client.copy_object(object_key.clone(), target_key),
> + )
> + .is_ok()
> + {
> + if proxmox_async::runtime::block_on(s3_client.delete_object(object_key)).is_err() {
> + info!("failed to delete corrupt chunk on s3 backend: {digest_str}");
> + }
> + } else {
> + info!("failed to copy corrupt chunk on s3 backend: {digest_str}");
> + }
> + } else {
> + info!("failed to get s3 backend while trying to rename bad chunk: {digest_str}");
> + }
> +
> match std::fs::rename(&path, &new_path) {
> Ok(_) => {
> info!("corrupted chunk renamed to {:?}", &new_path);
> @@ -189,20 +222,13 @@ impl VerifyWorker {
> continue; // already verified or marked corrupt
> }
>
> - match self.datastore.load_chunk(&info.digest) {
> - Err(err) => {
> - self.corrupt_chunks.lock().unwrap().insert(info.digest);
> - error!("can't verify chunk, load failed - {err}");
> - errors.fetch_add(1, Ordering::SeqCst);
> - Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
> - }
> - Ok(chunk) => {
> - let size = info.size();
> - read_bytes += chunk.raw_size();
> - decoder_pool.send((chunk, info.digest, size))?;
> - decoded_bytes += size;
> - }
> - }
> + self.verify_chunk_by_backend(
> + &info,
> + &mut read_bytes,
> + &mut decoded_bytes,
> + Arc::clone(&errors),
> + &decoder_pool,
> + )?;
> }
>
> decoder_pool.complete()?;
> @@ -228,6 +254,68 @@ impl VerifyWorker {
> Ok(())
> }
>
> + fn verify_chunk_by_backend(
> + &self,
> + info: &ChunkReadInfo,
> + read_bytes: &mut u64,
> + decoded_bytes: &mut u64,
> + errors: Arc<AtomicUsize>,
> + decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
> + ) -> Result<(), Error> {
> + match &self.backend {
> + DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
> + Err(err) => self.add_corrupt_chunk(
> + info.digest,
> + errors,
> + &format!("can't verify chunk, load failed - {err}"),
> + ),
> + Ok(chunk) => {
> + let size = info.size();
> + *read_bytes += chunk.raw_size();
> + decoder_pool.send((chunk, info.digest, size))?;
> + *decoded_bytes += size;
> + }
> + },
> + DatastoreBackend::S3(s3_client) => {
> + let object_key = pbs_datastore::s3::object_key_from_digest(&info.digest)?;
> + match proxmox_async::runtime::block_on(s3_client.get_object(object_key)) {
> + Ok(Some(response)) => {
> + let bytes = proxmox_async::runtime::block_on(response.content.collect())?
> + .to_bytes();
> + let chunk = DataBlob::from_raw(bytes.to_vec())?;
> + let size = info.size();
> + *read_bytes += chunk.raw_size();
> + decoder_pool.send((chunk, info.digest, size))?;
> + *decoded_bytes += size;
> + }
> + Ok(None) => self.add_corrupt_chunk(
> + info.digest,
> + errors,
> + &format!(
> + "can't verify missing chunk with digest {}",
> + hex::encode(info.digest)
> + ),
> + ),
> + Err(err) => self.add_corrupt_chunk(
> + info.digest,
> + errors,
> + &format!("can't verify chunk, load failed - {err}"),
> + ),
> + }
> + }
> + }
> + Ok(())
> + }
> +
> + fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
> + // Panic on poisoned mutex
> + let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
> + corrupt_chunks.insert(digest);
> + error!(message);
> + errors.fetch_add(1, Ordering::SeqCst);
> + Self::rename_corrupted_chunk(self.datastore.clone(), &digest);
> + }
> +
> fn verify_fixed_index(&self, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
> let mut path = backup_dir.relative_path();
> path.push(&info.filename);
More information about the pbs-devel
mailing list