[pbs-devel] [PATCH proxmox-backup 1/3] server: sync: return `PullStats` for pull-related methods

Christian Ebner c.ebner at proxmox.com
Wed Mar 6 15:11:51 CET 2024


Return basic statistics from pull-related methods via `PullStats` objects,
so that a global summary can be constructed for sync jobs.
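
As a sketch of the intended use, a hypothetical consumer of the returned
stats could emit a one-line summary like the following (illustrative only,
not part of this patch; it assumes the usual worker task context of this
crate):

    // Hypothetical call site: aggregate the PullStats returned by
    // pull_store() into a single summary log line.
    let pull_stats = pull_store(worker, params).await?;
    // Guard against division by zero for near-instantaneous runs.
    let seconds = pull_stats.elapsed.as_secs_f64().max(f64::EPSILON);
    task_log!(
        worker,
        "summary: {} chunks, {} bytes in {:.2}s ({:.2} MiB/s)",
        pull_stats.chunk_count,
        pull_stats.bytes,
        seconds,
        (pull_stats.bytes as f64) / (1024.0 * 1024.0 * seconds),
    );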

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/pull.rs | 125 ++++++++++++++++++++++++++++++---------------
 1 file changed, 85 insertions(+), 40 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5a4ba806..7d745c77 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -5,7 +5,7 @@ use std::io::{Seek, Write};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
-use std::time::SystemTime;
+use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
@@ -64,6 +64,21 @@ pub(crate) struct LocalSource {
     ns: BackupNamespace,
 }
 
+#[derive(Default)]
+pub(crate) struct PullStats {
+    pub(crate) chunk_count: usize,
+    pub(crate) bytes: usize,
+    pub(crate) elapsed: Duration,
+}
+
+impl PullStats {
+    fn add(&mut self, rhs: PullStats) {
+        self.chunk_count += rhs.chunk_count;
+        self.bytes += rhs.bytes;
+        self.elapsed += rhs.elapsed;
+    }
+}
+
 #[async_trait::async_trait]
 /// `PullSource` is a trait that provides an interface for pulling data/information from a source.
 /// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -559,7 +574,7 @@ async fn pull_index_chunks<I: IndexFile>(
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
     let start_time = SystemTime::now();
@@ -594,12 +609,14 @@ async fn pull_index_chunks<I: IndexFile>(
     let verify_and_write_channel = verify_pool.channel();
 
     let bytes = Arc::new(AtomicUsize::new(0));
+    let chunk_count = Arc::new(AtomicUsize::new(0));
 
     stream
         .map(|info| {
             let target = Arc::clone(&target);
             let chunk_reader = chunk_reader.clone();
             let bytes = Arc::clone(&bytes);
+            let chunk_count = Arc::clone(&chunk_count);
             let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
@@ -620,6 +637,7 @@ async fn pull_index_chunks<I: IndexFile>(
                 })?;
 
                 bytes.fetch_add(raw_size, Ordering::SeqCst);
+                chunk_count.fetch_add(1, Ordering::SeqCst);
 
                 Ok(())
             })
@@ -632,18 +650,23 @@ async fn pull_index_chunks<I: IndexFile>(
 
     verify_pool.complete()?;
 
-    let elapsed = start_time.elapsed()?.as_secs_f64();
+    let elapsed = start_time.elapsed()?;
 
     let bytes = bytes.load(Ordering::SeqCst);
+    let chunk_count = chunk_count.load(Ordering::SeqCst);
 
     task_log!(
         worker,
         "downloaded {} bytes ({:.2} MiB/s)",
         bytes,
-        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
+        (bytes as f64) / (1024.0 * 1024.0 * elapsed.as_secs_f64())
     );
 
-    Ok(())
+    Ok(PullStats {
+        chunk_count,
+        bytes,
+        elapsed,
+    })
 }
 
 fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
@@ -677,7 +700,7 @@ async fn pull_single_archive<'a>(
     snapshot: &'a pbs_datastore::BackupDir,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
     let archive_name = &archive_info.filename;
     let mut path = snapshot.full_path();
     path.push(archive_name);
@@ -685,6 +708,8 @@ async fn pull_single_archive<'a>(
     let mut tmp_path = path.clone();
     tmp_path.set_extension("tmp");
 
+    let mut pull_stats = PullStats::default();
+
     task_log!(worker, "sync archive {}", archive_name);
 
     reader
@@ -704,7 +729,7 @@ async fn pull_single_archive<'a>(
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
                 task_log!(worker, "skipping chunk sync for same datastore");
             } else {
-                pull_index_chunks(
+                let stats = pull_index_chunks(
                     worker,
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
@@ -712,6 +737,7 @@ async fn pull_single_archive<'a>(
                     downloaded_chunks,
                 )
                 .await?;
+                pull_stats.add(stats);
             }
         }
         ArchiveType::FixedIndex => {
@@ -724,7 +750,7 @@ async fn pull_single_archive<'a>(
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
                 task_log!(worker, "skipping chunk sync for same datastore");
             } else {
-                pull_index_chunks(
+                let stats = pull_index_chunks(
                     worker,
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
@@ -732,6 +758,7 @@ async fn pull_single_archive<'a>(
                     downloaded_chunks,
                 )
                 .await?;
+                pull_stats.add(stats);
             }
         }
         ArchiveType::Blob => {
@@ -743,7 +770,7 @@ async fn pull_single_archive<'a>(
     if let Err(err) = std::fs::rename(&tmp_path, &path) {
         bail!("Atomic rename file {:?} failed - {}", path, err);
     }
-    Ok(())
+    Ok(pull_stats)
 }
 
 /// Actual implementation of pulling a snapshot.
@@ -760,7 +787,8 @@ async fn pull_snapshot<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
+    let mut pull_stats = PullStats::default();
     let mut manifest_name = snapshot.full_path();
     manifest_name.push(MANIFEST_BLOB_NAME);
 
@@ -776,7 +804,7 @@ async fn pull_snapshot<'a>(
     {
         tmp_manifest_blob = data;
     } else {
-        return Ok(());
+        return Ok(pull_stats);
     }
 
     if manifest_name.exists() {
@@ -800,7 +828,7 @@ async fn pull_snapshot<'a>(
             };
             task_log!(worker, "no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
-            return Ok(()); // nothing changed
+            return Ok(pull_stats); // nothing changed
         }
     }
 
@@ -845,7 +873,7 @@ async fn pull_snapshot<'a>(
             }
         }
 
-        pull_single_archive(
+        let stats = pull_single_archive(
             worker,
             reader.clone(),
             snapshot,
@@ -853,6 +881,7 @@ async fn pull_snapshot<'a>(
             downloaded_chunks.clone(),
         )
         .await?;
+        pull_stats.add(stats);
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@@ -868,7 +897,7 @@ async fn pull_snapshot<'a>(
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
 
-    Ok(())
+    Ok(pull_stats)
 }
 
 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
@@ -880,31 +909,36 @@ async fn pull_snapshot_from<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
     let (_path, is_new, _snap_lock) = snapshot
         .datastore()
         .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
 
-    if is_new {
+    let pull_stats = if is_new {
         task_log!(worker, "sync snapshot {}", snapshot.dir());
 
-        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
-            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
-                snapshot.backup_ns(),
-                snapshot.as_ref(),
-                true,
-            ) {
-                task_log!(worker, "cleanup error - {}", cleanup_err);
+        match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
+            Err(err) => {
+                if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
+                    snapshot.backup_ns(),
+                    snapshot.as_ref(),
+                    true,
+                ) {
+                    task_log!(worker, "cleanup error - {}", cleanup_err);
+                }
+                return Err(err);
+            }
+            Ok(pull_stats) => {
+                task_log!(worker, "sync snapshot {} done", snapshot.dir());
+                pull_stats
             }
-            return Err(err);
         }
-        task_log!(worker, "sync snapshot {} done", snapshot.dir());
     } else {
         task_log!(worker, "re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
-    }
+        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
+    };
 
-    Ok(())
+    Ok(pull_stats)
 }
 
 #[derive(PartialEq, Eq)]
@@ -1009,7 +1043,7 @@ async fn pull_group(
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
     progress: &mut StoreProgress,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
 
@@ -1066,6 +1100,8 @@ async fn pull_group(
 
     progress.group_snapshots = list.len() as u64;
 
+    let mut pull_stats = PullStats::default();
+
     for (pos, from_snapshot) in list.into_iter().enumerate() {
         let to_snapshot = params
             .target
@@ -1082,7 +1118,8 @@ async fn pull_group(
         progress.done_snapshots = pos as u64 + 1;
         task_log!(worker, "percentage done: {}", progress);
 
-        result?; // stop on error
+        let stats = result?; // stop on error
+        pull_stats.add(stats);
     }
 
     if params.remove_vanished {
@@ -1112,7 +1149,7 @@ async fn pull_group(
         }
     }
 
-    Ok(())
+    Ok(pull_stats)
 }
 
 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
@@ -1233,7 +1270,7 @@ fn check_and_remove_vanished_ns(
 pub(crate) async fn pull_store(
     worker: &WorkerTask,
     mut params: PullParameters,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
     // explicit create shared lock to prevent GC on newly created chunks
     let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
     let mut errors = false;
@@ -1269,6 +1306,7 @@ pub(crate) async fn pull_store(
 
     let (mut groups, mut snapshots) = (0, 0);
     let mut synced_ns = HashSet::with_capacity(namespaces.len());
+    let mut pull_stats = PullStats::default();
 
     for namespace in namespaces {
         let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@@ -1303,9 +1341,11 @@ pub(crate) async fn pull_store(
         }
 
         match pull_ns(worker, &namespace, &mut params).await {
-            Ok((ns_progress, ns_errors)) => {
+            Ok((ns_progress, ns_pull_stats, ns_errors)) => {
                 errors |= ns_errors;
 
+                pull_stats.add(ns_pull_stats);
+
                 if params.max_depth != Some(0) {
                     groups += ns_progress.done_groups;
                     snapshots += ns_progress.done_snapshots;
@@ -1338,7 +1378,7 @@ pub(crate) async fn pull_store(
         bail!("sync failed with some errors.");
     }
 
-    Ok(())
+    Ok(pull_stats)
 }
 
 /// Pulls a namespace according to `params`.
@@ -1357,7 +1397,7 @@ pub(crate) async fn pull_ns(
     worker: &WorkerTask,
     namespace: &BackupNamespace,
     params: &mut PullParameters,
-) -> Result<(StoreProgress, bool), Error> {
+) -> Result<(StoreProgress, PullStats, bool), Error> {
     let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
 
     list.sort_unstable_by(|a, b| {
@@ -1389,6 +1429,7 @@ pub(crate) async fn pull_ns(
     }
 
     let mut progress = StoreProgress::new(list.len() as u64);
+    let mut pull_stats = PullStats::default();
 
     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
@@ -1429,10 +1470,14 @@ pub(crate) async fn pull_ns(
                 owner
             );
             errors = true; // do not stop here, instead continue
-        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
-        {
-            task_log!(worker, "sync group {} failed - {}", &group, err,);
-            errors = true; // do not stop here, instead continue
+        } else {
+            match pull_group(worker, params, namespace, &group, &mut progress).await {
+                Ok(stats) => pull_stats.add(stats),
+                Err(err) => {
+                    task_log!(worker, "sync group {} failed - {}", &group, err,);
+                    errors = true; // do not stop here, instead continue
+                }
+            }
         }
     }
 
@@ -1479,5 +1524,5 @@ pub(crate) async fn pull_ns(
         };
     }
 
-    Ok((progress, errors))
+    Ok((progress, pull_stats, errors))
 }
-- 
2.39.2