[pbs-devel] [PATCH pxar 1/3] update to tokio 1.0

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Jan 12 14:58:28 CET 2021


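Unfortunately, futures::io::AsyncRead and tokio::io::AsyncRead no longer
share a poll_read signature (tokio 1.0 takes a `ReadBuf` and returns
`Poll<io::Result<()>>` instead of taking `&mut [u8]` and returning the
byte count), so we need to adapt one to the other. As a consequence, the
reader wrapper implementations can no longer be generated via a common
macro and are now written out by hand.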

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
 Cargo.toml          |   5 +--
 src/accessor/aio.rs |   7 +--
 src/decoder/aio.rs  | 105 ++++++++++++++++++++++++--------------------
 3 files changed, 64 insertions(+), 53 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 24b5489..875de7a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,7 +47,7 @@ siphasher = "0.3"
 
 anyhow = { version = "1.0", optional = true }
 futures = { version = "0.3.1", optional = true }
-tokio = { version = "0.2.10", optional = true, default-features = false }
+tokio = { version = "1.0", optional = true, default-features = false }
 
 [target.'cfg(target_os = "linux")'.dependencies]
 libc = "0.2"
@@ -65,8 +65,7 @@ async-example = [
     "futures-io",
     "tokio-io",
     "tokio-fs",
-    "tokio/rt-threaded",
-    "tokio/io-driver",
+    "tokio/rt-multi-thread",
     "tokio/macros",
 ]
 
diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
index a1aaa08..dd017ae 100644
--- a/src/accessor/aio.rs
+++ b/src/accessor/aio.rs
@@ -410,9 +410,10 @@ impl<T: Clone + ReadAt> tokio::io::AsyncRead for FileContents<T> {
     fn poll_read(
         self: Pin<&mut Self>,
         cx: &mut Context,
-        buf: &mut [u8],
-    ) -> Poll<io::Result<usize>> {
-        Self::do_poll_read(self, cx, buf)
+        buf: &mut tokio::io::ReadBuf,
+    ) -> Poll<io::Result<()>> {
+        Self::do_poll_read(self, cx, &mut buf.initialize_unfilled())
+            .map_ok(|bytes| { buf.set_filled(bytes); () })
     }
 }
 
diff --git a/src/decoder/aio.rs b/src/decoder/aio.rs
index e7152b3..1a5f5ea 100644
--- a/src/decoder/aio.rs
+++ b/src/decoder/aio.rs
@@ -136,61 +136,72 @@ mod stream {
 #[cfg(feature = "futures-io")]
 pub use stream::DecoderStream;
 
-macro_rules! async_io_impl {
-    (
-        #[cfg( $($attr:tt)+ )]
-        mod $mod:ident {
-            $(#[$docs:meta])*
-            $name:ident : $trait:path ;
-        }
-    ) => {
-        #[cfg( $($attr)+ )]
-        mod $mod {
-            use std::io;
-            use std::pin::Pin;
-            use std::task::{Context, Poll};
-
-            $(#[$docs])*
-            pub struct $name<T> {
-                inner: T,
-            }
+#[cfg(feature = "futures-io")]
+mod fut {
+    use std::io;
+    use std::pin::Pin;
+    use std::task::{Context, Poll};
 
-            impl<T: $trait> $name<T> {
-                pub fn new(inner: T) -> Self {
-                    Self { inner }
-                }
-            }
+    /// Read adapter for `futures::io::AsyncRead`
+    pub struct FuturesReader<T> {
+        inner: T,
+    }
 
-            impl<T: $trait> crate::decoder::SeqRead for $name<T> {
-                fn poll_seq_read(
-                    self: Pin<&mut Self>,
-                    cx: &mut Context,
-                    buf: &mut [u8],
-                ) -> Poll<io::Result<usize>> {
-                    unsafe {
-                        self.map_unchecked_mut(|this| &mut this.inner)
-                            .poll_read(cx, buf)
-                    }
-                }
+    impl<T: futures::io::AsyncRead> FuturesReader<T> {
+        pub fn new(inner: T) -> Self {
+            Self { inner }
+        }
+    }
+
+    impl<T: futures::io::AsyncRead> crate::decoder::SeqRead for FuturesReader<T> {
+        fn poll_seq_read(
+            self: Pin<&mut Self>,
+            cx: &mut Context,
+            buf: &mut [u8],
+        ) -> Poll<io::Result<usize>> {
+            unsafe {
+                self.map_unchecked_mut(|this| &mut this.inner)
+                    .poll_read(cx, buf)
             }
         }
-        #[cfg( $($attr)+ )]
-        pub use $mod::$name;
     }
 }
 
-async_io_impl! {
-    #[cfg(feature = "futures-io")]
-    mod fut {
-        /// Read adapter for `futures::io::AsyncRead`.
-        FuturesReader : futures::io::AsyncRead;
+#[cfg(feature = "futures-io")]
+use fut::FuturesReader;
+
+#[cfg(feature = "tokio-io")]
+mod tok {
+    use std::io;
+    use std::pin::Pin;
+    use std::task::{Context, Poll};
+
+    /// Read adapter for `tokio::io::AsyncRead`
+    pub struct TokioReader<T> {
+        inner: T,
     }
-}
 
-async_io_impl! {
-    #[cfg(feature = "tokio-io")]
-    mod tok {
-        /// Read adapter for `tokio::io::AsyncRead`.
-        TokioReader : tokio::io::AsyncRead;
+    impl<T: tokio::io::AsyncRead> TokioReader<T> {
+        pub fn new(inner: T) -> Self {
+            Self { inner }
+        }
+    }
+
+    impl<T: tokio::io::AsyncRead> crate::decoder::SeqRead for TokioReader<T> {
+        fn poll_seq_read(
+            self: Pin<&mut Self>,
+            cx: &mut Context,
+            buf: &mut [u8],
+        ) -> Poll<io::Result<usize>> {
+            let mut read_buf = tokio::io::ReadBuf::new(buf);
+            unsafe {
+                self.map_unchecked_mut(|this| &mut this.inner)
+                    .poll_read(cx, &mut read_buf)
+                    .map_ok(|_| read_buf.filled().len())
+            }
+        }
     }
 }
+
+#[cfg(feature = "tokio-io")]
+use tok::TokioReader;
-- 
2.20.1






More information about the pbs-devel mailing list