[pbs-devel] [PATCH v6 proxmox-backup 42/65] client: streams: add channels for dynamic entry injection
Dominik Csapak
d.csapak at proxmox.com
Wed May 22 11:56:18 CEST 2024
high level:
i think this patch would be much cleaner if 57/65
("datastore: chunker: add Chunker trait")
came before this one, because you change things around here only to change them
again later, in a way that could be written better if those patches were
reversed. e.g. when there is a next_boundary in the ChunkStream, the final code
duplicates the same statements from the regular chunking path
(this.buffer.split_to; this.consumed +=; this.scan_pos = 0),
which could trivially be handled by the same code (with a bit of reordering).
with the patches in this order that is hard to see, though, since here the two
paths are still different enough that they can't be shared.
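for example, both cases could end in the same few lines, roughly
(untested sketch, names as in the final code; the chunker reset would only
be needed for a forced cut):

    // shared tail for the forced and the regular boundary case,
    // with chunk_size being whichever boundary won
    let raw_chunk = this.buffer.split_to(chunk_size);
    *consumed += chunk_size as u64;
    this.scan_pos = 0;
    return Poll::Ready(Some(Ok(raw_chunk)));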
some other comments inline:
On 5/14/24 12:33, Christian Ebner wrote:
> To reuse dynamic entries of a previous backup run and index them for
> the new snapshot. Adds a non-blocking channel between the pxar
> archiver and the chunk stream, as well as the chunk stream and the
> backup writer.
>
> The archiver sends forced boundary positions and the dynamic
> entries to inject into the chunk stream following this boundary.
>
> The chunk stream consumes this channel's inputs as receiver whenever a
> new chunk is requested by the upload stream, forcing a non-regular
> chunk boundary in the pxar stream at the requested positions.
>
> The dynamic entries to inject and the boundary are then sent via the
> second asynchronous channel to the backup writer's upload stream,
> indexing them by inserting the dynamic entries as known chunks into
> the upload stream.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> examples/test_chunk_speed2.rs | 2 +-
> pbs-client/src/backup_writer.rs | 110 ++++++++++++------
> pbs-client/src/chunk_stream.rs | 81 ++++++++++++-
> pbs-client/src/pxar/create.rs | 6 +-
> pbs-client/src/pxar_backup_stream.rs | 8 +-
> proxmox-backup-client/src/main.rs | 28 +++--
> .../src/proxmox_restore_daemon/api.rs | 1 +
> pxar-bin/src/main.rs | 1 +
> tests/catar.rs | 1 +
> 9 files changed, 183 insertions(+), 55 deletions(-)
>
> diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
> index 3f69b436d..22dd14ce2 100644
> --- a/examples/test_chunk_speed2.rs
> +++ b/examples/test_chunk_speed2.rs
> @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
> .map_err(Error::from);
>
> //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
> - let mut chunk_stream = ChunkStream::new(stream, None);
> + let mut chunk_stream = ChunkStream::new(stream, None, None);
>
> let start_time = std::time::Instant::now();
>
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index dc9aa569f..66f209fed 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
>
> use proxmox_human_byte::HumanByte;
>
> +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
> use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
>
> use super::{H2Client, HttpClient};
> @@ -265,6 +266,7 @@ impl BackupWriter {
> archive_name: &str,
> stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
> options: UploadOptions,
> + injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
> ) -> Result<BackupStats, Error> {
> let known_chunks = Arc::new(Mutex::new(HashSet::new()));
>
> @@ -341,6 +343,7 @@ impl BackupWriter {
> None
> },
> options.compress,
> + injections,
> )
> .await?;
>
> @@ -636,6 +639,7 @@ impl BackupWriter {
> known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> crypt_config: Option<Arc<CryptConfig>>,
> compress: bool,
> + injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
> ) -> impl Future<Output = Result<UploadStats, Error>> {
> let total_chunks = Arc::new(AtomicUsize::new(0));
> let total_chunks2 = total_chunks.clone();
> @@ -643,10 +647,12 @@ impl BackupWriter {
> let known_chunk_count2 = known_chunk_count.clone();
>
> let stream_len = Arc::new(AtomicUsize::new(0));
> + let stream_len1 = stream_len.clone();
> let stream_len2 = stream_len.clone();
> let compressed_stream_len = Arc::new(AtomicU64::new(0));
> let compressed_stream_len2 = compressed_stream_len.clone();
> let reused_len = Arc::new(AtomicUsize::new(0));
> + let reused_len1 = reused_len.clone();
both of these variables are not really necessary?
in the closure you could simply keep using 'stream_len' and clone it inline
in the .inject_reused_chunks call. reused_len1 is used in the same closure as
reused_len, so there you can simply use reused_len for both updates, i.e. roughly:
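(untested, just to illustrate the inline clone)

    stream
        .inject_reused_chunks(injections, stream_len.clone())
        .and_then(move |chunk_info| {
            // in here keep using stream_len / reused_len directly, e.g.
            // stream_len.fetch_add(chunk.size() as usize, Ordering::SeqCst)
            // reused_len.fetch_add(chunk.size() as usize, Ordering::SeqCst)
            ...
        })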
> let reused_len2 = reused_len.clone();
>
> let append_chunk_path = format!("{}_index", prefix);
> @@ -658,52 +664,79 @@ impl BackupWriter {
>
> let start_time = std::time::Instant::now();
>
> - let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
> + let index_csum = Arc::new(Mutex::new(openssl::sha::Sha256::new()));
why do you change this?
you could simply keep the Option and use the
let mut guard = ..;
let csum = ..;
pattern everywhere you need it?
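e.g. in the new injected-chunks branch (untested):

    let mut guard = index_csum_1.lock().unwrap();
    let csum = guard.as_mut().unwrap();
    for chunk in chunks {
        ...
        csum.update(&end_offset.to_le_bytes());
        csum.update(&digest);
    }

that way the Arc::into_inner() hunk at the end is not needed either.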
> + let index_csum_1 = index_csum.clone();
> let index_csum_2 = index_csum.clone();
>
> stream
> - .and_then(move |data| {
> - let chunk_len = data.len();
> + .inject_reused_chunks(injections, stream_len)
> + .and_then(move |chunk_info| match chunk_info {
> + InjectedChunksInfo::Known(chunks) => {
> + // account for injected chunks
> + let count = chunks.len();
> + total_chunks.fetch_add(count, Ordering::SeqCst);
> +
> + let mut known = Vec::new();
> + let mut csum = index_csum_1.lock().unwrap();
> + for chunk in chunks {
> + let offset =
> + stream_len1.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
> + reused_len1.fetch_add(chunk.size() as usize, Ordering::SeqCst);
> + let digest = chunk.digest();
> + known.push((offset, digest));
> + let end_offset = offset + chunk.size();
> + csum.update(&end_offset.to_le_bytes());
> + csum.update(&digest);
> + }
> + future::ok(MergedChunkInfo::Known(known))
> + }
> + InjectedChunksInfo::Raw(raw) => {
if you rename 'raw' to 'data' and...
> + // account for not injected chunks (new and known)
> + let offset = stream_len1.fetch_add(raw.len(), Ordering::SeqCst) as u64;
> + let chunk_len = raw.len() as u64;
leave chunk_len as usize and...
>
> - total_chunks.fetch_add(1, Ordering::SeqCst);
> - let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
> + total_chunks.fetch_add(1, Ordering::SeqCst);
reorder these statements a bit
and don't add unnecessary newlines.
this hunk gets much more readable with 'git diff -w',
namely it completely disappears :P since all you do here is basically
indent the code one level, but that was really hard to see with the
(IMHO not necessary) changes to the code.
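e.g. keeping the original names, types and statement order (untested):

    InjectedChunksInfo::Raw(data) => {
        let chunk_len = data.len();
        total_chunks.fetch_add(1, Ordering::SeqCst);
        let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
        let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
        ...
    }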
>
> - let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
> + let mut chunk_builder = DataChunkBuilder::new(raw.as_ref()).compress(compress);
>
> - if let Some(ref crypt_config) = crypt_config {
> - chunk_builder = chunk_builder.crypt_config(crypt_config);
> - }
> + if let Some(ref crypt_config) = crypt_config {
> + chunk_builder = chunk_builder.crypt_config(crypt_config);
> + }
>
> - let mut known_chunks = known_chunks.lock().unwrap();
> - let digest = chunk_builder.digest();
> + let mut known_chunks = known_chunks.lock().unwrap();
>
> - let mut guard = index_csum.lock().unwrap();
> - let csum = guard.as_mut().unwrap();
> + let digest = chunk_builder.digest();
>
> - let chunk_end = offset + chunk_len as u64;
> + let mut csum = index_csum.lock().unwrap();
>
> - if !is_fixed_chunk_size {
> - csum.update(&chunk_end.to_le_bytes());
> - }
> - csum.update(digest);
> + let chunk_end = offset + chunk_len;
>
> - let chunk_is_known = known_chunks.contains(digest);
> - if chunk_is_known {
> - known_chunk_count.fetch_add(1, Ordering::SeqCst);
> - reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> - future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> - } else {
> - let compressed_stream_len2 = compressed_stream_len.clone();
> - known_chunks.insert(*digest);
> - future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> - compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> - MergedChunkInfo::New(ChunkInfo {
> - chunk,
> - digest,
> - chunk_len: chunk_len as u64,
> - offset,
> - })
> - }))
> + if !is_fixed_chunk_size {
> + csum.update(&chunk_end.to_le_bytes());
> + }
> + csum.update(digest);
> +
> + let chunk_is_known = known_chunks.contains(digest);
> + if chunk_is_known {
> + known_chunk_count.fetch_add(1, Ordering::SeqCst);
> + reused_len.fetch_add(chunk_len as usize, Ordering::SeqCst);
> +
> + future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> + } else {
> + let compressed_stream_len2 = compressed_stream_len.clone();
> + known_chunks.insert(*digest);
> +
> + future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> + compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +
> + MergedChunkInfo::New(ChunkInfo {
> + chunk,
> + digest,
> + chunk_len,
> + offset,
> + })
> + }))
> + }
> }
> })
> .merge_known_chunks()
> @@ -771,8 +804,11 @@ impl BackupWriter {
> let size_reused = reused_len2.load(Ordering::SeqCst);
> let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
>
> - let mut guard = index_csum_2.lock().unwrap();
> - let csum = guard.take().unwrap().finish();
> + let csum = Arc::into_inner(index_csum_2)
> + .unwrap()
> + .into_inner()
> + .unwrap()
> + .finish();
>
if you don't change the type above, this hunk also disappears
> futures::future::ok(UploadStats {
> chunk_count,
> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
> index 83c75ba28..7dade3f07 100644
> --- a/pbs-client/src/chunk_stream.rs
> +++ b/pbs-client/src/chunk_stream.rs
> @@ -14,6 +14,7 @@ use crate::inject_reused_chunks::InjectChunks;
> /// Holds the queues for optional injection of reused dynamic index entries
> pub struct InjectionData {
> boundaries: mpsc::Receiver<InjectChunks>,
> + next_boundary: Option<InjectChunks>,
> injections: mpsc::Sender<InjectChunks>,
> consumed: u64,
> }
> @@ -25,6 +26,7 @@ impl InjectionData {
> ) -> Self {
> Self {
> boundaries,
> + next_boundary: None,
> injections,
> consumed: 0,
> }
> @@ -37,15 +39,17 @@ pub struct ChunkStream<S: Unpin> {
> chunker: Chunker,
> buffer: BytesMut,
> scan_pos: usize,
> + injection_data: Option<InjectionData>,
> }
>
> impl<S: Unpin> ChunkStream<S> {
> - pub fn new(input: S, chunk_size: Option<usize>) -> Self {
> + pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
> Self {
> input,
> chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
> buffer: BytesMut::new(),
> scan_pos: 0,
> + injection_data,
> }
> }
> }
> @@ -62,19 +66,84 @@ where
>
> fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
> let this = self.get_mut();
> +
> loop {
> + if let Some(InjectionData {
> + boundaries,
> + next_boundary,
> + injections,
> + consumed,
> + }) = this.injection_data.as_mut()
> + {
> + if next_boundary.is_none() {
> + if let Ok(boundary) = boundaries.try_recv() {
> + *next_boundary = Some(boundary);
> + }
> + }
> +
> + if let Some(inject) = next_boundary.take() {
> + // require forced boundary, lookup next regular boundary
> + let pos = this.chunker.scan(&this.buffer[this.scan_pos..]);
below we check explicitly if scan_pos < buffer.len(), wouldn't that make sense here too?
if i'm reading the code right, scan_pos can never be greater than buffer.len(), so it
probably doesn't matter, but for the case where they're equal it would save an
unnecessary scan of an empty slice.
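e.g. (untested):

    let pos = if this.scan_pos < this.buffer.len() {
        this.chunker.scan(&this.buffer[this.scan_pos..])
    } else {
        0
    };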
> +
> + let chunk_boundary = if pos == 0 {
> + *consumed + this.buffer.len() as u64
> + } else {
> + *consumed + (this.scan_pos + pos) as u64
> + };
> +
> + if inject.boundary <= chunk_boundary {
> + // forced boundary is before next boundary, force within current buffer
> + let chunk_size = (inject.boundary - *consumed) as usize;
> + let raw_chunk = this.buffer.split_to(chunk_size);
> + this.chunker.reset();
> + this.scan_pos = 0;
> +
> + *consumed += chunk_size as u64;
> +
> + // add the size of the injected chunks to consumed, so chunk stream offsets
> + // are in sync with the rest of the archive.
> + *consumed += inject.size as u64;
> +
> + injections.send(inject).unwrap();
> +
> + // the chunk can be empty, return nevertheless to allow the caller to
> + // make progress by consuming from the injection queue
> + return Poll::Ready(Some(Ok(raw_chunk)));
> + } else if pos != 0 {
> + *next_boundary = Some(inject);
> + // forced boundary is after next boundary, split off chunk from buffer
> + let chunk_size = this.scan_pos + pos;
> + let raw_chunk = this.buffer.split_to(chunk_size);
> + *consumed += chunk_size as u64;
> + this.scan_pos = 0;
> +
> + return Poll::Ready(Some(Ok(raw_chunk)));
> + } else {
> + // forced boundary is after current buffer length, continue reading
> + *next_boundary = Some(inject);
> + this.scan_pos = this.buffer.len();
> + }
these^ two 'else if'/'else' branches are what i meant at the beginning of my message:
after 57/65 they are practically identical to the ones below in the "regular"
chunking code, and when we move the 'let boundary = this.chunker.scan(...)' up,
we can reuse that result for both cases.
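rough, untested sketch of what i mean once 57/65 is in place:

    // scan once, up front, reused by the forced and the regular path
    let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);

    if let Some(inject) = next_boundary.take() {
        let chunk_boundary = if boundary == 0 {
            *consumed + this.buffer.len() as u64
        } else {
            *consumed + (this.scan_pos + boundary) as u64
        };

        if inject.boundary <= chunk_boundary {
            // forced cut inside the current buffer, handle it here
            ...
        } else {
            // otherwise just remember the forced boundary and fall through to
            // the regular chunking code below, reusing `boundary` there instead
            // of scanning a second time
            *next_boundary = Some(inject);
        }
    }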
> + }
> + }
> +
> if this.scan_pos < this.buffer.len() {
> + // look for next chunk boundary, starting from scan_pos
> let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
>
> let chunk_size = this.scan_pos + boundary;
>
> if boundary == 0 {
> + // no new chunk boundary, update position for next boundary lookup
> this.scan_pos = this.buffer.len();
> - // continue poll
> } else if chunk_size <= this.buffer.len() {
> - let result = this.buffer.split_to(chunk_size);
> + // found new chunk boundary inside buffer, split off chunk from buffer
> + let raw_chunk = this.buffer.split_to(chunk_size);
> + if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
> + *consumed += chunk_size as u64;
> + }
> this.scan_pos = 0;
> - return Poll::Ready(Some(Ok(result)));
> +
> + return Poll::Ready(Some(Ok(raw_chunk)));
why the extra newline?
> } else {
> panic!("got unexpected chunk boundary from chunker");
> }
> @@ -82,10 +151,11 @@ where
>
> match ready!(Pin::new(&mut this.input).try_poll_next(cx)) {
> Some(Err(err)) => {
> + // got error in byte stream, pass to consumer
> return Poll::Ready(Some(Err(err.into())));
> }
> None => {
> - this.scan_pos = 0;
is there any special reason why you're removing this?
i mean, it probably does not make a difference since we're at the end of the stream anyway?
> + // end of stream reached, flush remaining bytes in buffer
> if !this.buffer.is_empty() {
> return Poll::Ready(Some(Ok(this.buffer.split())));
> } else {
> @@ -93,6 +163,7 @@ where
> }
> }
> Some(Ok(data)) => {
> + // got new data, add to buffer
> this.buffer.extend_from_slice(data.as_ref());
> }
> }
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 0f32efcce..dd3c64525 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -6,7 +6,7 @@ use std::ops::Range;
> use std::os::unix::ffi::OsStrExt;
> use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
> use std::path::{Path, PathBuf};
> -use std::sync::{Arc, Mutex};
> +use std::sync::{mpsc, Arc, Mutex};
>
> use anyhow::{bail, Context, Error};
> use futures::future::BoxFuture;
> @@ -29,6 +29,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
> use pbs_datastore::dynamic_index::DynamicIndexReader;
> use pbs_datastore::index::IndexFile;
>
> +use crate::inject_reused_chunks::InjectChunks;
> use crate::pxar::metadata::errno_is_unsupported;
> use crate::pxar::tools::assert_single_path_component;
> use crate::pxar::Flags;
> @@ -134,6 +135,7 @@ struct Archiver {
> hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
> file_copy_buffer: Vec<u8>,
> skip_e2big_xattr: bool,
> + forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
> }
>
> type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -164,6 +166,7 @@ pub async fn create_archive<T, F>(
> feature_flags: Flags,
> callback: F,
> options: PxarCreateOptions,
> + forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
> ) -> Result<(), Error>
> where
> T: SeqWrite + Send,
> @@ -224,6 +227,7 @@ where
> hardlinks: HashMap::new(),
> file_copy_buffer: vec::undefined(4 * 1024 * 1024),
> skip_e2big_xattr: options.skip_e2big_xattr,
> + forced_boundaries,
> };
>
> archiver
> diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
> index 95145cb0d..9d2cb41d6 100644
> --- a/pbs-client/src/pxar_backup_stream.rs
> +++ b/pbs-client/src/pxar_backup_stream.rs
> @@ -2,7 +2,7 @@ use std::io::Write;
> //use std::os::unix::io::FromRawFd;
> use std::path::Path;
> use std::pin::Pin;
> -use std::sync::{Arc, Mutex};
> +use std::sync::{mpsc, Arc, Mutex};
> use std::task::{Context, Poll};
>
> use anyhow::{format_err, Error};
> @@ -17,6 +17,7 @@ use proxmox_io::StdChannelWriter;
>
> use pbs_datastore::catalog::CatalogWriter;
>
> +use crate::inject_reused_chunks::InjectChunks;
> use crate::pxar::create::PxarWriters;
>
> /// Stream implementation to encode and upload .pxar archives.
> @@ -42,6 +43,7 @@ impl PxarBackupStream {
> dir: Dir,
> catalog: Arc<Mutex<CatalogWriter<W>>>,
> options: crate::pxar::PxarCreateOptions,
> + boundaries: Option<mpsc::Sender<InjectChunks>>,
> separate_payload_stream: bool,
> ) -> Result<(Self, Option<Self>), Error> {
> let buffer_size = 256 * 1024;
> @@ -79,6 +81,7 @@ impl PxarBackupStream {
> Ok(())
> },
> options,
> + boundaries,
> )
> .await
> {
> @@ -110,11 +113,12 @@ impl PxarBackupStream {
> dirname: &Path,
> catalog: Arc<Mutex<CatalogWriter<W>>>,
> options: crate::pxar::PxarCreateOptions,
> + boundaries: Option<mpsc::Sender<InjectChunks>>,
> separate_payload_stream: bool,
> ) -> Result<(Self, Option<Self>), Error> {
> let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
>
> - Self::new(dir, catalog, options, separate_payload_stream)
> + Self::new(dir, catalog, options, boundaries, separate_payload_stream)
> }
> }
>
> diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
> index 821777d66..5e93f9542 100644
> --- a/proxmox-backup-client/src/main.rs
> +++ b/proxmox-backup-client/src/main.rs
> @@ -45,8 +45,8 @@ use pbs_client::tools::{
> use pbs_client::{
> delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
> BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
> - FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
> - BACKUP_SOURCE_SCHEMA,
> + FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader,
> + UploadOptions, BACKUP_SOURCE_SCHEMA,
> };
> use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
> use pbs_datastore::chunk_store::verify_chunk_size;
> @@ -199,14 +199,16 @@ async fn backup_directory<P: AsRef<Path>>(
> bail!("cannot backup directory with fixed chunk size!");
> }
>
> + let (payload_boundaries_tx, payload_boundaries_rx) = std::sync::mpsc::channel();
> let (pxar_stream, payload_stream) = PxarBackupStream::open(
> dir_path.as_ref(),
> catalog,
> pxar_create_options,
> + Some(payload_boundaries_tx),
> payload_target.is_some(),
> )?;
>
> - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
> + let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None);
> let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
>
> let stream = ReceiverStream::new(rx).map_err(Error::from);
> @@ -218,13 +220,16 @@ async fn backup_directory<P: AsRef<Path>>(
> }
> });
>
> - let stats = client.upload_stream(archive_name, stream, upload_options.clone());
> + let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
>
> if let Some(payload_stream) = payload_stream {
> let payload_target = payload_target
> .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
>
> - let mut payload_chunk_stream = ChunkStream::new(payload_stream, chunk_size);
> + let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel();
> + let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx);
> + let mut payload_chunk_stream =
> + ChunkStream::new(payload_stream, chunk_size, Some(injection_data));
> let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
> let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
>
> @@ -235,7 +240,12 @@ async fn backup_directory<P: AsRef<Path>>(
> }
> });
>
> - let payload_stats = client.upload_stream(&payload_target, stream, upload_options);
> + let payload_stats = client.upload_stream(
> + &payload_target,
> + stream,
> + upload_options,
> + Some(payload_injections_rx),
> + );
>
> match futures::join!(stats, payload_stats) {
> (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))),
> @@ -271,7 +281,7 @@ async fn backup_image<P: AsRef<Path>>(
> }
>
> let stats = client
> - .upload_stream(archive_name, stream, upload_options)
> + .upload_stream(archive_name, stream, upload_options, None)
> .await?;
>
> Ok(stats)
> @@ -562,7 +572,7 @@ fn spawn_catalog_upload(
> let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
> let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
> let catalog_chunk_size = 512 * 1024;
> - let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
> + let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None);
>
> let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
> StdChannelWriter::new(catalog_tx),
> @@ -578,7 +588,7 @@ fn spawn_catalog_upload(
>
> tokio::spawn(async move {
> let catalog_upload_result = client
> - .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
> + .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None)
> .await;
>
> if let Err(ref err) = catalog_upload_result {
> diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> index ea97976e6..0883d6cda 100644
> --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> @@ -364,6 +364,7 @@ fn extract(
> Flags::DEFAULT,
> |_| Ok(()),
> options,
> + None,
> )
> .await
> }
> diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
> index 58c9d2cfd..d46c98d2b 100644
> --- a/pxar-bin/src/main.rs
> +++ b/pxar-bin/src/main.rs
> @@ -405,6 +405,7 @@ async fn create_archive(
> Ok(())
> },
> options,
> + None,
> )
> .await?;
>
> diff --git a/tests/catar.rs b/tests/catar.rs
> index 9e96a8610..d5ef85ffe 100644
> --- a/tests/catar.rs
> +++ b/tests/catar.rs
> @@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
> Flags::DEFAULT,
> |_| Ok(()),
> options,
> + None,
> ))?;
>
> Command::new("cmp")