[pbs-devel] [PATCH proxmox-backup] ParallelHandler: check for errors during thread join

Stefan Reiter s.reiter at proxmox.com
Thu Oct 1 11:38:42 CEST 2020


Fix a potential bug where errors that occur while joining the worker
threads, i.e. after the SendHandle has already been dropped, might have
been ignored. This requires moving the internal check_abort out of
'impl SendHandle', since at that point only the Mutex is left, not the
SendHandle.

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---

Extracted from previous RFC series.

 src/tools/parallel_handler.rs | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index f1d9adec..185ac2fc 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -10,19 +10,19 @@ pub struct SendHandle<I> {
     abort: Arc<Mutex<Option<String>>>,
 }
 
-impl<I: Send> SendHandle<I> {
-    /// Returns the first error happened, if any
-    pub fn check_abort(&self) -> Result<(), Error> {
-        let guard = self.abort.lock().unwrap();
-        if let Some(err_msg) = &*guard {
-            return Err(format_err!("{}", err_msg));
-        }
-        Ok(())
+/// Returns the first error that occurred, if any
+pub fn check_abort(abort: Arc<Mutex<Option<String>>>) -> Result<(), Error> {
+    let guard = abort.lock().unwrap();
+    if let Some(err_msg) = &*guard {
+        return Err(format_err!("{}", err_msg));
     }
+    Ok(())
+}
 
+impl<I: Send> SendHandle<I> {
     /// Send data to the worker threads
     pub fn send(&self, input: I) -> Result<(), Error> {
-        self.check_abort()?;
+        check_abort(Arc::clone(&self.abort))?;
         match self.input.send(input) {
             Ok(()) => Ok(()),
             Err(_) => bail!("send failed - channel closed"),
@@ -121,12 +121,16 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
 
     /// Wait for worker threads to complete and check for errors
     pub fn complete(mut self) -> Result<(), Error> {
-        self.input.as_ref().unwrap().check_abort()?;
-        drop(self.input.take());
+        let input = self.input.take().unwrap();
+        let abort = Arc::clone(&input.abort);
+        check_abort(Arc::clone(&abort))?;
+        drop(input);
 
         let msg_list = self.join_threads();
 
         if msg_list.is_empty() {
+            // an error might be encountered while waiting for the join
+            check_abort(abort)?;
             return Ok(());
         }
         Err(format_err!("{}", msg_list.join("\n")))
-- 
2.20.1






More information about the pbs-devel mailing list