[pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler add unbounded mode
Stefan Reiter
s.reiter at proxmox.com
Wed Sep 30 16:15:58 CEST 2020
Enables non-blocking send. Only use when the data being sent is small,
otherwise the channel buffer might take a lot of memory.
Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
src/backup/verify.rs | 3 ++-
src/client/pull.rs | 3 ++-
src/tools/parallel_handler.rs | 15 ++++++++++++---
3 files changed, 16 insertions(+), 5 deletions(-)
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index fd48d907..c24d48e6 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -129,7 +129,8 @@ fn verify_index_chunks(
}
Ok(())
- }
+ },
+ true
);
for pos in 0..index.index_count() {
diff --git a/src/client/pull.rs b/src/client/pull.rs
index d88d64f9..08d92d6e 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -57,7 +57,8 @@ async fn pull_index_chunks<I: IndexFile>(
chunk.verify_unencrypted(size as usize, &digest)?;
target.insert_chunk(&chunk, &digest)?;
Ok(())
- }
+ },
+ true
);
let verify_and_write_channel = verify_pool.channel();
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index f1d9adec..b15fc046 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, Sender};
+use crossbeam_channel::{bounded, unbounded, Sender};
/// A handle to send data to the worker thread (implements clone)
pub struct SendHandle<I> {
@@ -57,11 +57,20 @@ impl<I> Clone for SendHandle<I> {
impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
/// Create a new thread pool, each thread processing incoming data
/// with 'handler_fn'.
- pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+ pub fn new<F>(
+ name: &str,
+ threads: usize,
+ handler_fn: F,
+ bounded_mode: bool,
+ ) -> Self
where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a,
{
let mut handles = Vec::new();
- let (input_tx, input_rx) = bounded::<I>(threads);
+ let (input_tx, input_rx) = if bounded_mode {
+ bounded::<I>(threads)
+ } else {
+ unbounded::<I>()
+ };
let abort = Arc::new(Mutex::new(None));
--
2.20.1
More information about the pbs-devel
mailing list