[pbs-devel] [PATCH pxar 01/22] decoder/aio: add contents() and content_size() calls

Stefan Reiter s.reiter at proxmox.com
Tue Feb 16 18:06:49 CET 2021


contents() returns a tokio AsyncRead implementation for the entry's
"Contents", in keeping with the aio theme.

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
 src/decoder/aio.rs | 43 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 42 insertions(+), 1 deletion(-)

diff --git a/src/decoder/aio.rs b/src/decoder/aio.rs
index 82030b0..5cc6694 100644
--- a/src/decoder/aio.rs
+++ b/src/decoder/aio.rs
@@ -56,6 +56,18 @@ impl<T: SeqRead> Decoder<T> {
         self.inner.next_do().await.transpose()
     }
 
+    /// Returns a reader for the contents of the current entry, or `None` if it has no contents.
+    /// Only available with the "tokio-io" feature, since the reader implements tokio's AsyncRead.
+    #[cfg(feature = "tokio-io")]
+    pub fn contents(&mut self) -> Option<Contents<T>> {
+        self.inner.content_reader().map(|inner| Contents { inner })
+    }
+
+    /// Returns the size of the current entry's contents, or `None` if it has no contents.
+    pub fn content_size(&self) -> Option<u64> {
+        self.inner.content_size()
+    }
+
     /// Include goodbye tables in iteration.
     pub fn enable_goodbye_entries(&mut self, on: bool) {
         self.inner.with_goodbye_tables = on;
@@ -93,7 +105,36 @@ mod tok {
             }
         }
     }
+
+    /// Reader for the contents of the current entry, implementing tokio's `AsyncRead`.
+    pub struct Contents<'a, T: crate::decoder::SeqRead> {
+        pub(crate) inner: crate::decoder::Contents<'a, T>,
+    }
+
+    impl<'a, T: crate::decoder::SeqRead> tokio::io::AsyncRead for Contents<'a, T> {
+        fn poll_read(
+            self: Pin<&mut Self>,
+            cx: &mut Context<'_>,
+            buf: &mut tokio::io::ReadBuf<'_>,
+        ) -> Poll<io::Result<()>> {
+            unsafe {
+                // SAFETY: poll_seq_read will only write to the buffer, so it can be treated as
+                // a &mut [u8] without initializing it first; ReadBuf invariants are upheld below
+                let write_buf =
+                    &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
+                let result = self
+                    .map_unchecked_mut(|this| &mut this.inner as &mut dyn crate::decoder::SeqRead)
+                    .poll_seq_read(cx, write_buf);
+                if let Poll::Ready(Ok(n)) = result {
+                    // assume_init() counts from the end of the filled region, so pass only `n`
+                    buf.assume_init(n);
+                    buf.advance(n);
+                }
+                result.map(|_| Ok(()))
+            }
+        }
+    }
 }
 
 #[cfg(feature = "tokio-io")]
-use tok::TokioReader;
+use tok::{Contents, TokioReader};
-- 
2.20.1






More information about the pbs-devel mailing list