[pbs-devel] [PATCH proxmox-backup v8 17/45] verify: implement chunk verification for stores with s3 backend
Lukas Wagner
l.wagner at proxmox.com
Fri Jul 18 10:56:25 CEST 2025
On 2025-07-15 14:53, Christian Ebner wrote:
> For datastores backed by an S3 compatible object store, rather than
> reading the chunks to be verified from the local filesystem, fetch
> them via the s3 client from the configured bucket.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
>
> src/backup/verify.rs | 89 ++++++++++++++++++++++++++++++++++++++------
> 1 file changed, 77 insertions(+), 12 deletions(-)
>
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index dea10f618..3a4a1d0d5 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex};
> use std::time::Instant;
>
> use anyhow::{bail, Error};
> +use http_body_util::BodyExt;
> use tracing::{error, info, warn};
>
> use proxmox_worker_task::WorkerTaskContext;
> @@ -89,6 +90,38 @@ impl VerifyWorker {
> }
> }
>
> + if let Ok(DatastoreBackend::S3(s3_client)) = datastore.backend() {
> + let suffix = format!(".{}.bad", counter);
> + let target_key =
> + match pbs_datastore::s3::object_key_from_digest_with_suffix(digest, &suffix) {
> + Ok(target_key) => target_key,
> + Err(err) => {
> + info!("could not generate target key for corrupted chunk {path:?} - {err}");
> + return;
> + }
> + };
> + let object_key = match pbs_datastore::s3::object_key_from_digest(digest) {
> + Ok(object_key) => object_key,
> + Err(err) => {
> + info!("could not generate object key for corrupted chunk {path:?} - {err}");
> + return;
> + }
> + };
> + if proxmox_async::runtime::block_on(
> + s3_client.copy_object(object_key.clone(), target_key),
> + )
> + .is_ok()
> + {
> + if proxmox_async::runtime::block_on(s3_client.delete_object(object_key)).is_err() {
> + info!("failed to delete corrupt chunk on s3 backend: {digest_str}");
> + }
> + } else {
> + info!("failed to copy corrupt chunk on s3 backend: {digest_str}");
> + }
> + } else {
> + info!("failed to get s3 backend while trying to rename bad chunk: {digest_str}");
> + }
> +
> match std::fs::rename(&path, &new_path) {
> Ok(_) => {
> info!("corrupted chunk renamed to {:?}", &new_path);
> @@ -189,18 +222,50 @@ impl VerifyWorker {
> continue; // already verified or marked corrupt
> }
>
> - match self.datastore.load_chunk(&info.digest) {
> - Err(err) => {
> - self.corrupt_chunks.lock().unwrap().insert(info.digest);
> - error!("can't verify chunk, load failed - {err}");
> - errors.fetch_add(1, Ordering::SeqCst);
> - Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
> - }
> - Ok(chunk) => {
> - let size = info.size();
> - read_bytes += chunk.raw_size();
> - decoder_pool.send((chunk, info.digest, size))?;
> - decoded_bytes += size;
> + match &self.backend {
The whole method becomes uncomfortably large. Maybe move the entire `match &self.backend` block into a new method?
> + DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
> + Err(err) => {
> + self.corrupt_chunks.lock().unwrap().insert(info.digest);
Maybe add a new method `self.add_corrupt_chunk`, since this lock-and-insert line is repeated in every error branch below:

    fn add_corrupt_chunk(&mut self, chunk: ...) {
        // Panic on poisoned mutex
        let mut chunks = self.corrupt_chunks.lock().unwrap();
        chunks.insert(chunk);
    }

or something along those lines.
> + error!("can't verify chunk, load failed - {err}");
> + errors.fetch_add(1, Ordering::SeqCst);
> + Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
> + }
> + Ok(chunk) => {
> + let size = info.size();
> + read_bytes += chunk.raw_size();
> + decoder_pool.send((chunk, info.digest, size))?;
> + decoded_bytes += size;
> + }
> + },
> + DatastoreBackend::S3(s3_client) => {
> + let object_key = pbs_datastore::s3::object_key_from_digest(&info.digest)?;
> + match proxmox_async::runtime::block_on(s3_client.get_object(object_key)) {
> + Ok(Some(response)) => {
> + let bytes =
> + proxmox_async::runtime::block_on(response.content.collect())?
> + .to_bytes();
> + let chunk = DataBlob::from_raw(bytes.to_vec())?;
> + let size = info.size();
> + read_bytes += chunk.raw_size();
> + decoder_pool.send((chunk, info.digest, size))?;
> + decoded_bytes += size;
> + }
> + Ok(None) => {
> + self.corrupt_chunks.lock().unwrap().insert(info.digest);
> + error!(
> + "can't verify missing chunk with digest {}",
> + hex::encode(info.digest)
> + );
> + errors.fetch_add(1, Ordering::SeqCst);
> + Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
> + }
> + Err(err) => {
> + self.corrupt_chunks.lock().unwrap().insert(info.digest);
> + error!("can't verify chunk, load failed - {err}");
> + errors.fetch_add(1, Ordering::SeqCst);
> + Self::rename_corrupted_chunk(self.datastore.clone(), &info.digest);
> + }
> + }
> }
> }
> }
--
- Lukas
More information about the pbs-devel
mailing list