[pbs-devel] [PATCH proxmox-backup 02/15] broadcast_future: refactor broadcast/future binding
Fabian Grünbichler
f.gruenbichler at proxmox.com
Mon Jan 25 14:42:47 CET 2021
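Move the (BroadcastData, Option<future>) tuple that was shared behind the Arc<Mutex<..>> into its own, private struct with named fields (broadcast, future).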
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
src/tools/broadcast_future.rs | 42 +++++++++++++++--------------------
1 file changed, 18 insertions(+), 24 deletions(-)
diff --git a/src/tools/broadcast_future.rs b/src/tools/broadcast_future.rs
index 94aedf18..88b7aaab 100644
--- a/src/tools/broadcast_future.rs
+++ b/src/tools/broadcast_future.rs
@@ -62,14 +62,16 @@ impl <T: Clone> BroadcastData<T> {
}
}
+type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
+
+struct BroadCastFutureBinding<T> {
+ broadcast: BroadcastData<T>,
+ future: Option<SourceFuture<T>>,
+}
+
/// Broadcast future results to registered listeners
pub struct BroadcastFuture<T> {
- inner: Arc<
- Mutex<(
- BroadcastData<T>,
- Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
- )>,
- >,
+ inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
}
impl<T: Clone + Send + 'static> BroadcastFuture<T> {
@@ -77,7 +79,11 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
///
/// The result of the future is sent to all registered listeners.
pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
- Self { inner: Arc::new(Mutex::new((BroadcastData::new(), Some(Pin::from(source))))) }
+ let inner = BroadCastFutureBinding {
+ broadcast: BroadcastData::new(),
+ future: Some(Pin::from(source)),
+ };
+ Self { inner: Arc::new(Mutex::new(inner)) }
}
/// Creates a new instance with a oneshot channel as trigger
@@ -92,29 +98,17 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
}
fn notify_listeners(
- inner: Arc<
- Mutex<(
- BroadcastData<T>,
- Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
- )>,
- >,
+ inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
result: Result<T, String>,
) {
let mut data = inner.lock().unwrap();
- data.0.notify_listeners(result);
+ data.broadcast.notify_listeners(result);
}
- fn spawn(
- inner: Arc<
- Mutex<(
- BroadcastData<T>,
- Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
- )>,
- >,
- ) -> impl Future<Output = Result<T, Error>> {
+ fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
let mut data = inner.lock().unwrap();
- if let Some(source) = data.1.take() {
+ if let Some(source) = data.future.take() {
let inner1 = inner.clone();
@@ -127,7 +121,7 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
tokio::spawn(task);
}
- data.0.listen()
+ data.broadcast.listen()
}
/// Register a listener
--
2.20.1
More information about the pbs-devel mailing list