[pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join
Stefan Reiter
s.reiter at proxmox.com
Wed Sep 30 15:25:20 CEST 2020
Enable outside functions to check whether an error has occurred without
calling send. Also fix a potential bug where errors that occur during
the thread join, after the SendHandle has been dropped, might have
been ignored. This requires moving the internal check_abort out of
'impl SendHandle'.
Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
src/tools/parallel_handler.rs | 29 +++++++++++++++++++----------
1 file changed, 19 insertions(+), 10 deletions(-)
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index b15fc046..7b92c23a 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -10,19 +10,19 @@ pub struct SendHandle<I> {
abort: Arc<Mutex<Option<String>>>,
}
-impl<I: Send> SendHandle<I> {
- /// Returns the first error happened, if any
- pub fn check_abort(&self) -> Result<(), Error> {
- let guard = self.abort.lock().unwrap();
- if let Some(err_msg) = &*guard {
- return Err(format_err!("{}", err_msg));
- }
- Ok(())
+/// Returns the first error happened, if any
+pub fn check_abort(abort: Arc<Mutex<Option<String>>>) -> Result<(), Error> {
+ let guard = abort.lock().unwrap();
+ if let Some(err_msg) = &*guard {
+ return Err(format_err!("{}", err_msg));
}
+ Ok(())
+}
+impl<I: Send> SendHandle<I> {
/// Send data to the worker threads
pub fn send(&self, input: I) -> Result<(), Error> {
- self.check_abort()?;
+ check_abort(Arc::clone(&self.abort))?;
match self.input.send(input) {
Ok(()) => Ok(()),
Err(_) => bail!("send failed - channel closed"),
@@ -128,14 +128,23 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
Ok(())
}
+ /// Return Err if at least one invocation of the callback failed
+ /// Panics if complete() has been called on this instance
+ pub fn check_abort(&self) -> Result<(), Error> {
+ check_abort(Arc::clone(&self.input.as_ref().unwrap().abort))
+ }
+
/// Wait for worker threads to complete and check for errors
pub fn complete(mut self) -> Result<(), Error> {
- self.input.as_ref().unwrap().check_abort()?;
+ let abort = Arc::clone(&self.input.as_ref().unwrap().abort);
+ check_abort(Arc::clone(&abort))?;
drop(self.input.take());
let msg_list = self.join_threads();
if msg_list.is_empty() {
+ // an error might be encountered while waiting for the join
+ check_abort(abort)?;
return Ok(());
}
Err(format_err!("{}", msg_list.join("\n")))
--
2.20.1
More information about the pbs-devel
mailing list