[pbs-devel] [PATCH proxmox-backup v2] client/backup_writer: clarify backup and upload size
Dominik Csapak
d.csapak at proxmox.com
Wed Mar 24 16:29:15 CET 2021
The text 'had to upload [KMG]iB' implies that this is the size we
actually had to send to the server, while in reality it is the
raw data size before compression.
Count the size of the compressed chunks and print it separately.
Split the average speed onto its own line so the log lines do not get too long.
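
With this change, the output for an archive looks roughly like this (the
values below are made up for illustration):

  root.pxar: had to backup 1.2 GiB of 3.5 GiB (compressed 520 MiB) in 12.30s
  root.pxar: average backup speed: 99.9 MiB/s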
Introduce an 'UploadStats' struct instead of using a giant anonymous tuple
for the return values of 'upload_chunk_info_stream'.
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
changes from v1:
* properly format log lines (archive in front, no dots at the end, etc.)
* introduce UploadStats struct and use that
* rustfmt relevant code
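
Not part of the patch, just for context: the compressed size has to be
counted from inside the per-chunk closure of the upload stream, which is
why the patch threads a shared AtomicU64 ('compressed_stream_len') through
the closures instead of using a plain counter. A minimal standalone sketch
of that pattern (a plain iterator stands in for the chunk stream):

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    fn main() {
        // Shared accumulator, cloned into the closure like
        // 'compressed_stream_len' in the patch.
        let total = Arc::new(AtomicU64::new(0));
        let counter = total.clone();
        let chunk_sizes: Vec<u64> = vec![4096, 1024, 2048];
        chunk_sizes.into_iter().for_each(move |len| {
            counter.fetch_add(len, Ordering::SeqCst);
        });
        println!("compressed total: {} bytes", total.load(Ordering::SeqCst));
    }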
src/client/backup_writer.rs | 141 ++++++++++++++++++++++++------------
1 file changed, 96 insertions(+), 45 deletions(-)
diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
index cef7edef..3dcd7f57 100644
--- a/src/client/backup_writer.rs
+++ b/src/client/backup_writer.rs
@@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::os::unix::fs::OpenOptionsExt;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
@@ -48,6 +48,16 @@ pub struct UploadOptions {
pub fixed_size: Option<u64>,
}
+struct UploadStats {
+ chunk_count: usize,
+ chunk_reused: usize,
+ size: usize,
+ size_reused: usize,
+ size_compressed: usize,
+ duration: std::time::Duration,
+ csum: [u8; 32],
+}
+
type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
@@ -256,55 +266,81 @@ impl BackupWriter {
let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
- let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
- Self::upload_chunk_info_stream(
- self.h2.clone(),
- wid,
- stream,
- &prefix,
- known_chunks.clone(),
- if options.encrypt { self.crypt_config.clone() } else { None },
- options.compress,
- self.verbose,
- )
- .await?;
-
- let uploaded = size - size_reused;
- let vsize_h: HumanByte = size.into();
+ let upload_stats = Self::upload_chunk_info_stream(
+ self.h2.clone(),
+ wid,
+ stream,
+ &prefix,
+ known_chunks.clone(),
+ if options.encrypt {
+ self.crypt_config.clone()
+ } else {
+ None
+ },
+ options.compress,
+ self.verbose,
+ )
+ .await?;
+
+ let size_dirty = upload_stats.size - upload_stats.size_reused;
+ let size: HumanByte = upload_stats.size.into();
let archive = if self.verbose {
archive_name.to_string()
} else {
crate::tools::format::strip_server_file_extension(archive_name)
};
if archive_name != CATALOG_NAME {
- let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
- let uploaded: HumanByte = uploaded.into();
- println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s).", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
+ let speed: HumanByte = ((size_dirty * 1_000_000) / (upload_stats.duration.as_micros() as usize)).into();
+ let size_dirty: HumanByte = size_dirty.into();
+ let size_compressed: HumanByte = upload_stats.size_compressed.into();
+ println!(
+ "{}: had to backup {} of {} (compressed {}) in {:.2}s",
+ archive,
+ size_dirty,
+ size,
+ size_compressed,
+ upload_stats.duration.as_secs_f64()
+ );
+ println!("{}: average backup speed: {}/s", archive, speed);
} else {
- println!("Uploaded backup catalog ({})", vsize_h);
+ println!("Uploaded backup catalog ({})", size);
}
- if size_reused > 0 && size > 1024*1024 {
- let reused_percent = size_reused as f64 * 100. / size as f64;
- let reused: HumanByte = size_reused.into();
- println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
+ if upload_stats.size_reused > 0 && upload_stats.size > 1024 * 1024 {
+ let reused_percent = upload_stats.size_reused as f64 * 100. / upload_stats.size as f64;
+ let reused: HumanByte = upload_stats.size_reused.into();
+ println!(
+ "{}: backup was done incrementally, reused {} ({:.1}%)",
+ archive, reused, reused_percent
+ );
}
- if self.verbose && chunk_count > 0 {
- println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
- println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
- println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
+ if self.verbose && upload_stats.chunk_count > 0 {
+ println!(
+ "{}: Reused {} from {} chunks.",
+ archive, upload_stats.chunk_reused, upload_stats.chunk_count
+ );
+ println!(
+ "{}: Average chunk size was {}.",
+ archive,
+ HumanByte::from(upload_stats.size / upload_stats.chunk_count)
+ );
+ println!(
+ "{}: Average time per request: {} microseconds.",
+ archive,
+ (upload_stats.duration.as_micros()) / (upload_stats.chunk_count as u128)
+ );
}
let param = json!({
"wid": wid ,
- "chunk-count": chunk_count,
- "size": size,
- "csum": proxmox::tools::digest_to_hex(&csum),
+ "chunk-count": upload_stats.chunk_count,
+ "size": upload_stats.size,
+ "csum": proxmox::tools::digest_to_hex(&upload_stats.csum),
});
let _value = self.h2.post(&close_path, Some(param)).await?;
Ok(BackupStats {
- size: size as u64,
- csum,
+ size: upload_stats.size as u64,
+ csum: upload_stats.csum,
})
}
@@ -521,7 +557,7 @@ impl BackupWriter {
crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
verbose: bool,
- ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {
+ ) -> impl Future<Output = Result<UploadStats, Error>> {
let total_chunks = Arc::new(AtomicUsize::new(0));
let total_chunks2 = total_chunks.clone();
@@ -530,6 +566,8 @@ impl BackupWriter {
let stream_len = Arc::new(AtomicUsize::new(0));
let stream_len2 = stream_len.clone();
+ let compressed_stream_len = Arc::new(AtomicU64::new(0));
+ let compressed_stream_len2 = compressed_stream_len.clone();
let reused_len = Arc::new(AtomicUsize::new(0));
let reused_len2 = reused_len.clone();
@@ -572,6 +610,7 @@ impl BackupWriter {
csum.update(digest);
let chunk_is_known = known_chunks.contains(digest);
+ let compressed_stream_len2 = compressed_stream_len.clone();
if chunk_is_known {
known_chunk_count.fetch_add(1, Ordering::SeqCst);
reused_len.fetch_add(chunk_len, Ordering::SeqCst);
@@ -580,12 +619,15 @@ impl BackupWriter {
known_chunks.insert(*digest);
future::ready(chunk_builder
.build()
- .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
- chunk,
- digest,
- chunk_len: chunk_len as u64,
- offset,
- }))
+ .map(move |(chunk, digest)| {
+ compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+ MergedChunkInfo::New(ChunkInfo {
+ chunk,
+ digest,
+ chunk_len: chunk_len as u64,
+ offset,
+ })
+ })
)
}
})
@@ -642,15 +684,24 @@ impl BackupWriter {
}.boxed())
.and_then(move |_| {
let duration = start_time.elapsed();
- let total_chunks = total_chunks2.load(Ordering::SeqCst);
- let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
- let stream_len = stream_len2.load(Ordering::SeqCst);
- let reused_len = reused_len2.load(Ordering::SeqCst);
+ let chunk_count = total_chunks2.load(Ordering::SeqCst);
+ let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
+ let size = stream_len2.load(Ordering::SeqCst);
+ let size_reused = reused_len2.load(Ordering::SeqCst);
+ let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
let mut guard = index_csum_2.lock().unwrap();
let csum = guard.take().unwrap().finish();
- futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
+ futures::future::ok(UploadStats {
+ chunk_count,
+ chunk_reused,
+ size,
+ size_reused,
+ size_compressed,
+ duration,
+ csum,
+ })
})
}
--
2.20.1