[pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently

Gabriel Goller g.goller at proxmox.com
Tue Jul 30 17:54:01 CEST 2024


On 25.07.2024 12:19, Christian Ebner wrote:
>Currently, a sync job sequentially pulls the backup groups and the
>snapshots contained within them, therefore being limited in download
>speed by the http2 connection of the source reader instance in case
>of remote syncs. High latency networks suffer from limited download
>speed.
>
>Improve the throughput by allowing up to a configured number of
>backup groups to be pulled concurrently, creating tasks that connect
>to and pull from the remote source in parallel.
>
>Link to issue in bugtracker:
>https://bugzilla.proxmox.com/show_bug.cgi?id=4182
>
>Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>---
> src/server/pull.rs | 50 ++++++++++++++++++++++++++++++++++++++--------
> 1 file changed, 42 insertions(+), 8 deletions(-)
>
>diff --git a/src/server/pull.rs b/src/server/pull.rs
>index e2d155c78..0a54217d4 100644
>--- a/src/server/pull.rs
>+++ b/src/server/pull.rs
>@@ -10,6 +10,8 @@ use std::sync::{Arc, Mutex};
> use std::time::{Duration, SystemTime};
>
> use anyhow::{bail, format_err, Error};
>+use futures::stream::FuturesUnordered;
>+use futures::StreamExt;
> use http::StatusCode;
> use proxmox_human_byte::HumanByte;
> use proxmox_router::HttpError;
>@@ -1452,16 +1454,48 @@ pub(crate) async fn pull_ns(
>         new_groups.insert(group.clone());
>     }
>
>-    let mut progress = StoreProgress::new(list.len() as u64);
>-    let mut pull_stats = PullStats::default();
>+    let mut store_progress = StoreProgress::new(list.len() as u64);
>
>     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
>
>-    for (done, group) in list.into_iter().enumerate() {
>-        progress.done_groups = done as u64;
>-        progress.done_snapshots = 0;
>-        progress.group_snapshots = 0;
>-        pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
>+    let mut pull_group_tasks = FuturesUnordered::new();
>+
>+    let mut list_iter = list.iter();
>+    // queue up to the requested number of initial group sync tasks to the task pool
>+    for _ in 0..params.group_sync_tasks.unwrap_or(1) {
>+        if let Some(group) = list_iter.next() {
>+            let task_progress = StoreProgress::new(list.len() as u64);
>+            pull_group_tasks.push(pull_group_task(
>+                params,
>+                group,
>+                namespace,
>+                &target_ns,
>+                task_progress,
>+            ));
>+        }
>+    }
>+
>+    let mut pull_stats = PullStats::default();
>+    // poll to initiate tasks, queue a remaining task for each finished one
>+    while let Some(result) = pull_group_tasks.next().await {
>+        let (progress, stats, has_errors) = result?;
>+        errors |= has_errors;
>+        pull_stats.add(stats);
>+        store_progress.done_groups += progress.done_groups;
>+        store_progress.done_snapshots += progress.done_snapshots;
>+
>+        matches!(params.group_sync_tasks, Some(n) if n > 1);

This can be removed, it does nothing: the result of the `matches!`
expression is never used and evaluating it has no side effects.
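
Just to spell it out with a standalone snippet (hypothetical value for
group_sync_tasks): `matches!` only evaluates to a bool, and a bare
expression statement drops that value, so nothing observable happens:

    fn main() {
        let group_sync_tasks: Option<usize> = Some(4);

        // same shape as the line above: the resulting bool is discarded
        matches!(group_sync_tasks, Some(n) if n > 1);

        // the result only matters if it is actually bound and used
        let concurrent = matches!(group_sync_tasks, Some(n) if n > 1);
        println!("concurrent: {concurrent}");
    }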

>+        // queue another remaining group sync to the task pool
>+        if let Some(group) = list_iter.next() {
>+            let task_progress = StoreProgress::new(list.len() as u64);
>+            pull_group_tasks.push(pull_group_task(
>+                params,
>+                group,
>+                namespace,
>+                &target_ns,
>+                task_progress,
>+            ));
>+        }
>     }
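
For anyone following along, the scheduling boils down to seeding the
FuturesUnordered with up to group_sync_tasks futures and pushing one
more for every future that completes, so at most that many group pulls
are in flight at a time. Roughly, as a self-contained sketch (with a
hypothetical pull_one_group stand-in for pull_group_task; tokio,
futures and anyhow assumed):

    use futures::stream::{FuturesUnordered, StreamExt};

    // stand-in for the per-group sync work done by pull_group_task
    async fn pull_one_group(group: usize) -> Result<usize, anyhow::Error> {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        Ok(group)
    }

    #[tokio::main]
    async fn main() -> Result<(), anyhow::Error> {
        let groups: Vec<usize> = (0..10).collect();
        let limit = 4; // plays the role of params.group_sync_tasks

        let mut tasks = FuturesUnordered::new();
        let mut iter = groups.iter();

        // queue the initial batch of at most `limit` futures
        for _ in 0..limit {
            if let Some(&group) = iter.next() {
                tasks.push(pull_one_group(group));
            }
        }

        // every completed future frees a slot for the next queued group
        while let Some(result) = tasks.next().await {
            let done = result?;
            println!("finished group {done}");
            if let Some(&group) = iter.next() {
                tasks.push(pull_one_group(group));
            }
        }
        Ok(())
    }

(The futures are polled within the sync job's own task here, so the
concurrency comes from interleaving the HTTP/2 requests at their await
points rather than from spawning separate tokio tasks.)
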
>
>     if params.remove_vanished {
>@@ -1516,5 +1550,5 @@ pub(crate) async fn pull_ns(
>         };
>     }
>
>-    Ok((progress, pull_stats, errors))
>+    Ok((store_progress, pull_stats, errors))
> }
>-- 
>2.39.2