[pve-devel] [PATCH] restore: make chunk loading more parallel
Dominik Csapak
d.csapak at proxmox.com
Tue Jul 8 11:14:45 CEST 2025
by using async futures to load chunks and stream::buffer_unordered to
buffer up to 16 of them, depending on write/load speed. Use tokio's task
spawn to make sure they continue to run in the background, since
buffer_unordered starts them, but does not poll them to completion
unless we're awaiting.
With this, we don't need to increase the number of threads in the
runtime to trigger parallel reads and network traffic to us. This way
it's only limited by CPU if decoding and/or decrypting is the
bottleneck.
I measured restoring a VM backup with a 60GiB disk (filled with ~42GiB
data) and fast storage over a local network link (from PBS VM to the
host). I let it run 3 times, but the variance was not that big, so here
is some representative log output with various MAX_BUFFERED_FUTURES values.
benchmark  duration  speed        cpu percentage
current    107.18s    573.25MB/s  < 100%
4:          44.74s   1373.34MB/s  ~ 180%
8:          32.30s   1902.42MB/s  ~ 290%
16:         25.75s   2386.44MB/s  ~ 360%
I saw an increase in CPU usage proportional to the speed increase: while
the current version uses less than a single core in total, 16 parallel
futures kept 3-4 of the available tokio runtime threads busy.
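
As a quick sanity check of the numbers above, the following sketch
recomputes the speedup relative to the current code from the measured
durations. The durations are copied from the benchmark table; the helper
name `speedup` is only illustrative and not part of the patch.

```rust
/// Speedup of a run relative to the baseline duration (both in seconds).
fn speedup(baseline_secs: f64, run_secs: f64) -> f64 {
    baseline_secs / run_secs
}

fn main() {
    // Durations copied from the benchmark table in this mail.
    let baseline = 107.18;
    let runs = [(4, 44.74), (8, 32.30), (16, 25.75)];

    for &(futures, secs) in runs.iter() {
        println!(
            "MAX_BUFFERED_FUTURES = {:2}: {:.2}x faster than current",
            futures,
            speedup(baseline, secs)
        );
    }
}
```

This gives roughly 2.4x, 3.3x and 4.2x for 4, 8 and 16 futures, which is
in line with the CPU usage rising from under 100% to about 360%.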
In general I'd like to limit the buffering somehow, but I don't think
there is a good automatic metric we can use, and giving the admin a knob
whose actual ramifications are hard to explain is also not good, so I
settled for a value that showed improvement but does not seem too high.
In any case, if the target and/or source storage is too slow, there will
be back/forward pressure, and this change should only matter for storage
systems where IO depth plays a role and that are fast enough.
The way we count the finished chunks also changes a bit, since they
can come unordered, so we can't rely on the index position to calculate
the percentage.
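
To illustrate why the index position no longer works for progress
reporting, here is a minimal, self-contained sketch. The completion
order and the helper `percent` are made up for the example; only the
formula `(count * 100) / total` mirrors what the patch does.

```rust
/// Integer percentage, same shape as `(count * 100) / index.index_count()`.
fn percent(done: usize, total: usize) -> usize {
    (done * 100) / total
}

fn main() {
    let total = 8;
    // Hypothetical order in which chunk positions finish when they are
    // loaded concurrently and yielded as soon as they are ready.
    let completion_order = [2, 0, 3, 1, 6, 4, 7, 5];

    let mut count = 0;
    for &pos in completion_order.iter() {
        count += 1;
        // Old approach: derive the percentage from the chunk position.
        let by_pos = percent(pos + 1, total);
        // New approach: derive it from the number of finished chunks.
        let by_count = percent(count, total);
        println!(
            "chunk {} done: by position {}%, by count {}%",
            pos, by_pos, by_count
        );
    }
}
```

Computed from the position, the percentage jumps back and forth (37%,
12%, 50%, ...), while the count-based value only ever increases.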
This patch is loosely based on the patch from Adam Kalisz[0], but removes
the need to increase the blocking threads and uses the (actually always
used) underlying async implementation for reading remote chunks.
0: https://lore.proxmox.com/pve-devel/mailman.719.1751052794.395.pve-devel@lists.proxmox.com/
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
Based-on-patch-by: Adam Kalisz <adam.kalisz at notnullmakers.com>
---
resend to both pve and pbs devel, sorry for the noise
changes from RFC v1:
* uses tokio task spawn to actually run the fetching in the background
* redo the counting for the task output (pos was unordered so we got
weird ordering sometimes)
When actually running the fetching in the background the speed increase
is much higher than just using buffer_unordered for the fetching
futures, which is nice (although the CPU usage is much higher now).
Since the benchmark was much faster with higher values, I used a
different bigger VM this time around so the timings are more consistent
and it makes sure the disk does not fit in the PBS's memory.
The question of what count we should use remains though...
src/restore.rs | 63 +++++++++++++++++++++++++++++++++++++-------------
1 file changed, 47 insertions(+), 16 deletions(-)
diff --git a/src/restore.rs b/src/restore.rs
index 5a5a398..4e6c538 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -2,6 +2,7 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
+use futures::StreamExt;
use once_cell::sync::OnceCell;
use tokio::runtime::Runtime;
@@ -13,7 +14,7 @@ use pbs_datastore::cached_chunk_reader::CachedChunkReader;
use pbs_datastore::data_blob::DataChunkBuilder;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
-use pbs_datastore::read_chunk::ReadChunk;
+use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::BackupManifest;
use pbs_key_config::load_and_decrypt_key;
use pbs_tools::crypt_config::CryptConfig;
@@ -29,6 +30,9 @@ struct ImageAccessInfo {
archive_size: u64,
}
+// use this many buffered futures to make loading of chunks more concurrent
+const MAX_BUFFERED_FUTURES: usize = 16;
+
pub(crate) struct RestoreTask {
setup: BackupSetup,
runtime: Arc<Runtime>,
@@ -165,26 +169,53 @@ impl RestoreTask {
let start_time = std::time::Instant::now();
- for pos in 0..index.index_count() {
- let digest = index.index_digest(pos).unwrap();
+ let read_queue = (0..index.index_count()).map(|pos| {
+ let digest = *index.index_digest(pos).unwrap();
let offset = (pos * index.chunk_size) as u64;
- if digest == &zero_chunk_digest {
- let res = write_zero_callback(offset, index.chunk_size as u64);
- if res < 0 {
- bail!("write_zero_callback failed ({})", res);
+ let chunk_reader = chunk_reader.clone();
+ async move {
+ let chunk = if digest == zero_chunk_digest {
+ None
+ } else {
+ let raw_data = tokio::task::spawn(async move {
+ AsyncReadChunk::read_chunk(&chunk_reader, &digest).await
+ })
+ .await??;
+ Some(raw_data)
+ };
+
+ Ok::<_, Error>((chunk, offset))
+ }
+ });
+
+ // this buffers futures and pre-fetches some chunks for us
+ let mut stream = futures::stream::iter(read_queue).buffer_unordered(MAX_BUFFERED_FUTURES);
+
+ let mut count = 0;
+ while let Some(res) = stream.next().await {
+ let res = res?;
+ match res {
+ (None, offset) => {
+ let res = write_zero_callback(offset, index.chunk_size as u64);
+ if res < 0 {
+ bail!("write_zero_callback failed ({})", res);
+ }
+ bytes += index.chunk_size;
+ zeroes += index.chunk_size;
}
- bytes += index.chunk_size;
- zeroes += index.chunk_size;
- } else {
- let raw_data = ReadChunk::read_chunk(&chunk_reader, digest)?;
- let res = write_data_callback(offset, &raw_data);
- if res < 0 {
- bail!("write_data_callback failed ({})", res);
+ (Some(raw_data), offset) => {
+ let res = write_data_callback(offset, &raw_data);
+ if res < 0 {
+ bail!("write_data_callback failed ({})", res);
+ }
+ bytes += raw_data.len();
}
- bytes += raw_data.len();
}
+
+ count += 1;
+
if verbose {
- let next_per = ((pos + 1) * 100) / index.index_count();
+ let next_per = (count * 100) / index.index_count();
if per != next_per {
eprintln!(
"progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
--
2.39.5