[pbs-devel] [PATCH v3 proxmox-backup 40/58] client: chunk stream: add dynamic entries injection queues

Christian Ebner c.ebner at proxmox.com
Thu Mar 28 13:36:49 CET 2024


Adds a queue to the chunk stream to request forced boundaries at a
given offset within the stream and inject reused dynamic entries
after this boundary.

The chunks are then passed along to the uploader stream using the
injection queue, which inserts them during upload.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 2:
- combined queues into new optional struct
- refactoring

 examples/test_chunk_speed2.rs                 |  2 +-
 pbs-client/src/backup_writer.rs               | 89 +++++++++++--------
 pbs-client/src/chunk_stream.rs                | 36 +++++++-
 pbs-client/src/pxar/create.rs                 |  6 +-
 pbs-client/src/pxar_backup_stream.rs          |  7 +-
 proxmox-backup-client/src/main.rs             | 31 ++++---
 .../src/proxmox_restore_daemon/api.rs         |  1 +
 pxar-bin/src/main.rs                          |  1 +
 tests/catar.rs                                |  1 +
 9 files changed, 121 insertions(+), 53 deletions(-)

diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
index 3f69b436d..22dd14ce2 100644
--- a/examples/test_chunk_speed2.rs
+++ b/examples/test_chunk_speed2.rs
@@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
         .map_err(Error::from);
 
     //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
-    let mut chunk_stream = ChunkStream::new(stream, None);
+    let mut chunk_stream = ChunkStream::new(stream, None, None);
 
     let start_time = std::time::Instant::now();
 
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 8bd0e4f36..032d93da7 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -1,4 +1,4 @@
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
 use std::future::Future;
 use std::os::unix::fs::OpenOptionsExt;
 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
@@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
 
 use proxmox_human_byte::HumanByte;
 
+use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
 use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
 
 use super::{H2Client, HttpClient};
@@ -265,6 +266,7 @@ impl BackupWriter {
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
         options: UploadOptions,
+        injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
     ) -> Result<BackupStats, Error> {
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
 
@@ -341,6 +343,7 @@ impl BackupWriter {
                 None
             },
             options.compress,
+            injection_queue,
         )
         .await?;
 
@@ -637,6 +640,7 @@ impl BackupWriter {
         known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
+        injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let total_chunks = Arc::new(AtomicUsize::new(0));
         let total_chunks2 = total_chunks.clone();
@@ -663,48 +667,63 @@ impl BackupWriter {
         let index_csum_2 = index_csum.clone();
 
         stream
-            .and_then(move |data| {
-                let chunk_len = data.len();
+            .inject_reused_chunks(
+                injection_queue.unwrap_or_default(),
+                stream_len,
+                reused_len.clone(),
+                index_csum.clone(),
+            )
+            .and_then(move |chunk_info| match chunk_info {
+                InjectedChunksInfo::Known(chunks) => {
+                    total_chunks.fetch_add(chunks.len(), Ordering::SeqCst);
+                    future::ok(MergedChunkInfo::Known(chunks))
+                }
+                InjectedChunksInfo::Raw((offset, data)) => {
+                    let chunk_len = data.len();
 
-                total_chunks.fetch_add(1, Ordering::SeqCst);
-                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
+                    total_chunks.fetch_add(1, Ordering::SeqCst);
 
-                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
+                    let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
 
-                if let Some(ref crypt_config) = crypt_config {
-                    chunk_builder = chunk_builder.crypt_config(crypt_config);
-                }
+                    if let Some(ref crypt_config) = crypt_config {
+                        chunk_builder = chunk_builder.crypt_config(crypt_config);
+                    }
 
-                let mut known_chunks = known_chunks.lock().unwrap();
-                let digest = chunk_builder.digest();
+                    let mut known_chunks = known_chunks.lock().unwrap();
 
-                let mut guard = index_csum.lock().unwrap();
-                let csum = guard.as_mut().unwrap();
+                    let digest = chunk_builder.digest();
 
-                let chunk_end = offset + chunk_len as u64;
+                    let mut guard = index_csum.lock().unwrap();
+                    let csum = guard.as_mut().unwrap();
 
-                if !is_fixed_chunk_size {
-                    csum.update(&chunk_end.to_le_bytes());
-                }
-                csum.update(digest);
+                    let chunk_end = offset + chunk_len as u64;
 
-                let chunk_is_known = known_chunks.contains(digest);
-                if chunk_is_known {
-                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
-                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
-                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
-                } else {
-                    let compressed_stream_len2 = compressed_stream_len.clone();
-                    known_chunks.insert(*digest);
-                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
-                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
-                        MergedChunkInfo::New(ChunkInfo {
-                            chunk,
-                            digest,
-                            chunk_len: chunk_len as u64,
-                            offset,
-                        })
-                    }))
+                    if !is_fixed_chunk_size {
+                        csum.update(&chunk_end.to_le_bytes());
+                    }
+                    csum.update(digest);
+
+                    let chunk_is_known = known_chunks.contains(digest);
+                    if chunk_is_known {
+                        known_chunk_count.fetch_add(1, Ordering::SeqCst);
+                        reused_len.fetch_add(chunk_len, Ordering::SeqCst);
+
+                        future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
+                    } else {
+                        let compressed_stream_len2 = compressed_stream_len.clone();
+                        known_chunks.insert(*digest);
+
+                        future::ready(chunk_builder.build().map(move |(chunk, digest)| {
+                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+
+                            MergedChunkInfo::New(ChunkInfo {
+                                chunk,
+                                digest,
+                                chunk_len: chunk_len as u64,
+                                offset,
+                            })
+                        }))
+                    }
                 }
             })
             .merge_known_chunks()
diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
index a45420ca0..6ac0c638b 100644
--- a/pbs-client/src/chunk_stream.rs
+++ b/pbs-client/src/chunk_stream.rs
@@ -38,15 +38,17 @@ pub struct ChunkStream<S: Unpin> {
     chunker: Chunker,
     buffer: BytesMut,
     scan_pos: usize,
+    injection_data: Option<InjectionData>,
 }
 
 impl<S: Unpin> ChunkStream<S> {
-    pub fn new(input: S, chunk_size: Option<usize>) -> Self {
+    pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
         Self {
             input,
             chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
             buffer: BytesMut::new(),
             scan_pos: 0,
+            injection_data,
         }
     }
 }
@@ -64,6 +66,34 @@ where
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
         loop {
+            if let Some(InjectionData {
+                boundaries,
+                injections,
+                consumed,
+            }) = this.injection_data.as_mut()
+            {
+                // Make sure to release this lock as soon as possible
+                let mut boundaries = boundaries.lock().unwrap();
+                if let Some(inject) = boundaries.pop_front() {
+                    let max = *consumed + this.buffer.len() as u64;
+                    if inject.boundary <= max {
+                        let chunk_size = (inject.boundary - *consumed) as usize;
+                        let result = this.buffer.split_to(chunk_size);
+                        *consumed += chunk_size as u64;
+                        this.scan_pos = 0;
+
+                        // Add the size of the injected chunks to consumed, so chunk stream offsets
+                        // are in sync with the rest of the archive.
+                        *consumed += inject.size as u64;
+
+                        injections.lock().unwrap().push_back(inject);
+
+                        return Poll::Ready(Some(Ok(result)));
+                    }
+                    boundaries.push_front(inject);
+                }
+            }
+
             if this.scan_pos < this.buffer.len() {
                 let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
 
@@ -74,7 +104,11 @@ where
                     // continue poll
                 } else if chunk_size <= this.buffer.len() {
                     let result = this.buffer.split_to(chunk_size);
+                    if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
+                        *consumed += chunk_size as u64;
+                    }
                     this.scan_pos = 0;
+
                     return Poll::Ready(Some(Ok(result)));
                 } else {
                     panic!("got unexpected chunk boundary from chunker");
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index e2d3954ca..2c7867f22 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -1,4 +1,4 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::ffi::{CStr, CString, OsStr};
 use std::fmt;
 use std::io::{self, Read};
@@ -29,6 +29,7 @@ use proxmox_sys::fs::{self, acl, xattr};
 use pbs_datastore::catalog::BackupCatalogWriter;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 
+use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
@@ -134,6 +135,7 @@ struct Archiver {
     hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
     file_copy_buffer: Vec<u8>,
     skip_e2big_xattr: bool,
+    forced_boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -164,6 +166,7 @@ pub async fn create_archive<T, F>(
     feature_flags: Flags,
     callback: F,
     options: PxarCreateOptions,
+    forced_boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
 ) -> Result<(), Error>
 where
     T: SeqWrite + Send,
@@ -224,6 +227,7 @@ where
         hardlinks: HashMap::new(),
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
         skip_e2big_xattr: options.skip_e2big_xattr,
+        forced_boundaries,
     };
 
     archiver
diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index 95145cb0d..4ea084f28 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -1,3 +1,4 @@
+use std::collections::VecDeque;
 use std::io::Write;
 //use std::os::unix::io::FromRawFd;
 use std::path::Path;
@@ -17,6 +18,7 @@ use proxmox_io::StdChannelWriter;
 
 use pbs_datastore::catalog::CatalogWriter;
 
+use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::create::PxarWriters;
 
 /// Stream implementation to encode and upload .pxar archives.
@@ -42,6 +44,7 @@ impl PxarBackupStream {
         dir: Dir,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
+        boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
         separate_payload_stream: bool,
     ) -> Result<(Self, Option<Self>), Error> {
         let buffer_size = 256 * 1024;
@@ -79,6 +82,7 @@ impl PxarBackupStream {
                     Ok(())
                 },
                 options,
+                boundaries,
             )
             .await
             {
@@ -110,11 +114,12 @@ impl PxarBackupStream {
         dirname: &Path,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
+        boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
         separate_payload_stream: bool,
     ) -> Result<(Self, Option<Self>), Error> {
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
 
-        Self::new(dir, catalog, options, separate_payload_stream)
+        Self::new(dir, catalog, options, boundaries, separate_payload_stream)
     }
 }
 
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 294b52ddb..215095ee7 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -1,4 +1,4 @@
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
 use std::io::{self, Read, Seek, SeekFrom, Write};
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
@@ -43,10 +43,10 @@ use pbs_client::tools::{
     CHUNK_SIZE_SCHEMA, REPO_URL_SCHEMA,
 };
 use pbs_client::{
-    delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
-    BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
-    FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
-    BACKUP_SOURCE_SCHEMA,
+    delete_ticket_info, parse_backup_detection_mode_specification, parse_backup_specification,
+    view_task_result, BackupReader, BackupRepository, BackupSpecificationType, BackupStats,
+    BackupWriter, ChunkStream, FixedChunkStream, HttpClient, InjectionData, PxarBackupStream,
+    RemoteChunkReader, UploadOptions, BACKUP_DETECTION_MODE_SPEC, BACKUP_SOURCE_SCHEMA,
 };
 use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
 use pbs_datastore::chunk_store::verify_chunk_size;
@@ -199,14 +199,16 @@ async fn backup_directory<P: AsRef<Path>>(
         bail!("cannot backup directory with fixed chunk size!");
     }
 
+    let payload_boundaries = Arc::new(Mutex::new(VecDeque::new()));
     let (pxar_stream, payload_stream) = PxarBackupStream::open(
         dir_path.as_ref(),
         catalog,
         pxar_create_options,
+        Some(payload_boundaries.clone()),
         payload_target.is_some(),
     )?;
 
-    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
+    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None);
     let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
 
     let stream = ReceiverStream::new(rx).map_err(Error::from);
@@ -218,16 +220,16 @@ async fn backup_directory<P: AsRef<Path>>(
         }
     });
 
-    let stats = client.upload_stream(archive_name, stream, upload_options.clone());
+    let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
 
     if let Some(payload_stream) = payload_stream {
         let payload_target = payload_target
             .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
 
-        let mut payload_chunk_stream = ChunkStream::new(
-            payload_stream,
-            chunk_size,
-        );
+        let payload_injections = Arc::new(Mutex::new(VecDeque::new()));
+        let injection_data = InjectionData::new(payload_boundaries, payload_injections.clone());
+        let mut payload_chunk_stream =
+            ChunkStream::new(payload_stream, chunk_size, Some(injection_data));
         let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
         let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
 
@@ -242,6 +244,7 @@ async fn backup_directory<P: AsRef<Path>>(
             &payload_target,
             stream,
             upload_options,
+            Some(payload_injections),
         );
 
         match futures::join!(stats, payload_stats) {
@@ -278,7 +281,7 @@ async fn backup_image<P: AsRef<Path>>(
     }
 
     let stats = client
-        .upload_stream(archive_name, stream, upload_options)
+        .upload_stream(archive_name, stream, upload_options, None)
         .await?;
 
     Ok(stats)
@@ -569,7 +572,7 @@ fn spawn_catalog_upload(
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
     let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512 * 1024;
-    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
+    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None);
 
     let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
         StdChannelWriter::new(catalog_tx),
@@ -585,7 +588,7 @@ fn spawn_catalog_upload(
 
     tokio::spawn(async move {
         let catalog_upload_result = client
-            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
+            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None)
             .await;
 
         if let Err(ref err) = catalog_upload_result {
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index ea97976e6..0883d6cda 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -364,6 +364,7 @@ fn extract(
                         Flags::DEFAULT,
                         |_| Ok(()),
                         options,
+                        None,
                     )
                     .await
                 }
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index 58c9d2cfd..d46c98d2b 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -405,6 +405,7 @@ async fn create_archive(
             Ok(())
         },
         options,
+        None,
     )
     .await?;
 
diff --git a/tests/catar.rs b/tests/catar.rs
index 9e96a8610..d5ef85ffe 100644
--- a/tests/catar.rs
+++ b/tests/catar.rs
@@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         Flags::DEFAULT,
         |_| Ok(()),
         options,
+        None,
     ))?;
 
     Command::new("cmp")
-- 
2.39.2





More information about the pbs-devel mailing list