[pbs-devel] [RFC v2 pxar 04/36] decoder: add optional payload input stream

Christian Ebner c.ebner at proxmox.com
Tue Mar 5 10:26:31 CET 2024


Implement an optional redirection to read the payload for regular files
from a different input stream.

This allows decoding split stream archives.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- refactor to use new PayloadRef type and decoder method

 src/accessor/mod.rs |  2 ++
 src/decoder/mod.rs  | 78 +++++++++++++++++++++++++++++++++++++++++----
 src/decoder/sync.rs |  7 ++++
 src/lib.rs          |  3 ++
 4 files changed, 83 insertions(+), 7 deletions(-)

diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
index 6a2de73..ed99c85 100644
--- a/src/accessor/mod.rs
+++ b/src/accessor/mod.rs
@@ -342,6 +342,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             EntryKind::File {
                 offset: Some(offset),
                 size,
+                ..
             } => {
                 let meta_size = offset - link_offset;
                 let entry_end = link_offset + meta_size + size;
@@ -711,6 +712,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
             EntryKind::File {
                 size,
                 offset: Some(offset),
+                ..
             } => Ok(Some(offset..(offset + size))),
             _ => Ok(None),
         }
diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
index 70c44ce..7b8254d 100644
--- a/src/decoder/mod.rs
+++ b/src/decoder/mod.rs
@@ -157,6 +157,10 @@ pub(crate) struct DecoderImpl<T> {
     state: State,
     with_goodbye_tables: bool,
 
+    // Payload of regular files might be provided by a different reader
+    payload_input: Option<T>,
+    payload_consumed: u64,
+
     /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
     /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
     eof_after_entry: bool,
@@ -167,6 +171,7 @@ enum State {
     Default,
     InPayload {
         offset: u64,
+        size: u64,
     },
 
     /// file entries with no data (fifo, socket)
@@ -199,6 +204,11 @@ impl<I: SeqRead> DecoderImpl<I> {
         Self::new_full(input, "/".into(), false).await
     }
 
+    pub fn redirect_payload_input(mut self, payload_input: I) -> Self {
+        self.payload_input = Some(payload_input);
+        self
+    }
+
     pub(crate) fn input(&self) -> &I {
         &self.input
     }
@@ -219,6 +229,8 @@ impl<I: SeqRead> DecoderImpl<I> {
             path_lengths: Vec::new(),
             state: State::Begin,
             with_goodbye_tables: false,
+            payload_input: None,
+            payload_consumed: 0,
             eof_after_entry,
         };
 
@@ -242,9 +254,14 @@ impl<I: SeqRead> DecoderImpl<I> {
                     // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
                     self.read_next_item().await?;
                 }
-                State::InPayload { offset } => {
-                    // We need to skip the current payload first.
-                    self.skip_entry(offset).await?;
+                State::InPayload { offset, .. } => {
+                    if self.payload_input.is_none() {
+                        // Skip remaining payload of current entry in regular stream
+                        self.skip_entry(offset).await?;
+                    } else {
+                        // Update consumed payload as given by the offset referenced by the content reader
+                        self.payload_consumed += offset;
+                    }
                     self.read_next_item().await?;
                 }
                 State::InGoodbyeTable => {
@@ -308,11 +325,11 @@ impl<I: SeqRead> DecoderImpl<I> {
     }
 
     pub fn content_reader(&mut self) -> Option<Contents<I>> {
-        if let State::InPayload { offset } = &mut self.state {
+        if let State::InPayload { offset, size } = &mut self.state {
             Some(Contents::new(
-                &mut self.input,
+                self.payload_input.as_mut().unwrap_or(&mut self.input),
                 offset,
-                self.current_header.content_size(),
+                *size,
             ))
         } else {
             None
@@ -531,8 +548,40 @@ impl<I: SeqRead> DecoderImpl<I> {
                 self.entry.kind = EntryKind::File {
                     size: self.current_header.content_size(),
                     offset,
+                    payload_offset: None,
+                };
+                self.state = State::InPayload {
+                    offset: 0,
+                    size: self.current_header.content_size(),
+                };
+                return Ok(ItemResult::Entry);
+            }
+            format::PXAR_PAYLOAD_REF => {
+                let offset = seq_read_position(&mut self.input).await.transpose()?;
+                let payload_ref = self.read_payload_ref().await?;
+
+                let payload_input_offset = if let Some(payload_input) = self.payload_input.as_mut()
+                {
+                    seq_read_position(payload_input).await.transpose()?
+                } else {
+                    None
+                };
+
+                // Skip payload padding for injected chunks in sync decoder
+                if self.payload_input.is_some() && payload_input_offset.is_none() {
+                    let to_skip = payload_ref.offset - self.payload_consumed;
+                    self.skip_payload(to_skip).await?;
+                }
+
+                self.entry.kind = EntryKind::File {
+                    size: payload_ref.size,
+                    offset,
+                    payload_offset: Some(payload_ref.offset),
+                };
+                self.state = State::InPayload {
+                    offset: 0,
+                    size: payload_ref.size,
                 };
-                self.state = State::InPayload { offset: 0 };
                 return Ok(ItemResult::Entry);
             }
             format::PXAR_FILENAME | format::PXAR_GOODBYE => {
@@ -576,6 +625,21 @@ impl<I: SeqRead> DecoderImpl<I> {
         Ok(())
     }
 
+    async fn skip_payload(&mut self, length: u64) -> io::Result<()> {
+        let mut len = length;
+        let scratch = scratch_buffer();
+        while len >= (scratch.len() as u64) {
+            seq_read_exact(self.payload_input.as_mut().unwrap(), scratch).await?;
+            len -= scratch.len() as u64;
+        }
+        let len = len as usize;
+        if len > 0 {
+            seq_read_exact(self.payload_input.as_mut().unwrap(), &mut scratch[..len]).await?;
+        }
+        self.payload_consumed += length;
+        Ok(())
+    }
+
     async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> {
         let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
         let data = seq_read_exact_data(&mut self.input, size).await?;
diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs
index 5597a03..b22b341 100644
--- a/src/decoder/sync.rs
+++ b/src/decoder/sync.rs
@@ -53,6 +53,13 @@ impl<T: SeqRead> Decoder<T> {
         })
     }
 
+    /// Take the file payloads from the provided input stream rather than the regular pxar stream
+    pub fn redirect_payload_input(self, payload_input: T) -> Self {
+        Self {
+            inner: self.inner.redirect_payload_input(payload_input),
+        }
+    }
+
     /// Internal helper for `Accessor`. In this case we have the low-level state machine, and the
     /// layer "above" the `Accessor` propagates the actual type (sync vs async).
     pub(crate) fn from_impl(inner: decoder::DecoderImpl<T>) -> Self {
diff --git a/src/lib.rs b/src/lib.rs
index 210c4b1..ef81a85 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -364,6 +364,9 @@ pub enum EntryKind {
 
         /// The file's byte offset inside the archive, if available.
         offset: Option<u64>,
+
+        /// The file's byte offset inside the payload stream, if available.
+        payload_offset: Option<u64>,
     },
 
     /// Directory entry. When iterating through an archive, the contents follow next.
-- 
2.39.2





More information about the pbs-devel mailing list