[pbs-devel] [PATCH v5 proxmox-backup 01/31] client: backup writer: refactor backup and upload stats counters
Fabian Grünbichler
f.gruenbichler at proxmox.com
Fri Oct 25 12:20:56 CEST 2024
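As discussed off-list, I think this could be improved further.
The refactoring also caught a missing increment of the compressed stream
length ;) (the `MergedChunkInfo::New` branch in backup_writer.rs previously
only bumped the total chunk count and total stream length).
Diff on top of the whole series: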
diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
index 7aa618667..87a2b1c53 100644
--- a/pbs-client/src/backup_stats.rs
+++ b/pbs-client/src/backup_stats.rs
@@ -4,6 +4,8 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
+use crate::pxar::create::ReusableDynamicEntry;
+
/// Basic backup run statistics and archive checksum
pub struct BackupStats {
pub size: u64,
@@ -64,52 +66,40 @@ impl UploadCounters {
}
}
- /// Increment total chunk counter by `count`, returns previous value
- #[inline(always)]
- pub(crate) fn inc_total_chunks(&mut self, count: usize) -> usize {
- self.total_chunk_count.fetch_add(count, Ordering::SeqCst)
- }
-
- /// Increment known chunk counter by `count`, returns previous value
- #[inline(always)]
- pub(crate) fn inc_known_chunks(&mut self, count: usize) -> usize {
- self.known_chunk_count.fetch_add(count, Ordering::SeqCst)
- }
-
- /// Increment injected chunk counter by `count`, returns previous value
- #[inline(always)]
- pub(crate) fn inc_injected_chunks(&mut self, count: usize) -> usize {
- self.injected_chunk_count.fetch_add(count, Ordering::SeqCst)
- }
-
- /// Increment stream length counter by given size, returns previous value
#[inline(always)]
- pub(crate) fn inc_total_stream_len(&mut self, size: usize) -> usize {
- self.total_stream_len.fetch_add(size, Ordering::SeqCst)
+ pub(crate) fn add_known_chunk(&mut self, chunk_len: usize) -> usize {
+ self.known_chunk_count.fetch_add(1, Ordering::SeqCst);
+ self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+ self.reused_stream_len
+ .fetch_add(chunk_len, Ordering::SeqCst);
+ self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
}
- /// Increment reused length counter by given size, returns previous value
#[inline(always)]
- pub(crate) fn inc_reused_stream_len(&mut self, size: usize) -> usize {
- self.reused_stream_len.fetch_add(size, Ordering::SeqCst)
+ pub(crate) fn add_new_chunk(&mut self, chunk_len: usize, chunk_raw_size: u64) -> usize {
+ self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+ self.compressed_stream_len
+ .fetch_add(chunk_raw_size, Ordering::SeqCst);
+ self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
}
- /// Increment compressed length counter by given size, returns previous value
#[inline(always)]
- pub(crate) fn inc_compressed_stream_len(&mut self, size: u64) -> u64 {
- self.compressed_stream_len.fetch_add(size, Ordering::SeqCst)
- }
+ pub(crate) fn add_injected_chunk(&mut self, chunk: &ReusableDynamicEntry) -> usize {
+ self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+ self.injected_chunk_count.fetch_add(1, Ordering::SeqCst);
- /// Increment stream length counter by given size, returns previous value
- #[inline(always)]
- pub(crate) fn inc_injected_stream_len(&mut self, size: usize) -> usize {
- self.injected_stream_len.fetch_add(size, Ordering::SeqCst)
+ self.reused_stream_len
+ .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+ self.injected_stream_len
+ .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+ self.total_stream_len
+ .fetch_add(chunk.size() as usize, Ordering::SeqCst)
}
/// Return a Arc clone to the total stream length counter
#[inline(always)]
- pub(crate) fn total_stream_len_counter(&self) -> Arc<AtomicUsize> {
- self.total_stream_len.clone()
+ pub(crate) fn total_stream_len_counter(&self) -> usize {
+ self.total_stream_len.load(Ordering::SeqCst)
}
/// Convert the counters to [`UploadStats`], including given archive checksum and runtime.
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index a09757486..58b1c226f 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -304,9 +304,9 @@ impl BackupWriter {
.and_then(move |mut merged_chunk_info| {
match merged_chunk_info {
MergedChunkInfo::New(ref chunk_info) => {
- counters.inc_total_chunks(1);
let chunk_len = chunk_info.chunk_len;
- let offset = counters.inc_total_stream_len(chunk_len as usize);
+ let offset =
+ counters.add_new_chunk(chunk_len as usize, chunk_info.chunk.raw_size());
let end_offset = offset as u64 + chunk_len;
let mut guard = index_csum.lock().unwrap();
let csum = guard.as_mut().unwrap();
@@ -317,10 +317,7 @@ impl BackupWriter {
}
MergedChunkInfo::Known(ref mut known_chunk_list) => {
for (chunk_len, digest) in known_chunk_list {
- counters.inc_total_chunks(1);
- counters.inc_known_chunks(1);
- counters.inc_reused_stream_len(*chunk_len as usize);
- let offset = counters.inc_total_stream_len(*chunk_len as usize);
+ let offset = counters.add_known_chunk(*chunk_len as usize);
let end_offset = offset as u64 + *chunk_len;
let mut guard = index_csum.lock().unwrap();
let csum = guard.as_mut().unwrap();
@@ -753,21 +750,15 @@ impl BackupWriter {
let index_csum_2 = index_csum.clone();
let stream = stream
- .inject_reused_chunks(injections, counters.total_stream_len_counter())
+ .inject_reused_chunks(injections, counters.clone())
.and_then(move |chunk_info| match chunk_info {
InjectedChunksInfo::Known(chunks) => {
// account for injected chunks
- let count = chunks.len();
- counters.inc_total_chunks(count);
- counters.inc_injected_chunks(count);
-
let mut known = Vec::new();
let mut guard = index_csum.lock().unwrap();
let csum = guard.as_mut().unwrap();
for chunk in chunks {
- let offset = counters.inc_total_stream_len(chunk.size() as usize) as u64;
- counters.inc_reused_stream_len(chunk.size() as usize);
- counters.inc_injected_stream_len(chunk.size() as usize);
+ let offset = counters.add_injected_chunk(&chunk) as u64;
let digest = chunk.digest();
known.push((offset, digest));
let end_offset = offset + chunk.size();
@@ -780,9 +771,6 @@ impl BackupWriter {
// account for not injected chunks (new and known)
let chunk_len = data.len();
- counters.inc_total_chunks(1);
- let offset = counters.inc_total_stream_len(chunk_len) as u64;
-
let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
if let Some(ref crypt_config) = crypt_config {
@@ -790,7 +778,30 @@ impl BackupWriter {
}
let mut known_chunks = known_chunks.lock().unwrap();
- let digest = chunk_builder.digest();
+ let digest = chunk_builder.digest().clone();
+ let chunk_is_known = known_chunks.contains(&digest);
+ let (offset, res) = if chunk_is_known {
+ let offset = counters.add_known_chunk(chunk_len) as u64;
+ (offset, MergedChunkInfo::Known(vec![(offset, digest)]))
+ } else {
+ match chunk_builder.build() {
+ Ok((chunk, digest)) => {
+ let offset =
+ counters.add_new_chunk(chunk_len, chunk.raw_size()) as u64;
+ known_chunks.insert(digest);
+ (
+ offset,
+ MergedChunkInfo::New(ChunkInfo {
+ chunk,
+ digest,
+ chunk_len: chunk_len as u64,
+ offset,
+ }),
+ )
+ }
+ Err(err) => return future::ready(Err(err)),
+ }
+ };
let mut guard = index_csum.lock().unwrap();
let csum = guard.as_mut().unwrap();
@@ -800,26 +811,9 @@ impl BackupWriter {
if !is_fixed_chunk_size {
csum.update(&chunk_end.to_le_bytes());
}
- csum.update(digest);
+ csum.update(&digest);
- let chunk_is_known = known_chunks.contains(digest);
- if chunk_is_known {
- counters.inc_known_chunks(1);
- counters.inc_reused_stream_len(chunk_len);
- future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
- } else {
- let mut counters = counters.clone();
- known_chunks.insert(*digest);
- future::ready(chunk_builder.build().map(move |(chunk, digest)| {
- counters.inc_compressed_stream_len(chunk.raw_size());
- MergedChunkInfo::New(ChunkInfo {
- chunk,
- digest,
- chunk_len: chunk_len as u64,
- offset,
- })
- }))
- }
+ future::ok(res)
}
})
.merge_known_chunks();
@@ -848,8 +842,7 @@ impl BackupWriter {
let upload_chunk_path = format!("{prefix}_chunk");
let start_time = std::time::Instant::now();
- let total_stream_len = counters.total_stream_len_counter();
- let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let uploaded_len = Arc::new(AtomicUsize::new(0));
let (upload_queue, upload_result) =
Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
@@ -858,11 +851,12 @@ impl BackupWriter {
|| archive.ends_with(".pxar")
|| archive.ends_with(".ppxar")
{
+ let counters = counters.clone();
Some(tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
- let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
+ let size = HumanByte::from(counters.total_stream_len_counter());
let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
let elapsed = TimeSpan::from(start_time.elapsed());
diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
index 4b2922012..b93b8b846 100644
--- a/pbs-client/src/inject_reused_chunks.rs
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -1,13 +1,13 @@
use std::cmp;
use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{mpsc, Arc};
+use std::sync::mpsc;
use std::task::{Context, Poll};
use anyhow::{anyhow, Error};
use futures::{ready, Stream};
use pin_project_lite::pin_project;
+use crate::backup_stats::UploadCounters;
use crate::pxar::create::ReusableDynamicEntry;
pin_project! {
@@ -16,7 +16,7 @@ pin_project! {
input: S,
next_injection: Option<InjectChunks>,
injections: Option<mpsc::Receiver<InjectChunks>>,
- stream_len: Arc<AtomicUsize>,
+ counters: UploadCounters,
}
}
@@ -42,7 +42,7 @@ pub trait InjectReusedChunks: Sized {
fn inject_reused_chunks(
self,
injections: Option<mpsc::Receiver<InjectChunks>>,
- stream_len: Arc<AtomicUsize>,
+ counters: UploadCounters,
) -> InjectReusedChunksQueue<Self>;
}
@@ -53,13 +53,13 @@ where
fn inject_reused_chunks(
self,
injections: Option<mpsc::Receiver<InjectChunks>>,
- stream_len: Arc<AtomicUsize>,
+ counters: UploadCounters,
) -> InjectReusedChunksQueue<Self> {
InjectReusedChunksQueue {
input: self,
next_injection: None,
injections,
- stream_len,
+ counters,
}
}
}
@@ -85,7 +85,7 @@ where
if let Some(inject) = this.next_injection.take() {
// got reusable dynamic entries to inject
- let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+ let offset = this.counters.total_stream_len_counter() as u64;
match inject.boundary.cmp(&offset) {
// inject now
On October 18, 2024 10:42 am, Christian Ebner wrote:
> In preparation for push support in sync jobs.
>
> Extend and move `BackupStats` into `backup_stats` submodule and add
> method to create them from `UploadStats`.
>
> Further, introduce `UploadCounters` struct to hold the Arc clones of
> the chunk upload statistics counters, simplifying the housekeeping.
>
> By bundling the counters into the struct, they can be passed as
> single function parameter when factoring out the common stream future
> in the subsequent implementation of the chunk upload for sync jobs
> in push direction.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 4:
> - Rebased onto current master
>
> changes since version 3:
> - not present in previous version
>
> pbs-client/src/backup_stats.rs | 130 ++++++++++++++++++++++++++++++++
> pbs-client/src/backup_writer.rs | 111 +++++++++------------------
> pbs-client/src/lib.rs | 3 +
> 3 files changed, 169 insertions(+), 75 deletions(-)
> create mode 100644 pbs-client/src/backup_stats.rs
>
> diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
> new file mode 100644
> index 000000000..7aa618667
> --- /dev/null
> +++ b/pbs-client/src/backup_stats.rs
> @@ -0,0 +1,130 @@
> +//! Implements counters to generate statistics for log outputs during uploads with backup writer
> +
> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> +use std::sync::Arc;
> +use std::time::Duration;
> +
> +/// Basic backup run statistics and archive checksum
> +pub struct BackupStats {
> + pub size: u64,
> + pub csum: [u8; 32],
> + pub duration: Duration,
> + pub chunk_count: u64,
> +}
> +
> +/// Extended backup run statistics and archive checksum
> +pub(crate) struct UploadStats {
> + pub(crate) chunk_count: usize,
> + pub(crate) chunk_reused: usize,
> + pub(crate) chunk_injected: usize,
> + pub(crate) size: usize,
> + pub(crate) size_reused: usize,
> + pub(crate) size_injected: usize,
> + pub(crate) size_compressed: usize,
> + pub(crate) duration: Duration,
> + pub(crate) csum: [u8; 32],
> +}
> +
> +impl UploadStats {
> + /// Convert the upload stats to the more concise [`BackupStats`]
> + #[inline(always)]
> + pub(crate) fn to_backup_stats(&self) -> BackupStats {
> + BackupStats {
> + chunk_count: self.chunk_count as u64,
> + size: self.size as u64,
> + duration: self.duration,
> + csum: self.csum,
> + }
> + }
> +}
> +
> +/// Atomic counters for accounting upload stream progress information
> +#[derive(Clone)]
> +pub(crate) struct UploadCounters {
> + injected_chunk_count: Arc<AtomicUsize>,
> + known_chunk_count: Arc<AtomicUsize>,
> + total_chunk_count: Arc<AtomicUsize>,
> + compressed_stream_len: Arc<AtomicU64>,
> + injected_stream_len: Arc<AtomicUsize>,
> + reused_stream_len: Arc<AtomicUsize>,
> + total_stream_len: Arc<AtomicUsize>,
> +}
> +
> +impl UploadCounters {
> + /// Create and zero init new upload counters
> + pub(crate) fn new() -> Self {
> + Self {
> + total_chunk_count: Arc::new(AtomicUsize::new(0)),
> + injected_chunk_count: Arc::new(AtomicUsize::new(0)),
> + known_chunk_count: Arc::new(AtomicUsize::new(0)),
> + compressed_stream_len: Arc::new(AtomicU64::new(0)),
> + injected_stream_len: Arc::new(AtomicUsize::new(0)),
> + reused_stream_len: Arc::new(AtomicUsize::new(0)),
> + total_stream_len: Arc::new(AtomicUsize::new(0)),
> + }
> + }
> +
> + /// Increment total chunk counter by `count`, returns previous value
> + #[inline(always)]
> + pub(crate) fn inc_total_chunks(&mut self, count: usize) -> usize {
> + self.total_chunk_count.fetch_add(count, Ordering::SeqCst)
> + }
> +
> + /// Increment known chunk counter by `count`, returns previous value
> + #[inline(always)]
> + pub(crate) fn inc_known_chunks(&mut self, count: usize) -> usize {
> + self.known_chunk_count.fetch_add(count, Ordering::SeqCst)
> + }
> +
> + /// Increment injected chunk counter by `count`, returns previous value
> + #[inline(always)]
> + pub(crate) fn inc_injected_chunks(&mut self, count: usize) -> usize {
> + self.injected_chunk_count.fetch_add(count, Ordering::SeqCst)
> + }
> +
> + /// Increment stream length counter by given size, returns previous value
> + #[inline(always)]
> + pub(crate) fn inc_total_stream_len(&mut self, size: usize) -> usize {
> + self.total_stream_len.fetch_add(size, Ordering::SeqCst)
> + }
> +
> + /// Increment reused length counter by given size, returns previous value
> + #[inline(always)]
> + pub(crate) fn inc_reused_stream_len(&mut self, size: usize) -> usize {
> + self.reused_stream_len.fetch_add(size, Ordering::SeqCst)
> + }
> +
> + /// Increment compressed length counter by given size, returns previous value
> + #[inline(always)]
> + pub(crate) fn inc_compressed_stream_len(&mut self, size: u64) -> u64 {
> + self.compressed_stream_len.fetch_add(size, Ordering::SeqCst)
> + }
> +
> + /// Increment stream length counter by given size, returns previous value
> + #[inline(always)]
> + pub(crate) fn inc_injected_stream_len(&mut self, size: usize) -> usize {
> + self.injected_stream_len.fetch_add(size, Ordering::SeqCst)
> + }
> +
> + /// Return a Arc clone to the total stream length counter
> + #[inline(always)]
> + pub(crate) fn total_stream_len_counter(&self) -> Arc<AtomicUsize> {
> + self.total_stream_len.clone()
> + }
> +
> + /// Convert the counters to [`UploadStats`], including given archive checksum and runtime.
> + #[inline(always)]
> + pub(crate) fn to_upload_stats(&self, csum: [u8; 32], duration: Duration) -> UploadStats {
> + UploadStats {
> + chunk_count: self.total_chunk_count.load(Ordering::SeqCst),
> + chunk_reused: self.known_chunk_count.load(Ordering::SeqCst),
> + chunk_injected: self.injected_chunk_count.load(Ordering::SeqCst),
> + size: self.total_stream_len.load(Ordering::SeqCst),
> + size_reused: self.reused_stream_len.load(Ordering::SeqCst),
> + size_injected: self.injected_stream_len.load(Ordering::SeqCst),
> + size_compressed: self.compressed_stream_len.load(Ordering::SeqCst) as usize,
> + duration,
> + csum,
> + }
> + }
> +}
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index 4d2e8a801..f08a65153 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -1,7 +1,8 @@
> use std::collections::HashSet;
> use std::future::Future;
> -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicUsize, Ordering};
> use std::sync::{Arc, Mutex};
> +use std::time::Instant;
>
> use anyhow::{bail, format_err, Error};
> use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
> @@ -23,6 +24,7 @@ use pbs_tools::crypt_config::CryptConfig;
> use proxmox_human_byte::HumanByte;
> use proxmox_time::TimeSpan;
>
> +use super::backup_stats::{BackupStats, UploadCounters, UploadStats};
> use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
> use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
>
> @@ -40,11 +42,6 @@ impl Drop for BackupWriter {
> }
> }
>
> -pub struct BackupStats {
> - pub size: u64,
> - pub csum: [u8; 32],
> -}
> -
> /// Options for uploading blobs/streams to the server
> #[derive(Default, Clone)]
> pub struct UploadOptions {
> @@ -54,18 +51,6 @@ pub struct UploadOptions {
> pub fixed_size: Option<u64>,
> }
>
> -struct UploadStats {
> - chunk_count: usize,
> - chunk_reused: usize,
> - chunk_injected: usize,
> - size: usize,
> - size_reused: usize,
> - size_injected: usize,
> - size_compressed: usize,
> - duration: std::time::Duration,
> - csum: [u8; 32],
> -}
> -
> struct ChunkUploadResponse {
> future: h2::client::ResponseFuture,
> size: usize,
> @@ -194,6 +179,7 @@ impl BackupWriter {
> mut reader: R,
> file_name: &str,
> ) -> Result<BackupStats, Error> {
> + let start_time = Instant::now();
> let mut raw_data = Vec::new();
> // fixme: avoid loading into memory
> reader.read_to_end(&mut raw_data)?;
> @@ -211,7 +197,12 @@ impl BackupWriter {
> raw_data,
> )
> .await?;
> - Ok(BackupStats { size, csum })
> + Ok(BackupStats {
> + size,
> + csum,
> + duration: start_time.elapsed(),
> + chunk_count: 0,
> + })
> }
>
> pub async fn upload_blob_from_data(
> @@ -220,6 +211,7 @@ impl BackupWriter {
> file_name: &str,
> options: UploadOptions,
> ) -> Result<BackupStats, Error> {
> + let start_time = Instant::now();
> let blob = match (options.encrypt, &self.crypt_config) {
> (false, _) => DataBlob::encode(&data, None, options.compress)?,
> (true, None) => bail!("requested encryption without a crypt config"),
> @@ -243,7 +235,12 @@ impl BackupWriter {
> raw_data,
> )
> .await?;
> - Ok(BackupStats { size, csum })
> + Ok(BackupStats {
> + size,
> + csum,
> + duration: start_time.elapsed(),
> + chunk_count: 0,
> + })
> }
>
> pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
> @@ -421,10 +418,7 @@ impl BackupWriter {
> "csum": hex::encode(upload_stats.csum),
> });
> let _value = self.h2.post(&close_path, Some(param)).await?;
> - Ok(BackupStats {
> - size: upload_stats.size as u64,
> - csum: upload_stats.csum,
> - })
> + Ok(upload_stats.to_backup_stats())
> }
>
> fn response_queue() -> (
> @@ -653,23 +647,10 @@ impl BackupWriter {
> injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
> archive: &str,
> ) -> impl Future<Output = Result<UploadStats, Error>> {
> - let total_chunks = Arc::new(AtomicUsize::new(0));
> - let total_chunks2 = total_chunks.clone();
> - let known_chunk_count = Arc::new(AtomicUsize::new(0));
> - let known_chunk_count2 = known_chunk_count.clone();
> - let injected_chunk_count = Arc::new(AtomicUsize::new(0));
> - let injected_chunk_count2 = injected_chunk_count.clone();
> -
> - let stream_len = Arc::new(AtomicUsize::new(0));
> - let stream_len2 = stream_len.clone();
> - let stream_len3 = stream_len.clone();
> - let compressed_stream_len = Arc::new(AtomicU64::new(0));
> - let compressed_stream_len2 = compressed_stream_len.clone();
> - let reused_len = Arc::new(AtomicUsize::new(0));
> - let reused_len2 = reused_len.clone();
> - let injected_len = Arc::new(AtomicUsize::new(0));
> - let injected_len2 = injected_len.clone();
> - let uploaded_len = Arc::new(AtomicUsize::new(0));
> + let mut counters = UploadCounters::new();
> + let total_stream_len = counters.total_stream_len_counter();
> + let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
> + let counters_readonly = counters.clone();
>
> let append_chunk_path = format!("{}_index", prefix);
> let upload_chunk_path = format!("{}_chunk", prefix);
> @@ -691,7 +672,7 @@ impl BackupWriter {
> loop {
> tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
>
> - let size = HumanByte::from(stream_len3.load(Ordering::SeqCst));
> + let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
> let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
> let elapsed = TimeSpan::from(start_time.elapsed());
>
> @@ -703,22 +684,21 @@ impl BackupWriter {
> };
>
> stream
> - .inject_reused_chunks(injections, stream_len.clone())
> + .inject_reused_chunks(injections, counters.total_stream_len_counter())
> .and_then(move |chunk_info| match chunk_info {
> InjectedChunksInfo::Known(chunks) => {
> // account for injected chunks
> let count = chunks.len();
> - total_chunks.fetch_add(count, Ordering::SeqCst);
> - injected_chunk_count.fetch_add(count, Ordering::SeqCst);
> + counters.inc_total_chunks(count);
> + counters.inc_injected_chunks(count);
>
> let mut known = Vec::new();
> let mut guard = index_csum.lock().unwrap();
> let csum = guard.as_mut().unwrap();
> for chunk in chunks {
> - let offset =
> - stream_len.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
> - reused_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
> - injected_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
> + let offset = counters.inc_total_stream_len(chunk.size() as usize) as u64;
> + counters.inc_reused_stream_len(chunk.size() as usize);
> + counters.inc_injected_stream_len(chunk.size() as usize);
> let digest = chunk.digest();
> known.push((offset, digest));
> let end_offset = offset + chunk.size();
> @@ -731,8 +711,8 @@ impl BackupWriter {
> // account for not injected chunks (new and known)
> let chunk_len = data.len();
>
> - total_chunks.fetch_add(1, Ordering::SeqCst);
> - let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
> + counters.inc_total_chunks(1);
> + let offset = counters.inc_total_stream_len(chunk_len) as u64;
>
> let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
>
> @@ -755,14 +735,14 @@ impl BackupWriter {
>
> let chunk_is_known = known_chunks.contains(digest);
> if chunk_is_known {
> - known_chunk_count.fetch_add(1, Ordering::SeqCst);
> - reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> + counters.inc_known_chunks(1);
> + counters.inc_reused_stream_len(chunk_len);
> future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> } else {
> - let compressed_stream_len2 = compressed_stream_len.clone();
> + let mut counters = counters.clone();
> known_chunks.insert(*digest);
> future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> - compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> + counters.inc_compressed_stream_len(chunk.raw_size());
> MergedChunkInfo::New(ChunkInfo {
> chunk,
> digest,
> @@ -837,15 +817,6 @@ impl BackupWriter {
> })
> .then(move |result| async move { upload_result.await?.and(result) }.boxed())
> .and_then(move |_| {
> - let duration = start_time.elapsed();
> - let chunk_count = total_chunks2.load(Ordering::SeqCst);
> - let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
> - let chunk_injected = injected_chunk_count2.load(Ordering::SeqCst);
> - let size = stream_len2.load(Ordering::SeqCst);
> - let size_reused = reused_len2.load(Ordering::SeqCst);
> - let size_injected = injected_len2.load(Ordering::SeqCst);
> - let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
> -
> let mut guard = index_csum_2.lock().unwrap();
> let csum = guard.take().unwrap().finish();
>
> @@ -853,17 +824,7 @@ impl BackupWriter {
> handle.abort();
> }
>
> - futures::future::ok(UploadStats {
> - chunk_count,
> - chunk_reused,
> - chunk_injected,
> - size,
> - size_reused,
> - size_injected,
> - size_compressed,
> - duration,
> - csum,
> - })
> + futures::future::ok(counters_readonly.to_upload_stats(csum, start_time.elapsed()))
> })
> }
>
> diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
> index 3d2da27b9..b875347bb 100644
> --- a/pbs-client/src/lib.rs
> +++ b/pbs-client/src/lib.rs
> @@ -41,4 +41,7 @@ pub use backup_specification::*;
> mod chunk_stream;
> pub use chunk_stream::{ChunkStream, FixedChunkStream, InjectionData};
>
> +mod backup_stats;
> +pub use backup_stats::BackupStats;
> +
> pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 120;
> --
> 2.39.5
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
More information about the pbs-devel
mailing list