[PATCH PBS restore 1/1] Make retrieval of chunks concurrent when restoring a backup and add more statistics on chunk fetching and writing to storage. Allow configuring the number of threads fetching chunks using the PBS_RESTORE_CONCURRENCY environment variable.

Adam Kalisz adam.kalisz at notnullmakers.com
Fri Jun 27 21:24:26 CEST 2025


Co-authored-by: Daniel Škarda <daniel.skarda at notnullmakers.com>
Tested-by: Matt Neuforth <mneuforth at milbankworks.com>
Financed-by: Václav Svátek <svatek at cmis.cz>
---
 src/restore.rs | 101 ++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 84 insertions(+), 17 deletions(-)

diff --git a/src/restore.rs b/src/restore.rs
index 5a5a398..c75bf0d 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,7 +1,11 @@
 use std::convert::TryInto;
-use std::sync::{Arc, Mutex};
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc, Mutex,
+};
 
 use anyhow::{bail, format_err, Error};
+use futures::stream::StreamExt;
 use once_cell::sync::OnceCell;
 use tokio::runtime::Runtime;
 
@@ -69,7 +73,7 @@ impl RestoreTask {
         let runtime = get_runtime_with_builder(|| {
             let mut builder = tokio::runtime::Builder::new_multi_thread();
             builder.enable_all();
-            builder.max_blocking_threads(2);
+            builder.max_blocking_threads(32);
             builder.worker_threads(4);
             builder.thread_name("proxmox-restore-worker");
             builder
@@ -149,9 +153,7 @@ impl RestoreTask {
         )?;
 
         let most_used = index.find_most_used_chunks(8);
-
         let file_info = manifest.lookup_file_info(&archive_name)?;
-
         let chunk_reader = RemoteChunkReader::new(
             Arc::clone(&client),
             self.crypt_config.clone(),
@@ -162,13 +164,43 @@ impl RestoreTask {
         let mut per = 0;
         let mut bytes = 0;
         let mut zeroes = 0;
-
+        let mut storage_nonzero_write_time = std::time::Duration::new(0, 0);
+        let mut storage_nonzero_writes: u64 = 0;
+        let mut chunk_fetch_time = std::time::Duration::new(0, 0);
+        let chunks_fetched = Arc::new(AtomicU64::new(0));
         let start_time = std::time::Instant::now();
+        // Should be lower than the max_blocking_threads set on the tokio runtime builder
+        let mut concurrent_requests: usize = 4;
+
+        if let Ok(val_str) = std::env::var("PBS_RESTORE_CONCURRENCY") {
+            match val_str.parse::<usize>() {
+                Ok(num_threads) if num_threads > 0 => {
+                    if verbose {
+                        eprintln!(
+                            "Using custom concurrency level from environment ({} threads)",
+                            num_threads
+                        );
+                    }
+                    concurrent_requests = num_threads;
+                }
+                _ => {
+                    if verbose {
+                        eprintln!(
+                            "Using default concurrency level ({} threads)",
+                            concurrent_requests
+                        );
+                    }
+                }
+            }
+        }
+
+        let mut chunk_futures = Vec::new();
 
         for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
+            let digest = index.index_digest(pos).unwrap().clone();
             let offset = (pos * index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
+
+            if digest == zero_chunk_digest {
                 let res = write_zero_callback(offset, index.chunk_size as u64);
                 if res < 0 {
                     bail!("write_zero_callback failed ({})", res);
@@ -176,22 +208,54 @@ impl RestoreTask {
                 bytes += index.chunk_size;
                 zeroes += index.chunk_size;
             } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+                let chunk_reader = chunk_reader.clone();
+                let chunks_fetched_clone = Arc::clone(&chunks_fetched);
+
+                let future = async move {
+                    tokio::task::spawn_blocking(move || {
+                        let start_chunk_fetch_time = std::time::Instant::now();
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let fetch_elapsed = start_chunk_fetch_time.elapsed();
+                        chunks_fetched_clone.fetch_add(1, Ordering::Relaxed);
+                        Ok::<_, Error>((offset, raw_data, fetch_elapsed))
+                    })
+                    .await
+                    .unwrap()
+                };
+                chunk_futures.push(future);
+            }
+        }
+
+        let mut stream = futures::stream::iter(chunk_futures).buffer_unordered(concurrent_requests);
+
+        while let Some(result) = stream.next().await {
+            let (offset, raw_data, fetch_elapsed_time) = result?;
+            let start_storage_write_time = std::time::Instant::now();
+            let res = write_data_callback(offset, &raw_data);
+            let storage_write_elapsed = start_storage_write_time.elapsed();
+            if res < 0 {
+                bail!("write_data_callback failed ({})", res);
             }
+            storage_nonzero_write_time = storage_nonzero_write_time
+                .checked_add(storage_write_elapsed)
+                .unwrap_or_default();
+            storage_nonzero_writes += 1;
+            chunk_fetch_time += fetch_elapsed_time;
+            let chunk_len = raw_data.len();
+            bytes += chunk_len;
+
             if verbose {
-                let next_per = ((pos + 1) * 100) / index.index_count();
+                let next_per = (bytes * 100) / (index.index_count() * index.chunk_size);
                 if per != next_per {
                     eprintln!(
-                        "progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                        "progress {}% (read {} bytes, zeroes = {}% ({} bytes), \
+                                nonzero writes = {}, chunks fetched = {}, duration {} sec)",
                         next_per,
                         bytes,
                         zeroes * 100 / bytes,
                         zeroes,
+                        storage_nonzero_writes,
+                        chunks_fetched.load(Ordering::Relaxed),
                         start_time.elapsed().as_secs()
                     );
                     per = next_per;
@@ -202,12 +266,15 @@ impl RestoreTask {
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);
         eprintln!(
-            "restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+            "restore image complete (bytes={}, avg fetch time={:.4}ms, avg time per nonzero write={:.4}ms, \
+            storage nonzero total write time={:.3}s, duration={:.2}s, speed={:.2}MB/s)",
             bytes,
+            chunk_fetch_time.as_nanos() as f64 / (chunks_fetched.load(Ordering::Relaxed) as f64 * 1_000_000.0),
+            storage_nonzero_write_time.as_nanos() as f64 / (storage_nonzero_writes as f64 * 1_000_000.0),
+            storage_nonzero_write_time.as_secs_f64(),
             elapsed.as_secs_f64(),
             bytes as f64 / (1024.0 * 1024.0 * elapsed.as_secs_f64())
         );
-
         Ok(())
     }
 
-- 
2.47.2




More information about the pve-devel mailing list