[pbs-devel] [PATCH v3 proxmox-backup 02/33] server: sync: move sync related stats to common module

Christian Ebner c.ebner at proxmox.com
Thu Sep 12 16:32:51 CEST 2024


Move `PullStats` and `RemovedVanishedStats` into a new common `sync`
module, renaming `PullStats` to `SyncStats`, so that both can be reused
by sync operations in the push direction as well as the pull direction.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 2:
- no changes

 src/server/mod.rs  |   1 +
 src/server/pull.rs | 121 ++++++++++++++-------------------------------
 src/server/sync.rs |  51 +++++++++++++++++++
 3 files changed, 89 insertions(+), 84 deletions(-)
 create mode 100644 src/server/sync.rs

diff --git a/src/server/mod.rs b/src/server/mod.rs
index 7f845e5b8..468847c2e 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -34,6 +34,7 @@ pub use report::*;
 pub mod auth;
 
 pub(crate) mod pull;
+pub(crate) mod sync;
 
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
     let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
diff --git a/src/server/pull.rs b/src/server/pull.rs
index de1bb5d5f..4a97bfaa3 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -5,7 +5,7 @@ use std::io::{Seek, Write};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
@@ -34,6 +34,7 @@ use pbs_datastore::{
 };
 use pbs_tools::sha::sha256;
 
+use super::sync::{RemovedVanishedStats, SyncStats};
 use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
 use crate::tools::parallel_handler::ParallelHandler;
 
@@ -64,54 +65,6 @@ pub(crate) struct LocalSource {
     ns: BackupNamespace,
 }
 
-#[derive(Default)]
-pub(crate) struct RemovedVanishedStats {
-    pub(crate) groups: usize,
-    pub(crate) snapshots: usize,
-    pub(crate) namespaces: usize,
-}
-
-impl RemovedVanishedStats {
-    fn add(&mut self, rhs: RemovedVanishedStats) {
-        self.groups += rhs.groups;
-        self.snapshots += rhs.snapshots;
-        self.namespaces += rhs.namespaces;
-    }
-}
-
-#[derive(Default)]
-pub(crate) struct PullStats {
-    pub(crate) chunk_count: usize,
-    pub(crate) bytes: usize,
-    pub(crate) elapsed: Duration,
-    pub(crate) removed: Option<RemovedVanishedStats>,
-}
-
-impl From<RemovedVanishedStats> for PullStats {
-    fn from(removed: RemovedVanishedStats) -> Self {
-        Self {
-            removed: Some(removed),
-            ..Default::default()
-        }
-    }
-}
-
-impl PullStats {
-    fn add(&mut self, rhs: PullStats) {
-        self.chunk_count += rhs.chunk_count;
-        self.bytes += rhs.bytes;
-        self.elapsed += rhs.elapsed;
-
-        if let Some(rhs_removed) = rhs.removed {
-            if let Some(ref mut removed) = self.removed {
-                removed.add(rhs_removed);
-            } else {
-                self.removed = Some(rhs_removed);
-            }
-        }
-    }
-}
-
 #[async_trait::async_trait]
 /// `PullSource` is a trait that provides an interface for pulling data/information from a source.
 /// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -576,7 +529,7 @@ async fn pull_index_chunks<I: IndexFile>(
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
     let start_time = SystemTime::now();
@@ -663,7 +616,7 @@ async fn pull_index_chunks<I: IndexFile>(
         HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
     );
 
-    Ok(PullStats {
+    Ok(SyncStats {
         chunk_count,
         bytes,
         elapsed,
@@ -701,7 +654,7 @@ async fn pull_single_archive<'a>(
     snapshot: &'a pbs_datastore::BackupDir,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
     let archive_name = &archive_info.filename;
     let mut path = snapshot.full_path();
     path.push(archive_name);
@@ -709,7 +662,7 @@ async fn pull_single_archive<'a>(
     let mut tmp_path = path.clone();
     tmp_path.set_extension("tmp");
 
-    let mut pull_stats = PullStats::default();
+    let mut sync_stats = SyncStats::default();
 
     info!("sync archive {archive_name}");
 
@@ -735,7 +688,7 @@ async fn pull_single_archive<'a>(
                     downloaded_chunks,
                 )
                 .await?;
-                pull_stats.add(stats);
+                sync_stats.add(stats);
             }
         }
         ArchiveType::FixedIndex => {
@@ -755,7 +708,7 @@ async fn pull_single_archive<'a>(
                     downloaded_chunks,
                 )
                 .await?;
-                pull_stats.add(stats);
+                sync_stats.add(stats);
             }
         }
         ArchiveType::Blob => {
@@ -767,7 +720,7 @@ async fn pull_single_archive<'a>(
     if let Err(err) = std::fs::rename(&tmp_path, &path) {
         bail!("Atomic rename file {:?} failed - {}", path, err);
     }
-    Ok(pull_stats)
+    Ok(sync_stats)
 }
 
 /// Actual implementation of pulling a snapshot.
@@ -783,8 +736,8 @@ async fn pull_snapshot<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
-    let mut pull_stats = PullStats::default();
+) -> Result<SyncStats, Error> {
+    let mut sync_stats = SyncStats::default();
     let mut manifest_name = snapshot.full_path();
     manifest_name.push(MANIFEST_BLOB_NAME);
 
@@ -800,7 +753,7 @@ async fn pull_snapshot<'a>(
     {
         tmp_manifest_blob = data;
     } else {
-        return Ok(pull_stats);
+        return Ok(sync_stats);
     }
 
     if manifest_name.exists() {
@@ -822,7 +775,7 @@ async fn pull_snapshot<'a>(
             };
             info!("no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
-            return Ok(pull_stats); // nothing changed
+            return Ok(sync_stats); // nothing changed
         }
     }
 
@@ -869,7 +822,7 @@ async fn pull_snapshot<'a>(
 
         let stats =
             pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
-        pull_stats.add(stats);
+        sync_stats.add(stats);
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@@ -883,7 +836,7 @@ async fn pull_snapshot<'a>(
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
 
-    Ok(pull_stats)
+    Ok(sync_stats)
 }
 
 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
@@ -894,12 +847,12 @@ async fn pull_snapshot_from<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
     let (_path, is_new, _snap_lock) = snapshot
         .datastore()
         .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
 
-    let pull_stats = if is_new {
+    let sync_stats = if is_new {
         info!("sync snapshot {}", snapshot.dir());
 
         match pull_snapshot(reader, snapshot, downloaded_chunks).await {
@@ -913,9 +866,9 @@ async fn pull_snapshot_from<'a>(
                 }
                 return Err(err);
             }
-            Ok(pull_stats) => {
+            Ok(sync_stats) => {
                 info!("sync snapshot {} done", snapshot.dir());
-                pull_stats
+                sync_stats
             }
         }
     } else {
@@ -923,7 +876,7 @@ async fn pull_snapshot_from<'a>(
         pull_snapshot(reader, snapshot, downloaded_chunks).await?
     };
 
-    Ok(pull_stats)
+    Ok(sync_stats)
 }
 
 #[derive(PartialEq, Eq)]
@@ -1027,7 +980,7 @@ async fn pull_group(
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
     progress: &mut StoreProgress,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
 
@@ -1084,7 +1037,7 @@ async fn pull_group(
 
     progress.group_snapshots = list.len() as u64;
 
-    let mut pull_stats = PullStats::default();
+    let mut sync_stats = SyncStats::default();
 
     for (pos, from_snapshot) in list.into_iter().enumerate() {
         let to_snapshot = params
@@ -1102,7 +1055,7 @@ async fn pull_group(
         info!("percentage done: {progress}");
 
         let stats = result?; // stop on error
-        pull_stats.add(stats);
+        sync_stats.add(stats);
     }
 
     if params.remove_vanished {
@@ -1128,7 +1081,7 @@ async fn pull_group(
                 .target
                 .store
                 .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
-            pull_stats.add(PullStats::from(RemovedVanishedStats {
+            sync_stats.add(SyncStats::from(RemovedVanishedStats {
                 snapshots: 1,
                 groups: 0,
                 namespaces: 0,
@@ -1136,7 +1089,7 @@ async fn pull_group(
         }
     }
 
-    Ok(pull_stats)
+    Ok(sync_stats)
 }
 
 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
@@ -1253,7 +1206,7 @@ fn check_and_remove_vanished_ns(
 /// - remote namespaces are filtered by remote
 /// - creation and removal of sub-NS checked here
 /// - access to sub-NS checked here
-pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats, Error> {
+pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats, Error> {
     // explicit create shared lock to prevent GC on newly created chunks
     let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
     let mut errors = false;
@@ -1286,7 +1239,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
 
     let (mut groups, mut snapshots) = (0, 0);
     let mut synced_ns = HashSet::with_capacity(namespaces.len());
-    let mut pull_stats = PullStats::default();
+    let mut sync_stats = SyncStats::default();
 
     for namespace in namespaces {
         let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@@ -1310,10 +1263,10 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
         }
 
         match pull_ns(&namespace, &mut params).await {
-            Ok((ns_progress, ns_pull_stats, ns_errors)) => {
+            Ok((ns_progress, ns_sync_stats, ns_errors)) => {
                 errors |= ns_errors;
 
-                pull_stats.add(ns_pull_stats);
+                sync_stats.add(ns_sync_stats);
 
                 if params.max_depth != Some(0) {
                     groups += ns_progress.done_groups;
@@ -1342,14 +1295,14 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
     if params.remove_vanished {
         let (has_errors, stats) = check_and_remove_vanished_ns(&params, synced_ns)?;
         errors |= has_errors;
-        pull_stats.add(PullStats::from(stats));
+        sync_stats.add(SyncStats::from(stats));
     }
 
     if errors {
         bail!("sync failed with some errors.");
     }
 
-    Ok(pull_stats)
+    Ok(sync_stats)
 }
 
 /// Pulls a namespace according to `params`.
@@ -1367,7 +1320,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
 pub(crate) async fn pull_ns(
     namespace: &BackupNamespace,
     params: &mut PullParameters,
-) -> Result<(StoreProgress, PullStats, bool), Error> {
+) -> Result<(StoreProgress, SyncStats, bool), Error> {
     let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
 
     list.sort_unstable_by(|a, b| {
@@ -1397,7 +1350,7 @@ pub(crate) async fn pull_ns(
     }
 
     let mut progress = StoreProgress::new(list.len() as u64);
-    let mut pull_stats = PullStats::default();
+    let mut sync_stats = SyncStats::default();
 
     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
@@ -1432,7 +1385,7 @@ pub(crate) async fn pull_ns(
             errors = true; // do not stop here, instead continue
         } else {
             match pull_group(params, namespace, &group, &mut progress).await {
-                Ok(stats) => pull_stats.add(stats),
+                Ok(stats) => sync_stats.add(stats),
                 Err(err) => {
                     info!("sync group {} failed - {err}", &group);
                     errors = true; // do not stop here, instead continue
@@ -1466,13 +1419,13 @@ pub(crate) async fn pull_ns(
                     Ok(stats) => {
                         if !stats.all_removed() {
                             info!("kept some protected snapshots of group '{local_group}'");
-                            pull_stats.add(PullStats::from(RemovedVanishedStats {
+                            sync_stats.add(SyncStats::from(RemovedVanishedStats {
                                 snapshots: stats.removed_snapshots(),
                                 groups: 0,
                                 namespaces: 0,
                             }));
                         } else {
-                            pull_stats.add(PullStats::from(RemovedVanishedStats {
+                            sync_stats.add(SyncStats::from(RemovedVanishedStats {
                                 snapshots: stats.removed_snapshots(),
                                 groups: 1,
                                 namespaces: 0,
@@ -1493,5 +1446,5 @@ pub(crate) async fn pull_ns(
         };
     }
 
-    Ok((progress, pull_stats, errors))
+    Ok((progress, sync_stats, errors))
 }
diff --git a/src/server/sync.rs b/src/server/sync.rs
new file mode 100644
index 000000000..5f143ef63
--- /dev/null
+++ b/src/server/sync.rs
@@ -0,0 +1,51 @@
+//! Sync datastore contents from source to target, either in push or pull direction
+
+use std::time::Duration;
+
+#[derive(Default)]
+pub(crate) struct RemovedVanishedStats {
+    pub(crate) groups: usize,
+    pub(crate) snapshots: usize,
+    pub(crate) namespaces: usize,
+}
+
+impl RemovedVanishedStats {
+    pub(crate) fn add(&mut self, rhs: RemovedVanishedStats) {
+        self.groups += rhs.groups;
+        self.snapshots += rhs.snapshots;
+        self.namespaces += rhs.namespaces;
+    }
+}
+
+#[derive(Default)]
+pub(crate) struct SyncStats {
+    pub(crate) chunk_count: usize,
+    pub(crate) bytes: usize,
+    pub(crate) elapsed: Duration,
+    pub(crate) removed: Option<RemovedVanishedStats>,
+}
+
+impl From<RemovedVanishedStats> for SyncStats {
+    fn from(removed: RemovedVanishedStats) -> Self {
+        Self {
+            removed: Some(removed),
+            ..Default::default()
+        }
+    }
+}
+
+impl SyncStats {
+    pub(crate) fn add(&mut self, rhs: SyncStats) {
+        self.chunk_count += rhs.chunk_count;
+        self.bytes += rhs.bytes;
+        self.elapsed += rhs.elapsed;
+
+        if let Some(rhs_removed) = rhs.removed {
+            if let Some(ref mut removed) = self.removed {
+                removed.add(rhs_removed);
+            } else {
+                self.removed = Some(rhs_removed);
+            }
+        }
+    }
+}
-- 
2.39.2





More information about the pbs-devel mailing list