[pbs-devel] [RFC PATCH proxmox-backup 3/3] verify: use separate read pool for reading chunks
Dominik Csapak
d.csapak at proxmox.com
Mon Jul 7 15:27:06 CEST 2025
instead of having each 'worker thread' read and then verify its chunk,
use a separate 'reader pool' that reads chunks in parallel but
independently of verifying.
While this does introduce 4 new threads, they should be mostly busy with
reading from disk and not doing anything cpu intensive.
The advantage vs the current system is that the threads can start to
read the next chunks while the previous ones are still being verified.
Due to the nature of the ParallelHandler, the channel is bounded to the
number of threads, so there won't be more than 4 chunks read in advance.
In my local tests I measured the following speed difference:
verified a single snapshot with ~64 GiB (4x the RAM size) with 12 cores
current: ~550MiB/s
previous patch (moving loading into threads): ~950MiB/s
this patch: ~1150MiB/s
Obviously it increased the IO and CPU load in line with the throughput.
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
src/backup/verify.rs | 49 +++++++++++++++++++++++++++-----------------
1 file changed, 30 insertions(+), 19 deletions(-)
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 83dd0d9a3..b139819a6 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -17,7 +17,7 @@ use pbs_api_types::{
use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{BackupManifest, FileInfo};
-use pbs_datastore::{DataStore, StoreProgress};
+use pbs_datastore::{DataBlob, DataStore, StoreProgress};
use crate::tools::parallel_handler::ParallelHandler;
@@ -114,24 +114,8 @@ fn verify_index_chunks(
let corrupt_chunks = Arc::clone(&verify_worker.corrupt_chunks);
let verified_chunks = Arc::clone(&verify_worker.verified_chunks);
let errors = Arc::clone(&errors);
- let read_bytes = Arc::clone(&read_bytes);
- let decoded_bytes = Arc::clone(&decoded_bytes);
- move |(digest, size): ([u8; 32], u64)| {
- let chunk = match datastore.load_chunk(&digest) {
- Err(err) => {
- corrupt_chunks.lock().unwrap().insert(digest);
- error!("can't verify chunk, load failed - {err}");
- errors.fetch_add(1, Ordering::SeqCst);
- rename_corrupted_chunk(datastore.clone(), &digest);
- return Ok(());
- }
- Ok(chunk) => {
- read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
- decoded_bytes.fetch_add(size, Ordering::SeqCst);
- chunk
- }
- };
+ move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
let chunk_crypt_mode = match chunk.crypt_mode() {
Err(err) => {
corrupt_chunks.lock().unwrap().insert(digest);
@@ -162,6 +146,32 @@ fn verify_index_chunks(
}
});
+ let reader_pool = ParallelHandler::new("read chunks", 4, {
+ let datastore = Arc::clone(&verify_worker.datastore);
+ let corrupt_chunks = Arc::clone(&verify_worker.corrupt_chunks);
+ let errors = Arc::clone(&errors);
+ let read_bytes = Arc::clone(&read_bytes);
+ let decoded_bytes = Arc::clone(&decoded_bytes);
+ let decoder_pool = decoder_pool.channel();
+
+ move |(digest, size): ([u8; 32], u64)| {
+ match datastore.load_chunk(&digest) {
+ Err(err) => {
+ corrupt_chunks.lock().unwrap().insert(digest);
+ error!("can't verify chunk, load failed - {err}");
+ errors.fetch_add(1, Ordering::SeqCst);
+ rename_corrupted_chunk(datastore.clone(), &digest);
+ }
+ Ok(chunk) => {
+ read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+ decoded_bytes.fetch_add(size, Ordering::SeqCst);
+ decoder_pool.send((chunk, digest, size))?;
+ }
+ }
+ Ok(())
+ }
+ });
+
let skip_chunk = |digest: &[u8; 32]| -> bool {
if verify_worker
.verified_chunks
@@ -209,9 +219,10 @@ fn verify_index_chunks(
continue; // already verified or marked corrupt
}
- decoder_pool.send((info.digest, info.size()))?;
+ reader_pool.send((info.digest, info.size()))?;
}
+ reader_pool.complete()?;
decoder_pool.complete()?;
let elapsed = start_time.elapsed().as_secs_f64();
--
2.39.5
More information about the pbs-devel
mailing list