[pbs-devel] [RFC proxmox-backup 14/36] client: backup: split payload to dedicated stream

Christian Ebner c.ebner at proxmox.com
Wed Feb 28 15:02:04 CET 2024


This patch prepares for quick lookups of metadata from previous
snapshots by splitting the upload of a pxar archive into two dedicated
streams: one for metadata, stored with a .pxar.meta.didx suffix, and
one for payload data, stored with a .pxar.pld.didx suffix.

The patch constructs the duplicate chunk stream, backup writer and
upload stream instances required for the split archive uploads.

This not only makes it possible to reuse the payload chunks for
subsequent backup runs, but also keeps the metadata archive small,
with the outlook of eventually making the currently used catalog
obsolete.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
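A minimal, standalone sketch of the split-stream flow described above (not
the actual pbs-client code): one producer feeds a metadata channel and a
payload channel, and two consumers drain them concurrently, much like the
two upload_stream() calls joined in backup_directory() below. Channel names
and byte payloads are illustrative only.

    use std::sync::mpsc::sync_channel;
    use std::thread;

    fn main() {
        let (meta_tx, meta_rx) = sync_channel::<Vec<u8>>(10);
        let (payload_tx, payload_rx) = sync_channel::<Vec<u8>>(10);

        // Producer: stands in for the pxar encoder, which writes entry
        // metadata to one writer and file contents to the attached
        // payload output.
        let producer = thread::spawn(move || {
            for i in 0..3u8 {
                meta_tx.send(vec![i]).unwrap(); // "metadata" record
                payload_tx.send(vec![i; 4]).unwrap(); // "payload" data
            }
            // Dropping the senders closes both channels.
        });

        // Consumers: stand in for the two chunkers/upload streams
        // running in parallel.
        let meta = thread::spawn(move || meta_rx.iter().map(|b| b.len()).sum::<usize>());
        let payload = thread::spawn(move || payload_rx.iter().map(|b| b.len()).sum::<usize>());

        producer.join().unwrap();
        println!(
            "metadata bytes: {}, payload bytes: {}",
            meta.join().unwrap(),
            payload.join().unwrap()
        );
    }
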
 pbs-client/src/pxar/create.rs                 |  4 +
 pbs-client/src/pxar_backup_stream.rs          | 55 ++++++++++----
 proxmox-backup-client/src/main.rs             | 75 +++++++++++++++++--
 .../src/proxmox_restore_daemon/api.rs         | 12 ++-
 pxar-bin/src/main.rs                          |  1 +
 tests/catar.rs                                |  1 +
 6 files changed, 123 insertions(+), 25 deletions(-)

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index de8c0696..59aa4450 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -141,6 +141,7 @@ pub async fn create_archive<T, F>(
     feature_flags: Flags,
     callback: F,
     catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
+    mut payload_writer: Option<T>,
     options: PxarCreateOptions,
 ) -> Result<(), Error>
 where
@@ -171,6 +172,9 @@ where
     }
 
     let mut encoder = Encoder::new(&mut writer, &metadata).await?;
+    if let Some(writer) = payload_writer.as_mut() {
+        encoder = encoder.attach_payload_output(writer);
+    }
 
     let mut patterns = options.patterns;
 
diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index 22a6ffdc..c7be4a66 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -40,20 +40,31 @@ impl PxarBackupStream {
         dir: Dir,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
-    ) -> Result<Self, Error> {
-        let (tx, rx) = std::sync::mpsc::sync_channel(10);
-
+        separate_payload_stream: bool,
+    ) -> Result<(Self, Option<Self>), Error> {
         let buffer_size = 256 * 1024;
 
-        let error = Arc::new(Mutex::new(None));
-        let error2 = Arc::clone(&error);
-        let handler = async move {
-            let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
+        let (tx, rx) = std::sync::mpsc::sync_channel(10);
+        let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
+            buffer_size,
+            StdChannelWriter::new(tx),
+        ));
+        let writer = pxar::encoder::sync::StandardWriter::new(writer);
+
+        let (payload_tx, payload_rx) = std::sync::mpsc::sync_channel(10);
+        let payload_writer = if separate_payload_stream {
+            let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
                 buffer_size,
-                StdChannelWriter::new(tx),
+                StdChannelWriter::new(payload_tx),
             ));
+            Some(pxar::encoder::sync::StandardWriter::new(payload_writer))
+        } else {
+            None
+        };
 
-            let writer = pxar::encoder::sync::StandardWriter::new(writer);
+        let error = Arc::new(Mutex::new(None));
+        let error2 = Arc::clone(&error);
+        let handler = async move {
             if let Err(err) = crate::pxar::create_archive(
                 dir,
                 writer,
@@ -63,6 +74,7 @@ impl PxarBackupStream {
                     Ok(())
                 },
                 Some(catalog),
+                payload_writer,
                 options,
             )
             .await
@@ -76,21 +88,34 @@ impl PxarBackupStream {
         let future = Abortable::new(handler, registration);
         tokio::spawn(future);
 
-        Ok(Self {
+        let backup_stream = Self {
             rx: Some(rx),
-            handle: Some(handle),
-            error,
-        })
+            handle: Some(handle.clone()),
+            error: error.clone(),
+        };
+
+        let backup_stream_payload = if separate_payload_stream {
+            Some(Self {
+                rx: Some(payload_rx),
+                handle: Some(handle),
+                error,
+            })
+        } else {
+            None
+        };
+
+        Ok((backup_stream, backup_stream_payload))
     }
 
     pub fn open<W: Write + Send + 'static>(
         dirname: &Path,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
-    ) -> Result<Self, Error> {
+        separate_payload_stream: bool,
+    ) -> Result<(Self, Option<Self>), Error> {
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
 
-        Self::new(dir, catalog, options)
+        Self::new(dir, catalog, options, separate_payload_stream)
     }
 }
 
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 256080be..f252f5b7 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -187,17 +187,24 @@ async fn backup_directory<P: AsRef<Path>>(
     client: &BackupWriter,
     dir_path: P,
     archive_name: &str,
+    payload_target: Option<&str>,
     chunk_size: Option<usize>,
     catalog: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter<Error>>>>>,
     pxar_create_options: pbs_client::pxar::PxarCreateOptions,
     upload_options: UploadOptions,
-) -> Result<BackupStats, Error> {
-    let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?;
-    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
+) -> Result<(BackupStats, Option<BackupStats>), Error> {
     if upload_options.fixed_size.is_some() {
         bail!("cannot backup directory with fixed chunk size!");
     }
 
+    let (pxar_stream, payload_stream) = PxarBackupStream::open(
+        dir_path.as_ref(),
+        catalog,
+        pxar_create_options,
+        payload_target.is_some(),
+    )?;
+
+    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
     let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
 
     let stream = ReceiverStream::new(rx).map_err(Error::from);
@@ -209,12 +216,46 @@ async fn backup_directory<P: AsRef<Path>>(
         }
     });
 
+    let stats = client.upload_stream(archive_name, stream, upload_options.clone());
 
-    let stats = client
-        .upload_stream(archive_name, stream, upload_options)
-        .await?;
+    if let Some(payload_stream) = payload_stream {
+        let payload_target = if let Some(payload_target) = payload_target {
+            payload_target
+        } else {
+            bail!("got payload stream, but no target archive name");
+        };
 
-    Ok(stats)
+        let mut payload_chunk_stream = ChunkStream::new(
+            payload_stream,
+            chunk_size,
+        );
+        let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
+        let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
+
+        // spawn payload chunker inside a separate task so that it can run parallel
+        tokio::spawn(async move {
+            while let Some(v) = payload_chunk_stream.next().await {
+                let _ = payload_tx.send(v).await;
+            }
+        });
+
+        let payload_stats = client.upload_stream(
+            &payload_target,
+            stream,
+            upload_options,
+        );
+
+        match futures::join!(stats, payload_stats) {
+            (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))),
+            (Err(err), Ok(_)) => Err(format_err!("upload failed: {err}")),
+            (Ok(_), Err(err)) => Err(format_err!("upload failed: {err}")),
+            (Err(err), Err(payload_err)) => {
+                Err(format_err!("upload failed: {err} - {payload_err}"))
+            }
+        }
+    } else {
+        Ok((stats.await?, None))
+    }
 }
 
 async fn backup_image<P: AsRef<Path>>(
@@ -985,6 +1026,13 @@ async fn create_backup(
                 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
             }
             (BackupSpecificationType::PXAR, false) => {
+                let metadata_mode = false; // Until enabled via param
+                let target = if metadata_mode {
+                    format!("{target_base}.meta.didx")
+                } else {
+                    target
+                };
+
                 // start catalog upload on first use
                 if catalog.is_none() {
                     let catalog_upload_res =
@@ -1015,16 +1063,27 @@ async fn create_backup(
                     ..UploadOptions::default()
                 };
 
-                let stats = backup_directory(
+                let payload_target = format!("{target_base}.pld.{extension}");
+                let (stats, payload_stats) = backup_directory(
                     &client,
                     &filename,
                     &target,
+                    Some(&payload_target),
                     chunk_size_opt,
                     catalog.clone(),
                     pxar_options,
                     upload_options,
                 )
                 .await?;
+
+                if let Some(payload_stats) = payload_stats {
+                    manifest.add_file(
+                        payload_target,
+                        payload_stats.size,
+                        payload_stats.csum,
+                        crypto.mode,
+                    )?;
+                }
                 manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
                 catalog.lock().unwrap().end_directory()?;
             }
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index c2055222..bd8ddb20 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -356,8 +356,16 @@ fn extract(
                     };
 
                     let pxar_writer = TokioWriter::new(writer);
-                    create_archive(dir, pxar_writer, Flags::DEFAULT, |_| Ok(()), None, options)
-                        .await
+                    create_archive(
+                        dir,
+                        pxar_writer,
+                        Flags::DEFAULT,
+                        |_| Ok(()),
+                        None,
+                        None,
+                        options,
+                    )
+                    .await
                 }
                 .await;
                 if let Err(err) = result {
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index 2bbe90e3..e3b0faac 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -383,6 +383,7 @@ async fn create_archive(
             Ok(())
         },
         None,
+        None,
         options,
     )
     .await?;
diff --git a/tests/catar.rs b/tests/catar.rs
index 36bb4f3b..04af4ffd 100644
--- a/tests/catar.rs
+++ b/tests/catar.rs
@@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         Flags::DEFAULT,
         |_| Ok(()),
         None,
+        None,
         options,
     ))?;
 
-- 
2.39.2




