[pbs-devel] [PATCH v8 pxar 03/69] encoder: move to stack based state tracking

Christian Ebner c.ebner at proxmox.com
Tue May 28 11:41:57 CEST 2024


This is in preparation for the proxmox-backup-client look-ahead caching,
where passing around different encoder instances with internal
references is not feasible.

Instead of creating a new encoder instance for each directory level
and keeping references to the parent state, use an internal stack.
Also add helper functions to resolve borrow issues that arise when both
the state and the writers have to be accessed by mutable reference.

This is a breaking change in the pxar library API.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 7:
- no changes

changes since version 6:
- patch reordered

 examples/pxarcmd.rs  |   7 +-
 src/encoder/aio.rs   |  26 +++--
 src/encoder/mod.rs   | 271 +++++++++++++++++++++++++------------------
 src/encoder/sync.rs  |  16 ++-
 tests/simple/fs.rs   |   6 +-
 tests/simple/main.rs |   3 +
 6 files changed, 196 insertions(+), 133 deletions(-)

diff --git a/examples/pxarcmd.rs b/examples/pxarcmd.rs
index e0c779d..0294eba 100644
--- a/examples/pxarcmd.rs
+++ b/examples/pxarcmd.rs
@@ -106,6 +106,7 @@ fn cmd_create(mut args: std::env::ArgsOs) -> Result<(), Error> {
     let mut encoder = Encoder::create(file, &meta)?;
     add_directory(&mut encoder, dir, &dir_path, &mut HashMap::new())?;
     encoder.finish()?;
+    encoder.close()?;
 
     Ok(())
 }
@@ -138,14 +139,14 @@ fn add_directory<'a, T: SeqWrite + 'a>(
 
         let meta = Metadata::from(&file_meta);
         if file_type.is_dir() {
-            let mut dir = encoder.create_directory(file_name, &meta)?;
+            encoder.create_directory(file_name, &meta)?;
             add_directory(
-                &mut dir,
+                encoder,
                 std::fs::read_dir(file_path)?,
                 root_path,
                 &mut *hardlinks,
             )?;
-            dir.finish()?;
+            encoder.finish()?;
         } else if file_type.is_symlink() {
             todo!("symlink handling");
         } else if file_type.is_file() {
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index ad25fea..f11e57c 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -98,20 +98,23 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         &mut self,
         file_name: P,
         metadata: &Metadata,
-    ) -> io::Result<Encoder<'_, T>> {
-        Ok(Encoder {
-            inner: self
-                .inner
-                .create_directory(file_name.as_ref(), metadata)
-                .await?,
-        })
+    ) -> io::Result<()> {
+        self.inner
+            .create_directory(file_name.as_ref(), metadata)
+            .await
     }
 
-    /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
-    pub async fn finish(self) -> io::Result<()> {
+    /// Finish this directory. This is mandatory, encodes the end for the current directory.
+    pub async fn finish(&mut self) -> io::Result<()> {
         self.inner.finish().await
     }
 
+    /// Close the encoder instance. This is mandatory, encodes the end for the optional payload
+    /// output stream, if some is given
+    pub async fn close(self) -> io::Result<()> {
+        self.inner.close().await
+    }
+
     /// Add a symbolic link to the archive.
     pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
         &mut self,
@@ -295,11 +298,12 @@ mod test {
                 .await
                 .unwrap();
             {
-                let mut dir = encoder
+                encoder
                     .create_directory("baba", &Metadata::dir_builder(0o700).build())
                     .await
                     .unwrap();
-                dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
+                encoder
+                    .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
                     .await
                     .unwrap();
             }
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index da41733..2bc3128 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -221,9 +221,17 @@ struct EncoderState {
 
     /// We need to keep track how much we have written to get offsets.
     write_position: u64,
+
+    /// Mark the encoder state as correctly finished, ready to be dropped
+    finished: bool,
 }
 
 impl EncoderState {
+    #[inline]
+    fn position(&self) -> u64 {
+        self.write_position
+    }
+
     fn merge_error(&mut self, error: Option<EncodeError>) {
         // one error is enough:
         if self.encode_error.is_none() {
@@ -234,6 +242,23 @@ impl EncoderState {
     fn add_error(&mut self, error: EncodeError) {
         self.merge_error(Some(error));
     }
+
+    fn finish(&mut self) -> Option<EncodeError> {
+        self.finished = true;
+        self.encode_error.take()
+    }
+}
+
+impl Drop for EncoderState {
+    fn drop(&mut self) {
+        if !self.finished {
+            eprintln!("unfinished encoder state dropped");
+        }
+
+        if self.encode_error.is_some() {
+            eprintln!("finished encoder state with errors");
+        }
+    }
 }
 
 pub(crate) enum EncoderOutput<'a, T> {
@@ -241,16 +266,6 @@ pub(crate) enum EncoderOutput<'a, T> {
     Borrowed(&'a mut T),
 }
 
-impl<'a, T> EncoderOutput<'a, T> {
-    #[inline]
-    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
-    where
-        'a: 's,
-    {
-        EncoderOutput::Borrowed(self.as_mut())
-    }
-}
-
 impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
     fn as_mut(&mut self) -> &mut T {
         match self {
@@ -278,8 +293,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
 /// synchronous or `async` I/O objects in as output.
 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
     output: EncoderOutput<'a, T>,
-    state: EncoderState,
-    parent: Option<&'a mut EncoderState>,
+    /// EncoderState stack storing the state for each directory level
+    state: Vec<EncoderState>,
     finished: bool,
 
     /// Since only the "current" entry can be actively writing files, we share the file copy
@@ -289,15 +304,12 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
 
 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
     fn drop(&mut self) {
-        if let Some(ref mut parent) = self.parent {
-            // propagate errors:
-            parent.merge_error(self.state.encode_error);
-            if !self.finished {
-                parent.add_error(EncodeError::IncompleteDirectory);
-            }
-        } else if !self.finished {
-            // FIXME: how do we deal with this?
-            // eprintln!("Encoder dropped without finishing!");
+        if !self.finished {
+            eprintln!("unclosed encoder dropped");
+        }
+
+        if !self.state.is_empty() {
+            eprintln!("closed encoder dropped with state");
         }
     }
 }
@@ -312,8 +324,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         }
         let mut this = Self {
             output,
-            state: EncoderState::default(),
-            parent: None,
+            state: vec![EncoderState::default()],
             finished: false,
             file_copy_buffer: Arc::new(Mutex::new(unsafe {
                 crate::util::vec_new_uninitialized(1024 * 1024)
@@ -321,19 +332,45 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         };
 
         this.encode_metadata(metadata).await?;
-        this.state.files_offset = this.position();
+        let state = this.state_mut()?;
+        state.files_offset = state.position();
 
         Ok(this)
     }
 
     fn check(&self) -> io::Result<()> {
-        match self.state.encode_error {
+        if self.finished {
+            io_bail!("unexpected encoder finished state");
+        }
+        let state = self.state()?;
+        match state.encode_error {
             Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
             Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
             None => Ok(()),
         }
     }
 
+    fn state(&self) -> io::Result<&EncoderState> {
+        self.state
+            .last()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+    }
+
+    fn state_mut(&mut self) -> io::Result<&mut EncoderState> {
+        self.state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+    }
+
+    fn output_state(&mut self) -> io::Result<(&mut T, &mut EncoderState)> {
+        Ok((
+            self.output.as_mut(),
+            self.state
+                .last_mut()
+                .ok_or_else(|| io_format_err!("encoder state stack underflow"))?,
+        ))
+    }
+
     pub async fn create_file<'b>(
         &'b mut self,
         metadata: &Metadata,
@@ -358,27 +395,27 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     {
         self.check()?;
 
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
         self.start_file_do(Some(metadata), file_name).await?;
 
         let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
         header.check_header_size()?;
+        let (output, state) = self.output_state()?;
+        seq_write_struct(output, header, &mut state.write_position).await?;
 
-        seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
-
-        let payload_data_offset = self.position();
+        let payload_data_offset = state.position();
 
         let meta_size = payload_data_offset - file_offset;
 
         Ok(FileImpl {
-            output: self.output.as_mut(),
+            output,
             goodbye_item: GoodbyeItem {
                 hash: format::hash_filename(file_name),
                 offset: file_offset,
                 size: file_size + meta_size,
             },
             remaining_size: file_size,
-            parent: &mut self.state,
+            parent: state,
         })
     }
 
@@ -459,7 +496,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         target: &Path,
         target_offset: LinkOffset,
     ) -> io::Result<()> {
-        let current_offset = self.position();
+        let current_offset = self.state()?.position();
         if current_offset <= target_offset.0 {
             io_bail!("invalid hardlink offset, can only point to prior files");
         }
@@ -533,24 +570,20 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     ) -> io::Result<LinkOffset> {
         self.check()?;
 
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
 
         let file_name = file_name.as_os_str().as_bytes();
 
         self.start_file_do(metadata, file_name).await?;
+
+        let (output, state) = self.output_state()?;
         if let Some((htype, entry_data)) = entry_htype_data {
-            seq_write_pxar_entry(
-                self.output.as_mut(),
-                htype,
-                entry_data,
-                &mut self.state.write_position,
-            )
-            .await?;
+            seq_write_pxar_entry(output, htype, entry_data, &mut state.write_position).await?;
         }
 
-        let end_offset = self.position();
+        let end_offset = state.position();
 
-        self.state.items.push(GoodbyeItem {
+        state.items.push(GoodbyeItem {
             hash: format::hash_filename(file_name),
             offset: file_offset,
             size: end_offset - file_offset,
@@ -559,16 +592,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         Ok(LinkOffset(file_offset))
     }
 
-    #[inline]
-    fn position(&mut self) -> u64 {
-        self.state.write_position
-    }
-
     pub async fn create_directory(
         &mut self,
         file_name: &Path,
         metadata: &Metadata,
-    ) -> io::Result<EncoderImpl<'_, T>> {
+    ) -> io::Result<()> {
         self.check()?;
 
         if !metadata.is_dir() {
@@ -578,34 +606,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let file_name = file_name.as_os_str().as_bytes();
         let file_hash = format::hash_filename(file_name);
 
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
         self.encode_filename(file_name).await?;
 
-        let entry_offset = self.position();
+        let entry_offset = self.state()?.position();
         self.encode_metadata(metadata).await?;
 
-        let files_offset = self.position();
+        let state = self.state_mut()?;
+        let files_offset = state.position();
 
         // the child will write to OUR state now:
-        let write_position = self.position();
-
-        let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
-
-        Ok(EncoderImpl {
-            // always forward as Borrowed(), to avoid stacking references on nested calls
-            output: self.output.to_borrowed_mut(),
-            state: EncoderState {
-                entry_offset,
-                files_offset,
-                file_offset: Some(file_offset),
-                file_hash,
-                write_position,
-                ..Default::default()
-            },
-            parent: Some(&mut self.state),
+        let write_position = state.position();
+
+        self.state.push(EncoderState {
+            items: Vec::new(),
+            encode_error: None,
+            entry_offset,
+            files_offset,
+            file_offset: Some(file_offset),
+            file_hash,
+            write_position,
             finished: false,
-            file_copy_buffer,
-        })
+        });
+
+        Ok(())
     }
 
     async fn start_file_do(
@@ -621,11 +645,12 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
+        let (output, state) = self.output_state()?;
         seq_write_pxar_struct_entry(
-            self.output.as_mut(),
+            output,
             format::PXAR_ENTRY,
             metadata.stat.clone(),
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await?;
 
@@ -647,72 +672,74 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
+        let (output, state) = self.output_state()?;
         seq_write_pxar_entry(
-            self.output.as_mut(),
+            output,
             format::PXAR_XATTR,
             &xattr.data,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
     async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
+        let (output, state) = self.output_state()?;
         for acl in &acl.users {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_USER,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
         for acl in &acl.groups {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_GROUP,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
         if let Some(acl) = &acl.group_obj {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_GROUP_OBJ,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
         if let Some(acl) = &acl.default {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_DEFAULT,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
         for acl in &acl.default_users {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_DEFAULT_USER,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
         for acl in &acl.default_groups {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_DEFAULT_GROUP,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -721,11 +748,12 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
+        let (output, state) = self.output_state()?;
         seq_write_pxar_entry(
-            self.output.as_mut(),
+            output,
             format::PXAR_FCAPS,
             &fcaps.data,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
@@ -734,66 +762,89 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         &mut self,
         quota_project_id: &format::QuotaProjectId,
     ) -> io::Result<()> {
+        let (output, state) = self.output_state()?;
         seq_write_pxar_struct_entry(
-            self.output.as_mut(),
+            output,
             format::PXAR_QUOTA_PROJID,
             *quota_project_id,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
     async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
         crate::util::validate_filename(file_name)?;
+        let (output, state) = self.output_state()?;
         seq_write_pxar_entry_zero(
-            self.output.as_mut(),
+            output,
             format::PXAR_FILENAME,
             file_name,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
-    pub async fn finish(mut self) -> io::Result<()> {
+    pub async fn close(mut self) -> io::Result<()> {
+        if !self.state.is_empty() {
+            io_bail!("unexpected state on encoder close");
+        }
+
+        if let EncoderOutput::Owned(output) = &mut self.output {
+            flush(output).await?;
+        }
+
+        self.finished = true;
+
+        Ok(())
+    }
+
+    pub async fn finish(&mut self) -> io::Result<()> {
         let tail_bytes = self.finish_goodbye_table().await?;
+        let mut state = self
+            .state
+            .pop()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_GOODBYE,
             &tail_bytes,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await?;
 
-        if let EncoderOutput::Owned(output) = &mut self.output {
-            flush(output).await?;
-        }
+        let end_offset = state.position();
 
-        // done up here because of the self-borrow and to propagate
-        let end_offset = self.position();
-
-        if let Some(parent) = &mut self.parent {
+        let encode_error = state.finish();
+        if let Some(parent) = self.state.last_mut() {
             parent.write_position = end_offset;
 
-            let file_offset = self
-                .state
+            let file_offset = state
                 .file_offset
                 .expect("internal error: parent set but no file_offset?");
 
             parent.items.push(GoodbyeItem {
-                hash: self.state.file_hash,
+                hash: state.file_hash,
                 offset: file_offset,
                 size: end_offset - file_offset,
             });
+            // propagate errors
+            parent.merge_error(encode_error);
+            Ok(())
+        } else {
+            match encode_error {
+                Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
+                Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
+                None => Ok(()),
+            }
         }
-        self.finished = true;
-        Ok(())
     }
 
     async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
-        let goodbye_offset = self.position();
+        let state = self.state_mut()?;
+        let goodbye_offset = state.position();
 
         // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
-        let mut tail = take(&mut self.state.items);
+        let mut tail = take(&mut state.items);
         let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
         let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
 
@@ -818,7 +869,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         bst.push(
             GoodbyeItem {
                 hash: format::PXAR_GOODBYE_TAIL_MARKER,
-                offset: goodbye_offset - self.state.entry_offset,
+                offset: goodbye_offset - state.entry_offset,
                 size: goodbye_size,
             }
             .to_le(),
@@ -845,8 +896,8 @@ pub(crate) struct FileImpl<'a, S: SeqWrite> {
     /// exactly zero.
     remaining_size: u64,
 
-    /// The directory containing this file. This is where we propagate the `IncompleteFile` error
-    /// to, and where we insert our `GoodbyeItem`.
+    /// The state of the directory containing this file. This is where we propagate the
+    /// `IncompleteFile` error to, and where we insert our `GoodbyeItem`.
     parent: &'a mut EncoderState,
 }
 
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 1ec91b8..48a97af 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -99,17 +99,21 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         &mut self,
         file_name: P,
         metadata: &Metadata,
-    ) -> io::Result<Encoder<'_, T>> {
-        Ok(Encoder {
-            inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?,
-        })
+    ) -> io::Result<()> {
+        poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))
     }
 
-    /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
-    pub fn finish(self) -> io::Result<()> {
+    /// Finish this directory. This is mandatory, encodes the end for the current directory.
+    pub fn finish(&mut self) -> io::Result<()> {
         poll_result_once(self.inner.finish())
     }
 
+    /// Close the encoder instance. This is mandatory, encodes the end for the optional payload
+    /// output stream, if some is given
+    pub fn close(self) -> io::Result<()> {
+        poll_result_once(self.inner.close())
+    }
+
     /// Add a symbolic link to the archive.
     pub fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
         &mut self,
diff --git a/tests/simple/fs.rs b/tests/simple/fs.rs
index 9a89c4d..4284805 100644
--- a/tests/simple/fs.rs
+++ b/tests/simple/fs.rs
@@ -144,12 +144,12 @@ impl Entry {
 
             EntryKind::Directory(entries) => {
                 self.no_hardlink()?;
-                let mut dir = encoder.create_directory(&self.name, &self.metadata)?;
+                encoder.create_directory(&self.name, &self.metadata)?;
                 let path = path.join(&self.name);
                 for entry in entries {
-                    entry.encode_into(&mut dir, hardlinks, &path)?;
+                    entry.encode_into(encoder, hardlinks, &path)?;
                 }
-                dir.finish()?;
+                encoder.finish()?;
             }
 
             EntryKind::Symlink(path) => {
diff --git a/tests/simple/main.rs b/tests/simple/main.rs
index d661c7d..e55457f 100644
--- a/tests/simple/main.rs
+++ b/tests/simple/main.rs
@@ -51,6 +51,9 @@ fn test1() {
     encoder
         .finish()
         .expect("failed to finish encoding the pxar archive");
+    encoder
+        .close()
+        .expect("failed to close the encoder instance");
 
     assert!(!file.is_empty(), "encoder did not write any data");
 
-- 
2.39.2





More information about the pbs-devel mailing list