[pbs-devel] [PATCH v3 pxar 06/58] encoder: move to stack based state tracking
Christian Ebner
c.ebner at proxmox.com
Thu Mar 28 13:36:15 CET 2024
In preparation for the proxmox-backup-client look-ahead caching,
where passing around different encoder instances with internal
references is not feasible.
Instead of creating a new encoder instance for each directory level
and keeping references to the parent state, use an internal stack.
This is a breaking change in the pxar library API.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 2:
- consume encoder with new `close` method to finalize
- new output_state helper, usable when a mut borrow of both the output and the state is required
- double checked usage of state/state_mut
- major refactoring
examples/pxarcmd.rs | 7 +-
src/encoder/aio.rs | 26 ++--
src/encoder/mod.rs | 285 +++++++++++++++++++++++--------------------
src/encoder/sync.rs | 16 ++-
tests/simple/main.rs | 3 +
5 files changed, 187 insertions(+), 150 deletions(-)
diff --git a/examples/pxarcmd.rs b/examples/pxarcmd.rs
index e0c779d..0294eba 100644
--- a/examples/pxarcmd.rs
+++ b/examples/pxarcmd.rs
@@ -106,6 +106,7 @@ fn cmd_create(mut args: std::env::ArgsOs) -> Result<(), Error> {
let mut encoder = Encoder::create(file, &meta)?;
add_directory(&mut encoder, dir, &dir_path, &mut HashMap::new())?;
encoder.finish()?;
+ encoder.close()?;
Ok(())
}
@@ -138,14 +139,14 @@ fn add_directory<'a, T: SeqWrite + 'a>(
let meta = Metadata::from(&file_meta);
if file_type.is_dir() {
- let mut dir = encoder.create_directory(file_name, &meta)?;
+ encoder.create_directory(file_name, &meta)?;
add_directory(
- &mut dir,
+ encoder,
std::fs::read_dir(file_path)?,
root_path,
&mut *hardlinks,
)?;
- dir.finish()?;
+ encoder.finish()?;
} else if file_type.is_symlink() {
todo!("symlink handling");
} else if file_type.is_file() {
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index 31a1a2f..635e550 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -109,20 +109,23 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
&mut self,
file_name: P,
metadata: &Metadata,
- ) -> io::Result<Encoder<'_, T>> {
- Ok(Encoder {
- inner: self
- .inner
- .create_directory(file_name.as_ref(), metadata)
- .await?,
- })
+ ) -> io::Result<()> {
+ self.inner
+ .create_directory(file_name.as_ref(), metadata)
+ .await
}
- /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
- pub async fn finish(self) -> io::Result<()> {
+ /// Finish this directory. This is mandatory, encodes the end for the current directory.
+ pub async fn finish(&mut self) -> io::Result<()> {
self.inner.finish().await
}
+ /// Close the encoder instance. This is mandatory, encodes the end for the optional payload
+ /// output stream, if some is given
+ pub async fn close(self) -> io::Result<()> {
+ self.inner.close().await
+ }
+
/// Add a symbolic link to the archive.
pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
&mut self,
@@ -307,11 +310,12 @@ mod test {
.await
.unwrap();
{
- let mut dir = encoder
+ encoder
.create_directory("baba", &Metadata::dir_builder(0o700).build())
.await
.unwrap();
- dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
+ encoder
+ .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
.await
.unwrap();
}
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index bff6acf..31bb0fa 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -227,6 +227,16 @@ struct EncoderState {
}
impl EncoderState {
+ #[inline]
+ fn position(&self) -> u64 {
+ self.write_position
+ }
+
+ #[inline]
+ fn payload_position(&self) -> u64 {
+ self.payload_write_position
+ }
+
fn merge_error(&mut self, error: Option<EncodeError>) {
// one error is enough:
if self.encode_error.is_none() {
@@ -244,16 +254,6 @@ pub(crate) enum EncoderOutput<'a, T> {
Borrowed(&'a mut T),
}
-impl<'a, T> EncoderOutput<'a, T> {
- #[inline]
- fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
- where
- 'a: 's,
- {
- EncoderOutput::Borrowed(self.as_mut())
- }
-}
-
impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
fn as_mut(&mut self) -> &mut T {
match self {
@@ -282,8 +282,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
output: EncoderOutput<'a, T>,
payload_output: EncoderOutput<'a, Option<T>>,
- state: EncoderState,
- parent: Option<&'a mut EncoderState>,
+ /// EncoderState stack storing the state for each directory level
+ state: Vec<EncoderState>,
finished: bool,
/// Since only the "current" entry can be actively writing files, we share the file copy
@@ -291,21 +291,6 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
file_copy_buffer: Arc<Mutex<Vec<u8>>>,
}
-impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
- fn drop(&mut self) {
- if let Some(ref mut parent) = self.parent {
- // propagate errors:
- parent.merge_error(self.state.encode_error);
- if !self.finished {
- parent.add_error(EncodeError::IncompleteDirectory);
- }
- } else if !self.finished {
- // FIXME: how do we deal with this?
- // eprintln!("Encoder dropped without finishing!");
- }
- }
-}
-
impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
pub async fn new(
output: EncoderOutput<'a, T>,
@@ -318,8 +303,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
let mut this = Self {
output,
payload_output: EncoderOutput::Owned(None),
- state: EncoderState::default(),
- parent: None,
+ state: vec![EncoderState::default()],
finished: false,
file_copy_buffer: Arc::new(Mutex::new(unsafe {
crate::util::vec_new_uninitialized(1024 * 1024)
@@ -327,7 +311,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
};
this.encode_metadata(metadata).await?;
- this.state.files_offset = this.position();
+ let state = this.state_mut()?;
+ state.files_offset = state.position();
if let Some(payload_output) = payload_output {
this.payload_output = EncoderOutput::Owned(Some(payload_output));
@@ -337,13 +322,38 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
}
fn check(&self) -> io::Result<()> {
- match self.state.encode_error {
+ if self.finished {
+ io_bail!("unexpected encoder finished state");
+ }
+ let state = self.state()?;
+ match state.encode_error {
Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
None => Ok(()),
}
}
+ fn state(&self) -> io::Result<&EncoderState> {
+ self.state
+ .last()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+ }
+
+ fn state_mut(&mut self) -> io::Result<&mut EncoderState> {
+ self.state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+ }
+
+ fn output_state(&mut self) -> io::Result<(&mut T, &mut EncoderState)> {
+ Ok((
+ self.output.as_mut(),
+ self.state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?,
+ ))
+ }
+
pub async fn create_file<'b>(
&'b mut self,
metadata: &Metadata,
@@ -368,17 +378,22 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
{
self.check()?;
- let file_offset = self.position();
+ let file_offset = self.state()?.position();
self.start_file_do(Some(metadata), file_name).await?;
if let Some(payload_output) = self.payload_output.as_mut() {
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+
// Position prior to the payload header
- let payload_position = self.state.payload_write_position;
+ let payload_position = state.payload_position();
// Separate payloads in payload archive PXAR_PAYLOAD markers
let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
header.check_header_size()?;
- seq_write_struct(payload_output, header, &mut self.state.payload_write_position).await?;
+ seq_write_struct(payload_output, header, &mut state.payload_write_position).await?;
let payload_ref = PayloadRef {
offset: payload_position,
@@ -390,16 +405,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
self.output.as_mut(),
format::PXAR_PAYLOAD_REF,
&payload_ref.data(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
} else {
let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
header.check_header_size()?;
- seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+ let (output, state) = self.output_state()?;
+ seq_write_struct(output, header, &mut state.write_position).await?;
}
- let payload_data_offset = self.position();
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+ let payload_data_offset = state.position();
let meta_size = payload_data_offset - file_offset;
@@ -412,7 +432,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
size: file_size + meta_size,
},
remaining_size: file_size,
- parent: &mut self.state,
+ parent: state,
})
}
@@ -493,7 +513,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
target: &Path,
target_offset: LinkOffset,
) -> io::Result<()> {
- let current_offset = self.position();
+ let current_offset = self.state()?.position();
if current_offset <= target_offset.0 {
io_bail!("invalid hardlink offset, can only point to prior files");
}
@@ -567,24 +587,20 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
) -> io::Result<LinkOffset> {
self.check()?;
- let file_offset = self.position();
+ let file_offset = self.state()?.position();
let file_name = file_name.as_os_str().as_bytes();
self.start_file_do(metadata, file_name).await?;
+
+ let (output, state) = self.output_state()?;
if let Some((htype, entry_data)) = entry_htype_data {
- seq_write_pxar_entry(
- self.output.as_mut(),
- htype,
- entry_data,
- &mut self.state.write_position,
- )
- .await?;
+ seq_write_pxar_entry(output, htype, entry_data, &mut state.write_position).await?;
}
- let end_offset = self.position();
+ let end_offset = state.position();
- self.state.items.push(GoodbyeItem {
+ state.items.push(GoodbyeItem {
hash: format::hash_filename(file_name),
offset: file_offset,
size: end_offset - file_offset,
@@ -593,21 +609,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
Ok(LinkOffset(file_offset))
}
- #[inline]
- fn position(&mut self) -> u64 {
- self.state.write_position
- }
-
- #[inline]
- fn payload_position(&mut self) -> u64 {
- self.state.payload_write_position
- }
-
pub async fn create_directory(
&mut self,
file_name: &Path,
metadata: &Metadata,
- ) -> io::Result<EncoderImpl<'_, T>> {
+ ) -> io::Result<()> {
self.check()?;
if !metadata.is_dir() {
@@ -617,37 +623,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
let file_name = file_name.as_os_str().as_bytes();
let file_hash = format::hash_filename(file_name);
- let file_offset = self.position();
+ let file_offset = self.state()?.position();
self.encode_filename(file_name).await?;
- let entry_offset = self.position();
+ let entry_offset = self.state()?.position();
self.encode_metadata(metadata).await?;
- let files_offset = self.position();
+ let state = self.state_mut()?;
+ let files_offset = state.position();
// the child will write to OUR state now:
- let write_position = self.position();
- let payload_write_position = self.payload_position();
-
- let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
-
- Ok(EncoderImpl {
- // always forward as Borrowed(), to avoid stacking references on nested calls
- output: self.output.to_borrowed_mut(),
- payload_output: self.payload_output.to_borrowed_mut(),
- state: EncoderState {
- entry_offset,
- files_offset,
- file_offset: Some(file_offset),
- file_hash,
- write_position,
- payload_write_position,
- ..Default::default()
- },
- parent: Some(&mut self.state),
- finished: false,
- file_copy_buffer,
- })
+ let write_position = state.position();
+ let payload_write_position = state.payload_position();
+
+ self.state.push(EncoderState {
+ entry_offset,
+ files_offset,
+ file_offset: Some(file_offset),
+ file_hash,
+ write_position,
+ payload_write_position,
+ ..Default::default()
+ });
+
+ Ok(())
}
async fn start_file_do(
@@ -663,11 +662,12 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
}
async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
+ let (output, state) = self.output_state()?;
seq_write_pxar_struct_entry(
- self.output.as_mut(),
+ output,
format::PXAR_ENTRY,
metadata.stat.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
@@ -689,72 +689,74 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
}
async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
+ let (output, state) = self.output_state()?;
seq_write_pxar_entry(
- self.output.as_mut(),
+ output,
format::PXAR_XATTR,
&xattr.data,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await
}
async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
+ let (output, state) = self.output_state()?;
for acl in &acl.users {
seq_write_pxar_struct_entry(
- self.output.as_mut(),
+ output,
format::PXAR_ACL_USER,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
for acl in &acl.groups {
seq_write_pxar_struct_entry(
- self.output.as_mut(),
+ output,
format::PXAR_ACL_GROUP,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
if let Some(acl) = &acl.group_obj {
seq_write_pxar_struct_entry(
- self.output.as_mut(),
+ output,
format::PXAR_ACL_GROUP_OBJ,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
if let Some(acl) = &acl.default {
seq_write_pxar_struct_entry(
- self.output.as_mut(),
+ output,
format::PXAR_ACL_DEFAULT,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
for acl in &acl.default_users {
seq_write_pxar_struct_entry(
- self.output.as_mut(),
+ output,
format::PXAR_ACL_DEFAULT_USER,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
for acl in &acl.default_groups {
seq_write_pxar_struct_entry(
- self.output.as_mut(),
+ output,
format::PXAR_ACL_DEFAULT_GROUP,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
@@ -763,11 +765,12 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
}
async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
+ let (output, state) = self.output_state()?;
seq_write_pxar_entry(
- self.output.as_mut(),
+ output,
format::PXAR_FCAPS,
&fcaps.data,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await
}
@@ -776,35 +779,32 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
&mut self,
quota_project_id: &format::QuotaProjectId,
) -> io::Result<()> {
+ let (output, state) = self.output_state()?;
seq_write_pxar_struct_entry(
- self.output.as_mut(),
+ output,
format::PXAR_QUOTA_PROJID,
*quota_project_id,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await
}
async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
crate::util::validate_filename(file_name)?;
+ let (output, state) = self.output_state()?;
seq_write_pxar_entry_zero(
- self.output.as_mut(),
+ output,
format::PXAR_FILENAME,
file_name,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await
}
- pub async fn finish(mut self) -> io::Result<()> {
- let tail_bytes = self.finish_goodbye_table().await?;
- seq_write_pxar_entry(
- self.output.as_mut(),
- format::PXAR_GOODBYE,
- &tail_bytes,
- &mut self.state.write_position,
- )
- .await?;
+ pub async fn close(mut self) -> io::Result<()> {
+ if !self.state.is_empty() {
+ io_bail!("unexpected state on encoder close");
+ }
if let EncoderOutput::Owned(Some(output)) = &mut self.payload_output {
flush(output).await?;
@@ -814,34 +814,59 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
flush(output).await?;
}
- // done up here because of the self-borrow and to propagate
- let end_offset = self.position();
- let payload_end_offset = self.payload_position();
+ self.finished = true;
+
+ Ok(())
+ }
+
+ pub async fn finish(&mut self) -> io::Result<()> {
+ let tail_bytes = self.finish_goodbye_table().await?;
+ let mut state = self
+ .state
+ .pop()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+ seq_write_pxar_entry(
+ self.output.as_mut(),
+ format::PXAR_GOODBYE,
+ &tail_bytes,
+ &mut state.write_position,
+ )
+ .await?;
+
+ let end_offset = state.position();
+ let payload_end_offset = state.payload_position();
- if let Some(parent) = &mut self.parent {
+ if let Some(parent) = self.state.last_mut() {
parent.write_position = end_offset;
parent.payload_write_position = payload_end_offset;
- let file_offset = self
- .state
+ let file_offset = state
.file_offset
.expect("internal error: parent set but no file_offset?");
parent.items.push(GoodbyeItem {
- hash: self.state.file_hash,
+ hash: state.file_hash,
offset: file_offset,
size: end_offset - file_offset,
});
+ // propagate errors
+ parent.merge_error(state.encode_error);
+ Ok(())
+ } else {
+ match state.encode_error {
+ Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
+ Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
+ None => Ok(()),
+ }
}
- self.finished = true;
- Ok(())
}
async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
- let goodbye_offset = self.position();
+ let state = self.state_mut()?;
+ let goodbye_offset = state.position();
// "take" out the tail (to not leave an array of endian-swapped structs in `self`)
- let mut tail = take(&mut self.state.items);
+ let mut tail = take(&mut state.items);
let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
@@ -866,7 +891,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
bst.push(
GoodbyeItem {
hash: format::PXAR_GOODBYE_TAIL_MARKER,
- offset: goodbye_offset - self.state.entry_offset,
+ offset: goodbye_offset - state.entry_offset,
size: goodbye_size,
}
.to_le(),
@@ -896,8 +921,8 @@ pub(crate) struct FileImpl<'a, S: SeqWrite> {
/// exactly zero.
remaining_size: u64,
- /// The directory containing this file. This is where we propagate the `IncompleteFile` error
- /// to, and where we insert our `GoodbyeItem`.
+ /// The state of the directory containing this file (top entry of the encoder state stack). This
+ /// is where we propagate the `IncompleteFile` error to, and where we insert our `GoodbyeItem`.
parent: &'a mut EncoderState,
}
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 96d056d..d0d62ba 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -106,17 +106,21 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
&mut self,
file_name: P,
metadata: &Metadata,
- ) -> io::Result<Encoder<'_, T>> {
- Ok(Encoder {
- inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?,
- })
+ ) -> io::Result<()> {
+ poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))
}
- /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
- pub fn finish(self) -> io::Result<()> {
+ /// Finish this directory. This is mandatory, encodes the end for the current directory.
+ pub fn finish(&mut self) -> io::Result<()> {
poll_result_once(self.inner.finish())
}
+ /// Close the encoder instance. This is mandatory, encodes the end for the optional payload
+ /// output stream, if some is given
+ pub fn close(self) -> io::Result<()> {
+ poll_result_once(self.inner.close())
+ }
+
/// Add a symbolic link to the archive.
pub fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
&mut self,
diff --git a/tests/simple/main.rs b/tests/simple/main.rs
index d661c7d..e55457f 100644
--- a/tests/simple/main.rs
+++ b/tests/simple/main.rs
@@ -51,6 +51,9 @@ fn test1() {
encoder
.finish()
.expect("failed to finish encoding the pxar archive");
+ encoder
+ .close()
+ .expect("failed to close the encoder instance");
assert!(!file.is_empty(), "encoder did not write any data");
--
2.39.2
More information about the pbs-devel
mailing list