[pbs-devel] [PATCH proxmox-backup v2] client/backup_writer: clarify backup and upload size

Thomas Lamprecht t.lamprecht at proxmox.com
Wed Mar 24 16:44:38 CET 2021


On 24.03.21 16:29, Dominik Csapak wrote:
> The text 'had to upload [KMG]iB' implies that this is the size we
> actually had to send to the server, while in reality it is the
> raw data size before compression.
> 
> Count the size of the compressed chunks and print it separately.
> Split the average speed onto its own line so the log lines do not get too long.
> 
> Introduce an 'UploadStats' struct instead of using a giant anonymous tuple
> for the return values of 'upload_chunk_info_stream'.
> 
> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
> ---
> changes from v1:
> * properly format log lines (archive in front, no dots at the end, etc.)
> * introduce UploadStats struct and use that
> * rustfmt relevant code


can we please stop squashing multiple changes into one patch?

This needs to be at least two separate patches:

1. Non-semantic refactor introducing the UploadStats struct
2. Clarify the backup/upload size

rustfmt may well be its own patch too (first or last, I do not care too much
about that)
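
For illustration, point 1 above is purely mechanical: trading the anonymous
six-element tuple for a named struct, so call sites read fields by name
instead of by position. A minimal standalone sketch of that pattern (field
set taken from the patch, values made up):

    use std::time::Duration;

    // Named result type replacing the anonymous
    // (usize, usize, usize, usize, Duration, [u8; 32]) tuple.
    struct UploadStats {
        chunk_count: usize,
        chunk_reused: usize,
        size: usize,
        size_reused: usize,
        size_compressed: usize,
        duration: Duration,
        csum: [u8; 32],
    }

    fn main() {
        let stats = UploadStats {
            chunk_count: 10,
            chunk_reused: 4,
            size: 40 << 20,
            size_reused: 16 << 20,
            size_compressed: 12 << 20,
            duration: Duration::from_secs(2),
            csum: [0u8; 32],
        };
        // `stats.size_reused` is self-describing at the call site,
        // unlike `stats.3` or a six-element destructuring pattern.
        let size_dirty = stats.size - stats.size_reused;
        println!(
            "had to backup {} of {} bytes in {:.2}s",
            size_dirty,
            stats.size,
            stats.duration.as_secs_f64()
        );
    }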

>  src/client/backup_writer.rs | 141 ++++++++++++++++++++++++------------
>  1 file changed, 96 insertions(+), 45 deletions(-)
> 
> diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
> index cef7edef..3dcd7f57 100644
> --- a/src/client/backup_writer.rs
> +++ b/src/client/backup_writer.rs
> @@ -1,6 +1,6 @@
>  use std::collections::HashSet;
>  use std::os::unix::fs::OpenOptionsExt;
> -use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  
>  use anyhow::{bail, format_err, Error};
> @@ -48,6 +48,16 @@ pub struct UploadOptions {
>      pub fixed_size: Option<u64>,
>  }
>  
> +struct UploadStats {
> +    chunk_count: usize,
> +    chunk_reused: usize,
> +    size: usize,
> +    size_reused: usize,
> +    size_compressed: usize,
> +    duration: std::time::Duration,
> +    csum: [u8; 32],
> +}
> +
>  type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
>  type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
>  
> @@ -256,55 +266,81 @@ impl BackupWriter {
>  
>          let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
>  
> -        let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
> -            Self::upload_chunk_info_stream(
> -                self.h2.clone(),
> -                wid,
> -                stream,
> -                &prefix,
> -                known_chunks.clone(),
> -                if options.encrypt { self.crypt_config.clone() } else { None },
> -                options.compress,
> -                self.verbose,
> -            )
> -            .await?;
> -
> -        let uploaded = size - size_reused;
> -        let vsize_h: HumanByte = size.into();
> +        let upload_stats = Self::upload_chunk_info_stream(
> +            self.h2.clone(),
> +            wid,
> +            stream,
> +            &prefix,
> +            known_chunks.clone(),
> +            if options.encrypt {
> +                self.crypt_config.clone()
> +            } else {
> +                None
> +            },
> +            options.compress,
> +            self.verbose,
> +        )
> +        .await?;
> +
> +        let size_dirty = upload_stats.size - upload_stats.size_reused;
> +        let size: HumanByte = upload_stats.size.into();
>          let archive = if self.verbose {
>              archive_name.to_string()
>          } else {
>              crate::tools::format::strip_server_file_extension(archive_name)
>          };
>          if archive_name != CATALOG_NAME {
> -            let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
> -            let uploaded: HumanByte = uploaded.into();
> -            println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s).", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
> +            let speed: HumanByte = ((size_dirty * 1_000_000) / (upload_stats.duration.as_micros() as usize)).into();
> +            let size_dirty: HumanByte = size_dirty.into();
> +            let size_compressed: HumanByte = upload_stats.size_compressed.into();
> +            println!(
> +                "{}: had to backup {} of {} (compressed {}) in {:.2}s",
> +                archive,
> +                size_dirty,
> +                size,
> +                size_compressed,
> +                upload_stats.duration.as_secs_f64()
> +            );
> +            println!("{}: average backup speed: {}/s", archive, speed);
>          } else {
> -            println!("Uploaded backup catalog ({})", vsize_h);
> +            println!("Uploaded backup catalog ({})", size);
>          }
>  
> -        if size_reused > 0 && size > 1024*1024 {
> -            let reused_percent = size_reused as f64 * 100. / size as f64;
> -            let reused: HumanByte = size_reused.into();
> -            println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
> +        if upload_stats.size_reused > 0 && upload_stats.size > 1024 * 1024 {
> +            let reused_percent = upload_stats.size_reused as f64 * 100. / upload_stats.size as f64;
> +            let reused: HumanByte = upload_stats.size_reused.into();
> +            println!(
> +                "{}: backup was done incrementally, reused {} ({:.1}%)",
> +                archive, reused, reused_percent
> +            );
>          }
> -        if self.verbose && chunk_count > 0 {
> -            println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
> -            println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
> -            println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
> +        if self.verbose && upload_stats.chunk_count > 0 {
> +            println!(
> +                "{}: Reused {} from {} chunks.",
> +                archive, upload_stats.chunk_reused, upload_stats.chunk_count
> +            );
> +            println!(
> +                "{}: Average chunk size was {}.",
> +                archive,
> +                HumanByte::from(upload_stats.size / upload_stats.chunk_count)
> +            );
> +            println!(
> +                "{}: Average time per request: {} microseconds.",
> +                archive,
> +                (upload_stats.duration.as_micros()) / (upload_stats.chunk_count as u128)
> +            );
>          }
>  
>          let param = json!({
>              "wid": wid ,
> -            "chunk-count": chunk_count,
> -            "size": size,
> -            "csum": proxmox::tools::digest_to_hex(&csum),
> +            "chunk-count": upload_stats.chunk_count,
> +            "size": upload_stats.size,
> +            "csum": proxmox::tools::digest_to_hex(&upload_stats.csum),
>          });
>          let _value = self.h2.post(&close_path, Some(param)).await?;
>          Ok(BackupStats {
> -            size: size as u64,
> -            csum,
> +            size: upload_stats.size as u64,
> +            csum: upload_stats.csum,
>          })
>      }
>  
> @@ -521,7 +557,7 @@ impl BackupWriter {
>          crypt_config: Option<Arc<CryptConfig>>,
>          compress: bool,
>          verbose: bool,
> -    ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {
> +    ) -> impl Future<Output = Result<UploadStats, Error>> {
>  
>          let total_chunks = Arc::new(AtomicUsize::new(0));
>          let total_chunks2 = total_chunks.clone();
> @@ -530,6 +566,8 @@ impl BackupWriter {
>  
>          let stream_len = Arc::new(AtomicUsize::new(0));
>          let stream_len2 = stream_len.clone();
> +        let compressed_stream_len = Arc::new(AtomicU64::new(0));
> +        let compressed_stream_len2 = compressed_stream_len.clone();
>          let reused_len = Arc::new(AtomicUsize::new(0));
>          let reused_len2 = reused_len.clone();
>  
> @@ -572,6 +610,7 @@ impl BackupWriter {
>                  csum.update(digest);
>  
>                  let chunk_is_known = known_chunks.contains(digest);
> +                let compressed_stream_len2 = compressed_stream_len.clone();
>                  if chunk_is_known {
>                      known_chunk_count.fetch_add(1, Ordering::SeqCst);
>                      reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> @@ -580,12 +619,15 @@ impl BackupWriter {
>                      known_chunks.insert(*digest);
>                      future::ready(chunk_builder
>                          .build()
> -                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
> -                            chunk,
> -                            digest,
> -                            chunk_len: chunk_len as u64,
> -                            offset,
> -                        }))
> +                        .map(move |(chunk, digest)| {
> +                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                            MergedChunkInfo::New(ChunkInfo {
> +                                chunk,
> +                                digest,
> +                                chunk_len: chunk_len as u64,
> +                                offset,
> +                            })
> +                        })
>                      )
>                  }
>              })
> @@ -642,15 +684,24 @@ impl BackupWriter {
>              }.boxed())
>              .and_then(move |_| {
>                  let duration = start_time.elapsed();
> -                let total_chunks = total_chunks2.load(Ordering::SeqCst);
> -                let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
> -                let stream_len = stream_len2.load(Ordering::SeqCst);
> -                let reused_len = reused_len2.load(Ordering::SeqCst);
> +                let chunk_count = total_chunks2.load(Ordering::SeqCst);
> +                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
> +                let size = stream_len2.load(Ordering::SeqCst);
> +                let size_reused = reused_len2.load(Ordering::SeqCst);
> +                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
>  
>                  let mut guard = index_csum_2.lock().unwrap();
>                  let csum = guard.take().unwrap().finish();
>  
> -                futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
> +                futures::future::ok(UploadStats {
> +                    chunk_count,
> +                    chunk_reused,
> +                    size,
> +                    size_reused,
> +                    size_compressed,
> +                    duration,
> +                    csum,
> +                })
>              })
>      }
>  
> 
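
As an aside on the mechanics: the new compressed-size accounting boils down
to sharing an atomic counter between the stats collection and the per-chunk
closure, keeping one handle for the final read and moving a clone into the
closure. A minimal standalone sketch of that pattern (simplified and
synchronous, chunk sizes made up):

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    fn main() {
        // One handle stays behind for the final read; the clone is
        // moved into the closure, as the patch does with
        // `compressed_stream_len` in `upload_chunk_info_stream`.
        let compressed_len = Arc::new(AtomicU64::new(0));
        let compressed_len2 = compressed_len.clone();

        // Stand-in for the chunk-building stream.
        let chunk_sizes: Vec<u64> = vec![100, 250, 4096];
        chunk_sizes.into_iter().for_each(move |raw_size| {
            compressed_len2.fetch_add(raw_size, Ordering::SeqCst);
        });

        // Once the stream is drained, load the accumulated total.
        let size_compressed = compressed_len.load(Ordering::SeqCst);
        println!("compressed upload size: {} bytes", size_compressed);
    }

Note that in the patch the add happens on chunk.raw_size() only in the
new-chunk branch, so reused chunks do not count toward the compressed
upload size.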
