[pbs-devel] [PATCH proxmox] async: runtime: Modernise module and update docs

Max Carrara m.carrara at proxmox.com
Mon Aug 21 13:37:45 CEST 2023


This commit updates all helper functions, taking into account recent
developments regarding `tokio`.

In particular, the `block_in_place()` and `block_on()` functions now
don't panic anymore if used within the single-threaded `tokio` runtime
and instead behave as expected in both runtime flavours.

Furthermore, because `tokio` may add more runtime flavours in the
future, all helpers will now panic if used within an unsupported
runtime. This is to prevent unforeseen behavioural quirks and
interactions with `tokio` internals.

The above changes make `BlockingGuard` redundant; it is consequently
removed.

The documentation is also updated, describing the behaviour of the
helper functions and the purpose of the `runtime.rs` module in more
detail.

Signed-off-by: Max Carrara <m.carrara at proxmox.com>
---
 proxmox-async/src/runtime.rs | 222 +++++++++++++++++++++--------------
 1 file changed, 131 insertions(+), 91 deletions(-)

 NOTE: This patch is a follow-up to https://lists.proxmox.com/pipermail/pbs-devel/2023-August/006477.html
 but its changes are different enough for it not to be considered an
 actual v2 anymore.

diff --git a/proxmox-async/src/runtime.rs b/proxmox-async/src/runtime.rs
index 0fe9fae..efc1cd8 100644
--- a/proxmox-async/src/runtime.rs
+++ b/proxmox-async/src/runtime.rs
@@ -1,6 +1,37 @@
 //! Helpers for quirks of the current tokio runtime.
+//!
+//! It is preferred to use these helpers throughout our applications.
+//!
+//! # `tokio`, Runtime Flavors, and Panics
+//!
+//! Because [`tokio`] may introduce more [`RuntimeFlavor`s][RuntimeFlavor] in the future,
+//! we [`panic!`] on flavors we're not (yet) explicitly supporting.
+//!
+//! This is done for forward-compatibility's sake in order to prevent unforeseen
+//! interactions with [`tokio`], such as with [`tokio::task::block_in_place`],
+//! which [`panic!`s][panic!] *only* if called within a [`CurrentThread`][ct-rt]-flavored
+//! runtime, but not in a [`MultiThread`][mt-rt]-flavored runtime or if there's
+//! *no runtime* at all.
+//!
+//! All [`panic!`s][panic!] can otherwise be either avoided or caught early by instantiating
+//! your runtime with [`get_runtime()`] or [`get_runtime_with_builder()`]. Or, if you're
+//! creating a separate async application, use [`main()`] for convenience.
+//!
+//! ## Supported [`RuntimeFlavor`s][RuntimeFlavor]
+//!
+//! * [`RuntimeFlavor::MultiThread`]
+//! * [`RuntimeFlavor::CurrentThread`]
+//!
+//! # [`tokio`] and OpenSSL
+//!
+//! There's a nasty [OpenSSL bug][openssl-bug] causing a race between OpenSSL clean-up handlers
+//! and the [`tokio`] runtime. This however is handled by [`get_runtime_with_builder()`]
+//! and thus also within [`get_runtime()`] and our [`main()`] wrapper.
+//!
+//! [ct-rt]: RuntimeFlavor::CurrentThread
+//! [mt-rt]: RuntimeFlavor::MultiThread
+//! [openssl-bug]: https://github.com/openssl/openssl/issues/6214

-use std::cell::RefCell;
 use std::future::Future;
 use std::sync::{Arc, Mutex, Weak};
 use std::task::{Context, Poll, Waker};
@@ -8,39 +39,7 @@ use std::thread::{self, Thread};

 use lazy_static::lazy_static;
 use pin_utils::pin_mut;
-use tokio::runtime::{self, Runtime};
-
-thread_local! {
-    static BLOCKING: RefCell<bool> = RefCell::new(false);
-}
-
-fn is_in_tokio() -> bool {
-    tokio::runtime::Handle::try_current().is_ok()
-}
-
-fn is_blocking() -> bool {
-    BLOCKING.with(|v| *v.borrow())
-}
-
-struct BlockingGuard(bool);
-
-impl BlockingGuard {
-    fn set() -> Self {
-        Self(BLOCKING.with(|v| {
-            let old = *v.borrow();
-            *v.borrow_mut() = true;
-            old
-        }))
-    }
-}
-
-impl Drop for BlockingGuard {
-    fn drop(&mut self) {
-        BLOCKING.with(|v| {
-            *v.borrow_mut() = self.0;
-        });
-    }
-}
+use tokio::runtime::{self, Runtime, RuntimeFlavor};

 lazy_static! {
     // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
@@ -53,14 +52,28 @@ extern "C" {
     fn OPENSSL_thread_stop();
 }

-/// Get or create the current main tokio runtime.
+#[inline]
+fn panic_on_bad_flavor(runtime: &runtime::Runtime) {
+    match runtime.handle().runtime_flavor() {
+        RuntimeFlavor::CurrentThread => (),
+        RuntimeFlavor::MultiThread => (),
+        bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor),
+    }
+}
+
+/// Get or build the current main [`tokio`] [`Runtime`]. Useful if [`tokio`'s][tokio] defaults
+/// don't suit your needs.
 ///
-/// This makes sure that tokio's worker threads are marked for us so that we know whether we
-/// can/need to use `block_in_place` in our `block_on` helper.
+/// # Panics
+/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`].
+/// See the [module level][mod] documentation for more details.
+///
+/// [mod]: self
 pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
     let mut guard = RUNTIME.lock().unwrap();

     if let Some(rt) = guard.upgrade() {
+        panic_on_bad_flavor(&rt);
         return rt;
     }

@@ -74,6 +87,8 @@ pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) ->
     });

     let runtime = builder.build().expect("failed to spawn tokio runtime");
+    panic_on_bad_flavor(&runtime);
+
     let rt = Arc::new(runtime);

     *guard = Arc::downgrade(&rt);
@@ -81,9 +96,12 @@ pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) ->
     rt
 }

-/// Get or create the current main tokio runtime.
+/// Get or create the current main [`tokio`] [`Runtime`].
+///
+/// This is a convenience wrapper around [`get_runtime_with_builder()`] using
+/// [`tokio`'s multithreaded runtime][mt-rt-meth].
 ///
-/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
+/// [mt-rt-meth]: tokio::runtime::Builder::new_multi_thread()
 pub fn get_runtime() -> Arc<Runtime> {
     get_runtime_with_builder(|| {
         let mut builder = runtime::Builder::new_multi_thread();
@@ -93,67 +111,89 @@ pub fn get_runtime() -> Arc<Runtime> {
 }

 /// Block on a synchronous piece of code.
-pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
-    // don't double-exit the context (tokio doesn't like that)
-    // also, if we're not actually in a tokio-worker we must not use block_in_place() either
-    if is_blocking() || !is_in_tokio() {
-        fut()
+///
+/// This is a wrapper around [`tokio::task::block_in_place()`] that allows to
+/// block the current thread even within a [`Runtime`] with [`RuntimeFlavor::CurrentThread`].
+///
+/// Normally, [tokio's `block_in_place()`][bip] [`panic`s][panic] when called in
+/// such a case; this function instead just runs the piece of code right away, preventing
+/// an unforeseen panic.
+///
+/// # Note
+/// If you're in a [`CurrentThread`][RuntimeFlavor::CurrentThread] runtime and you
+/// *really* need to execute a bunch of blocking code, you might want to consider
+/// executing that code with [`tokio::task::spawn_blocking()`] instead. This prevents
+/// blocking the single-threaded runtime and still allows you to communicate via channels.
+///
+/// See [tokio's documentation on CPU-bound tasks and blocking code][tok-block-doc]
+/// for more information.
+///
+/// # Panics
+/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`].
+/// See the [module level][mod] documentation for more details.
+///
+/// [bip]: tokio::task::block_in_place()
+/// [mod]: self
+/// [sp]: tokio::task::spawn_blocking()
+/// [tok-block-doc]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
+pub fn block_in_place<R>(func: impl FnOnce() -> R) -> R {
+    if let Ok(runtime) = runtime::Handle::try_current() {
+        match runtime.runtime_flavor() {
+            RuntimeFlavor::CurrentThread => func(),
+            RuntimeFlavor::MultiThread => tokio::task::block_in_place(func),
+            bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor),
+        }
     } else {
-        // we are in an actual tokio worker thread, block it:
-        tokio::task::block_in_place(move || {
-            let _guard = BlockingGuard::set();
-            fut()
-        })
+        func()
     }
 }

-/// Block on a future in this thread.
-pub fn block_on<F: Future>(fut: F) -> F::Output {
-    // don't double-exit the context (tokio doesn't like that)
-    if is_blocking() {
-        block_on_local_future(fut)
-    } else if is_in_tokio() {
-        // inside a tokio worker we need to tell tokio that we're about to really block:
-        tokio::task::block_in_place(move || {
-            let _guard = BlockingGuard::set();
-            block_on_local_future(fut)
-        })
+/// Block on a future in the current thread.
+///
+/// Not to be confused with [`tokio::runtime::Handle::block_on()`] and
+/// [`tokio::runtime::Runtime::block_on()`].
+///
+/// This will prevent other futures from running in the current thread in the meantime.
+/// Essentially, this is [`block_in_place()`], but for [`Future`s][Future] instead of functions.
+///
+/// If there's no runtime currently active, this function will create a temporary one
+/// using [`get_runtime()`] in order to block on and finish running the provided [`Future`].
+///
+/// # Panics
+/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`].
+/// See the [module level][mod] documentation for more details.
+///
+/// [mod]: self
+pub fn block_on<F: Future>(future: F) -> F::Output {
+    if let Ok(runtime) = runtime::Handle::try_current() {
+        match runtime.runtime_flavor() {
+            RuntimeFlavor::CurrentThread => block_on_local_future(future),
+            RuntimeFlavor::MultiThread => {
+                tokio::task::block_in_place(move || block_on_local_future(future))
+            }
+            bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor),
+        }
     } else {
-        // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
-        // it on demand if necessary), then enter it
-        let _guard = BlockingGuard::set();
-        let _enter_guard = get_runtime().enter();
-        get_runtime().block_on(fut)
-    }
-}
-
-/*
-fn block_on_impl<F>(mut fut: F) -> F::Output
-where
-    F: Future + Send,
-    F::Output: Send + 'static,
-{
-    let (tx, rx) = tokio::sync::oneshot::channel();
-    let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
-    tokio::spawn(async move {
-        let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
-        tx
-            .send(fut.await)
-            .map_err(drop)
-            .expect("failed to send block_on result to channel")
-    });
+        let runtime = get_runtime();
+        let _enter_guard = runtime.enter();

-    futures::executor::block_on(async move {
-        rx.await.expect("failed to receive block_on result from channel")
-    })
-    std::mem::forget(fut);
+        runtime.block_on(future)
+    }
 }
-*/

-/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
-/// compatibility, which will perform all the necessary tasks on-demand anyway.
+/// This is our [`tokio`] entrypoint, which blocks on the provided [`Future`]
+/// until it's completed, using [`tokio`'s multithreaded runtime][mt-rt-meth].
+///
+/// It is preferred to use this function over other ways of instantiating a runtime.
+/// See the [module level][mod] documentation for more information.
+///
+/// [mod]: self
+/// [mt-rt-meth]: tokio::runtime::Builder::new_multi_thread()
 pub fn main<F: Future>(fut: F) -> F::Output {
-    block_on(fut)
+    let runtime = get_runtime();
+    let _enter_guard = runtime.enter();
+
+    runtime.block_on(fut)
 }

 struct ThreadWaker(Thread);
--
2.39.2






More information about the pbs-devel mailing list