[pbs-devel] [PATCH v2 proxmox-backup 3/6] fix #4182: server: sync: allow pulling groups concurrently

Christian Ebner c.ebner at proxmox.com
Mon Jan 20 11:51:01 CET 2025


Currently, a sync job pulls the backup groups and the snapshots they
contain sequentially. For remote syncs, download speed is therefore
bound by the single http2 connection of the source reader instance,
which is especially limiting on high-latency networks.

Improve throughput by pulling up to a configured number of backup
groups concurrently, creating tasks that connect to the remote source
and pull from it in parallel.
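For illustration, a minimal, self-contained sketch of the bounded
concurrency pattern used in the diff below: a FuturesUnordered is
seeded with up to the configured number of futures and refilled as
results complete. The function name, generic signature and limit are
placeholders, not the actual pull code:

    use futures::stream::{FuturesUnordered, StreamExt};

    async fn run_bounded<I, F, T>(mut jobs: I, limit: usize) -> Vec<T>
    where
        I: Iterator<Item = F>,
        F: std::future::Future<Output = T>,
    {
        let mut in_flight = FuturesUnordered::new();
        // seed up to `limit` futures (comparable to params.parallel_groups)
        for _ in 0..limit {
            if let Some(job) = jobs.next() {
                in_flight.push(job);
            }
        }
        let mut results = Vec::new();
        // whenever one future finishes, start the next queued one
        while let Some(result) = in_flight.next().await {
            results.push(result);
            if let Some(job) = jobs.next() {
                in_flight.push(job);
            }
        }
        results
    }

The patch follows this shape, seeding params.parallel_groups
(defaulting to 1) futures built from the group list.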

Make the error handling and accounting logic for each group pull
reusable by moving it into its own helper function that returns the
future.

The store progress is placed behind an atomically reference counted
mutex (Arc<Mutex<StoreProgress>>) so that the concurrently running
pull tasks can update the shared status.
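A rough sketch of that shared-progress idea, with a simplified
stand-in for StoreProgress (only the locking pattern reflects the
patch; the type and helper name are illustrative):

    use std::sync::{Arc, Mutex};

    #[derive(Clone, Debug, Default)]
    struct Progress {
        done_groups: u64,
        done_snapshots: u64,
    }

    fn finish_snapshot(shared: &Arc<Mutex<Progress>>) -> Progress {
        // take the lock only briefly to bump the shared counter ...
        let mut guard = shared.lock().unwrap();
        guard.done_snapshots += 1;
        // ... and return a clone so logging happens without holding the lock
        guard.clone()
    }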

Link to issue in bugtracker:
https://bugzilla.proxmox.com/show_bug.cgi?id=4182

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- Merged previously split patches, no need for preparatory patch
- Strongly refactored to make code more concise

 pbs-datastore/src/store_progress.rs |   2 +-
 src/server/pull.rs                  | 111 ++++++++++++++++++----------
 2 files changed, 72 insertions(+), 41 deletions(-)

diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
 /// Tracker for progress of operations iterating over `Datastore` contents.
 pub struct StoreProgress {
     /// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 0986bc5c8..27315f1ae 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -7,6 +7,8 @@ use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use proxmox_human_byte::HumanByte;
 use tracing::info;
 
@@ -513,7 +515,7 @@ async fn pull_group(
     params: &PullParameters,
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    store_progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -602,7 +604,8 @@ async fn pull_group(
     // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-    progress.group_snapshots = list.len() as u64;
+    let mut local_progress = store_progress.lock().unwrap().clone();
+    local_progress.group_snapshots = list.len() as u64;
 
     let mut sync_stats = SyncStats::default();
 
@@ -619,8 +622,11 @@ async fn pull_group(
         let result =
             pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("percentage done: {progress}");
+        store_progress.lock().unwrap().done_snapshots += 1;
+        // Update done groups progress by other parallel running pulls
+        local_progress.done_groups = store_progress.lock().unwrap().done_groups;
+        local_progress.done_snapshots = pos as u64 + 1;
+        info!("Percentage done: {local_progress}");
 
         let stats = result?; // stop on error
         sync_stats.add(stats);
@@ -864,6 +870,48 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
     Ok(sync_stats)
 }
 
+async fn pull_group_do(
+    params: &PullParameters,
+    group: &BackupGroup,
+    namespace: &BackupNamespace,
+    target_namespace: &BackupNamespace,
+    progress: Arc<Mutex<StoreProgress>>,
+) -> Result<SyncStats, ()> {
+    let (owner, _lock_guard) =
+        match params
+            .target
+            .store
+            .create_locked_backup_group(target_namespace, group, &params.owner)
+        {
+            Ok(result) => result,
+            Err(err) => {
+                info!("sync group {group} failed - group lock failed: {err}");
+                info!("create_locked_backup_group failed");
+                return Err(());
+            }
+        };
+
+    if params.owner != owner {
+        // only the owner is allowed to create additional snapshots
+        info!(
+            "sync group {group} failed - owner check failed ({} != {owner})",
+            params.owner,
+        );
+        return Err(());
+    }
+
+    match pull_group(params, namespace, group, progress.clone()).await {
+        Ok(sync_stats) => {
+            progress.lock().unwrap().done_groups += 1;
+            Ok(sync_stats)
+        }
+        Err(err) => {
+            info!("sync group {group} failed - {err}");
+            Err(())
+        }
+    }
+}
+
 /// Pulls a namespace according to `params`.
 ///
 /// Pulling a namespace consists of the following steps:
@@ -902,48 +950,29 @@ pub(crate) async fn pull_ns(
         new_groups.insert(group.clone());
     }
 
-    let mut progress = StoreProgress::new(list.len() as u64);
+    let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
     let mut sync_stats = SyncStats::default();
 
     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
+    let mut puller = FuturesUnordered::new();
+    let mut group_futures_iter = list
+        .iter()
+        .map(|group| pull_group_do(params, group, namespace, &target_ns, progress.clone()));
 
-        let (owner, _lock_guard) =
-            match params
-                .target
-                .store
-                .create_locked_backup_group(&target_ns, &group, &params.owner)
-            {
-                Ok(result) => result,
-                Err(err) => {
-                    info!("sync group {} failed - group lock failed: {err}", &group);
-                    errors = true;
-                    // do not stop here, instead continue
-                    info!("create_locked_backup_group failed");
-                    continue;
-                }
-            };
+    for _ in 0..params.parallel_groups.unwrap_or(1) {
+        if let Some(future) = group_futures_iter.next() {
+            puller.push(future);
+        }
+    }
 
-        // permission check
-        if params.owner != owner {
-            // only the owner is allowed to create additional snapshots
-            info!(
-                "sync group {} failed - owner check failed ({} != {owner})",
-                &group, params.owner
-            );
-            errors = true; // do not stop here, instead continue
-        } else {
-            match pull_group(params, namespace, &group, &mut progress).await {
-                Ok(stats) => sync_stats.add(stats),
-                Err(err) => {
-                    info!("sync group {} failed - {err}", &group);
-                    errors = true; // do not stop here, instead continue
-                }
-            }
+    while let Some(result) = puller.next().await {
+        match result {
+            Ok(stats) => sync_stats.add(stats),
+            Err(()) => errors |= true,
+        };
+        if let Some(future) = group_futures_iter.next() {
+            puller.push(future);
         }
     }
 
@@ -999,5 +1028,7 @@ pub(crate) async fn pull_ns(
         };
     }
 
+    let progress = progress.lock().unwrap().clone();
+
     Ok((progress, sync_stats, errors))
 }
-- 
2.39.5