[pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently

Christian Ebner c.ebner at proxmox.com
Thu Jul 25 12:19:21 CEST 2024


Currently, a sync job sequentially pulls the backup groups and the
snapshots contained within them, therefore being limited in download
speed by the http2 connection of the source reader instance in case
of remote syncs. High latency networks suffer from limited download
speed.

Improve the throughput by pulling up to a configured number of
backup groups concurrently, creating tasks which connect to and pull
from the remote source in parallel.

Link to issue in bugtracker:
https://bugzilla.proxmox.com/show_bug.cgi?id=4182

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 src/server/pull.rs | 50 ++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 42 insertions(+), 8 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index e2d155c78..0a54217d4 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -10,6 +10,8 @@ use std::sync::{Arc, Mutex};
 use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Error};
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use http::StatusCode;
 use proxmox_human_byte::HumanByte;
 use proxmox_router::HttpError;
@@ -1452,16 +1454,48 @@ pub(crate) async fn pull_ns(
         new_groups.insert(group.clone());
     }
 
-    let mut progress = StoreProgress::new(list.len() as u64);
-    let mut pull_stats = PullStats::default();
+    let mut store_progress = StoreProgress::new(list.len() as u64);
 
     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
-        pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
+    let mut pull_group_tasks = FuturesUnordered::new();
+
+    let mut list_iter = list.iter();
+    // queue up to the requested number of initial group sync tasks to the task pool
+    for _ in 0..params.group_sync_tasks.unwrap_or(1) {
+        if let Some(group) = list_iter.next() {
+            let task_progress = StoreProgress::new(list.len() as u64);
+            pull_group_tasks.push(pull_group_task(
+                params,
+                group,
+                namespace,
+                &target_ns,
+                task_progress,
+            ));
+        }
+    }
+
+    let mut pull_stats = PullStats::default();
+    // poll to initiate tasks, queuing another remaining task for each finished one
+    while let Some(result) = pull_group_tasks.next().await {
+        let (progress, stats, has_errors) = result?;
+        errors |= has_errors;
+        pull_stats.add(stats);
+        store_progress.done_groups += progress.done_groups;
+        store_progress.done_snapshots += progress.done_snapshots;
+
+        matches!(params.group_sync_tasks, Some(n) if n > 1);
+        // queue another remaining group sync to the task pool
+        if let Some(group) = list_iter.next() {
+            let task_progress = StoreProgress::new(list.len() as u64);
+            pull_group_tasks.push(pull_group_task(
+                params,
+                group,
+                namespace,
+                &target_ns,
+                task_progress,
+            ));
+        }
     }
 
     if params.remove_vanished {
@@ -1516,5 +1550,5 @@ pub(crate) async fn pull_ns(
         };
     }
 
-    Ok((progress, pull_stats, errors))
+    Ok((store_progress, pull_stats, errors))
 }
-- 
2.39.2





More information about the pbs-devel mailing list