[pve-devel] [pbs-devel] [RFC PATCH v2 proxmox-backup-qemu] restore: make chunk loading more parallel

Dominik Csapak d.csapak at proxmox.com
Tue Jul 8 10:52:05 CEST 2025


Sorry, I sent the patch to the wrong list; this was originally meant for
pve-devel instead. Should I resend it there?

On 7/8/25 10:49, Dominik Csapak wrote:
> by using async futures to load chunks and stream::buffer_unordered to
> buffer up to 16 of them, depending on write/load speed. Use tokio's task
> spawn to make sure they continue to run in the background, since
> buffer_unordered starts the futures but does not poll them to completion
> unless we're awaiting the stream.
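> 
> To make the pattern concrete, here is a minimal, self-contained sketch of
> the approach (not the actual patch; fetch_chunk and write_chunk are
> stand-ins for the real chunk reader and write callback):
> 
>     use anyhow::Error;
>     use futures::StreamExt;
> 
>     // stand-ins for the real async chunk reader and the write callback
>     async fn fetch_chunk(digest: [u8; 32]) -> Result<Vec<u8>, Error> {
>         Ok(digest.to_vec())
>     }
>     fn write_chunk(_data: &[u8]) -> Result<(), Error> {
>         Ok(())
>     }
> 
>     async fn restore_all(digests: Vec<[u8; 32]>) -> Result<(), Error> {
>         let queue = digests.into_iter().map(|digest| {
>             // spawn the fetch so it keeps running on the runtime even
>             // while the consumer below is busy writing another chunk
>             tokio::task::spawn(async move { fetch_chunk(digest).await })
>         });
>         // keep up to 16 fetches in flight at once
>         let mut stream = futures::stream::iter(queue).buffer_unordered(16);
>         while let Some(res) = stream.next().await {
>             let data = res??; // JoinError first, then the fetch error
>             write_chunk(&data)?;
>         }
>         Ok(())
>     }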
> 
> With this, we don't need to increase the number of threads in the
> runtime to trigger parallel reads and network traffic to us. This way
> it's only limited by CPU if decoding and/or decrypting is the
> bottleneck.
> 
> I measured restoring a VM backup with a 60GiB disk (filled with ~42GiB
> data) and fast storage over a local network link (from PBS VM to the
> host). I did 3 runs, but the variance was not that big, so here's some
> representative log output with various MAX_BUFFERED_FUTURES values.
> 
> MAX_BUFFERED_FUTURES   duration        speed   CPU usage
> current                 107.18s   573.25MB/s      < 100%
> 4                        44.74s  1373.34MB/s      ~ 180%
> 8                        32.30s  1902.42MB/s      ~ 290%
> 16                       25.75s  2386.44MB/s      ~ 360%
> 
> I saw an increase in CPU usage proportional to the speed increase: while
> the current version uses less than a single core in total, using 16
> parallel futures resulted in 3-4 of the tokio runtime's available
> threads being utilized.
> 
> In general I'd like to limit the buffering somehow, but I don't think
> there is a good automatic metric we can use, and giving the admin a knob
> whose actual ramifications are hard to explain is not good either, so I
> settled for a value that showed improvement but does not seem too high.
> 
> In any case, if the target and/or source storage is too slow, there will
> be back/forward pressure, and this change should only matter for storage
> systems where IO depth plays a role and that are fast enough.
> 
> The way we count the finished chunks also changes a bit: since they can
> complete out of order, we can't rely on the index position to calculate
> the progress percentage.
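> 
> Concretely, the progress calculation switches from the index position to
> a counter of completed chunks (both lines taken from the diff below):
> 
>     // old: position-based; jumps around once chunks complete out of order
>     let next_per = ((pos + 1) * 100) / index.index_count();
>     // new: count every finished chunk, regardless of its position
>     count += 1;
>     let next_per = (count * 100) / index.index_count();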
> 
> This patch is loosely based on the patch from Adam Kalisz[0], but removes
> the need to increase the number of blocking threads and uses the
> (actually always used) underlying async implementation for reading
> remote chunks.
> 
> 0: https://lore.proxmox.com/pve-devel/mailman.719.1751052794.395.pve-devel@lists.proxmox.com/
> 
> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
> Based-on-patch-by: Adam Kalisz <adam.kalisz at notnullmakers.com>
> ---
> changes from RFC v1:
> * use tokio's task spawn to actually run the fetching in the background
> * redo the counting for the task output (pos was unordered, so we
>    sometimes got weird ordering)
> 
> When actually running the fetching in the background, the speed increase
> is much higher than just using buffer_unordered for the fetching
> futures, which is nice (although the CPU usage is much higher now).
> 
> Since the benchmark was much faster with higher values, I used a
> different, bigger VM this time around so that the timings are more
> consistent and the disk does not fit in the PBS's memory.
> 
> The question of what count we should use remains, though...
> 
>   src/restore.rs | 63 +++++++++++++++++++++++++++++++++++++-------------
>   1 file changed, 47 insertions(+), 16 deletions(-)
> 
> diff --git a/src/restore.rs b/src/restore.rs
> index 5a5a398..4e6c538 100644
> --- a/src/restore.rs
> +++ b/src/restore.rs
> @@ -2,6 +2,7 @@ use std::convert::TryInto;
>   use std::sync::{Arc, Mutex};
>   
>   use anyhow::{bail, format_err, Error};
> +use futures::StreamExt;
>   use once_cell::sync::OnceCell;
>   use tokio::runtime::Runtime;
>   
> @@ -13,7 +14,7 @@ use pbs_datastore::cached_chunk_reader::CachedChunkReader;
>   use pbs_datastore::data_blob::DataChunkBuilder;
>   use pbs_datastore::fixed_index::FixedIndexReader;
>   use pbs_datastore::index::IndexFile;
> -use pbs_datastore::read_chunk::ReadChunk;
> +use pbs_datastore::read_chunk::AsyncReadChunk;
>   use pbs_datastore::BackupManifest;
>   use pbs_key_config::load_and_decrypt_key;
>   use pbs_tools::crypt_config::CryptConfig;
> @@ -29,6 +30,9 @@ struct ImageAccessInfo {
>       archive_size: u64,
>   }
>   
> +// use this many buffered futures to make loading of chunks more concurrent
> +const MAX_BUFFERED_FUTURES: usize = 16;
> +
>   pub(crate) struct RestoreTask {
>       setup: BackupSetup,
>       runtime: Arc<Runtime>,
> @@ -165,26 +169,53 @@ impl RestoreTask {
>   
>           let start_time = std::time::Instant::now();
>   
> -        for pos in 0..index.index_count() {
> -            let digest = index.index_digest(pos).unwrap();
> +        let read_queue = (0..index.index_count()).map(|pos| {
> +            let digest = *index.index_digest(pos).unwrap();
>               let offset = (pos * index.chunk_size) as u64;
> -            if digest == &zero_chunk_digest {
> -                let res = write_zero_callback(offset, index.chunk_size as u64);
> -                if res < 0 {
> -                    bail!("write_zero_callback failed ({})", res);
> +            let chunk_reader = chunk_reader.clone();
> +            async move {
> +                let chunk = if digest == zero_chunk_digest {
> +                    None
> +                } else {
> +                    let raw_data = tokio::task::spawn(async move {
> +                        AsyncReadChunk::read_chunk(&chunk_reader, &digest).await
> +                    })
> +                    .await??;
> +                    Some(raw_data)
> +                };
> +
> +                Ok::<_, Error>((chunk, offset))
> +            }
> +        });
> +
> +        // this buffers futures and pre-fetches some chunks for us
> +        let mut stream = futures::stream::iter(read_queue).buffer_unordered(MAX_BUFFERED_FUTURES);
> +
> +        let mut count = 0;
> +        while let Some(res) = stream.next().await {
> +            let res = res?;
> +            match res {
> +                (None, offset) => {
> +                    let res = write_zero_callback(offset, index.chunk_size as u64);
> +                    if res < 0 {
> +                        bail!("write_zero_callback failed ({})", res);
> +                    }
> +                    bytes += index.chunk_size;
> +                    zeroes += index.chunk_size;
>                   }
> -                bytes += index.chunk_size;
> -                zeroes += index.chunk_size;
> -            } else {
> -                let raw_data = ReadChunk::read_chunk(&chunk_reader, digest)?;
> -                let res = write_data_callback(offset, &raw_data);
> -                if res < 0 {
> -                    bail!("write_data_callback failed ({})", res);
> +                (Some(raw_data), offset) => {
> +                    let res = write_data_callback(offset, &raw_data);
> +                    if res < 0 {
> +                        bail!("write_data_callback failed ({})", res);
> +                    }
> +                    bytes += raw_data.len();
>                   }
> -                bytes += raw_data.len();
>               }
> +
> +            count += 1;
> +
>               if verbose {
> -                let next_per = ((pos + 1) * 100) / index.index_count();
> +                let next_per = (count * 100) / index.index_count();
>                   if per != next_per {
>                       eprintln!(
>                           "progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",




