[pbs-devel] [PATCH v2 proxmox-backup 5/6] server: sync: allow pushing groups concurrently
Christian Ebner
c.ebner at proxmox.com
Mon Jan 20 11:51:03 CET 2025
Improve the throughput of sync jobs in push direction by allowing
up to a configured number of backup groups to be pushed concurrently,
creating multiple futures, each connecting to and pushing a group to
the remote target.
The store progress and sync group housekeeping are placed behind an
atomically reference-counted mutex to allow for concurrent status
updates.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- not present in previous version
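
For reviewers unfamiliar with the pattern: below is a minimal,
self-contained sketch of the bounded-concurrency scheme used in this
patch (seed a FuturesUnordered with up to `parallel` futures and top it
up as results arrive, sharing progress behind an Arc<Mutex<..>>). All
names (Progress, push_one, ...) are illustrative placeholders, not the
actual PBS API; the sketch only assumes the tokio and futures crates.

use std::sync::{Arc, Mutex};

use futures::stream::{FuturesUnordered, StreamExt};

#[derive(Default)]
struct Progress {
    done_groups: u64,
}

// Stand-in for push_group_do(): do the work, then bump the shared counter.
async fn push_one(group: String, progress: Arc<Mutex<Progress>>) -> Result<u64, ()> {
    tokio::task::yield_now().await;
    progress.lock().unwrap().done_groups += 1;
    Ok(group.len() as u64)
}

#[tokio::main]
async fn main() {
    let groups: Vec<String> = (0..10).map(|i| format!("vm/{i}")).collect();
    let parallel = 4; // corresponds to params.parallel_groups.unwrap_or(1)
    let progress = Arc::new(Mutex::new(Progress::default()));

    let mut pusher = FuturesUnordered::new();
    let mut group_futures_iter = groups
        .iter()
        .map(|group| push_one(group.clone(), progress.clone()));

    // Seed the set with at most `parallel` in-flight pushes.
    for _ in 0..parallel {
        if let Some(fut) = group_futures_iter.next() {
            pusher.push(fut);
        }
    }

    let (mut errors, mut total) = (false, 0u64);
    // Each time a push finishes, start the next one, so the number of
    // concurrent pushes stays bounded by `parallel`.
    while let Some(result) = pusher.next().await {
        match result {
            Ok(stats) => total += stats,
            Err(()) => errors = true,
        }
        if let Some(fut) = group_futures_iter.next() {
            pusher.push(fut);
        }
    }

    println!(
        "done groups: {}, total: {total}, errors: {errors}",
        progress.lock().unwrap().done_groups
    );
}

A failed group only sets the error flag, mirroring how the patch
collects per-group failures without aborting the remaining pushes.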
src/server/push.rs | 94 +++++++++++++++++++++++++++++++++-------------
1 file changed, 68 insertions(+), 26 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index c61f0fe73..b3de214b4 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -4,7 +4,7 @@ use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Context, Error};
-use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::stream::{self, FuturesUnordered, StreamExt, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{info, warn};
@@ -535,41 +535,46 @@ pub(crate) async fn push_namespace(
let mut errors = false;
// Remember synced groups, remove others when the remove vanished flag is set
- let mut synced_groups = HashSet::new();
- let mut progress = StoreProgress::new(list.len() as u64);
+ let synced_groups = Arc::new(Mutex::new(HashSet::new()));
+ let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
let mut stats = SyncStats::default();
let (owned_target_groups, not_owned_target_groups) =
fetch_target_groups(params, &target_namespace).await?;
+ let not_owned_target_groups = Arc::new(not_owned_target_groups);
+
+ let mut pusher = FuturesUnordered::new();
+ let mut group_futures_iter = list.iter().map(|group| {
+ push_group_do(
+ params,
+ namespace,
+ group,
+ progress.clone(),
+ synced_groups.clone(),
+ not_owned_target_groups.clone(),
+ )
+ });
- for (done, group) in list.into_iter().enumerate() {
- progress.done_groups = done as u64;
- progress.done_snapshots = 0;
- progress.group_snapshots = 0;
-
- if not_owned_target_groups.contains(&group) {
- warn!(
- "Group '{group}' not owned by remote user '{}' on target, skipping upload",
- params.target.remote_user(),
- );
- continue;
+ for _ in 0..params.parallel_groups.unwrap_or(1) {
+ if let Some(future) = group_futures_iter.next() {
+ pusher.push(future);
}
- synced_groups.insert(group.clone());
+ }
- match push_group(params, namespace, &group, &mut progress).await {
+ while let Some(result) = pusher.next().await {
+ match result {
Ok(sync_stats) => stats.add(sync_stats),
- Err(err) => {
- warn!("Encountered errors: {err:#}");
- warn!("Failed to push group {group} to remote!");
- errors = true;
- }
+ Err(()) => errors |= true,
+ };
+ if let Some(future) = group_futures_iter.next() {
+ pusher.push(future);
}
}
if params.remove_vanished {
// only ever allow to prune owned groups on target
for target_group in owned_target_groups {
- if synced_groups.contains(&target_group) {
+ if synced_groups.lock().unwrap().contains(&target_group) {
continue;
}
if !target_group.apply_filters(&params.group_filter) {
@@ -601,6 +606,8 @@ pub(crate) async fn push_namespace(
}
}
+ let progress = progress.lock().unwrap().clone();
+
Ok((progress, stats, errors))
}
@@ -648,6 +655,37 @@ async fn forget_target_snapshot(
Ok(())
}
+async fn push_group_do(
+ params: &PushParameters,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ progress: Arc<Mutex<StoreProgress>>,
+ synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+ not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+ if not_owned_target_groups.contains(&group) {
+ warn!(
+ "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+ params.target.remote_user(),
+ );
+ progress.lock().unwrap().done_groups += 1;
+ return Ok(SyncStats::default());
+ }
+
+ synced_groups.lock().unwrap().insert(group.clone());
+ match push_group(params, namespace, group, progress.clone()).await {
+ Ok(sync_stats) => {
+ progress.lock().unwrap().done_groups += 1;
+ Ok(sync_stats)
+ }
+ Err(err) => {
+ warn!("Group {group}: Encountered errors: {err:#}");
+ warn!("Failed to push group {group} to remote!");
+ Err(())
+ }
+ }
+}
+
/// Push group including all snapshots to target
///
/// Iterate over all snapshots in the group and push them to the target.
@@ -661,7 +699,7 @@ pub(crate) async fn push_group(
params: &PushParameters,
namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ store_progress: Arc<Mutex<StoreProgress>>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -716,7 +754,8 @@ pub(crate) async fn push_group(
transfer_last_skip_info.reset();
}
- progress.group_snapshots = snapshots.len() as u64;
+ let mut local_progress = store_progress.lock().unwrap().clone();
+ local_progress.group_snapshots = snapshots.len() as u64;
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -725,8 +764,11 @@ pub(crate) async fn push_group(
push_snapshot(params, namespace, &source_snapshot, fetch_previous_manifest).await;
fetch_previous_manifest = true;
- progress.done_snapshots = pos as u64 + 1;
- info!("Percentage done: {progress}");
+ store_progress.lock().unwrap().done_snapshots += 1;
+ local_progress.done_snapshots = pos as u64 + 1;
+ // Update done groups progress by other parallel running pushes
+ local_progress.done_groups = store_progress.lock().unwrap().done_groups;
+ info!("Percentage done: {local_progress}");
// stop on error
let sync_stats = result?;
--
2.39.5