[pbs-devel] [PATCH v8 proxmox-backup 20/69] client: pxar: optionally split metadata and payload streams

Christian Ebner c.ebner at proxmox.com
Tue May 28 11:42:14 CEST 2024


... and attach the split payload writer variant to the pxar archive
creation. By this, metadata and payload data will create different
dynamic indexes, allowing to lookup and reuse payload chunks without
the additional overhead of the pxar archive's metadata.

For now this functionality remains disabled and will be enabled in a
later patch once the logic for reusing the payload chunks is in
place.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 7:
- no changes

changes since version 6:
- adapt to new PxarVariant pxar interface type

 pbs-client/src/pxar_backup_stream.rs | 51 ++++++++++++++-----
 proxmox-backup-client/src/main.rs    | 75 +++++++++++++++++++++++++---
 2 files changed, 105 insertions(+), 21 deletions(-)

diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index 8dc3fd088..3541eddb5 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -42,21 +42,37 @@ impl PxarBackupStream {
         dir: Dir,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
-    ) -> Result<Self, Error> {
-        let (tx, rx) = std::sync::mpsc::sync_channel(10);
-
+        separate_payload_stream: bool,
+    ) -> Result<(Self, Option<Self>), Error> {
         let buffer_size = 256 * 1024;
 
-        let error = Arc::new(Mutex::new(None));
-        let error2 = Arc::clone(&error);
-        let handler = async move {
-            let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
+        let (tx, rx) = std::sync::mpsc::sync_channel(10);
+        let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
+            buffer_size,
+            StdChannelWriter::new(tx),
+        ));
+        let writer = pxar::encoder::sync::StandardWriter::new(writer);
+
+        let (writer, payload_rx) = if separate_payload_stream {
+            let (tx, rx) = std::sync::mpsc::sync_channel(10);
+            let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
                 buffer_size,
                 StdChannelWriter::new(tx),
             ));
+            (
+                pxar::PxarVariant::Split(
+                    writer,
+                    pxar::encoder::sync::StandardWriter::new(payload_writer),
+                ),
+                Some(rx),
+            )
+        } else {
+            (pxar::PxarVariant::Unified(writer), None)
+        };
 
-            let writer =
-                pxar::PxarVariant::Unified(pxar::encoder::sync::StandardWriter::new(writer));
+        let error = Arc::new(Mutex::new(None));
+        let error2 = Arc::clone(&error);
+        let handler = async move {
             if let Err(err) = crate::pxar::create_archive(
                 dir,
                 PxarWriters::new(writer, Some(catalog)),
@@ -78,21 +94,30 @@ impl PxarBackupStream {
         let future = Abortable::new(handler, registration);
         tokio::spawn(future);
 
-        Ok(Self {
+        let backup_stream = Self {
+            rx: Some(rx),
+            handle: Some(handle.clone()),
+            error: Arc::clone(&error),
+        };
+
+        let backup_payload_stream = payload_rx.map(|rx| Self {
             rx: Some(rx),
             handle: Some(handle),
             error,
-        })
+        });
+
+        Ok((backup_stream, backup_payload_stream))
     }
 
     pub fn open<W: Write + Send + 'static>(
         dirname: &Path,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
-    ) -> Result<Self, Error> {
+        separate_payload_stream: bool,
+    ) -> Result<(Self, Option<Self>), Error> {
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
 
-        Self::new(dir, catalog, options)
+        Self::new(dir, catalog, options, separate_payload_stream)
     }
 }
 
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index ad2bc5a66..25556d672 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -187,18 +187,24 @@ async fn backup_directory<P: AsRef<Path>>(
     client: &BackupWriter,
     dir_path: P,
     archive_name: &str,
+    payload_target: Option<&str>,
     chunk_size: Option<usize>,
     catalog: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter<Error>>>>>,
     pxar_create_options: pbs_client::pxar::PxarCreateOptions,
     upload_options: UploadOptions,
-) -> Result<BackupStats, Error> {
+) -> Result<(BackupStats, Option<BackupStats>), Error> {
     if upload_options.fixed_size.is_some() {
         bail!("cannot backup directory with fixed chunk size!");
     }
 
-    let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?;
-    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
+    let (pxar_stream, payload_stream) = PxarBackupStream::open(
+        dir_path.as_ref(),
+        catalog,
+        pxar_create_options,
+        payload_target.is_some(),
+    )?;
 
+    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
     let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
 
     let stream = ReceiverStream::new(rx).map_err(Error::from);
@@ -210,11 +216,36 @@ async fn backup_directory<P: AsRef<Path>>(
         }
     });
 
-    let stats = client
-        .upload_stream(archive_name, stream, upload_options)
-        .await?;
+    let stats = client.upload_stream(archive_name, stream, upload_options.clone());
 
-    Ok(stats)
+    if let Some(payload_stream) = payload_stream {
+        let payload_target = payload_target
+            .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
+
+        let mut payload_chunk_stream = ChunkStream::new(payload_stream, chunk_size);
+        let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
+        let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
+
+        // spawn payload chunker inside a separate task so that it can run parallel
+        tokio::spawn(async move {
+            while let Some(v) = payload_chunk_stream.next().await {
+                let _ = payload_tx.send(v).await;
+            }
+        });
+
+        let payload_stats = client.upload_stream(&payload_target, stream, upload_options);
+
+        match futures::join!(stats, payload_stats) {
+            (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))),
+            (Err(err), Ok(_)) => Err(format_err!("upload failed: {err}")),
+            (Ok(_), Err(err)) => Err(format_err!("upload failed: {err}")),
+            (Err(err), Err(payload_err)) => {
+                Err(format_err!("upload failed: {err} - {payload_err}"))
+            }
+        }
+    } else {
+        Ok((stats.await?, None))
+    }
 }
 
 async fn backup_image<P: AsRef<Path>>(
@@ -985,6 +1016,23 @@ async fn create_backup(
                 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
             }
             (BackupSpecificationType::PXAR, false) => {
+                let metadata_mode = false; // Until enabled via param
+
+                let target_base = if let Some(base) = target_base.strip_suffix(".pxar") {
+                    base.to_string()
+                } else {
+                    bail!("unexpected suffix in target: {target_base}");
+                };
+
+                let (target, payload_target) = if metadata_mode {
+                    (
+                        format!("{target_base}.mpxar.{extension}"),
+                        Some(format!("{target_base}.ppxar.{extension}")),
+                    )
+                } else {
+                    (target, None)
+                };
+
                 // start catalog upload on first use
                 if catalog.is_none() {
                     let catalog_upload_res =
@@ -1015,16 +1063,27 @@ async fn create_backup(
                     ..UploadOptions::default()
                 };
 
-                let stats = backup_directory(
+                let (stats, payload_stats) = backup_directory(
                     &client,
                     &filename,
                     &target,
+                    payload_target.as_deref(),
                     chunk_size_opt,
                     catalog.clone(),
                     pxar_options,
                     upload_options,
                 )
                 .await?;
+
+                if let Some(payload_stats) = payload_stats {
+                    manifest.add_file(
+                        payload_target
+                            .ok_or_else(|| format_err!("missing payload target archive"))?,
+                        payload_stats.size,
+                        payload_stats.csum,
+                        crypto.mode,
+                    )?;
+                }
                 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
                 catalog.lock().unwrap().end_directory()?;
             }
-- 
2.39.2





More information about the pbs-devel mailing list