[pbs-devel] [PATCH v3 pxar 07/58] decoder/accessor: add optional payload input stream

Fabian Grünbichler f.gruenbichler at proxmox.com
Wed Apr 3 12:38:17 CEST 2024


On March 28, 2024 1:36 pm, Christian Ebner wrote:
> Implement an optional redirection to read the payload for regular files
> from a different input stream.
> 
> This allows to decode split stream archives.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 2:
> - pass the payload input on decoder/accessor instantiation in order to
>   avoid possible adding/removing during decoding/accessing.
> - major refactoring

style nit: for those fns that take input and payload_input, it might
make sense to order them next to each other? IMHO it makes the call sites
more readable, especially in those cases where it's mostly passed along
;) for the constructors, I am a bit torn on which variant is nicer.
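
e.g. something like this (hypothetical signature, just to illustrate the
ordering, not part of the patch):

    pub async fn new(input: T, payload_input: Option<T>, size: u64) -> io::Result<Self> {
        Ok(Self {
            inner: accessor::AccessorImpl::new(input, payload_input, size).await?,
        })
    }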

> 
>  examples/apxar.rs    |   2 +-
>  src/accessor/aio.rs  |  10 ++--
>  src/accessor/mod.rs  |  61 ++++++++++++++++++++++---
>  src/accessor/sync.rs |   8 ++--
>  src/decoder/aio.rs   |  14 ++++--
>  src/decoder/mod.rs   | 106 +++++++++++++++++++++++++++++++++++++++----
>  src/decoder/sync.rs  |  15 ++++--
>  src/lib.rs           |   3 ++
>  8 files changed, 184 insertions(+), 35 deletions(-)
> 
> diff --git a/examples/apxar.rs b/examples/apxar.rs
> index 0c62242..d5eb04e 100644
> --- a/examples/apxar.rs
> +++ b/examples/apxar.rs
> @@ -9,7 +9,7 @@ async fn main() {
>          .await
>          .expect("failed to open file");
>  
> -    let mut reader = Decoder::from_tokio(file)
> +    let mut reader = Decoder::from_tokio(file, None)
>          .await
>          .expect("failed to open pxar archive contents");
>  
> diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
> index 98d7755..0ebb921 100644
> --- a/src/accessor/aio.rs
> +++ b/src/accessor/aio.rs
> @@ -39,7 +39,7 @@ impl<T: FileExt> Accessor<FileReader<T>> {
>      /// by a blocking file.
>      #[inline]
>      pub async fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
> -        Accessor::new(FileReader::new(input), size).await
> +        Accessor::new(FileReader::new(input), size, None).await
>      }
>  }
>  
> @@ -75,7 +75,7 @@ where
>          input: T,
>          size: u64,
>      ) -> io::Result<Accessor<FileRefReader<T>>> {
> -        Accessor::new(FileRefReader::new(input), size).await
> +        Accessor::new(FileRefReader::new(input), size, None).await
>      }
>  }
>  
> @@ -85,9 +85,11 @@ impl<T: ReadAt> Accessor<T> {
>      ///
>      /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
>      /// not allowed to use the `Waker`, as this will cause a `panic!`.
> -    pub async fn new(input: T, size: u64) -> io::Result<Self> {
> +    /// Optionally take the file payloads from the provided input stream rather than the regular
> +    /// pxar stream.
> +    pub async fn new(input: T, size: u64, payload_input: Option<T>) -> io::Result<Self> {
>          Ok(Self {
> -            inner: accessor::AccessorImpl::new(input, size).await?,
> +            inner: accessor::AccessorImpl::new(input, size, payload_input).await?,
>          })
>      }
>  
> diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
> index 6a2de73..4789595 100644
> --- a/src/accessor/mod.rs
> +++ b/src/accessor/mod.rs
> @@ -182,10 +182,11 @@ pub(crate) struct AccessorImpl<T> {
>      input: T,
>      size: u64,
>      caches: Arc<Caches>,
> +    payload_input: Option<T>,
>  }
>  
>  impl<T: ReadAt> AccessorImpl<T> {
> -    pub async fn new(input: T, size: u64) -> io::Result<Self> {
> +    pub async fn new(input: T, size: u64, payload_input: Option<T>) -> io::Result<Self> {
>          if size < (size_of::<GoodbyeItem>() as u64) {
>              io_bail!("too small to contain a pxar archive");
>          }
> @@ -194,6 +195,7 @@ impl<T: ReadAt> AccessorImpl<T> {
>              input,
>              size,
>              caches: Arc::new(Caches::default()),
> +            payload_input,
>          })
>      }
>  
> @@ -207,6 +209,9 @@ impl<T: ReadAt> AccessorImpl<T> {
>              self.size,
>              "/".into(),
>              Arc::clone(&self.caches),
> +            self.payload_input
> +                .as_ref()
> +                .map(|input| input as &dyn ReadAt),
>          )
>          .await
>      }
> @@ -228,7 +233,13 @@ async fn get_decoder<T: ReadAt>(
>      entry_range: Range<u64>,
>      path: PathBuf,
>  ) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
> -    DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await
> +    DecoderImpl::new_full(
> +        SeqReadAtAdapter::new(input, entry_range.clone()),
> +        path,
> +        true,
> +        None,
> +    )
> +    .await
>  }
>  
>  // NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
> @@ -263,6 +274,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>              self.size,
>              "/".into(),
>              Arc::clone(&self.caches),
> +            self.payload_input.clone(),
>          )
>          .await
>      }
> @@ -274,6 +286,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>              offset,
>              "/".into(),
>              Arc::clone(&self.caches),
> +            self.payload_input.clone(),
>          )
>          .await
>      }
> @@ -293,17 +306,23 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>              .next()
>              .await
>              .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
> +
>          Ok(FileEntryImpl {
>              input: self.input.clone(),
>              entry,
>              entry_range_info: entry_range_info.clone(),
>              caches: Arc::clone(&self.caches),
> +            payload_input: self.payload_input.clone(),
>          })
>      }
>  
>      /// Allow opening arbitrary contents from a specific range.
>      pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
> -        FileContentsImpl::new(self.input.clone(), range)
> +        if let Some(payload_input) = &self.payload_input {
> +            FileContentsImpl::new(payload_input.clone(), range)
> +        } else {
> +            FileContentsImpl::new(self.input.clone(), range)
> +        }
>      }
>  
>      /// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will
> @@ -326,9 +345,12 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>  
>          let link_offset = entry_file_offset - link_offset;
>  
> -        let (mut decoder, entry_offset) =
> -            get_decoder_at_filename(self.input.clone(), link_offset..self.size, PathBuf::new())
> -                .await?;
> +        let (mut decoder, entry_offset) = get_decoder_at_filename(
> +            self.input.clone(),
> +            link_offset..self.size,
> +            PathBuf::new(),
> +        )
> +        .await?;
>  
>          let entry = decoder
>              .next()
> @@ -342,6 +364,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>              EntryKind::File {
>                  offset: Some(offset),
>                  size,
> +                ..
>              } => {
>                  let meta_size = offset - link_offset;
>                  let entry_end = link_offset + meta_size + size;
> @@ -353,6 +376,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>                          entry_range: entry_offset..entry_end,
>                      },
>                      caches: Arc::clone(&self.caches),
> +                    payload_input: self.payload_input.clone(),
>                  })
>              }
>              _ => io_bail!("hardlink does not point to a regular file"),
> @@ -369,6 +393,7 @@ pub(crate) struct DirectoryImpl<T> {
>      table: Arc<[GoodbyeItem]>,
>      path: PathBuf,
>      caches: Arc<Caches>,
> +    payload_input: Option<T>,
>  }
>  
>  impl<T: Clone + ReadAt> DirectoryImpl<T> {
> @@ -378,6 +403,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
>          end_offset: u64,
>          path: PathBuf,
>          caches: Arc<Caches>,
> +        payload_input: Option<T>,
>      ) -> io::Result<DirectoryImpl<T>> {
>          let tail = Self::read_tail_entry(&input, end_offset).await?;
>  
> @@ -407,6 +433,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
>              table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone),
>              path,
>              caches,
> +            payload_input,
>          };
>  
>          // sanity check:
> @@ -533,6 +560,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
>                  entry_range: self.entry_range(),
>              },
>              caches: Arc::clone(&self.caches),
> +            payload_input: self.payload_input.clone(),
>          })
>      }
>  
> @@ -685,6 +713,7 @@ pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
>      entry: Entry,
>      entry_range_info: EntryRangeInfo,
>      caches: Arc<Caches>,
> +    payload_input: Option<T>,
>  }
>  
>  impl<T: Clone + ReadAt> FileEntryImpl<T> {
> @@ -698,6 +727,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
>              self.entry_range_info.entry_range.end,
>              self.entry.path.clone(),
>              Arc::clone(&self.caches),
> +            self.payload_input.clone(),
>          )
>          .await
>      }
> @@ -711,14 +741,30 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
>              EntryKind::File {
>                  size,
>                  offset: Some(offset),
> +                payload_offset: None,
>              } => Ok(Some(offset..(offset + size))),
> +            // Payload offset beats regular offset if some
> +            EntryKind::File {
> +                size,
> +                offset: Some(_offset),
> +                payload_offset: Some(payload_offset),
> +            } => {
> +                let start_offset = payload_offset + size_of::<format::Header>() as u64;
> +                Ok(Some(start_offset..start_offset + size))
> +            }
>              _ => Ok(None),
>          }
>      }
>  
>      pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
>          match self.content_range()? {
> -            Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
> +            Some(range) => {
> +                if let Some(ref payload_input) = self.payload_input {
> +                    Ok(FileContentsImpl::new(payload_input.clone(), range))
> +                } else {
> +                    Ok(FileContentsImpl::new(self.input.clone(), range))
> +                }
> +            }
>              None => io_bail!("not a file"),

nit: this would be easier to parse if it were written as

let range = ..
if let Some(ref payload_input) = .. {
..
} else {
..
}

and it would also mesh better with `open_contents_at_range` above.
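
i.e. something along these lines (untested):

    pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
        let range = match self.content_range()? {
            Some(range) => range,
            None => io_bail!("not a file"),
        };

        if let Some(ref payload_input) = self.payload_input {
            Ok(FileContentsImpl::new(payload_input.clone(), range))
        } else {
            Ok(FileContentsImpl::new(self.input.clone(), range))
        }
    }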

>          }
>      }
> @@ -808,6 +854,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> {
>              entry,
>              entry_range_info: self.entry_range_info.clone(),
>              caches: Arc::clone(&self.caches),
> +            payload_input: self.dir.payload_input.clone(),
>          })
>      }
>  
> diff --git a/src/accessor/sync.rs b/src/accessor/sync.rs
> index a777152..6150a18 100644
> --- a/src/accessor/sync.rs
> +++ b/src/accessor/sync.rs
> @@ -31,7 +31,7 @@ impl<T: FileExt> Accessor<FileReader<T>> {
>      /// Decode a `pxar` archive from a standard file implementing `FileExt`.
>      #[inline]
>      pub fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
> -        Accessor::new(FileReader::new(input), size)
> +        Accessor::new(FileReader::new(input), size, None)
>      }
>  }
>  
> @@ -64,7 +64,7 @@ where
>  {
>      /// Open an `Arc` or `Rc` of `File`.
>      pub fn from_file_ref_and_size(input: T, size: u64) -> io::Result<Accessor<FileRefReader<T>>> {
> -        Accessor::new(FileRefReader::new(input), size)
> +        Accessor::new(FileRefReader::new(input), size, None)
>      }
>  }
>  
> @@ -74,9 +74,9 @@ impl<T: ReadAt> Accessor<T> {
>      ///
>      /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
>      /// not allowed to use the `Waker`, as this will cause a `panic!`.
> -    pub fn new(input: T, size: u64) -> io::Result<Self> {
> +    pub fn new(input: T, size: u64, payload_input: Option<T>) -> io::Result<Self> {
>          Ok(Self {
> -            inner: poll_result_once(accessor::AccessorImpl::new(input, size))?,
> +            inner: poll_result_once(accessor::AccessorImpl::new(input, size, payload_input))?,
>          })
>      }
>  
> diff --git a/src/decoder/aio.rs b/src/decoder/aio.rs
> index 4de8c6f..bb032cf 100644
> --- a/src/decoder/aio.rs
> +++ b/src/decoder/aio.rs
> @@ -20,8 +20,12 @@ pub struct Decoder<T> {
>  impl<T: tokio::io::AsyncRead> Decoder<TokioReader<T>> {
>      /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
>      #[inline]
> -    pub async fn from_tokio(input: T) -> io::Result<Self> {
> -        Decoder::new(TokioReader::new(input)).await
> +    pub async fn from_tokio(input: T, payload_input: Option<T>) -> io::Result<Self> {
> +        Decoder::new(
> +            TokioReader::new(input),
> +            payload_input.map(|payload_input| TokioReader::new(payload_input)),
> +        )
> +        .await
>      }
>  }
>  
> @@ -30,15 +34,15 @@ impl Decoder<TokioReader<tokio::fs::File>> {
>      /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
>      #[inline]
>      pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
> -        Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?).await
> +        Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?, None).await
>      }
>  }
>  
>  impl<T: SeqRead> Decoder<T> {
>      /// Create an async decoder from an input implementing our internal read interface.
> -    pub async fn new(input: T) -> io::Result<Self> {
> +    pub async fn new(input: T, payload_input: Option<T>) -> io::Result<Self> {
>          Ok(Self {
> -            inner: decoder::DecoderImpl::new(input).await?,
> +            inner: decoder::DecoderImpl::new(input, payload_input).await?,
>          })
>      }
>  
> diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
> index f439327..8cc4877 100644
> --- a/src/decoder/mod.rs
> +++ b/src/decoder/mod.rs
> @@ -157,6 +157,10 @@ pub(crate) struct DecoderImpl<T> {
>      state: State,
>      with_goodbye_tables: bool,
>  
> +    // Payload of regular files might be provided by a different reader
> +    payload_input: Option<T>,
> +    payload_consumed: u64,
> +
>      /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
>      /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
>      eof_after_entry: bool,
> @@ -167,6 +171,8 @@ enum State {
>      Default,
>      InPayload {
>          offset: u64,
> +        size: u64,
> +        payload_ref: bool,
>      },
>  
>      /// file entries with no data (fifo, socket)
> @@ -195,8 +201,8 @@ pub(crate) enum ItemResult {
>  }
>  
>  impl<I: SeqRead> DecoderImpl<I> {
> -    pub async fn new(input: I) -> io::Result<Self> {
> -        Self::new_full(input, "/".into(), false).await
> +    pub async fn new(input: I, payload_input: Option<I>) -> io::Result<Self> {
> +        Self::new_full(input, "/".into(), false, payload_input).await
>      }
>  
>      pub(crate) fn input(&self) -> &I {
> @@ -207,6 +213,7 @@ impl<I: SeqRead> DecoderImpl<I> {
>          input: I,
>          path: PathBuf,
>          eof_after_entry: bool,
> +        payload_input: Option<I>,
>      ) -> io::Result<Self> {
>          let this = DecoderImpl {
>              input,
> @@ -219,6 +226,8 @@ impl<I: SeqRead> DecoderImpl<I> {
>              path_lengths: Vec::new(),
>              state: State::Begin,
>              with_goodbye_tables: false,
> +            payload_input,
> +            payload_consumed: 0,
>              eof_after_entry,
>          };
>  
> @@ -242,9 +251,18 @@ impl<I: SeqRead> DecoderImpl<I> {
>                      // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
>                      self.read_next_item().await?;
>                  }
> -                State::InPayload { offset } => {
> -                    // We need to skip the current payload first.
> -                    self.skip_entry(offset).await?;
> +                State::InPayload {
> +                    offset,
> +                    payload_ref,
> +                    ..
> +                } => {
> +                    if payload_ref {
> +                        // Update consumed payload as given by the offset referenced by the content reader
> +                        self.payload_consumed += offset;
> +                    } else if self.payload_input.is_none() {
> +                        // Skip remaining payload of current entry in regular stream
> +                        self.skip_entry(offset).await?;
> +                    }

I am a bit confused by this - shouldn't all payloads be encoded via
refs now if we have a split archive, and vice versa? why the second
condition? and what happens if I pass a bogus payload input for an
archive that doesn't contain any references?
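
if that invariant holds, the branch could also be made explicit and bail
on a mismatch, e.g. (untested sketch, assuming mixed archives should be
rejected outright):

                State::InPayload {
                    offset,
                    payload_ref,
                    ..
                } => {
                    match (payload_ref, self.payload_input.is_some()) {
                        // split archive: payload lives in the payload input, just account for it
                        (true, true) => self.payload_consumed += offset,
                        // regular archive: skip the remaining payload in the main input
                        (false, false) => self.skip_entry(offset).await?,
                        _ => io_bail!("mismatch between payload reference and payload input"),
                    }
                    self.read_next_item().await?;
                }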

>                      self.read_next_item().await?;
>                  }
>                  State::InGoodbyeTable => {
> @@ -308,11 +326,19 @@ impl<I: SeqRead> DecoderImpl<I> {
>      }
>  
>      pub fn content_reader(&mut self) -> Option<Contents<I>> {
> -        if let State::InPayload { offset } = &mut self.state {
> +        if let State::InPayload {
> +            offset,
> +            size,
> +            payload_ref,
> +        } = &mut self.state
> +        {
> +            if *payload_ref && self.payload_input.is_none() {
> +                return None;
> +            }
>              Some(Contents::new(
> -                &mut self.input,
> +                self.payload_input.as_mut().unwrap_or(&mut self.input),
>                  offset,
> -                self.current_header.content_size(),
> +                *size,

similar here..

e.g., something like this:

            let input = if *payload_ref {
                if let Some(payload_input) = self.payload_input.as_mut() {
                    payload_input
                } else {
                    return None;
                }
            } else {
                &mut self.input
            };
            Some(Contents::new(input, offset, *size))

although technically we do have an invariant there that we could check -
we shouldn't encounter a non-ref payload when we have a payload_input..
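
e.g. with that invariant check folded in (untested):

            let input = if *payload_ref {
                // payload ref, but no payload input to read it from
                self.payload_input.as_mut()?
            } else if self.payload_input.is_some() {
                // invariant violated: non-ref payload although a payload input was provided
                return None;
            } else {
                &mut self.input
            };
            Some(Contents::new(input, offset, *size))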

>              ))
>          } else {
>              None
> @@ -531,8 +557,60 @@ impl<I: SeqRead> DecoderImpl<I> {
>                  self.entry.kind = EntryKind::File {
>                      size: self.current_header.content_size(),
>                      offset,
> +                    payload_offset: None,
> +                };
> +                self.state = State::InPayload {
> +                    offset: 0,
> +                    size: self.current_header.content_size(),
> +                    payload_ref: false,
> +                };
> +                return Ok(ItemResult::Entry);
> +            }
> +            format::PXAR_PAYLOAD_REF => {
> +                let offset = seq_read_position(&mut self.input).await.transpose()?;
> +                let payload_ref = self.read_payload_ref().await?;
> +
> +                if let Some(payload_input) = self.payload_input.as_mut() {

this condition (continued below)

> +                    if seq_read_position(payload_input)
> +                        .await
> +                        .transpose()?
> +                        .is_none()
> +                    {
> +                        // Skip payload padding for injected chunks in sequential decoder
> +                        let to_skip = payload_ref.offset - self.payload_consumed;

should we add a check here for the invariant that offsets should only
ever be increasing? (and avoid an underflow for corrupt/invalid archives
;))
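
e.g. something like (untested):

                        let to_skip = payload_ref
                            .offset
                            .checked_sub(self.payload_consumed)
                            .ok_or_else(|| {
                                io_format_err!("payload reference offsets are not increasing")
                            })?;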

> +                        self.skip_payload(to_skip).await?;
> +                    }
> +                }
> +
> +                if let Some(payload_input) = self.payload_input.as_mut() {

and this condition here is the same as the one above - could they be
merged? (see the sketch below)

> +                    let header: u64 = seq_read_entry(payload_input).await?;

why not read a full `Header` here?

> +                    if header != format::PXAR_PAYLOAD {
> +                        io_bail!(
> +                            "unexpected header in payload input: expected {} , got {header}",
> +                            format::PXAR_PAYLOAD,
> +                        );
> +                    }
> +                    let size: u64 = seq_read_entry(payload_input).await?;
> +                    self.payload_consumed += size_of::<Header>() as u64;
> +
> +                    if size != payload_ref.size + size_of::<Header>() as u64 {
> +                        io_bail!(
> +                            "encountered payload size mismatch: got {}, expected {size}",
> +                            payload_ref.size
> +                        );
> +                    }

then these could use the size helpers of Header ;)
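
e.g. with the two blocks merged and a full `Header` read (untested,
assuming `Header` can be read via `seq_read_entry` and that its
`content_size()` helper fits here):

                if let Some(payload_input) = self.payload_input.as_mut() {
                    if seq_read_position(payload_input)
                        .await
                        .transpose()?
                        .is_none()
                    {
                        // Skip payload padding for injected chunks in sequential decoder
                        let to_skip = payload_ref.offset - self.payload_consumed;
                        self.skip_payload(to_skip).await?;
                    }

                    let header: Header = seq_read_entry(payload_input).await?;
                    if header.htype != format::PXAR_PAYLOAD {
                        io_bail!(
                            "unexpected header in payload input: expected {}, got {}",
                            format::PXAR_PAYLOAD,
                            header.htype,
                        );
                    }
                    self.payload_consumed += size_of::<Header>() as u64;

                    if header.content_size() != payload_ref.size {
                        io_bail!(
                            "encountered payload size mismatch: expected {}, got {}",
                            payload_ref.size,
                            header.content_size(),
                        );
                    }
                }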

> +                }
> +
> +                self.entry.kind = EntryKind::File {
> +                    size: payload_ref.size,
> +                    offset,
> +                    payload_offset: Some(payload_ref.offset),
> +                };
> +                self.state = State::InPayload {
> +                    offset: 0,
> +                    size: payload_ref.size,
> +                    payload_ref: true,
>                  };
> -                self.state = State::InPayload { offset: 0 };
>                  return Ok(ItemResult::Entry);
>              }
>              format::PXAR_FILENAME | format::PXAR_GOODBYE => {

> [..]



