[pbs-devel] [RFC proxmox 1/2] add tools::future with TimeoutFutureExt

Stefan Reiter s.reiter at proxmox.com
Mon Dec 21 14:56:10 CET 2020


Implements shorthand methods that automatically cancel a long-running
future once a timeout is reached.

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
 proxmox/src/tools/future.rs | 48 +++++++++++++++++++++++++++++++++++++
 proxmox/src/tools/mod.rs    |  1 +
 2 files changed, 49 insertions(+)
 create mode 100644 proxmox/src/tools/future.rs

diff --git a/proxmox/src/tools/future.rs b/proxmox/src/tools/future.rs
new file mode 100644
index 0000000..476fd11
--- /dev/null
+++ b/proxmox/src/tools/future.rs
@@ -0,0 +1,48 @@
+//! Common extensions for Futures
+use anyhow::Error;
+use futures::future::{select, Either, FutureExt};
+use std::future::Future;
+use std::time::Duration;
+use tokio::time::delay_for;
+
+impl<T> TimeoutFutureExt for T where T: Future {} // blanket impl: applies to every `Future`
+
+/// Adds timeout helpers to futures: the base future is raced against a timer and abandoned if the
+/// timer completes first.
+pub trait TimeoutFutureExt: Future {
+    /// Resolves to `Some(output)` if `self` completes within `timeout`, and to `None` if the
+    /// timeout elapses first.
+    fn or_timeout<'a>(
+        self,
+        timeout: Duration,
+    ) -> Box<dyn Future<Output = Option<Self::Output>> + Unpin + Send + 'a>
+    where
+        Self: Sized + Unpin + Send + 'a,
+    {
+        let timeout_fut = delay_for(timeout);
+        Box::new(select(self, timeout_fut).map(|res| match res {
+            Either::Left((result, _)) => Some(result), // base future finished first
+            Either::Right(((), _)) => None, // timer finished first
+        }))
+    }
+
+    /// Shorthand for `or_timeout` on futures whose output converts into a `Result<O, E>`.
+    /// Resolves to the future's own result, with its error converted into an `anyhow::Error`,
+    /// if it completes within `timeout`, and to `Err(err)` with the supplied `err` if the timeout
+    /// is reached first.
+    fn or_timeout_err<'a, O, E>(
+        self,
+        timeout: Duration,
+        err: Error,
+    ) -> Box<dyn Future<Output = Result<O, Error>> + Unpin + Send + 'a>
+    where
+        Self: Sized + Unpin + Send + 'a,
+        Self::Output: Into<Result<O, E>>,
+        E: Into<Error> + std::error::Error + Send + Sync + 'static,
+    {
+        Box::new(self.or_timeout(timeout).map(|res| match res {
+            Some(res) => res.into().map_err(Error::from), // completed in time
+            None => Err(err), // timed out
+        }))
+    }
+}
diff --git a/proxmox/src/tools/mod.rs b/proxmox/src/tools/mod.rs
index ff3a720..5d1f46e 100644
--- a/proxmox/src/tools/mod.rs
+++ b/proxmox/src/tools/mod.rs
@@ -20,6 +20,7 @@ pub mod serde;
 pub mod time;
 pub mod uuid;
 pub mod vec;
+pub mod future;
 
 #[cfg(feature = "websocket")]
 pub mod websocket;
-- 
2.20.1






More information about the pbs-devel mailing list