[pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted
Stefan Reiter
s.reiter at proxmox.com
Wed Sep 30 16:16:00 CEST 2020
We only store one error anyway, so there is no point in continuing to
process data once one thread has already failed. This is especially
important for unbounded mode, where there may still be a lot of data to
go through; exiting early means complete() doesn't wait for all of it.
Also abort on drop: if the caller wants to wait for completion, they
have to call complete().
Current logic should be unaffected:
* 'verify' never returns an error from handler_fn
* 'pull' errors out immediately anyway once 'send' or 'complete' fails,
  so it doesn't matter if that happens a little earlier and some chunks
  are left unwritten
Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
src/tools/parallel_handler.rs | 22 ++++++++++++++++++++--
1 file changed, 20 insertions(+), 2 deletions(-)
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index 7b92c23a..102ecce3 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -1,4 +1,5 @@
use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle;
use anyhow::{bail, format_err, Error};
@@ -8,6 +9,7 @@ use crossbeam_channel::{bounded, unbounded, Sender};
pub struct SendHandle<I> {
input: Sender<I>,
abort: Arc<Mutex<Option<String>>>,
+ aborted: Arc<AtomicBool>,
}
/// Returns the first error happened, if any
@@ -50,6 +52,7 @@ impl<I> Clone for SendHandle<I> {
Self {
input: self.input.clone(),
abort: Arc::clone(&self.abort),
+ aborted: Arc::clone(&self.aborted),
}
}
}
@@ -73,10 +76,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
};
let abort = Arc::new(Mutex::new(None));
+ let aborted = Arc::new(AtomicBool::new(false));
for i in 0..threads {
let input_rx = input_rx.clone();
let abort = Arc::clone(&abort);
+ let aborted = Arc::clone(&aborted);
// Erase the 'a lifetime bound. This is safe because we
// join all thread in the drop handler.
@@ -89,6 +94,10 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
std::thread::Builder::new()
.name(format!("{} ({})", name, i))
.spawn(move || loop {
+ if aborted.load(Ordering::Acquire) {
+ // some other thread aborted, exit early
+ return;
+ }
let data = match input_rx.recv() {
Ok(data) => data,
Err(_) => return,
@@ -96,10 +105,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
match (handler_fn)(data) {
Ok(()) => (),
Err(err) => {
+ aborted.store(true, Ordering::Release);
let mut guard = abort.lock().unwrap();
if guard.is_none() {
*guard = Some(err.to_string());
}
+ return;
}
}
})
@@ -112,6 +123,7 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
input: Some(SendHandle {
input: input_tx,
abort,
+ aborted,
}),
_marker: std::marker::PhantomData,
}
@@ -134,7 +146,8 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
check_abort(Arc::clone(&self.input.as_ref().unwrap().abort))
}
- /// Wait for worker threads to complete and check for errors
+ /// Wait for worker threads to complete and check for errors.
+ /// Only this ensures completion. Dropping the instance aborts instead.
pub fn complete(mut self) -> Result<(), Error> {
let abort = Arc::clone(&self.input.as_ref().unwrap().abort);
check_abort(Arc::clone(&abort))?;
@@ -179,7 +192,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
// Note: We make sure that all threads will be joined
impl<'a, I> Drop for ParallelHandler<'a, I> {
fn drop(&mut self) {
- drop(self.input.take());
+ let input = self.input.take();
+ if let Some(input) = &input {
+ // shut down ASAP
+ input.aborted.store(true, Ordering::Release);
+ }
+ drop(input);
while let Some(handle) = self.handles.pop() {
let _ = handle.join();
}
--
2.20.1
More information about the pbs-devel
mailing list