[PATCH PBS restore 1/1] Sorry about the first submission with the "cover letter"
Adam Kalisz
adam.kalisz at notnullmakers.com
Fri Jun 27 21:34:46 CEST 2025
On Fri, 2025-06-27 at 21:24 +0200, Adam Kalisz wrote:
> Co-authored-by: Daniel Škarda <daniel.skarda at notnullmakers.com>
> Tested-by: Matt Neuforth <mneuforth at milbankworks.com>
> Financed-by: Václav Svátek <svatek at cmis.cz>
> ---
> src/restore.rs | 101 ++++++++++++++++++++++++++++++++++++++++---------
> 1 file changed, 84 insertions(+), 17 deletions(-)
>
> diff --git a/src/restore.rs b/src/restore.rs
> index 5a5a398..c75bf0d 100644
> --- a/src/restore.rs
> +++ b/src/restore.rs
> @@ -1,7 +1,11 @@
> use std::convert::TryInto;
> -use std::sync::{Arc, Mutex};
> +use std::sync::{
> + atomic::{AtomicU64, Ordering},
> + Arc, Mutex,
> +};
>
> use anyhow::{bail, format_err, Error};
> +use futures::stream::StreamExt;
> use once_cell::sync::OnceCell;
> use tokio::runtime::Runtime;
>
> @@ -69,7 +73,7 @@ impl RestoreTask {
> let runtime = get_runtime_with_builder(|| {
> let mut builder =
> tokio::runtime::Builder::new_multi_thread();
> builder.enable_all();
> - builder.max_blocking_threads(2);
> + builder.max_blocking_threads(32);
> builder.worker_threads(4);
> builder.thread_name("proxmox-restore-worker");
> builder
> @@ -149,9 +153,7 @@ impl RestoreTask {
> )?;
>
> let most_used = index.find_most_used_chunks(8);
> -
> let file_info = manifest.lookup_file_info(&archive_name)?;
> -
> let chunk_reader = RemoteChunkReader::new(
> Arc::clone(&client),
> self.crypt_config.clone(),
> @@ -162,13 +164,43 @@ impl RestoreTask {
> let mut per = 0;
> let mut bytes = 0;
> let mut zeroes = 0;
> -
> + let mut storage_nonzero_write_time =
> std::time::Duration::new(0, 0);
> + let mut storage_nonzero_writes: u64 = 0;
> + let mut chunk_fetch_time = std::time::Duration::new(0, 0);
> + let chunks_fetched = Arc::new(AtomicU64::new(0));
> let start_time = std::time::Instant::now();
> + // Should be lower than max_blocking_threads in BackupSetup
> + let mut concurrent_requests: usize = 4;
> +
> + if let Ok(val_str) =
> std::env::var("PBS_RESTORE_CONCURRENCY") {
> + match val_str.parse::<usize>() {
> + Ok(num_threads) if num_threads > 0 => {
> + if verbose {
> + eprintln!(
> + "Using custom concurrency level from
> environment ({} threads)",
> + num_threads
> + );
> + }
> + concurrent_requests = num_threads;
> + }
> + _ => {
> + if verbose {
> + eprintln!(
> + "Using default concurrency level ({}
> threads)",
> + concurrent_requests
> + );
> + }
> + }
> + }
> + }
> +
> + let mut chunk_futures = Vec::new();
>
> for pos in 0..index.index_count() {
> - let digest = index.index_digest(pos).unwrap();
> + let digest = index.index_digest(pos).unwrap().clone();
> let offset = (pos * index.chunk_size) as u64;
> - if digest == &zero_chunk_digest {
> +
> + if digest == zero_chunk_digest {
> let res = write_zero_callback(offset,
> index.chunk_size as u64);
> if res < 0 {
> bail!("write_zero_callback failed ({})", res);
> @@ -176,22 +208,54 @@ impl RestoreTask {
> bytes += index.chunk_size;
> zeroes += index.chunk_size;
> } else {
> - let raw_data = ReadChunk::read_chunk(&chunk_reader,
> digest)?;
> - let res = write_data_callback(offset, &raw_data);
> - if res < 0 {
> - bail!("write_data_callback failed ({})", res);
> - }
> - bytes += raw_data.len();
> + let chunk_reader = chunk_reader.clone();
> + let chunks_fetched_clone =
> Arc::clone(&chunks_fetched);
> +
> + let future = async move {
> + tokio::task::spawn_blocking(move || {
> + let start_chunk_fetch_time =
> std::time::Instant::now();
> + let raw_data =
> ReadChunk::read_chunk(&chunk_reader, &digest)?;
> + let fetch_elapsed =
> start_chunk_fetch_time.elapsed();
> + chunks_fetched_clone.fetch_add(1,
> Ordering::Relaxed);
> + Ok::<_, Error>((offset, raw_data,
> fetch_elapsed))
> + })
> + .await
> + .unwrap()
> + };
> + chunk_futures.push(future);
> + }
> + }
> +
> + let mut stream =
> futures::stream::iter(chunk_futures).buffer_unordered(concurrent_requ
> ests);
> +
> + while let Some(result) = stream.next().await {
> + let (offset, raw_data, fetch_elapsed_time) = result?;
> + let start_storage_write_time =
> std::time::Instant::now();
> + let res = write_data_callback(offset, &raw_data);
> + let storage_write_elapsed =
> start_storage_write_time.elapsed();
> + if res < 0 {
> + bail!("write_data_callback failed ({})", res);
> }
> + storage_nonzero_write_time = storage_nonzero_write_time
> + .checked_add(storage_write_elapsed)
> + .unwrap_or_default();
> + storage_nonzero_writes += 1;
> + chunk_fetch_time += fetch_elapsed_time;
> + let chunk_len = raw_data.len();
> + bytes += chunk_len;
> +
> if verbose {
> - let next_per = ((pos + 1) * 100) /
> index.index_count();
> + let next_per = (bytes * 100) / (index.index_count()
> * index.chunk_size);
> if per != next_per {
> eprintln!(
> - "progress {}% (read {} bytes, zeroes = {}%
> ({} bytes), duration {} sec)",
> + "progress {}% (read {} bytes, zeroes = {}%
> ({} bytes), \
> + nonzero writes = {}, chunks fetched
> = {}, duration {} sec)",
> next_per,
> bytes,
> zeroes * 100 / bytes,
> zeroes,
> + storage_nonzero_writes,
> + chunks_fetched.load(Ordering::Relaxed),
> start_time.elapsed().as_secs()
> );
> per = next_per;
> @@ -202,12 +266,15 @@ impl RestoreTask {
> let end_time = std::time::Instant::now();
> let elapsed = end_time.duration_since(start_time);
> eprintln!(
> - "restore image complete (bytes={}, duration={:.2}s,
> speed={:.2}MB/s)",
> + "restore image complete (bytes={}, avg fetch
> time={:.4}ms, avg time per nonzero write={:.4}ms, \
> + storage nonzero total write time={:.3}s,
> duration={:.2}s, speed={:.2}MB/s)",
> bytes,
> + chunk_fetch_time.as_nanos() as f64 /
> (chunks_fetched.load(Ordering::Relaxed) as f64 * 1_000_000.0),
> + storage_nonzero_write_time.as_nanos() as f64 /
> (storage_nonzero_writes as f64 * 1_000_000.0),
> + storage_nonzero_write_time.as_secs_f64(),
> elapsed.as_secs_f64(),
> bytes as f64 / (1024.0 * 1024.0 * elapsed.as_secs_f64())
> );
> -
> Ok(())
> }
>
More information about the pve-devel
mailing list