[pbs-devel] [RFC v2 pxar 05/36] accessor: add optional payload input stream

Fabian Grünbichler f.gruenbichler at proxmox.com
Mon Mar 11 14:21:42 CET 2024


On March 5, 2024 10:26 am, Christian Ebner wrote:
> Allows to read regular file payloads from a split pxar archive, where
> the payload stream has been redirected to a different archive on
> creation.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 1:
> - no changes
> 
>  src/accessor/aio.rs |  7 ++++
>  src/accessor/mod.rs | 85 +++++++++++++++++++++++++++++++++++++++------
>  2 files changed, 82 insertions(+), 10 deletions(-)
> 
> diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
> index 98d7755..db6c5e4 100644
> --- a/src/accessor/aio.rs
> +++ b/src/accessor/aio.rs
> @@ -91,6 +91,13 @@ impl<T: ReadAt> Accessor<T> {
>          })
>      }
>  
> +    /// Take the file payloads from the provided input stream rather than the regular pxar stream
> +    pub fn redirect_payload_input(self, payload_input: T) -> Self {
> +        Self {
> +            inner: self.inner.redirect_payload_input(payload_input),

you know the drill by now ;)

> +        }
> +    }
> +
>      /// Open a directory handle to the root of the pxar archive.
>      pub async fn open_root_ref(&self) -> io::Result<Directory<&dyn ReadAt>> {
>          Ok(Directory::new(self.inner.open_root_ref().await?))
> diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
> index ed99c85..6b3dfd2 100644
> --- a/src/accessor/mod.rs
> +++ b/src/accessor/mod.rs
> @@ -182,6 +182,7 @@ pub(crate) struct AccessorImpl<T> {
>      input: T,
>      size: u64,
>      caches: Arc<Caches>,
> +    payload_input: Option<T>,
>  }
>  
>  impl<T: ReadAt> AccessorImpl<T> {
> @@ -194,9 +195,15 @@ impl<T: ReadAt> AccessorImpl<T> {
>              input,
>              size,
>              caches: Arc::new(Caches::default()),
> +            payload_input: None,
>          })
>      }
>  
> +    pub fn redirect_payload_input(mut self, payload_input: T) -> Self {
> +        self.payload_input = Some(payload_input);
> +        self

same here :)

> +    }
> +
>      pub fn size(&self) -> u64 {
>          self.size
>      }
> @@ -207,6 +214,9 @@ impl<T: ReadAt> AccessorImpl<T> {
>              self.size,
>              "/".into(),
>              Arc::clone(&self.caches),
> +            self.payload_input
> +                .as_ref()
> +                .map(|input| input as &dyn ReadAt),
>          )
>          .await
>      }
> @@ -227,8 +237,21 @@ async fn get_decoder<T: ReadAt>(
>      input: T,
>      entry_range: Range<u64>,
>      path: PathBuf,
> +    payload_input: Option<T>,
>  ) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
> -    DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await
> +    let mut decoder = DecoderImpl::new_full(
> +        SeqReadAtAdapter::new(input, entry_range.clone()),
> +        path,
> +        true,
> +    )
> +    .await?;
> +
> +    if let Some(payload_input) = payload_input {
> +        // Payload stream is just passed along, the range can therefore be zero
> +        decoder = decoder.redirect_payload_input(SeqReadAtAdapter::new(payload_input, 0..0));
> +    }
> +
> +    Ok(decoder)
>  }
>  
>  // NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
> @@ -236,6 +259,7 @@ async fn get_decoder_at_filename<T: ReadAt>(
>      input: T,
>      entry_range: Range<u64>,
>      path: PathBuf,
> +    payload_input: Option<T>,
>  ) -> io::Result<(DecoderImpl<SeqReadAtAdapter<T>>, u64)> {
>      // Read the header, it should be a FILENAME, then skip over it and its length:
>      let header: format::Header = read_entry_at(&input, entry_range.start).await?;
> @@ -251,7 +275,7 @@ async fn get_decoder_at_filename<T: ReadAt>(
>      }
>  
>      Ok((
> -        get_decoder(input, entry_offset..entry_range.end, path).await?,
> +        get_decoder(input, entry_offset..entry_range.end, path, payload_input).await?,
>          entry_offset,
>      ))
>  }
> @@ -263,6 +287,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>              self.size,
>              "/".into(),
>              Arc::clone(&self.caches),
> +            self.payload_input.clone(),
>          )
>          .await
>      }
> @@ -274,6 +299,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>              offset,
>              "/".into(),
>              Arc::clone(&self.caches),
> +            self.payload_input.clone(),
>          )
>          .await
>      }
> @@ -287,23 +313,30 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>              self.input.clone(),
>              entry_range_info.entry_range.clone(),
>              PathBuf::new(),
> +            self.payload_input.clone(),
>          )
>          .await?;
>          let entry = decoder
>              .next()
>              .await
>              .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
> +
>          Ok(FileEntryImpl {
>              input: self.input.clone(),
>              entry,
>              entry_range_info: entry_range_info.clone(),
>              caches: Arc::clone(&self.caches),
> +            payload_input: self.payload_input.clone(),
>          })
>      }
>  
>      /// Allow opening arbitrary contents from a specific range.
>      pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
> -        FileContentsImpl::new(self.input.clone(), range)
> +        if let Some(payload_input) = &self.payload_input {
> +            FileContentsImpl::new(payload_input.clone(), range)
> +        } else {
> +            FileContentsImpl::new(self.input.clone(), range)
> +        }
>      }
>  
>      /// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will
> @@ -326,9 +359,13 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>  
>          let link_offset = entry_file_offset - link_offset;
>  
> -        let (mut decoder, entry_offset) =
> -            get_decoder_at_filename(self.input.clone(), link_offset..self.size, PathBuf::new())
> -                .await?;
> +        let (mut decoder, entry_offset) = get_decoder_at_filename(
> +            self.input.clone(),
> +            link_offset..self.size,
> +            PathBuf::new(),
> +            self.payload_input.clone(),
> +        )
> +        .await?;
>  
>          let entry = decoder
>              .next()
> @@ -354,6 +391,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>                          entry_range: entry_offset..entry_end,
>                      },
>                      caches: Arc::clone(&self.caches),
> +                    payload_input: self.payload_input.clone(),
>                  })
>              }
>              _ => io_bail!("hardlink does not point to a regular file"),
> @@ -370,6 +408,7 @@ pub(crate) struct DirectoryImpl<T> {
>      table: Arc<[GoodbyeItem]>,
>      path: PathBuf,
>      caches: Arc<Caches>,
> +    payload_input: Option<T>,
>  }
>  
>  impl<T: Clone + ReadAt> DirectoryImpl<T> {
> @@ -379,6 +418,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
>          end_offset: u64,
>          path: PathBuf,
>          caches: Arc<Caches>,
> +        payload_input: Option<T>,
>      ) -> io::Result<DirectoryImpl<T>> {
>          let tail = Self::read_tail_entry(&input, end_offset).await?;
>  
> @@ -408,6 +448,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
>              table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone),
>              path,
>              caches,
> +            payload_input,
>          };
>  
>          // sanity check:
> @@ -503,6 +544,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
>                  None => self.path.clone(),
>                  Some(file) => self.path.join(file),
>              },
> +            self.payload_input.clone(),
>          )
>          .await
>      }
> @@ -534,6 +576,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
>                  entry_range: self.entry_range(),
>              },
>              caches: Arc::clone(&self.caches),
> +            payload_input: self.payload_input.clone(),
>          })
>      }
>  
> @@ -686,6 +729,7 @@ pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
>      entry: Entry,
>      entry_range_info: EntryRangeInfo,
>      caches: Arc<Caches>,
> +    payload_input: Option<T>,
>  }
>  
>  impl<T: Clone + ReadAt> FileEntryImpl<T> {
> @@ -699,6 +743,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
>              self.entry_range_info.entry_range.end,
>              self.entry.path.clone(),
>              Arc::clone(&self.caches),
> +            self.payload_input.clone(),
>          )
>          .await
>      }
> @@ -711,16 +756,35 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
>              }
>              EntryKind::File {
>                  size,
> -                offset: Some(offset),
> -                ..
> -            } => Ok(Some(offset..(offset + size))),
> +                offset,
> +                payload_offset,
> +            } => {
> +                // Payload offset will be some for PXAR_PAYLOAD_REF's
> +                // It should win over the regular offset, since the actual payloads
> +                // are stored in the separated payload input stream
> +                if let Some(payload_offset) = payload_offset {
> +                    return Ok(Some(payload_offset..(payload_offset + size)));
> +                }
> +
> +                if let Some(offset) = offset {
> +                    return Ok(Some(offset..(offset + size)));
> +                }
> +
> +                Ok(None)

style: this could actually be handled by the match arm:

@@ -711,14 +757,27 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
             EntryKind::File {
                 size,
                 offset: Some(offset),
+                payload_offset: None,
             } => Ok(Some(offset..(offset + size))),
+            // Payload offset beats regular offset
+            EntryKind::File {
+                size,
+                offset: Some(_offset),
+                payload_offset: Some(payload_offset),
+            } => Ok(Some(payload_offset..(payload_offset + size))),
             _ => Ok(None),
         }
     }

> +            }
>              _ => Ok(None),
>          }
>      }
>  
>      pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
>          match self.content_range()? {
> -            Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
> +            Some(range) => {
> +                if let Some(ref payload_input) = self.payload_input {
> +                    Ok(FileContentsImpl::new(payload_input.clone(), range))
> +                } else {
> +                    Ok(FileContentsImpl::new(self.input.clone(), range))
> +                }
> +            }
>              None => io_bail!("not a file"),
>          }
>      }
> @@ -810,6 +874,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> {
>              entry,
>              entry_range_info: self.entry_range_info.clone(),
>              caches: Arc::clone(&self.caches),
> +            payload_input: self.dir.payload_input.clone(),
>          })
>      }
>  
> -- 
> 2.39.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




More information about the pbs-devel mailing list