[pbs-devel] [PATCH v3 proxmox-backup 05/33] client: backup writer: bundle upload stats counters

Fabian Grünbichler f.gruenbichler at proxmox.com
Thu Oct 10 16:49:00 CEST 2024


High-level nit: I wonder whether it would make sense to refactor/merge
BackupStats, UploadStats and UploadStatsCounters, add some `fn`s
(e.g., always-inline fns to add to certain counters), and move the lot to
their own module?

e.g., you could then do something like

counters.to_stats(csum, duration)

to get the UploadStats, abstract away all the Atomics, get rid of a lot
of Arc cloning boilerplate, etc.

On September 12, 2024 4:32 pm, Christian Ebner wrote:
> In preparation for push support in sync jobs.
> 
> Introduce the `UploadStatsCounters` struct to hold the Arc clones of the
> chunk upload statistics counters. Bundled into the struct, they can be
> passed as a single function parameter when the common stream future
> implementation is factored out for the subsequent implementation of the
> chunk upload for push support in sync jobs.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 2:
> - no changes
> 
>  pbs-client/src/backup_writer.rs | 52 ++++++++++++++++++---------------
>  1 file changed, 28 insertions(+), 24 deletions(-)
> 
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index d63c09b5a..34ac47beb 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -65,6 +65,16 @@ struct UploadStats {
>      csum: [u8; 32],
>  }
>  
> +struct UploadStatsCounters {
> +    injected_chunk_count: Arc<AtomicUsize>,
> +    known_chunk_count: Arc<AtomicUsize>,
> +    total_chunks: Arc<AtomicUsize>,
> +    compressed_stream_len: Arc<AtomicU64>,
> +    injected_len: Arc<AtomicUsize>,
> +    reused_len: Arc<AtomicUsize>,
> +    stream_len: Arc<AtomicUsize>,
> +}
> +
>  type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
>  type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
>  
> @@ -638,20 +648,23 @@ impl BackupWriter {
>          injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
>      ) -> impl Future<Output = Result<UploadStats, Error>> {
>          let total_chunks = Arc::new(AtomicUsize::new(0));
> -        let total_chunks2 = total_chunks.clone();
>          let known_chunk_count = Arc::new(AtomicUsize::new(0));
> -        let known_chunk_count2 = known_chunk_count.clone();
>          let injected_chunk_count = Arc::new(AtomicUsize::new(0));
> -        let injected_chunk_count2 = injected_chunk_count.clone();
>  
>          let stream_len = Arc::new(AtomicUsize::new(0));
> -        let stream_len2 = stream_len.clone();
>          let compressed_stream_len = Arc::new(AtomicU64::new(0));
> -        let compressed_stream_len2 = compressed_stream_len.clone();
>          let reused_len = Arc::new(AtomicUsize::new(0));
> -        let reused_len2 = reused_len.clone();
>          let injected_len = Arc::new(AtomicUsize::new(0));
> -        let injected_len2 = injected_len.clone();
> +
> +        let counters = UploadStatsCounters {
> +            injected_chunk_count: injected_chunk_count.clone(),
> +            known_chunk_count: known_chunk_count.clone(),
> +            total_chunks: total_chunks.clone(),
> +            compressed_stream_len: compressed_stream_len.clone(),
> +            injected_len: injected_len.clone(),
> +            reused_len: reused_len.clone(),
> +            stream_len: stream_len.clone(),
> +        };
>  
>          let append_chunk_path = format!("{}_index", prefix);
>          let upload_chunk_path = format!("{}_chunk", prefix);
> @@ -794,27 +807,18 @@ impl BackupWriter {
>              })
>              .then(move |result| async move { upload_result.await?.and(result) }.boxed())
>              .and_then(move |_| {
> -                let duration = start_time.elapsed();
> -                let chunk_count = total_chunks2.load(Ordering::SeqCst);
> -                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
> -                let chunk_injected = injected_chunk_count2.load(Ordering::SeqCst);
> -                let size = stream_len2.load(Ordering::SeqCst);
> -                let size_reused = reused_len2.load(Ordering::SeqCst);
> -                let size_injected = injected_len2.load(Ordering::SeqCst);
> -                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
> -
>                  let mut guard = index_csum_2.lock().unwrap();
>                  let csum = guard.take().unwrap().finish();
>  
>                  futures::future::ok(UploadStats {
> -                    chunk_count,
> -                    chunk_reused,
> -                    chunk_injected,
> -                    size,
> -                    size_reused,
> -                    size_injected,
> -                    size_compressed,
> -                    duration,
> +                    chunk_count: counters.total_chunks.load(Ordering::SeqCst),
> +                    chunk_reused: counters.known_chunk_count.load(Ordering::SeqCst),
> +                    chunk_injected: counters.injected_chunk_count.load(Ordering::SeqCst),
> +                    size: counters.stream_len.load(Ordering::SeqCst),
> +                    size_reused: counters.reused_len.load(Ordering::SeqCst),
> +                    size_injected: counters.injected_len.load(Ordering::SeqCst),
> +                    size_compressed: counters.compressed_stream_len.load(Ordering::SeqCst) as usize,
> +                    duration: start_time.elapsed(),
>                      csum,
>                  })
>              })
> -- 
> 2.39.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




More information about the pbs-devel mailing list