[pbs-devel] [PATCH v6 proxmox-backup 42/65] client: streams: add channels for dynamic entry injection

Dominik Csapak d.csapak at proxmox.com
Wed May 22 11:56:18 CEST 2024


high level:

i think this patch would be much cleaner if the 57/65
datastore:chunker: add Chunker trait

would come before this, because you change stuff around only to change it again
in a way that could be written better if those patches were

reversed, e.g.

in case there was a next_boundary in the ChunkStream, in the final code
there is the same code from the regular chunking duplicated

(this.buffer.split_to;this.consumed +=;this.scan_pos)

and could be trivially handled by the same code part
(with a bit of reordering)

but the patches in this order make it hard to see since
here the paths are different enough so they can't be shared

some other comments inline:

On 5/14/24 12:33, Christian Ebner wrote:
> To reuse dynamic entries of a previous backup run and index them for
> the new snapshot. Adds a non-blocking channel between the pxar
> archiver and the chunk stream, as well as the chunk stream and the
> backup writer.
> 
> The archiver sends forced boundary positions and the dynamic
> entries to inject into the chunk stream following this boundary.
> 
> The chunk stream consumes this channel inputs as receiver whenever a
> new chunk is requested by the upload stream, forcing a non-regular
> chunk boundary in the pxar stream at the requested positions.
> 
> The dynamic entries to inject and the boundary are then send via the
> second asynchronous channel to the backup writer's upload stream,
> indexing them by inserting the dynamic entries as known chunks into
> the upload stream.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
>   examples/test_chunk_speed2.rs                 |   2 +-
>   pbs-client/src/backup_writer.rs               | 110 ++++++++++++------
>   pbs-client/src/chunk_stream.rs                |  81 ++++++++++++-
>   pbs-client/src/pxar/create.rs                 |   6 +-
>   pbs-client/src/pxar_backup_stream.rs          |   8 +-
>   proxmox-backup-client/src/main.rs             |  28 +++--
>   .../src/proxmox_restore_daemon/api.rs         |   1 +
>   pxar-bin/src/main.rs                          |   1 +
>   tests/catar.rs                                |   1 +
>   9 files changed, 183 insertions(+), 55 deletions(-)
> 
> diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
> index 3f69b436d..22dd14ce2 100644
> --- a/examples/test_chunk_speed2.rs
> +++ b/examples/test_chunk_speed2.rs
> @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
>           .map_err(Error::from);
>   
>       //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
> -    let mut chunk_stream = ChunkStream::new(stream, None);
> +    let mut chunk_stream = ChunkStream::new(stream, None, None);
>   
>       let start_time = std::time::Instant::now();
>   
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index dc9aa569f..66f209fed 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
>   
>   use proxmox_human_byte::HumanByte;
>   
> +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
>   use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
>   
>   use super::{H2Client, HttpClient};
> @@ -265,6 +266,7 @@ impl BackupWriter {
>           archive_name: &str,
>           stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
>           options: UploadOptions,
> +        injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
>       ) -> Result<BackupStats, Error> {
>           let known_chunks = Arc::new(Mutex::new(HashSet::new()));
>   
> @@ -341,6 +343,7 @@ impl BackupWriter {
>                   None
>               },
>               options.compress,
> +            injections,
>           )
>           .await?;
>   
> @@ -636,6 +639,7 @@ impl BackupWriter {
>           known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>           crypt_config: Option<Arc<CryptConfig>>,
>           compress: bool,
> +        injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
>       ) -> impl Future<Output = Result<UploadStats, Error>> {
>           let total_chunks = Arc::new(AtomicUsize::new(0));
>           let total_chunks2 = total_chunks.clone();
> @@ -643,10 +647,12 @@ impl BackupWriter {
>           let known_chunk_count2 = known_chunk_count.clone();
>   
>           let stream_len = Arc::new(AtomicUsize::new(0));
> +        let stream_len1 = stream_len.clone();
>           let stream_len2 = stream_len.clone();
>           let compressed_stream_len = Arc::new(AtomicU64::new(0));
>           let compressed_stream_len2 = compressed_stream_len.clone();
>           let reused_len = Arc::new(AtomicUsize::new(0));
> +        let reused_len1 = reused_len.clone();

both of these variables are not really necessary?
in the closure you could simply keep using 'stream_len'
and in the .inject_reused_chunks call you can clone it inline

the reused_len1 is used in the same closure as reused_len so
there you can simply use that instead for both updates

>           let reused_len2 = reused_len.clone();
>   
>           let append_chunk_path = format!("{}_index", prefix);
> @@ -658,52 +664,79 @@ impl BackupWriter {
>   
>           let start_time = std::time::Instant::now();
>   
> -        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
> +        let index_csum = Arc::new(Mutex::new(openssl::sha::Sha256::new()));

why do you change this?
you can simply use the

let mut guard = ..;
let csum = ..;

pattern everywhere you need?


> +        let index_csum_1 = index_csum.clone();
>           let index_csum_2 = index_csum.clone();
>   
>           stream
> -            .and_then(move |data| {
> -                let chunk_len = data.len();
> +            .inject_reused_chunks(injections, stream_len)
> +            .and_then(move |chunk_info| match chunk_info {
> +                InjectedChunksInfo::Known(chunks) => {
> +                    // account for injected chunks
> +                    let count = chunks.len();
> +                    total_chunks.fetch_add(count, Ordering::SeqCst);
> +
> +                    let mut known = Vec::new();
> +                    let mut csum = index_csum_1.lock().unwrap();
> +                    for chunk in chunks {
> +                        let offset =
> +                            stream_len1.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
> +                        reused_len1.fetch_add(chunk.size() as usize, Ordering::SeqCst);
> +                        let digest = chunk.digest();
> +                        known.push((offset, digest));
> +                        let end_offset = offset + chunk.size();
> +                        csum.update(&end_offset.to_le_bytes());
> +                        csum.update(&digest);
> +                    }
> +                    future::ok(MergedChunkInfo::Known(known))
> +                }
> +                InjectedChunksInfo::Raw(raw) => {

if you rename 'raw' to data and...

> +                    // account for not injected chunks (new and known)
> +                    let offset = stream_len1.fetch_add(raw.len(), Ordering::SeqCst) as u64;
> +                    let chunk_len = raw.len() as u64;

leave chunk_len as usize and..

>   
> -                total_chunks.fetch_add(1, Ordering::SeqCst);
> -                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
> +                    total_chunks.fetch_add(1, Ordering::SeqCst);

reorder these statements a bit
and don't add some unnecessary newlines

this hunk gets much more readable with 'git diff -w'
namely it completely disappears :P since all you do here is basically
indenting the code one level


but that was really hard to see with the (IMHO not necessary) changes to the code

>   
> -                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
> +                    let mut chunk_builder = DataChunkBuilder::new(raw.as_ref()).compress(compress);
>   
> -                if let Some(ref crypt_config) = crypt_config {
> -                    chunk_builder = chunk_builder.crypt_config(crypt_config);
> -                }
> +                    if let Some(ref crypt_config) = crypt_config {
> +                        chunk_builder = chunk_builder.crypt_config(crypt_config);
> +                    }
>   
> -                let mut known_chunks = known_chunks.lock().unwrap();
> -                let digest = chunk_builder.digest();
> +                    let mut known_chunks = known_chunks.lock().unwrap();
>   
> -                let mut guard = index_csum.lock().unwrap();
> -                let csum = guard.as_mut().unwrap();
> +                    let digest = chunk_builder.digest();
>   
> -                let chunk_end = offset + chunk_len as u64;
> +                    let mut csum = index_csum.lock().unwrap();
>   
> -                if !is_fixed_chunk_size {
> -                    csum.update(&chunk_end.to_le_bytes());
> -                }
> -                csum.update(digest);
> +                    let chunk_end = offset + chunk_len;
>   
> -                let chunk_is_known = known_chunks.contains(digest);
> -                if chunk_is_known {
> -                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
> -                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> -                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> -                } else {
> -                    let compressed_stream_len2 = compressed_stream_len.clone();
> -                    known_chunks.insert(*digest);
> -                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> -                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> -                        MergedChunkInfo::New(ChunkInfo {
> -                            chunk,
> -                            digest,
> -                            chunk_len: chunk_len as u64,
> -                            offset,
> -                        })
> -                    }))
> +                    if !is_fixed_chunk_size {
> +                        csum.update(&chunk_end.to_le_bytes());
> +                    }
> +                    csum.update(digest);
> +
> +                    let chunk_is_known = known_chunks.contains(digest);
> +                    if chunk_is_known {
> +                        known_chunk_count.fetch_add(1, Ordering::SeqCst);
> +                        reused_len.fetch_add(chunk_len as usize, Ordering::SeqCst);
> +
> +                        future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> +                    } else {
> +                        let compressed_stream_len2 = compressed_stream_len.clone();
> +                        known_chunks.insert(*digest);
> +
> +                        future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> +                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +
> +                            MergedChunkInfo::New(ChunkInfo {
> +                                chunk,
> +                                digest,
> +                                chunk_len,
> +                                offset,
> +                            })
> +                        }))
> +                    }
>                   }
>               })
>               .merge_known_chunks()
> @@ -771,8 +804,11 @@ impl BackupWriter {
>                   let size_reused = reused_len2.load(Ordering::SeqCst);
>                   let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
>   
> -                let mut guard = index_csum_2.lock().unwrap();
> -                let csum = guard.take().unwrap().finish();
> +                let csum = Arc::into_inner(index_csum_2)
> +                    .unwrap()
> +                    .into_inner()
> +                    .unwrap()
> +                    .finish();
>   

if you don't change the type above, this hunk also disappears

>                   futures::future::ok(UploadStats {
>                       chunk_count,
> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
> index 83c75ba28..7dade3f07 100644
> --- a/pbs-client/src/chunk_stream.rs
> +++ b/pbs-client/src/chunk_stream.rs
> @@ -14,6 +14,7 @@ use crate::inject_reused_chunks::InjectChunks;
>   /// Holds the queues for optional injection of reused dynamic index entries
>   pub struct InjectionData {
>       boundaries: mpsc::Receiver<InjectChunks>,
> +    next_boundary: Option<InjectChunks>,
>       injections: mpsc::Sender<InjectChunks>,
>       consumed: u64,
>   }
> @@ -25,6 +26,7 @@ impl InjectionData {
>       ) -> Self {
>           Self {
>               boundaries,
> +            next_boundary: None,
>               injections,
>               consumed: 0,
>           }
> @@ -37,15 +39,17 @@ pub struct ChunkStream<S: Unpin> {
>       chunker: Chunker,
>       buffer: BytesMut,
>       scan_pos: usize,
> +    injection_data: Option<InjectionData>,
>   }
>   
>   impl<S: Unpin> ChunkStream<S> {
> -    pub fn new(input: S, chunk_size: Option<usize>) -> Self {
> +    pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
>           Self {
>               input,
>               chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
>               buffer: BytesMut::new(),
>               scan_pos: 0,
> +            injection_data,
>           }
>       }
>   }
> @@ -62,19 +66,84 @@ where
>   
>       fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
>           let this = self.get_mut();
> +
>           loop {
> +            if let Some(InjectionData {
> +                boundaries,
> +                next_boundary,
> +                injections,
> +                consumed,
> +            }) = this.injection_data.as_mut()
> +            {
> +                if next_boundary.is_none() {
> +                    if let Ok(boundary) = boundaries.try_recv() {
> +                        *next_boundary = Some(boundary);
> +                    }
> +                }
> +
> +                if let Some(inject) = next_boundary.take() {
> +                    // require forced boundary, lookup next regular boundary
> +                    let pos = this.chunker.scan(&this.buffer[this.scan_pos..]);

below we check explicitly if scan_pos < buffer.len(), wouldn't that make sense here too?
if i'm reading the code right, it can never be > buffer.len() so it doesn't matter i think
but for the case they're equal it would be a bit of optimization

> +
> +                    let chunk_boundary = if pos == 0 {
> +                        *consumed + this.buffer.len() as u64
> +                    } else {
> +                        *consumed + (this.scan_pos + pos) as u64
> +                    };
> +
> +                    if inject.boundary <= chunk_boundary {
> +                        // forced boundary is before next boundary, force within current buffer
> +                        let chunk_size = (inject.boundary - *consumed) as usize;
> +                        let raw_chunk = this.buffer.split_to(chunk_size);
> +                        this.chunker.reset();
> +                        this.scan_pos = 0;
> +
> +                        *consumed += chunk_size as u64;
> +
> +                        // add the size of the injected chunks to consumed, so chunk stream offsets
> +                        // are in sync with the rest of the archive.
> +                        *consumed += inject.size as u64;
> +
> +                        injections.send(inject).unwrap();
> +
> +                        // the chunk can be empty, return nevertheless to allow the caller to
> +                        // make progress by consuming from the injection queue
> +                        return Poll::Ready(Some(Ok(raw_chunk)));
> +                    } else if pos != 0 {
> +                        *next_boundary = Some(inject);
> +                        // forced boundary is after next boundary, split off chunk from buffer
> +                        let chunk_size = this.scan_pos + pos;
> +                        let raw_chunk = this.buffer.split_to(chunk_size);
> +                        *consumed += chunk_size as u64;
> +                        this.scan_pos = 0;
> +
> +                        return Poll::Ready(Some(Ok(raw_chunk)));
> +                    } else {
> +                        // forced boundary is after current buffer length, continue reading
> +                        *next_boundary = Some(inject);
> +                        this.scan_pos = this.buffer.len();
> +                    }

these^ two else if/else branches were meant at the beginning of my message
after 57/65 these are practically identical
to the ones below in the "regular" chunking code

and when we move the 'let boundary = this.chunker...' above, we can reuse that result
for both cases


> +                }
> +            }
> +
>               if this.scan_pos < this.buffer.len() {
> +                // look for next chunk boundary, starting from scan_pos
>                   let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
>   
>                   let chunk_size = this.scan_pos + boundary;
>   
>                   if boundary == 0 {
> +                    // no new chunk boundary, update position for next boundary lookup
>                       this.scan_pos = this.buffer.len();
> -                    // continue poll
>                   } else if chunk_size <= this.buffer.len() {
> -                    let result = this.buffer.split_to(chunk_size);
> +                    // found new chunk boundary inside buffer, split off chunk from buffer
> +                    let raw_chunk = this.buffer.split_to(chunk_size);
> +                    if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
> +                        *consumed += chunk_size as u64;
> +                    }
>                       this.scan_pos = 0;
> -                    return Poll::Ready(Some(Ok(result)));
> +
> +                    return Poll::Ready(Some(Ok(raw_chunk)));

why the extra newline?

>                   } else {
>                       panic!("got unexpected chunk boundary from chunker");
>                   }
> @@ -82,10 +151,11 @@ where
>   
>               match ready!(Pin::new(&mut this.input).try_poll_next(cx)) {
>                   Some(Err(err)) => {
> +                    // got error in byte stream, pass to consumer
>                       return Poll::Ready(Some(Err(err.into())));
>                   }
>                   None => {
> -                    this.scan_pos = 0;

is there any special reason why you're removing this ?
i mean it probably does not make a difference since we're at the end of the stream anyway?

> +                    // end of stream reached, flush remaining bytes in buffer
>                       if !this.buffer.is_empty() {
>                           return Poll::Ready(Some(Ok(this.buffer.split())));
>                       } else {
> @@ -93,6 +163,7 @@ where
>                       }
>                   }
>                   Some(Ok(data)) => {
> +                    // got new data, add to buffer
>                       this.buffer.extend_from_slice(data.as_ref());
>                   }
>               }
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 0f32efcce..dd3c64525 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -6,7 +6,7 @@ use std::ops::Range;
>   use std::os::unix::ffi::OsStrExt;
>   use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
>   use std::path::{Path, PathBuf};
> -use std::sync::{Arc, Mutex};
> +use std::sync::{mpsc, Arc, Mutex};
>   
>   use anyhow::{bail, Context, Error};
>   use futures::future::BoxFuture;
> @@ -29,6 +29,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
>   use pbs_datastore::dynamic_index::DynamicIndexReader;
>   use pbs_datastore::index::IndexFile;
>   
> +use crate::inject_reused_chunks::InjectChunks;
>   use crate::pxar::metadata::errno_is_unsupported;
>   use crate::pxar::tools::assert_single_path_component;
>   use crate::pxar::Flags;
> @@ -134,6 +135,7 @@ struct Archiver {
>       hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
>       file_copy_buffer: Vec<u8>,
>       skip_e2big_xattr: bool,
> +    forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
>   }
>   
>   type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -164,6 +166,7 @@ pub async fn create_archive<T, F>(
>       feature_flags: Flags,
>       callback: F,
>       options: PxarCreateOptions,
> +    forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
>   ) -> Result<(), Error>
>   where
>       T: SeqWrite + Send,
> @@ -224,6 +227,7 @@ where
>           hardlinks: HashMap::new(),
>           file_copy_buffer: vec::undefined(4 * 1024 * 1024),
>           skip_e2big_xattr: options.skip_e2big_xattr,
> +        forced_boundaries,
>       };
>   
>       archiver
> diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
> index 95145cb0d..9d2cb41d6 100644
> --- a/pbs-client/src/pxar_backup_stream.rs
> +++ b/pbs-client/src/pxar_backup_stream.rs
> @@ -2,7 +2,7 @@ use std::io::Write;
>   //use std::os::unix::io::FromRawFd;
>   use std::path::Path;
>   use std::pin::Pin;
> -use std::sync::{Arc, Mutex};
> +use std::sync::{mpsc, Arc, Mutex};
>   use std::task::{Context, Poll};
>   
>   use anyhow::{format_err, Error};
> @@ -17,6 +17,7 @@ use proxmox_io::StdChannelWriter;
>   
>   use pbs_datastore::catalog::CatalogWriter;
>   
> +use crate::inject_reused_chunks::InjectChunks;
>   use crate::pxar::create::PxarWriters;
>   
>   /// Stream implementation to encode and upload .pxar archives.
> @@ -42,6 +43,7 @@ impl PxarBackupStream {
>           dir: Dir,
>           catalog: Arc<Mutex<CatalogWriter<W>>>,
>           options: crate::pxar::PxarCreateOptions,
> +        boundaries: Option<mpsc::Sender<InjectChunks>>,
>           separate_payload_stream: bool,
>       ) -> Result<(Self, Option<Self>), Error> {
>           let buffer_size = 256 * 1024;
> @@ -79,6 +81,7 @@ impl PxarBackupStream {
>                       Ok(())
>                   },
>                   options,
> +                boundaries,
>               )
>               .await
>               {
> @@ -110,11 +113,12 @@ impl PxarBackupStream {
>           dirname: &Path,
>           catalog: Arc<Mutex<CatalogWriter<W>>>,
>           options: crate::pxar::PxarCreateOptions,
> +        boundaries: Option<mpsc::Sender<InjectChunks>>,
>           separate_payload_stream: bool,
>       ) -> Result<(Self, Option<Self>), Error> {
>           let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
>   
> -        Self::new(dir, catalog, options, separate_payload_stream)
> +        Self::new(dir, catalog, options, boundaries, separate_payload_stream)
>       }
>   }
>   
> diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
> index 821777d66..5e93f9542 100644
> --- a/proxmox-backup-client/src/main.rs
> +++ b/proxmox-backup-client/src/main.rs
> @@ -45,8 +45,8 @@ use pbs_client::tools::{
>   use pbs_client::{
>       delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
>       BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
> -    FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
> -    BACKUP_SOURCE_SCHEMA,
> +    FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader,
> +    UploadOptions, BACKUP_SOURCE_SCHEMA,
>   };
>   use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
>   use pbs_datastore::chunk_store::verify_chunk_size;
> @@ -199,14 +199,16 @@ async fn backup_directory<P: AsRef<Path>>(
>           bail!("cannot backup directory with fixed chunk size!");
>       }
>   
> +    let (payload_boundaries_tx, payload_boundaries_rx) = std::sync::mpsc::channel();
>       let (pxar_stream, payload_stream) = PxarBackupStream::open(
>           dir_path.as_ref(),
>           catalog,
>           pxar_create_options,
> +        Some(payload_boundaries_tx),
>           payload_target.is_some(),
>       )?;
>   
> -    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
> +    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None);
>       let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
>   
>       let stream = ReceiverStream::new(rx).map_err(Error::from);
> @@ -218,13 +220,16 @@ async fn backup_directory<P: AsRef<Path>>(
>           }
>       });
>   
> -    let stats = client.upload_stream(archive_name, stream, upload_options.clone());
> +    let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
>   
>       if let Some(payload_stream) = payload_stream {
>           let payload_target = payload_target
>               .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
>   
> -        let mut payload_chunk_stream = ChunkStream::new(payload_stream, chunk_size);
> +        let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel();
> +        let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx);
> +        let mut payload_chunk_stream =
> +            ChunkStream::new(payload_stream, chunk_size, Some(injection_data));
>           let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
>           let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
>   
> @@ -235,7 +240,12 @@ async fn backup_directory<P: AsRef<Path>>(
>               }
>           });
>   
> -        let payload_stats = client.upload_stream(&payload_target, stream, upload_options);
> +        let payload_stats = client.upload_stream(
> +            &payload_target,
> +            stream,
> +            upload_options,
> +            Some(payload_injections_rx),
> +        );
>   
>           match futures::join!(stats, payload_stats) {
>               (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))),
> @@ -271,7 +281,7 @@ async fn backup_image<P: AsRef<Path>>(
>       }
>   
>       let stats = client
> -        .upload_stream(archive_name, stream, upload_options)
> +        .upload_stream(archive_name, stream, upload_options, None)
>           .await?;
>   
>       Ok(stats)
> @@ -562,7 +572,7 @@ fn spawn_catalog_upload(
>       let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
>       let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
>       let catalog_chunk_size = 512 * 1024;
> -    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
> +    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None);
>   
>       let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
>           StdChannelWriter::new(catalog_tx),
> @@ -578,7 +588,7 @@ fn spawn_catalog_upload(
>   
>       tokio::spawn(async move {
>           let catalog_upload_result = client
> -            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
> +            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None)
>               .await;
>   
>           if let Err(ref err) = catalog_upload_result {
> diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> index ea97976e6..0883d6cda 100644
> --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> @@ -364,6 +364,7 @@ fn extract(
>                           Flags::DEFAULT,
>                           |_| Ok(()),
>                           options,
> +                        None,
>                       )
>                       .await
>                   }
> diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
> index 58c9d2cfd..d46c98d2b 100644
> --- a/pxar-bin/src/main.rs
> +++ b/pxar-bin/src/main.rs
> @@ -405,6 +405,7 @@ async fn create_archive(
>               Ok(())
>           },
>           options,
> +        None,
>       )
>       .await?;
>   
> diff --git a/tests/catar.rs b/tests/catar.rs
> index 9e96a8610..d5ef85ffe 100644
> --- a/tests/catar.rs
> +++ b/tests/catar.rs
> @@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
>           Flags::DEFAULT,
>           |_| Ok(()),
>           options,
> +        None,
>       ))?;
>   
>       Command::new("cmp")





More information about the pbs-devel mailing list