[pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper
Gabriel Goller
g.goller at proxmox.com
Tue Jul 30 17:56:00 CEST 2024
On 25.07.2024 12:19, Christian Ebner wrote:
>Make the error handling and accounting logic for each group pull task
>reusable by moving it into its own helper function, returning the
>future.
>The store progress is placed behind a reference counted mutex to
>allow for concurrent access of status updates.
>
>Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>---
> pbs-datastore/src/store_progress.rs | 2 +-
> src/server/pull.rs | 102 +++++++++++++++++-----------
> 2 files changed, 65 insertions(+), 39 deletions(-)
>
>diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
>index a32bb9a9d..8afa60ace 100644
>--- a/pbs-datastore/src/store_progress.rs
>+++ b/pbs-datastore/src/store_progress.rs
>@@ -1,4 +1,4 @@
>-#[derive(Debug, Default)]
>+#[derive(Clone, Debug, Default)]
> /// Tracker for progress of operations iterating over `Datastore` contents.
> pub struct StoreProgress {
> /// Completed groups
>diff --git a/src/server/pull.rs b/src/server/pull.rs
>index 80443132e..e2d155c78 100644
>--- a/src/server/pull.rs
>+++ b/src/server/pull.rs
>@@ -1,8 +1,10 @@
> //! Sync datastore by pulling contents from remote server
>
> use std::collections::{HashMap, HashSet};
>+use std::future::Future;
> use std::io::{Seek, Write};
> use std::path::{Path, PathBuf};
>+use std::pin::Pin;
> use std::sync::atomic::{AtomicUsize, Ordering};
> use std::sync::{Arc, Mutex};
> use std::time::{Duration, SystemTime};
>@@ -1023,7 +1025,7 @@ async fn pull_group(
> params: &PullParameters,
> source_namespace: &BackupNamespace,
> group: &BackupGroup,
>- progress: &mut StoreProgress,
>+ progress: Arc<Mutex<StoreProgress>>,
> ) -> Result<PullStats, Error> {
> let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
> let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
>@@ -1079,7 +1081,10 @@ async fn pull_group(
> // start with 65536 chunks (up to 256 GiB)
> let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>
>- progress.group_snapshots = list.len() as u64;
>+ {
>+ let mut progress = progress.lock().unwrap();
>+ progress.group_snapshots = list.len() as u64;
>+ }
>
> let mut pull_stats = PullStats::default();
>
>@@ -1095,8 +1100,11 @@ async fn pull_group(
> .await?;
> let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
>
>- progress.done_snapshots = pos as u64 + 1;
>- info!("percentage done: {progress}");
>+ {
>+ let mut progress = progress.lock().unwrap();
>+ progress.done_snapshots = pos as u64 + 1;
>+ info!("percentage done: {progress}");
>+ }
>
> let stats = result?; // stop on error
> pull_stats.add(stats);
>@@ -1349,6 +1357,57 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
> Ok(pull_stats)
> }
>
>+fn pull_group_task<'future>(
>+ params: &'future PullParameters,
>+ group: &'future BackupGroup,
>+ namespace: &'future BackupNamespace,
>+ target_namespace: &'future BackupNamespace,
>+ progress: StoreProgress,
>+) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, bool), Error>> + Send + 'future>>
This should be equivalent to simply declaring the function async and letting
the compiler generate the boxed-up future type:

    async fn pull_group_task(...) -> Result<(...), Error> {}

Just posted these two things on the mailing list so that I don't forget
them; I'll follow up with a more detailed review. I'll also have a look at
how we can improve the logging, as it's quite janky at the moment :).
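To illustrate the point, here is a minimal, self-contained sketch of the equivalence between the two styles. The names (`task_boxed`, `task_async`, the toy `block_on` executor) are mine for illustration only and are not part of the actual `pull_group_task` patch:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hand-written boxed-future style, as in the patch under review:
// the caller gets a pinned, boxed, Send future tied to the borrow.
fn task_boxed<'a>(input: &'a str) -> Pin<Box<dyn Future<Output = usize> + Send + 'a>> {
    Box::pin(async move { input.len() })
}

// Equivalent `async fn`: the compiler generates an opaque future that
// borrows `input` for the same lifetime; no manual Pin/Box needed.
async fn task_async(input: &str) -> usize {
    input.len()
}

// Tiny polling loop so the example runs without pulling in a runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    fn noop_waker() -> Waker {
        fn clone(_: *const ()) -> RawWaker {
            RawWaker::new(std::ptr::null(), &VTABLE)
        }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
    }
    let mut fut = Box::pin(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

fn main() {
    let group = String::from("vm/100");
    // Both styles yield the same value; only the declared return type differs.
    assert_eq!(block_on(task_boxed(&group)), block_on(task_async(&group)));
    println!("len: {}", block_on(task_async(&group))); // prints "len: 6"
}
```

One caveat worth noting: an `async fn` does not spell out the `Send` bound in its signature — for a free function the compiler infers it from the body, so a caller that needs `Send` (e.g. to spawn the task) will still get it as long as everything held across `.await` points is `Send`.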