[pbs-devel] [PATCH v3 pxar 06/58] encoder: move to stack based state tracking
Fabian Grünbichler
f.gruenbichler at proxmox.com
Wed Apr 3 11:54:45 CEST 2024
On March 28, 2024 1:36 pm, Christian Ebner wrote:
> This is in preparation for the proxmox-backup-client look-ahead
> caching, where passing around different encoder instances with
> internal references is not feasible.
>
> Instead of creating a new encoder instance for each directory level
> and keeping references to the parent state, use an internal stack.
>
> This is a breaking change in the pxar library API.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 2:
> - consume encoder with new `close` method to finalize
> - new output_state helper, usable when a mut borrow of both the output and the state is required
> - double checked use of state/state_mut usage
> - major refactoring
>
> examples/pxarcmd.rs | 7 +-
> src/encoder/aio.rs | 26 ++--
> src/encoder/mod.rs | 285 +++++++++++++++++++++++--------------------
> src/encoder/sync.rs | 16 ++-
> tests/simple/main.rs | 3 +
> 5 files changed, 187 insertions(+), 150 deletions(-)
>
> diff --git a/examples/pxarcmd.rs b/examples/pxarcmd.rs
> index e0c779d..0294eba 100644
> --- a/examples/pxarcmd.rs
> +++ b/examples/pxarcmd.rs
> @@ -106,6 +106,7 @@ fn cmd_create(mut args: std::env::ArgsOs) -> Result<(), Error> {
> let mut encoder = Encoder::create(file, &meta)?;
> add_directory(&mut encoder, dir, &dir_path, &mut HashMap::new())?;
> encoder.finish()?;
> + encoder.close()?;
>
> Ok(())
> }
> @@ -138,14 +139,14 @@ fn add_directory<'a, T: SeqWrite + 'a>(
>
> let meta = Metadata::from(&file_meta);
> if file_type.is_dir() {
> - let mut dir = encoder.create_directory(file_name, &meta)?;
> + encoder.create_directory(file_name, &meta)?;
> add_directory(
> - &mut dir,
> + encoder,
> std::fs::read_dir(file_path)?,
> root_path,
> &mut *hardlinks,
> )?;
> - dir.finish()?;
> + encoder.finish()?;
> } else if file_type.is_symlink() {
> todo!("symlink handling");
> } else if file_type.is_file() {
> diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
> index 31a1a2f..635e550 100644
> --- a/src/encoder/aio.rs
> +++ b/src/encoder/aio.rs
> @@ -109,20 +109,23 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
> &mut self,
> file_name: P,
> metadata: &Metadata,
> - ) -> io::Result<Encoder<'_, T>> {
> - Ok(Encoder {
> - inner: self
> - .inner
> - .create_directory(file_name.as_ref(), metadata)
> - .await?,
> - })
> + ) -> io::Result<()> {
> + self.inner
> + .create_directory(file_name.as_ref(), metadata)
> + .await
> }
>
> - /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
> - pub async fn finish(self) -> io::Result<()> {
> + /// Finish this directory. This is mandatory, encodes the end for the current directory.
> + pub async fn finish(&mut self) -> io::Result<()> {
> self.inner.finish().await
> }
>
> + /// Close the encoder instance. This is mandatory, encodes the end for the optional payload
> + /// output stream, if some is given
> + pub async fn close(self) -> io::Result<()> {
> + self.inner.close().await
> + }
> +
> /// Add a symbolic link to the archive.
> pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
> &mut self,
> @@ -307,11 +310,12 @@ mod test {
> .await
> .unwrap();
> {
> - let mut dir = encoder
> + encoder
> .create_directory("baba", &Metadata::dir_builder(0o700).build())
> .await
> .unwrap();
> - dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
> + encoder
> + .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
> .await
> .unwrap();
> }
> diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
> index bff6acf..31bb0fa 100644
> --- a/src/encoder/mod.rs
> +++ b/src/encoder/mod.rs
> @@ -227,6 +227,16 @@ struct EncoderState {
> }
>
> impl EncoderState {
> + #[inline]
> + fn position(&self) -> u64 {
> + self.write_position
> + }
> +
> + #[inline]
> + fn payload_position(&self) -> u64 {
> + self.payload_write_position
> + }
> +
> fn merge_error(&mut self, error: Option<EncodeError>) {
> // one error is enough:
> if self.encode_error.is_none() {
> @@ -244,16 +254,6 @@ pub(crate) enum EncoderOutput<'a, T> {
> Borrowed(&'a mut T),
> }
>
> -impl<'a, T> EncoderOutput<'a, T> {
> - #[inline]
> - fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
> - where
> - 'a: 's,
> - {
> - EncoderOutput::Borrowed(self.as_mut())
> - }
> -}
> -
> impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
> fn as_mut(&mut self) -> &mut T {
> match self {
> @@ -282,8 +282,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
> pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
> output: EncoderOutput<'a, T>,
> payload_output: EncoderOutput<'a, Option<T>>,
> - state: EncoderState,
> - parent: Option<&'a mut EncoderState>,
> + /// EncoderState stack storing the state for each directory level
> + state: Vec<EncoderState>,
> finished: bool,
>
> /// Since only the "current" entry can be actively writing files, we share the file copy
> @@ -291,21 +291,6 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
> file_copy_buffer: Arc<Mutex<Vec<u8>>>,
> }
>
> -impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
> - fn drop(&mut self) {
> - if let Some(ref mut parent) = self.parent {
> - // propagate errors:
> - parent.merge_error(self.state.encode_error);
> - if !self.finished {
> - parent.add_error(EncodeError::IncompleteDirectory);
> - }
> - } else if !self.finished {
> - // FIXME: how do we deal with this?
> - // eprintln!("Encoder dropped without finishing!");
> - }
> - }
> -}
Should we still have some sort of check here? E.g., when dropping an
encoder, what should self.finished and self.state look like? IIUC, a
dropped encoder should have an empty state and be finished (i.e.,
`close()` has been called on it).
Or is this simply not relevant anymore, because we only create one
encoder and then drop it at the end? (But should we then have a similar
mechanism for EncoderState?)
> -
> impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> pub async fn new(
> output: EncoderOutput<'a, T>,
> @@ -318,8 +303,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> let mut this = Self {
> output,
> payload_output: EncoderOutput::Owned(None),
> - state: EncoderState::default(),
> - parent: None,
> + state: vec![EncoderState::default()],
> finished: false,
> file_copy_buffer: Arc::new(Mutex::new(unsafe {
> crate::util::vec_new_uninitialized(1024 * 1024)
> @@ -327,7 +311,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> };
>
> this.encode_metadata(metadata).await?;
> - this.state.files_offset = this.position();
> + let state = this.state_mut()?;
> + state.files_offset = state.position();
>
> if let Some(payload_output) = payload_output {
> this.payload_output = EncoderOutput::Owned(Some(payload_output));
> @@ -337,13 +322,38 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> }
>
> fn check(&self) -> io::Result<()> {
> - match self.state.encode_error {
> + if self.finished {
> + io_bail!("unexpected encoder finished state");
> + }
> + let state = self.state()?;
> + match state.encode_error {
> Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
> Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
> None => Ok(()),
> }
> }
>
> + fn state(&self) -> io::Result<&EncoderState> {
> + self.state
> + .last()
> + .ok_or_else(|| io_format_err!("encoder state stack underflow"))
> + }
> +
> + fn state_mut(&mut self) -> io::Result<&mut EncoderState> {
> + self.state
> + .last_mut()
> + .ok_or_else(|| io_format_err!("encoder state stack underflow"))
> + }
> +
> + fn output_state(&mut self) -> io::Result<(&mut T, &mut EncoderState)> {
> + Ok((
> + self.output.as_mut(),
> + self.state
> + .last_mut()
> + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?,
> + ))
> + }
> +
We could have another helper here that also returns the Option<&mut T>
for payload_output (although it would not be used as often, it might
still be a good idea for readability):
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index b0ec877..e8c5faa 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -387,6 +387,16 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
))
}
+ fn payload_output_state(&mut self) -> io::Result<(&mut T, Option<&mut T>, &mut EncoderState)> {
+ Ok((
+ self.output.as_mut(),
+ self.payload_output.as_mut().as_mut(),
+ self.state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?,
+ ))
+ }
+
pub async fn create_file<'b>(
&'b mut self,
metadata: &Metadata,
@@ -414,12 +424,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
let file_offset = self.state()?.position();
self.start_file_do(Some(metadata), file_name).await?;
- if let Some(payload_output) = self.payload_output.as_mut() {
- let state = self
- .state
- .last_mut()
- .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+ let (output, payload_output, state) = self.payload_output_state()?;
+ if let Some(payload_output) = payload_output {
// Position prior to the payload header
let payload_position = state.payload_position();
@@ -435,7 +442,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
// Write ref to metadata archive
seq_write_pxar_entry(
- self.output.as_mut(),
+ output,
format::PXAR_PAYLOAD_REF,
&payload_ref.data(),
&mut state.write_position,
@@ -444,21 +451,18 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
} else {
let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
header.check_header_size()?;
- let (output, state) = self.output_state()?;
seq_write_struct(output, header, &mut state.write_position).await?;
}
- let state = self
- .state
- .last_mut()
- .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+ let (output, payload_output, state) = self.payload_output_state()?;
+
let payload_data_offset = state.position();
let meta_size = payload_data_offset - file_offset;
Ok(FileImpl {
- output: self.output.as_mut(),
- payload_output: self.payload_output.as_mut().as_mut(),
+ output,
+ payload_output,
goodbye_item: GoodbyeItem {
hash: format::hash_filename(file_name),
offset: file_offset,
> [..]
More information about the pbs-devel
mailing list