[pbs-devel] [PATCH v7 pxar 07/69] decoder/accessor: allow for split input stream variant

Christian Ebner c.ebner at proxmox.com
Mon May 27 16:32:21 CEST 2024


When a pxar archive was encoded using the split stream output
variant, access to the payload of regular files has to be redirected
to the corresponding dedicated input.

Allow passing the split input variant to the decoder and accessor
instances, so that they handle the split streams accordingly and are
able to decode split stream archives.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 6:
- use PxarVariant instead of optional payload inputs

 examples/apxar.rs    |   2 +-
 src/accessor/aio.rs  |  10 ++--
 src/accessor/mod.rs  |  83 ++++++++++++++++++---------
 src/accessor/sync.rs |   8 +--
 src/decoder/aio.rs   |  13 +++--
 src/decoder/mod.rs   | 133 ++++++++++++++++++++++++++++++++++---------
 src/decoder/sync.rs  |  21 +++++--
 src/lib.rs           |   3 +
 tests/compat.rs      |   3 +-
 tests/simple/main.rs |   8 ++-
 10 files changed, 206 insertions(+), 78 deletions(-)

diff --git a/examples/apxar.rs b/examples/apxar.rs
index 0c62242..0dab51d 100644
--- a/examples/apxar.rs
+++ b/examples/apxar.rs
@@ -9,7 +9,7 @@ async fn main() {
         .await
         .expect("failed to open file");
 
-    let mut reader = Decoder::from_tokio(file)
+    let mut reader = Decoder::from_tokio(pxar::PxarVariant::Unified(file))
         .await
         .expect("failed to open pxar archive contents");
 
diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
index 98d7755..73b1025 100644
--- a/src/accessor/aio.rs
+++ b/src/accessor/aio.rs
@@ -18,7 +18,7 @@ use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
 use crate::decoder::aio::Decoder;
 use crate::format::GoodbyeItem;
 use crate::util;
-use crate::Entry;
+use crate::{Entry, PxarVariant};
 
 use super::sync::{FileReader, FileRefReader};
 
@@ -39,7 +39,7 @@ impl<T: FileExt> Accessor<FileReader<T>> {
     /// by a blocking file.
     #[inline]
     pub async fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
-        Accessor::new(FileReader::new(input), size).await
+        Accessor::new(PxarVariant::Unified(FileReader::new(input)), size).await
     }
 }
 
@@ -75,7 +75,7 @@ where
         input: T,
         size: u64,
     ) -> io::Result<Accessor<FileRefReader<T>>> {
-        Accessor::new(FileRefReader::new(input), size).await
+        Accessor::new(PxarVariant::Unified(FileRefReader::new(input)), size).await
     }
 }
 
@@ -85,7 +85,9 @@ impl<T: ReadAt> Accessor<T> {
     ///
     /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub async fn new(input: T, size: u64) -> io::Result<Self> {
+    /// For the `PxarVariant::Split` variant, file payloads are read from the dedicated payload
+    /// input rather than from the regular pxar stream.
+    pub async fn new(input: PxarVariant<T, (T, u64)>, size: u64) -> io::Result<Self> {
         Ok(Self {
             inner: accessor::AccessorImpl::new(input, size).await?,
         })
diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
index 6a2de73..c061b74 100644
--- a/src/accessor/mod.rs
+++ b/src/accessor/mod.rs
@@ -19,7 +19,7 @@ use crate::binary_tree_array;
 use crate::decoder::{self, DecoderImpl};
 use crate::format::{self, GoodbyeItem};
 use crate::util;
-use crate::{Entry, EntryKind};
+use crate::{Entry, EntryKind, PxarVariant};
 
 pub mod aio;
 pub mod cache;
@@ -179,17 +179,22 @@ struct Caches {
 
 /// The random access state machine implementation.
 pub(crate) struct AccessorImpl<T> {
-    input: T,
+    input: PxarVariant<T, (T, Range<u64>)>,
     size: u64,
     caches: Arc<Caches>,
 }
 
 impl<T: ReadAt> AccessorImpl<T> {
-    pub async fn new(input: T, size: u64) -> io::Result<Self> {
+    pub async fn new(input: PxarVariant<T, (T, u64)>, size: u64) -> io::Result<Self> {
         if size < (size_of::<GoodbyeItem>() as u64) {
             io_bail!("too small to contain a pxar archive");
         }
 
+        let input = input.wrap_multi(
+            |input| input,
+            |(payload_input, size)| (payload_input, 0..size),
+        );
+
         Ok(Self {
             input,
             size,
@@ -202,13 +207,14 @@ impl<T: ReadAt> AccessorImpl<T> {
     }
 
     pub async fn open_root_ref(&self) -> io::Result<DirectoryImpl<&dyn ReadAt>> {
-        DirectoryImpl::open_at_end(
-            &self.input as &dyn ReadAt,
-            self.size,
-            "/".into(),
-            Arc::clone(&self.caches),
-        )
-        .await
+        let input = match &self.input {
+            PxarVariant::Unified(input) => PxarVariant::Unified(input as &dyn ReadAt),
+            PxarVariant::Split(input, (payload_input, range)) => PxarVariant::Split(
+                input as &dyn ReadAt,
+                (payload_input as &dyn ReadAt, range.clone()),
+            ),
+        };
+        DirectoryImpl::open_at_end(input, self.size, "/".into(), Arc::clone(&self.caches)).await
     }
 
     pub fn set_goodbye_table_cache(
@@ -224,21 +230,25 @@ impl<T: ReadAt> AccessorImpl<T> {
 }
 
 async fn get_decoder<T: ReadAt>(
-    input: T,
+    input: PxarVariant<T, (T, Range<u64>)>,
     entry_range: Range<u64>,
     path: PathBuf,
 ) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
-    DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await
+    let input = input.wrap_multi(
+        |input| SeqReadAtAdapter::new(input, entry_range.clone()),
+        |(payload_input, range)| SeqReadAtAdapter::new(payload_input, range),
+    );
+    DecoderImpl::new_full(input, path, true).await
 }
 
 // NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
 async fn get_decoder_at_filename<T: ReadAt>(
-    input: T,
+    input: PxarVariant<T, (T, Range<u64>)>,
     entry_range: Range<u64>,
     path: PathBuf,
 ) -> io::Result<(DecoderImpl<SeqReadAtAdapter<T>>, u64)> {
     // Read the header, it should be a FILENAME, then skip over it and its length:
-    let header: format::Header = read_entry_at(&input, entry_range.start).await?;
+    let header: format::Header = read_entry_at(input.archive(), entry_range.start).await?;
     header.check_header_size()?;
 
     if header.htype != format::PXAR_FILENAME {
@@ -293,6 +303,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             .next()
             .await
             .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
+
         Ok(FileEntryImpl {
             input: self.input.clone(),
             entry,
@@ -303,7 +314,11 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
 
     /// Allow opening arbitrary contents from a specific range.
     pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
-        FileContentsImpl::new(self.input.clone(), range)
+        if let Some((payload_input, _)) = &self.input.payload() {
+            FileContentsImpl::new(payload_input.clone(), range)
+        } else {
+            FileContentsImpl::new(self.input.archive().clone(), range)
+        }
     }
 
     /// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will
@@ -342,6 +357,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             EntryKind::File {
                 offset: Some(offset),
                 size,
+                ..
             } => {
                 let meta_size = offset - link_offset;
                 let entry_end = link_offset + meta_size + size;
@@ -362,7 +378,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
 
 /// The directory random-access state machine implementation.
 pub(crate) struct DirectoryImpl<T> {
-    input: T,
+    input: PxarVariant<T, (T, Range<u64>)>,
     entry_ofs: u64,
     goodbye_ofs: u64,
     size: u64,
@@ -374,12 +390,12 @@ pub(crate) struct DirectoryImpl<T> {
 impl<T: Clone + ReadAt> DirectoryImpl<T> {
     /// Open a directory ending at the specified position.
     async fn open_at_end(
-        input: T,
+        input: PxarVariant<T, (T, Range<u64>)>,
         end_offset: u64,
         path: PathBuf,
         caches: Arc<Caches>,
     ) -> io::Result<DirectoryImpl<T>> {
-        let tail = Self::read_tail_entry(&input, end_offset).await?;
+        let tail = Self::read_tail_entry(input.archive(), end_offset).await?;
 
         if end_offset < tail.size {
             io_bail!("goodbye tail size out of range");
@@ -434,7 +450,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
                 data.as_mut_ptr() as *mut u8,
                 len * size_of::<GoodbyeItem>(),
             );
-            read_exact_at(&self.input, slice, self.table_offset()).await?;
+            read_exact_at(self.input.archive(), slice, self.table_offset()).await?;
         }
         Ok(Arc::from(data))
     }
@@ -599,7 +615,8 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
 
             let cursor = self.get_cursor(index).await?;
             if cursor.file_name == path {
-                return Ok(Some(cursor.decode_entry().await?));
+                let entry = cursor.decode_entry().await?;
+                return Ok(Some(entry));
             }
 
             dup += 1;
@@ -645,13 +662,13 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
     }
 
     async fn read_filename_entry(&self, file_ofs: u64) -> io::Result<(PathBuf, u64)> {
-        let head: format::Header = read_entry_at(&self.input, file_ofs).await?;
+        let head: format::Header = read_entry_at(self.input.archive(), file_ofs).await?;
         if head.htype != format::PXAR_FILENAME {
             io_bail!("expected PXAR_FILENAME header, found: {}", head);
         }
 
         let mut path = read_exact_data_at(
-            &self.input,
+            self.input.archive(),
             head.content_size() as usize,
             file_ofs + (size_of_val(&head) as u64),
         )
@@ -681,7 +698,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
 /// A file entry retrieved from a Directory.
 #[derive(Clone)]
 pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
-    input: T,
+    input: PxarVariant<T, (T, Range<u64>)>,
     entry: Entry,
     entry_range_info: EntryRangeInfo,
     caches: Arc<Caches>,
@@ -711,15 +728,29 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
             EntryKind::File {
                 size,
                 offset: Some(offset),
+                payload_offset: None,
             } => Ok(Some(offset..(offset + size))),
+            // If present, the payload offset takes precedence over the regular offset
+            EntryKind::File {
+                size,
+                offset: Some(_offset),
+                payload_offset: Some(payload_offset),
+            } => {
+                let start_offset = payload_offset + size_of::<format::Header>() as u64;
+                Ok(Some(start_offset..start_offset + size))
+            }
             _ => Ok(None),
         }
     }
 
     pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
-        match self.content_range()? {
-            Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
-            None => io_bail!("not a file"),
+        let range = self
+            .content_range()?
+            .ok_or_else(|| io_format_err!("not a file"))?;
+        if let Some((ref payload_input, _)) = self.input.payload() {
+            Ok(FileContentsImpl::new(payload_input.clone(), range))
+        } else {
+            Ok(FileContentsImpl::new(self.input.archive().clone(), range))
         }
     }
 
diff --git a/src/accessor/sync.rs b/src/accessor/sync.rs
index a777152..df2ed23 100644
--- a/src/accessor/sync.rs
+++ b/src/accessor/sync.rs
@@ -12,7 +12,7 @@ use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
 use crate::decoder::Decoder;
 use crate::format::GoodbyeItem;
 use crate::util::poll_result_once;
-use crate::Entry;
+use crate::{Entry, PxarVariant};
 
 /// Blocking `pxar` random-access decoder.
 ///
@@ -31,7 +31,7 @@ impl<T: FileExt> Accessor<FileReader<T>> {
     /// Decode a `pxar` archive from a standard file implementing `FileExt`.
     #[inline]
     pub fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
-        Accessor::new(FileReader::new(input), size)
+        Accessor::new(PxarVariant::Unified(FileReader::new(input)), size)
     }
 }
 
@@ -64,7 +64,7 @@ where
 {
     /// Open an `Arc` or `Rc` of `File`.
     pub fn from_file_ref_and_size(input: T, size: u64) -> io::Result<Accessor<FileRefReader<T>>> {
-        Accessor::new(FileRefReader::new(input), size)
+        Accessor::new(PxarVariant::Unified(FileRefReader::new(input)), size)
     }
 }
 
@@ -74,7 +74,7 @@ impl<T: ReadAt> Accessor<T> {
     ///
     /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub fn new(input: T, size: u64) -> io::Result<Self> {
+    pub fn new(input: PxarVariant<T, (T, u64)>, size: u64) -> io::Result<Self> {
         Ok(Self {
             inner: poll_result_once(accessor::AccessorImpl::new(input, size))?,
         })
diff --git a/src/decoder/aio.rs b/src/decoder/aio.rs
index 4de8c6f..3f9881d 100644
--- a/src/decoder/aio.rs
+++ b/src/decoder/aio.rs
@@ -6,7 +6,7 @@ use std::io;
 use std::path::Path;
 
 use crate::decoder::{self, Contents, SeqRead};
-use crate::Entry;
+use crate::{Entry, PxarVariant};
 
 /// Asynchronous `pxar` decoder.
 ///
@@ -20,8 +20,8 @@ pub struct Decoder<T> {
 impl<T: tokio::io::AsyncRead> Decoder<TokioReader<T>> {
     /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
     #[inline]
-    pub async fn from_tokio(input: T) -> io::Result<Self> {
-        Decoder::new(TokioReader::new(input)).await
+    pub async fn from_tokio(input: PxarVariant<T, T>) -> io::Result<Self> {
+        Decoder::new(input.wrap(|input| TokioReader::new(input))).await
     }
 }
 
@@ -30,13 +30,16 @@ impl Decoder<TokioReader<tokio::fs::File>> {
     /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
     #[inline]
     pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
-        Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?).await
+        Decoder::from_tokio(PxarVariant::Unified(
+            tokio::fs::File::open(path.as_ref()).await?,
+        ))
+        .await
     }
 }
 
 impl<T: SeqRead> Decoder<T> {
     /// Create an async decoder from an input implementing our internal read interface.
-    pub async fn new(input: T) -> io::Result<Self> {
+    pub async fn new(input: PxarVariant<T, T>) -> io::Result<Self> {
         Ok(Self {
             inner: decoder::DecoderImpl::new(input).await?,
         })
diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
index d19ffd1..b5c17b8 100644
--- a/src/decoder/mod.rs
+++ b/src/decoder/mod.rs
@@ -19,7 +19,7 @@ use endian_trait::Endian;
 
 use crate::format::{self, Header};
 use crate::util::{self, io_err_other};
-use crate::{Entry, EntryKind, Metadata};
+use crate::{Entry, EntryKind, Metadata, PxarVariant};
 
 pub mod aio;
 pub mod sync;
@@ -150,13 +150,16 @@ async fn seq_read_entry<T: SeqRead + ?Sized, E: Endian>(input: &mut T) -> io::Re
 /// We use `async fn` to implement the decoder state machine so that we can easily plug in both
 /// synchronous or `async` I/O objects in as input.
 pub(crate) struct DecoderImpl<T> {
-    pub(crate) input: T,
+    // Payload of regular files might be provided by a different reader
+    pub(crate) input: PxarVariant<T, T>,
     current_header: Header,
     entry: Entry,
     path_lengths: Vec<usize>,
     state: State,
     with_goodbye_tables: bool,
 
+    payload_consumed: u64,
+
     /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
     /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
     eof_after_entry: bool,
@@ -167,6 +170,7 @@ enum State {
     Default,
     InPayload {
         offset: u64,
+        size: u64,
     },
 
     /// file entries with no data (fifo, socket)
@@ -195,16 +199,16 @@ pub(crate) enum ItemResult {
 }
 
 impl<I: SeqRead> DecoderImpl<I> {
-    pub async fn new(input: I) -> io::Result<Self> {
+    pub async fn new(input: PxarVariant<I, I>) -> io::Result<Self> {
         Self::new_full(input, "/".into(), false).await
     }
 
     pub(crate) fn input(&self) -> &I {
-        &self.input
+        self.input.archive()
     }
 
     pub(crate) async fn new_full(
-        input: I,
+        input: PxarVariant<I, I>,
         path: PathBuf,
         eof_after_entry: bool,
     ) -> io::Result<Self> {
@@ -219,6 +223,7 @@ impl<I: SeqRead> DecoderImpl<I> {
             path_lengths: Vec::new(),
             state: State::Begin,
             with_goodbye_tables: false,
+            payload_consumed: 0,
             eof_after_entry,
         };
 
@@ -242,9 +247,14 @@ impl<I: SeqRead> DecoderImpl<I> {
                     // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
                     self.read_next_item().await?;
                 }
-                State::InPayload { offset } => {
-                    // We need to skip the current payload first.
-                    self.skip_entry(offset).await?;
+                State::InPayload { offset, .. } => {
+                    if self.input.payload().is_some() {
+                        // Update consumed payload as given by the offset referenced by the content reader
+                        self.payload_consumed += offset;
+                    } else {
+                        // Skip remaining payload of current entry in regular stream
+                        self.skip_entry(offset).await?;
+                    }
                     self.read_next_item().await?;
                 }
                 State::InGoodbyeTable => {
@@ -300,20 +310,28 @@ impl<I: SeqRead> DecoderImpl<I> {
     }
 
     pub fn content_size(&self) -> Option<u64> {
-        if let State::InPayload { .. } = self.state {
-            Some(self.current_header.content_size())
+        if let State::InPayload { size, .. } = self.state {
+            if self.input.payload().is_some() {
+                Some(size)
+            } else {
+                Some(self.current_header.content_size())
+            }
         } else {
             None
         }
     }
 
     pub fn content_reader(&mut self) -> Option<Contents<I>> {
-        if let State::InPayload { offset } = &mut self.state {
-            Some(Contents::new(
-                &mut self.input,
-                offset,
-                self.current_header.content_size(),
-            ))
+        if let State::InPayload { offset, size } = &mut self.state {
+            if self.input.payload().is_some() {
+                Some(Contents::new(
+                    self.input.payload_mut().unwrap(),
+                    offset,
+                    *size,
+                ))
+            } else {
+                Some(Contents::new(self.input.archive_mut(), offset, *size))
+            }
         } else {
             None
         }
@@ -357,7 +375,7 @@ impl<I: SeqRead> DecoderImpl<I> {
         self.state = State::Default;
         self.entry.clear_data();
 
-        let header: Header = match seq_read_entry_or_eof(&mut self.input).await? {
+        let header: Header = match seq_read_entry_or_eof(self.input.archive_mut()).await? {
             None => return Ok(None),
             Some(header) => header,
         };
@@ -377,11 +395,11 @@ impl<I: SeqRead> DecoderImpl<I> {
         } else if header.htype == format::PXAR_ENTRY || header.htype == format::PXAR_ENTRY_V1 {
             if header.htype == format::PXAR_ENTRY {
                 self.entry.metadata = Metadata {
-                    stat: seq_read_entry(&mut self.input).await?,
+                    stat: seq_read_entry(self.input.archive_mut()).await?,
                     ..Default::default()
                 };
             } else if header.htype == format::PXAR_ENTRY_V1 {
-                let stat: format::Stat_V1 = seq_read_entry(&mut self.input).await?;
+                let stat: format::Stat_V1 = seq_read_entry(self.input.archive_mut()).await?;
 
                 self.entry.metadata = Metadata {
                     stat: stat.into(),
@@ -457,7 +475,7 @@ impl<I: SeqRead> DecoderImpl<I> {
             )
         };
 
-        match seq_read_exact_or_eof(&mut self.input, dest).await? {
+        match seq_read_exact_or_eof(self.input.archive_mut(), dest).await? {
             Some(()) => {
                 self.current_header.check_header_size()?;
                 Ok(Some(()))
@@ -527,12 +545,71 @@ impl<I: SeqRead> DecoderImpl<I> {
                 return Ok(ItemResult::Entry);
             }
             format::PXAR_PAYLOAD => {
-                let offset = seq_read_position(&mut self.input).await.transpose()?;
+                let offset = seq_read_position(self.input.archive_mut())
+                    .await
+                    .transpose()?;
                 self.entry.kind = EntryKind::File {
                     size: self.current_header.content_size(),
                     offset,
+                    payload_offset: None,
+                };
+                self.state = State::InPayload {
+                    offset: 0,
+                    size: self.current_header.content_size(),
+                };
+                return Ok(ItemResult::Entry);
+            }
+            format::PXAR_PAYLOAD_REF => {
+                let offset = seq_read_position(self.input.archive_mut())
+                    .await
+                    .transpose()?;
+                let payload_ref = self.read_payload_ref().await?;
+
+                if let Some(payload_input) = self.input.payload_mut() {
+                    if seq_read_position(payload_input)
+                        .await
+                        .transpose()?
+                        .is_none()
+                    {
+                        if self.payload_consumed > payload_ref.offset {
+                            io_bail!(
+                                "unexpected offset {}, smaller than already consumed payload {}",
+                                payload_ref.offset,
+                                self.payload_consumed,
+                            );
+                        }
+                        let to_skip = payload_ref.offset - self.payload_consumed;
+                        Self::skip(payload_input, to_skip as usize).await?;
+                        self.payload_consumed += to_skip;
+                    }
+
+                    let header: Header = seq_read_entry(payload_input).await?;
+                    if header.htype != format::PXAR_PAYLOAD {
+                        io_bail!(
+                            "unexpected header in payload input: expected {} , got {header}",
+                            format::PXAR_PAYLOAD,
+                        );
+                    }
+                    self.payload_consumed += size_of::<Header>() as u64;
+
+                    if header.content_size() != payload_ref.size {
+                        io_bail!(
+                            "encountered payload size mismatch: got {}, expected {}",
+                            payload_ref.size,
+                            header.content_size(),
+                        );
+                    }
+                }
+
+                self.entry.kind = EntryKind::File {
+                    size: payload_ref.size,
+                    offset,
+                    payload_offset: Some(payload_ref.offset),
+                };
+                self.state = State::InPayload {
+                    offset: 0,
+                    size: payload_ref.size,
                 };
-                self.state = State::InPayload { offset: 0 };
                 return Ok(ItemResult::Entry);
             }
             format::PXAR_FILENAME | format::PXAR_GOODBYE => {
@@ -564,7 +641,7 @@ impl<I: SeqRead> DecoderImpl<I> {
 
     async fn skip_entry(&mut self, offset: u64) -> io::Result<()> {
         let len = (self.current_header.content_size() - offset) as usize;
-        Self::skip(&mut self.input, len).await
+        Self::skip(self.input.archive_mut(), len).await
     }
 
     async fn skip(input: &mut I, mut len: usize) -> io::Result<()> {
@@ -581,7 +658,7 @@ impl<I: SeqRead> DecoderImpl<I> {
 
     async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> {
         let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
-        let data = seq_read_exact_data(&mut self.input, size).await?;
+        let data = seq_read_exact_data(self.input.archive_mut(), size).await?;
         Ok(data)
     }
 
@@ -598,7 +675,7 @@ impl<I: SeqRead> DecoderImpl<I> {
                 size_of::<T>(),
             );
         }
-        seq_read_entry(&mut self.input).await
+        seq_read_entry(self.input.archive_mut()).await
     }
 
     //
@@ -630,8 +707,8 @@ impl<I: SeqRead> DecoderImpl<I> {
         }
         let data_size = content_size - size_of::<u64>();
 
-        let offset: u64 = seq_read_entry(&mut self.input).await?;
-        let data = seq_read_exact_data(&mut self.input, data_size).await?;
+        let offset: u64 = seq_read_entry(self.input.archive_mut()).await?;
+        let data = seq_read_exact_data(self.input.archive_mut(), data_size).await?;
 
         Ok(format::Hardlink { offset, data })
     }
@@ -667,7 +744,7 @@ impl<I: SeqRead> DecoderImpl<I> {
 
     async fn read_payload_ref(&mut self) -> io::Result<format::PayloadRef> {
         self.current_header.check_header_size()?;
-        seq_read_entry(&mut self.input).await
+        seq_read_entry(self.input.archive_mut()).await
     }
 }
 
diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs
index 5597a03..8779f87 100644
--- a/src/decoder/sync.rs
+++ b/src/decoder/sync.rs
@@ -7,7 +7,7 @@ use std::task::{Context, Poll};
 
 use crate::decoder::{self, SeqRead};
 use crate::util::poll_result_once;
-use crate::Entry;
+use crate::{Entry, PxarVariant};
 
 /// Blocking `pxar` decoder.
 ///
@@ -25,8 +25,8 @@ pub struct Decoder<T> {
 impl<T: io::Read> Decoder<StandardReader<T>> {
     /// Decode a `pxar` archive from a regular `std::io::Read` input.
     #[inline]
-    pub fn from_std(input: T) -> io::Result<Self> {
-        Decoder::new(StandardReader::new(input))
+    pub fn from_std(input: PxarVariant<T, T>) -> io::Result<Self> {
+        Decoder::new(input.wrap(|i| StandardReader::new(i)))
     }
 
     /// Get a direct reference to the reader contained inside the contained [`StandardReader`].
@@ -37,8 +37,15 @@ impl<T: io::Read> Decoder<StandardReader<T>> {
 
 impl Decoder<StandardReader<std::fs::File>> {
     /// Convenience shortcut for `File::open` followed by `Accessor::from_file`.
-    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
-        Self::from_std(std::fs::File::open(path.as_ref())?)
+    pub fn open<P: AsRef<Path>>(path: PxarVariant<P, P>) -> io::Result<Self> {
+        let input = match path {
+            PxarVariant::Split(input, payload_input) => PxarVariant::Split(
+                std::fs::File::open(input)?,
+                std::fs::File::open(payload_input)?,
+            ),
+            PxarVariant::Unified(input) => PxarVariant::Unified(std::fs::File::open(input)?),
+        };
+        Self::from_std(input)
     }
 }
 
@@ -47,7 +54,9 @@ impl<T: SeqRead> Decoder<T> {
     ///
     /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub fn new(input: T) -> io::Result<Self> {
+    /// For the `PxarVariant::Split` variant, the payload input is used to read regular file
+    /// payloads for payload references encountered within the archive.
+    pub fn new(input: PxarVariant<T, T>) -> io::Result<Self> {
         Ok(Self {
             inner: poll_result_once(decoder::DecoderImpl::new(input))?,
         })
diff --git a/src/lib.rs b/src/lib.rs
index f784c9e..bafdfe4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -364,6 +364,9 @@ pub enum EntryKind {
 
         /// The file's byte offset inside the archive, if available.
         offset: Option<u64>,
+
+        /// The file's byte offset inside the payload stream, if available.
+        payload_offset: Option<u64>,
     },
 
     /// Directory entry. When iterating through an archive, the contents follow next.
diff --git a/tests/compat.rs b/tests/compat.rs
index 3b43e38..8f1b778 100644
--- a/tests/compat.rs
+++ b/tests/compat.rs
@@ -94,7 +94,8 @@ fn create_archive() -> io::Result<Vec<u8>> {
 fn test_archive() {
     let archive = create_archive().expect("failed to create test archive");
     let mut input = &archive[..];
-    let mut decoder = decoder::Decoder::from_std(&mut input).expect("failed to create decoder");
+    let mut decoder = decoder::Decoder::from_std(pxar::PxarVariant::Unified(&mut input))
+        .expect("failed to create decoder");
 
     let item = decoder
         .next()
diff --git a/tests/simple/main.rs b/tests/simple/main.rs
index e55457f..e403184 100644
--- a/tests/simple/main.rs
+++ b/tests/simple/main.rs
@@ -61,14 +61,16 @@ fn test1() {
     // std::fs::write("myarchive.pxar", &file).expect("failed to write out test archive");
 
     let mut input = &file[..];
-    let mut decoder = decoder::Decoder::from_std(&mut input).expect("failed to create decoder");
+    let mut decoder = decoder::Decoder::from_std(pxar::PxarVariant::Unified(&mut input))
+        .expect("failed to create decoder");
     let decoded_fs =
         fs::Entry::decode_from(&mut decoder).expect("failed to decode previously encoded archive");
 
     assert_eq!(test_fs, decoded_fs);
 
-    let accessor = accessor::Accessor::new(&file[..], file.len() as u64)
-        .expect("failed to create random access reader for encoded archive");
+    let accessor =
+        accessor::Accessor::new(pxar::PxarVariant::Unified(&file[..]), file.len() as u64)
+            .expect("failed to create random access reader for encoded archive");
 
     check_bunzip2(&accessor);
     check_run_special_files(&accessor);
-- 
2.39.2





More information about the pbs-devel mailing list