[pbs-devel] [PATCH v3 proxmox-backup 40/58] client: chunk stream: add dynamic entries injection queues
Fabian Grünbichler
f.gruenbichler at proxmox.com
Thu Apr 4 16:52:15 CEST 2024
On March 28, 2024 1:36 pm, Christian Ebner wrote:
> Adds a queue to the chunk stream to request forced boundaries at a
> given offset within the stream and inject reused dynamic entries
> after this boundary.
>
> The chunks are then passed along to the uploader stream using the
> injection queue, which inserts them during upload.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 2:
> - combined queues into new optional struct
> - refactoring
>
> examples/test_chunk_speed2.rs | 2 +-
> pbs-client/src/backup_writer.rs | 89 +++++++++++--------
> pbs-client/src/chunk_stream.rs | 36 +++++++-
> pbs-client/src/pxar/create.rs | 6 +-
> pbs-client/src/pxar_backup_stream.rs | 7 +-
> proxmox-backup-client/src/main.rs | 31 ++++---
> .../src/proxmox_restore_daemon/api.rs | 1 +
> pxar-bin/src/main.rs | 1 +
> tests/catar.rs | 1 +
> 9 files changed, 121 insertions(+), 53 deletions(-)
>
> diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
> index 3f69b436d..22dd14ce2 100644
> --- a/examples/test_chunk_speed2.rs
> +++ b/examples/test_chunk_speed2.rs
> @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
> .map_err(Error::from);
>
> //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
> - let mut chunk_stream = ChunkStream::new(stream, None);
> + let mut chunk_stream = ChunkStream::new(stream, None, None);
>
> let start_time = std::time::Instant::now();
>
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index 8bd0e4f36..032d93da7 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -1,4 +1,4 @@
> -use std::collections::HashSet;
> +use std::collections::{HashSet, VecDeque};
> use std::future::Future;
> use std::os::unix::fs::OpenOptionsExt;
> use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
>
> use proxmox_human_byte::HumanByte;
>
> +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
> use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
>
> use super::{H2Client, HttpClient};
> @@ -265,6 +266,7 @@ impl BackupWriter {
> archive_name: &str,
> stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
> options: UploadOptions,
> + injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
> ) -> Result<BackupStats, Error> {
> let known_chunks = Arc::new(Mutex::new(HashSet::new()));
>
> @@ -341,6 +343,7 @@ impl BackupWriter {
> None
> },
> options.compress,
> + injection_queue,
> )
> .await?;
>
> @@ -637,6 +640,7 @@ impl BackupWriter {
> known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> crypt_config: Option<Arc<CryptConfig>>,
> compress: bool,
> + injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
> ) -> impl Future<Output = Result<UploadStats, Error>> {
> let total_chunks = Arc::new(AtomicUsize::new(0));
> let total_chunks2 = total_chunks.clone();
> @@ -663,48 +667,63 @@ impl BackupWriter {
> let index_csum_2 = index_csum.clone();
>
> stream
> - .and_then(move |data| {
> - let chunk_len = data.len();
> + .inject_reused_chunks(
> + injection_queue.unwrap_or_default(),
> + stream_len,
> + reused_len.clone(),
> + index_csum.clone(),
> + )
> + .and_then(move |chunk_info| match chunk_info {
For this part here I am still not sure whether it wouldn't be nicer to do all of
the accounting in this closure, instead of splitting it between here and the
`inject_reused_chunks` adapter (which currently receives `reused_len` and
`index_csum` for that purpose)..
> [..]
> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
> index a45420ca0..6ac0c638b 100644
> --- a/pbs-client/src/chunk_stream.rs
> +++ b/pbs-client/src/chunk_stream.rs
> @@ -38,15 +38,17 @@ pub struct ChunkStream<S: Unpin> {
> chunker: Chunker,
> buffer: BytesMut,
> scan_pos: usize,
> + injection_data: Option<InjectionData>,
> }
>
> impl<S: Unpin> ChunkStream<S> {
> - pub fn new(input: S, chunk_size: Option<usize>) -> Self {
> + pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
> Self {
> input,
> chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
> buffer: BytesMut::new(),
> scan_pos: 0,
> + injection_data,
> }
> }
> }
> @@ -64,6 +66,34 @@ where
> fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
> let this = self.get_mut();
> loop {
> + if let Some(InjectionData {
> + boundaries,
> + injections,
> + consumed,
> + }) = this.injection_data.as_mut()
> + {
> + // Make sure to release this lock as soon as possible
> + let mut boundaries = boundaries.lock().unwrap();
> + if let Some(inject) = boundaries.pop_front() {
Here I am a bit more wary: this popping and re-pushing of boundary entries
(`pop_front` on every poll, followed by `push_front` whenever the boundary has
not been reached yet) might hurt performance..
> + let max = *consumed + this.buffer.len() as u64;
> + if inject.boundary <= max {
> + let chunk_size = (inject.boundary - *consumed) as usize;
> + let result = this.buffer.split_to(chunk_size);
A comment or a better variable name would make this easier to follow:
"result" here is a forced chunk, created because we've reached a point in the
stream where we want to inject the reused entries right afterwards..
once more I am wondering here whether for the payload stream, a vastly
simplified chunker that just picks the boundaries based on re-use and
payload size(s) (to avoid the one file == one chunk pathological case
for lots of small files) wouldn't improve performance :)
> + *consumed += chunk_size as u64;
> + this.scan_pos = 0;
> +
> + // Add the size of the injected chunks to consumed, so chunk stream offsets
> + // are in sync with the rest of the archive.
> + *consumed += inject.size as u64;
> +
> + injections.lock().unwrap().push_back(inject);
> +
> + return Poll::Ready(Some(Ok(result)));
> + }
> + boundaries.push_front(inject);
> + }
> + }
> +
> if this.scan_pos < this.buffer.len() {
> let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
>
> @@ -74,7 +104,11 @@ where
> // continue poll
> } else if chunk_size <= this.buffer.len() {
> let result = this.buffer.split_to(chunk_size);
> + if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
> + *consumed += chunk_size as u64;
> + }
> this.scan_pos = 0;
> +
> return Poll::Ready(Some(Ok(result)));
> } else {
> panic!("got unexpected chunk boundary from chunker");
> [..]
More information about the pbs-devel
mailing list