[pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper

Christian Ebner c.ebner at proxmox.com
Thu Jul 25 12:19:20 CEST 2024


Make the error handling and accounting logic for each group pull task
reusable by moving it into its own helper function, returning the
future.
The store progress is placed behind a reference-counted mutex to
allow concurrent status updates.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 pbs-datastore/src/store_progress.rs |   2 +-
 src/server/pull.rs                  | 102 +++++++++++++++++-----------
 2 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
 /// Tracker for progress of operations iterating over `Datastore` contents.
 pub struct StoreProgress {
     /// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 80443132e..e2d155c78 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,10 @@
 //! Sync datastore by pulling contents from remote server
 
 use std::collections::{HashMap, HashSet};
+use std::future::Future;
 use std::io::{Seek, Write};
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, SystemTime};
@@ -1023,7 +1025,7 @@ async fn pull_group(
     params: &PullParameters,
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<PullStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1079,7 +1081,10 @@ async fn pull_group(
     // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-    progress.group_snapshots = list.len() as u64;
+    {
+        let mut progress = progress.lock().unwrap();
+        progress.group_snapshots = list.len() as u64;
+    }
 
     let mut pull_stats = PullStats::default();
 
@@ -1095,8 +1100,11 @@ async fn pull_group(
             .await?;
         let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("percentage done: {progress}");
+        {
+            let mut progress = progress.lock().unwrap();
+            progress.done_snapshots = pos as u64 + 1;
+            info!("percentage done: {progress}");
+        }
 
         let stats = result?; // stop on error
         pull_stats.add(stats);
@@ -1349,6 +1357,66 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
     Ok(pull_stats)
 }
 
+/// Builds the future that pulls a single backup `group` into `target_namespace`.
+///
+/// The future locks the target group, checks that `params.owner` owns it and then runs
+/// [`pull_group`]. A failed group lock, an owner mismatch or a failed pull is logged and
+/// reported through the returned flag rather than as `Err`, so the caller can continue
+/// with the remaining groups.
+///
+/// Resolves to `(progress, pull_stats, errors)`: the progress counters after the attempt,
+/// the statistics gathered by the pull and whether the group failed to sync.
+fn pull_group_task<'future>(
+    params: &'future PullParameters,
+    group: &'future BackupGroup,
+    namespace: &'future BackupNamespace,
+    target_namespace: &'future BackupNamespace,
+    progress: StoreProgress,
+) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, bool), Error>> + Send + 'future>>
+{
+    Box::pin(async move {
+        let progress = Arc::new(Mutex::new(progress));
+        let mut pull_stats = PullStats::default();
+        let mut errors = false;
+
+        let (owner, _lock_guard) = match params.target.store.create_locked_backup_group(
+            target_namespace,
+            group,
+            &params.owner,
+        ) {
+            Ok(result) => result,
+            Err(err) => {
+                info!("sync group {group} failed - group lock failed: {err}");
+                errors = true;
+                // do not stop here, instead continue
+                info!("create_locked_backup_group failed");
+                return Ok((progress.lock().unwrap().clone(), pull_stats, errors));
+            }
+        };
+
+        // permission check
+        if params.owner != owner {
+            // only the owner is allowed to create additional snapshots
+            info!(
+                "sync group {group} failed - owner check failed ({} != {owner})",
+                params.owner,
+            );
+            errors = true; // do not stop here, instead continue
+        } else {
+            match pull_group(params, namespace, group, progress.clone()).await {
+                Ok(stats) => pull_stats.add(stats),
+                Err(err) => {
+                    info!("sync group {group} failed - {err}");
+                    errors = true; // do not bail here, instead continue
+                }
+            }
+        }
+
+        let progress = progress.lock().unwrap().clone();
+        Ok((progress, pull_stats, errors))
+    })
+}
+
 /// Pulls a namespace according to `params`.
 ///
 /// Pulling a namespace consists of the following steps:
@@ -1402,40 +1470,13 @@ pub(crate) async fn pull_ns(
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
-
-        let (owner, _lock_guard) =
-            match params
-                .target
-                .store
-                .create_locked_backup_group(&target_ns, &group, &params.owner)
-            {
-                Ok(result) => result,
-                Err(err) => {
-                    info!("sync group {} failed - group lock failed: {err}", &group);
-                    errors = true;
-                    // do not stop here, instead continue
-                    info!("create_locked_backup_group failed");
-                    continue;
-                }
-            };
-
-        // permission check
-        if params.owner != owner {
-            // only the owner is allowed to create additional snapshots
-            info!(
-                "sync group {} failed - owner check failed ({} != {owner})",
-                &group, params.owner
-            );
-            errors = true; // do not stop here, instead continue
-        } else {
-            match pull_group(params, namespace, &group, &mut progress).await {
-                Ok(stats) => pull_stats.add(stats),
-                Err(err) => {
-                    info!("sync group {} failed - {err}", &group);
-                    errors = true; // do not stop here, instead continue
-                }
-            }
-        }
+        // Fold the per-group outcome back into the namespace-wide state; dropping it
+        // would lose the transfer statistics and hide failed groups.
+        let (group_progress, group_stats, group_errors) =
+            pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
+        progress = group_progress;
+        pull_stats.add(group_stats);
+        errors |= group_errors;
     }
 
     if params.remove_vanished {
-- 
2.39.2





More information about the pbs-devel mailing list