[pbs-devel] [PATCH v5 proxmox-backup 01/31] client: backup writer: refactor backup and upload stats counters

Fabian Grünbichler f.gruenbichler at proxmox.com
Fri Oct 25 12:20:56 CEST 2024


as discussed off-list, I think this could be improved further.

the refactoring also caught a missing increment of the compressed stream
length ;)
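
for reference (my reading of v5, taken straight from the first
backup_writer.rs hunk below): the MergedChunkInfo::New arm bumped the
total counters, but never added chunk.raw_size() to
compressed_stream_len:

    // before: compressed size is not accounted in this path
    counters.inc_total_chunks(1);
    let chunk_len = chunk_info.chunk_len;
    let offset = counters.inc_total_stream_len(chunk_len as usize);

    // after: add_new_chunk() bumps total_chunk_count, total_stream_len
    // and compressed_stream_len in one place
    let offset =
        counters.add_new_chunk(chunk_len as usize, chunk_info.chunk.raw_size());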

diff on top of the whole series:

diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
index 7aa618667..87a2b1c53 100644
--- a/pbs-client/src/backup_stats.rs
+++ b/pbs-client/src/backup_stats.rs
@@ -4,6 +4,8 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
+use crate::pxar::create::ReusableDynamicEntry;
+
 /// Basic backup run statistics and archive checksum
 pub struct BackupStats {
     pub size: u64,
@@ -64,52 +66,40 @@ impl UploadCounters {
         }
     }
 
-    /// Increment total chunk counter by `count`, returns previous value
-    #[inline(always)]
-    pub(crate) fn inc_total_chunks(&mut self, count: usize) -> usize {
-        self.total_chunk_count.fetch_add(count, Ordering::SeqCst)
-    }
-
-    /// Increment known chunk counter by `count`, returns previous value
-    #[inline(always)]
-    pub(crate) fn inc_known_chunks(&mut self, count: usize) -> usize {
-        self.known_chunk_count.fetch_add(count, Ordering::SeqCst)
-    }
-
-    /// Increment injected chunk counter by `count`, returns previous value
-    #[inline(always)]
-    pub(crate) fn inc_injected_chunks(&mut self, count: usize) -> usize {
-        self.injected_chunk_count.fetch_add(count, Ordering::SeqCst)
-    }
-
-    /// Increment stream length counter by given size, returns previous value
     #[inline(always)]
-    pub(crate) fn inc_total_stream_len(&mut self, size: usize) -> usize {
-        self.total_stream_len.fetch_add(size, Ordering::SeqCst)
+    pub(crate) fn add_known_chunk(&mut self, chunk_len: usize) -> usize {
+        self.known_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.reused_stream_len
+            .fetch_add(chunk_len, Ordering::SeqCst);
+        self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
     }
 
-    /// Increment reused length counter by given size, returns previous value
     #[inline(always)]
-    pub(crate) fn inc_reused_stream_len(&mut self, size: usize) -> usize {
-        self.reused_stream_len.fetch_add(size, Ordering::SeqCst)
+    pub(crate) fn add_new_chunk(&mut self, chunk_len: usize, chunk_raw_size: u64) -> usize {
+        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.compressed_stream_len
+            .fetch_add(chunk_raw_size, Ordering::SeqCst);
+        self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
     }
 
-    /// Increment compressed length counter by given size, returns previous value
     #[inline(always)]
-    pub(crate) fn inc_compressed_stream_len(&mut self, size: u64) -> u64 {
-        self.compressed_stream_len.fetch_add(size, Ordering::SeqCst)
-    }
+    pub(crate) fn add_injected_chunk(&mut self, chunk: &ReusableDynamicEntry) -> usize {
+        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.injected_chunk_count.fetch_add(1, Ordering::SeqCst);
 
-    /// Increment injected stream length counter by given size, returns previous value
-    #[inline(always)]
-    pub(crate) fn inc_injected_stream_len(&mut self, size: usize) -> usize {
-        self.injected_stream_len.fetch_add(size, Ordering::SeqCst)
+        self.reused_stream_len
+            .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+        self.injected_stream_len
+            .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+        self.total_stream_len
+            .fetch_add(chunk.size() as usize, Ordering::SeqCst)
     }
 
-    /// Return an Arc clone to the total stream length counter
+    /// Return the current value of the total stream length counter
     #[inline(always)]
-    pub(crate) fn total_stream_len_counter(&self) -> Arc<AtomicUsize> {
-        self.total_stream_len.clone()
+    pub(crate) fn total_stream_len_counter(&self) -> usize {
+        self.total_stream_len.load(Ordering::SeqCst)
     }
 
     /// Convert the counters to [`UploadStats`], including given archive checksum and runtime.
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index a09757486..58b1c226f 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -304,9 +304,9 @@ impl BackupWriter {
             .and_then(move |mut merged_chunk_info| {
                 match merged_chunk_info {
                     MergedChunkInfo::New(ref chunk_info) => {
-                        counters.inc_total_chunks(1);
                         let chunk_len = chunk_info.chunk_len;
-                        let offset = counters.inc_total_stream_len(chunk_len as usize);
+                        let offset =
+                            counters.add_new_chunk(chunk_len as usize, chunk_info.chunk.raw_size());
                         let end_offset = offset as u64 + chunk_len;
                         let mut guard = index_csum.lock().unwrap();
                         let csum = guard.as_mut().unwrap();
@@ -317,10 +317,7 @@ impl BackupWriter {
                     }
                     MergedChunkInfo::Known(ref mut known_chunk_list) => {
                         for (chunk_len, digest) in known_chunk_list {
-                            counters.inc_total_chunks(1);
-                            counters.inc_known_chunks(1);
-                            counters.inc_reused_stream_len(*chunk_len as usize);
-                            let offset = counters.inc_total_stream_len(*chunk_len as usize);
+                            let offset = counters.add_known_chunk(*chunk_len as usize);
                             let end_offset = offset as u64 + *chunk_len;
                             let mut guard = index_csum.lock().unwrap();
                             let csum = guard.as_mut().unwrap();
@@ -753,21 +750,15 @@ impl BackupWriter {
         let index_csum_2 = index_csum.clone();
 
         let stream = stream
-            .inject_reused_chunks(injections, counters.total_stream_len_counter())
+            .inject_reused_chunks(injections, counters.clone())
             .and_then(move |chunk_info| match chunk_info {
                 InjectedChunksInfo::Known(chunks) => {
                     // account for injected chunks
-                    let count = chunks.len();
-                    counters.inc_total_chunks(count);
-                    counters.inc_injected_chunks(count);
-
                     let mut known = Vec::new();
                     let mut guard = index_csum.lock().unwrap();
                     let csum = guard.as_mut().unwrap();
                     for chunk in chunks {
-                        let offset = counters.inc_total_stream_len(chunk.size() as usize) as u64;
-                        counters.inc_reused_stream_len(chunk.size() as usize);
-                        counters.inc_injected_stream_len(chunk.size() as usize);
+                        let offset = counters.add_injected_chunk(&chunk) as u64;
                         let digest = chunk.digest();
                         known.push((offset, digest));
                         let end_offset = offset + chunk.size();
@@ -780,9 +771,6 @@ impl BackupWriter {
                     // account for not injected chunks (new and known)
                     let chunk_len = data.len();
 
-                    counters.inc_total_chunks(1);
-                    let offset = counters.inc_total_stream_len(chunk_len) as u64;
-
                     let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
 
                     if let Some(ref crypt_config) = crypt_config {
@@ -790,7 +778,30 @@ impl BackupWriter {
                     }
 
                     let mut known_chunks = known_chunks.lock().unwrap();
-                    let digest = chunk_builder.digest();
+                    let digest = chunk_builder.digest().clone();
+                    let chunk_is_known = known_chunks.contains(&digest);
+                    let (offset, res) = if chunk_is_known {
+                        let offset = counters.add_known_chunk(chunk_len) as u64;
+                        (offset, MergedChunkInfo::Known(vec![(offset, digest)]))
+                    } else {
+                        match chunk_builder.build() {
+                            Ok((chunk, digest)) => {
+                                let offset =
+                                    counters.add_new_chunk(chunk_len, chunk.raw_size()) as u64;
+                                known_chunks.insert(digest);
+                                (
+                                    offset,
+                                    MergedChunkInfo::New(ChunkInfo {
+                                        chunk,
+                                        digest,
+                                        chunk_len: chunk_len as u64,
+                                        offset,
+                                    }),
+                                )
+                            }
+                            Err(err) => return future::ready(Err(err)),
+                        }
+                    };
 
                     let mut guard = index_csum.lock().unwrap();
                     let csum = guard.as_mut().unwrap();
@@ -800,26 +811,9 @@ impl BackupWriter {
                     if !is_fixed_chunk_size {
                         csum.update(&chunk_end.to_le_bytes());
                     }
-                    csum.update(digest);
+                    csum.update(&digest);
 
-                    let chunk_is_known = known_chunks.contains(digest);
-                    if chunk_is_known {
-                        counters.inc_known_chunks(1);
-                        counters.inc_reused_stream_len(chunk_len);
-                        future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
-                    } else {
-                        let mut counters = counters.clone();
-                        known_chunks.insert(*digest);
-                        future::ready(chunk_builder.build().map(move |(chunk, digest)| {
-                            counters.inc_compressed_stream_len(chunk.raw_size());
-                            MergedChunkInfo::New(ChunkInfo {
-                                chunk,
-                                digest,
-                                chunk_len: chunk_len as u64,
-                                offset,
-                            })
-                        }))
-                    }
+                    future::ok(res)
                 }
             })
             .merge_known_chunks();
@@ -848,8 +842,7 @@ impl BackupWriter {
         let upload_chunk_path = format!("{prefix}_chunk");
 
         let start_time = std::time::Instant::now();
-        let total_stream_len = counters.total_stream_len_counter();
-        let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let uploaded_len = Arc::new(AtomicUsize::new(0));
 
         let (upload_queue, upload_result) =
             Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
@@ -858,11 +851,12 @@ impl BackupWriter {
             || archive.ends_with(".pxar")
             || archive.ends_with(".ppxar")
         {
+            let counters = counters.clone();
             Some(tokio::spawn(async move {
                 loop {
                     tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
 
-                    let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
+                    let size = HumanByte::from(counters.total_stream_len_counter());
                     let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
                     let elapsed = TimeSpan::from(start_time.elapsed());
 
diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
index 4b2922012..b93b8b846 100644
--- a/pbs-client/src/inject_reused_chunks.rs
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -1,13 +1,13 @@
 use std::cmp;
 use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{mpsc, Arc};
+use std::sync::mpsc;
 use std::task::{Context, Poll};
 
 use anyhow::{anyhow, Error};
 use futures::{ready, Stream};
 use pin_project_lite::pin_project;
 
+use crate::backup_stats::UploadCounters;
 use crate::pxar::create::ReusableDynamicEntry;
 
 pin_project! {
@@ -16,7 +16,7 @@ pin_project! {
         input: S,
         next_injection: Option<InjectChunks>,
         injections: Option<mpsc::Receiver<InjectChunks>>,
-        stream_len: Arc<AtomicUsize>,
+        counters: UploadCounters,
     }
 }
 
@@ -42,7 +42,7 @@ pub trait InjectReusedChunks: Sized {
     fn inject_reused_chunks(
         self,
         injections: Option<mpsc::Receiver<InjectChunks>>,
-        stream_len: Arc<AtomicUsize>,
+        counters: UploadCounters,
     ) -> InjectReusedChunksQueue<Self>;
 }
 
@@ -53,13 +53,13 @@ where
     fn inject_reused_chunks(
         self,
         injections: Option<mpsc::Receiver<InjectChunks>>,
-        stream_len: Arc<AtomicUsize>,
+        counters: UploadCounters,
     ) -> InjectReusedChunksQueue<Self> {
         InjectReusedChunksQueue {
             input: self,
             next_injection: None,
             injections,
-            stream_len,
+            counters,
         }
     }
 }
@@ -85,7 +85,7 @@ where
 
             if let Some(inject) = this.next_injection.take() {
                 // got reusable dynamic entries to inject
-                let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+                let offset = this.counters.total_stream_len_counter() as u64;
 
                 match inject.boundary.cmp(&offset) {
                     // inject now

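a note on the UploadCounters changes above: the struct is Clone over
Arc-backed atomics, so every clone shares the same underlying counters.
that is why total_stream_len_counter() can be demoted from handing out
an Arc<AtomicUsize> to returning a plain snapshot, and why the progress
log task and the injector can simply hold clones. a minimal in-crate
sketch (names as in backup_stats.rs):

    let mut counters = UploadCounters::new();
    let progress = counters.clone(); // e.g. for the progress log task
    counters.add_known_chunk(42);
    assert_eq!(progress.total_stream_len_counter(), 42);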

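and to make the accounting explicit, a hypothetical in-crate sanity
check of what the new helpers guarantee (injected chunks elided, since
constructing a ReusableDynamicEntry is not shown here):

    use std::time::Duration;

    let mut counters = UploadCounters::new();
    counters.add_known_chunk(100); // reused chunk: 100 bytes of stream
    counters.add_new_chunk(200, 80); // new chunk: 200 bytes raw, 80 on the wire
    let stats = counters.to_upload_stats([0u8; 32], Duration::ZERO);
    assert_eq!(stats.chunk_count, 2);
    assert_eq!(stats.chunk_reused, 1);
    assert_eq!(stats.size, 300);
    assert_eq!(stats.size_reused, 100);
    assert_eq!(stats.size_compressed, 80);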
On October 18, 2024 10:42 am, Christian Ebner wrote:
> In preparation for push support in sync jobs.
> 
> Extend and move `BackupStats` into `backup_stats` submodule and add
> method to create them from `UploadStats`.
> 
> Further, introduce `UploadCounters` struct to hold the Arc clones of
> the chunk upload statistics counters, simplifying the house keeping.
> 
> By bundling the counters into the struct, they can be passed as
> single function parameter when factoring out the common stream future
> in the subsequent implementation of the chunk upload for sync jobs
> in push direction.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 4:
> - Rebased onto current master
> 
> changes since version 3:
> - not present in previous version
> 
>  pbs-client/src/backup_stats.rs  | 130 ++++++++++++++++++++++++++++++++
>  pbs-client/src/backup_writer.rs | 111 +++++++++------------------
>  pbs-client/src/lib.rs           |   3 +
>  3 files changed, 169 insertions(+), 75 deletions(-)
>  create mode 100644 pbs-client/src/backup_stats.rs
> 
> diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
> new file mode 100644
> index 000000000..7aa618667
> --- /dev/null
> +++ b/pbs-client/src/backup_stats.rs
> @@ -0,0 +1,130 @@
> +//! Implements counters to generate statistics for log outputs during uploads with backup writer
> +
> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> +use std::sync::Arc;
> +use std::time::Duration;
> +
> +/// Basic backup run statistics and archive checksum
> +pub struct BackupStats {
> +    pub size: u64,
> +    pub csum: [u8; 32],
> +    pub duration: Duration,
> +    pub chunk_count: u64,
> +}
> +
> +/// Extended backup run statistics and archive checksum
> +pub(crate) struct UploadStats {
> +    pub(crate) chunk_count: usize,
> +    pub(crate) chunk_reused: usize,
> +    pub(crate) chunk_injected: usize,
> +    pub(crate) size: usize,
> +    pub(crate) size_reused: usize,
> +    pub(crate) size_injected: usize,
> +    pub(crate) size_compressed: usize,
> +    pub(crate) duration: Duration,
> +    pub(crate) csum: [u8; 32],
> +}
> +
> +impl UploadStats {
> +    /// Convert the upload stats to the more concise [`BackupStats`]
> +    #[inline(always)]
> +    pub(crate) fn to_backup_stats(&self) -> BackupStats {
> +        BackupStats {
> +            chunk_count: self.chunk_count as u64,
> +            size: self.size as u64,
> +            duration: self.duration,
> +            csum: self.csum,
> +        }
> +    }
> +}
> +
> +/// Atomic counters for accounting upload stream progress information
> +#[derive(Clone)]
> +pub(crate) struct UploadCounters {
> +    injected_chunk_count: Arc<AtomicUsize>,
> +    known_chunk_count: Arc<AtomicUsize>,
> +    total_chunk_count: Arc<AtomicUsize>,
> +    compressed_stream_len: Arc<AtomicU64>,
> +    injected_stream_len: Arc<AtomicUsize>,
> +    reused_stream_len: Arc<AtomicUsize>,
> +    total_stream_len: Arc<AtomicUsize>,
> +}
> +
> +impl UploadCounters {
> +    /// Create and zero init new upload counters
> +    pub(crate) fn new() -> Self {
> +        Self {
> +            total_chunk_count: Arc::new(AtomicUsize::new(0)),
> +            injected_chunk_count: Arc::new(AtomicUsize::new(0)),
> +            known_chunk_count: Arc::new(AtomicUsize::new(0)),
> +            compressed_stream_len: Arc::new(AtomicU64::new(0)),
> +            injected_stream_len: Arc::new(AtomicUsize::new(0)),
> +            reused_stream_len: Arc::new(AtomicUsize::new(0)),
> +            total_stream_len: Arc::new(AtomicUsize::new(0)),
> +        }
> +    }
> +
> +    /// Increment total chunk counter by `count`, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_total_chunks(&mut self, count: usize) -> usize {
> +        self.total_chunk_count.fetch_add(count, Ordering::SeqCst)
> +    }
> +
> +    /// Increment known chunk counter by `count`, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_known_chunks(&mut self, count: usize) -> usize {
> +        self.known_chunk_count.fetch_add(count, Ordering::SeqCst)
> +    }
> +
> +    /// Increment injected chunk counter by `count`, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_injected_chunks(&mut self, count: usize) -> usize {
> +        self.injected_chunk_count.fetch_add(count, Ordering::SeqCst)
> +    }
> +
> +    /// Increment stream length counter by given size, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_total_stream_len(&mut self, size: usize) -> usize {
> +        self.total_stream_len.fetch_add(size, Ordering::SeqCst)
> +    }
> +
> +    /// Increment reused length counter by given size, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_reused_stream_len(&mut self, size: usize) -> usize {
> +        self.reused_stream_len.fetch_add(size, Ordering::SeqCst)
> +    }
> +
> +    /// Increment compressed length counter by given size, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_compressed_stream_len(&mut self, size: u64) -> u64 {
> +        self.compressed_stream_len.fetch_add(size, Ordering::SeqCst)
> +    }
> +
> +    /// Increment injected stream length counter by given size, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_injected_stream_len(&mut self, size: usize) -> usize {
> +        self.injected_stream_len.fetch_add(size, Ordering::SeqCst)
> +    }
> +
> +    /// Return an Arc clone to the total stream length counter
> +    #[inline(always)]
> +    pub(crate) fn total_stream_len_counter(&self) -> Arc<AtomicUsize> {
> +        self.total_stream_len.clone()
> +    }
> +
> +    /// Convert the counters to [`UploadStats`], including given archive checksum and runtime.
> +    #[inline(always)]
> +    pub(crate) fn to_upload_stats(&self, csum: [u8; 32], duration: Duration) -> UploadStats {
> +        UploadStats {
> +            chunk_count: self.total_chunk_count.load(Ordering::SeqCst),
> +            chunk_reused: self.known_chunk_count.load(Ordering::SeqCst),
> +            chunk_injected: self.injected_chunk_count.load(Ordering::SeqCst),
> +            size: self.total_stream_len.load(Ordering::SeqCst),
> +            size_reused: self.reused_stream_len.load(Ordering::SeqCst),
> +            size_injected: self.injected_stream_len.load(Ordering::SeqCst),
> +            size_compressed: self.compressed_stream_len.load(Ordering::SeqCst) as usize,
> +            duration,
> +            csum,
> +        }
> +    }
> +}
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index 4d2e8a801..f08a65153 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -1,7 +1,8 @@
>  use std::collections::HashSet;
>  use std::future::Future;
> -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
> +use std::time::Instant;
>  
>  use anyhow::{bail, format_err, Error};
>  use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
> @@ -23,6 +24,7 @@ use pbs_tools::crypt_config::CryptConfig;
>  use proxmox_human_byte::HumanByte;
>  use proxmox_time::TimeSpan;
>  
> +use super::backup_stats::{BackupStats, UploadCounters, UploadStats};
>  use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
>  use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
>  
> @@ -40,11 +42,6 @@ impl Drop for BackupWriter {
>      }
>  }
>  
> -pub struct BackupStats {
> -    pub size: u64,
> -    pub csum: [u8; 32],
> -}
> -
>  /// Options for uploading blobs/streams to the server
>  #[derive(Default, Clone)]
>  pub struct UploadOptions {
> @@ -54,18 +51,6 @@ pub struct UploadOptions {
>      pub fixed_size: Option<u64>,
>  }
>  
> -struct UploadStats {
> -    chunk_count: usize,
> -    chunk_reused: usize,
> -    chunk_injected: usize,
> -    size: usize,
> -    size_reused: usize,
> -    size_injected: usize,
> -    size_compressed: usize,
> -    duration: std::time::Duration,
> -    csum: [u8; 32],
> -}
> -
>  struct ChunkUploadResponse {
>      future: h2::client::ResponseFuture,
>      size: usize,
> @@ -194,6 +179,7 @@ impl BackupWriter {
>          mut reader: R,
>          file_name: &str,
>      ) -> Result<BackupStats, Error> {
> +        let start_time = Instant::now();
>          let mut raw_data = Vec::new();
>          // fixme: avoid loading into memory
>          reader.read_to_end(&mut raw_data)?;
> @@ -211,7 +197,12 @@ impl BackupWriter {
>                  raw_data,
>              )
>              .await?;
> -        Ok(BackupStats { size, csum })
> +        Ok(BackupStats {
> +            size,
> +            csum,
> +            duration: start_time.elapsed(),
> +            chunk_count: 0,
> +        })
>      }
>  
>      pub async fn upload_blob_from_data(
> @@ -220,6 +211,7 @@ impl BackupWriter {
>          file_name: &str,
>          options: UploadOptions,
>      ) -> Result<BackupStats, Error> {
> +        let start_time = Instant::now();
>          let blob = match (options.encrypt, &self.crypt_config) {
>              (false, _) => DataBlob::encode(&data, None, options.compress)?,
>              (true, None) => bail!("requested encryption without a crypt config"),
> @@ -243,7 +235,12 @@ impl BackupWriter {
>                  raw_data,
>              )
>              .await?;
> -        Ok(BackupStats { size, csum })
> +        Ok(BackupStats {
> +            size,
> +            csum,
> +            duration: start_time.elapsed(),
> +            chunk_count: 0,
> +        })
>      }
>  
>      pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
> @@ -421,10 +418,7 @@ impl BackupWriter {
>              "csum": hex::encode(upload_stats.csum),
>          });
>          let _value = self.h2.post(&close_path, Some(param)).await?;
> -        Ok(BackupStats {
> -            size: upload_stats.size as u64,
> -            csum: upload_stats.csum,
> -        })
> +        Ok(upload_stats.to_backup_stats())
>      }
>  
>      fn response_queue() -> (
> @@ -653,23 +647,10 @@ impl BackupWriter {
>          injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
>          archive: &str,
>      ) -> impl Future<Output = Result<UploadStats, Error>> {
> -        let total_chunks = Arc::new(AtomicUsize::new(0));
> -        let total_chunks2 = total_chunks.clone();
> -        let known_chunk_count = Arc::new(AtomicUsize::new(0));
> -        let known_chunk_count2 = known_chunk_count.clone();
> -        let injected_chunk_count = Arc::new(AtomicUsize::new(0));
> -        let injected_chunk_count2 = injected_chunk_count.clone();
> -
> -        let stream_len = Arc::new(AtomicUsize::new(0));
> -        let stream_len2 = stream_len.clone();
> -        let stream_len3 = stream_len.clone();
> -        let compressed_stream_len = Arc::new(AtomicU64::new(0));
> -        let compressed_stream_len2 = compressed_stream_len.clone();
> -        let reused_len = Arc::new(AtomicUsize::new(0));
> -        let reused_len2 = reused_len.clone();
> -        let injected_len = Arc::new(AtomicUsize::new(0));
> -        let injected_len2 = injected_len.clone();
> -        let uploaded_len = Arc::new(AtomicUsize::new(0));
> +        let mut counters = UploadCounters::new();
> +        let total_stream_len = counters.total_stream_len_counter();
> +        let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
> +        let counters_readonly = counters.clone();
>  
>          let append_chunk_path = format!("{}_index", prefix);
>          let upload_chunk_path = format!("{}_chunk", prefix);
> @@ -691,7 +672,7 @@ impl BackupWriter {
>                  loop {
>                      tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
>  
> -                    let size = HumanByte::from(stream_len3.load(Ordering::SeqCst));
> +                    let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
>                      let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
>                      let elapsed = TimeSpan::from(start_time.elapsed());
>  
> @@ -703,22 +684,21 @@ impl BackupWriter {
>          };
>  
>          stream
> -            .inject_reused_chunks(injections, stream_len.clone())
> +            .inject_reused_chunks(injections, counters.total_stream_len_counter())
>              .and_then(move |chunk_info| match chunk_info {
>                  InjectedChunksInfo::Known(chunks) => {
>                      // account for injected chunks
>                      let count = chunks.len();
> -                    total_chunks.fetch_add(count, Ordering::SeqCst);
> -                    injected_chunk_count.fetch_add(count, Ordering::SeqCst);
> +                    counters.inc_total_chunks(count);
> +                    counters.inc_injected_chunks(count);
>  
>                      let mut known = Vec::new();
>                      let mut guard = index_csum.lock().unwrap();
>                      let csum = guard.as_mut().unwrap();
>                      for chunk in chunks {
> -                        let offset =
> -                            stream_len.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
> -                        reused_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
> -                        injected_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
> +                        let offset = counters.inc_total_stream_len(chunk.size() as usize) as u64;
> +                        counters.inc_reused_stream_len(chunk.size() as usize);
> +                        counters.inc_injected_stream_len(chunk.size() as usize);
>                          let digest = chunk.digest();
>                          known.push((offset, digest));
>                          let end_offset = offset + chunk.size();
> @@ -731,8 +711,8 @@ impl BackupWriter {
>                      // account for not injected chunks (new and known)
>                      let chunk_len = data.len();
>  
> -                    total_chunks.fetch_add(1, Ordering::SeqCst);
> -                    let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
> +                    counters.inc_total_chunks(1);
> +                    let offset = counters.inc_total_stream_len(chunk_len) as u64;
>  
>                      let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
>  
> @@ -755,14 +735,14 @@ impl BackupWriter {
>  
>                      let chunk_is_known = known_chunks.contains(digest);
>                      if chunk_is_known {
> -                        known_chunk_count.fetch_add(1, Ordering::SeqCst);
> -                        reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> +                        counters.inc_known_chunks(1);
> +                        counters.inc_reused_stream_len(chunk_len);
>                          future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
>                      } else {
> -                        let compressed_stream_len2 = compressed_stream_len.clone();
> +                        let mut counters = counters.clone();
>                          known_chunks.insert(*digest);
>                          future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> -                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                            counters.inc_compressed_stream_len(chunk.raw_size());
>                              MergedChunkInfo::New(ChunkInfo {
>                                  chunk,
>                                  digest,
> @@ -837,15 +817,6 @@ impl BackupWriter {
>              })
>              .then(move |result| async move { upload_result.await?.and(result) }.boxed())
>              .and_then(move |_| {
> -                let duration = start_time.elapsed();
> -                let chunk_count = total_chunks2.load(Ordering::SeqCst);
> -                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
> -                let chunk_injected = injected_chunk_count2.load(Ordering::SeqCst);
> -                let size = stream_len2.load(Ordering::SeqCst);
> -                let size_reused = reused_len2.load(Ordering::SeqCst);
> -                let size_injected = injected_len2.load(Ordering::SeqCst);
> -                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
> -
>                  let mut guard = index_csum_2.lock().unwrap();
>                  let csum = guard.take().unwrap().finish();
>  
> @@ -853,17 +824,7 @@ impl BackupWriter {
>                      handle.abort();
>                  }
>  
> -                futures::future::ok(UploadStats {
> -                    chunk_count,
> -                    chunk_reused,
> -                    chunk_injected,
> -                    size,
> -                    size_reused,
> -                    size_injected,
> -                    size_compressed,
> -                    duration,
> -                    csum,
> -                })
> +                futures::future::ok(counters_readonly.to_upload_stats(csum, start_time.elapsed()))
>              })
>      }
>  
> diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
> index 3d2da27b9..b875347bb 100644
> --- a/pbs-client/src/lib.rs
> +++ b/pbs-client/src/lib.rs
> @@ -41,4 +41,7 @@ pub use backup_specification::*;
>  mod chunk_stream;
>  pub use chunk_stream::{ChunkStream, FixedChunkStream, InjectionData};
>  
> +mod backup_stats;
> +pub use backup_stats::BackupStats;
> +
>  pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 120;
> -- 
> 2.39.5
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 