[pbs-devel] [PATCH proxmox-backup v3 1/1] tape: introduce a tape backup job worker thread option
Dominik Csapak
d.csapak at proxmox.com
Fri Feb 21 16:06:31 CET 2025
Using a single thread for reading is not optimal in some cases, e.g.
when the underlying storage can handle reads from multiple threads in
parallel.
We use the ParallelHandler to handle the actual reads. Make the
sync_channel buffer size depend on the number of threads so we have
space for two chunks per thread (but keep the minimum at 3, like
before).
How this impacts the backup speed largely depends on the underlying
storage and how the backup is laid out on it.
I benchmarked the following setups:
* Setup A: relatively spread out backup on a virtualized PBS on single HDDs
* Setup B: mostly sequential chunks on a virtualized PBS on single HDDs
* Setup C: backup on virtualized PBS on a fast NVMe
* Setup D: backup on bare metal PBS with ZFS in a RAID10 with 6 HDDs
  and 2 fast special devices in a mirror
(values are reported in MB/s as seen in the task log, caches were
cleared between runs, backups were bigger than the memory available)
setup  1 thread  2 threads  4 threads  8 threads
A      55        70         80         95
B      110       89         100        108
C      294       294        294        294
D      118       180        300        300
So there are cases where multiple read threads speed up the tape backup
(dramatically). On the other hand there are situations where reading
from a single thread is actually faster, probably because we can read
from the HDD sequentially.
I left the default value of '1' to not change the default behavior.
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
changes from v2:
* move the ui config to the tape backup jobs and tape backup window
* use pool writer to store the thread count, not datastore
* keep minimum of channel size 3
* keep default of one thread
I left the benchmark data intact, since the actual code that does
the multithreading is the same as before, and I could not find a
virtual setup that replicates the performance of an HDD very well
(limiting the IOPS of virtual disks does not really do the trick due
to disk caching, etc.).
If wanted, I can of course set up a physical testbed with multiple
HDDs again.
src/api2/config/tape_backup_job.rs | 8 ++++
src/api2/tape/backup.rs | 4 ++
src/tape/pool_writer/mod.rs | 14 ++++++-
src/tape/pool_writer/new_chunks_iterator.rs | 44 +++++++++++++--------
www/tape/window/TapeBackup.js | 12 ++++++
www/tape/window/TapeBackupJob.js | 14 +++++++
6 files changed, 79 insertions(+), 17 deletions(-)
diff --git a/src/api2/config/tape_backup_job.rs b/src/api2/config/tape_backup_job.rs
index b6db92998..f2abf652b 100644
--- a/src/api2/config/tape_backup_job.rs
+++ b/src/api2/config/tape_backup_job.rs
@@ -140,6 +140,8 @@ pub enum DeletableProperty {
MaxDepth,
/// Delete the 'ns' property
Ns,
+ /// Delete the 'worker-threads' property
+ WorkerThreads,
}
#[api(
@@ -222,6 +224,9 @@ pub fn update_tape_backup_job(
DeletableProperty::Ns => {
data.setup.ns = None;
}
+ DeletableProperty::WorkerThreads => {
+ data.setup.worker_threads = None;
+ }
}
}
}
@@ -260,6 +265,9 @@ pub fn update_tape_backup_job(
if update.setup.max_depth.is_some() {
data.setup.max_depth = update.setup.max_depth;
}
+ if update.setup.worker_threads.is_some() {
+ data.setup.worker_threads = update.setup.worker_threads;
+ }
let schedule_changed = data.schedule != update.schedule;
if update.schedule.is_some() {
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index cf5a01897..31293a9a9 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -387,6 +387,10 @@ fn backup_worker(
ns_magic,
)?;
+ if let Some(threads) = setup.worker_threads {
+ pool_writer.set_read_thread_count(threads as usize);
+ }
+
let mut group_list = Vec::new();
let namespaces = datastore.recursive_iter_backup_ns_ok(root_namespace, setup.max_depth)?;
for ns in namespaces {
diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs
index 3114ec061..540844212 100644
--- a/src/tape/pool_writer/mod.rs
+++ b/src/tape/pool_writer/mod.rs
@@ -56,6 +56,7 @@ pub struct PoolWriter {
notification_mode: TapeNotificationMode,
ns_magic: bool,
used_tapes: HashSet<Uuid>,
+ read_threads: usize,
}
impl PoolWriter {
@@ -93,9 +94,15 @@ impl PoolWriter {
notification_mode,
ns_magic,
used_tapes: HashSet::new(),
+ read_threads: 1,
})
}
+ /// Set the read threads to use when writing a backup to tape
+ pub fn set_read_thread_count(&mut self, read_threads: usize) {
+ self.read_threads = read_threads;
+ }
+
pub fn pool(&mut self) -> &mut MediaPool {
&mut self.pool
}
@@ -541,7 +548,12 @@ impl PoolWriter {
datastore: Arc<DataStore>,
snapshot_reader: Arc<Mutex<SnapshotReader>>,
) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
- NewChunksIterator::spawn(datastore, snapshot_reader, Arc::clone(&self.catalog_set))
+ NewChunksIterator::spawn(
+ datastore,
+ snapshot_reader,
+ Arc::clone(&self.catalog_set),
+ self.read_threads,
+ )
}
pub(crate) fn catalog_version(&self) -> [u8; 8] {
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index 1454b33d2..de847b3c9 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -6,8 +6,9 @@ use anyhow::{format_err, Error};
use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
use crate::tape::CatalogSet;
+use crate::tools::parallel_handler::ParallelHandler;
-/// Chunk iterator which use a separate thread to read chunks
+/// Chunk iterator which uses separate threads to read chunks
///
/// The iterator skips duplicate chunks and chunks already in the
/// catalog.
@@ -24,8 +25,11 @@ impl NewChunksIterator {
datastore: Arc<DataStore>,
snapshot_reader: Arc<Mutex<SnapshotReader>>,
catalog_set: Arc<Mutex<CatalogSet>>,
+ read_threads: usize,
) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
- let (tx, rx) = std::sync::mpsc::sync_channel(3);
+ // use twice the threadcount for the channel, so the read thread can already send another
+ // one when the previous one was not consumed yet, but keep the minimum at 3
+ let (tx, rx) = std::sync::mpsc::sync_channel((read_threads * 2).max(3));
let reader_thread = std::thread::spawn(move || {
let snapshot_reader = snapshot_reader.lock().unwrap();
@@ -35,36 +39,44 @@ impl NewChunksIterator {
let datastore_name = snapshot_reader.datastore_name().to_string();
let result: Result<(), Error> = proxmox_lang::try_block!({
- let mut chunk_iter = snapshot_reader.chunk_iterator(move |digest| {
+ let chunk_iter = snapshot_reader.chunk_iterator(move |digest| {
catalog_set
.lock()
.unwrap()
.contains_chunk(&datastore_name, digest)
})?;
- loop {
- let digest = match chunk_iter.next() {
- None => {
- let _ = tx.send(Ok(None)); // ignore send error
- break;
+ let reader_pool =
+ ParallelHandler::new("tape backup chunk reader pool", read_threads, {
+ let tx = tx.clone();
+ move |digest| {
+ let blob = datastore.load_chunk(&digest)?;
+ //println!("LOAD CHUNK {}", hex::encode(&digest));
+
+ tx.send(Ok(Some((digest, blob)))).map_err(|err| {
+ format_err!("error sending result from reader thread: {err}")
+ })?;
+
+ Ok(())
}
- Some(digest) => digest?,
- };
+ });
+
+ for digest in chunk_iter {
+ let digest = digest?;
if chunk_index.contains(&digest) {
continue;
}
- let blob = datastore.load_chunk(&digest)?;
- //println!("LOAD CHUNK {}", hex::encode(&digest));
- if let Err(err) = tx.send(Ok(Some((digest, blob)))) {
- eprintln!("could not send chunk to reader thread: {err}");
- break;
- }
+ reader_pool.send(digest)?;
chunk_index.insert(digest);
}
+ reader_pool.complete()?;
+
+ let _ = tx.send(Ok(None)); // ignore send error
+
Ok(())
});
if let Err(err) = result {
diff --git a/www/tape/window/TapeBackup.js b/www/tape/window/TapeBackup.js
index 7a45e388c..fc8be80e3 100644
--- a/www/tape/window/TapeBackup.js
+++ b/www/tape/window/TapeBackup.js
@@ -119,6 +119,18 @@ Ext.define('PBS.TapeManagement.TapeBackupWindow', {
renderer: Ext.String.htmlEncode,
},
],
+
+ advancedColumn1: [
+ {
+ xtype: 'proxmoxintegerfield',
+ name: 'worker-threads',
+ fieldLabel: gettext('# of Worker Threads'),
+ emptyText: '1',
+ skipEmptyText: true,
+ minValue: 1,
+ maxValue: 32,
+ },
+ ],
},
],
diff --git a/www/tape/window/TapeBackupJob.js b/www/tape/window/TapeBackupJob.js
index 12623712a..24afb9fb6 100644
--- a/www/tape/window/TapeBackupJob.js
+++ b/www/tape/window/TapeBackupJob.js
@@ -216,6 +216,20 @@ Ext.define('PBS.TapeManagement.BackupJobEdit', {
},
},
],
+
+ advancedColumn1: [
+ {
+ xtype: 'proxmoxintegerfield',
+ name: 'worker-threads',
+ fieldLabel: gettext('# of Worker Threads'),
+ emptyText: '1',
+ minValue: 1,
+ maxValue: 32,
+ cbind: {
+ deleteEmpty: '{!isCreate}',
+ },
+ },
+ ],
},
{
xtype: 'inputpanel',
--
2.39.5
More information about the pbs-devel
mailing list