[pbs-devel] [RFC v2 proxmox-backup 26/36] client: chunk stream: add chunk injection queues

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Mar 12 10:46:13 CET 2024


On March 5, 2024 10:26 am, Christian Ebner wrote:
> Adds a queue to the chunk stream to request forced boundaries at a
> given offset within the stream and inject reused chunks after this
> boundary.
> 
> The chunks are then passed along to the uploader stream using the
> injection queue, which inserts them during upload.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>

I think this patch here would benefit from a few more Option<..>
wrappings (to make it clear where injection can actually happen), and
possibly also combining some stuff into structs (to reduce the number of
parameters and group those only set/needed for injection/caching/..)

I haven't tested the proposed changes below, but AFAICT they should
work..

> ---
> changes since version 1:
> - refactor bail on non-existing payload target archive name
> 
>  examples/test_chunk_speed2.rs                 | 10 ++-
>  pbs-client/src/backup_writer.rs               | 89 +++++++++++--------
>  pbs-client/src/chunk_stream.rs                | 42 ++++++++-
>  pbs-client/src/pxar/create.rs                 |  6 +-
>  pbs-client/src/pxar_backup_stream.rs          |  8 +-
>  proxmox-backup-client/src/main.rs             | 28 ++++--
>  .../src/proxmox_restore_daemon/api.rs         |  3 +
>  pxar-bin/src/main.rs                          |  5 +-
>  tests/catar.rs                                |  3 +
>  9 files changed, 147 insertions(+), 47 deletions(-)
> 
> diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
> index 3f69b436..b20a5b59 100644
> --- a/examples/test_chunk_speed2.rs
> +++ b/examples/test_chunk_speed2.rs
> @@ -1,3 +1,6 @@
> +use std::collections::VecDeque;
> +use std::sync::{Arc, Mutex};
> +
>  use anyhow::Error;
>  use futures::*;
>  
> @@ -26,7 +29,12 @@ async fn run() -> Result<(), Error> {
>          .map_err(Error::from);
>  
>      //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
> -    let mut chunk_stream = ChunkStream::new(stream, None);
> +    let mut chunk_stream = ChunkStream::new(
> +        stream,
> +        None,
> +        Arc::new(Mutex::new(VecDeque::new())),
> +        Arc::new(Mutex::new(VecDeque::new())),
> +    );
>  
>      let start_time = std::time::Instant::now();
>  
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index 8a03d8ea..e66b93df 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -1,4 +1,4 @@
> -use std::collections::HashSet;
> +use std::collections::{HashSet, VecDeque};
>  use std::future::Future;
>  use std::os::unix::fs::OpenOptionsExt;
>  use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
>  
>  use proxmox_human_byte::HumanByte;
>  
> +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
>  use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
>  
>  use super::{H2Client, HttpClient};
> @@ -265,6 +266,7 @@ impl BackupWriter {
>          archive_name: &str,
>          stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
>          options: UploadOptions,
> +        injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,

this one is already properly optional :)

>      ) -> Result<BackupStats, Error> {
>          let known_chunks = Arc::new(Mutex::new(HashSet::new()));
>  
> @@ -341,6 +343,7 @@ impl BackupWriter {
>                  None
>              },
>              options.compress,
> +            injection_queue,
>          )
>          .await?;
>  
> @@ -637,6 +640,7 @@ impl BackupWriter {
>          known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>          crypt_config: Option<Arc<CryptConfig>>,
>          compress: bool,
> +        injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
>      ) -> impl Future<Output = Result<UploadStats, Error>> {
>          let total_chunks = Arc::new(AtomicUsize::new(0));
>          let total_chunks2 = total_chunks.clone();
> @@ -663,48 +667,63 @@ impl BackupWriter {
>          let index_csum_2 = index_csum.clone();
>  
>          stream
> -            .and_then(move |data| {
> -                let chunk_len = data.len();
> +            .inject_reused_chunks(
> +                injection_queue.unwrap_or_default(),
> +                stream_len,
> +                reused_len.clone(),
> +                index_csum.clone(),
> +            )
> +            .and_then(move |chunk_info| match chunk_info {
> +                InjectedChunksInfo::Known(chunks) => {
> +                    total_chunks.fetch_add(chunks.len(), Ordering::SeqCst);
> +                    future::ok(MergedChunkInfo::Known(chunks))
> +                }
> +                InjectedChunksInfo::Raw((offset, data)) => {
> +                    let chunk_len = data.len();
>  
> -                total_chunks.fetch_add(1, Ordering::SeqCst);
> -                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;

this housekeeping is now split between here and inject_reused_chunks,
which makes it a bit hard to follow..

> +                    total_chunks.fetch_add(1, Ordering::SeqCst);
>  
> -                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
> +                    let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
>  
> -                if let Some(ref crypt_config) = crypt_config {
> -                    chunk_builder = chunk_builder.crypt_config(crypt_config);
> -                }
> +                    if let Some(ref crypt_config) = crypt_config {
> +                        chunk_builder = chunk_builder.crypt_config(crypt_config);
> +                    }
>  
> -                let mut known_chunks = known_chunks.lock().unwrap();
> -                let digest = chunk_builder.digest();
> +                    let mut known_chunks = known_chunks.lock().unwrap();
>  
> -                let mut guard = index_csum.lock().unwrap();
> -                let csum = guard.as_mut().unwrap();
> +                    let digest = chunk_builder.digest();
>  
> -                let chunk_end = offset + chunk_len as u64;
> +                    let mut guard = index_csum.lock().unwrap();
> +                    let csum = guard.as_mut().unwrap();
>  
> -                if !is_fixed_chunk_size {
> -                    csum.update(&chunk_end.to_le_bytes());
> -                }
> -                csum.update(digest);
> +                    let chunk_end = offset + chunk_len as u64;
>  
> -                let chunk_is_known = known_chunks.contains(digest);
> -                if chunk_is_known {
> -                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
> -                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> -                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> -                } else {
> -                    let compressed_stream_len2 = compressed_stream_len.clone();
> -                    known_chunks.insert(*digest);
> -                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> -                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> -                        MergedChunkInfo::New(ChunkInfo {
> -                            chunk,
> -                            digest,
> -                            chunk_len: chunk_len as u64,
> -                            offset,
> -                        })
> -                    }))
> +                    if !is_fixed_chunk_size {
> +                        csum.update(&chunk_end.to_le_bytes());
> +                    }
> +                    csum.update(digest);
> +
> +                    let chunk_is_known = known_chunks.contains(digest);
> +                    if chunk_is_known {
> +                        known_chunk_count.fetch_add(1, Ordering::SeqCst);
> +                        reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> +
> +                        future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> +                    } else {
> +                        let compressed_stream_len2 = compressed_stream_len.clone();
> +                        known_chunks.insert(*digest);
> +
> +                        future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> +                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +
> +                            MergedChunkInfo::New(ChunkInfo {
> +                                chunk,
> +                                digest,
> +                                chunk_len: chunk_len as u64,
> +                                offset,
> +                            })
> +                        }))
> +                    }
>                  }
>              })
>              .merge_known_chunks()
> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
> index 895f6eae..891d6928 100644
> --- a/pbs-client/src/chunk_stream.rs
> +++ b/pbs-client/src/chunk_stream.rs
> @@ -1,4 +1,6 @@
> +use std::collections::VecDeque;
>  use std::pin::Pin;
> +use std::sync::{Arc, Mutex};
>  use std::task::{Context, Poll};
>  
>  use anyhow::Error;
> @@ -8,21 +10,34 @@ use futures::stream::{Stream, TryStream};
>  
>  use pbs_datastore::Chunker;
>  
> +use crate::inject_reused_chunks::InjectChunks;
> +
>  /// Split input stream into dynamic sized chunks
>  pub struct ChunkStream<S: Unpin> {
>      input: S,
>      chunker: Chunker,
>      buffer: BytesMut,
>      scan_pos: usize,
> +    consumed: u64,
> +    boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
> +    injections: Arc<Mutex<VecDeque<InjectChunks>>>,

okay, so boundaries and injections are either both meaningful or both
not. we only set them for the payload stream. they should be an Option
;) technically consumed atm could also go inside that option, and we
could make the whole thing a struct?

struct InjectionData {
    boundaries: Arc<Mutex<..>>,
    injections: Arc<Mutex<..>>,
    consumed: u64,
}

and then pass in an Option of that?

>  }
>  
>  impl<S: Unpin> ChunkStream<S> {
> -    pub fn new(input: S, chunk_size: Option<usize>) -> Self {
> +    pub fn new(
> +        input: S,
> +        chunk_size: Option<usize>,
> +        boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
> +        injections: Arc<Mutex<VecDeque<InjectChunks>>>,
> +    ) -> Self {
>          Self {
>              input,
>              chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
>              buffer: BytesMut::new(),
>              scan_pos: 0,
> +            consumed: 0,
> +            boundaries,
> +            injections,
>          }
>      }
>  }
> @@ -40,6 +55,29 @@ where
>      fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
>          let this = self.get_mut();
>          loop {
> +            {

this scope for lock purposes can then be an `if let Some(..)`, either
with the struct or tuple..

> +                // Make sure to release this lock as soon as possible
> +                let mut boundaries = this.boundaries.lock().unwrap();
> +                if let Some(inject) = boundaries.pop_front() {
> +                    let max = this.consumed + this.buffer.len() as u64;
> +                    if inject.boundary <= max {
> +                        let chunk_size = (inject.boundary - this.consumed) as usize;
> +                        let result = this.buffer.split_to(chunk_size);
> +                        this.consumed += chunk_size as u64;
> +                        this.scan_pos = 0;
> +
> +                        // Add the size of the injected chunks to consumed, so chunk stream offsets
> +                        // are in sync with the rest of the archive.
> +                        this.consumed += inject.size as u64;
> +
> +                        this.injections.lock().unwrap().push_back(inject);
> +
> +                        return Poll::Ready(Some(Ok(result)));
> +                    }
> +                    boundaries.push_front(inject);
> +                }
> +            }
> +
>              if this.scan_pos < this.buffer.len() {
>                  let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
>  
> @@ -50,7 +88,9 @@ where
>                      // continue poll
>                  } else if chunk_size <= this.buffer.len() {
>                      let result = this.buffer.split_to(chunk_size);
> +                    this.consumed += chunk_size as u64;
>                      this.scan_pos = 0;
> +
>                      return Poll::Ready(Some(Ok(result)));
>                  } else {
>                      panic!("got unexpected chunk boundary from chunker");
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 59aa4450..9ae84d37 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -1,4 +1,4 @@
> -use std::collections::{HashMap, HashSet};
> +use std::collections::{HashMap, HashSet, VecDeque};
>  use std::ffi::{CStr, CString, OsStr};
>  use std::fmt;
>  use std::io::{self, Read};
> @@ -26,6 +26,7 @@ use proxmox_sys::fs::{self, acl, xattr};
>  
>  use pbs_datastore::catalog::BackupCatalogWriter;
>  
> +use crate::inject_reused_chunks::InjectChunks;
>  use crate::pxar::metadata::errno_is_unsupported;
>  use crate::pxar::tools::assert_single_path_component;
>  use crate::pxar::Flags;
> @@ -131,6 +132,7 @@ struct Archiver {
>      hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
>      file_copy_buffer: Vec<u8>,
>      skip_e2big_xattr: bool,
> +    forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
>  }
>  
>  type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -143,6 +145,7 @@ pub async fn create_archive<T, F>(
>      catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
>      mut payload_writer: Option<T>,
>      options: PxarCreateOptions,
> +    forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,

could be combined with the payload_writer and the caching parameters
added later on? and then the whole thing can be optional?

>  ) -> Result<(), Error>
>  where
>      T: SeqWrite + Send,
> @@ -201,6 +204,7 @@ where
>          hardlinks: HashMap::new(),
>          file_copy_buffer: vec::undefined(4 * 1024 * 1024),
>          skip_e2big_xattr: options.skip_e2big_xattr,
> +        forced_boundaries,
>      };
>  
>      archiver
> diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
> index 9a600cc1..1a51b0c2 100644
> --- a/pbs-client/src/pxar_backup_stream.rs
> +++ b/pbs-client/src/pxar_backup_stream.rs
> @@ -1,3 +1,4 @@
> +use std::collections::VecDeque;
>  use std::io::Write;
>  //use std::os::unix::io::FromRawFd;
>  use std::path::Path;
> @@ -17,6 +18,8 @@ use proxmox_io::StdChannelWriter;
>  
>  use pbs_datastore::catalog::CatalogWriter;
>  
> +use crate::inject_reused_chunks::InjectChunks;
> +
>  /// Stream implementation to encode and upload .pxar archives.
>  ///
>  /// The hyper client needs an async Stream for file upload, so we
> @@ -40,6 +43,7 @@ impl PxarBackupStream {
>          dir: Dir,
>          catalog: Arc<Mutex<CatalogWriter<W>>>,
>          options: crate::pxar::PxarCreateOptions,
> +        boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
>          separate_payload_stream: bool,
>      ) -> Result<(Self, Option<Self>), Error> {
>          let buffer_size = 256 * 1024;
> @@ -79,6 +83,7 @@ impl PxarBackupStream {
>                  Some(catalog),
>                  payload_writer,
>                  options,
> +                boundaries,
>              )
>              .await
>              {
> @@ -110,11 +115,12 @@ impl PxarBackupStream {
>          dirname: &Path,
>          catalog: Arc<Mutex<CatalogWriter<W>>>,
>          options: crate::pxar::PxarCreateOptions,
> +        boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
>          separate_payload_stream: bool,

make boundaries optional (and maybe give it a more "readable" name ;)),
replace the separate_payload_stream with its Some-ness?

>      ) -> Result<(Self, Option<Self>), Error> {
>          let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
>  
> -        Self::new(dir, catalog, options, separate_payload_stream)
> +        Self::new(dir, catalog, options, boundaries, separate_payload_stream)
>      }
>  }
>  
> diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
> index e609aa16..f077ddf6 100644
> --- a/proxmox-backup-client/src/main.rs
> +++ b/proxmox-backup-client/src/main.rs
> @@ -1,4 +1,4 @@
> -use std::collections::HashSet;
> +use std::collections::{HashSet, VecDeque};
>  use std::io::{self, Read, Seek, SeekFrom, Write};
>  use std::path::{Path, PathBuf};
>  use std::pin::Pin;
> @@ -197,14 +197,19 @@ async fn backup_directory<P: AsRef<Path>>(
>          bail!("cannot backup directory with fixed chunk size!");
>      }
>  
> +    let payload_boundaries = Arc::new(Mutex::new(VecDeque::new()));

make this an Option, set based on payload_target

>      let (pxar_stream, payload_stream) = PxarBackupStream::open(
>          dir_path.as_ref(),
>          catalog,
>          pxar_create_options,
> +        payload_boundaries.clone(),
>          payload_target.is_some(),
>      )?;
>  
> -    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
> +    let dummy_injections = Arc::new(Mutex::new(VecDeque::new()));
> +    let dummy_boundaries = Arc::new(Mutex::new(VecDeque::new()));
> +    let mut chunk_stream =
> +        ChunkStream::new(pxar_stream, chunk_size, dummy_boundaries, dummy_injections);

replace these with None

>      let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
>  
>      let stream = ReceiverStream::new(rx).map_err(Error::from);
> @@ -216,15 +221,18 @@ async fn backup_directory<P: AsRef<Path>>(
>          }
>      });
>  
> -    let stats = client.upload_stream(archive_name, stream, upload_options.clone());
> +    let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
>  
>      if let Some(payload_stream) = payload_stream {
>          let payload_target = payload_target
>              .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
>  
> +        let payload_injections = Arc::new(Mutex::new(VecDeque::new()));
>          let mut payload_chunk_stream = ChunkStream::new(
>              payload_stream,
>              chunk_size,
> +            payload_boundaries,
> +            payload_injections.clone(),
>          );
>          let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
>          let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
> @@ -240,6 +248,7 @@ async fn backup_directory<P: AsRef<Path>>(
>              &payload_target,
>              stream,
>              upload_options,
> +            Some(payload_injections),
>          );
>  
>          match futures::join!(stats, payload_stats) {
> @@ -276,7 +285,7 @@ async fn backup_image<P: AsRef<Path>>(
>      }
>  
>      let stats = client
> -        .upload_stream(archive_name, stream, upload_options)
> +        .upload_stream(archive_name, stream, upload_options, None)
>          .await?;
>  
>      Ok(stats)
> @@ -567,7 +576,14 @@ fn spawn_catalog_upload(
>      let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
>      let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
>      let catalog_chunk_size = 512 * 1024;
> -    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
> +    let boundaries = Arc::new(Mutex::new(VecDeque::new()));
> +    let injections = Arc::new(Mutex::new(VecDeque::new()));
> +    let catalog_chunk_stream = ChunkStream::new(
> +        catalog_stream,
> +        Some(catalog_chunk_size),
> +        boundaries,
> +        injections.clone(),
> +    );

replace these with None (they are also dummies AFAICT?)

>  
>      let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
>          StdChannelWriter::new(catalog_tx),
> @@ -583,7 +599,7 @@ fn spawn_catalog_upload(
>  
>      tokio::spawn(async move {
>          let catalog_upload_result = client
> -            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
> +            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None)
>              .await;
>  
>          if let Err(ref err) = catalog_upload_result {
> diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> index bd8ddb20..d912734c 100644
> --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> @@ -1,8 +1,10 @@
>  ///! File-restore API running inside the restore VM
> +use std::collections::VecDeque;
>  use std::ffi::OsStr;
>  use std::fs;
>  use std::os::unix::ffi::OsStrExt;
>  use std::path::{Path, PathBuf};
> +use std::sync::{Arc, Mutex};
>  
>  use anyhow::{bail, Error};
>  use futures::FutureExt;
> @@ -364,6 +366,7 @@ fn extract(
>                          None,
>                          None,
>                          options,
> +                        Arc::new(Mutex::new(VecDeque::new())),
>                      )
>                      .await
>                  }
> diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
> index e3b0faac..74ee04f7 100644
> --- a/pxar-bin/src/main.rs
> +++ b/pxar-bin/src/main.rs
> @@ -1,10 +1,10 @@
> -use std::collections::HashSet;
> +use std::collections::{HashSet, VecDeque};
>  use std::ffi::OsStr;
>  use std::fs::OpenOptions;
>  use std::os::unix::fs::OpenOptionsExt;
>  use std::path::{Path, PathBuf};
>  use std::sync::atomic::{AtomicBool, Ordering};
> -use std::sync::Arc;
> +use std::sync::{Arc, Mutex};
>  
>  use anyhow::{bail, format_err, Error};
>  use futures::future::FutureExt;
> @@ -385,6 +385,7 @@ async fn create_archive(
>          None,
>          None,
>          options,
> +        Arc::new(Mutex::new(VecDeque::new())),

None here (or a single None, if merged with the payload writer as suggested above)

>      )
>      .await?;
>  
> diff --git a/tests/catar.rs b/tests/catar.rs
> index 04af4ffd..6edd747d 100644
> --- a/tests/catar.rs
> +++ b/tests/catar.rs
> @@ -1,4 +1,6 @@
> +use std::collections::VecDeque;
>  use std::process::Command;
> +use std::sync::{Arc, Mutex};
>  
>  use anyhow::Error;
>  
> @@ -41,6 +43,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
>          None,
>          None,
>          options,
> +        Arc::new(Mutex::new(VecDeque::new())),

same

>      ))?;
>  
>      Command::new("cmp")
> -- 
> 2.39.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




More information about the pbs-devel mailing list