[pbs-devel] [PATCH v3 proxmox-backup 05/33] client: backup writer: bundle upload stats counters
Fabian Grünbichler
f.gruenbichler at proxmox.com
Thu Oct 10 16:49:00 CEST 2024
high level nit: I wonder whether it would make sense to refactor/merge
BackupStats, UploadStats and UploadStatsCounter and add some `fn`s
(e.g., always-inline fns to add to certain counters) and move the lot to
their own module?
e.g., you could then do something like
counters.to_stats(csum, duration)
to get the UploadStats, abstract away all the Atomics, get rid of a lot
of Arc cloning boilerplate, ..
On September 12, 2024 4:32 pm, Christian Ebner wrote:
> In preparation for push support in sync jobs.
>
> Introduce `UploadStatsCounters` struct to hold the Arc clones of the
> chunk upload statistics counters. By bundling them into the struct,
> they can be passed as single function parameter when factoring out
> the common stream future implementation in the subsequent
> implementation of the chunk upload for push support in sync jobs.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 2:
> - no changes
>
> pbs-client/src/backup_writer.rs | 52 ++++++++++++++++++---------------
> 1 file changed, 28 insertions(+), 24 deletions(-)
>
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index d63c09b5a..34ac47beb 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -65,6 +65,16 @@ struct UploadStats {
> csum: [u8; 32],
> }
>
> +struct UploadStatsCounters {
> + injected_chunk_count: Arc<AtomicUsize>,
> + known_chunk_count: Arc<AtomicUsize>,
> + total_chunks: Arc<AtomicUsize>,
> + compressed_stream_len: Arc<AtomicU64>,
> + injected_len: Arc<AtomicUsize>,
> + reused_len: Arc<AtomicUsize>,
> + stream_len: Arc<AtomicUsize>,
> +}
> +
> type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
> type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
>
> @@ -638,20 +648,23 @@ impl BackupWriter {
> injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
> ) -> impl Future<Output = Result<UploadStats, Error>> {
> let total_chunks = Arc::new(AtomicUsize::new(0));
> - let total_chunks2 = total_chunks.clone();
> let known_chunk_count = Arc::new(AtomicUsize::new(0));
> - let known_chunk_count2 = known_chunk_count.clone();
> let injected_chunk_count = Arc::new(AtomicUsize::new(0));
> - let injected_chunk_count2 = injected_chunk_count.clone();
>
> let stream_len = Arc::new(AtomicUsize::new(0));
> - let stream_len2 = stream_len.clone();
> let compressed_stream_len = Arc::new(AtomicU64::new(0));
> - let compressed_stream_len2 = compressed_stream_len.clone();
> let reused_len = Arc::new(AtomicUsize::new(0));
> - let reused_len2 = reused_len.clone();
> let injected_len = Arc::new(AtomicUsize::new(0));
> - let injected_len2 = injected_len.clone();
> +
> + let counters = UploadStatsCounters {
> + injected_chunk_count: injected_chunk_count.clone(),
> + known_chunk_count: known_chunk_count.clone(),
> + total_chunks: total_chunks.clone(),
> + compressed_stream_len: compressed_stream_len.clone(),
> + injected_len: injected_len.clone(),
> + reused_len: reused_len.clone(),
> + stream_len: stream_len.clone(),
> + };
>
> let append_chunk_path = format!("{}_index", prefix);
> let upload_chunk_path = format!("{}_chunk", prefix);
> @@ -794,27 +807,18 @@ impl BackupWriter {
> })
> .then(move |result| async move { upload_result.await?.and(result) }.boxed())
> .and_then(move |_| {
> - let duration = start_time.elapsed();
> - let chunk_count = total_chunks2.load(Ordering::SeqCst);
> - let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
> - let chunk_injected = injected_chunk_count2.load(Ordering::SeqCst);
> - let size = stream_len2.load(Ordering::SeqCst);
> - let size_reused = reused_len2.load(Ordering::SeqCst);
> - let size_injected = injected_len2.load(Ordering::SeqCst);
> - let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
> -
> let mut guard = index_csum_2.lock().unwrap();
> let csum = guard.take().unwrap().finish();
>
> futures::future::ok(UploadStats {
> - chunk_count,
> - chunk_reused,
> - chunk_injected,
> - size,
> - size_reused,
> - size_injected,
> - size_compressed,
> - duration,
> + chunk_count: counters.total_chunks.load(Ordering::SeqCst),
> + chunk_reused: counters.known_chunk_count.load(Ordering::SeqCst),
> + chunk_injected: counters.injected_chunk_count.load(Ordering::SeqCst),
> + size: counters.stream_len.load(Ordering::SeqCst),
> + size_reused: counters.reused_len.load(Ordering::SeqCst),
> + size_injected: counters.injected_len.load(Ordering::SeqCst),
> + size_compressed: counters.compressed_stream_len.load(Ordering::SeqCst) as usize,
> + duration: start_time.elapsed(),
> csum,
> })
> })
> --
> 2.39.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
More information about the pbs-devel
mailing list