[pbs-devel] [PATCH proxmox-backup 1/3] server: sync: return `PullStats` for pull related methods
Christian Ebner
c.ebner at proxmox.com
Wed Mar 6 15:11:51 CET 2024
Return basic statistics from the pull related methods via `PullStats` objects,
in order to allow constructing a global summary for sync jobs.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
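For illustration only (not part of the patch itself): a minimal, self-contained
sketch of how a caller could fold the per-namespace `PullStats` returned below
into a human readable summary line. The `PullStats` definition matches the one
introduced by this patch; the aggregation loop and the summary formatting are
hypothetical examples, not the exact code used by the later sync job patches.

    use std::time::Duration;

    #[derive(Default)]
    struct PullStats {
        chunk_count: usize,
        bytes: usize,
        elapsed: Duration,
    }

    impl PullStats {
        fn add(&mut self, rhs: PullStats) {
            self.chunk_count += rhs.chunk_count;
            self.bytes += rhs.bytes;
            self.elapsed += rhs.elapsed;
        }
    }

    fn main() {
        // Per-namespace results, as e.g. `pull_ns()` would return them.
        let ns_results = vec![
            PullStats { chunk_count: 10, bytes: 4096, elapsed: Duration::from_secs(1) },
            PullStats { chunk_count: 2, bytes: 512, elapsed: Duration::from_millis(200) },
        ];

        // Aggregate into a global summary, analogous to `pull_store()`.
        let mut total = PullStats::default();
        for stats in ns_results {
            total.add(stats);
        }

        println!(
            "pulled {} chunks ({:.2} MiB) in {:.2}s",
            total.chunk_count,
            total.bytes as f64 / (1024.0 * 1024.0),
            total.elapsed.as_secs_f64(),
        );
    }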
src/server/pull.rs | 125 ++++++++++++++++++++++++++++++---------------
1 file changed, 85 insertions(+), 40 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5a4ba806..7d745c77 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -5,7 +5,7 @@ use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
-use std::time::SystemTime;
+use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Error};
use http::StatusCode;
@@ -64,6 +64,21 @@ pub(crate) struct LocalSource {
ns: BackupNamespace,
}
+#[derive(Default)]
+pub(crate) struct PullStats {
+ pub(crate) chunk_count: usize,
+ pub(crate) bytes: usize,
+ pub(crate) elapsed: Duration,
+}
+
+impl PullStats {
+ fn add(&mut self, rhs: PullStats) {
+ self.chunk_count += rhs.chunk_count;
+ self.bytes += rhs.bytes;
+ self.elapsed += rhs.elapsed;
+ }
+}
+
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -559,7 +574,7 @@ async fn pull_index_chunks<I: IndexFile>(
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
let start_time = SystemTime::now();
@@ -594,12 +609,14 @@ async fn pull_index_chunks<I: IndexFile>(
let verify_and_write_channel = verify_pool.channel();
let bytes = Arc::new(AtomicUsize::new(0));
+ let chunk_count = Arc::new(AtomicUsize::new(0));
stream
.map(|info| {
let target = Arc::clone(&target);
let chunk_reader = chunk_reader.clone();
let bytes = Arc::clone(&bytes);
+ let chunk_count = Arc::clone(&chunk_count);
let verify_and_write_channel = verify_and_write_channel.clone();
Ok::<_, Error>(async move {
@@ -620,6 +637,7 @@ async fn pull_index_chunks<I: IndexFile>(
})?;
bytes.fetch_add(raw_size, Ordering::SeqCst);
+ chunk_count.fetch_add(1, Ordering::SeqCst);
Ok(())
})
@@ -632,18 +650,23 @@ async fn pull_index_chunks<I: IndexFile>(
verify_pool.complete()?;
- let elapsed = start_time.elapsed()?.as_secs_f64();
+ let elapsed = start_time.elapsed()?;
let bytes = bytes.load(Ordering::SeqCst);
+ let chunk_count = chunk_count.load(Ordering::SeqCst);
task_log!(
worker,
"downloaded {} bytes ({:.2} MiB/s)",
bytes,
- (bytes as f64) / (1024.0 * 1024.0 * elapsed)
+ (bytes as f64) / (1024.0 * 1024.0 * elapsed.as_secs_f64())
);
- Ok(())
+ Ok(PullStats {
+ chunk_count,
+ bytes,
+ elapsed,
+ })
}
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
@@ -677,7 +700,7 @@ async fn pull_single_archive<'a>(
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
path.push(archive_name);
@@ -685,6 +708,8 @@ async fn pull_single_archive<'a>(
let mut tmp_path = path.clone();
tmp_path.set_extension("tmp");
+ let mut pull_stats = PullStats::default();
+
task_log!(worker, "sync archive {}", archive_name);
reader
@@ -704,7 +729,7 @@ async fn pull_single_archive<'a>(
if reader.skip_chunk_sync(snapshot.datastore().name()) {
task_log!(worker, "skipping chunk sync for same datastore");
} else {
- pull_index_chunks(
+ let stats = pull_index_chunks(
worker,
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
@@ -712,6 +737,7 @@ async fn pull_single_archive<'a>(
downloaded_chunks,
)
.await?;
+ pull_stats.add(stats);
}
}
ArchiveType::FixedIndex => {
@@ -724,7 +750,7 @@ async fn pull_single_archive<'a>(
if reader.skip_chunk_sync(snapshot.datastore().name()) {
task_log!(worker, "skipping chunk sync for same datastore");
} else {
- pull_index_chunks(
+ let stats = pull_index_chunks(
worker,
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
@@ -732,6 +758,7 @@ async fn pull_single_archive<'a>(
downloaded_chunks,
)
.await?;
+ pull_stats.add(stats);
}
}
ArchiveType::Blob => {
@@ -743,7 +770,7 @@ async fn pull_single_archive<'a>(
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
- Ok(())
+ Ok(pull_stats)
}
/// Actual implementation of pulling a snapshot.
@@ -760,7 +787,8 @@ async fn pull_snapshot<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
+ let mut pull_stats = PullStats::default();
let mut manifest_name = snapshot.full_path();
manifest_name.push(MANIFEST_BLOB_NAME);
@@ -776,7 +804,7 @@ async fn pull_snapshot<'a>(
{
tmp_manifest_blob = data;
} else {
- return Ok(());
+ return Ok(pull_stats);
}
if manifest_name.exists() {
@@ -800,7 +828,7 @@ async fn pull_snapshot<'a>(
};
task_log!(worker, "no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
- return Ok(()); // nothing changed
+ return Ok(pull_stats); // nothing changed
}
}
@@ -845,7 +873,7 @@ async fn pull_snapshot<'a>(
}
}
- pull_single_archive(
+ let stats = pull_single_archive(
worker,
reader.clone(),
snapshot,
@@ -853,6 +881,7 @@ async fn pull_snapshot<'a>(
downloaded_chunks.clone(),
)
.await?;
+ pull_stats.add(stats);
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@@ -868,7 +897,7 @@ async fn pull_snapshot<'a>(
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
- Ok(())
+ Ok(pull_stats)
}
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
@@ -880,31 +909,36 @@ async fn pull_snapshot_from<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
let (_path, is_new, _snap_lock) = snapshot
.datastore()
.create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
- if is_new {
+ let pull_stats = if is_new {
task_log!(worker, "sync snapshot {}", snapshot.dir());
- if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
- if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
- snapshot.backup_ns(),
- snapshot.as_ref(),
- true,
- ) {
- task_log!(worker, "cleanup error - {}", cleanup_err);
+ match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
+ Err(err) => {
+ if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
+ snapshot.backup_ns(),
+ snapshot.as_ref(),
+ true,
+ ) {
+ task_log!(worker, "cleanup error - {}", cleanup_err);
+ }
+ return Err(err);
+ }
+ Ok(pull_stats) => {
+ task_log!(worker, "sync snapshot {} done", snapshot.dir());
+ pull_stats
}
- return Err(err);
}
- task_log!(worker, "sync snapshot {} done", snapshot.dir());
} else {
task_log!(worker, "re-sync snapshot {}", snapshot.dir());
- pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
- }
+ pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
+ };
- Ok(())
+ Ok(pull_stats)
}
#[derive(PartialEq, Eq)]
@@ -1009,7 +1043,7 @@ async fn pull_group(
source_namespace: &BackupNamespace,
group: &BackupGroup,
progress: &mut StoreProgress,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1066,6 +1100,8 @@ async fn pull_group(
progress.group_snapshots = list.len() as u64;
+ let mut pull_stats = PullStats::default();
+
for (pos, from_snapshot) in list.into_iter().enumerate() {
let to_snapshot = params
.target
@@ -1082,7 +1118,8 @@ async fn pull_group(
progress.done_snapshots = pos as u64 + 1;
task_log!(worker, "percentage done: {}", progress);
- result?; // stop on error
+ let stats = result?; // stop on error
+ pull_stats.add(stats);
}
if params.remove_vanished {
@@ -1112,7 +1149,7 @@ async fn pull_group(
}
}
- Ok(())
+ Ok(pull_stats)
}
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
@@ -1233,7 +1270,7 @@ fn check_and_remove_vanished_ns(
pub(crate) async fn pull_store(
worker: &WorkerTask,
mut params: PullParameters,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
// explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
let mut errors = false;
@@ -1269,6 +1306,7 @@ pub(crate) async fn pull_store(
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
+ let mut pull_stats = PullStats::default();
for namespace in namespaces {
let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@@ -1303,9 +1341,11 @@ pub(crate) async fn pull_store(
}
match pull_ns(worker, &namespace, &mut params).await {
- Ok((ns_progress, ns_errors)) => {
+ Ok((ns_progress, ns_pull_stats, ns_errors)) => {
errors |= ns_errors;
+ pull_stats.add(ns_pull_stats);
+
if params.max_depth != Some(0) {
groups += ns_progress.done_groups;
snapshots += ns_progress.done_snapshots;
@@ -1338,7 +1378,7 @@ pub(crate) async fn pull_store(
bail!("sync failed with some errors.");
}
- Ok(())
+ Ok(pull_stats)
}
/// Pulls a namespace according to `params`.
@@ -1357,7 +1397,7 @@ pub(crate) async fn pull_ns(
worker: &WorkerTask,
namespace: &BackupNamespace,
params: &mut PullParameters,
-) -> Result<(StoreProgress, bool), Error> {
+) -> Result<(StoreProgress, PullStats, bool), Error> {
let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
list.sort_unstable_by(|a, b| {
@@ -1389,6 +1429,7 @@ pub(crate) async fn pull_ns(
}
let mut progress = StoreProgress::new(list.len() as u64);
+ let mut pull_stats = PullStats::default();
let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
@@ -1429,10 +1470,14 @@ pub(crate) async fn pull_ns(
owner
);
errors = true; // do not stop here, instead continue
- } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
- {
- task_log!(worker, "sync group {} failed - {}", &group, err,);
- errors = true; // do not stop here, instead continue
+ } else {
+ match pull_group(worker, params, namespace, &group, &mut progress).await {
+ Ok(stats) => pull_stats.add(stats),
+ Err(err) => {
+ task_log!(worker, "sync group {} failed - {}", &group, err,);
+ errors = true; // do not stop here, instead continue
+ }
+ }
}
}
@@ -1479,5 +1524,5 @@ pub(crate) async fn pull_ns(
};
}
- Ok((progress, errors))
+ Ok((progress, pull_stats, errors))
}
--
2.39.2