[pbs-devel] [RFC v2 pxar 06/36] encoder: move to stack based state tracking

Christian Ebner c.ebner at proxmox.com
Tue Mar 5 10:26:33 CET 2024


In preparation for the proxmox-backup-client look-ahead caching,
where a passing around of different encoder instances with internal
references is not feasible.

Instead of creating a new encoder instance for each directory level
and keeping references to the parent state, use an internal stack.

This is a breaking change in the pxar library API.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- fix incorrect docu comment

 examples/pxarcmd.rs |   6 +-
 src/encoder/aio.rs  |  20 ++--
 src/encoder/mod.rs  | 246 +++++++++++++++++++++++++-------------------
 src/encoder/sync.rs |  10 +-
 4 files changed, 158 insertions(+), 124 deletions(-)

diff --git a/examples/pxarcmd.rs b/examples/pxarcmd.rs
index e0c779d..dcf3c44 100644
--- a/examples/pxarcmd.rs
+++ b/examples/pxarcmd.rs
@@ -138,14 +138,14 @@ fn add_directory<'a, T: SeqWrite + 'a>(
 
         let meta = Metadata::from(&file_meta);
         if file_type.is_dir() {
-            let mut dir = encoder.create_directory(file_name, &meta)?;
+            encoder.create_directory(file_name, &meta)?;
             add_directory(
-                &mut dir,
+                encoder,
                 std::fs::read_dir(file_path)?,
                 root_path,
                 &mut *hardlinks,
             )?;
-            dir.finish()?;
+            encoder.finish()?;
         } else if file_type.is_symlink() {
             todo!("symlink handling");
         } else if file_type.is_file() {
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index 82b9ab2..60b11cd 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -105,17 +105,14 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         &mut self,
         file_name: P,
         metadata: &Metadata,
-    ) -> io::Result<Encoder<'_, T>> {
-        Ok(Encoder {
-            inner: self
-                .inner
-                .create_directory(file_name.as_ref(), metadata)
-                .await?,
-        })
+    ) -> io::Result<()> {
+        self.inner
+            .create_directory(file_name.as_ref(), metadata)
+            .await
     }
 
-    /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
-    pub async fn finish(self) -> io::Result<()> {
+    /// Finish this directory. This is mandatory, encodes the end for the current directory.
+    pub async fn finish(&mut self) -> io::Result<()> {
         self.inner.finish().await
     }
 
@@ -302,11 +299,12 @@ mod test {
                 .await
                 .unwrap();
             {
-                let mut dir = encoder
+                encoder
                     .create_directory("baba", &Metadata::dir_builder(0o700).build())
                     .await
                     .unwrap();
-                dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
+                encoder
+                    .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
                     .await
                     .unwrap();
             }
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index e4ea69b..962087a 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -227,6 +227,16 @@ struct EncoderState {
 }
 
 impl EncoderState {
+    #[inline]
+    fn position(&self) -> u64 {
+        self.write_position
+    }
+
+    #[inline]
+    fn payload_position(&self) -> u64 {
+        self.payload_write_position
+    }
+
     fn merge_error(&mut self, error: Option<EncodeError>) {
         // one error is enough:
         if self.encode_error.is_none() {
@@ -244,16 +254,6 @@ pub(crate) enum EncoderOutput<'a, T> {
     Borrowed(&'a mut T),
 }
 
-impl<'a, T> EncoderOutput<'a, T> {
-    #[inline]
-    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
-    where
-        'a: 's,
-    {
-        EncoderOutput::Borrowed(self.as_mut())
-    }
-}
-
 impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
     fn as_mut(&mut self) -> &mut T {
         match self {
@@ -282,8 +282,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
     output: EncoderOutput<'a, T>,
     payload_output: EncoderOutput<'a, Option<T>>,
-    state: EncoderState,
-    parent: Option<&'a mut EncoderState>,
+    /// EncoderState stack storing the state for each directory level
+    state: Vec<EncoderState>,
     finished: bool,
 
     /// Since only the "current" entry can be actively writing files, we share the file copy
@@ -291,21 +291,6 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
     file_copy_buffer: Arc<Mutex<Vec<u8>>>,
 }
 
-impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
-    fn drop(&mut self) {
-        if let Some(ref mut parent) = self.parent {
-            // propagate errors:
-            parent.merge_error(self.state.encode_error);
-            if !self.finished {
-                parent.add_error(EncodeError::IncompleteDirectory);
-            }
-        } else if !self.finished {
-            // FIXME: how do we deal with this?
-            // eprintln!("Encoder dropped without finishing!");
-        }
-    }
-}
-
 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     pub async fn new(
         output: EncoderOutput<'a, T>,
@@ -317,8 +302,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let mut this = Self {
             output,
             payload_output: EncoderOutput::Owned(None),
-            state: EncoderState::default(),
-            parent: None,
+            state: vec![EncoderState::default()],
             finished: false,
             file_copy_buffer: Arc::new(Mutex::new(unsafe {
                 crate::util::vec_new_uninitialized(1024 * 1024)
@@ -326,7 +310,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         };
 
         this.encode_metadata(metadata).await?;
-        this.state.files_offset = this.position();
+        let state = this
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        state.files_offset = state.position();
 
         Ok(this)
     }
@@ -337,13 +325,32 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     fn check(&self) -> io::Result<()> {
-        match self.state.encode_error {
+        if self.finished {
+            io_bail!("unexpected encoder finished state");
+        }
+        let state = self
+            .state
+            .last()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        match state.encode_error {
             Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
             Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
             None => Ok(()),
         }
     }
 
+    fn state(&self) -> io::Result<&EncoderState> {
+        self.state
+            .last()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+    }
+
+    fn state_mut(&mut self) -> io::Result<&mut EncoderState> {
+        self.state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+    }
+
     pub async fn create_file<'b>(
         &'b mut self,
         metadata: &Metadata,
@@ -368,26 +375,38 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     {
         self.check()?;
 
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
         self.start_file_do(Some(metadata), file_name).await?;
 
         if self.payload_output.as_mut().is_some() {
-            let mut data = self.payload_position().to_le_bytes().to_vec();
+            let state = self
+                .state
+                .last_mut()
+                .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+            let mut data = state.payload_position().to_le_bytes().to_vec();
             data.append(&mut file_size.to_le_bytes().to_vec());
             seq_write_pxar_entry(
                 self.output.as_mut(),
                 format::PXAR_PAYLOAD_REF,
                 &data,
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         } else {
             let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
             header.check_header_size()?;
-            seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+            let state = self
+                .state
+                .last_mut()
+                .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+            seq_write_struct(self.output.as_mut(), header, &mut state.write_position).await?;
         };
 
-        let payload_data_offset = self.position();
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        let payload_data_offset = state.position();
 
         let meta_size = payload_data_offset - file_offset;
 
@@ -400,7 +419,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 size: file_size + meta_size,
             },
             remaining_size: file_size,
-            parent: &mut self.state,
+            parent: state,
         })
     }
 
@@ -481,7 +500,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         target: &Path,
         target_offset: LinkOffset,
     ) -> io::Result<()> {
-        let current_offset = self.position();
+        let current_offset = self.state()?.position();
         if current_offset <= target_offset.0 {
             io_bail!("invalid hardlink offset, can only point to prior files");
         }
@@ -555,24 +574,29 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     ) -> io::Result<LinkOffset> {
         self.check()?;
 
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
 
         let file_name = file_name.as_os_str().as_bytes();
 
         self.start_file_do(metadata, file_name).await?;
+
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         if let Some((htype, entry_data)) = entry_htype_data {
             seq_write_pxar_entry(
                 self.output.as_mut(),
                 htype,
                 entry_data,
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
-        let end_offset = self.position();
+        let end_offset = state.position();
 
-        self.state.items.push(GoodbyeItem {
+        state.items.push(GoodbyeItem {
             hash: format::hash_filename(file_name),
             offset: file_offset,
             size: end_offset - file_offset,
@@ -581,21 +605,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         Ok(LinkOffset(file_offset))
     }
 
-    #[inline]
-    fn position(&mut self) -> u64 {
-        self.state.write_position
-    }
-
-    #[inline]
-    fn payload_position(&mut self) -> u64 {
-        self.state.payload_write_position
-    }
-
     pub async fn create_directory(
         &mut self,
         file_name: &Path,
         metadata: &Metadata,
-    ) -> io::Result<EncoderImpl<'_, T>> {
+    ) -> io::Result<()> {
         self.check()?;
 
         if !metadata.is_dir() {
@@ -605,37 +619,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let file_name = file_name.as_os_str().as_bytes();
         let file_hash = format::hash_filename(file_name);
 
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
         self.encode_filename(file_name).await?;
 
-        let entry_offset = self.position();
+        let entry_offset = self.state()?.position();
         self.encode_metadata(metadata).await?;
 
-        let files_offset = self.position();
+        let state = self.state_mut()?;
+        let files_offset = state.position();
 
         // the child will write to OUR state now:
-        let write_position = self.position();
-        let payload_write_position = self.payload_position();
-
-        let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
-
-        Ok(EncoderImpl {
-            // always forward as Borrowed(), to avoid stacking references on nested calls
-            output: self.output.to_borrowed_mut(),
-            payload_output: self.payload_output.to_borrowed_mut(),
-            state: EncoderState {
-                entry_offset,
-                files_offset,
-                file_offset: Some(file_offset),
-                file_hash,
-                write_position,
-                payload_write_position,
-                ..Default::default()
-            },
-            parent: Some(&mut self.state),
-            finished: false,
-            file_copy_buffer,
-        })
+        let write_position = state.position();
+        let payload_write_position = state.payload_position();
+
+        self.state.push(EncoderState {
+            entry_offset,
+            files_offset,
+            file_offset: Some(file_offset),
+            file_hash,
+            write_position,
+            payload_write_position,
+            ..Default::default()
+        });
+
+        Ok(())
     }
 
     async fn start_file_do(
@@ -651,11 +658,15 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_struct_entry(
             self.output.as_mut(),
             format::PXAR_ENTRY,
             metadata.stat.clone(),
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await?;
 
@@ -677,22 +688,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_XATTR,
             &xattr.data,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
     async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         for acl in &acl.users {
             seq_write_pxar_struct_entry(
                 self.output.as_mut(),
                 format::PXAR_ACL_USER,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -702,7 +721,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_ACL_GROUP,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -712,7 +731,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_ACL_GROUP_OBJ,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -722,7 +741,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -732,7 +751,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT_USER,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -742,7 +761,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT_GROUP,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -751,11 +770,15 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_FCAPS,
             &fcaps.data,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
@@ -764,33 +787,45 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         &mut self,
         quota_project_id: &format::QuotaProjectId,
     ) -> io::Result<()> {
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_struct_entry(
             self.output.as_mut(),
             format::PXAR_QUOTA_PROJID,
             *quota_project_id,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
     async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
         crate::util::validate_filename(file_name)?;
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry_zero(
             self.output.as_mut(),
             format::PXAR_FILENAME,
             file_name,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
-    pub async fn finish(mut self) -> io::Result<()> {
+    pub async fn finish(&mut self) -> io::Result<()> {
         let tail_bytes = self.finish_goodbye_table().await?;
+        let mut state = self
+            .state
+            .pop()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_GOODBYE,
             &tail_bytes,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await?;
 
@@ -804,34 +839,37 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             flush(output).await?;
         }
 
-        // done up here because of the self-borrow and to propagate
-        let end_offset = self.position();
-        let payload_end_offset = self.payload_position();
+        let end_offset = state.position();
+        let payload_end_offset = state.payload_position();
 
-        if let Some(parent) = &mut self.parent {
+        if let Some(parent) = self.state.last_mut() {
             parent.write_position = end_offset;
             parent.payload_write_position = payload_end_offset;
 
-            let file_offset = self
-                .state
+            let file_offset = state
                 .file_offset
                 .expect("internal error: parent set but no file_offset?");
 
             parent.items.push(GoodbyeItem {
-                hash: self.state.file_hash,
+                hash: state.file_hash,
                 offset: file_offset,
                 size: end_offset - file_offset,
             });
+            // propagate errors
+            parent.merge_error(state.encode_error);
+        } else {
+            self.finished = true;
         }
-        self.finished = true;
+
         Ok(())
     }
 
     async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
-        let goodbye_offset = self.position();
+        let state = self.state_mut()?;
+        let goodbye_offset = state.position();
 
         // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
-        let mut tail = take(&mut self.state.items);
+        let mut tail = take(&mut state.items);
         let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
         let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
 
@@ -856,7 +894,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         bst.push(
             GoodbyeItem {
                 hash: format::PXAR_GOODBYE_TAIL_MARKER,
-                offset: goodbye_offset - self.state.entry_offset,
+                offset: goodbye_offset - state.entry_offset,
                 size: goodbye_size,
             }
             .to_le(),
@@ -886,8 +924,8 @@ pub(crate) struct FileImpl<'a, S: SeqWrite> {
     /// exactly zero.
     remaining_size: u64,
 
-    /// The directory containing this file. This is where we propagate the `IncompleteFile` error
-    /// to, and where we insert our `GoodbyeItem`.
+    /// The directory stack with the last item being the directory containing this file. This is
+    /// where we propagate the `IncompleteFile` error to, and where we insert our `GoodbyeItem`.
     parent: &'a mut EncoderState,
 }
 
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 28981df..a7c350e 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -106,14 +106,12 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         &mut self,
         file_name: P,
         metadata: &Metadata,
-    ) -> io::Result<Encoder<'_, T>> {
-        Ok(Encoder {
-            inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?,
-        })
+    ) -> io::Result<()> {
+        poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))
     }
 
-    /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
-    pub fn finish(self) -> io::Result<()> {
+    /// Finish this directory. This is mandatory, encodes the end for the current directory.
+    pub fn finish(&mut self) -> io::Result<()> {
         poll_result_once(self.inner.finish())
     }
 
-- 
2.39.2





More information about the pbs-devel mailing list