[pbs-devel] [PATCH v3 proxmox-backup 40/58] client: chunk stream: add dynamic entries injection queues

Fabian Grünbichler f.gruenbichler at proxmox.com
Thu Apr 4 16:52:15 CEST 2024


On March 28, 2024 1:36 pm, Christian Ebner wrote:
> Adds a queue to the chunk stream to request forced boundaries at a
> given offset within the stream and inject reused dynamic entries
> after this boundary.
> 
> The chunks are then passed along to the uploader stream using the
> injection queue, which inserts them during upload.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 2:
> - combined queues into new optional struct
> - refactoring
> 
>  examples/test_chunk_speed2.rs                 |  2 +-
>  pbs-client/src/backup_writer.rs               | 89 +++++++++++--------
>  pbs-client/src/chunk_stream.rs                | 36 +++++++-
>  pbs-client/src/pxar/create.rs                 |  6 +-
>  pbs-client/src/pxar_backup_stream.rs          |  7 +-
>  proxmox-backup-client/src/main.rs             | 31 ++++---
>  .../src/proxmox_restore_daemon/api.rs         |  1 +
>  pxar-bin/src/main.rs                          |  1 +
>  tests/catar.rs                                |  1 +
>  9 files changed, 121 insertions(+), 53 deletions(-)
> 
> diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
> index 3f69b436d..22dd14ce2 100644
> --- a/examples/test_chunk_speed2.rs
> +++ b/examples/test_chunk_speed2.rs
> @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
>          .map_err(Error::from);
>  
>      //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
> -    let mut chunk_stream = ChunkStream::new(stream, None);
> +    let mut chunk_stream = ChunkStream::new(stream, None, None);
>  
>      let start_time = std::time::Instant::now();
>  
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index 8bd0e4f36..032d93da7 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -1,4 +1,4 @@
> -use std::collections::HashSet;
> +use std::collections::{HashSet, VecDeque};
>  use std::future::Future;
>  use std::os::unix::fs::OpenOptionsExt;
>  use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
>  
>  use proxmox_human_byte::HumanByte;
>  
> +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
>  use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
>  
>  use super::{H2Client, HttpClient};
> @@ -265,6 +266,7 @@ impl BackupWriter {
>          archive_name: &str,
>          stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
>          options: UploadOptions,
> +        injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
>      ) -> Result<BackupStats, Error> {
>          let known_chunks = Arc::new(Mutex::new(HashSet::new()));
>  
> @@ -341,6 +343,7 @@ impl BackupWriter {
>                  None
>              },
>              options.compress,
> +            injection_queue,
>          )
>          .await?;
>  
> @@ -637,6 +640,7 @@ impl BackupWriter {
>          known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>          crypt_config: Option<Arc<CryptConfig>>,
>          compress: bool,
> +        injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
>      ) -> impl Future<Output = Result<UploadStats, Error>> {
>          let total_chunks = Arc::new(AtomicUsize::new(0));
>          let total_chunks2 = total_chunks.clone();
> @@ -663,48 +667,63 @@ impl BackupWriter {
>          let index_csum_2 = index_csum.clone();
>  
>          stream
> -            .and_then(move |data| {
> -                let chunk_len = data.len();
> +            .inject_reused_chunks(
> +                injection_queue.unwrap_or_default(),
> +                stream_len,
> +                reused_len.clone(),
> +                index_csum.clone(),
> +            )
> +            .and_then(move |chunk_info| match chunk_info {

For this part I am still not sure whether doing all of the accounting
here wouldn't be nicer.

> [..]

> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
> index a45420ca0..6ac0c638b 100644
> --- a/pbs-client/src/chunk_stream.rs
> +++ b/pbs-client/src/chunk_stream.rs
> @@ -38,15 +38,17 @@ pub struct ChunkStream<S: Unpin> {
>      chunker: Chunker,
>      buffer: BytesMut,
>      scan_pos: usize,
> +    injection_data: Option<InjectionData>,
>  }
>  
>  impl<S: Unpin> ChunkStream<S> {
> -    pub fn new(input: S, chunk_size: Option<usize>) -> Self {
> +    pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
>          Self {
>              input,
>              chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
>              buffer: BytesMut::new(),
>              scan_pos: 0,
> +            injection_data,
>          }
>      }
>  }
> @@ -64,6 +66,34 @@ where
>      fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
>          let this = self.get_mut();
>          loop {
> +            if let Some(InjectionData {
> +                boundaries,
> +                injections,
> +                consumed,
> +            }) = this.injection_data.as_mut()
> +            {
> +                // Make sure to release this lock as soon as possible
> +                let mut boundaries = boundaries.lock().unwrap();
> +                if let Some(inject) = boundaries.pop_front() {

Here I am a bit more wary that popping the front entry and pushing it
back whenever its boundary has not been reached yet might hurt
performance.

> +                    let max = *consumed + this.buffer.len() as u64;
> +                    if inject.boundary <= max {
> +                        let chunk_size = (inject.boundary - *consumed) as usize;
> +                        let result = this.buffer.split_to(chunk_size);

A comment or better variable naming would make this easier to follow.

`result` is a forced chunk that is created here because we have reached
a point after which we want to inject something.

Once more I am wondering whether, for the payload stream, a vastly
simplified chunker that just picks the boundaries based on re-use and
payload size(s) (to avoid the pathological one file == one chunk case
for lots of small files) wouldn't improve performance :)

> +                        *consumed += chunk_size as u64;
> +                        this.scan_pos = 0;
> +
> +                        // Add the size of the injected chunks to consumed, so chunk stream offsets
> +                        // are in sync with the rest of the archive.
> +                        *consumed += inject.size as u64;
> +
> +                        injections.lock().unwrap().push_back(inject);
> +
> +                        return Poll::Ready(Some(Ok(result)));
> +                    }
> +                    boundaries.push_front(inject);
> +                }
> +            }
> +
>              if this.scan_pos < this.buffer.len() {
>                  let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
>  
> @@ -74,7 +104,11 @@ where
>                      // continue poll
>                  } else if chunk_size <= this.buffer.len() {
>                      let result = this.buffer.split_to(chunk_size);
> +                    if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
> +                        *consumed += chunk_size as u64;
> +                    }
>                      this.scan_pos = 0;
> +
>                      return Poll::Ready(Some(Ok(result)));
>                  } else {
>                      panic!("got unexpected chunk boundary from chunker");

> [..]




More information about the pbs-devel mailing list