[pbs-devel] [PATCH v3 pxar 06/58] encoder: move to stack based state tracking

Fabian Grünbichler f.gruenbichler at proxmox.com
Wed Apr 3 11:54:45 CEST 2024


On March 28, 2024 1:36 pm, Christian Ebner wrote:
> In preparation for the proxmox-backup-client look-ahead caching,
> where passing around different encoder instances with internal
> references is not feasible.
> 
> Instead of creating a new encoder instance for each directory level
> and keeping references to the parent state, use an internal stack.
> 
> This is a breaking change in the pxar library API.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 2:
> - consume encoder with new `close` method to finalize
> - new output_state helper, usable when a mut borrow of both output and state is required
> - double checked use of state/state_mut usage
> - major refactoring
> 
>  examples/pxarcmd.rs  |   7 +-
>  src/encoder/aio.rs   |  26 ++--
>  src/encoder/mod.rs   | 285 +++++++++++++++++++++++--------------------
>  src/encoder/sync.rs  |  16 ++-
>  tests/simple/main.rs |   3 +
>  5 files changed, 187 insertions(+), 150 deletions(-)
> 
> diff --git a/examples/pxarcmd.rs b/examples/pxarcmd.rs
> index e0c779d..0294eba 100644
> --- a/examples/pxarcmd.rs
> +++ b/examples/pxarcmd.rs
> @@ -106,6 +106,7 @@ fn cmd_create(mut args: std::env::ArgsOs) -> Result<(), Error> {
>      let mut encoder = Encoder::create(file, &meta)?;
>      add_directory(&mut encoder, dir, &dir_path, &mut HashMap::new())?;
>      encoder.finish()?;
> +    encoder.close()?;
>  
>      Ok(())
>  }
> @@ -138,14 +139,14 @@ fn add_directory<'a, T: SeqWrite + 'a>(
>  
>          let meta = Metadata::from(&file_meta);
>          if file_type.is_dir() {
> -            let mut dir = encoder.create_directory(file_name, &meta)?;
> +            encoder.create_directory(file_name, &meta)?;
>              add_directory(
> -                &mut dir,
> +                encoder,
>                  std::fs::read_dir(file_path)?,
>                  root_path,
>                  &mut *hardlinks,
>              )?;
> -            dir.finish()?;
> +            encoder.finish()?;
>          } else if file_type.is_symlink() {
>              todo!("symlink handling");
>          } else if file_type.is_file() {
> diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
> index 31a1a2f..635e550 100644
> --- a/src/encoder/aio.rs
> +++ b/src/encoder/aio.rs
> @@ -109,20 +109,23 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
>          &mut self,
>          file_name: P,
>          metadata: &Metadata,
> -    ) -> io::Result<Encoder<'_, T>> {
> -        Ok(Encoder {
> -            inner: self
> -                .inner
> -                .create_directory(file_name.as_ref(), metadata)
> -                .await?,
> -        })
> +    ) -> io::Result<()> {
> +        self.inner
> +            .create_directory(file_name.as_ref(), metadata)
> +            .await
>      }
>  
> -    /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
> -    pub async fn finish(self) -> io::Result<()> {
> +    /// Finish this directory. This is mandatory, encodes the end for the current directory.
> +    pub async fn finish(&mut self) -> io::Result<()> {
>          self.inner.finish().await
>      }
>  
> +    /// Close the encoder instance. This is mandatory, encodes the end for the optional payload
> +    /// output stream, if some is given
> +    pub async fn close(self) -> io::Result<()> {
> +        self.inner.close().await
> +    }
> +
>      /// Add a symbolic link to the archive.
>      pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
>          &mut self,
> @@ -307,11 +310,12 @@ mod test {
>                      .await
>                      .unwrap();
>              {
> -                let mut dir = encoder
> +                encoder
>                      .create_directory("baba", &Metadata::dir_builder(0o700).build())
>                      .await
>                      .unwrap();
> -                dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
> +                encoder
> +                    .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
>                      .await
>                      .unwrap();
>              }
> diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
> index bff6acf..31bb0fa 100644
> --- a/src/encoder/mod.rs
> +++ b/src/encoder/mod.rs
> @@ -227,6 +227,16 @@ struct EncoderState {
>  }
>  
>  impl EncoderState {
> +    #[inline]
> +    fn position(&self) -> u64 {
> +        self.write_position
> +    }
> +
> +    #[inline]
> +    fn payload_position(&self) -> u64 {
> +        self.payload_write_position
> +    }
> +
>      fn merge_error(&mut self, error: Option<EncodeError>) {
>          // one error is enough:
>          if self.encode_error.is_none() {
> @@ -244,16 +254,6 @@ pub(crate) enum EncoderOutput<'a, T> {
>      Borrowed(&'a mut T),
>  }
>  
> -impl<'a, T> EncoderOutput<'a, T> {
> -    #[inline]
> -    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
> -    where
> -        'a: 's,
> -    {
> -        EncoderOutput::Borrowed(self.as_mut())
> -    }
> -}
> -
>  impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
>      fn as_mut(&mut self) -> &mut T {
>          match self {
> @@ -282,8 +282,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
>  pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
>      output: EncoderOutput<'a, T>,
>      payload_output: EncoderOutput<'a, Option<T>>,
> -    state: EncoderState,
> -    parent: Option<&'a mut EncoderState>,
> +    /// EncoderState stack storing the state for each directory level
> +    state: Vec<EncoderState>,
>      finished: bool,
>  
>      /// Since only the "current" entry can be actively writing files, we share the file copy
> @@ -291,21 +291,6 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
>      file_copy_buffer: Arc<Mutex<Vec<u8>>>,
>  }
>  
> -impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
> -    fn drop(&mut self) {
> -        if let Some(ref mut parent) = self.parent {
> -            // propagate errors:
> -            parent.merge_error(self.state.encode_error);
> -            if !self.finished {
> -                parent.add_error(EncodeError::IncompleteDirectory);
> -            }
> -        } else if !self.finished {
> -            // FIXME: how do we deal with this?
> -            // eprintln!("Encoder dropped without finishing!");
> -        }
> -    }
> -}

should we still have some sort of checks here? e.g., when dropping an
encoder, what should self.finished and self.state look like? IIUC, a
dropped encoder should have an empty state stack and be finished (i.e.,
`close()` has been called on it).

or is this simply not relevant anymore because we only create one
encoder instance and then drop it at the end (but should we then have a
similar mechanism for EncoderState?)

> -
>  impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>      pub async fn new(
>          output: EncoderOutput<'a, T>,
> @@ -318,8 +303,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          let mut this = Self {
>              output,
>              payload_output: EncoderOutput::Owned(None),
> -            state: EncoderState::default(),
> -            parent: None,
> +            state: vec![EncoderState::default()],
>              finished: false,
>              file_copy_buffer: Arc::new(Mutex::new(unsafe {
>                  crate::util::vec_new_uninitialized(1024 * 1024)
> @@ -327,7 +311,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          };
>  
>          this.encode_metadata(metadata).await?;
> -        this.state.files_offset = this.position();
> +        let state = this.state_mut()?;
> +        state.files_offset = state.position();
>  
>          if let Some(payload_output) = payload_output {
>              this.payload_output = EncoderOutput::Owned(Some(payload_output));
> @@ -337,13 +322,38 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>      }
>  
>      fn check(&self) -> io::Result<()> {
> -        match self.state.encode_error {
> +        if self.finished {
> +            io_bail!("unexpected encoder finished state");
> +        }
> +        let state = self.state()?;
> +        match state.encode_error {
>              Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
>              Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
>              None => Ok(()),
>          }
>      }
>  
> +    fn state(&self) -> io::Result<&EncoderState> {
> +        self.state
> +            .last()
> +            .ok_or_else(|| io_format_err!("encoder state stack underflow"))
> +    }
> +
> +    fn state_mut(&mut self) -> io::Result<&mut EncoderState> {
> +        self.state
> +            .last_mut()
> +            .ok_or_else(|| io_format_err!("encoder state stack underflow"))
> +    }
> +
> +    fn output_state(&mut self) -> io::Result<(&mut T, &mut EncoderState)> {
> +        Ok((
> +            self.output.as_mut(),
> +            self.state
> +                .last_mut()
> +                .ok_or_else(|| io_format_err!("encoder state stack underflow"))?,
> +        ))
> +    }
> +

we could have another helper here that also returns the Option<&mut T>
for payload_output (while not used as often, it might still be a good
idea for readability):

diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index b0ec877..e8c5faa 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -387,6 +387,16 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         ))
     }
 
+    fn payload_output_state(&mut self) -> io::Result<(&mut T, Option<&mut T>, &mut EncoderState)> {
+        Ok((
+            self.output.as_mut(),
+            self.payload_output.as_mut().as_mut(),
+            self.state
+                .last_mut()
+                .ok_or_else(|| io_format_err!("encoder state stack underflow"))?,
+        ))
+    }
+
     pub async fn create_file<'b>(
         &'b mut self,
         metadata: &Metadata,
@@ -414,12 +424,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let file_offset = self.state()?.position();
         self.start_file_do(Some(metadata), file_name).await?;
 
-        if let Some(payload_output) = self.payload_output.as_mut() {
-            let state = self
-                .state
-                .last_mut()
-                .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        let (output, payload_output, state) = self.payload_output_state()?;
 
+        if let Some(payload_output) = payload_output {
             // Position prior to the payload header
             let payload_position = state.payload_position();
 
@@ -435,7 +442,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
             // Write ref to metadata archive
             seq_write_pxar_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_PAYLOAD_REF,
                 &payload_ref.data(),
                 &mut state.write_position,
@@ -444,21 +451,18 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         } else {
             let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
             header.check_header_size()?;
-            let (output, state) = self.output_state()?;
             seq_write_struct(output, header, &mut state.write_position).await?;
         }
 
-        let state = self
-            .state
-            .last_mut()
-            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        let (output, payload_output, state) = self.payload_output_state()?;
+
         let payload_data_offset = state.position();
 
         let meta_size = payload_data_offset - file_offset;
 
         Ok(FileImpl {
-            output: self.output.as_mut(),
-            payload_output: self.payload_output.as_mut().as_mut(),
+            output,
+            payload_output,
             goodbye_item: GoodbyeItem {
                 hash: format::hash_filename(file_name),
                 offset: file_offset,

> [..]




More information about the pbs-devel mailing list