[pbs-devel] [RFC proxmox-backup 10/24] client: backup writer: factor out merged chunk stream upload
Christian Ebner
c.ebner at proxmox.com
Mon Jul 15 12:15:48 CEST 2024
In preparation for implementing push support for sync jobs.

Factor out the upload of the merged chunk stream into a dedicated
helper, so it can be reused to upload local chunks to a remote
target datastore during a snapshot sync operation in push direction
(a usage sketch follows after the '---' separator below).

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
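A rough sketch of the intended reuse by a future push-direction sync
job. Everything at this call site is hypothetical (local_chunk_stream,
target_h2, target_wid and counters are assumed names, and the helper
would need to be made reachable from the sync code); only the helper
itself is introduced by this patch:

    // Hypothetical push-sync call site: chunks read from the local
    // datastore arrive as a stream of Result<MergedChunkInfo, Error>
    // and are handed to the factored-out helper for upload.
    let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
    let upload_stats = BackupWriter::upload_merged_chunk_stream(
        target_h2.clone(),  // H2Client connected to the remote target
        target_wid,         // writer id of the index opened on the target
        "fixed",            // index type prefix, "fixed" or "dynamic"
        local_chunk_stream, // impl Stream<Item = Result<MergedChunkInfo, Error>>
        index_csum.clone(), // finalized into the index checksum on completion
        counters,           // UploadStatsCounters shared with the caller
    )
    .await?;
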
 pbs-client/src/backup_writer.rs | 47 ++++++++++++++++++++-------------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index a67b471a7..6daad9fde 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex};
 use anyhow::{bail, format_err, Error};
 use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
 use futures::stream::{Stream, StreamExt, TryStreamExt};
+use openssl::sha::Sha256;
 use serde_json::{json, Value};
 use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
@@ -675,19 +676,12 @@ impl BackupWriter {
             stream_len: stream_len.clone(),
         };
 
-        let append_chunk_path = format!("{}_index", prefix);
-        let upload_chunk_path = format!("{}_chunk", prefix);
         let is_fixed_chunk_size = prefix == "fixed";
 
-        let (upload_queue, upload_result) =
-            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
-
-        let start_time = std::time::Instant::now();
-
         let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
         let index_csum_2 = index_csum.clone();
 
-        stream
+        let stream = stream
             .inject_reused_chunks(injections, stream_len.clone())
             .and_then(move |chunk_info| match chunk_info {
                 InjectedChunksInfo::Known(chunks) => {
@@ -758,7 +752,28 @@ impl BackupWriter {
                     }
                 }
             })
-            .merge_known_chunks()
+            .merge_known_chunks();
+
+        Self::upload_merged_chunk_stream(h2, wid, prefix, stream, index_csum_2, counters)
+    }
+
+    fn upload_merged_chunk_stream(
+        h2: H2Client,
+        wid: u64,
+        prefix: &str,
+        stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
+        index_csum: Arc<Mutex<Option<Sha256>>>,
+        counters: UploadStatsCounters,
+    ) -> impl Future<Output = Result<UploadStats, Error>> {
+        let append_chunk_path = format!("{prefix}_index");
+        let upload_chunk_path = format!("{prefix}_chunk");
+
+        let (upload_queue, upload_result) =
+            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
+
+        let start_time = std::time::Instant::now();
+
+        stream
             .try_for_each(move |merged_chunk_info| {
                 let upload_queue = upload_queue.clone();
 
@@ -768,10 +783,8 @@ impl BackupWriter {
                     let digest_str = hex::encode(digest);
 
                     log::trace!(
-                        "upload new chunk {} ({} bytes, offset {})",
-                        digest_str,
-                        chunk_info.chunk_len,
-                        offset
+                        "upload new chunk {digest_str} ({chunk_len} bytes, offset {offset})",
+                        chunk_len = chunk_info.chunk_len,
                     );
 
                     let chunk_data = chunk_info.chunk.into_inner();
@@ -800,9 +813,7 @@ impl BackupWriter {
                             upload_queue
                                 .send((new_info, Some(response)))
                                 .await
-                                .map_err(|err| {
-                                    format_err!("failed to send to upload queue: {}", err)
-                                })
+                                .map_err(|err| format_err!("failed to send to upload queue: {err}"))
                         },
                     ))
                 } else {
@@ -810,13 +821,13 @@ impl BackupWriter {
                         upload_queue
                             .send((merged_chunk_info, None))
                             .await
-                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
+                            .map_err(|err| format_err!("failed to send to upload queue: {err}"))
                     })
                 }
             })
             .then(move |result| async move { upload_result.await?.and(result) }.boxed())
             .and_then(move |_| {
-                let mut guard = index_csum_2.lock().unwrap();
+                let mut guard = index_csum.lock().unwrap();
                 let csum = guard.take().unwrap().finish();
 
                 futures::future::ok(UploadStats {
--
2.39.2