[pbs-devel] [PATCH v8 pxar 06/69] encoder: allow split output writer for archive creation

Christian Ebner c.ebner at proxmox.com
Tue May 28 11:42:00 CEST 2024


During regular pxar archive encoding, the payload of regular files is
written as part of the archive.

This patch adds the ability to instead pass the encoder a split writer
variant, which carries a dedicated payload writer instance, so that the
payload is redirected to a different output.
The separation of data and metadata streams allows for efficient
reuse of payload data by referencing the payload writer byte offset,
without having to re-encode it.

Whenever the payload of regular files is redirected to a dedicated
output writer, encode a payload reference header followed by the
offset and size required to locate the payload, instead of adding the
regular payload header followed by the encoded payload to the archive.

This is in preparation for reusing payload chunks for unchanged files
of backups created via the proxmox-backup-client.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 7:
- no changes

changes since version 6:
- patch reordered, use PxarVariant instead of optional payload output

 src/encoder/aio.rs  |  24 ++++---
 src/encoder/mod.rs  | 160 ++++++++++++++++++++++++++++++++------------
 src/encoder/sync.rs |  19 ++++--
 3 files changed, 148 insertions(+), 55 deletions(-)

diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index f11e57c..610fce5 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -7,7 +7,7 @@ use std::task::{Context, Poll};
 
 use crate::encoder::{self, LinkOffset, SeqWrite};
 use crate::format;
-use crate::Metadata;
+use crate::{Metadata, PxarVariant};
 
 /// Asynchronous `pxar` encoder.
 ///
@@ -22,10 +22,10 @@ impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter<T>> {
     /// Encode a `pxar` archive into a `tokio::io::AsyncWrite` output.
     #[inline]
     pub async fn from_tokio(
-        output: T,
+        output: PxarVariant<T, T>,
         metadata: &Metadata,
     ) -> io::Result<Encoder<'a, TokioWriter<T>>> {
-        Encoder::new(TokioWriter::new(output), metadata).await
+        Encoder::new(output.wrap(|output| TokioWriter::new(output)), metadata).await
     }
 }
 
@@ -37,7 +37,9 @@ impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
         metadata: &'b Metadata,
     ) -> io::Result<Encoder<'a, TokioWriter<tokio::fs::File>>> {
         Encoder::new(
-            TokioWriter::new(tokio::fs::File::create(path.as_ref()).await?),
+            PxarVariant::Unified(TokioWriter::new(
+                tokio::fs::File::create(path.as_ref()).await?,
+            )),
             metadata,
         )
         .await
@@ -46,9 +48,10 @@ impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
 
 impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
     /// Create an asynchronous encoder for an output implementing our internal write interface.
-    pub async fn new(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, T>> {
+    pub async fn new(output: PxarVariant<T, T>, metadata: &Metadata) -> io::Result<Encoder<'a, T>> {
+        let output = output.wrap_multi(|output| output.into(), |payload_output| payload_output);
         Ok(Self {
-            inner: encoder::EncoderImpl::new(output.into(), metadata).await?,
+            inner: encoder::EncoderImpl::new(output, metadata).await?,
         })
     }
 
@@ -294,9 +297,12 @@ mod test {
     /// Assert that `Encoder` is `Send`
     fn send_test() {
         let test = async {
-            let mut encoder = Encoder::new(DummyOutput, &Metadata::dir_builder(0o700).build())
-                .await
-                .unwrap();
+            let mut encoder = Encoder::new(
+                crate::PxarVariant::Unified(DummyOutput),
+                &Metadata::dir_builder(0o700).build(),
+            )
+            .await
+            .unwrap();
             {
                 encoder
                     .create_directory("baba", &Metadata::dir_builder(0o700).build())
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index 2bc3128..fbd90fe 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -17,8 +17,8 @@ use endian_trait::Endian;
 
 use crate::binary_tree_array;
 use crate::decoder::{self, SeqRead};
-use crate::format::{self, GoodbyeItem};
-use crate::Metadata;
+use crate::format::{self, GoodbyeItem, PayloadRef};
+use crate::{Metadata, PxarVariant};
 
 pub mod aio;
 pub mod sync;
@@ -222,6 +222,9 @@ struct EncoderState {
     /// We need to keep track how much we have written to get offsets.
     write_position: u64,
 
+    /// Track the bytes written to the payload writer
+    payload_write_position: u64,
+
     /// Mark the encoder state as correctly finished, ready to be dropped
     finished: bool,
 }
@@ -232,6 +235,11 @@ impl EncoderState {
         self.write_position
     }
 
+    #[inline]
+    fn payload_position(&self) -> u64 {
+        self.payload_write_position
+    }
+
     fn merge_error(&mut self, error: Option<EncodeError>) {
         // one error is enough:
         if self.encode_error.is_none() {
@@ -292,7 +300,7 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
 /// synchronous or `async` I/O objects in as output.
 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
-    output: EncoderOutput<'a, T>,
+    output: PxarVariant<EncoderOutput<'a, T>, T>,
     /// EncoderState stack storing the state for each directory level
     state: Vec<EncoderState>,
     finished: bool,
@@ -316,7 +324,7 @@ impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
 
 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     pub async fn new(
-        output: EncoderOutput<'a, T>,
+        output: PxarVariant<EncoderOutput<'a, T>, T>,
         metadata: &Metadata,
     ) -> io::Result<EncoderImpl<'a, T>> {
         if !metadata.is_dir() {
@@ -362,9 +370,16 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             .ok_or_else(|| io_format_err!("encoder state stack underflow"))
     }
 
-    fn output_state(&mut self) -> io::Result<(&mut T, &mut EncoderState)> {
+    fn output_state(&mut self) -> io::Result<(PxarVariant<&mut T, &mut T>, &mut EncoderState)> {
+        let output = match &mut self.output {
+            PxarVariant::Unified(output) => PxarVariant::Unified(output.as_mut()),
+            PxarVariant::Split(output, payload_output) => {
+                PxarVariant::Split(output.as_mut(), payload_output)
+            }
+        };
+
         Ok((
-            self.output.as_mut(),
+            output,
             self.state
                 .last_mut()
                 .ok_or_else(|| io_format_err!("encoder state stack underflow"))?,
@@ -398,10 +413,33 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let file_offset = self.state()?.position();
         self.start_file_do(Some(metadata), file_name).await?;
 
-        let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
-        header.check_header_size()?;
-        let (output, state) = self.output_state()?;
-        seq_write_struct(output, header, &mut state.write_position).await?;
+        let (mut output, state) = self.output_state()?;
+        if let Some(payload_output) = output.payload_mut() {
+            // payload references must point to the position prior to the payload header,
+            // separating payload entries in the payload stream
+            let payload_position = state.payload_position();
+
+            let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
+            header.check_header_size()?;
+            seq_write_struct(payload_output, header, &mut state.payload_write_position).await?;
+
+            let payload_ref = PayloadRef {
+                offset: payload_position,
+                size: file_size,
+            };
+
+            seq_write_pxar_entry(
+                output.archive_mut(),
+                format::PXAR_PAYLOAD_REF,
+                &payload_ref.data(),
+                &mut state.write_position,
+            )
+            .await?;
+        } else {
+            let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
+            header.check_header_size()?;
+            seq_write_struct(output.archive_mut(), header, &mut state.write_position).await?;
+        }
 
         let payload_data_offset = state.position();
 
@@ -576,9 +614,15 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         self.start_file_do(metadata, file_name).await?;
 
-        let (output, state) = self.output_state()?;
+        let (mut output, state) = self.output_state()?;
         if let Some((htype, entry_data)) = entry_htype_data {
-            seq_write_pxar_entry(output, htype, entry_data, &mut state.write_position).await?;
+            seq_write_pxar_entry(
+                output.archive_mut(),
+                htype,
+                entry_data,
+                &mut state.write_position,
+            )
+            .await?;
         }
 
         let end_offset = state.position();
@@ -617,6 +661,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         // the child will write to OUR state now:
         let write_position = state.position();
+        let payload_write_position = state.payload_position();
 
         self.state.push(EncoderState {
             items: Vec::new(),
@@ -626,6 +671,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             file_offset: Some(file_offset),
             file_hash,
             write_position,
+            payload_write_position,
             finished: false,
         });
 
@@ -645,9 +691,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
-        let (output, state) = self.output_state()?;
+        let (mut output, state) = self.output_state()?;
         seq_write_pxar_struct_entry(
-            output,
+            output.archive_mut(),
             format::PXAR_ENTRY,
             metadata.stat.clone(),
             &mut state.write_position,
@@ -672,9 +718,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
-        let (output, state) = self.output_state()?;
+        let (mut output, state) = self.output_state()?;
         seq_write_pxar_entry(
-            output,
+            output.archive_mut(),
             format::PXAR_XATTR,
             &xattr.data,
             &mut state.write_position,
@@ -683,10 +729,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
-        let (output, state) = self.output_state()?;
+        let (mut output, state) = self.output_state()?;
         for acl in &acl.users {
             seq_write_pxar_struct_entry(
-                output,
+                output.archive_mut(),
                 format::PXAR_ACL_USER,
                 acl.clone(),
                 &mut state.write_position,
@@ -696,7 +742,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         for acl in &acl.groups {
             seq_write_pxar_struct_entry(
-                output,
+                output.archive_mut(),
                 format::PXAR_ACL_GROUP,
                 acl.clone(),
                 &mut state.write_position,
@@ -706,7 +752,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         if let Some(acl) = &acl.group_obj {
             seq_write_pxar_struct_entry(
-                output,
+                output.archive_mut(),
                 format::PXAR_ACL_GROUP_OBJ,
                 acl.clone(),
                 &mut state.write_position,
@@ -716,7 +762,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         if let Some(acl) = &acl.default {
             seq_write_pxar_struct_entry(
-                output,
+                output.archive_mut(),
                 format::PXAR_ACL_DEFAULT,
                 acl.clone(),
                 &mut state.write_position,
@@ -726,7 +772,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         for acl in &acl.default_users {
             seq_write_pxar_struct_entry(
-                output,
+                output.archive_mut(),
                 format::PXAR_ACL_DEFAULT_USER,
                 acl.clone(),
                 &mut state.write_position,
@@ -736,7 +782,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         for acl in &acl.default_groups {
             seq_write_pxar_struct_entry(
-                output,
+                output.archive_mut(),
                 format::PXAR_ACL_DEFAULT_GROUP,
                 acl.clone(),
                 &mut state.write_position,
@@ -748,9 +794,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
-        let (output, state) = self.output_state()?;
+        let (mut output, state) = self.output_state()?;
         seq_write_pxar_entry(
-            output,
+            output.archive_mut(),
             format::PXAR_FCAPS,
             &fcaps.data,
             &mut state.write_position,
@@ -762,9 +808,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         &mut self,
         quota_project_id: &format::QuotaProjectId,
     ) -> io::Result<()> {
-        let (output, state) = self.output_state()?;
+        let (mut output, state) = self.output_state()?;
         seq_write_pxar_struct_entry(
-            output,
+            output.archive_mut(),
             format::PXAR_QUOTA_PROJID,
             *quota_project_id,
             &mut state.write_position,
@@ -774,9 +820,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
     async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
         crate::util::validate_filename(file_name)?;
-        let (output, state) = self.output_state()?;
+        let (mut output, state) = self.output_state()?;
         seq_write_pxar_entry_zero(
-            output,
+            output.archive_mut(),
             format::PXAR_FILENAME,
             file_name,
             &mut state.write_position,
@@ -789,7 +835,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             io_bail!("unexpected state on encoder close");
         }
 
-        if let EncoderOutput::Owned(output) = &mut self.output {
+        if let Some(output) = self.output.payload_mut() {
+            flush(output).await?;
+        }
+
+        if let EncoderOutput::Owned(output) = self.output.archive_mut() {
             flush(output).await?;
         }
 
@@ -805,7 +855,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             .pop()
             .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
-            self.output.as_mut(),
+            self.output.archive_mut().as_mut(),
             format::PXAR_GOODBYE,
             &tail_bytes,
             &mut state.write_position,
@@ -813,10 +863,12 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         .await?;
 
         let end_offset = state.position();
+        let payload_end_offset = state.payload_position();
 
         let encode_error = state.finish();
         if let Some(parent) = self.state.last_mut() {
             parent.write_position = end_offset;
+            parent.payload_write_position = payload_end_offset;
 
             let file_offset = state
                 .file_offset
@@ -886,7 +938,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
 /// Writer for a file object in a directory.
 pub(crate) struct FileImpl<'a, S: SeqWrite> {
-    output: &'a mut S,
+    /// Archive output, plus the dedicated payload output when file payloads are redirected
+    output: PxarVariant<&'a mut S, &'a mut S>,
 
     /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
     /// directly instead of on Drop of FileImpl?
@@ -934,7 +987,7 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
     ) -> Poll<io::Result<usize>> {
         let this = self.get_mut();
         this.check_remaining(data.len())?;
-        let output = unsafe { Pin::new_unchecked(&mut *this.output) };
+        let output = unsafe { Pin::new_unchecked(&mut *this.output.archive_mut()) };
         match output.poll_seq_write(cx, data) {
             Poll::Ready(Ok(put)) => {
                 this.remaining_size -= put as u64;
@@ -948,7 +1001,10 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
     /// Poll flush interface to more easily connect to tokio/futures.
     #[cfg(feature = "tokio-io")]
     pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
+        unsafe {
+            self.map_unchecked_mut(|this| this.output.archive_mut())
+                .poll_flush(cx)
+        }
     }
 
     /// Poll close/shutdown interface to more easily connect to tokio/futures.
@@ -957,7 +1013,10 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
     /// provided by our encoder.
     #[cfg(feature = "tokio-io")]
     pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
+        unsafe {
+            self.map_unchecked_mut(|this| this.output.archive_mut())
+                .poll_flush(cx)
+        }
     }
 
     /// Write file data for the current file entry in a pxar archive.
@@ -967,19 +1026,38 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
     /// for convenience.
     pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
         self.check_remaining(data.len())?;
-        let put =
-            poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
-                .await?;
-        //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
+        let put = if let Some(mut output) = self.output.payload_mut() {
+            let put =
+                poll_fn(|cx| unsafe { Pin::new_unchecked(&mut output).poll_seq_write(cx, data) })
+                    .await?;
+            self.parent.payload_write_position += put as u64;
+            put
+        } else {
+            let put = poll_fn(|cx| unsafe {
+                Pin::new_unchecked(self.output.archive_mut()).poll_seq_write(cx, data)
+            })
+            .await?;
+            self.parent.write_position += put as u64;
+            put
+        };
+
         self.remaining_size -= put as u64;
-        self.parent.write_position += put as u64;
         Ok(put)
     }
 
     /// Completely write file data for the current file entry in a pxar archive.
     pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
         self.check_remaining(data.len())?;
-        seq_write_all(self.output, data, &mut self.parent.write_position).await?;
+        if let Some(ref mut output) = self.output.payload_mut() {
+            seq_write_all(output, data, &mut self.parent.payload_write_position).await?;
+        } else {
+            seq_write_all(
+                self.output.archive_mut(),
+                data,
+                &mut self.parent.write_position,
+            )
+            .await?;
+        }
         self.remaining_size -= data.len() as u64;
         Ok(())
     }
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 48a97af..9d39658 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -9,7 +9,7 @@ use crate::decoder::sync::StandardReader;
 use crate::encoder::{self, LinkOffset, SeqWrite};
 use crate::format;
 use crate::util::poll_result_once;
-use crate::Metadata;
+use crate::{Metadata, PxarVariant};
 
 /// Blocking `pxar` encoder.
 ///
@@ -28,7 +28,7 @@ impl<'a, T: io::Write + 'a> Encoder<'a, StandardWriter<T>> {
     /// Encode a `pxar` archive into a regular `std::io::Write` output.
     #[inline]
     pub fn from_std(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, StandardWriter<T>>> {
-        Encoder::new(StandardWriter::new(output), metadata)
+        Encoder::new(PxarVariant::Unified(StandardWriter::new(output)), metadata)
     }
 }
 
@@ -39,7 +39,7 @@ impl<'a> Encoder<'a, StandardWriter<std::fs::File>> {
         metadata: &'b Metadata,
     ) -> io::Result<Encoder<'a, StandardWriter<std::fs::File>>> {
         Encoder::new(
-            StandardWriter::new(std::fs::File::create(path.as_ref())?),
+            PxarVariant::Unified(StandardWriter::new(std::fs::File::create(path.as_ref())?)),
             metadata,
         )
     }
@@ -50,9 +50,18 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
     ///
     /// Note that the `output`'s `SeqWrite` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub fn new(output: T, metadata: &Metadata) -> io::Result<Self> {
+    /// Optionally attach a dedicated writer to redirect the payloads of regular files to a
+    /// separate output.
+    pub fn new(output: PxarVariant<T, T>, metadata: &Metadata) -> io::Result<Self> {
+        let output = match output {
+            PxarVariant::Unified(output) => PxarVariant::Unified(output.into()),
+            PxarVariant::Split(output, payload_output) => {
+                PxarVariant::Split(output.into(), payload_output)
+            }
+        };
+
         Ok(Self {
-            inner: poll_result_once(encoder::EncoderImpl::new(output.into(), metadata))?,
+            inner: poll_result_once(encoder::EncoderImpl::new(output, metadata))?,
         })
     }
 
-- 
2.39.2





More information about the pbs-devel mailing list