[pbs-devel] [PATCH v5 proxmox-backup 02/31] client: backup writer: factor out merged chunk stream upload
Christian Ebner
c.ebner at proxmox.com
Fri Oct 18 10:42:13 CEST 2024
In preparation for implementing push support for sync jobs.
Factor out the upload stream for merged chunks, which can be reused
to upload the local chunks to a remote target datastore during a
snapshot sync operation in push direction.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 4:
- Rebased onto current master
changes since version 3:
- adapted to refactored upload stat counters
pbs-client/src/backup_writer.rs | 88 +++++++++++++++++++++------------
1 file changed, 56 insertions(+), 32 deletions(-)
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index f08a65153..1ec181f99 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -7,6 +7,7 @@ use std::time::Instant;
use anyhow::{bail, format_err, Error};
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
+use openssl::sha::Sha256;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
@@ -648,42 +649,14 @@ impl BackupWriter {
archive: &str,
) -> impl Future<Output = Result<UploadStats, Error>> {
let mut counters = UploadCounters::new();
- let total_stream_len = counters.total_stream_len_counter();
- let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let counters_readonly = counters.clone();
- let append_chunk_path = format!("{}_index", prefix);
- let upload_chunk_path = format!("{}_chunk", prefix);
let is_fixed_chunk_size = prefix == "fixed";
- let (upload_queue, upload_result) =
- Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
-
- let start_time = std::time::Instant::now();
-
let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
let index_csum_2 = index_csum.clone();
- let progress_handle = if archive.ends_with(".img")
- || archive.ends_with(".pxar")
- || archive.ends_with(".ppxar")
- {
- Some(tokio::spawn(async move {
- loop {
- tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
-
- let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
- let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
- let elapsed = TimeSpan::from(start_time.elapsed());
-
- log::info!("processed {size} in {elapsed}, uploaded {size_uploaded}");
- }
- }))
- } else {
- None
- };
-
- stream
+ let stream = stream
.inject_reused_chunks(injections, counters.total_stream_len_counter())
.and_then(move |chunk_info| match chunk_info {
InjectedChunksInfo::Known(chunks) => {
@@ -753,7 +726,58 @@ impl BackupWriter {
}
}
})
- .merge_known_chunks()
+ .merge_known_chunks();
+
+ Self::upload_merged_chunk_stream(
+ h2,
+ wid,
+ archive,
+ prefix,
+ stream,
+ index_csum_2,
+ counters_readonly,
+ )
+ }
+
+ fn upload_merged_chunk_stream(
+ h2: H2Client,
+ wid: u64,
+ archive: &str,
+ prefix: &str,
+ stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
+ index_csum: Arc<Mutex<Option<Sha256>>>,
+ counters: UploadCounters,
+ ) -> impl Future<Output = Result<UploadStats, Error>> {
+ let append_chunk_path = format!("{prefix}_index");
+ let upload_chunk_path = format!("{prefix}_chunk");
+
+ let start_time = std::time::Instant::now();
+ let total_stream_len = counters.total_stream_len_counter();
+ let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+
+ let (upload_queue, upload_result) =
+ Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
+
+ let progress_handle = if archive.ends_with(".img")
+ || archive.ends_with(".pxar")
+ || archive.ends_with(".ppxar")
+ {
+ Some(tokio::spawn(async move {
+ loop {
+ tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+
+ let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
+ let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
+ let elapsed = TimeSpan::from(start_time.elapsed());
+
+ log::info!("processed {size} in {elapsed}, uploaded {size_uploaded}");
+ }
+ }))
+ } else {
+ None
+ };
+
+ stream
.try_for_each(move |merged_chunk_info| {
let upload_queue = upload_queue.clone();
@@ -817,14 +841,14 @@ impl BackupWriter {
})
.then(move |result| async move { upload_result.await?.and(result) }.boxed())
.and_then(move |_| {
- let mut guard = index_csum_2.lock().unwrap();
+ let mut guard = index_csum.lock().unwrap();
let csum = guard.take().unwrap().finish();
if let Some(handle) = progress_handle {
handle.abort();
}
- futures::future::ok(counters_readonly.to_upload_stats(csum, start_time.elapsed()))
+ futures::future::ok(counters.to_upload_stats(csum, start_time.elapsed()))
})
}
--
2.39.5
More information about the pbs-devel
mailing list