[pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks' log output
Christian Ebner
c.ebner at proxmox.com
Thu Jul 25 12:19:22 CEST 2024
In order to keep the log messages in a meaningful order when syncing
backup groups over parallel connections, buffer them in the sync stats
and only display them once the corresponding task has finished.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
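Notes:

The mechanics in isolation, as a minimal standalone sketch (simplified
PullStats, println! standing in for the tracing-based info! used by
pull.rs; the names follow the patch, the archive string is made up for
illustration):

    #[derive(Default)]
    struct PullStats {
        log_buffer: Vec<String>,
    }

    impl PullStats {
        // merging task stats also concatenates the buffered log lines,
        // so each task's messages stay contiguous in the combined buffer
        fn add(&mut self, mut rhs: PullStats) {
            self.log_buffer.append(&mut rhs.log_buffer);
        }
    }

    // with a single connection, log right away; with parallel group
    // sync tasks, collect the message for deferred, ordered output
    fn log_info_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec<String>) {
        if buffer_logs {
            buffer.push(msg);
        } else {
            println!("{msg}");
        }
    }

    fn main() {
        // buffering is only enabled for more than one group sync task,
        // i.e. matches!(group_sync_tasks, Some(n) if n > 1)
        let buffer_logs = true;
        let mut stats = PullStats::default();
        log_info_buffer("sync archive drive-scsi0.img.fidx".to_string(), buffer_logs, &mut stats.log_buffer);
        log_info_buffer("no data changes".to_string(), buffer_logs, &mut stats.log_buffer);
        // flushed only once the task has finished, keeping its lines together
        for line in stats.log_buffer.drain(..) {
            println!("{line}");
        }
    }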
src/server/pull.rs | 165 ++++++++++++++++++++++++++++++++++++---------
1 file changed, 134 insertions(+), 31 deletions(-)
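On the consuming side the buffers are flushed per finished task, so the
output of concurrent groups comes out grouped per task instead of
interleaved line by line. A sketch of that shape, assuming the futures
and tokio crates and again println! instead of info! (the real code in
pull_ns drains the buffer of each finished task's stats before
accumulating them):

    use futures::stream::{FuturesUnordered, StreamExt};

    #[derive(Default)]
    struct PullStats {
        log_buffer: Vec<String>,
    }

    #[tokio::main]
    async fn main() {
        let mut pull_group_tasks = FuturesUnordered::new();
        for group in ["vm/100", "vm/101", "ct/200"] {
            pull_group_tasks.push(async move {
                // each task only fills its own buffer while running
                PullStats {
                    log_buffer: vec![
                        format!("sync snapshot {group}"),
                        "percentage done: 100%".to_string(),
                    ],
                }
            });
        }
        // emit each finished task's buffer as one contiguous block
        while let Some(mut stats) = pull_group_tasks.next().await {
            for line in stats.log_buffer.drain(..) {
                println!("{line}");
            }
        }
    }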
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 0a54217d4..109cd3d1c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -89,6 +89,7 @@ pub(crate) struct PullStats {
pub(crate) bytes: usize,
pub(crate) elapsed: Duration,
pub(crate) removed: Option<RemovedVanishedStats>,
+ pub(crate) log_buffer: Vec<String>,
}
impl From<RemovedVanishedStats> for PullStats {
@@ -101,10 +102,11 @@ impl From<RemovedVanishedStats> for PullStats {
}
impl PullStats {
- fn add(&mut self, rhs: PullStats) {
+ fn add(&mut self, mut rhs: PullStats) {
self.chunk_count += rhs.chunk_count;
self.bytes += rhs.bytes;
self.elapsed += rhs.elapsed;
+ self.log_buffer.append(&mut rhs.log_buffer);
if let Some(rhs_removed) = rhs.removed {
if let Some(ref mut removed) = self.removed {
@@ -443,7 +445,6 @@ impl PullReader for RemoteReader {
if let Err(err) = std::fs::rename(&tmp_path, to_path) {
bail!("Atomic rename file {:?} failed - {}", to_path, err);
}
- info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
}
Ok(())
@@ -577,6 +578,7 @@ async fn pull_index_chunks<I: IndexFile>(
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
@@ -658,17 +660,20 @@ async fn pull_index_chunks<I: IndexFile>(
let bytes = bytes.load(Ordering::SeqCst);
let chunk_count = chunk_count.load(Ordering::SeqCst);
- info!(
+ let mut log_buffer = Vec::new();
+ let msg = format!(
"downloaded {} ({}/s)",
HumanByte::from(bytes),
HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
);
+ log_info_buffer(msg, buffer_logs, &mut log_buffer);
Ok(PullStats {
chunk_count,
bytes,
elapsed,
removed: None,
+ log_buffer,
})
}
@@ -702,6 +707,7 @@ async fn pull_single_archive<'a>(
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
@@ -712,7 +718,11 @@ async fn pull_single_archive<'a>(
let mut pull_stats = PullStats::default();
- info!("sync archive {archive_name}");
+ log_info_buffer(
+ format!("sync archive {archive_name}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
reader.load_file_into(archive_name, &tmp_path).await?;
@@ -727,13 +737,18 @@ async fn pull_single_archive<'a>(
verify_archive(archive_info, &csum, size)?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ log_info_buffer(
+ "skipping chunk sync for same datastore".to_string(),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
} else {
let stats = pull_index_chunks(
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
index,
downloaded_chunks,
+ buffer_logs,
)
.await?;
pull_stats.add(stats);
@@ -747,13 +762,18 @@ async fn pull_single_archive<'a>(
verify_archive(archive_info, &csum, size)?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ log_info_buffer(
+ "skipping chunk sync for same datastore".to_string(),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
} else {
let stats = pull_index_chunks(
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
index,
downloaded_chunks,
+ buffer_logs,
)
.await?;
pull_stats.add(stats);
@@ -784,6 +804,7 @@ async fn pull_snapshot<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
let mut pull_stats = PullStats::default();
let mut manifest_name = snapshot.full_path();
@@ -820,8 +841,17 @@ async fn pull_snapshot<'a>(
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
+ log_info_buffer(
+ format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
};
- info!("no data changes");
+ log_info_buffer(
+ "no data changes".to_string(),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(pull_stats); // nothing changed
}
@@ -841,7 +871,11 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_info_buffer(
+ format!("detected changed file {path:?} - {err}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
}
@@ -851,7 +885,11 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_info_buffer(
+ format!("detected changed file {path:?} - {err}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
}
@@ -861,15 +899,25 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_info_buffer(
+ format!("detected changed file {path:?} - {err}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
}
}
}
- let stats =
- pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
+ let stats = pull_single_archive(
+ reader.clone(),
+ snapshot,
+ item,
+ downloaded_chunks.clone(),
+ buffer_logs,
+ )
+ .await?;
pull_stats.add(stats);
}
@@ -879,6 +927,11 @@ async fn pull_snapshot<'a>(
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
+ log_info_buffer(
+ format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
};
snapshot
.cleanup_unreferenced_files(&manifest)
@@ -895,15 +948,21 @@ async fn pull_snapshot_from<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
let (_path, is_new, _snap_lock) = snapshot
.datastore()
.create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
- let pull_stats = if is_new {
- info!("sync snapshot {}", snapshot.dir());
+ let mut pull_stats = PullStats::default();
+ if is_new {
+ log_info_buffer(
+ format!("sync snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
- match pull_snapshot(reader, snapshot, downloaded_chunks).await {
+ match pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await {
Err(err) => {
if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
snapshot.backup_ns(),
@@ -914,14 +973,23 @@ async fn pull_snapshot_from<'a>(
}
return Err(err);
}
- Ok(pull_stats) => {
- info!("sync snapshot {} done", snapshot.dir());
- pull_stats
+ Ok(stats) => {
+ pull_stats.add(stats);
+ log_info_buffer(
+ format!("sync snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
} else {
- info!("re-sync snapshot {}", snapshot.dir());
- pull_snapshot(reader, snapshot, downloaded_chunks).await?
+ log_info_buffer(
+ format!("re-sync snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
+ let stats = pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await?;
+ pull_stats.add(stats);
};
Ok(pull_stats)
@@ -1054,6 +1122,8 @@ async fn pull_group(
.last_successful_backup(&target_ns, group)?
.unwrap_or(i64::MIN);
+ let mut pull_stats = PullStats::default();
+ let buffer_logs = matches!(params.group_sync_tasks, Some(n) if n > 1);
let list: Vec<BackupDir> = raw_list
.into_iter()
.enumerate()
@@ -1063,7 +1133,11 @@ async fn pull_group(
already_synced_skip_info.update(dir.time);
return false;
} else if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ log_info_buffer(
+ format!("{already_synced_skip_info}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
already_synced_skip_info.reset();
return true;
}
@@ -1072,7 +1146,11 @@ async fn pull_group(
transfer_last_skip_info.update(dir.time);
return false;
} else if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ log_info_buffer(
+ format!("{transfer_last_skip_info}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
transfer_last_skip_info.reset();
}
true
@@ -1088,8 +1166,6 @@ async fn pull_group(
progress.group_snapshots = list.len() as u64;
}
- let mut pull_stats = PullStats::default();
-
for (pos, from_snapshot) in list.into_iter().enumerate() {
let to_snapshot = params
.target
@@ -1100,12 +1176,17 @@ async fn pull_group(
.source
.reader(source_namespace, &from_snapshot)
.await?;
- let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
+ let result =
+ pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), buffer_logs).await;
{
let mut progress = progress.lock().unwrap();
progress.done_snapshots = pos as u64 + 1;
- info!("percentage done: {progress}");
+ log_info_buffer(
+ format!("percentage done: {progress}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
let stats = result?; // stop on error
@@ -1124,13 +1205,21 @@ async fn pull_group(
continue;
}
if snapshot.is_protected() {
- info!(
- "don't delete vanished snapshot {} (protected)",
- snapshot.dir()
+ log_info_buffer(
+ format!(
+ "don't delete vanished snapshot {} (protected)",
+ snapshot.dir()
+ ),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
);
continue;
}
- info!("delete vanished snapshot {}", snapshot.dir());
+ log_info_buffer(
+ format!("delete vanished snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
params
.target
.store
@@ -1478,8 +1567,14 @@ pub(crate) async fn pull_ns(
let mut pull_stats = PullStats::default();
// poll to initiate tasks, queue another remaining tasks for each finished one
while let Some(result) = pull_group_tasks.next().await {
- let (progress, stats, has_errors) = result?;
+ let (progress, mut stats, has_errors) = result?;
errors |= has_errors;
+ // flush buffered log output of the finished task
+ for log_line in stats.log_buffer.iter() {
+ info!("{log_line}");
+ }
+ // clear the log buffer before adding, the lines are no longer needed
+ stats.log_buffer.clear();
pull_stats.add(stats);
store_progress.done_groups += progress.done_groups;
store_progress.done_snapshots += progress.done_snapshots;
@@ -1552,3 +1647,11 @@ pub(crate) async fn pull_ns(
Ok((store_progress, pull_stats, errors))
}
+
+fn log_info_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec<String>) {
+ if buffer_logs {
+ buffer.push(msg);
+ } else {
+ info!("{msg}");
+ }
+}
--
2.39.2