[pbs-devel] [PATCH proxmox-backup v2] client/backup_writer: clarify backup and upload size
Thomas Lamprecht
t.lamprecht at proxmox.com
Wed Mar 24 16:44:38 CET 2021
On 24.03.21 16:29, Dominik Csapak wrote:
> The text 'had to upload [KMG]iB' implies that this is the size we
> actually had to send to the server, while in reality it is the
> raw data size before compression.
>
> Count the size of the compressed chunks and print it separately.
> Split the average speed into its own line so they do not get too long.
>
> Introduce an 'UploadStats' struct instead of using a giant anonymous tuple
> for the return values of 'upload_chunk_info_stream'.
>
> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
> ---
> changes from v1:
> * properly format log lines (archive in front, no dots at the end, etc.)
> * introduce UploadStats struct and use that
> * rustfmt relevant code
can we please stop squashing multiple changes into one patch?
This needs to be at least two separate patches:
1. non-semantic change to the struct
2. clarify backup/upload size
rustfmt may well be its own patch too (first or last, I do not care too
much about that)
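e.g., assuming this is the tip commit of your branch, splitting it
locally could look roughly like:

    $ git reset HEAD^                         # undo the commit, keep the working tree
    $ git add -p src/client/backup_writer.rs  # stage only the UploadStats refactor hunks
    $ git commit -s                           # 1: non-semantic struct change
    $ git add -p src/client/backup_writer.rs  # stage the size accounting/log hunks
    $ git commit -s                           # 2: clarify backup/upload size
    $ git add src/client/backup_writer.rs     # remaining rustfmt-only hunks
    $ git commit -s                           # 3: rustfmt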
> src/client/backup_writer.rs | 141 ++++++++++++++++++++++++------------
> 1 file changed, 96 insertions(+), 45 deletions(-)
>
> diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
> index cef7edef..3dcd7f57 100644
> --- a/src/client/backup_writer.rs
> +++ b/src/client/backup_writer.rs
> @@ -1,6 +1,6 @@
> use std::collections::HashSet;
> use std::os::unix::fs::OpenOptionsExt;
> -use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
> use std::sync::{Arc, Mutex};
>
> use anyhow::{bail, format_err, Error};
> @@ -48,6 +48,16 @@ pub struct UploadOptions {
> pub fixed_size: Option<u64>,
> }
>
> +struct UploadStats {
> + chunk_count: usize,
> + chunk_reused: usize,
> + size: usize,
> + size_reused: usize,
> + size_compressed: usize,
> + duration: std::time::Duration,
> + csum: [u8; 32],
> +}
> +
> type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
> type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
>
> @@ -256,55 +266,81 @@ impl BackupWriter {
>
> let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
>
> - let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
> - Self::upload_chunk_info_stream(
> - self.h2.clone(),
> - wid,
> - stream,
> - &prefix,
> - known_chunks.clone(),
> - if options.encrypt { self.crypt_config.clone() } else { None },
> - options.compress,
> - self.verbose,
> - )
> - .await?;
> -
> - let uploaded = size - size_reused;
> - let vsize_h: HumanByte = size.into();
> + let upload_stats = Self::upload_chunk_info_stream(
> + self.h2.clone(),
> + wid,
> + stream,
> + &prefix,
> + known_chunks.clone(),
> + if options.encrypt {
> + self.crypt_config.clone()
> + } else {
> + None
> + },
> + options.compress,
> + self.verbose,
> + )
> + .await?;
> +
> + let size_dirty = upload_stats.size - upload_stats.size_reused;
> + let size: HumanByte = upload_stats.size.into();
> let archive = if self.verbose {
> archive_name.to_string()
> } else {
> crate::tools::format::strip_server_file_extension(archive_name)
> };
> if archive_name != CATALOG_NAME {
> - let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
> - let uploaded: HumanByte = uploaded.into();
> - println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s).", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
> + let speed: HumanByte = ((size_dirty * 1_000_000) / (upload_stats.duration.as_micros() as usize)).into();
> + let size_dirty: HumanByte = size_dirty.into();
> + let size_compressed: HumanByte = upload_stats.size_compressed.into();
> + println!(
> + "{}: had to backup {} of {} (compressed {}) in {:.2}s",
> + archive,
> + size_dirty,
> + size,
> + size_compressed,
> + upload_stats.duration.as_secs_f64()
> + );
> + println!("{}: average backup speed: {}/s", archive, speed);
> } else {
> - println!("Uploaded backup catalog ({})", vsize_h);
> + println!("Uploaded backup catalog ({})", size);
> }
>
> - if size_reused > 0 && size > 1024*1024 {
> - let reused_percent = size_reused as f64 * 100. / size as f64;
> - let reused: HumanByte = size_reused.into();
> - println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
> + if upload_stats.size_reused > 0 && upload_stats.size > 1024 * 1024 {
> + let reused_percent = upload_stats.size_reused as f64 * 100. / upload_stats.size as f64;
> + let reused: HumanByte = upload_stats.size_reused.into();
> + println!(
> + "{}: backup was done incrementally, reused {} ({:.1}%)",
> + archive, reused, reused_percent
> + );
> }
> - if self.verbose && chunk_count > 0 {
> - println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
> - println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
> - println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
> + if self.verbose && upload_stats.chunk_count > 0 {
> + println!(
> + "{}: Reused {} from {} chunks.",
> + archive, upload_stats.chunk_reused, upload_stats.chunk_count
> + );
> + println!(
> + "{}: Average chunk size was {}.",
> + archive,
> + HumanByte::from(upload_stats.size / upload_stats.chunk_count)
> + );
> + println!(
> + "{}: Average time per request: {} microseconds.",
> + archive,
> + (upload_stats.duration.as_micros()) / (upload_stats.chunk_count as u128)
> + );
> }
>
> let param = json!({
> "wid": wid ,
> - "chunk-count": chunk_count,
> - "size": size,
> - "csum": proxmox::tools::digest_to_hex(&csum),
> + "chunk-count": upload_stats.chunk_count,
> + "size": upload_stats.size,
> + "csum": proxmox::tools::digest_to_hex(&upload_stats.csum),
> });
> let _value = self.h2.post(&close_path, Some(param)).await?;
> Ok(BackupStats {
> - size: size as u64,
> - csum,
> + size: upload_stats.size as u64,
> + csum: upload_stats.csum,
> })
> }
>
> @@ -521,7 +557,7 @@ impl BackupWriter {
> crypt_config: Option<Arc<CryptConfig>>,
> compress: bool,
> verbose: bool,
> - ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {
> + ) -> impl Future<Output = Result<UploadStats, Error>> {
>
> let total_chunks = Arc::new(AtomicUsize::new(0));
> let total_chunks2 = total_chunks.clone();
> @@ -530,6 +566,8 @@ impl BackupWriter {
>
> let stream_len = Arc::new(AtomicUsize::new(0));
> let stream_len2 = stream_len.clone();
> + let compressed_stream_len = Arc::new(AtomicU64::new(0));
> + let compressed_stream_len2 = compressed_stream_len.clone();
> let reused_len = Arc::new(AtomicUsize::new(0));
> let reused_len2 = reused_len.clone();
>
> @@ -572,6 +610,7 @@ impl BackupWriter {
> csum.update(digest);
>
> let chunk_is_known = known_chunks.contains(digest);
> + let compressed_stream_len2 = compressed_stream_len.clone();
> if chunk_is_known {
> known_chunk_count.fetch_add(1, Ordering::SeqCst);
> reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> @@ -580,12 +619,15 @@ impl BackupWriter {
> known_chunks.insert(*digest);
> future::ready(chunk_builder
> .build()
> - .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
> - chunk,
> - digest,
> - chunk_len: chunk_len as u64,
> - offset,
> - }))
> + .map(move |(chunk, digest)| {
> + compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> + MergedChunkInfo::New(ChunkInfo {
> + chunk,
> + digest,
> + chunk_len: chunk_len as u64,
> + offset,
> + })
> + })
> )
> }
> })
> @@ -642,15 +684,24 @@ impl BackupWriter {
> }.boxed())
> .and_then(move |_| {
> let duration = start_time.elapsed();
> - let total_chunks = total_chunks2.load(Ordering::SeqCst);
> - let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
> - let stream_len = stream_len2.load(Ordering::SeqCst);
> - let reused_len = reused_len2.load(Ordering::SeqCst);
> + let chunk_count = total_chunks2.load(Ordering::SeqCst);
> + let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
> + let size = stream_len2.load(Ordering::SeqCst);
> + let size_reused = reused_len2.load(Ordering::SeqCst);
> + let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
>
> let mut guard = index_csum_2.lock().unwrap();
> let csum = guard.take().unwrap().finish();
>
> - futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
> + futures::future::ok(UploadStats{
> + chunk_count,
> + chunk_reused,
> + size,
> + size_reused,
> + size_compressed,
> + duration,
> + csum,
> + })
> })
> }
>
>
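With the new format strings above, the per-archive summary would then read
something like (hypothetical values):

    root.pxar.didx: had to backup 1.2 GiB of 3.4 GiB (compressed 800 MiB) in 10.24s
    root.pxar.didx: average backup speed: 120 MiB/s
    root.pxar.didx: backup was done incrementally, reused 2.2 GiB (64.7%)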