[pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler add unbounded mode

Stefan Reiter s.reiter at proxmox.com
Wed Sep 30 16:15:58 CEST 2020


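Add a 'bounded_mode' parameter to ParallelHandler::new(). Passing 'false'
creates the input channel as unbounded instead of bounded to the thread
count, which enables non-blocking send. Only use this when the data being
sent is small, otherwise the channel buffer might take a lot of memory.
The call sites updated here (verify and pull) pass 'true' and keep using
the bounded channel.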

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
 src/backup/verify.rs          |  3 ++-
 src/client/pull.rs            |  3 ++-
 src/tools/parallel_handler.rs | 15 ++++++++++++---
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index fd48d907..c24d48e6 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -129,7 +129,8 @@ fn verify_index_chunks(
             }
 
             Ok(())
-        }
+        },
+        true
     );
 
     for pos in 0..index.index_count() {
diff --git a/src/client/pull.rs b/src/client/pull.rs
index d88d64f9..08d92d6e 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -57,7 +57,8 @@ async fn pull_index_chunks<I: IndexFile>(
             chunk.verify_unencrypted(size as usize, &digest)?;
             target.insert_chunk(&chunk, &digest)?;
             Ok(())
-       }
+       },
+       true
     );
 
     let verify_and_write_channel = verify_pool.channel();
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index f1d9adec..b15fc046 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
 use std::thread::JoinHandle;
 
 use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, Sender};
+use crossbeam_channel::{bounded, unbounded, Sender};
 
 /// A handle to send data to the worker thread (implements clone)
 pub struct SendHandle<I> {
@@ -57,11 +57,20 @@ impl<I> Clone for SendHandle<I> {
 impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
     /// Create a new thread pool, each thread processing incoming data
     /// with 'handler_fn'.
-    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+    pub fn new<F>(
+        name: &str,
+        threads: usize,
+        handler_fn: F,
+        bounded_mode: bool,
+    ) -> Self
         where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a,
     {
         let mut handles = Vec::new();
-        let (input_tx, input_rx) = bounded::<I>(threads);
+        let (input_tx, input_rx) = if bounded_mode {
+            bounded::<I>(threads)
+        } else {
+            unbounded::<I>()
+        };
 
         let abort = Arc::new(Mutex::new(None));
 
-- 
2.20.1






More information about the pbs-devel mailing list