[pbs-devel] [RFC proxmox-backup 06/24] server: sync: move sync related stats to common module
Christian Ebner
c.ebner at proxmox.com
Mon Jul 15 12:15:44 CEST 2024
Move `PullStats` and `RemovedVanishedStats` from the pull module into a
new common `sync` module, renaming `PullStats` to `SyncStats`, so that
both can be reused for sync operations in push as well as pull direction.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
src/server/mod.rs | 1 +
src/server/pull.rs | 121 ++++++++++++++-------------------------------
src/server/sync.rs | 51 +++++++++++++++++++
3 files changed, 89 insertions(+), 84 deletions(-)
create mode 100644 src/server/sync.rs
diff --git a/src/server/mod.rs b/src/server/mod.rs
index d2cbc931c..3acfcc1c4 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -34,6 +34,7 @@ pub use report::*;
pub mod auth;
pub(crate) mod pull;
+pub(crate) mod sync;
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 823515e9a..24422ef41 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -5,7 +5,7 @@ use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use http::StatusCode;
@@ -34,6 +34,7 @@ use pbs_datastore::{
};
use pbs_tools::sha::sha256;
+use super::sync::{RemovedVanishedStats, SyncStats};
use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
@@ -64,54 +65,6 @@ pub(crate) struct LocalSource {
ns: BackupNamespace,
}
-#[derive(Default)]
-pub(crate) struct RemovedVanishedStats {
- pub(crate) groups: usize,
- pub(crate) snapshots: usize,
- pub(crate) namespaces: usize,
-}
-
-impl RemovedVanishedStats {
- fn add(&mut self, rhs: RemovedVanishedStats) {
- self.groups += rhs.groups;
- self.snapshots += rhs.snapshots;
- self.namespaces += rhs.namespaces;
- }
-}
-
-#[derive(Default)]
-pub(crate) struct PullStats {
- pub(crate) chunk_count: usize,
- pub(crate) bytes: usize,
- pub(crate) elapsed: Duration,
- pub(crate) removed: Option<RemovedVanishedStats>,
-}
-
-impl From<RemovedVanishedStats> for PullStats {
- fn from(removed: RemovedVanishedStats) -> Self {
- Self {
- removed: Some(removed),
- ..Default::default()
- }
- }
-}
-
-impl PullStats {
- fn add(&mut self, rhs: PullStats) {
- self.chunk_count += rhs.chunk_count;
- self.bytes += rhs.bytes;
- self.elapsed += rhs.elapsed;
-
- if let Some(rhs_removed) = rhs.removed {
- if let Some(ref mut removed) = self.removed {
- removed.add(rhs_removed);
- } else {
- self.removed = Some(rhs_removed);
- }
- }
- }
-}
-
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -569,7 +522,7 @@ async fn pull_index_chunks<I: IndexFile>(
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
let start_time = SystemTime::now();
@@ -656,7 +609,7 @@ async fn pull_index_chunks<I: IndexFile>(
HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
);
- Ok(PullStats {
+ Ok(SyncStats {
chunk_count,
bytes,
elapsed,
@@ -694,7 +647,7 @@ async fn pull_single_archive<'a>(
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
path.push(archive_name);
@@ -702,7 +655,7 @@ async fn pull_single_archive<'a>(
let mut tmp_path = path.clone();
tmp_path.set_extension("tmp");
- let mut pull_stats = PullStats::default();
+ let mut sync_stats = SyncStats::default();
info!("sync archive {archive_name}");
@@ -728,7 +681,7 @@ async fn pull_single_archive<'a>(
downloaded_chunks,
)
.await?;
- pull_stats.add(stats);
+ sync_stats.add(stats);
}
}
ArchiveType::FixedIndex => {
@@ -748,7 +701,7 @@ async fn pull_single_archive<'a>(
downloaded_chunks,
)
.await?;
- pull_stats.add(stats);
+ sync_stats.add(stats);
}
}
ArchiveType::Blob => {
@@ -760,7 +713,7 @@ async fn pull_single_archive<'a>(
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
- Ok(pull_stats)
+ Ok(sync_stats)
}
/// Actual implementation of pulling a snapshot.
@@ -776,8 +729,8 @@ async fn pull_snapshot<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
- let mut pull_stats = PullStats::default();
+) -> Result<SyncStats, Error> {
+ let mut sync_stats = SyncStats::default();
let mut manifest_name = snapshot.full_path();
manifest_name.push(MANIFEST_BLOB_NAME);
@@ -793,7 +746,7 @@ async fn pull_snapshot<'a>(
{
tmp_manifest_blob = data;
} else {
- return Ok(pull_stats);
+ return Ok(sync_stats);
}
if manifest_name.exists() {
@@ -815,7 +768,7 @@ async fn pull_snapshot<'a>(
};
info!("no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
- return Ok(pull_stats); // nothing changed
+ return Ok(sync_stats); // nothing changed
}
}
@@ -862,7 +815,7 @@ async fn pull_snapshot<'a>(
let stats =
pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
- pull_stats.add(stats);
+ sync_stats.add(stats);
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@@ -876,7 +829,7 @@ async fn pull_snapshot<'a>(
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
- Ok(pull_stats)
+ Ok(sync_stats)
}
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
@@ -887,12 +840,12 @@ async fn pull_snapshot_from<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
let (_path, is_new, _snap_lock) = snapshot
.datastore()
.create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
- let pull_stats = if is_new {
+ let sync_stats = if is_new {
info!("sync snapshot {}", snapshot.dir());
match pull_snapshot(reader, snapshot, downloaded_chunks).await {
@@ -906,9 +859,9 @@ async fn pull_snapshot_from<'a>(
}
return Err(err);
}
- Ok(pull_stats) => {
+ Ok(sync_stats) => {
info!("sync snapshot {} done", snapshot.dir());
- pull_stats
+ sync_stats
}
}
} else {
@@ -916,7 +869,7 @@ async fn pull_snapshot_from<'a>(
pull_snapshot(reader, snapshot, downloaded_chunks).await?
};
- Ok(pull_stats)
+ Ok(sync_stats)
}
#[derive(PartialEq, Eq)]
@@ -1020,7 +973,7 @@ async fn pull_group(
source_namespace: &BackupNamespace,
group: &BackupGroup,
progress: &mut StoreProgress,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1077,7 +1030,7 @@ async fn pull_group(
progress.group_snapshots = list.len() as u64;
- let mut pull_stats = PullStats::default();
+ let mut sync_stats = SyncStats::default();
for (pos, from_snapshot) in list.into_iter().enumerate() {
let to_snapshot = params
@@ -1095,7 +1048,7 @@ async fn pull_group(
info!("percentage done: {progress}");
let stats = result?; // stop on error
- pull_stats.add(stats);
+ sync_stats.add(stats);
}
if params.remove_vanished {
@@ -1121,7 +1074,7 @@ async fn pull_group(
.target
.store
.remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
- pull_stats.add(PullStats::from(RemovedVanishedStats {
+ sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: 1,
groups: 0,
namespaces: 0,
@@ -1129,7 +1082,7 @@ async fn pull_group(
}
}
- Ok(pull_stats)
+ Ok(sync_stats)
}
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
@@ -1246,7 +1199,7 @@ fn check_and_remove_vanished_ns(
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
-pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats, Error> {
+pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats, Error> {
// explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
let mut errors = false;
@@ -1279,7 +1232,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
- let mut pull_stats = PullStats::default();
+ let mut sync_stats = SyncStats::default();
for namespace in namespaces {
let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@@ -1303,10 +1256,10 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
}
match pull_ns(&namespace, &mut params).await {
- Ok((ns_progress, ns_pull_stats, ns_errors)) => {
+ Ok((ns_progress, ns_sync_stats, ns_errors)) => {
errors |= ns_errors;
- pull_stats.add(ns_pull_stats);
+ sync_stats.add(ns_sync_stats);
if params.max_depth != Some(0) {
groups += ns_progress.done_groups;
@@ -1335,14 +1288,14 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
if params.remove_vanished {
let (has_errors, stats) = check_and_remove_vanished_ns(¶ms, synced_ns)?;
errors |= has_errors;
- pull_stats.add(PullStats::from(stats));
+ sync_stats.add(SyncStats::from(stats));
}
if errors {
bail!("sync failed with some errors.");
}
- Ok(pull_stats)
+ Ok(sync_stats)
}
/// Pulls a namespace according to `params`.
@@ -1360,7 +1313,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
pub(crate) async fn pull_ns(
namespace: &BackupNamespace,
params: &mut PullParameters,
-) -> Result<(StoreProgress, PullStats, bool), Error> {
+) -> Result<(StoreProgress, SyncStats, bool), Error> {
let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, ¶ms.owner).await?;
list.sort_unstable_by(|a, b| {
@@ -1390,7 +1343,7 @@ pub(crate) async fn pull_ns(
}
let mut progress = StoreProgress::new(list.len() as u64);
- let mut pull_stats = PullStats::default();
+ let mut sync_stats = SyncStats::default();
let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
@@ -1425,7 +1378,7 @@ pub(crate) async fn pull_ns(
errors = true; // do not stop here, instead continue
} else {
match pull_group(params, namespace, &group, &mut progress).await {
- Ok(stats) => pull_stats.add(stats),
+ Ok(stats) => sync_stats.add(stats),
Err(err) => {
info!("sync group {} failed - {err}", &group);
errors = true; // do not stop here, instead continue
@@ -1459,13 +1412,13 @@ pub(crate) async fn pull_ns(
Ok(stats) => {
if !stats.all_removed() {
info!("kept some protected snapshots of group '{local_group}'");
- pull_stats.add(PullStats::from(RemovedVanishedStats {
+ sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: stats.removed_snapshots(),
groups: 0,
namespaces: 0,
}));
} else {
- pull_stats.add(PullStats::from(RemovedVanishedStats {
+ sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: stats.removed_snapshots(),
groups: 1,
namespaces: 0,
@@ -1486,5 +1439,5 @@ pub(crate) async fn pull_ns(
};
}
- Ok((progress, pull_stats, errors))
+ Ok((progress, sync_stats, errors))
}
diff --git a/src/server/sync.rs b/src/server/sync.rs
new file mode 100644
index 000000000..5f143ef63
--- /dev/null
+++ b/src/server/sync.rs
@@ -0,0 +1,51 @@
+//! Sync datastore contents from source to target, either in push or pull direction
+
+use std::time::Duration;
+
+#[derive(Default)]
+pub(crate) struct RemovedVanishedStats {
+ pub(crate) groups: usize,
+ pub(crate) snapshots: usize,
+ pub(crate) namespaces: usize,
+}
+
+impl RemovedVanishedStats {
+ pub(crate) fn add(&mut self, rhs: RemovedVanishedStats) {
+ self.groups += rhs.groups;
+ self.snapshots += rhs.snapshots;
+ self.namespaces += rhs.namespaces;
+ }
+}
+
+#[derive(Default)]
+pub(crate) struct SyncStats {
+ pub(crate) chunk_count: usize,
+ pub(crate) bytes: usize,
+ pub(crate) elapsed: Duration,
+ pub(crate) removed: Option<RemovedVanishedStats>,
+}
+
+impl From<RemovedVanishedStats> for SyncStats {
+ fn from(removed: RemovedVanishedStats) -> Self {
+ Self {
+ removed: Some(removed),
+ ..Default::default()
+ }
+ }
+}
+
+impl SyncStats {
+ pub(crate) fn add(&mut self, rhs: SyncStats) {
+ self.chunk_count += rhs.chunk_count;
+ self.bytes += rhs.bytes;
+ self.elapsed += rhs.elapsed;
+
+ if let Some(rhs_removed) = rhs.removed {
+ if let Some(ref mut removed) = self.removed {
+ removed.add(rhs_removed);
+ } else {
+ self.removed = Some(rhs_removed);
+ }
+ }
+ }
+}
--
2.39.2
More information about the pbs-devel
mailing list