[pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted

Stefan Reiter s.reiter at proxmox.com
Wed Sep 30 15:25:21 CEST 2020


We only store one error anyway, so there is no point in continuing to
process data once one thread has failed. This is especially important
for unbounded mode, where there may still be a lot of data left to go
through: with this change, complete() no longer waits for all of it to
be processed.

Also abort on drop; if the caller wants to wait for completion, they
have to call complete().

Current logic should be unaffected:
* 'verify' never returns an error from handler_fn
* 'pull' errors immediately anyway once 'send' or 'complete' fail, so
  it doesn't matter if that happens a little earlier and some chunks are
  left unwritten

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
 src/tools/parallel_handler.rs | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index 7b92c23a..102ecce3 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -1,4 +1,5 @@
 use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread::JoinHandle;
 
 use anyhow::{bail, format_err, Error};
@@ -8,6 +9,7 @@ use crossbeam_channel::{bounded, unbounded, Sender};
 pub struct SendHandle<I> {
     input: Sender<I>,
     abort: Arc<Mutex<Option<String>>>,
+    aborted: Arc<AtomicBool>,
 }
 
 /// Returns the first error happened, if any
@@ -50,6 +52,7 @@ impl<I> Clone for SendHandle<I> {
         Self {
             input: self.input.clone(),
             abort: Arc::clone(&self.abort),
+            aborted: Arc::clone(&self.aborted),
         }
     }
 }
@@ -73,10 +76,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
         };
 
         let abort = Arc::new(Mutex::new(None));
+        let aborted = Arc::new(AtomicBool::new(false));
 
         for i in 0..threads {
             let input_rx = input_rx.clone();
             let abort = Arc::clone(&abort);
+            let aborted = Arc::clone(&aborted);
 
             // Erase the 'a lifetime bound. This is safe because we
             // join all thread in the drop handler.
@@ -89,6 +94,10 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
                 std::thread::Builder::new()
                     .name(format!("{} ({})", name, i))
                     .spawn(move || loop {
+                        if aborted.load(Ordering::Acquire) {
+                            // some other thread aborted, exit early
+                            return;
+                        }
                         let data = match input_rx.recv() {
                             Ok(data) => data,
                             Err(_) => return,
@@ -96,10 +105,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
                         match (handler_fn)(data) {
                             Ok(()) => (),
                             Err(err) => {
+                                aborted.store(true, Ordering::Release);
                                 let mut guard = abort.lock().unwrap();
                                 if guard.is_none() {
                                     *guard = Some(err.to_string());
                                 }
+                                return;
                             }
                         }
                     })
@@ -112,6 +123,7 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
             input: Some(SendHandle {
                 input: input_tx,
                 abort,
+                aborted,
             }),
             _marker: std::marker::PhantomData,
         }
@@ -134,7 +146,8 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
         check_abort(Arc::clone(&self.input.as_ref().unwrap().abort))
     }
 
-    /// Wait for worker threads to complete and check for errors
+    /// Wait for worker threads to complete and check for errors.
+    /// Only this ensures completion. Dropping the instance aborts instead.
     pub fn complete(mut self) -> Result<(), Error> {
         let abort = Arc::clone(&self.input.as_ref().unwrap().abort);
         check_abort(Arc::clone(&abort))?;
@@ -179,7 +192,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
 // Note: We make sure that all threads will be joined
 impl<'a, I> Drop for ParallelHandler<'a, I> {
     fn drop(&mut self) {
-        drop(self.input.take());
+        let input = self.input.take();
+        if let Some(input) = &input {
+            // shut down ASAP
+            input.aborted.store(true, Ordering::Release);
+        }
+        drop(input);
         while let Some(handle) = self.handles.pop() {
             let _ = handle.join();
         }
-- 
2.20.1






More information about the pbs-devel mailing list