[pbs-devel] [PATCH proxmox-backup 3/3] client: progress log: allow to specify backup log interval

Christian Ebner c.ebner at proxmox.com
Mon Oct 21 14:55:22 CEST 2024


Adds the optional parameter `progress-log-interval`, which allows one
to specify the interval to use for backup progress log output.

The interval can be specified either as a time based value, given as a
`TimeSpan` compatible value, or as a size based value, given as a
`HumanByte` compatible value. The variant is selected by the given
prefix; if no variant prefix is given, the time based variant is used
by default.
Minimum values of 1s and 100MiB are enforced for the respective
variants to protect from excessive output.
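
The `LogInterval` type itself comes from pbs-api-types (introduced
earlier in this series) and is only imported below, so the following is
just a rough sketch of how such a prefix parser could look, with plain
strings standing in for the actual `TimeSpan` and `HumanByte` payloads
(names and details are illustrative only):

    use std::str::FromStr;

    // Illustrative stand-in for the LogInterval type from pbs-api-types;
    // the real variants carry TimeSpan and HumanByte values.
    enum LogInterval {
        Time(String),
        Size(String),
        None,
    }

    impl FromStr for LogInterval {
        type Err = std::convert::Infallible;

        fn from_str(s: &str) -> Result<Self, Self::Err> {
            Ok(if s == "none" {
                LogInterval::None
            } else if let Some(rest) = s.strip_prefix("time:") {
                LogInterval::Time(rest.to_string())
            } else if let Some(rest) = s.strip_prefix("size:") {
                LogInterval::Size(rest.to_string())
            } else {
                // no variant prefix given: default to the time based variant
                LogInterval::Time(s.to_string())
            })
        }
    }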

Further, if the parameter is set to `none`, no progress log output is
generated.

Example client invocations:

- no progress logging:
  `proxmox-backup-client backup root.pxar:/ --progress-log-interval=none`
- time based progress logging with a 1m 30s interval:
  `proxmox-backup-client backup root.pxar:/ --progress-log-interval="1m 30s"`
  `proxmox-backup-client backup root.pxar:/ --progress-log-interval="time:1m 30s"`
- size based progress logging with a 512MiB interval:
  `proxmox-backup-client backup root.pxar:/ --progress-log-interval="size:512MiB"`
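
For the time based variant, the patch spawns a task that sleeps for the
configured duration between log lines. A standalone sketch of that
spawned progress task (simplified from the patch below; assumes the
tokio runtime, as used by the client, and uses a 1s interval plus a
stand-in upload loop purely for demonstration):

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    #[tokio::main]
    async fn main() {
        let stream_len = Arc::new(AtomicUsize::new(0));
        let stream_len2 = stream_len.clone();

        // e.g. a parsed "time:1s", converted to f64 seconds
        let interval_secs = 1.0;

        // periodic progress logger, analogous to the spawned task in the patch
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs_f64(interval_secs)).await;
                println!("processed {} bytes", stream_len2.load(Ordering::SeqCst));
            }
        });

        // stand-in for the chunk upload loop advancing the stream position
        for _ in 0..3 {
            stream_len.fetch_add(4 * 1024 * 1024, Ordering::SeqCst);
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        }
    }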

If the optional parameter is not provided, the default `time:1m` is
used.
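
For the size based variant, a log line is emitted whenever the running
stream offset crosses a multiple of the configured interval; the
integer division in progress_log_by_byte_interval() below fires at most
once per crossed boundary, regardless of chunk sizes. A standalone
sketch of that condition:

    // Simulate 64 MiB chunks against a 100 MiB interval (the enforced
    // minimum): log lines fire at the 128, 256, 320 and 448 MiB offsets.
    fn main() {
        let interval = 100 * 1024 * 1024usize;
        let mut previous = 0;

        for pos in (0..=500 * 1024 * 1024usize).step_by(64 * 1024 * 1024) {
            if interval > 0 && pos / interval > previous {
                previous = pos / interval;
                println!("progress log at offset {pos} bytes");
            }
        }
    }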

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 pbs-client/src/backup_writer.rs   | 81 ++++++++++++++++++++++++-------
 proxmox-backup-client/src/main.rs | 27 +++++++++--
 2 files changed, 88 insertions(+), 20 deletions(-)

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 37ee39e2e..824d4be70 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -12,7 +12,7 @@ use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
 use tokio_stream::wrappers::ReceiverStream;
 
-use pbs_api_types::{BackupDir, BackupNamespace};
+use pbs_api_types::{BackupDir, BackupNamespace, LogInterval};
 use pbs_datastore::data_blob::{ChunkInfo, DataBlob, DataChunkBuilder};
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -53,6 +53,7 @@ pub struct UploadOptions {
     pub compress: bool,
     pub encrypt: bool,
     pub fixed_size: Option<u64>,
+    pub progress_log_interval: Option<LogInterval>,
 }
 
 struct UploadStats {
@@ -359,6 +360,7 @@ impl BackupWriter {
             options.compress,
             injections,
             archive,
+            options.progress_log_interval,
         )
         .await?;
 
@@ -653,6 +655,7 @@ impl BackupWriter {
         compress: bool,
         injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
         archive: &str,
+        progress_log_interval: Option<LogInterval>,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let total_chunks = Arc::new(AtomicUsize::new(0));
         let total_chunks2 = total_chunks.clone();
@@ -671,6 +674,8 @@ impl BackupWriter {
         let injected_len = Arc::new(AtomicUsize::new(0));
         let injected_len2 = injected_len.clone();
         let uploaded_len = Arc::new(AtomicUsize::new(0));
+        let uploaded_len2 = uploaded_len.clone();
+        let previous_byte_fraction = Arc::new(AtomicUsize::new(0));
 
         let append_chunk_path = format!("{}_index", prefix);
         let upload_chunk_path = format!("{}_chunk", prefix);
@@ -684,23 +689,34 @@ impl BackupWriter {
         let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
         let index_csum_2 = index_csum.clone();
 
-        let progress_handle = if archive.ends_with(".img")
-            || archive.ends_with(".pxar")
-            || archive.ends_with(".ppxar")
-        {
-            Some(tokio::spawn(async move {
-                loop {
-                    tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
-                    progress_log(
-                        stream_len3.load(Ordering::SeqCst),
-                        uploaded_len.load(Ordering::SeqCst),
-                        &start_time,
-                    );
+        let mut progress_handle = None;
+        let mut progress_byte_interval = 0;
+        match progress_log_interval {
+            Some(LogInterval::Time(ref time_span)) => {
+                if archive.ends_with(".img")
+                    || archive.ends_with(".pxar")
+                    || archive.ends_with(".ppxar")
+                {
+                    let duration = std::primitive::f64::from(time_span.clone());
+                    progress_handle = Some(tokio::spawn(async move {
+                        loop {
+                            tokio::time::sleep(tokio::time::Duration::from_secs_f64(duration))
+                                .await;
+                            progress_log(
+                                stream_len3.load(Ordering::SeqCst),
+                                uploaded_len.load(Ordering::SeqCst),
+                                &start_time,
+                            )
+                        }
+                    }))
                 }
-            }))
-        } else {
-            None
-        };
+            }
+            Some(LogInterval::Size(ref human_byte)) => {
+                progress_byte_interval = human_byte.as_u64() as usize
+            }
+            Some(LogInterval::None) => {}
+            None => {}
+        }
 
         stream
             .inject_reused_chunks(injections, stream_len.clone())
@@ -717,6 +733,15 @@ impl BackupWriter {
                     for chunk in chunks {
                         let offset =
                             stream_len.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
+
+                        progress_log_by_byte_interval(
+                            progress_byte_interval,
+                            (offset + chunk.size()) as usize,
+                            &previous_byte_fraction,
+                            &uploaded_len2,
+                            &start_time,
+                        );
+
                         reused_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
                         injected_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
                         let digest = chunk.digest();
@@ -734,6 +759,14 @@ impl BackupWriter {
                     total_chunks.fetch_add(1, Ordering::SeqCst);
                     let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
 
+                    progress_log_by_byte_interval(
+                        progress_byte_interval,
+                        offset as usize + chunk_len,
+                        &previous_byte_fraction,
+                        &uploaded_len2,
+                        &start_time,
+                    );
+
                     let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
 
                     if let Some(ref crypt_config) = crypt_config {
@@ -922,6 +955,20 @@ impl BackupWriter {
     }
 }
 
+#[inline(always)]
+fn progress_log_by_byte_interval(
+    interval: usize,
+    pos: usize,
+    previous: &Arc<AtomicUsize>,
+    uploaded: &Arc<AtomicUsize>,
+    start_time: &Instant,
+) {
+    if interval > 0 && pos / interval > previous.load(Ordering::SeqCst) {
+        previous.store(pos / interval, Ordering::SeqCst);
+        progress_log(pos, uploaded.load(Ordering::SeqCst), start_time);
+    }
+}
+
 #[inline(always)]
 fn progress_log(size: usize, size_uploaded: usize, start_time: &Instant) {
     let size = HumanByte::from(size);
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index e4034aa99..bf2ebeded 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -26,9 +26,9 @@ use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
 
 use pbs_api_types::{
     Authid, BackupDir, BackupGroup, BackupNamespace, BackupPart, BackupType, ClientRateLimitConfig,
-    CryptMode, Fingerprint, GroupListItem, PruneJobOptions, PruneListItem, RateLimitConfig,
-    SnapshotListItem, StorageStatus, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
-    BACKUP_TYPE_SCHEMA,
+    CryptMode, Fingerprint, GroupListItem, LogInterval, PruneJobOptions, PruneListItem,
+    RateLimitConfig, SnapshotListItem, StorageStatus, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
+    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA,
 };
 use pbs_client::catalog_shell::Shell;
 use pbs_client::pxar::{ErrorHandler as PxarErrorHandler, MetadataArchiveReader, PxarPrevRef};
@@ -734,6 +734,10 @@ fn spawn_catalog_upload(
                optional: true,
                default: false,
            },
+           "progress-log-interval": {
+               type: LogInterval,
+               optional: true,
+           },
        }
    }
 )]
@@ -746,6 +750,7 @@ async fn create_backup(
     dry_run: bool,
     skip_e2big_xattr: bool,
     limit: ClientRateLimitConfig,
+    progress_log_interval: Option<LogInterval>,
     _info: &ApiMethod,
     _rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
@@ -781,6 +786,20 @@ async fn create_backup(
 
     let empty = Vec::new();
     let exclude_args = param["exclude"].as_array().unwrap_or(&empty);
+    let progress_log_interval = progress_log_interval.unwrap_or_else(|| LogInterval::default());
+    match progress_log_interval {
+        LogInterval::Time(ref time_span) => {
+            if !(std::primitive::f64::from(time_span.clone()) >= 1.0) {
+                bail!("minimum progress log time interval is 1s");
+            }
+        }
+        LogInterval::Size(ref human_byte) => {
+            if !(human_byte.as_u64() >= 100 * 1024 * 1024) {
+                bail!("minimum progress log size interval is 100 MiB");
+            }
+        }
+        LogInterval::None => {}
+    }
 
     let mut pattern_list = Vec::with_capacity(exclude_args.len());
     for entry in exclude_args {
@@ -1132,6 +1151,7 @@ async fn create_backup(
                     previous_manifest: previous_manifest.clone(),
                     compress: true,
                     encrypt: crypto.mode == CryptMode::Encrypt,
+                    progress_log_interval: Some(progress_log_interval.clone()),
                     ..UploadOptions::default()
                 };
 
@@ -1169,6 +1189,7 @@ async fn create_backup(
                     fixed_size: Some(size),
                     compress: true,
                     encrypt: crypto.mode == CryptMode::Encrypt,
+                    progress_log_interval: Some(progress_log_interval.clone()),
                 };
 
                 let stats =
-- 
2.39.5