[pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper

Gabriel Goller g.goller at proxmox.com
Tue Jul 30 17:56:00 CEST 2024


On 25.07.2024 12:19, Christian Ebner wrote:
>Make the error handling and accounting logic for each group pull task
>reusable by moving it into its own helper function, returning the
>future.
>The store progress is placed behind a reference counted mutex to
>allow for concurrent access of status updates.
>
>Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>---
> pbs-datastore/src/store_progress.rs |   2 +-
> src/server/pull.rs                  | 102 +++++++++++++++++-----------
> 2 files changed, 65 insertions(+), 39 deletions(-)
>
>diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
>index a32bb9a9d..8afa60ace 100644
>--- a/pbs-datastore/src/store_progress.rs
>+++ b/pbs-datastore/src/store_progress.rs
>@@ -1,4 +1,4 @@
>-#[derive(Debug, Default)]
>+#[derive(Clone, Debug, Default)]
> /// Tracker for progress of operations iterating over `Datastore` contents.
> pub struct StoreProgress {
>     /// Completed groups
>diff --git a/src/server/pull.rs b/src/server/pull.rs
>index 80443132e..e2d155c78 100644
>--- a/src/server/pull.rs
>+++ b/src/server/pull.rs
>@@ -1,8 +1,10 @@
> //! Sync datastore by pulling contents from remote server
>
> use std::collections::{HashMap, HashSet};
>+use std::future::Future;
> use std::io::{Seek, Write};
> use std::path::{Path, PathBuf};
>+use std::pin::Pin;
> use std::sync::atomic::{AtomicUsize, Ordering};
> use std::sync::{Arc, Mutex};
> use std::time::{Duration, SystemTime};
>@@ -1023,7 +1025,7 @@ async fn pull_group(
>     params: &PullParameters,
>     source_namespace: &BackupNamespace,
>     group: &BackupGroup,
>-    progress: &mut StoreProgress,
>+    progress: Arc<Mutex<StoreProgress>>,
> ) -> Result<PullStats, Error> {
>     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
>     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
>@@ -1079,7 +1081,10 @@ async fn pull_group(
>     // start with 65536 chunks (up to 256 GiB)
>     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>
>-    progress.group_snapshots = list.len() as u64;
>+    {
>+        let mut progress = progress.lock().unwrap();
>+        progress.group_snapshots = list.len() as u64;
>+    }
>
>     let mut pull_stats = PullStats::default();
>
>@@ -1095,8 +1100,11 @@ async fn pull_group(
>             .await?;
>         let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
>
>-        progress.done_snapshots = pos as u64 + 1;
>-        info!("percentage done: {progress}");
>+        {
>+            let mut progress = progress.lock().unwrap();
>+            progress.done_snapshots = pos as u64 + 1;
>+            info!("percentage done: {progress}");
>+        }
>
>         let stats = result?; // stop on error
>         pull_stats.add(stats);
>@@ -1349,6 +1357,57 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
>     Ok(pull_stats)
> }
>
>+fn pull_group_task<'future>(
>+    params: &'future PullParameters,
>+    group: &'future BackupGroup,
>+    namespace: &'future BackupNamespace,
>+    target_namespace: &'future BackupNamespace,
>+    progress: StoreProgress,
>+) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, bool), Error>> + Send + 'future>>

This should be the same as making the function async:

async fn pull_group_task(...) -> Result<(...), Error> { ... }

An async fn already returns an anonymous future that captures the lifetimes
of all of its arguments, so the hand-written Pin<Box<dyn Future ... + Send +
'future>> return type and the explicit 'future lifetime shouldn't be needed
here.
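
For reference, a minimal sketch of the async version (untested, assuming the
body stays exactly as in the patch):

async fn pull_group_task(
    params: &PullParameters,
    group: &BackupGroup,
    namespace: &BackupNamespace,
    target_namespace: &BackupNamespace,
    progress: StoreProgress,
) -> Result<(StoreProgress, PullStats, bool), Error> {
    // ... same body as in the patch ...
}

The returned future is still Send as long as nothing non-Send is held across
an .await point, and a caller that needs the type-erased form can still box
at the call site with Box::pin(pull_group_task(...)).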


Just posted these two things on the mailing list so that I don't forget
them; I'll follow up with a more detailed review. I will also have a look at
how we can improve the logging, as it's quite janky atm :).



