[pbs-devel] [PATCH v7 proxmox-backup 03/31] client: backup writer: refactor backup and upload stats counters

Christian Ebner c.ebner at proxmox.com
Mon Nov 11 16:43:25 CET 2024


In preparation for push support in sync jobs.

Extend `BackupStats`, move it into a new `backup_stats` submodule
and add a method to create it from `UploadStats`.

Further, introduce an `UploadCounters` struct to hold the Arc clones
of the chunk upload statistics counters, simplifying the housekeeping.

Bundling the counters into the struct allows them to be passed as a
single function parameter when factoring out the common stream future
in the subsequent implementation of the chunk upload for sync jobs
in push direction.

Co-developed-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 6:
- no changes
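
To illustrate the clone-sharing this relies on: a #[derive(Clone)]
struct whose fields are Arc<Atomic*> handles is cheap to clone, and
every clone updates the same underlying counters. A minimal,
self-contained sketch of that pattern follows; the Counters type here
is illustrative only, not the UploadCounters added by this patch:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    #[derive(Clone)]
    struct Counters {
        total_stream_len: Arc<AtomicUsize>,
    }

    impl Counters {
        fn new() -> Self {
            Self {
                total_stream_len: Arc::new(AtomicUsize::new(0)),
            }
        }

        // account for one chunk of the given length
        fn add_chunk(&self, len: usize) {
            self.total_stream_len.fetch_add(len, Ordering::SeqCst);
        }

        fn total(&self) -> usize {
            self.total_stream_len.load(Ordering::SeqCst)
        }
    }

    fn main() {
        let counters = Counters::new();
        let for_worker = counters.clone(); // shares the same atomics

        let handle = std::thread::spawn(move || {
            for _ in 0..1000 {
                for_worker.add_chunk(4096);
            }
        });
        counters.add_chunk(4096);
        handle.join().unwrap();
        assert_eq!(counters.total(), 1001 * 4096);
    }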

 pbs-client/src/backup_stats.rs         | 119 ++++++++++++++++++++
 pbs-client/src/backup_writer.rs        | 145 +++++++++----------------
 pbs-client/src/inject_reused_chunks.rs |  14 +--
 pbs-client/src/lib.rs                  |   3 +
 4 files changed, 180 insertions(+), 101 deletions(-)
 create mode 100644 pbs-client/src/backup_stats.rs
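
One subtlety the new accounting methods lean on: fetch_add returns
the counter's value from before the addition, so appending a chunk's
length to the total stream length also yields that chunk's start
offset in a single atomic step. A small sketch with plain atomics
(illustrative only, not the patch's API):

    use std::sync::atomic::{AtomicUsize, Ordering};

    fn main() {
        let total_stream_len = AtomicUsize::new(0);

        for len in [4096usize, 1024, 8192] {
            // previous total == offset of this chunk in the stream
            let offset = total_stream_len.fetch_add(len, Ordering::SeqCst);
            println!("chunk at offset {}, end {}", offset, offset + len);
        }
        assert_eq!(total_stream_len.load(Ordering::SeqCst), 13312);
    }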

diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
new file mode 100644
index 000000000..f0563a001
--- /dev/null
+++ b/pbs-client/src/backup_stats.rs
@@ -0,0 +1,119 @@
+//! Implements counters to generate statistics for log outputs during uploads with backup writer
+
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::pxar::create::ReusableDynamicEntry;
+
+/// Basic backup run statistics and archive checksum
+pub struct BackupStats {
+    pub size: u64,
+    pub csum: [u8; 32],
+    pub duration: Duration,
+    pub chunk_count: u64,
+}
+
+/// Extended backup run statistics and archive checksum
+pub(crate) struct UploadStats {
+    pub(crate) chunk_count: usize,
+    pub(crate) chunk_reused: usize,
+    pub(crate) chunk_injected: usize,
+    pub(crate) size: usize,
+    pub(crate) size_reused: usize,
+    pub(crate) size_injected: usize,
+    pub(crate) size_compressed: usize,
+    pub(crate) duration: Duration,
+    pub(crate) csum: [u8; 32],
+}
+
+impl UploadStats {
+    /// Convert the upload stats to the more concise [`BackupStats`]
+    #[inline(always)]
+    pub(crate) fn to_backup_stats(&self) -> BackupStats {
+        BackupStats {
+            chunk_count: self.chunk_count as u64,
+            size: self.size as u64,
+            duration: self.duration,
+            csum: self.csum,
+        }
+    }
+}
+
+/// Atomic counters for accounting upload stream progress information
+#[derive(Clone)]
+pub(crate) struct UploadCounters {
+    injected_chunk_count: Arc<AtomicUsize>,
+    known_chunk_count: Arc<AtomicUsize>,
+    total_chunk_count: Arc<AtomicUsize>,
+    compressed_stream_len: Arc<AtomicU64>,
+    injected_stream_len: Arc<AtomicUsize>,
+    reused_stream_len: Arc<AtomicUsize>,
+    total_stream_len: Arc<AtomicUsize>,
+}
+
+impl UploadCounters {
+    /// Create and zero init new upload counters
+    pub(crate) fn new() -> Self {
+        Self {
+            total_chunk_count: Arc::new(AtomicUsize::new(0)),
+            injected_chunk_count: Arc::new(AtomicUsize::new(0)),
+            known_chunk_count: Arc::new(AtomicUsize::new(0)),
+            compressed_stream_len: Arc::new(AtomicU64::new(0)),
+            injected_stream_len: Arc::new(AtomicUsize::new(0)),
+            reused_stream_len: Arc::new(AtomicUsize::new(0)),
+            total_stream_len: Arc::new(AtomicUsize::new(0)),
+        }
+    }
+
+    #[inline(always)]
+    pub(crate) fn add_known_chunk(&mut self, chunk_len: usize) -> usize {
+        self.known_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.reused_stream_len
+            .fetch_add(chunk_len, Ordering::SeqCst);
+        self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
+    }
+
+    #[inline(always)]
+    pub(crate) fn add_new_chunk(&mut self, chunk_len: usize, chunk_raw_size: u64) -> usize {
+        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.compressed_stream_len
+            .fetch_add(chunk_raw_size, Ordering::SeqCst);
+        self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
+    }
+
+    #[inline(always)]
+    pub(crate) fn add_injected_chunk(&mut self, chunk: &ReusableDynamicEntry) -> usize {
+        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.injected_chunk_count.fetch_add(1, Ordering::SeqCst);
+
+        self.reused_stream_len
+            .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+        self.injected_stream_len
+            .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+        self.total_stream_len
+            .fetch_add(chunk.size() as usize, Ordering::SeqCst)
+    }
+
+    #[inline(always)]
+    pub(crate) fn total_stream_len(&self) -> usize {
+        self.total_stream_len.load(Ordering::SeqCst)
+    }
+
+    /// Convert the counters to [`UploadStats`], including given archive checksum and runtime.
+    #[inline(always)]
+    pub(crate) fn to_upload_stats(&self, csum: [u8; 32], duration: Duration) -> UploadStats {
+        UploadStats {
+            chunk_count: self.total_chunk_count.load(Ordering::SeqCst),
+            chunk_reused: self.known_chunk_count.load(Ordering::SeqCst),
+            chunk_injected: self.injected_chunk_count.load(Ordering::SeqCst),
+            size: self.total_stream_len.load(Ordering::SeqCst),
+            size_reused: self.reused_stream_len.load(Ordering::SeqCst),
+            size_injected: self.injected_stream_len.load(Ordering::SeqCst),
+            size_compressed: self.compressed_stream_len.load(Ordering::SeqCst) as usize,
+            duration,
+            csum,
+        }
+    }
+}
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 4d2e8a801..8b9afdb95 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -1,7 +1,8 @@
 use std::collections::HashSet;
 use std::future::Future;
-use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
+use std::time::Instant;
 
 use anyhow::{bail, format_err, Error};
 use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
@@ -23,6 +24,7 @@ use pbs_tools::crypt_config::CryptConfig;
 use proxmox_human_byte::HumanByte;
 use proxmox_time::TimeSpan;
 
+use super::backup_stats::{BackupStats, UploadCounters, UploadStats};
 use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
 use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
 
@@ -40,11 +42,6 @@ impl Drop for BackupWriter {
     }
 }
 
-pub struct BackupStats {
-    pub size: u64,
-    pub csum: [u8; 32],
-}
-
 /// Options for uploading blobs/streams to the server
 #[derive(Default, Clone)]
 pub struct UploadOptions {
@@ -54,18 +51,6 @@ pub struct UploadOptions {
     pub fixed_size: Option<u64>,
 }
 
-struct UploadStats {
-    chunk_count: usize,
-    chunk_reused: usize,
-    chunk_injected: usize,
-    size: usize,
-    size_reused: usize,
-    size_injected: usize,
-    size_compressed: usize,
-    duration: std::time::Duration,
-    csum: [u8; 32],
-}
-
 struct ChunkUploadResponse {
     future: h2::client::ResponseFuture,
     size: usize,
@@ -194,6 +179,7 @@ impl BackupWriter {
         mut reader: R,
         file_name: &str,
     ) -> Result<BackupStats, Error> {
+        let start_time = Instant::now();
         let mut raw_data = Vec::new();
         // fixme: avoid loading into memory
         reader.read_to_end(&mut raw_data)?;
@@ -211,7 +197,12 @@ impl BackupWriter {
                 raw_data,
             )
             .await?;
-        Ok(BackupStats { size, csum })
+        Ok(BackupStats {
+            size,
+            csum,
+            duration: start_time.elapsed(),
+            chunk_count: 0,
+        })
     }
 
     pub async fn upload_blob_from_data(
@@ -220,6 +211,7 @@ impl BackupWriter {
         file_name: &str,
         options: UploadOptions,
     ) -> Result<BackupStats, Error> {
+        let start_time = Instant::now();
         let blob = match (options.encrypt, &self.crypt_config) {
             (false, _) => DataBlob::encode(&data, None, options.compress)?,
             (true, None) => bail!("requested encryption without a crypt config"),
@@ -243,7 +235,12 @@ impl BackupWriter {
                 raw_data,
             )
             .await?;
-        Ok(BackupStats { size, csum })
+        Ok(BackupStats {
+            size,
+            csum,
+            duration: start_time.elapsed(),
+            chunk_count: 0,
+        })
     }
 
     pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
@@ -421,10 +418,7 @@ impl BackupWriter {
             "csum": hex::encode(upload_stats.csum),
         });
         let _value = self.h2.post(&close_path, Some(param)).await?;
-        Ok(BackupStats {
-            size: upload_stats.size as u64,
-            csum: upload_stats.csum,
-        })
+        Ok(upload_stats.to_backup_stats())
     }
 
     fn response_queue() -> (
@@ -653,23 +647,9 @@ impl BackupWriter {
         injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
         archive: &str,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
-        let total_chunks = Arc::new(AtomicUsize::new(0));
-        let total_chunks2 = total_chunks.clone();
-        let known_chunk_count = Arc::new(AtomicUsize::new(0));
-        let known_chunk_count2 = known_chunk_count.clone();
-        let injected_chunk_count = Arc::new(AtomicUsize::new(0));
-        let injected_chunk_count2 = injected_chunk_count.clone();
-
-        let stream_len = Arc::new(AtomicUsize::new(0));
-        let stream_len2 = stream_len.clone();
-        let stream_len3 = stream_len.clone();
-        let compressed_stream_len = Arc::new(AtomicU64::new(0));
-        let compressed_stream_len2 = compressed_stream_len.clone();
-        let reused_len = Arc::new(AtomicUsize::new(0));
-        let reused_len2 = reused_len.clone();
-        let injected_len = Arc::new(AtomicUsize::new(0));
-        let injected_len2 = injected_len.clone();
-        let uploaded_len = Arc::new(AtomicUsize::new(0));
+        let mut counters = UploadCounters::new();
+        let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let counters_readonly = counters.clone();
 
         let append_chunk_path = format!("{}_index", prefix);
         let upload_chunk_path = format!("{}_chunk", prefix);
@@ -687,11 +667,12 @@ impl BackupWriter {
             || archive.ends_with(".pxar")
             || archive.ends_with(".ppxar")
         {
+            let counters = counters.clone();
             Some(tokio::spawn(async move {
                 loop {
                     tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
 
-                    let size = HumanByte::from(stream_len3.load(Ordering::SeqCst));
+                    let size = HumanByte::from(counters.total_stream_len());
                     let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
                     let elapsed = TimeSpan::from(start_time.elapsed());
 
@@ -703,22 +684,15 @@ impl BackupWriter {
         };
 
         stream
-            .inject_reused_chunks(injections, stream_len.clone())
+            .inject_reused_chunks(injections, counters.clone())
             .and_then(move |chunk_info| match chunk_info {
                 InjectedChunksInfo::Known(chunks) => {
                     // account for injected chunks
-                    let count = chunks.len();
-                    total_chunks.fetch_add(count, Ordering::SeqCst);
-                    injected_chunk_count.fetch_add(count, Ordering::SeqCst);
-
                     let mut known = Vec::new();
                     let mut guard = index_csum.lock().unwrap();
                     let csum = guard.as_mut().unwrap();
                     for chunk in chunks {
-                        let offset =
-                            stream_len.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
-                        reused_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
-                        injected_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
+                        let offset = counters.add_injected_chunk(&chunk) as u64;
                         let digest = chunk.digest();
                         known.push((offset, digest));
                         let end_offset = offset + chunk.size();
@@ -731,9 +705,6 @@ impl BackupWriter {
                     // account for not injected chunks (new and known)
                     let chunk_len = data.len();
 
-                    total_chunks.fetch_add(1, Ordering::SeqCst);
-                    let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
-
                     let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
 
                     if let Some(ref crypt_config) = crypt_config {
@@ -741,7 +712,29 @@ impl BackupWriter {
                     }
 
                     let mut known_chunks = known_chunks.lock().unwrap();
-                    let digest = chunk_builder.digest();
+                    let digest = *chunk_builder.digest();
+                    let (offset, res) = if known_chunks.contains(&digest) {
+                        let offset = counters.add_known_chunk(chunk_len) as u64;
+                        (offset, MergedChunkInfo::Known(vec![(offset, digest)]))
+                    } else {
+                        match chunk_builder.build() {
+                            Ok((chunk, digest)) => {
+                                let offset =
+                                    counters.add_new_chunk(chunk_len, chunk.raw_size()) as u64;
+                                known_chunks.insert(digest);
+                                (
+                                    offset,
+                                    MergedChunkInfo::New(ChunkInfo {
+                                        chunk,
+                                        digest,
+                                        chunk_len: chunk_len as u64,
+                                        offset,
+                                    }),
+                                )
+                            }
+                            Err(err) => return future::err(err),
+                        }
+                    };
 
                     let mut guard = index_csum.lock().unwrap();
                     let csum = guard.as_mut().unwrap();
@@ -751,26 +744,9 @@ impl BackupWriter {
                     if !is_fixed_chunk_size {
                         csum.update(&chunk_end.to_le_bytes());
                     }
-                    csum.update(digest);
+                    csum.update(&digest);
 
-                    let chunk_is_known = known_chunks.contains(digest);
-                    if chunk_is_known {
-                        known_chunk_count.fetch_add(1, Ordering::SeqCst);
-                        reused_len.fetch_add(chunk_len, Ordering::SeqCst);
-                        future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
-                    } else {
-                        let compressed_stream_len2 = compressed_stream_len.clone();
-                        known_chunks.insert(*digest);
-                        future::ready(chunk_builder.build().map(move |(chunk, digest)| {
-                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
-                            MergedChunkInfo::New(ChunkInfo {
-                                chunk,
-                                digest,
-                                chunk_len: chunk_len as u64,
-                                offset,
-                            })
-                        }))
-                    }
+                    future::ok(res)
                 }
             })
             .merge_known_chunks()
@@ -837,15 +813,6 @@ impl BackupWriter {
             })
             .then(move |result| async move { upload_result.await?.and(result) }.boxed())
             .and_then(move |_| {
-                let duration = start_time.elapsed();
-                let chunk_count = total_chunks2.load(Ordering::SeqCst);
-                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
-                let chunk_injected = injected_chunk_count2.load(Ordering::SeqCst);
-                let size = stream_len2.load(Ordering::SeqCst);
-                let size_reused = reused_len2.load(Ordering::SeqCst);
-                let size_injected = injected_len2.load(Ordering::SeqCst);
-                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
-
                 let mut guard = index_csum_2.lock().unwrap();
                 let csum = guard.take().unwrap().finish();
 
@@ -853,17 +820,7 @@ impl BackupWriter {
                     handle.abort();
                 }
 
-                futures::future::ok(UploadStats {
-                    chunk_count,
-                    chunk_reused,
-                    chunk_injected,
-                    size,
-                    size_reused,
-                    size_injected,
-                    size_compressed,
-                    duration,
-                    csum,
-                })
+                futures::future::ok(counters_readonly.to_upload_stats(csum, start_time.elapsed()))
             })
     }
 
diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
index 4b2922012..6da2bcd16 100644
--- a/pbs-client/src/inject_reused_chunks.rs
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -1,13 +1,13 @@
 use std::cmp;
 use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{mpsc, Arc};
+use std::sync::mpsc;
 use std::task::{Context, Poll};
 
 use anyhow::{anyhow, Error};
 use futures::{ready, Stream};
 use pin_project_lite::pin_project;
 
+use crate::backup_stats::UploadCounters;
 use crate::pxar::create::ReusableDynamicEntry;
 
 pin_project! {
@@ -16,7 +16,7 @@ pin_project! {
         input: S,
         next_injection: Option<InjectChunks>,
         injections: Option<mpsc::Receiver<InjectChunks>>,
-        stream_len: Arc<AtomicUsize>,
+        counters: UploadCounters,
     }
 }
 
@@ -42,7 +42,7 @@ pub trait InjectReusedChunks: Sized {
     fn inject_reused_chunks(
         self,
         injections: Option<mpsc::Receiver<InjectChunks>>,
-        stream_len: Arc<AtomicUsize>,
+        counters: UploadCounters,
     ) -> InjectReusedChunksQueue<Self>;
 }
 
@@ -53,13 +53,13 @@ where
     fn inject_reused_chunks(
         self,
         injections: Option<mpsc::Receiver<InjectChunks>>,
-        stream_len: Arc<AtomicUsize>,
+        counters: UploadCounters,
     ) -> InjectReusedChunksQueue<Self> {
         InjectReusedChunksQueue {
             input: self,
             next_injection: None,
             injections,
-            stream_len,
+            counters,
         }
     }
 }
@@ -85,7 +85,7 @@ where
 
             if let Some(inject) = this.next_injection.take() {
                 // got reusable dynamic entries to inject
-                let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+                let offset = this.counters.total_stream_len() as u64;
 
                 match inject.boundary.cmp(&offset) {
                     // inject now
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 3d2da27b9..b875347bb 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -41,4 +41,7 @@ pub use backup_specification::*;
 mod chunk_stream;
 pub use chunk_stream::{ChunkStream, FixedChunkStream, InjectionData};
 
+mod backup_stats;
+pub use backup_stats::BackupStats;
+
 pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 120;
-- 
2.39.5
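
For context, the periodic progress logging touched in the
backup_writer hunk follows the same pattern: the spawned task holds
only a clone of the shared counters and is aborted once the upload
completes. A reduced, self-contained sketch, assuming a tokio runtime
(tokio with the "full" feature); names are illustrative, not
pbs-client's:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let total = Arc::new(AtomicUsize::new(0));

        // the logging task only holds a clone of the shared counter
        let total_for_log = total.clone();
        let logger = tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(100)).await;
                println!(
                    "processed {} bytes",
                    total_for_log.load(Ordering::SeqCst)
                );
            }
        });

        // the upload path updates the same counter
        for _ in 0..10 {
            total.fetch_add(4096, Ordering::SeqCst);
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        // stop the logger when done, as the patch does with its
        // log task handle
        logger.abort();
    }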
