[pbs-devel] [PATCH v2 proxmox-backup 5/6] server: sync: allow pushing groups concurrently

Christian Ebner c.ebner at proxmox.com
Mon Jan 20 11:51:03 CET 2025


Improve the throughput of sync jobs in push direction by allowing up
to a configured number of backup groups to be pushed concurrently,
creating multiple futures, each connecting to and pushing a group to
the remote target.

The store progress and sync group housekeeping are placed behind an
atomic reference counted mutex to allow concurrent status updates.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- not present in previous version
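
For reference, a minimal sketch of the bounded-concurrency pattern this
patch relies on (illustration only, simplified names, not the actual
push_namespace() code): seed a FuturesUnordered with up to `parallel`
futures and push one replacement future per completion, so at most
`parallel` group pushes are in flight at any time.

    use futures::stream::{FuturesUnordered, StreamExt};

    async fn run_bounded<I, F, T>(mut jobs: I, parallel: usize) -> Vec<T>
    where
        I: Iterator<Item = F>,
        F: std::future::Future<Output = T>,
    {
        let mut pusher = FuturesUnordered::new();
        // fill the initial window of concurrently running futures
        for _ in 0..parallel {
            if let Some(job) = jobs.next() {
                pusher.push(job);
            }
        }
        let mut results = Vec::new();
        // each completion frees a slot for the next queued future
        while let Some(result) = pusher.next().await {
            results.push(result);
            if let Some(job) = jobs.next() {
                pusher.push(job);
            }
        }
        results
    }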

 src/server/push.rs | 94 +++++++++++++++++++++++++++++++++-------------
 1 file changed, 68 insertions(+), 26 deletions(-)
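
For illustration, a hedged sketch of the shared progress bookkeeping
(a simplified stand-in for StoreProgress, not the real type from the
code base): the shared counters live behind an Arc<Mutex<..>> so every
concurrent group push can bump them, while each push logs a local copy
merged with the globally completed group count.

    use std::sync::{Arc, Mutex};

    #[derive(Clone, Default)]
    struct StoreProgress {
        done_groups: u64,
        done_snapshots: u64,
        group_snapshots: u64,
    }

    fn note_snapshot_done(shared: &Arc<Mutex<StoreProgress>>, pos: u64) -> StoreProgress {
        let mut guard = shared.lock().unwrap();
        // global counter, visible to all parallel pushes
        guard.done_snapshots += 1;
        // local copy for logging: per-group snapshot position plus the
        // group count completed so far by the other pushes
        let mut local = guard.clone();
        local.done_snapshots = pos + 1;
        local
    }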

diff --git a/src/server/push.rs b/src/server/push.rs
index c61f0fe73..b3de214b4 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -4,7 +4,7 @@ use std::collections::HashSet;
 use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Context, Error};
-use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::stream::{self, FuturesUnordered, StreamExt, TryStreamExt};
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::{info, warn};
@@ -535,41 +535,46 @@ pub(crate) async fn push_namespace(
 
     let mut errors = false;
     // Remember synced groups, remove others when the remove vanished flag is set
-    let mut synced_groups = HashSet::new();
-    let mut progress = StoreProgress::new(list.len() as u64);
+    let synced_groups = Arc::new(Mutex::new(HashSet::new()));
+    let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
     let mut stats = SyncStats::default();
 
     let (owned_target_groups, not_owned_target_groups) =
         fetch_target_groups(params, &target_namespace).await?;
+    let not_owned_target_groups = Arc::new(not_owned_target_groups);
+
+    let mut pusher = FuturesUnordered::new();
+    let mut group_futures_iter = list.iter().map(|group| {
+        push_group_do(
+            params,
+            namespace,
+            group,
+            progress.clone(),
+            synced_groups.clone(),
+            not_owned_target_groups.clone(),
+        )
+    });
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
-
-        if not_owned_target_groups.contains(&group) {
-            warn!(
-                "Group '{group}' not owned by remote user '{}' on target, skipping upload",
-                params.target.remote_user(),
-            );
-            continue;
+    for _ in 0..params.parallel_groups.unwrap_or(1) {
+        if let Some(future) = group_futures_iter.next() {
+            pusher.push(future);
         }
-        synced_groups.insert(group.clone());
+    }
 
-        match push_group(params, namespace, &group, &mut progress).await {
+    while let Some(result) = pusher.next().await {
+        match result {
             Ok(sync_stats) => stats.add(sync_stats),
-            Err(err) => {
-                warn!("Encountered errors: {err:#}");
-                warn!("Failed to push group {group} to remote!");
-                errors = true;
-            }
+            Err(()) => errors |= true,
+        };
+        if let Some(future) = group_futures_iter.next() {
+            pusher.push(future);
         }
     }
 
     if params.remove_vanished {
         // only ever allow to prune owned groups on target
         for target_group in owned_target_groups {
-            if synced_groups.contains(&target_group) {
+            if synced_groups.lock().unwrap().contains(&target_group) {
                 continue;
             }
             if !target_group.apply_filters(&params.group_filter) {
@@ -601,6 +606,8 @@ pub(crate) async fn push_namespace(
         }
     }
 
+    let progress = progress.lock().unwrap().clone();
+
     Ok((progress, stats, errors))
 }
 
@@ -648,6 +655,37 @@ async fn forget_target_snapshot(
     Ok(())
 }
 
+async fn push_group_do(
+    params: &PushParameters,
+    namespace: &BackupNamespace,
+    group: &BackupGroup,
+    progress: Arc<Mutex<StoreProgress>>,
+    synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+    not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+    if not_owned_target_groups.contains(&group) {
+        warn!(
+            "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+            params.target.remote_user(),
+        );
+        progress.lock().unwrap().done_groups += 1;
+        return Ok(SyncStats::default());
+    }
+
+    synced_groups.lock().unwrap().insert(group.clone());
+    match push_group(params, namespace, group, progress.clone()).await {
+        Ok(sync_stats) => {
+            progress.lock().unwrap().done_groups += 1;
+            Ok(sync_stats)
+        }
+        Err(err) => {
+            warn!("Group {group}: Encountered errors: {err:#}");
+            warn!("Failed to push group {group} to remote!");
+            Err(())
+        }
+    }
+}
+
 /// Push group including all snaphshots to target
 ///
 /// Iterate over all snapshots in the group and push them to the target.
@@ -661,7 +699,7 @@ pub(crate) async fn push_group(
     params: &PushParameters,
     namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    store_progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -716,7 +754,8 @@ pub(crate) async fn push_group(
         transfer_last_skip_info.reset();
     }
 
-    progress.group_snapshots = snapshots.len() as u64;
+    let mut local_progress = store_progress.lock().unwrap().clone();
+    local_progress.group_snapshots = snapshots.len() as u64;
 
     let mut stats = SyncStats::default();
     let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -725,8 +764,11 @@ pub(crate) async fn push_group(
             push_snapshot(params, namespace, &source_snapshot, fetch_previous_manifest).await;
         fetch_previous_manifest = true;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("Percentage done: {progress}");
+        store_progress.lock().unwrap().done_snapshots += 1;
+        local_progress.done_snapshots = pos as u64 + 1;
+        // Update the done groups count with progress made by other parallel pushes
+        local_progress.done_groups = store_progress.lock().unwrap().done_groups;
+        info!("Percentage done: {local_progress}");
 
         // stop on error
         let sync_stats = result?;
-- 
2.39.5
