[PATCH PBS restore 1/1] Sorry about the first submission with the "cover letter"

Adam Kalisz adam.kalisz at notnullmakers.com
Fri Jun 27 21:34:46 CEST 2025


On Fri, 2025-06-27 at 21:24 +0200, Adam Kalisz wrote:
> Co-authored-by: Daniel Škarda <daniel.skarda at notnullmakers.com>
> Tested-by: Matt Neuforth <mneuforth at milbankworks.com>
> Financed-by: Václav Svátek <svatek at cmis.cz>
> ---
>  src/restore.rs | 101 ++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 84 insertions(+), 17 deletions(-)
> 
> diff --git a/src/restore.rs b/src/restore.rs
> index 5a5a398..c75bf0d 100644
> --- a/src/restore.rs
> +++ b/src/restore.rs
> @@ -1,7 +1,11 @@
>  use std::convert::TryInto;
> -use std::sync::{Arc, Mutex};
> +use std::sync::{
> +    atomic::{AtomicU64, Ordering},
> +    Arc, Mutex,
> +};
>  
>  use anyhow::{bail, format_err, Error};
> +use futures::stream::StreamExt;
>  use once_cell::sync::OnceCell;
>  use tokio::runtime::Runtime;
>  
> @@ -69,7 +73,7 @@ impl RestoreTask {
>          let runtime = get_runtime_with_builder(|| {
>              let mut builder = tokio::runtime::Builder::new_multi_thread();
>              builder.enable_all();
> -            builder.max_blocking_threads(2);
> +            builder.max_blocking_threads(32);
>              builder.worker_threads(4);
>              builder.thread_name("proxmox-restore-worker");
>              builder
> @@ -149,9 +153,7 @@ impl RestoreTask {
>          )?;
>  
>          let most_used = index.find_most_used_chunks(8);
> -
>          let file_info = manifest.lookup_file_info(&archive_name)?;
> -
>          let chunk_reader = RemoteChunkReader::new(
>              Arc::clone(&client),
>              self.crypt_config.clone(),
> @@ -162,13 +164,43 @@ impl RestoreTask {
>          let mut per = 0;
>          let mut bytes = 0;
>          let mut zeroes = 0;
> -
> +        let mut storage_nonzero_write_time = std::time::Duration::new(0, 0);
> +        let mut storage_nonzero_writes: u64 = 0;
> +        let mut chunk_fetch_time = std::time::Duration::new(0, 0);
> +        let chunks_fetched = Arc::new(AtomicU64::new(0));
>          let start_time = std::time::Instant::now();
> +        // Should be lower than max_blocking_threads in BackupSetup
> +        let mut concurrent_requests: usize = 4;
> +
> +        if let Ok(val_str) = std::env::var("PBS_RESTORE_CONCURRENCY") {
> +            match val_str.parse::<usize>() {
> +                Ok(num_threads) if num_threads > 0 => {
> +                    if verbose {
> +                        eprintln!(
> +                            "Using custom concurrency level from environment ({} threads)",
> +                            num_threads
> +                        );
> +                    }
> +                    concurrent_requests = num_threads;
> +                }
> +                _ => {
> +                    if verbose {
> +                        eprintln!(
> +                            "Using default concurrency level ({} threads)",
> +                            concurrent_requests
> +                        );
> +                    }
> +                }
> +            }
> +        }
> +
> +        let mut chunk_futures = Vec::new();
>  
>          for pos in 0..index.index_count() {
> -            let digest = index.index_digest(pos).unwrap();
> +            let digest = index.index_digest(pos).unwrap().clone();
>              let offset = (pos * index.chunk_size) as u64;
> -            if digest == &zero_chunk_digest {
> +
> +            if digest == zero_chunk_digest {
>                  let res = write_zero_callback(offset, index.chunk_size as u64);
>                  if res < 0 {
>                      bail!("write_zero_callback failed ({})", res);
> @@ -176,22 +208,54 @@ impl RestoreTask {
>                  bytes += index.chunk_size;
>                  zeroes += index.chunk_size;
>              } else {
> -                let raw_data = ReadChunk::read_chunk(&chunk_reader, digest)?;
> -                let res = write_data_callback(offset, &raw_data);
> -                if res < 0 {
> -                    bail!("write_data_callback failed ({})", res);
> -                }
> -                bytes += raw_data.len();
> +                let chunk_reader = chunk_reader.clone();
> +                let chunks_fetched_clone = Arc::clone(&chunks_fetched);
> +
> +                let future = async move {
> +                    tokio::task::spawn_blocking(move || {
> +                        let start_chunk_fetch_time = std::time::Instant::now();
> +                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
> +                        let fetch_elapsed = start_chunk_fetch_time.elapsed();
> +                        chunks_fetched_clone.fetch_add(1, Ordering::Relaxed);
> +                        Ok::<_, Error>((offset, raw_data, fetch_elapsed))
> +                    })
> +                    .await
> +                    .unwrap()
> +                };
> +                chunk_futures.push(future);
> +            }
> +        }
> +
> +        let mut stream = futures::stream::iter(chunk_futures).buffer_unordered(concurrent_requests);
> +
> +        while let Some(result) = stream.next().await {
> +            let (offset, raw_data, fetch_elapsed_time) = result?;
> +            let start_storage_write_time = std::time::Instant::now();
> +            let res = write_data_callback(offset, &raw_data);
> +            let storage_write_elapsed = start_storage_write_time.elapsed();
> +            if res < 0 {
> +                bail!("write_data_callback failed ({})", res);
>              }
> +            storage_nonzero_write_time = storage_nonzero_write_time
> +                .checked_add(storage_write_elapsed)
> +                .unwrap_or_default();
> +            storage_nonzero_writes += 1;
> +            chunk_fetch_time += fetch_elapsed_time;
> +            let chunk_len = raw_data.len();
> +            bytes += chunk_len;
> +
>              if verbose {
> -                let next_per = ((pos + 1) * 100) / index.index_count();
> +                let next_per = (bytes * 100) / (index.index_count() * index.chunk_size);
>                  if per != next_per {
>                      eprintln!(
> -                        "progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
> +                        "progress {}% (read {} bytes, zeroes = {}% ({} bytes), \
> +                                nonzero writes = {}, chunks fetched = {}, duration {} sec)",
>                          next_per,
>                          bytes,
>                          zeroes * 100 / bytes,
>                          zeroes,
> +                        storage_nonzero_writes,
> +                        chunks_fetched.load(Ordering::Relaxed),
>                          start_time.elapsed().as_secs()
>                      );
>                      per = next_per;
> @@ -202,12 +266,15 @@ impl RestoreTask {
>          let end_time = std::time::Instant::now();
>          let elapsed = end_time.duration_since(start_time);
>          eprintln!(
> -            "restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
> +            "restore image complete (bytes={}, avg fetch time={:.4}ms, avg time per nonzero write={:.4}ms, \
> +            storage nonzero total write time={:.3}s, duration={:.2}s, speed={:.2}MB/s)",
>              bytes,
> +            chunk_fetch_time.as_nanos() as f64 / (chunks_fetched.load(Ordering::Relaxed) as f64 * 1_000_000.0),
> +            storage_nonzero_write_time.as_nanos() as f64 / (storage_nonzero_writes as f64 * 1_000_000.0),
> +            storage_nonzero_write_time.as_secs_f64(),
>              elapsed.as_secs_f64(),
>              bytes as f64 / (1024.0 * 1024.0 * elapsed.as_secs_f64())
>          );
> -
>          Ok(())
>      }
>  


More information about the pve-devel mailing list