[pbs-devel] [RFC proxmox-backup 09/24] client: backup writer: bundle upload stats counters

Christian Ebner c.ebner at proxmox.com
Mon Jul 15 12:15:47 CEST 2024


In preparation for push support in sync jobs.

Introduce the `UploadStatsCounters` struct to hold the Arc clones of
the chunk upload statistics counters. By bundling them into the struct,
they can be passed as a single function parameter when factoring out
the common stream future implementation in the subsequent
implementation of the chunk upload for push support in sync jobs.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 pbs-client/src/backup_writer.rs | 52 ++++++++++++++++++---------------
 1 file changed, 28 insertions(+), 24 deletions(-)

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 813c8d602..a67b471a7 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -66,6 +66,16 @@ struct UploadStats {
     csum: [u8; 32],
 }
 
+struct UploadStatsCounters {
+    injected_chunk_count: Arc<AtomicUsize>,
+    known_chunk_count: Arc<AtomicUsize>,
+    total_chunks: Arc<AtomicUsize>,
+    compressed_stream_len: Arc<AtomicU64>,
+    injected_len: Arc<AtomicUsize>,
+    reused_len: Arc<AtomicUsize>,
+    stream_len: Arc<AtomicUsize>,
+}
+
 type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
 type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
 
@@ -647,20 +657,23 @@ impl BackupWriter {
         injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let total_chunks = Arc::new(AtomicUsize::new(0));
-        let total_chunks2 = total_chunks.clone();
         let known_chunk_count = Arc::new(AtomicUsize::new(0));
-        let known_chunk_count2 = known_chunk_count.clone();
         let injected_chunk_count = Arc::new(AtomicUsize::new(0));
-        let injected_chunk_count2 = injected_chunk_count.clone();
 
         let stream_len = Arc::new(AtomicUsize::new(0));
-        let stream_len2 = stream_len.clone();
         let compressed_stream_len = Arc::new(AtomicU64::new(0));
-        let compressed_stream_len2 = compressed_stream_len.clone();
         let reused_len = Arc::new(AtomicUsize::new(0));
-        let reused_len2 = reused_len.clone();
         let injected_len = Arc::new(AtomicUsize::new(0));
-        let injected_len2 = injected_len.clone();
+
+        let counters = UploadStatsCounters {
+            injected_chunk_count: injected_chunk_count.clone(),
+            known_chunk_count: known_chunk_count.clone(),
+            total_chunks: total_chunks.clone(),
+            compressed_stream_len: compressed_stream_len.clone(),
+            injected_len: injected_len.clone(),
+            reused_len: reused_len.clone(),
+            stream_len: stream_len.clone(),
+        };
 
         let append_chunk_path = format!("{}_index", prefix);
         let upload_chunk_path = format!("{}_chunk", prefix);
@@ -803,27 +816,18 @@ impl BackupWriter {
             })
             .then(move |result| async move { upload_result.await?.and(result) }.boxed())
             .and_then(move |_| {
-                let duration = start_time.elapsed();
-                let chunk_count = total_chunks2.load(Ordering::SeqCst);
-                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
-                let chunk_injected = injected_chunk_count2.load(Ordering::SeqCst);
-                let size = stream_len2.load(Ordering::SeqCst);
-                let size_reused = reused_len2.load(Ordering::SeqCst);
-                let size_injected = injected_len2.load(Ordering::SeqCst);
-                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
-
                 let mut guard = index_csum_2.lock().unwrap();
                 let csum = guard.take().unwrap().finish();
 
                 futures::future::ok(UploadStats {
-                    chunk_count,
-                    chunk_reused,
-                    chunk_injected,
-                    size,
-                    size_reused,
-                    size_injected,
-                    size_compressed,
-                    duration,
+                    chunk_count: counters.total_chunks.load(Ordering::SeqCst),
+                    chunk_reused: counters.known_chunk_count.load(Ordering::SeqCst),
+                    chunk_injected: counters.injected_chunk_count.load(Ordering::SeqCst),
+                    size: counters.stream_len.load(Ordering::SeqCst),
+                    size_reused: counters.reused_len.load(Ordering::SeqCst),
+                    size_injected: counters.injected_len.load(Ordering::SeqCst),
+                    size_compressed: counters.compressed_stream_len.load(Ordering::SeqCst) as usize,
+                    duration: start_time.elapsed(),
                     csum,
                 })
             })
-- 
2.39.2





More information about the pbs-devel mailing list