[pbs-devel] [PATCH v2 proxmox-backup 4/6] server: pull: prefix log messages and add error context

Christian Ebner c.ebner at proxmox.com
Mon Jan 20 11:51:02 CET 2025


Pulling groups, and therefore also snapshots, in parallel leads to
interleaved log output, making it nearly impossible to relate a log
message to a particular backup snapshot or group.

Therefore, prefix pull job log messages with the corresponding group
or snapshot and set the error context accordingly.

Also, reword some messages, inline variables in format strings, and
start log lines with capital letters for consistent output.
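
For illustration, a minimal, self-contained sketch of the pattern
(hypothetical helper names, not the exact code from the diff below):
the same prefix string is reused for a snapshot's log lines and is
attached as anyhow error context, so errors printed with the
alternate format "{:#}" carry the snapshot as well.

```
use anyhow::{Context, Result};

fn sync_archive(snapshot_dir: &str, archive_name: &str) -> Result<()> {
    // The same prefix is reused for every log line of this snapshot.
    let prefix = format!("Snapshot {snapshot_dir}");
    println!("{prefix}: sync archive {archive_name}");

    std::fs::File::open(archive_name)
        // Attach the prefix as error context; it gets prepended when
        // the error is printed with the alternate format "{:#}".
        .context(format!("{prefix}: archive {archive_name}"))?;

    println!("{prefix}: sync done");
    Ok(())
}

fn main() {
    if let Err(err) = sync_archive("ct/100/2025-01-15T12:29:44Z", "root.pxar.didx") {
        // Prints e.g. "Snapshot ct/100/...: archive root.pxar.didx: No such file ..."
        eprintln!("{err:#}");
    }
}
```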

Example output for a sequential pull job:
```
...
Snapshot ct/100/2025-01-15T12:29:44Z: start sync
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive pct.conf.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive root.pxar.didx
Snapshot ct/100/2025-01-15T12:29:44Z: archive root.pxar.didx: downloaded 171.851 MiB (111.223 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive catalog.pcat1.didx
Snapshot ct/100/2025-01-15T12:29:44Z: archive catalog.pcat1.didx: downloaded 180.195 KiB (19.884 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: got backup log file client.log.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync done
...
```

Example output for a parallel pull job:
```
...
Snapshot ct/100/2025-01-15T12:29:44Z: start sync
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive pct.conf.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive root.pxar.didx
Snapshot vm/200/2025-01-15T12:30:06Z: start sync
Snapshot vm/200/2025-01-15T12:30:06Z: sync archive qemu-server.conf.blob
Snapshot vm/200/2025-01-15T12:30:06Z: sync archive drive-scsi0.img.fidx
Snapshot ct/100/2025-01-15T12:29:44Z: archive root.pxar.didx: downloaded 171.851 MiB (206.124 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive catalog.pcat1.didx
Snapshot ct/100/2025-01-15T12:29:44Z: archive catalog.pcat1.didx: downloaded 180.195 KiB (1.972 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: got backup log file client.log.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync done
...
```

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- Prefix logs with the respective context rather than buffering the
  log output and writing it only after the group sync has finished

 src/server/pull.rs | 110 ++++++++++++++++++++++++++-------------------
 src/server/sync.rs |   9 ++--
 2 files changed, 70 insertions(+), 49 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 27315f1ae..f2d024dab 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -6,7 +6,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
-use anyhow::{bail, format_err, Error};
+use anyhow::{bail, format_err, Context, Error};
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use proxmox_human_byte::HumanByte;
@@ -136,6 +136,7 @@ async fn pull_index_chunks<I: IndexFile>(
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    prefix: &str,
 ) -> Result<SyncStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
@@ -218,7 +219,7 @@ async fn pull_index_chunks<I: IndexFile>(
     let chunk_count = chunk_count.load(Ordering::SeqCst);
 
     info!(
-        "downloaded {} ({}/s)",
+        "{prefix}: downloaded {} ({}/s)",
         HumanByte::from(bytes),
         HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
     );
@@ -262,6 +263,8 @@ async fn pull_single_archive<'a>(
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<SyncStats, Error> {
+    let prefix = format!("Snapshot {}", snapshot.dir());
+
     let archive_name = &archive_info.filename;
     let mut path = snapshot.full_path();
     path.push(archive_name);
@@ -271,28 +274,36 @@ async fn pull_single_archive<'a>(
 
     let mut sync_stats = SyncStats::default();
 
-    info!("sync archive {archive_name}");
+    info!("{prefix}: sync archive {archive_name}");
+
+    let prefix = format!("Snapshot {}: archive {archive_name}", snapshot.dir());
 
     reader.load_file_into(archive_name, &tmp_path).await?;
 
-    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
+    let mut tmpfile = std::fs::OpenOptions::new()
+        .read(true)
+        .open(&tmp_path)
+        .context(format!("archive {archive_name}"))?;
 
     match ArchiveType::from_path(archive_name)? {
         ArchiveType::DynamicIndex => {
             let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
-                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
+                format_err!(
+                    "archive {archive_name}: unable to read dynamic index {tmp_path:?} - {err}"
+                )
             })?;
             let (csum, size) = index.compute_csum();
-            verify_archive(archive_info, &csum, size)?;
+            verify_archive(archive_info, &csum, size).context(format!("archive {archive_name}"))?;
 
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
-                info!("skipping chunk sync for same datastore");
+                info!("{prefix}: skipping chunk sync for same datastore");
             } else {
                 let stats = pull_index_chunks(
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
+                    &prefix,
                 )
                 .await?;
                 sync_stats.add(stats);
@@ -300,19 +311,22 @@ async fn pull_single_archive<'a>(
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile).map_err(|err| {
-                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
+                format_err!(
+                    "archive {archive_name}: unable to read fixed index '{tmp_path:?}' - {err}"
+                )
             })?;
             let (csum, size) = index.compute_csum();
-            verify_archive(archive_info, &csum, size)?;
+            verify_archive(archive_info, &csum, size).context(format!("archive {archive_name}"))?;
 
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
-                info!("skipping chunk sync for same datastore");
+                info!("{prefix}: skipping chunk sync for same datastore");
             } else {
                 let stats = pull_index_chunks(
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
+                    &prefix,
                 )
                 .await?;
                 sync_stats.add(stats);
@@ -321,11 +335,11 @@ async fn pull_single_archive<'a>(
         ArchiveType::Blob => {
             tmpfile.rewind()?;
             let (csum, size) = sha256(&mut tmpfile)?;
-            verify_archive(archive_info, &csum, size)?;
+            verify_archive(archive_info, &csum, size).context(prefix.clone())?;
         }
     }
     if let Err(err) = std::fs::rename(&tmp_path, &path) {
-        bail!("Atomic rename file {:?} failed - {}", path, err);
+        bail!("archive {archive_name}: Atomic rename file {path:?} failed - {err}");
     }
     Ok(sync_stats)
 }
@@ -345,6 +359,7 @@ async fn pull_snapshot<'a>(
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt: bool,
 ) -> Result<SyncStats, Error> {
+    let prefix = format!("Snapshot {}", snapshot.dir());
     let mut sync_stats = SyncStats::default();
     let mut manifest_name = snapshot.full_path();
     manifest_name.push(MANIFEST_BLOB_NAME.as_ref());
@@ -357,7 +372,8 @@ async fn pull_snapshot<'a>(
     let tmp_manifest_blob;
     if let Some(data) = reader
         .load_file_into(MANIFEST_BLOB_NAME.as_ref(), &tmp_manifest_name)
-        .await?
+        .await
+        .context(prefix.clone())?
     {
         tmp_manifest_blob = data;
     } else {
@@ -367,21 +383,21 @@ async fn pull_snapshot<'a>(
     if manifest_name.exists() && !corrupt {
         let manifest_blob = proxmox_lang::try_block!({
             let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
-                format_err!("unable to open local manifest {manifest_name:?} - {err}")
+                format_err!("{prefix}: unable to open local manifest {manifest_name:?} - {err}")
             })?;
 
             let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
             Ok(manifest_blob)
         })
         .map_err(|err: Error| {
-            format_err!("unable to read local manifest {manifest_name:?} - {err}")
+            format_err!("{prefix}: unable to read local manifest {manifest_name:?} - {err}")
         })?;
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
             if !client_log_name.exists() {
                 reader.try_download_client_log(&client_log_name).await?;
             };
-            info!("no data changes");
+            info!("{prefix}: no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(sync_stats); // nothing changed
         }
@@ -402,7 +418,7 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            info!("{prefix}: detected changed file {path:?} - {err}");
                         }
                     }
                 }
@@ -412,7 +428,7 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            info!("{prefix}: detected changed file {path:?} - {err}");
                         }
                     }
                 }
@@ -422,7 +438,7 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            info!("{prefix}: detected changed file {path:?} - {err}");
                         }
                     }
                 }
@@ -435,7 +451,7 @@ async fn pull_snapshot<'a>(
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
-        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
+        bail!("{prefix}: Atomic rename file {manifest_name:?} failed - {err}");
     }
 
     if !client_log_name.exists() {
@@ -443,7 +459,7 @@ async fn pull_snapshot<'a>(
     };
     snapshot
         .cleanup_unreferenced_files(&manifest)
-        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
+        .map_err(|err| format_err!("{prefix}: failed to cleanup unreferenced files - {err}"))?;
 
     Ok(sync_stats)
 }
@@ -458,12 +474,15 @@ async fn pull_snapshot_from<'a>(
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt: bool,
 ) -> Result<SyncStats, Error> {
+    let prefix = format!("Snapshot {}", snapshot.dir());
+
     let (_path, is_new, _snap_lock) = snapshot
         .datastore()
-        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
+        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())
+        .context(prefix.clone())?;
 
     let sync_stats = if is_new {
-        info!("sync snapshot {}", snapshot.dir());
+        info!("{prefix}: start sync");
 
         // this snapshot is new, so it can never be corrupt
         match pull_snapshot(reader, snapshot, downloaded_chunks, false).await {
@@ -473,22 +492,24 @@ async fn pull_snapshot_from<'a>(
                     snapshot.as_ref(),
                     true,
                 ) {
-                    info!("cleanup error - {cleanup_err}");
+                    info!("{prefix}: cleanup error - {cleanup_err}");
                 }
                 return Err(err);
             }
             Ok(sync_stats) => {
-                info!("sync snapshot {} done", snapshot.dir());
+                info!("{prefix}: sync done");
                 sync_stats
             }
         }
     } else {
         if corrupt {
-            info!("re-sync snapshot {} due to corruption", snapshot.dir());
+            info!("{prefix}: re-sync snapshot due to corruption");
         } else {
-            info!("re-sync snapshot {}", snapshot.dir());
+            info!("{prefix}: re-sync snapshot");
         }
-        pull_snapshot(reader, snapshot, downloaded_chunks, corrupt).await?
+        pull_snapshot(reader, snapshot, downloaded_chunks, corrupt)
+            .await
+            .context(prefix.clone())?
     };
 
     Ok(sync_stats)
@@ -517,6 +538,7 @@ async fn pull_group(
     group: &BackupGroup,
     store_progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<SyncStats, Error> {
+    let prefix = format!("Group {group}");
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
 
@@ -524,6 +546,7 @@ async fn pull_group(
         .source
         .list_backup_dirs(source_namespace, group)
         .await?;
+
     raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
 
     let total_amount = raw_list.len();
@@ -593,11 +616,11 @@ async fn pull_group(
         .collect();
 
     if already_synced_skip_info.count > 0 {
-        info!("{already_synced_skip_info}");
+        info!("{prefix}: {already_synced_skip_info}");
         already_synced_skip_info.reset();
     }
     if transfer_last_skip_info.count > 0 {
-        info!("{transfer_last_skip_info}");
+        info!("{prefix}: {transfer_last_skip_info}");
         transfer_last_skip_info.reset();
     }
 
@@ -645,12 +668,12 @@ async fn pull_group(
             }
             if snapshot.is_protected() {
                 info!(
-                    "don't delete vanished snapshot {} (protected)",
-                    snapshot.dir()
+                    "{prefix}: don't delete vanished snapshot {} (protected)",
+                    snapshot.dir(),
                 );
                 continue;
             }
-            info!("delete vanished snapshot {}", snapshot.dir());
+            info!("{prefix}: delete vanished snapshot {}", snapshot.dir());
             params
                 .target
                 .store
@@ -849,10 +872,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
             }
             Err(err) => {
                 errors = true;
-                info!(
-                    "Encountered errors while syncing namespace {} - {err}",
-                    &namespace,
-                );
+                info!("Encountered errors while syncing namespace {namespace} - {err}");
             }
         };
     }
@@ -877,6 +897,7 @@ async fn pull_group_do(
     target_namespace: &BackupNamespace,
     progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<SyncStats, ()> {
+    let prefix = format!("Group {group}");
     let (owner, _lock_guard) =
         match params
             .target
@@ -885,8 +906,7 @@ async fn pull_group_do(
         {
             Ok(result) => result,
             Err(err) => {
-                info!("sync group {group} failed - group lock failed: {err}");
-                info!("create_locked_backup_group failed");
+                info!("{prefix}: creating locked backup group failed: {err}");
                 return Err(());
             }
         };
@@ -894,7 +914,7 @@ async fn pull_group_do(
     if params.owner != owner {
         // only the owner is allowed to create additional snapshots
         info!(
-            "sync group {group} failed - owner check failed ({} != {owner})",
+            "{prefix}: owner check failed: ({} != {owner})",
             params.owner,
         );
         return Err(());
@@ -906,7 +926,7 @@ async fn pull_group_do(
             Ok(sync_stats)
         }
         Err(err) => {
-            info!("sync group {group} failed - {err}");
+            info!("{prefix}: pulling group failed: {err:#}");
             Err(())
         }
     }
@@ -939,7 +959,7 @@ pub(crate) async fn pull_ns(
     list.sort_unstable();
 
     info!(
-        "found {} groups to sync (out of {unfiltered_count} total)",
+        "Found {} groups to sync (out of {unfiltered_count} total)",
         list.len()
     );
 
@@ -991,7 +1011,7 @@ pub(crate) async fn pull_ns(
                 if !local_group.apply_filters(&params.group_filter) {
                     continue;
                 }
-                info!("delete vanished group '{local_group}'");
+                info!("Delete vanished group '{local_group}'");
                 let delete_stats_result = params
                     .target
                     .store
@@ -1000,7 +1020,7 @@ pub(crate) async fn pull_ns(
                 match delete_stats_result {
                     Ok(stats) => {
                         if !stats.all_removed() {
-                            info!("kept some protected snapshots of group '{local_group}'");
+                            info!("Kept some protected snapshots of group '{local_group}'");
                             sync_stats.add(SyncStats::from(RemovedVanishedStats {
                                 snapshots: stats.removed_snapshots(),
                                 groups: 0,
@@ -1023,7 +1043,7 @@ pub(crate) async fn pull_ns(
             Ok(())
         });
         if let Err(err) = result {
-            info!("error during cleanup: {err}");
+            info!("Error during cleanup: {err}");
             errors = true;
         };
     }
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 8e5d39182..fc8173516 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -134,13 +134,13 @@ impl SyncSourceReader for RemoteSourceReader {
                 Some(HttpError { code, message }) => match *code {
                     StatusCode::NOT_FOUND => {
                         info!(
-                            "skipping snapshot {} - vanished since start of sync",
+                            "Snapshot {}: skipped because vanished since start of sync",
                             &self.dir
                         );
                         return Ok(None);
                     }
                     _ => {
-                        bail!("HTTP error {code} - {message}");
+                        bail!("Snapshot {}: HTTP error {code} - {message}", &self.dir);
                     }
                 },
                 None => {
@@ -173,7 +173,8 @@ impl SyncSourceReader for RemoteSourceReader {
                 bail!("Atomic rename file {to_path:?} failed - {err}");
             }
             info!(
-                "got backup log file {client_log_name}",
+                "Snapshot {snapshot}: got backup log file {client_log_name}",
+                snapshot = &self.dir,
                 client_log_name = client_log_name.deref()
             );
         }
@@ -381,7 +382,7 @@ impl SyncSource for RemoteSource {
                 let snapshot = item.backup;
                 // in-progress backups can't be synced
                 if item.size.is_none() {
-                    info!("skipping snapshot {snapshot} - in-progress backup");
+                    info!("Snapshot {snapshot}: skipped because backup in progress");
                     return None;
                 }
 
-- 
2.39.5