[pbs-devel] [PATCH proxmox-backup 02/15] broadcast_future: refactor broadcast/future binding

Fabian Grünbichler f.gruenbichler at proxmox.com
Mon Jan 25 14:42:47 CET 2021


Move the broadcast/future binding into its own, private struct.

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
 src/tools/broadcast_future.rs | 42 +++++++++++++++--------------------
 1 file changed, 18 insertions(+), 24 deletions(-)

diff --git a/src/tools/broadcast_future.rs b/src/tools/broadcast_future.rs
index 94aedf18..88b7aaab 100644
--- a/src/tools/broadcast_future.rs
+++ b/src/tools/broadcast_future.rs
@@ -62,14 +62,16 @@ impl <T: Clone> BroadcastData<T> {
     }
 }
 
+type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
+
+struct BroadCastFutureBinding<T> {
+    broadcast: BroadcastData<T>,
+    future: Option<SourceFuture<T>>,
+}
+
 /// Broadcast future results to registered listeners
 pub struct BroadcastFuture<T> {
-    inner: Arc<
-        Mutex<(
-            BroadcastData<T>,
-            Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
-        )>,
-    >,
+    inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
 }
 
 impl<T: Clone + Send + 'static> BroadcastFuture<T> {
@@ -77,7 +79,11 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
     ///
     /// The result of the future is sent to all registered listeners.
     pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
-        Self { inner: Arc::new(Mutex::new((BroadcastData::new(), Some(Pin::from(source))))) }
+        let inner = BroadCastFutureBinding {
+            broadcast: BroadcastData::new(),
+            future: Some(Pin::from(source)),
+        };
+        Self { inner: Arc::new(Mutex::new(inner)) }
     }
 
     /// Creates a new instance with a oneshot channel as trigger
@@ -92,29 +98,17 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
     }
 
     fn notify_listeners(
-        inner: Arc<
-            Mutex<(
-                BroadcastData<T>,
-                Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
-            )>,
-        >,
+        inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
         result: Result<T, String>,
     ) {
         let mut data = inner.lock().unwrap();
-        data.0.notify_listeners(result);
+        data.broadcast.notify_listeners(result);
     }
 
-    fn spawn(
-        inner: Arc<
-            Mutex<(
-                BroadcastData<T>,
-                Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
-            )>,
-        >,
-    ) -> impl Future<Output = Result<T, Error>> {
+    fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
         let mut data = inner.lock().unwrap();
 
-        if let Some(source) = data.1.take() {
+        if let Some(source) = data.future.take() {
 
             let inner1 = inner.clone();
 
@@ -127,7 +121,7 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
             tokio::spawn(task);
         }
 
-        data.0.listen()
+        data.broadcast.listen()
     }
 
     /// Register a listener
-- 
2.20.1






More information about the pbs-devel mailing list