[pbs-devel] [PATCH v4 pxar 04/58] encoder: add optional output writer for file payloads

Christian Ebner c.ebner at proxmox.com
Mon Apr 29 14:10:08 CEST 2024


During regular pxar archive encoding, the payload of regular files is
written as part of the archive.

This patch introduces functionality to attach an optional, dedicated
writer instance to redirect the payload to a different output.
The separation of data and metadata streams allows for efficient
reuse of payload data by referencing its byte offset in the payload
writer's output, without having to re-encode it.

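Whenever the payload of regular files is redirected to a dedicated
output writer, encode a payload reference header followed by the
offset and size required to locate the payload in that output, instead
of adding the regular payload header followed by the encoded payload
to the archive. The regular payload header and the payload itself are
written to the dedicated output, and the referenced offset points to
the position just before that payload header.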

This is in preparation for reusing payload chunks for unchanged files
of backups created via the proxmox-backup-client.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 src/encoder/aio.rs  | 24 +++++++++---
 src/encoder/mod.rs  | 89 ++++++++++++++++++++++++++++++++++++++++-----
 src/encoder/sync.rs | 13 +++++--
 3 files changed, 107 insertions(+), 19 deletions(-)

diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index ad25fea..31a1a2f 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -24,8 +24,14 @@ impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter<T>> {
     pub async fn from_tokio(
         output: T,
         metadata: &Metadata,
+        payload_output: Option<T>,
     ) -> io::Result<Encoder<'a, TokioWriter<T>>> {
-        Encoder::new(TokioWriter::new(output), metadata).await
+        Encoder::new(
+            TokioWriter::new(output),
+            metadata,
+            payload_output.map(|payload_output| TokioWriter::new(payload_output)),
+        )
+        .await
     }
 }
 
@@ -39,6 +45,7 @@ impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
         Encoder::new(
             TokioWriter::new(tokio::fs::File::create(path.as_ref()).await?),
             metadata,
+            None,
         )
         .await
     }
@@ -46,9 +53,13 @@ impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
 
 impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
     /// Create an asynchronous encoder for an output implementing our internal write interface.
-    pub async fn new(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, T>> {
+    pub async fn new(
+        output: T,
+        metadata: &Metadata,
+        payload_output: Option<T>,
+    ) -> io::Result<Encoder<'a, T>> {
         Ok(Self {
-            inner: encoder::EncoderImpl::new(output.into(), metadata).await?,
+            inner: encoder::EncoderImpl::new(output.into(), metadata, payload_output).await?,
         })
     }
 
@@ -291,9 +302,10 @@ mod test {
     /// Assert that `Encoder` is `Send`
     fn send_test() {
         let test = async {
-            let mut encoder = Encoder::new(DummyOutput, &Metadata::dir_builder(0o700).build())
-                .await
-                .unwrap();
+            let mut encoder =
+                Encoder::new(DummyOutput, &Metadata::dir_builder(0o700).build(), None)
+                    .await
+                    .unwrap();
             {
                 let mut dir = encoder
                     .create_directory("baba", &Metadata::dir_builder(0o700).build())
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index da41733..99c3758 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -17,7 +17,7 @@ use endian_trait::Endian;
 
 use crate::binary_tree_array;
 use crate::decoder::{self, SeqRead};
-use crate::format::{self, GoodbyeItem};
+use crate::format::{self, GoodbyeItem, PayloadRef};
 use crate::Metadata;
 
 pub mod aio;
@@ -221,6 +221,9 @@ struct EncoderState {
 
     /// We need to keep track how much we have written to get offsets.
     write_position: u64,
+
+    /// Track the bytes written to the payload writer
+    payload_write_position: u64,
 }
 
 impl EncoderState {
@@ -278,6 +281,7 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
 /// synchronous or `async` I/O objects in as output.
 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
     output: EncoderOutput<'a, T>,
+    payload_output: EncoderOutput<'a, Option<T>>,
     state: EncoderState,
     parent: Option<&'a mut EncoderState>,
     finished: bool,
@@ -306,12 +310,14 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     pub async fn new(
         output: EncoderOutput<'a, T>,
         metadata: &Metadata,
+        payload_output: Option<T>,
     ) -> io::Result<EncoderImpl<'a, T>> {
         if !metadata.is_dir() {
             io_bail!("directory metadata must contain the directory mode flag");
         }
         let mut this = Self {
             output,
+            payload_output: EncoderOutput::Owned(None),
             state: EncoderState::default(),
             parent: None,
             finished: false,
@@ -323,6 +329,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         this.encode_metadata(metadata).await?;
         this.state.files_offset = this.position();
 
+        if let Some(payload_output) = payload_output {
+            this.payload_output = EncoderOutput::Owned(Some(payload_output));
+        }
+
         Ok(this)
     }
 
@@ -361,10 +371,37 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let file_offset = self.position();
         self.start_file_do(Some(metadata), file_name).await?;
 
-        let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
-        header.check_header_size()?;
+        if let Some(payload_output) = self.payload_output.as_mut() {
+            // payload references must point to the position prior to the payload header,
+            // separating payload entries in the payload stream
+            let payload_position = self.state.payload_write_position;
+
+            let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
+            header.check_header_size()?;
+            seq_write_struct(
+                payload_output,
+                header,
+                &mut self.state.payload_write_position,
+            )
+            .await?;
+
+            let payload_ref = PayloadRef {
+                offset: payload_position,
+                size: file_size,
+            };
 
-        seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+            seq_write_pxar_entry(
+                self.output.as_mut(),
+                format::PXAR_PAYLOAD_REF,
+                &payload_ref.data(),
+                &mut self.state.write_position,
+            )
+            .await?;
+        } else {
+            let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
+            header.check_header_size()?;
+            seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+        }
 
         let payload_data_offset = self.position();
 
@@ -372,6 +409,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         Ok(FileImpl {
             output: self.output.as_mut(),
+            payload_output: self.payload_output.as_mut().as_mut(),
             goodbye_item: GoodbyeItem {
                 hash: format::hash_filename(file_name),
                 offset: file_offset,
@@ -564,6 +602,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         self.state.write_position
     }
 
+    #[inline]
+    fn payload_position(&mut self) -> u64 {
+        self.state.payload_write_position
+    }
+
     pub async fn create_directory(
         &mut self,
         file_name: &Path,
@@ -588,18 +631,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         // the child will write to OUR state now:
         let write_position = self.position();
+        let payload_write_position = self.payload_position();
 
         let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
 
         Ok(EncoderImpl {
             // always forward as Borrowed(), to avoid stacking references on nested calls
             output: self.output.to_borrowed_mut(),
+            payload_output: self.payload_output.to_borrowed_mut(),
             state: EncoderState {
                 entry_offset,
                 files_offset,
                 file_offset: Some(file_offset),
                 file_hash,
                 write_position,
+                payload_write_position,
                 ..Default::default()
             },
             parent: Some(&mut self.state),
@@ -764,15 +810,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         )
         .await?;
 
+        if let EncoderOutput::Owned(Some(output)) = &mut self.payload_output {
+            flush(output).await?;
+        }
+
         if let EncoderOutput::Owned(output) = &mut self.output {
             flush(output).await?;
         }
 
         // done up here because of the self-borrow and to propagate
         let end_offset = self.position();
+        let payload_end_offset = self.payload_position();
 
         if let Some(parent) = &mut self.parent {
             parent.write_position = end_offset;
+            parent.payload_write_position = payload_end_offset;
 
             let file_offset = self
                 .state
@@ -837,6 +889,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 pub(crate) struct FileImpl<'a, S: SeqWrite> {
     output: &'a mut S,
 
+    /// Optional write redirection of file payloads to this sequential stream
+    payload_output: Option<&'a mut S>,
+
     /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
     /// directly instead of on Drop of FileImpl?
     goodbye_item: GoodbyeItem,
@@ -916,19 +971,33 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
     /// for convenience.
     pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
         self.check_remaining(data.len())?;
-        let put =
-            poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
-                .await?;
-        //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
+        let put = if let Some(mut output) = self.payload_output.as_mut() {
+            let put =
+                poll_fn(|cx| unsafe { Pin::new_unchecked(&mut output).poll_seq_write(cx, data) })
+                    .await?;
+            self.parent.payload_write_position += put as u64;
+            put
+        } else {
+            let put = poll_fn(|cx| unsafe {
+                Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data)
+            })
+            .await?;
+            self.parent.write_position += put as u64;
+            put
+        };
+
         self.remaining_size -= put as u64;
-        self.parent.write_position += put as u64;
         Ok(put)
     }
 
     /// Completely write file data for the current file entry in a pxar archive.
     pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
         self.check_remaining(data.len())?;
-        seq_write_all(self.output, data, &mut self.parent.write_position).await?;
+        if let Some(ref mut output) = self.payload_output {
+            seq_write_all(output, data, &mut self.parent.payload_write_position).await?;
+        } else {
+            seq_write_all(self.output, data, &mut self.parent.write_position).await?;
+        }
         self.remaining_size -= data.len() as u64;
         Ok(())
     }
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 1ec91b8..96d056d 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -28,7 +28,7 @@ impl<'a, T: io::Write + 'a> Encoder<'a, StandardWriter<T>> {
     /// Encode a `pxar` archive into a regular `std::io::Write` output.
     #[inline]
     pub fn from_std(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, StandardWriter<T>>> {
-        Encoder::new(StandardWriter::new(output), metadata)
+        Encoder::new(StandardWriter::new(output), metadata, None)
     }
 }
 
@@ -41,6 +41,7 @@ impl<'a> Encoder<'a, StandardWriter<std::fs::File>> {
         Encoder::new(
             StandardWriter::new(std::fs::File::create(path.as_ref())?),
             metadata,
+            None,
         )
     }
 }
@@ -50,9 +51,15 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
     ///
     /// Note that the `output`'s `SeqWrite` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub fn new(output: T, metadata: &Metadata) -> io::Result<Self> {
+    // Optionally attach a dedicated writer to redirect the payloads of regular files to a separate
+    // output.
+    pub fn new(output: T, metadata: &Metadata, payload_output: Option<T>) -> io::Result<Self> {
         Ok(Self {
-            inner: poll_result_once(encoder::EncoderImpl::new(output.into(), metadata))?,
+            inner: poll_result_once(encoder::EncoderImpl::new(
+                output.into(),
+                metadata,
+                payload_output,
+            ))?,
         })
     }
 
-- 
2.39.2





More information about the pbs-devel mailing list