[pbs-devel] [PATCH pxar 1/3] update to tokio 1.0
Fabian Grünbichler
f.gruenbichler at proxmox.com
Tue Jan 12 14:58:28 CET 2021
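Unfortunately, futures::io::AsyncRead and tokio::io::AsyncRead no longer
share a poll_read signature (tokio 1.0 takes a ReadBuf and returns
Poll<io::Result<()>>), so we need to adapt one to the other around our
do_poll_read helper (and can also no longer generate the reader wrapper
implementations via a single macro).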
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
Cargo.toml | 5 +--
src/accessor/aio.rs | 7 +--
src/decoder/aio.rs | 105 ++++++++++++++++++++++++--------------------
3 files changed, 64 insertions(+), 53 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 24b5489..875de7a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,7 +47,7 @@ siphasher = "0.3"
anyhow = { version = "1.0", optional = true }
futures = { version = "0.3.1", optional = true }
-tokio = { version = "0.2.10", optional = true, default-features = false }
+tokio = { version = "1.0", optional = true, default-features = false }
[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2"
@@ -65,8 +65,7 @@ async-example = [
"futures-io",
"tokio-io",
"tokio-fs",
- "tokio/rt-threaded",
- "tokio/io-driver",
+ "tokio/rt-multi-thread",
"tokio/macros",
]
diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
index a1aaa08..dd017ae 100644
--- a/src/accessor/aio.rs
+++ b/src/accessor/aio.rs
@@ -410,9 +410,10 @@ impl<T: Clone + ReadAt> tokio::io::AsyncRead for FileContents<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
- Self::do_poll_read(self, cx, buf)
+ buf: &mut tokio::io::ReadBuf,
+ ) -> Poll<io::Result<()>> {
+ Self::do_poll_read(self, cx, &mut buf.initialize_unfilled())
+ .map_ok(|bytes| { buf.set_filled(bytes); () })
}
}
diff --git a/src/decoder/aio.rs b/src/decoder/aio.rs
index e7152b3..1a5f5ea 100644
--- a/src/decoder/aio.rs
+++ b/src/decoder/aio.rs
@@ -136,61 +136,72 @@ mod stream {
#[cfg(feature = "futures-io")]
pub use stream::DecoderStream;
-macro_rules! async_io_impl {
- (
- #[cfg( $($attr:tt)+ )]
- mod $mod:ident {
- $(#[$docs:meta])*
- $name:ident : $trait:path ;
- }
- ) => {
- #[cfg( $($attr)+ )]
- mod $mod {
- use std::io;
- use std::pin::Pin;
- use std::task::{Context, Poll};
-
- $(#[$docs])*
- pub struct $name<T> {
- inner: T,
- }
+#[cfg(feature = "futures-io")]
+mod fut {
+ use std::io;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
- impl<T: $trait> $name<T> {
- pub fn new(inner: T) -> Self {
- Self { inner }
- }
- }
+ /// Read adapter for `futures::io::AsyncRead`
+ pub struct FuturesReader<T> {
+ inner: T,
+ }
- impl<T: $trait> crate::decoder::SeqRead for $name<T> {
- fn poll_seq_read(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
- unsafe {
- self.map_unchecked_mut(|this| &mut this.inner)
- .poll_read(cx, buf)
- }
- }
+ impl<T: futures::io::AsyncRead> FuturesReader<T> {
+ pub fn new(inner: T) -> Self {
+ Self { inner }
+ }
+ }
+
+ impl<T: futures::io::AsyncRead> crate::decoder::SeqRead for FuturesReader<T> {
+ fn poll_seq_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ unsafe {
+ self.map_unchecked_mut(|this| &mut this.inner)
+ .poll_read(cx, buf)
}
}
- #[cfg( $($attr)+ )]
- pub use $mod::$name;
}
}
-async_io_impl! {
- #[cfg(feature = "futures-io")]
- mod fut {
- /// Read adapter for `futures::io::AsyncRead`.
- FuturesReader : futures::io::AsyncRead;
+#[cfg(feature = "futures-io")]
+use fut::FuturesReader;
+
+#[cfg(feature = "tokio-io")]
+mod tok {
+ use std::io;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
+
+ /// Read adapter for `tokio::io::AsyncRead`
+ pub struct TokioReader<T> {
+ inner: T,
}
-}
-async_io_impl! {
- #[cfg(feature = "tokio-io")]
- mod tok {
- /// Read adapter for `tokio::io::AsyncRead`.
- TokioReader : tokio::io::AsyncRead;
+ impl<T: tokio::io::AsyncRead> TokioReader<T> {
+ pub fn new(inner: T) -> Self {
+ Self { inner }
+ }
+ }
+
+ impl<T: tokio::io::AsyncRead> crate::decoder::SeqRead for TokioReader<T> {
+ fn poll_seq_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ let mut read_buf = tokio::io::ReadBuf::new(buf);
+ unsafe {
+ self.map_unchecked_mut(|this| &mut this.inner)
+ .poll_read(cx, &mut read_buf)
+ .map_ok(|_| read_buf.filled().len())
+ }
+ }
}
}
+
+#[cfg(feature = "tokio-io")]
+use tok::TokioReader;
--
2.20.1
More information about the pbs-devel
mailing list