[pbs-devel] [PATCH v6 pxar 9/29] fix #3174: enc: move from instance per dir to state stack

Christian Ebner c.ebner at proxmox.com
Thu Jan 25 14:25:48 CET 2024


The current encoder implementation creates an EncoderImpl for each
directory, storing its state and references to the parent and output,
restricting possible encoding errors by missuing the api.

This api is however to strict when state has to be tracked across
directoy boundaries, as e.g. when trying to encode cached entries, as
then multiple encoder instances with internal references have to be
handled.

This patch therefore moves to an internal stack to keep track of the
encoder states for different directory levels, thereby removing the
internal parent child borrowing. Only a single EncoderImpl instance will
be created, the `create_directory` call and `finish` call operating on
the internal state stack instead of creating/consuming the EncoderImpl
instance.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
Changes since v5:
- not present in previous version

 examples/pxarcmd.rs |   6 +-
 src/encoder/aio.rs  |  20 ++-
 src/encoder/mod.rs  | 305 ++++++++++++++++++++++++++------------------
 src/encoder/sync.rs |  16 ++-
 4 files changed, 202 insertions(+), 145 deletions(-)

diff --git a/examples/pxarcmd.rs b/examples/pxarcmd.rs
index c7848cc..0530e20 100644
--- a/examples/pxarcmd.rs
+++ b/examples/pxarcmd.rs
@@ -138,14 +138,14 @@ fn add_directory<'a, T: SeqWrite + 'a>(
 
         let meta = Metadata::from(&file_meta);
         if file_type.is_dir() {
-            let mut dir = encoder.create_directory(file_name, &meta)?;
+            encoder.create_directory(file_name, &meta)?;
             add_directory(
-                &mut dir,
+                encoder,
                 std::fs::read_dir(file_path)?,
                 root_path,
                 &mut *hardlinks,
             )?;
-            dir.finish(None)?;
+            encoder.finish(None)?;
         } else if file_type.is_symlink() {
             todo!("symlink handling");
         } else if file_type.is_file() {
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index 8f586b6..1870c7d 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -103,25 +103,22 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         &mut self,
         file_name: P,
         metadata: &Metadata,
-    ) -> io::Result<Encoder<'_, T>> {
-        Ok(Encoder {
-            inner: self
-                .inner
-                .create_directory(file_name.as_ref(), metadata)
-                .await?,
-        })
+    ) -> io::Result<()> {
+        self.inner
+            .create_directory(file_name.as_ref(), metadata)
+            .await
     }
 
     /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
     pub async fn finish(
-        self,
+        &mut self,
         appendix_tail: Option<(AppendixStartOffset, AppendixRefOffset)>,
     ) -> io::Result<()> {
         self.inner.finish(appendix_tail).await
     }
 
     /// Add size to encoders position and return new position.
-    pub unsafe fn position_add(&mut self, size: u64) -> u64 {
+    pub unsafe fn position_add(&mut self, size: u64) -> io::Result<u64> {
         unsafe { self.inner.position_add(size) }
     }
 
@@ -334,11 +331,12 @@ mod test {
             .await
             .unwrap();
             {
-                let mut dir = encoder
+                encoder
                     .create_directory("baba", &Metadata::dir_builder(0o700).build())
                     .await
                     .unwrap();
-                dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
+                encoder
+                    .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
                     .await
                     .unwrap();
             }
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index 7e18dc8..113dd9f 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -241,8 +241,7 @@ where
 pub async fn encoded_size(filename: &std::ffi::CStr, metadata: &Metadata) -> io::Result<u64> {
     let mut this = EncoderImpl {
         output: EncoderOutput::Owned(SeqSink::default()),
-        state: EncoderState::default(),
-        parent: None,
+        state: vec![EncoderState::default()],
         finished: false,
         file_copy_buffer: Arc::new(Mutex::new(unsafe {
             crate::util::vec_new_uninitialized(1024 * 1024)
@@ -252,7 +251,8 @@ pub async fn encoded_size(filename: &std::ffi::CStr, metadata: &Metadata) -> io:
 
     this.start_file_do(Some(metadata), filename.to_bytes())
         .await?;
-    Ok(this.position())
+    let state = this.state.last().ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+    Ok(state.position())
 }
 
 /// Error conditions caused by wrong usage of this crate.
@@ -296,6 +296,11 @@ struct EncoderState {
 }
 
 impl EncoderState {
+    #[inline]
+    fn position(&self) -> u64 {
+        self.write_position
+    }
+
     fn merge_error(&mut self, error: Option<EncodeError>) {
         // one error is enough:
         if self.encode_error.is_none() {
@@ -313,16 +318,6 @@ pub(crate) enum EncoderOutput<'a, T> {
     Borrowed(&'a mut T),
 }
 
-impl<'a, T> EncoderOutput<'a, T> {
-    #[inline]
-    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
-    where
-        'a: 's,
-    {
-        EncoderOutput::Borrowed(self.as_mut())
-    }
-}
-
 impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
     fn as_mut(&mut self) -> &mut T {
         match self {
@@ -350,8 +345,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
 /// synchronous or `async` I/O objects in as output.
 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
     output: EncoderOutput<'a, T>,
-    state: EncoderState,
-    parent: Option<&'a mut EncoderState>,
+    /// EncoderState stack storing the state for each directory level
+    state: Vec<EncoderState>,
     finished: bool,
 
     /// Since only the "current" entry can be actively writing files, we share the file copy
@@ -361,21 +356,6 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
     version: format::FormatVersion,
 }
 
-impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
-    fn drop(&mut self) {
-        if let Some(ref mut parent) = self.parent {
-            // propagate errors:
-            parent.merge_error(self.state.encode_error);
-            if !self.finished {
-                parent.add_error(EncodeError::IncompleteDirectory);
-            }
-        } else if !self.finished {
-            // FIXME: how do we deal with this?
-            // eprintln!("Encoder dropped without finishing!");
-        }
-    }
-}
-
 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     pub async fn new(
         output: EncoderOutput<'a, T>,
@@ -387,8 +367,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         }
         let mut this = Self {
             output,
-            state: EncoderState::default(),
-            parent: None,
+            state: vec![EncoderState::default()],
             finished: false,
             file_copy_buffer: Arc::new(Mutex::new(unsafe {
                 crate::util::vec_new_uninitialized(1024 * 1024)
@@ -398,19 +377,41 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         this.encode_format_version().await?;
         this.encode_metadata(metadata).await?;
-        this.state.files_offset = this.position();
+        let state = this.state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        state.file_offset = Some(state.position());
 
         Ok(this)
     }
 
     fn check(&self) -> io::Result<()> {
-        match self.state.encode_error {
+        if self.finished {
+            io_bail!("unexpected encoder finished state");
+        }
+        let state = self
+            .state
+            .last()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        match state.encode_error {
             Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
             Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
             None => Ok(()),
         }
     }
 
+    fn state(&self) -> io::Result<&EncoderState> {
+        self.state
+            .last()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+    }
+
+    fn state_mut(&mut self) -> io::Result<&mut EncoderState> {
+        self.state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+    }
+
     pub async fn create_file<'b>(
         &'b mut self,
         metadata: &Metadata,
@@ -434,16 +435,18 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         'a: 'b,
     {
         self.check()?;
-
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
         self.start_file_do(Some(metadata), file_name).await?;
 
         let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
         header.check_header_size()?;
 
-        seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+        let state = self.state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        seq_write_struct(self.output.as_mut(), header, &mut state.write_position).await?;
 
-        let payload_data_offset = self.position();
+        let payload_data_offset = state.position();
 
         let meta_size = payload_data_offset - file_offset;
 
@@ -520,23 +523,29 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         }
         self.check()?;
 
-        let offset = self.position();
+        let offset = self.state()?.position();
         let file_name = file_name.as_os_str().as_bytes();
 
         let mut data = Vec::with_capacity(2 * 8);
         data.extend(&appendix_ref_offset.raw().to_le_bytes());
         data.extend(&file_size.to_le_bytes());
+
+
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_APPENDIX_REF,
             &data,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await?;
 
-        let end_offset = self.position();
+        let end_offset = state.write_position;
 
-        self.state.items.push(GoodbyeItem {
+        state.items.push(GoodbyeItem {
             hash: format::hash_filename(file_name),
             offset: offset,
             size: end_offset - offset,
@@ -559,17 +568,19 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         self.check()?;
 
         let data = &full_size.raw().to_le_bytes().to_vec();
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_APPENDIX,
             &data,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await?;
 
-        let offset = self.position();
-
-        Ok(AppendixStartOffset(offset))
+        Ok(AppendixStartOffset(state.write_position))
     }
 
     /// Return a file offset usable with `add_hardlink`.
@@ -600,7 +611,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         target: &Path,
         target_offset: LinkOffset,
     ) -> io::Result<()> {
-        let current_offset = self.position();
+        let current_offset = self.state()?.position();
         if current_offset <= target_offset.0 {
             io_bail!("invalid hardlink offset, can only point to prior files");
         }
@@ -673,25 +684,28 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         entry_htype_data: Option<(u64, &[u8])>,
     ) -> io::Result<LinkOffset> {
         self.check()?;
-
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
 
         let file_name = file_name.as_os_str().as_bytes();
 
         self.start_file_do(metadata, file_name).await?;
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         if let Some((htype, entry_data)) = entry_htype_data {
             seq_write_pxar_entry(
                 self.output.as_mut(),
                 htype,
                 entry_data,
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
-        let end_offset = self.position();
+        let end_offset = state.position();
 
-        self.state.items.push(GoodbyeItem {
+        state.items.push(GoodbyeItem {
             hash: format::hash_filename(file_name),
             offset: file_offset,
             size: end_offset - file_offset,
@@ -701,21 +715,17 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     #[inline]
-    fn position(&mut self) -> u64 {
-        self.state.write_position
-    }
-
-    #[inline]
-    pub unsafe fn position_add(&mut self, size: u64) -> u64 {
-        self.state.write_position += size;
-        self.state.write_position
+    pub unsafe fn position_add(&mut self, size: u64) -> io::Result<u64> {
+        let state = self.state_mut()?;
+        state.write_position += size;
+        Ok(state.write_position)
     }
 
     pub async fn create_directory(
         &mut self,
         file_name: &Path,
         metadata: &Metadata,
-    ) -> io::Result<EncoderImpl<'_, T>> {
+    ) -> io::Result<()> {
         self.check()?;
 
         if !metadata.is_dir() {
@@ -725,35 +735,28 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let file_name = file_name.as_os_str().as_bytes();
         let file_hash = format::hash_filename(file_name);
 
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
         self.encode_filename(file_name).await?;
 
-        let entry_offset = self.position();
+        let entry_offset = self.state()?.position();
         self.encode_metadata(metadata).await?;
 
-        let files_offset = self.position();
+        let state = self.state_mut()?;
+        let files_offset = state.position();
 
         // the child will write to OUR state now:
-        let write_position = self.position();
-
-        let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
-
-        Ok(EncoderImpl {
-            // always forward as Borrowed(), to avoid stacking references on nested calls
-            output: self.output.to_borrowed_mut(),
-            state: EncoderState {
-                entry_offset,
-                files_offset,
-                file_offset: Some(file_offset),
-                file_hash,
-                write_position,
-                ..Default::default()
-            },
-            parent: Some(&mut self.state),
-            finished: false,
-            file_copy_buffer,
-            version: self.version.clone(),
-        })
+        let write_position = state.position();
+
+        self.state.push(EncoderState {
+            entry_offset,
+            files_offset,
+            file_offset: Some(file_offset),
+            file_hash,
+            write_position,
+            ..Default::default()
+        });
+
+        Ok(())
     }
 
     async fn start_file_do(
@@ -769,30 +772,38 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn encode_format_version(&mut self) -> io::Result<()> {
-		if self.state.write_position != 0 {
-			io_bail!("pxar format version must be encoded at the beginning of an archive");
-		}
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        if state.write_position != 0 {
+            io_bail!("pxar format version must be encoded at the beginning of an archive");
+        }
 
-		let version_bytes = match self.version {
-			format::FormatVersion::Version1 => return Ok(()),
-			format::FormatVersion::Version2 => 2u64.to_le_bytes(),
-		};
+        let version_bytes = match self.version {
+            format::FormatVersion::Version1 => return Ok(()),
+            format::FormatVersion::Version2 => 2u64.to_le_bytes(),
+        };
 
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_FORMAT_VERSION,
             &version_bytes,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
     async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_struct_entry(
             self.output.as_mut(),
             format::PXAR_ENTRY,
             metadata.stat.clone(),
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await?;
 
@@ -814,22 +825,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_XATTR,
             &xattr.data,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
     async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         for acl in &acl.users {
             seq_write_pxar_struct_entry(
                 self.output.as_mut(),
                 format::PXAR_ACL_USER,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -839,7 +858,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_ACL_GROUP,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -849,7 +868,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_ACL_GROUP_OBJ,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -859,7 +878,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -869,7 +888,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT_USER,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -879,7 +898,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT_GROUP,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -888,11 +907,15 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_FCAPS,
             &fcaps.data,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
@@ -901,36 +924,49 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         &mut self,
         quota_project_id: &format::QuotaProjectId,
     ) -> io::Result<()> {
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_struct_entry(
             self.output.as_mut(),
             format::PXAR_QUOTA_PROJID,
             *quota_project_id,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
     async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
         crate::util::validate_filename(file_name)?;
+        let state = self
+            .state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry_zero(
             self.output.as_mut(),
             format::PXAR_FILENAME,
             file_name,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
     pub async fn finish(
-        mut self,
+        &mut self,
         appendix_tail: Option<(AppendixStartOffset, AppendixRefOffset)>,
     ) -> io::Result<()> {
         let tail_bytes = self.finish_goodbye_table().await?;
+
+        let mut state = self
+            .state
+            .pop()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_GOODBYE,
             &tail_bytes,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await?;
 
@@ -943,7 +979,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 self.output.as_mut(),
                 format::PXAR_GOODBYE,
                 &appendix_tail,
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -952,32 +988,33 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             flush(output).await?;
         }
 
-        // done up here because of the self-borrow and to propagate
-        let end_offset = self.position();
+        let end_offset = state.write_position;
 
-        if let Some(parent) = &mut self.parent {
+        if let Some(parent) = self.state.last_mut() {
             parent.write_position = end_offset;
 
-            let file_offset = self
-                .state
-                .file_offset
-                .expect("internal error: parent set but no file_offset?");
+            let file_offset = state.file_offset.expect("internal error: no file offset?");
 
             parent.items.push(GoodbyeItem {
-                hash: self.state.file_hash,
+                hash: state.file_hash,
                 offset: file_offset,
                 size: end_offset - file_offset,
             });
+            // propagate errors:
+            parent.merge_error(state.encode_error);
+        } else {
+            self.finished = true;
         }
-        self.finished = true;
+
         Ok(())
     }
 
     async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
-        let goodbye_offset = self.position();
+        let state = self.state_mut()?;
+        let goodbye_offset = state.position();
 
         // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
-        let mut tail = take(&mut self.state.items);
+        let mut tail = take(&mut state.items);
         let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
         let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
 
@@ -1002,7 +1039,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         bst.push(
             GoodbyeItem {
                 hash: format::PXAR_GOODBYE_TAIL_MARKER,
-                offset: goodbye_offset - self.state.entry_offset,
+                offset: goodbye_offset - state.entry_offset,
                 size: goodbye_size,
             }
             .to_le(),
@@ -1029,18 +1066,26 @@ pub(crate) struct FileImpl<'a, S: SeqWrite> {
     /// exactly zero.
     remaining_size: u64,
 
+    ///FIXME: Update comments
     /// The directory containing this file. This is where we propagate the `IncompleteFile` error
     /// to, and where we insert our `GoodbyeItem`.
-    parent: &'a mut EncoderState,
+    parent: &'a mut Vec<EncoderState>,
 }
 
 impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
     fn drop(&mut self) {
         if self.remaining_size != 0 {
-            self.parent.add_error(EncodeError::IncompleteFile);
+            self.parent
+                .last_mut()
+                .unwrap()
+                .add_error(EncodeError::IncompleteFile);
         }
 
-        self.parent.items.push(self.goodbye_item.clone());
+        self.parent
+            .last_mut()
+            .unwrap()
+            .items
+            .push(self.goodbye_item.clone());
     }
 }
 
@@ -1071,7 +1116,11 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
         match output.poll_seq_write(cx, data) {
             Poll::Ready(Ok(put)) => {
                 this.remaining_size -= put as u64;
-                this.parent.write_position += put as u64;
+                let mut parent = this
+                    .parent
+                    .last_mut()
+                    .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+                parent.write_position += put as u64;
                 Poll::Ready(Ok(put))
             }
             other => other,
@@ -1105,14 +1154,22 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
                 .await?;
         //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
         self.remaining_size -= put as u64;
-        self.parent.write_position += put as u64;
+        let parent = self
+            .parent
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        parent.write_position += put as u64;
         Ok(put)
     }
 
     /// Completely write file data for the current file entry in a pxar archive.
     pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
         self.check_remaining(data.len())?;
-        seq_write_all(self.output, data, &mut self.parent.write_position).await?;
+        let parent = self
+            .parent
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+        seq_write_all(self.output, data, &mut parent.write_position).await?;
         self.remaining_size -= data.len() as u64;
         Ok(())
     }
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 8516662..fc338b2 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -28,7 +28,11 @@ impl<'a, T: io::Write + 'a> Encoder<'a, StandardWriter<T>> {
     /// Encode a `pxar` archive into a regular `std::io::Write` output.
     #[inline]
     pub fn from_std(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, StandardWriter<T>>> {
-        Encoder::new(StandardWriter::new(output), metadata, format::FormatVersion::default())
+        Encoder::new(
+            StandardWriter::new(output),
+            metadata,
+            format::FormatVersion::default(),
+        )
     }
 }
 
@@ -100,22 +104,20 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         &mut self,
         file_name: P,
         metadata: &Metadata,
-    ) -> io::Result<Encoder<'_, T>> {
-        Ok(Encoder {
-            inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?,
-        })
+    ) -> io::Result<()> {
+        poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))
     }
 
     /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
     pub fn finish(
-        self,
+        &mut self,
         appendix_tail: Option<(AppendixStartOffset, AppendixRefOffset)>,
     ) -> io::Result<()> {
         poll_result_once(self.inner.finish(appendix_tail))
     }
 
     /// Add size to encoders position and return new position.
-    pub unsafe fn position_add(&mut self, size: u64) -> u64 {
+    pub unsafe fn position_add(&mut self, size: u64) -> io::Result<u64> {
         unsafe { self.inner.position_add(size) }
     }
 
-- 
2.39.2





More information about the pbs-devel mailing list