[PATCH PBS restore 1/1] Make chunk retrieval concurrent when restoring a backup, add more statistics on chunk fetching and writing to storage. Allow configuring the number of concurrent chunk fetches via the PBS_RESTORE_CONCURRENCY environment variable.
Adam Kalisz
adam.kalisz at notnullmakers.com
Fri Jun 27 21:24:26 CEST 2025
Co-authored-by: Daniel Škarda <daniel.skarda at notnullmakers.com>
Tested-by: Matt Neuforth <mneuforth at milbankworks.com>
Financed-by: Václav Svátek <svatek at cmis.cz>
---
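[Note for reviewers, not part of the commit message]

The patch replaces the sequential fetch-then-write loop with a bounded
set of concurrent chunk fetches drained through
futures::stream::buffer_unordered. Below is a minimal, self-contained
sketch of that pattern; restore_all, fetch_chunk, CHUNK_COUNT and
CHUNK_SIZE are hypothetical stand-ins for the real
RemoteChunkReader::read_chunk() call and the index geometry, and the
sketch assumes the tokio, futures and anyhow crates (with tokio's
multi-thread runtime and macros features enabled):

    use anyhow::Error;
    use futures::stream::StreamExt;

    const CHUNK_COUNT: usize = 1024;           // stand-in for index.index_count()
    const CHUNK_SIZE: usize = 4 * 1024 * 1024; // stand-in for index.chunk_size

    // Stand-in for the blocking RemoteChunkReader::read_chunk() call.
    fn fetch_chunk(_pos: usize) -> Result<Vec<u8>, Error> {
        Ok(vec![0u8; CHUNK_SIZE])
    }

    async fn restore_all(concurrency: usize) -> Result<(), Error> {
        // One lazy future per chunk; each wraps the blocking fetch in
        // spawn_blocking so it runs on the runtime's blocking thread pool.
        let futures = (0..CHUNK_COUNT).map(|pos| async move {
            let data = tokio::task::spawn_blocking(move || fetch_chunk(pos))
                .await
                .map_err(Error::from)??;
            Ok::<_, Error>((pos * CHUNK_SIZE, data))
        });

        // At most `concurrency` fetches are in flight; results are
        // yielded as they complete, in arbitrary order.
        let mut stream = futures::stream::iter(futures).buffer_unordered(concurrency);
        while let Some(result) = stream.next().await {
            let (offset, data) = result?;
            // The real restore hands (offset, data) to write_data_callback.
            let _ = (offset, data.len());
        }
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<(), Error> {
        restore_all(4).await
    }

Out-of-order completion is safe here because every write carries its own
offset; only the progress percentage had to be reworked to be
position-independent (it is now derived from bytes written).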
src/restore.rs | 101 ++++++++++++++++++++++++++++++++++++++++---------
1 file changed, 84 insertions(+), 17 deletions(-)
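Usage note: the concurrency level defaults to 4 in-flight fetches and can
be overridden by setting PBS_RESTORE_CONCURRENCY in the environment of
the restoring process, e.g. PBS_RESTORE_CONCURRENCY=16. Fetches beyond
the blocking pool size (raised from 2 to 32 below) merely queue on the
pool, so the value should stay below max_blocking_threads.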
diff --git a/src/restore.rs b/src/restore.rs
index 5a5a398..c75bf0d 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,7 +1,11 @@
use std::convert::TryInto;
-use std::sync::{Arc, Mutex};
+use std::sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc, Mutex,
+};
use anyhow::{bail, format_err, Error};
+use futures::stream::StreamExt;
use once_cell::sync::OnceCell;
use tokio::runtime::Runtime;
@@ -69,7 +73,7 @@ impl RestoreTask {
let runtime = get_runtime_with_builder(|| {
let mut builder = tokio::runtime::Builder::new_multi_thread();
builder.enable_all();
- builder.max_blocking_threads(2);
+ builder.max_blocking_threads(32);
builder.worker_threads(4);
builder.thread_name("proxmox-restore-worker");
builder
@@ -149,9 +153,7 @@ impl RestoreTask {
)?;
let most_used = index.find_most_used_chunks(8);
-
let file_info = manifest.lookup_file_info(&archive_name)?;
-
let chunk_reader = RemoteChunkReader::new(
Arc::clone(&client),
self.crypt_config.clone(),
@@ -162,13 +164,43 @@ impl RestoreTask {
let mut per = 0;
let mut bytes = 0;
let mut zeroes = 0;
-
+ let mut storage_nonzero_write_time = std::time::Duration::new(0, 0);
+ let mut storage_nonzero_writes: u64 = 0;
+ let mut chunk_fetch_time = std::time::Duration::new(0, 0);
+ let chunks_fetched = Arc::new(AtomicU64::new(0));
let start_time = std::time::Instant::now();
+ // Should stay below max_blocking_threads set on the tokio runtime above
+ let mut concurrent_requests: usize = 4;
+
+ if let Ok(val_str) = std::env::var("PBS_RESTORE_CONCURRENCY") {
+ match val_str.parse::<usize>() {
+ Ok(num_threads) if num_threads > 0 => {
+ if verbose {
+ eprintln!(
+ "Using custom concurrency level from environment ({} threads)",
+ num_threads
+ );
+ }
+ concurrent_requests = num_threads;
+ }
+ _ => {
+ if verbose {
+ eprintln!(
+ "Using default concurrency level ({} threads)",
+ concurrent_requests
+ );
+ }
+ }
+ }
+ }
+
+ let mut chunk_futures = Vec::new();
for pos in 0..index.index_count() {
- let digest = index.index_digest(pos).unwrap();
+ let digest = *index.index_digest(pos).unwrap();
let offset = (pos * index.chunk_size) as u64;
- if digest == &zero_chunk_digest {
+
+ if digest == zero_chunk_digest {
let res = write_zero_callback(offset, index.chunk_size as u64);
if res < 0 {
bail!("write_zero_callback failed ({})", res);
@@ -176,22 +208,54 @@ impl RestoreTask {
bytes += index.chunk_size;
zeroes += index.chunk_size;
} else {
- let raw_data = ReadChunk::read_chunk(&chunk_reader, digest)?;
- let res = write_data_callback(offset, &raw_data);
- if res < 0 {
- bail!("write_data_callback failed ({})", res);
- }
- bytes += raw_data.len();
+ let chunk_reader = chunk_reader.clone();
+ let chunks_fetched_clone = Arc::clone(&chunks_fetched);
+
+ let future = async move {
+ tokio::task::spawn_blocking(move || {
+ let start_chunk_fetch_time = std::time::Instant::now();
+ let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+ let fetch_elapsed = start_chunk_fetch_time.elapsed();
+ chunks_fetched_clone.fetch_add(1, Ordering::Relaxed);
+ Ok::<_, Error>((offset, raw_data, fetch_elapsed))
+ })
+ .await
+ .map_err(|err| format_err!("chunk fetch task failed: {}", err))?
+ };
+ chunk_futures.push(future);
+ }
+ }
+
+ let mut stream = futures::stream::iter(chunk_futures).buffer_unordered(concurrent_requests);
+
+ while let Some(result) = stream.next().await {
+ let (offset, raw_data, fetch_elapsed_time) = result?;
+ let start_storage_write_time = std::time::Instant::now();
+ let res = write_data_callback(offset, &raw_data);
+ let storage_write_elapsed = start_storage_write_time.elapsed();
+ if res < 0 {
+ bail!("write_data_callback failed ({})", res);
}
+ storage_nonzero_write_time =
+ storage_nonzero_write_time.saturating_add(storage_write_elapsed);
+ storage_nonzero_writes += 1;
+ chunk_fetch_time += fetch_elapsed_time;
+ let chunk_len = raw_data.len();
+ bytes += chunk_len;
+
if verbose {
- let next_per = ((pos + 1) * 100) / index.index_count();
+ let next_per = (bytes * 100) / (index.index_count() * index.chunk_size);
if per != next_per {
eprintln!(
- "progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+ "progress {}% (read {} bytes, zeroes = {}% ({} bytes), \
+ nonzero writes = {}, chunks fetched = {}, duration {} sec)",
next_per,
bytes,
zeroes * 100 / bytes,
zeroes,
+ storage_nonzero_writes,
+ chunks_fetched.load(Ordering::Relaxed),
start_time.elapsed().as_secs()
);
per = next_per;
@@ -202,12 +266,15 @@ impl RestoreTask {
let end_time = std::time::Instant::now();
let elapsed = end_time.duration_since(start_time);
eprintln!(
- "restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+ "restore image complete (bytes={}, avg fetch time={:.4}ms, avg time per nonzero write={:.4}ms, \
+ storage nonzero total write time={:.3}s, duration={:.2}s, speed={:.2}MB/s)",
bytes,
+ chunk_fetch_time.as_nanos() as f64
+ / (chunks_fetched.load(Ordering::Relaxed).max(1) as f64 * 1_000_000.0),
+ storage_nonzero_write_time.as_nanos() as f64
+ / (storage_nonzero_writes.max(1) as f64 * 1_000_000.0),
+ storage_nonzero_write_time.as_secs_f64(),
elapsed.as_secs_f64(),
bytes as f64 / (1024.0 * 1024.0 * elapsed.as_secs_f64())
);
-
Ok(())
}
--
2.47.2