[pbs-devel] [PATCH pxar 1/2] make aio::Encoder actually behave with async
Stefan Reiter
s.reiter at proxmox.com
Tue Feb 9 13:03:47 CET 2021
To really use the encoder with async/await, it needs to support
SeqWrite implementations that are Send. This requires changing a whole
bunch of '&mut dyn SeqWrite' trait objects to instead take a 'T:
SeqWrite' generic parameter directly instead. Most of this is quite
straightforward, though incurs a lot of churn (FileImpl needs a generic
parameter now for example).
The trickiest part is returning a new Encoder instance in
create_directory, as the trait object trick with
SeqWrite::as_trait_object doesn't work if SeqWrite is implemented for
generic '&mut S'.
Instead, work with the generic object directly, and express the
owned/borrowed state in the Encoder (to avoid nested borrowing) as an
enum EncoderOutput.
Add to the aio test to ensure the Encoder is now actually useable.
Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
src/encoder/aio.rs | 48 ++++++++---------
src/encoder/mod.rs | 128 +++++++++++++++++++++++---------------------
src/encoder/sync.rs | 28 ++++------
3 files changed, 101 insertions(+), 103 deletions(-)
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index f4b96b3..6b90ce5 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -13,12 +13,12 @@ use crate::Metadata;
///
/// This is the `async` version of the `pxar` encoder.
#[repr(transparent)]
-pub struct Encoder<'a, T: SeqWrite + 'a> {
+pub struct Encoder<'a, T: SeqWrite + 'a + Send> {
inner: encoder::EncoderImpl<'a, T>,
}
#[cfg(feature = "tokio-io")]
-impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter<T>> {
+impl<'a, T: tokio::io::AsyncWrite + 'a + Send> Encoder<'a, TokioWriter<T>> {
/// Encode a `pxar` archive into a `tokio::io::AsyncWrite` output.
#[inline]
pub async fn from_tokio(
@@ -44,11 +44,11 @@ impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
}
}
-impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
+impl<'a, T: SeqWrite + 'a + Send> Encoder<'a, T> {
/// Create an asynchronous encoder for an output implementing our internal write interface.
pub async fn new(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, T>> {
Ok(Self {
- inner: encoder::EncoderImpl::new(output, metadata).await?,
+ inner: encoder::EncoderImpl::new(output.into(), metadata).await?,
})
}
@@ -60,7 +60,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
metadata: &Metadata,
file_name: P,
file_size: u64,
- ) -> io::Result<File<'b>>
+ ) -> io::Result<File<'b, T>>
where
'a: 'b,
{
@@ -94,14 +94,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
/// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
/// `finish()` method, otherwise the entire archive will be in an error state.
- pub async fn create_directory<'b, P: AsRef<Path>>(
- &'b mut self,
+ pub async fn create_directory<P: AsRef<Path>>(
+ &mut self,
file_name: P,
metadata: &Metadata,
- ) -> io::Result<Encoder<'b, &'b mut dyn SeqWrite>>
- where
- 'a: 'b,
- {
+ ) -> io::Result<Encoder<'_, T>> {
Ok(Encoder {
inner: self
.inner
@@ -111,15 +108,10 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
}
/// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
- pub async fn finish(self) -> io::Result<T> {
+ pub async fn finish(self) -> io::Result<()> {
self.inner.finish().await
}
- /// Cancel this directory and get back the contained writer.
- pub fn into_writer(self) -> T {
- self.inner.into_writer()
- }
-
/// Add a symbolic link to the archive.
pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
&mut self,
@@ -176,11 +168,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
}
#[repr(transparent)]
-pub struct File<'a> {
- inner: encoder::FileImpl<'a>,
+pub struct File<'a, S: SeqWrite> {
+ inner: encoder::FileImpl<'a, S>,
}
-impl<'a> File<'a> {
+impl<'a, S: SeqWrite> File<'a, S> {
/// Get the file offset to be able to reference it with `add_hardlink`.
pub fn file_offset(&self) -> LinkOffset {
self.inner.file_offset()
@@ -198,7 +190,7 @@ impl<'a> File<'a> {
}
#[cfg(feature = "tokio-io")]
-impl<'a> tokio::io::AsyncWrite for File<'a> {
+impl<'a, S: SeqWrite> tokio::io::AsyncWrite for File<'a, S> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_write(cx, data)
}
@@ -294,10 +286,16 @@ mod test {
let mut encoder = Encoder::new(DummyOutput, &Metadata::dir_builder(0o700).build())
.await
.unwrap();
- encoder
- .create_directory("baba", &Metadata::dir_builder(0o700).build())
- .await
- .unwrap();
+ {
+ let mut dir = encoder
+ .create_directory("baba", &Metadata::dir_builder(0o700).build())
+ .await
+ .unwrap();
+ dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
+ .await
+ .unwrap();
+ }
+ encoder.finish().await.unwrap();
};
fn test_send<T: Send>(_: T) {}
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index 6ce35e0..428a5c5 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -48,20 +48,13 @@ pub trait SeqWrite {
) -> Poll<io::Result<usize>>;
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
-
- /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
- /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
- /// subdirectory in that would have a trait object pointing to the trait object, and so on.
- fn as_trait_object(&mut self) -> &mut dyn SeqWrite
- where
- Self: Sized,
- {
- self as &mut dyn SeqWrite
- }
}
/// Allow using trait objects for generics taking a `SeqWrite`.
-impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
+impl<S> SeqWrite for &mut S
+where
+ S: SeqWrite + ?Sized,
+{
fn poll_seq_write(
self: Pin<&mut Self>,
cx: &mut Context,
@@ -76,13 +69,6 @@ impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
}
-
- fn as_trait_object(&mut self) -> &mut dyn SeqWrite
- where
- Self: Sized,
- {
- &mut **self
- }
}
/// awaitable verison of `poll_seq_write`.
@@ -230,12 +216,38 @@ impl EncoderState {
}
}
+pub(crate) enum EncoderOutput<'a, T> {
+ Owned(T),
+ Borrowed(&'a mut T),
+}
+
+impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
+ fn as_mut(&mut self) -> &mut T {
+ match self {
+ EncoderOutput::Owned(ref mut o) => o,
+ EncoderOutput::Borrowed(b) => b,
+ }
+ }
+}
+
+impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
+ fn from(t: T) -> Self {
+ EncoderOutput::Owned(t)
+ }
+}
+
+impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
+ fn from(t: &'a mut T) -> Self {
+ EncoderOutput::Borrowed(t)
+ }
+}
+
/// The encoder state machine implementation for a directory.
///
/// We use `async fn` to implement the encoder state machine so that we can easily plug in both
/// synchronous or `async` I/O objects in as output.
pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
- output: Option<T>,
+ output: EncoderOutput<'a, T>,
state: EncoderState,
parent: Option<&'a mut EncoderState>,
finished: bool,
@@ -261,12 +273,12 @@ impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
}
impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
- pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
+ pub(crate) async fn new(output: EncoderOutput<'a, T>, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
if !metadata.is_dir() {
io_bail!("directory metadata must contain the directory mode flag");
}
let mut this = Self {
- output: Some(output),
+ output,
state: EncoderState::default(),
parent: None,
finished: false,
@@ -292,7 +304,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
metadata: &Metadata,
file_name: &Path,
file_size: u64,
- ) -> io::Result<FileImpl<'b>>
+ ) -> io::Result<FileImpl<'b, T>>
where
'a: 'b,
{
@@ -305,7 +317,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
metadata: &Metadata,
file_name: &[u8],
file_size: u64,
- ) -> io::Result<FileImpl<'b>>
+ ) -> io::Result<FileImpl<'b, T>>
where
'a: 'b,
{
@@ -318,7 +330,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
header.check_header_size()?;
seq_write_struct(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
header,
&mut self.state.write_position,
)
@@ -329,7 +341,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
let meta_size = payload_data_offset - file_offset;
Ok(FileImpl {
- output: self.output.as_mut().unwrap(),
+ output: self.output.as_mut(),
goodbye_item: GoodbyeItem {
hash: format::hash_filename(file_name),
offset: file_offset,
@@ -471,7 +483,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
self.start_file_do(metadata, file_name).await?;
if let Some((htype, entry_data)) = entry_htype_data {
seq_write_pxar_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
htype,
entry_data,
&mut self.state.write_position,
@@ -495,14 +507,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
self.state.write_position
}
- pub async fn create_directory<'b>(
- &'b mut self,
+ pub async fn create_directory(
+ &mut self,
file_name: &Path,
metadata: &Metadata,
- ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
- where
- 'a: 'b,
- {
+ ) -> io::Result<EncoderImpl<'_, T>> {
self.check()?;
if !metadata.is_dir() {
@@ -523,8 +532,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
// the child will write to OUR state now:
let write_position = self.position();
+ let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
+
Ok(EncoderImpl {
- output: self.output.as_mut().map(SeqWrite::as_trait_object),
+ // always forward as Borrowed(), to avoid stacking references on nested calls
+ output: self.output.as_mut().into(),
state: EncoderState {
entry_offset,
files_offset,
@@ -535,7 +547,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
},
parent: Some(&mut self.state),
finished: false,
- file_copy_buffer: Arc::clone(&self.file_copy_buffer),
+ file_copy_buffer,
})
}
@@ -553,7 +565,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
seq_write_pxar_struct_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_ENTRY,
metadata.stat.clone(),
&mut self.state.write_position,
@@ -579,7 +591,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
seq_write_pxar_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_XATTR,
&xattr.data,
&mut self.state.write_position,
@@ -590,7 +602,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
for acl in &acl.users {
seq_write_pxar_struct_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_ACL_USER,
acl.clone(),
&mut self.state.write_position,
@@ -600,7 +612,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
for acl in &acl.groups {
seq_write_pxar_struct_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_ACL_GROUP,
acl.clone(),
&mut self.state.write_position,
@@ -610,7 +622,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
if let Some(acl) = &acl.group_obj {
seq_write_pxar_struct_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_ACL_GROUP_OBJ,
acl.clone(),
&mut self.state.write_position,
@@ -620,7 +632,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
if let Some(acl) = &acl.default {
seq_write_pxar_struct_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_ACL_DEFAULT,
acl.clone(),
&mut self.state.write_position,
@@ -630,7 +642,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
for acl in &acl.default_users {
seq_write_pxar_struct_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_ACL_DEFAULT_USER,
acl.clone(),
&mut self.state.write_position,
@@ -640,7 +652,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
for acl in &acl.default_groups {
seq_write_pxar_struct_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_ACL_DEFAULT_GROUP,
acl.clone(),
&mut self.state.write_position,
@@ -653,7 +665,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
seq_write_pxar_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_FCAPS,
&fcaps.data,
&mut self.state.write_position,
@@ -666,7 +678,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
quota_project_id: &format::QuotaProjectId,
) -> io::Result<()> {
seq_write_pxar_struct_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_QUOTA_PROJID,
*quota_project_id,
&mut self.state.write_position,
@@ -677,7 +689,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
crate::util::validate_filename(file_name)?;
seq_write_pxar_entry_zero(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_FILENAME,
file_name,
&mut self.state.write_position,
@@ -685,10 +697,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
.await
}
- pub async fn finish(mut self) -> io::Result<T> {
+ pub async fn finish(mut self) -> io::Result<()> {
let tail_bytes = self.finish_goodbye_table().await?;
seq_write_pxar_entry(
- self.output.as_mut().unwrap(),
+ self.output.as_mut(),
format::PXAR_GOODBYE,
&tail_bytes,
&mut self.state.write_position,
@@ -713,11 +725,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
});
}
self.finished = true;
- Ok(self.output.take().unwrap())
- }
-
- pub fn into_writer(mut self) -> T {
- self.output.take().unwrap()
+ Ok(())
}
async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
@@ -764,8 +772,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
}
/// Writer for a file object in a directory.
-pub struct FileImpl<'a> {
- output: &'a mut dyn SeqWrite,
+pub struct FileImpl<'a, S: SeqWrite> {
+ output: &'a mut S,
/// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
/// directly instead of on Drop of FileImpl?
@@ -780,7 +788,7 @@ pub struct FileImpl<'a> {
parent: &'a mut EncoderState,
}
-impl<'a> Drop for FileImpl<'a> {
+impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
fn drop(&mut self) {
if self.remaining_size != 0 {
self.parent.add_error(EncodeError::IncompleteFile);
@@ -790,7 +798,7 @@ impl<'a> Drop for FileImpl<'a> {
}
}
-impl<'a> FileImpl<'a> {
+impl<'a, S: SeqWrite> FileImpl<'a, S> {
/// Get the file offset to be able to reference it with `add_hardlink`.
pub fn file_offset(&self) -> LinkOffset {
LinkOffset(self.goodbye_item.offset)
@@ -828,7 +836,7 @@ impl<'a> FileImpl<'a> {
#[cfg(feature = "tokio-io")]
pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
unsafe {
- self.map_unchecked_mut(|this| &mut this.output)
+ self.map_unchecked_mut(|this| this.output)
.poll_flush(cx)
}
}
@@ -840,7 +848,7 @@ impl<'a> FileImpl<'a> {
#[cfg(feature = "tokio-io")]
pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
unsafe {
- self.map_unchecked_mut(|this| &mut this.output)
+ self.map_unchecked_mut(|this| this.output)
.poll_flush(cx)
}
}
@@ -864,14 +872,14 @@ impl<'a> FileImpl<'a> {
/// Completely write file data for the current file entry in a pxar archive.
pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
self.check_remaining(data.len())?;
- seq_write_all(&mut self.output, data, &mut self.parent.write_position).await?;
+ seq_write_all(self.output, data, &mut self.parent.write_position).await?;
self.remaining_size -= data.len() as u64;
Ok(())
}
}
#[cfg(feature = "tokio-io")]
-impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
+impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
FileImpl::poll_write(self, cx, buf)
}
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 9ace8bf..859714d 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -52,7 +52,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
/// not allowed to use the `Waker`, as this will cause a `panic!`.
pub fn new(output: T, metadata: &Metadata) -> io::Result<Self> {
Ok(Self {
- inner: poll_result_once(encoder::EncoderImpl::new(output, metadata))?,
+ inner: poll_result_once(encoder::EncoderImpl::new(output.into(), metadata))?,
})
}
@@ -64,7 +64,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
metadata: &Metadata,
file_name: P,
file_size: u64,
- ) -> io::Result<File<'b>>
+ ) -> io::Result<File<'b, T>>
where
'a: 'b,
{
@@ -95,29 +95,21 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
/// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
/// `finish()` method, otherwise the entire archive will be in an error state.
- pub fn create_directory<'b, P: AsRef<Path>>(
- &'b mut self,
+ pub fn create_directory<P: AsRef<Path>>(
+ &mut self,
file_name: P,
metadata: &Metadata,
- ) -> io::Result<Encoder<'b, &'b mut dyn SeqWrite>>
- where
- 'a: 'b,
- {
+ ) -> io::Result<Encoder<'_, T>> {
Ok(Encoder {
inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?,
})
}
/// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
- pub fn finish(self) -> io::Result<T> {
+ pub fn finish(self) -> io::Result<()> {
poll_result_once(self.inner.finish())
}
- /// Cancel this directory and get back the contained writer.
- pub fn into_writer(self) -> T {
- self.inner.into_writer()
- }
-
/// Add a symbolic link to the archive.
pub fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
&mut self,
@@ -174,18 +166,18 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
}
#[repr(transparent)]
-pub struct File<'a> {
- inner: encoder::FileImpl<'a>,
+pub struct File<'a, S: SeqWrite> {
+ inner: encoder::FileImpl<'a, S>,
}
-impl<'a> File<'a> {
+impl<'a, S: SeqWrite> File<'a, S> {
/// Get the file offset to be able to reference it with `add_hardlink`.
pub fn file_offset(&self) -> LinkOffset {
self.inner.file_offset()
}
}
-impl<'a> io::Write for File<'a> {
+impl<'a, S: SeqWrite> io::Write for File<'a, S> {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
poll_result_once(self.inner.write(data))
}
--
2.20.1
More information about the pbs-devel
mailing list