[pbs-devel] [RFC v2 pxar 06/36] encoder: move to stack based state tracking
Fabian Grünbichler
f.gruenbichler at proxmox.com
Mon Mar 11 14:21:50 CET 2024
On March 5, 2024 10:26 am, Christian Ebner wrote:
> In preparation for the proxmox-backup-client look-ahead caching,
> where a passing around of different encoder instances with internal
> references is not feasible.
>
> Instead of creating a new encoder instance for each directory level
> and keeping references to the parent state, use an internal stack.
>
> This is a breaking change in the pxar library API.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 1:
> - fix incorrect docu comment
>
> examples/pxarcmd.rs | 6 +-
> src/encoder/aio.rs | 20 ++--
> src/encoder/mod.rs | 246 +++++++++++++++++++++++++-------------------
> src/encoder/sync.rs | 10 +-
> 4 files changed, 158 insertions(+), 124 deletions(-)
>
> diff --git a/examples/pxarcmd.rs b/examples/pxarcmd.rs
> index e0c779d..dcf3c44 100644
> --- a/examples/pxarcmd.rs
> +++ b/examples/pxarcmd.rs
> @@ -138,14 +138,14 @@ fn add_directory<'a, T: SeqWrite + 'a>(
>
> let meta = Metadata::from(&file_meta);
> if file_type.is_dir() {
> - let mut dir = encoder.create_directory(file_name, &meta)?;
> + encoder.create_directory(file_name, &meta)?;
> add_directory(
> - &mut dir,
> + encoder,
> std::fs::read_dir(file_path)?,
> root_path,
> &mut *hardlinks,
> )?;
> - dir.finish()?;
> + encoder.finish()?;
> } else if file_type.is_symlink() {
> todo!("symlink handling");
> } else if file_type.is_file() {
> diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
> index 82b9ab2..60b11cd 100644
> --- a/src/encoder/aio.rs
> +++ b/src/encoder/aio.rs
> @@ -105,17 +105,14 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
> &mut self,
> file_name: P,
> metadata: &Metadata,
> - ) -> io::Result<Encoder<'_, T>> {
> - Ok(Encoder {
> - inner: self
> - .inner
> - .create_directory(file_name.as_ref(), metadata)
> - .await?,
> - })
> + ) -> io::Result<()> {
> + self.inner
> + .create_directory(file_name.as_ref(), metadata)
> + .await
> }
>
> - /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
> - pub async fn finish(self) -> io::Result<()> {
> + /// Finish this directory. This is mandatory, encodes the end for the current directory.
> + pub async fn finish(&mut self) -> io::Result<()> {
> self.inner.finish().await
> }
>
> @@ -302,11 +299,12 @@ mod test {
> .await
> .unwrap();
> {
> - let mut dir = encoder
> + encoder
> .create_directory("baba", &Metadata::dir_builder(0o700).build())
> .await
> .unwrap();
> - dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
> + encoder
> + .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
> .await
> .unwrap();
> }
> diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
> index e4ea69b..962087a 100644
> --- a/src/encoder/mod.rs
> +++ b/src/encoder/mod.rs
> @@ -227,6 +227,16 @@ struct EncoderState {
> }
>
> impl EncoderState {
> + #[inline]
> + fn position(&self) -> u64 {
> + self.write_position
> + }
> +
> + #[inline]
> + fn payload_position(&self) -> u64 {
> + self.payload_write_position
> + }
> +
> fn merge_error(&mut self, error: Option<EncodeError>) {
> // one error is enough:
> if self.encode_error.is_none() {
> @@ -244,16 +254,6 @@ pub(crate) enum EncoderOutput<'a, T> {
> Borrowed(&'a mut T),
> }
>
> -impl<'a, T> EncoderOutput<'a, T> {
> - #[inline]
> - fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
> - where
> - 'a: 's,
> - {
> - EncoderOutput::Borrowed(self.as_mut())
> - }
> -}
> -
> impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
> fn as_mut(&mut self) -> &mut T {
> match self {
> @@ -282,8 +282,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
> pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
> output: EncoderOutput<'a, T>,
> payload_output: EncoderOutput<'a, Option<T>>,
> - state: EncoderState,
> - parent: Option<&'a mut EncoderState>,
> + /// EncoderState stack storing the state for each directory level
> + state: Vec<EncoderState>,
> finished: bool,
>
> /// Since only the "current" entry can be actively writing files, we share the file copy
> @@ -291,21 +291,6 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
> file_copy_buffer: Arc<Mutex<Vec<u8>>>,
> }
>
> -impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
> - fn drop(&mut self) {
> - if let Some(ref mut parent) = self.parent {
> - // propagate errors:
> - parent.merge_error(self.state.encode_error);
> - if !self.finished {
> - parent.add_error(EncodeError::IncompleteDirectory);
> - }
> - } else if !self.finished {
> - // FIXME: how do we deal with this?
> - // eprintln!("Encoder dropped without finishing!");
> - }
> - }
> -}
> -
> impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> pub async fn new(
> output: EncoderOutput<'a, T>,
> @@ -317,8 +302,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> let mut this = Self {
> output,
> payload_output: EncoderOutput::Owned(None),
> - state: EncoderState::default(),
> - parent: None,
> + state: vec![EncoderState::default()],
> finished: false,
> file_copy_buffer: Arc::new(Mutex::new(unsafe {
> crate::util::vec_new_uninitialized(1024 * 1024)
> @@ -326,7 +310,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> };
>
> this.encode_metadata(metadata).await?;
> - this.state.files_offset = this.position();
> + let state = this
> + .state
> + .last_mut()
> + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
nit: this could just use this.state_mut()
> + state.files_offset = state.position();
>
> Ok(this)
> }
> @@ -337,13 +325,32 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> }
>
> fn check(&self) -> io::Result<()> {
> - match self.state.encode_error {
> + if self.finished {
> + io_bail!("unexpected encoder finished state");
> + }
> + let state = self
> + .state
> + .last()
> + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
> + match state.encode_error {
nit: this could just use self.state()
> Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
> Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
> None => Ok(()),
> }
> }
>
> + fn state(&self) -> io::Result<&EncoderState> {
> + self.state
> + .last()
> + .ok_or_else(|| io_format_err!("encoder state stack underflow"))
> + }
> +
> + fn state_mut(&mut self) -> io::Result<&mut EncoderState> {
> + self.state
> + .last_mut()
> + .ok_or_else(|| io_format_err!("encoder state stack underflow"))
> + }
> +
> pub async fn create_file<'b>(
> &'b mut self,
> metadata: &Metadata,
> @@ -368,26 +375,38 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> {
> self.check()?;
>
> - let file_offset = self.position();
> + let file_offset = self.state()?.position();
> self.start_file_do(Some(metadata), file_name).await?;
>
> if self.payload_output.as_mut().is_some() {
> - let mut data = self.payload_position().to_le_bytes().to_vec();
> + let state = self
> + .state
> + .last_mut()
> + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
this one here
> + let mut data = state.payload_position().to_le_bytes().to_vec();
> data.append(&mut file_size.to_le_bytes().to_vec());
> seq_write_pxar_entry(
> self.output.as_mut(),
> format::PXAR_PAYLOAD_REF,
> &data,
> - &mut self.state.write_position,
> + &mut state.write_position,
> )
> .await?;
> } else {
> let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
> header.check_header_size()?;
> - seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
> + let state = self
> + .state
> + .last_mut()
> + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
> + seq_write_struct(self.output.as_mut(), header, &mut state.write_position).await?;
and this one here can't use the state/state_mut helpers atm (because
those borrow self)..
> };
>
> - let payload_data_offset = self.position();
> + let state = self
> + .state
> + .last_mut()
> + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
> + let payload_data_offset = state.position();
same here (and repeated quite a few times in the remainder of this
patch). it might be worth it to have another helper that gives you
(&mut output, &mut state) for payload or regular output?
e.g., something like this (or as two helpers dropping the bool
parameter):
+ fn output_state(&mut self, payload: bool) -> io::Result<(&mut T, &mut EncoderState)> {
+ Ok((
+ if payload {
+ self.output.as_mut()
+ } else {
+ self.payload_output.as_mut().as_mut().unwrap()
+ },
+ self.state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?,
+ ))
+ }
>
> let meta_size = payload_data_offset - file_offset;
>
> @@ -400,7 +419,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> size: file_size + meta_size,
> },
> remaining_size: file_size,
> - parent: &mut self.state,
> + parent: state,
> })
> }
>
[.. the rest just contains repeats of the pattern mentioned above]
More information about the pbs-devel
mailing list