[pbs-devel] [RFC v2 pxar 04/36] decoder: add optional payload input stream
Fabian Grünbichler
f.gruenbichler at proxmox.com
Mon Mar 11 14:21:34 CET 2024
On March 5, 2024 10:26 am, Christian Ebner wrote:
> Implement an optional redirection to read the payload for regular files
> from a different input stream.
>
> This allows to decode split stream archives.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 1:
> - refactor to use new PayloadRef type and decoder method
>
> src/accessor/mod.rs | 2 ++
> src/decoder/mod.rs | 78 +++++++++++++++++++++++++++++++++++++++++----
> src/decoder/sync.rs | 7 ++++
> src/lib.rs | 3 ++
> 4 files changed, 83 insertions(+), 7 deletions(-)
>
> diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
> index 6a2de73..ed99c85 100644
> --- a/src/accessor/mod.rs
> +++ b/src/accessor/mod.rs
> @@ -342,6 +342,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
> EntryKind::File {
> offset: Some(offset),
> size,
> + ..
> } => {
> let meta_size = offset - link_offset;
> let entry_end = link_offset + meta_size + size;
> @@ -711,6 +712,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
> EntryKind::File {
> size,
> offset: Some(offset),
> + ..
> } => Ok(Some(offset..(offset + size))),
> _ => Ok(None),
> }
> diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
> index 70c44ce..7b8254d 100644
> --- a/src/decoder/mod.rs
> +++ b/src/decoder/mod.rs
> @@ -157,6 +157,10 @@ pub(crate) struct DecoderImpl<T> {
> state: State,
> with_goodbye_tables: bool,
>
> + // Payload of regular files might be provided by a different reader
> + payload_input: Option<T>,
> + payload_consumed: u64,
> +
> /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
> /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
> eof_after_entry: bool,
> @@ -167,6 +171,7 @@ enum State {
> Default,
> InPayload {
> offset: u64,
> + size: u64,
> },
>
> /// file entries with no data (fifo, socket)
> @@ -199,6 +204,11 @@ impl<I: SeqRead> DecoderImpl<I> {
> Self::new_full(input, "/".into(), false).await
> }
>
> + pub fn redirect_payload_input(mut self, payload_input: I) -> Self {
> + self.payload_input = Some(payload_input);
Same question as for the encoder: do we want to prevent misuse here and
check/ensure that no payload_input has already been set?
> + self
> + }
> +
> pub(crate) fn input(&self) -> &I {
> &self.input
> }
> @@ -219,6 +229,8 @@ impl<I: SeqRead> DecoderImpl<I> {
> path_lengths: Vec::new(),
> state: State::Begin,
> with_goodbye_tables: false,
> + payload_input: None,
> + payload_consumed: 0,
> eof_after_entry,
> };
>
> @@ -242,9 +254,14 @@ impl<I: SeqRead> DecoderImpl<I> {
> // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
> self.read_next_item().await?;
> }
> - State::InPayload { offset } => {
> - // We need to skip the current payload first.
> - self.skip_entry(offset).await?;
> + State::InPayload { offset, .. } => {
> + if self.payload_input.is_none() {
> + // Skip remaining payload of current entry in regular stream
> + self.skip_entry(offset).await?;
> + } else {
> + // Update consumed payload as given by the offset referenced by the content reader
> + self.payload_consumed += offset;
> + }
> self.read_next_item().await?;
> }
> State::InGoodbyeTable => {
> @@ -308,11 +325,11 @@ impl<I: SeqRead> DecoderImpl<I> {
> }
>
> pub fn content_reader(&mut self) -> Option<Contents<I>> {
> - if let State::InPayload { offset } = &mut self.state {
> + if let State::InPayload { offset, size } = &mut self.state {
> Some(Contents::new(
> - &mut self.input,
> + self.payload_input.as_mut().unwrap_or(&mut self.input),
> offset,
> - self.current_header.content_size(),
> + *size,
> ))
> } else {
> None
> @@ -531,8 +548,40 @@ impl<I: SeqRead> DecoderImpl<I> {
> self.entry.kind = EntryKind::File {
> size: self.current_header.content_size(),
> offset,
> + payload_offset: None,
> + };
> + self.state = State::InPayload {
> + offset: 0,
> + size: self.current_header.content_size(),
> + };
> + return Ok(ItemResult::Entry);
> + }
> + format::PXAR_PAYLOAD_REF => {
> + let offset = seq_read_position(&mut self.input).await.transpose()?;
> + let payload_ref = self.read_payload_ref().await?;
> +
> + let payload_input_offset = if let Some(payload_input) = self.payload_input.as_mut()
> + {
> + seq_read_position(payload_input).await.transpose()?
> + } else {
> + None
> + };
> +
> + // Skip payload padding for injected chunks in sync decoder
> + if self.payload_input.is_some() && payload_input_offset.is_none() {
> + let to_skip = payload_ref.offset - self.payload_consumed;
> + self.skip_payload(to_skip).await?;
> + }
style: these two could be combined into:
if let Some(payload_input) = self.payload_input.as_mut() {
    if seq_read_position(payload_input).await.transpose()?.is_none() {
        // Skip payload padding for injected chunks in sync decoder
        let to_skip = payload_ref.offset - self.payload_consumed;
        self.skip_payload(to_skip).await?;
    }
}
> + self.entry.kind = EntryKind::File {
> + size: payload_ref.size,
> + offset,
> + payload_offset: Some(payload_ref.offset),
> + };
> + self.state = State::InPayload {
> + offset: 0,
> + size: payload_ref.size,
> };
> - self.state = State::InPayload { offset: 0 };
> return Ok(ItemResult::Entry);
> }
> format::PXAR_FILENAME | format::PXAR_GOODBYE => {
> @@ -576,6 +625,21 @@ impl<I: SeqRead> DecoderImpl<I> {
> Ok(())
> }
>
> + async fn skip_payload(&mut self, length: u64) -> io::Result<()> {
> + let mut len = length;
> + let scratch = scratch_buffer();
> + while len >= (scratch.len() as u64) {
> + seq_read_exact(self.payload_input.as_mut().unwrap(), scratch).await?;
> + len -= scratch.len() as u64;
> + }
> + let len = len as usize;
> + if len > 0 {
> + seq_read_exact(self.payload_input.as_mut().unwrap(), &mut scratch[..len]).await?;
> + }
> + self.payload_consumed += length;
> + Ok(())
> + }
nit: this could also share the "skip" part with `skip_entry`, and take the
input and length as parameters?
nit: casting to usize at the start would make the code easier to parse
IMHO
> async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> {
> let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
> let data = seq_read_exact_data(&mut self.input, size).await?;
> diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs
> index 5597a03..b22b341 100644
> --- a/src/decoder/sync.rs
> +++ b/src/decoder/sync.rs
> @@ -53,6 +53,13 @@ impl<T: SeqRead> Decoder<T> {
> })
> }
>
> + /// Take the file payloads from the provided input stream rather than the regular pxar stream
> + pub fn redirect_payload_input(self, payload_input: T) -> Self {
> + Self {
> + inner: self.inner.redirect_payload_input(payload_input),
Same question here about `redirect_payload_input` being called twice.
> + }
> + }
> +
> /// Internal helper for `Accessor`. In this case we have the low-level state machine, and the
> /// layer "above" the `Accessor` propagates the actual type (sync vs async).
> pub(crate) fn from_impl(inner: decoder::DecoderImpl<T>) -> Self {
> diff --git a/src/lib.rs b/src/lib.rs
> index 210c4b1..ef81a85 100644
> --- a/src/lib.rs
> +++ b/src/lib.rs
> @@ -364,6 +364,9 @@ pub enum EntryKind {
>
> /// The file's byte offset inside the archive, if available.
> offset: Option<u64>,
> +
> + /// The file's byte offset inside the payload stream, if available.
> + payload_offset: Option<u64>,
> },
>
> /// Directory entry. When iterating through an archive, the contents follow next.
> --
> 2.39.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
More information about the pbs-devel
mailing list