[pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output

Christian Ebner c.ebner at proxmox.com
Thu Jul 25 12:19:22 CEST 2024


In order to keep the log messages in a meaningful order when syncing
backup groups over parallel connections, buffer them in the sync
stats and only display them once the corresponding task has
finished.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 src/server/pull.rs | 165 ++++++++++++++++++++++++++++++++++++---------
 1 file changed, 134 insertions(+), 31 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 0a54217d4..109cd3d1c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -89,6 +89,7 @@ pub(crate) struct PullStats {
     pub(crate) bytes: usize,
     pub(crate) elapsed: Duration,
     pub(crate) removed: Option<RemovedVanishedStats>,
+    pub(crate) log_buffer: Vec<String>,
 }
 
 impl From<RemovedVanishedStats> for PullStats {
@@ -101,10 +102,11 @@ impl From<RemovedVanishedStats> for PullStats {
 }
 
 impl PullStats {
-    fn add(&mut self, rhs: PullStats) {
+    fn add(&mut self, mut rhs: PullStats) {
         self.chunk_count += rhs.chunk_count;
         self.bytes += rhs.bytes;
         self.elapsed += rhs.elapsed;
+        self.log_buffer.append(&mut rhs.log_buffer);
 
         if let Some(rhs_removed) = rhs.removed {
             if let Some(ref mut removed) = self.removed {
@@ -443,7 +445,6 @@ impl PullReader for RemoteReader {
             if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                 bail!("Atomic rename file {:?} failed - {}", to_path, err);
             }
-            info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
         }
 
         Ok(())
@@ -577,6 +578,7 @@ async fn pull_index_chunks<I: IndexFile>(
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
@@ -658,17 +660,20 @@ async fn pull_index_chunks<I: IndexFile>(
     let bytes = bytes.load(Ordering::SeqCst);
     let chunk_count = chunk_count.load(Ordering::SeqCst);
 
-    info!(
+    let mut log_buffer = Vec::new();
+    let msg = format!(
         "downloaded {} ({}/s)",
         HumanByte::from(bytes),
         HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
     );
+    log_info_buffer(msg, buffer_logs, &mut log_buffer);
 
     Ok(PullStats {
         chunk_count,
         bytes,
         elapsed,
         removed: None,
+        log_buffer,
     })
 }
 
@@ -702,6 +707,7 @@ async fn pull_single_archive<'a>(
     snapshot: &'a pbs_datastore::BackupDir,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     let archive_name = &archive_info.filename;
     let mut path = snapshot.full_path();
@@ -712,7 +718,11 @@ async fn pull_single_archive<'a>(
 
     let mut pull_stats = PullStats::default();
 
-    info!("sync archive {archive_name}");
+    log_info_buffer(
+        format!("sync archive {archive_name}"),
+        buffer_logs,
+        &mut pull_stats.log_buffer,
+    );
 
     reader.load_file_into(archive_name, &tmp_path).await?;
 
@@ -727,13 +737,18 @@ async fn pull_single_archive<'a>(
             verify_archive(archive_info, &csum, size)?;
 
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
-                info!("skipping chunk sync for same datastore");
+                log_info_buffer(
+                    "skipping chunk sync for same datastore".to_string(),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             } else {
                 let stats = pull_index_chunks(
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
+                    buffer_logs,
                 )
                 .await?;
                 pull_stats.add(stats);
@@ -747,13 +762,18 @@ async fn pull_single_archive<'a>(
             verify_archive(archive_info, &csum, size)?;
 
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
-                info!("skipping chunk sync for same datastore");
+                log_info_buffer(
+                    "skipping chunk sync for same datastore".to_string(),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             } else {
                 let stats = pull_index_chunks(
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
+                    buffer_logs,
                 )
                 .await?;
                 pull_stats.add(stats);
@@ -784,6 +804,7 @@ async fn pull_snapshot<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     let mut pull_stats = PullStats::default();
     let mut manifest_name = snapshot.full_path();
@@ -820,8 +841,17 @@ async fn pull_snapshot<'a>(
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
             if !client_log_name.exists() {
                 reader.try_download_client_log(&client_log_name).await?;
+                log_info_buffer(
+                    format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             };
-            info!("no data changes");
+            log_info_buffer(
+                "no data changes".to_string(),
+                buffer_logs,
+                &mut pull_stats.log_buffer,
+            );
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(pull_stats); // nothing changed
         }
@@ -841,7 +871,11 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_info_buffer(
+                                format!("detected changed file {path:?} - {err}"),
+                                buffer_logs,
+                                &mut pull_stats.log_buffer,
+                            );
                         }
                     }
                 }
@@ -851,7 +885,11 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_info_buffer(
+                                format!("detected changed file {path:?} - {err}"),
+                                buffer_logs,
+                                &mut pull_stats.log_buffer,
+                            );
                         }
                     }
                 }
@@ -861,15 +899,25 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_info_buffer(
+                                format!("detected changed file {path:?} - {err}"),
+                                buffer_logs,
+                                &mut pull_stats.log_buffer,
+                            );
                         }
                     }
                 }
             }
         }
 
-        let stats =
-            pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
+        let stats = pull_single_archive(
+            reader.clone(),
+            snapshot,
+            item,
+            downloaded_chunks.clone(),
+            buffer_logs,
+        )
+        .await?;
         pull_stats.add(stats);
     }
 
@@ -879,6 +927,11 @@ async fn pull_snapshot<'a>(
 
     if !client_log_name.exists() {
         reader.try_download_client_log(&client_log_name).await?;
+        log_info_buffer(
+            format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+            buffer_logs,
+            &mut pull_stats.log_buffer,
+        );
     };
     snapshot
         .cleanup_unreferenced_files(&manifest)
@@ -895,15 +948,21 @@ async fn pull_snapshot_from<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     let (_path, is_new, _snap_lock) = snapshot
         .datastore()
         .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
 
-    let pull_stats = if is_new {
-        info!("sync snapshot {}", snapshot.dir());
+    let mut pull_stats = PullStats::default();
+    if is_new {
+        log_info_buffer(
+            format!("sync snapshot {}", snapshot.dir()),
+            buffer_logs,
+            &mut pull_stats.log_buffer,
+        );
 
-        match pull_snapshot(reader, snapshot, downloaded_chunks).await {
+        match pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await {
             Err(err) => {
                 if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                     snapshot.backup_ns(),
@@ -914,14 +973,23 @@ async fn pull_snapshot_from<'a>(
                 }
                 return Err(err);
             }
-            Ok(pull_stats) => {
-                info!("sync snapshot {} done", snapshot.dir());
-                pull_stats
+            Ok(stats) => {
+                pull_stats.add(stats);
+                log_info_buffer(
+                    format!("sync snapshot {} done", snapshot.dir()),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             }
         }
     } else {
-        info!("re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(reader, snapshot, downloaded_chunks).await?
+        log_info_buffer(
+            format!("re-sync snapshot {}", snapshot.dir()),
+            buffer_logs,
+            &mut pull_stats.log_buffer,
+        );
+        let stats = pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await?;
+        pull_stats.add(stats);
     };
 
     Ok(pull_stats)
@@ -1054,6 +1122,8 @@ async fn pull_group(
         .last_successful_backup(&target_ns, group)?
         .unwrap_or(i64::MIN);
 
+    let mut pull_stats = PullStats::default();
+    let buffer_logs = matches!(params.group_sync_tasks, Some(n) if n > 1);
     let list: Vec<BackupDir> = raw_list
         .into_iter()
         .enumerate()
@@ -1063,7 +1133,11 @@ async fn pull_group(
                 already_synced_skip_info.update(dir.time);
                 return false;
             } else if already_synced_skip_info.count > 0 {
-                info!("{already_synced_skip_info}");
+                log_info_buffer(
+                    format!("{already_synced_skip_info}"),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
                 already_synced_skip_info.reset();
                 return true;
             }
@@ -1072,7 +1146,11 @@ async fn pull_group(
                 transfer_last_skip_info.update(dir.time);
                 return false;
             } else if transfer_last_skip_info.count > 0 {
-                info!("{transfer_last_skip_info}");
+                log_info_buffer(
+                    format!("{transfer_last_skip_info}"),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
                 transfer_last_skip_info.reset();
             }
             true
@@ -1088,8 +1166,6 @@ async fn pull_group(
         progress.group_snapshots = list.len() as u64;
     }
 
-    let mut pull_stats = PullStats::default();
-
     for (pos, from_snapshot) in list.into_iter().enumerate() {
         let to_snapshot = params
             .target
@@ -1100,12 +1176,17 @@ async fn pull_group(
             .source
             .reader(source_namespace, &from_snapshot)
             .await?;
-        let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
+        let result =
+            pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), buffer_logs).await;
 
         {
             let mut progress = progress.lock().unwrap();
             progress.done_snapshots = pos as u64 + 1;
-            info!("percentage done: {progress}");
+            log_info_buffer(
+                format!("percentage done: {progress}"),
+                buffer_logs,
+                &mut pull_stats.log_buffer,
+            );
         }
 
         let stats = result?; // stop on error
@@ -1124,13 +1205,21 @@ async fn pull_group(
                 continue;
             }
             if snapshot.is_protected() {
-                info!(
-                    "don't delete vanished snapshot {} (protected)",
-                    snapshot.dir()
+                log_info_buffer(
+                    format!(
+                        "don't delete vanished snapshot {} (protected)",
+                        snapshot.dir()
+                    ),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
                 );
                 continue;
             }
-            info!("delete vanished snapshot {}", snapshot.dir());
+            log_info_buffer(
+                format!("delete vanished snapshot {}", snapshot.dir()),
+                buffer_logs,
+                &mut pull_stats.log_buffer,
+            );
             params
                 .target
                 .store
@@ -1478,8 +1567,14 @@ pub(crate) async fn pull_ns(
     let mut pull_stats = PullStats::default();
     // poll to initiate tasks, queue another remaining tasks for each finished one
     while let Some(result) = pull_group_tasks.next().await {
-        let (progress, stats, has_errors) = result?;
+        let (progress, mut stats, has_errors) = result?;
         errors |= has_errors;
+        // Generate log output
+        for log_line in stats.log_buffer.iter() {
+            info!("{log_line}");
+        }
+        // clear the log buffer before merging the stats; the messages were already emitted above
+        stats.log_buffer.clear();
         pull_stats.add(stats);
         store_progress.done_groups += progress.done_groups;
         store_progress.done_snapshots += progress.done_snapshots;
@@ -1552,3 +1647,11 @@ pub(crate) async fn pull_ns(
 
     Ok((store_progress, pull_stats, errors))
 }
+
+fn log_info_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec<String>) {
+    if buffer_logs {
+        buffer.push(msg);
+    } else {
+        info!("{msg}");
+    }
+}
-- 
2.39.2





More information about the pbs-devel mailing list