[pbs-devel] parallelize restore.rs fn restore_image: problems in
Dominik Csapak
d.csapak at proxmox.com
Mon Dec 7 14:24:07 CET 2020
hi,

i looked a bit more at this today.

first, it seems that you are not that familiar with rust, and
especially async rust (tokio, etc.); please correct me if i am wrong.
it is a complicated topic, so i would suggest that you make yourself
familiar with the concepts first. rust's reference documentation can
be found under

https://doc.rust-lang.org/book/
https://rust-lang.github.io/async-book/

and there are surely many other great resources out there.
of course you can always ask here too, but it may take longer to explain.
On 12/7/20 3:51 AM, Niko Fellner wrote:
> By using mutex (AtomicBool - compare_exchange) I found out that only the write_zero_callback and write_data_callback are problematic (no mutex -> segfaults). Maybe anyone can find out why?
>
an AtomicBool plus thread::yield is not necessary here (i think it is
even a hindrance, because it pauses tokio worker threads); all you
need is rust's std::sync::Mutex.
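for illustration, a minimal standalone sketch of what i mean (plain
threads and a dummy write_data function, not the actual callback from
the patch):

use std::sync::{Arc, Mutex};
use std::thread;

// dummy stand-in for the qemu write callback
fn write_data(offset: u64, data: &[u8]) -> i32 {
    println!("write {} bytes at offset {}", data.len(), offset);
    0
}

fn main() {
    // one mutex shared by all workers, so only one write happens at a time
    let write_lock = Arc::new(Mutex::new(()));

    let mut handles = Vec::new();
    for i in 0..4u64 {
        let write_lock = Arc::clone(&write_lock);
        handles.push(thread::spawn(move || {
            let data = vec![0u8; 4096];
            // taking the guard serializes the write, no spinning or yielding needed
            let _guard = write_lock.lock().unwrap();
            let res = write_data(i * 4096, &data);
            assert!(res >= 0);
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}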
the problem, afaics, is that qemu cannot have multiple writers on the
same block backend from multiple threads, and it seems that is a
general qemu limitation (you can even only have one iothread per
image), so i am not sure it is possible to implement what you want at
the qemu layer. of course you can ask the qemu people on their mailing
list, maybe i am simply not seeing how.

so we have to synchronize the writes, which makes the whole patch less
interesting, since it will not solve your bottleneck, i assume?
the only thing we can parallelize is the download of the chunks, but
in my tests here that did not improve the restore speed at all.
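conceptually, the only safe split looks like this (a standalone sketch
with a dummy fetch_chunk instead of the real RemoteChunkReader):

use std::sync::mpsc;
use std::thread;

// dummy stand-in for downloading and decoding one chunk
fn fetch_chunk(pos: u64) -> Vec<u8> {
    vec![pos as u8; 4096]
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let workers: u64 = 4;

    // fetch chunks in parallel ...
    for w in 0..workers {
        let tx = tx.clone();
        thread::spawn(move || {
            for pos in (w..16).step_by(workers as usize) {
                let data = fetch_chunk(pos);
                tx.send((pos * 4096, data)).unwrap();
            }
        });
    }
    drop(tx);

    // ... but issue all writes from a single place, so the block
    // backend only ever sees one writer
    for (offset, data) in rx {
        println!("write {} bytes at offset {}", data.len(), offset);
    }
}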
i am attaching my code (which is based on yours); it should be ok, i
think, but please do not use it as-is, since i only tested it very
briefly... (it is more intended as a reference for how one could do
such a thing)
i commented the mutex out for now; like this it will crash with
segfaults (like your code does). with the mutex compiled in, it works,
but is as slow as without the patch (maybe playing around with the
thread counts could still make a difference).
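for reference, these are the thread-count knobs i mean, as a condensed
sketch (the helper function name here is made up for illustration; the
real code goes through get_runtime_with_builder, and the second knob is
the worker count passed to ParallelHandler::new("restore", 4, ...)):

use tokio::runtime::Runtime;

// tokio runtime used by the restore task (see RestoreTask::new in src/restore.rs);
// the numbers are just what i happened to test with, nothing tuned
fn build_restore_runtime() -> std::io::Result<Runtime> {
    let mut builder = tokio::runtime::Builder::new();
    builder.threaded_scheduler();   // tokio 0.2 style builder, as in the patch
    builder.enable_all();
    builder.max_threads(8);         // upper bound, including blocking threads
    builder.core_threads(6);        // scheduler worker threads
    builder.thread_name("proxmox-restore-worker");
    builder.build()
}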
i hope this helps :)
------------
diff --git a/src/lib.rs b/src/lib.rs
index b755014..20650b7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -810,6 +810,14 @@ pub extern "C" fn proxmox_restore_image(
verbose: bool,
) -> c_int {
+ #[derive(Clone,Copy)]
+ struct SafePointer {
+ pointer: *mut c_void,
+ }
+
+ unsafe impl Send for SafePointer {};
+ unsafe impl Sync for SafePointer {};
+
let restore_task = restore_handle_to_task(handle);
let result: Result<_, Error> = try_block!({
@@ -817,12 +825,13 @@ pub extern "C" fn proxmox_restore_image(
let archive_name = tools::utf8_c_string(archive_name)?
.ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+ let foo = SafePointer { pointer: callback_data.clone() };
let write_data_callback = move |offset: u64, data: &[u8]| {
- callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+ callback(foo.pointer, offset, data.as_ptr(), data.len() as u64)
};
let write_zero_callback = move |offset: u64, len: u64| {
- callback(callback_data, offset, std::ptr::null(), len)
+ callback(foo.pointer, offset, std::ptr::null(), len)
};
proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..caa5aeb 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
use tokio::prelude::*;
use proxmox_backup::tools::runtime::get_runtime_with_builder;
+use proxmox_backup::tools::ParallelHandler;
use proxmox_backup::backup::*;
use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
@@ -66,8 +67,8 @@ impl RestoreTask {
let mut builder = tokio::runtime::Builder::new();
builder.threaded_scheduler();
builder.enable_all();
- builder.max_threads(6);
- builder.core_threads(4);
+ builder.max_threads(8);
+ builder.core_threads(6);
builder.thread_name("proxmox-restore-worker");
builder
});
@@ -110,8 +111,8 @@ impl RestoreTask {
pub async fn restore_image(
&self,
archive_name: String,
- write_data_callback: impl Fn(u64, &[u8]) -> i32,
- write_zero_callback: impl Fn(u64, u64) -> i32,
+ write_data_callback: impl Fn(u64, &[u8]) -> i32 + Send + Copy + 'static,
+ write_zero_callback: impl Fn(u64, u64) -> i32 + Send + Copy + 'static,
verbose: bool,
) -> Result<(), Error> {
@@ -148,51 +149,92 @@ impl RestoreTask {
most_used,
);
- let mut per = 0;
- let mut bytes = 0;
- let mut zeroes = 0;
-
let start_time = std::time::Instant::now();
- for pos in 0..index.index_count() {
- let digest = index.index_digest(pos).unwrap();
- let offset = (pos*index.chunk_size) as u64;
- if digest == &zero_chunk_digest {
- let res = write_zero_callback(offset, index.chunk_size as u64);
- if res < 0 {
- bail!("write_zero_callback failed ({})", res);
- }
- bytes += index.chunk_size;
- zeroes += index.chunk_size;
- } else {
- let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
- let res = write_data_callback(offset, &raw_data);
- if res < 0 {
- bail!("write_data_callback failed ({})", res);
- }
- bytes += raw_data.len();
+ let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
+
+// let mutex = Arc::new(Mutex::new(()));
+
+ let index_count = index.index_count();
+ let pool = ParallelHandler::new(
+ "restore", 4,
+ move |(digest, offset, size): ([u8;32], u64, u64)| {
+// let mutex = mutex.clone();
+ let chunk_reader = chunk_reader.clone();
+ let (bytes, zeroes) = if digest == zero_chunk_digest {
+ {
+// let _guard = mutex.lock().unwrap();
+ let res = write_zero_callback(offset, size);
+ if res < 0 {
+ bail!("write_zero_callback failed ({})", res);
+ }
+ }
+ (size, size)
+ } else {
+ let size = {
+// let _guard = mutex.lock().unwrap();
+ let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+ let res = write_data_callback(offset, &raw_data);
+ if res < 0 {
+ bail!("write_data_callback failed ({})", res);
+ }
+ raw_data.len() as u64
+ };
+ (size, 0)
+ };
+
+ sender.send((bytes, zeroes))?;
+
+ Ok(())
}
- if verbose {
- let next_per = ((pos+1)*100)/index.index_count();
- if per != next_per {
- eprintln!("progress {}% (read {} bytes, zeroes =
{}% ({} bytes), duration {} sec)",
- next_per, bytes,
- zeroes*100/bytes, zeroes,
- start_time.elapsed().as_secs());
- per = next_per;
+ );
+
+ let channel = pool.channel();
+
+ let output = tokio::spawn(async move {
+ let mut count = 0;
+ let mut per = 0;
+ let mut bytes = 0;
+ let mut zeroes = 0;
+ while let Some((new_bytes, new_zeroes)) = receiver.recv().await {
+ bytes += new_bytes;
+ zeroes += new_zeroes;
+ count += 1;
+ if verbose {
+ let next_per = ((count)*100)/index_count;
+ if per != next_per {
+ eprintln!("progress {}% (read {} bytes, zeroes
= {}% ({} bytes), duration {} sec)",
+ next_per, bytes,
+ zeroes*100/bytes, zeroes,
+ start_time.elapsed().as_secs());
+ per = next_per;
+ }
+ }
+ if count >= index_count {
+ break;
}
}
- }
- let end_time = std::time::Instant::now();
- let elapsed = end_time.duration_since(start_time);
- eprintln!("restore image complete (bytes={}, duration={:.2}s,
speed={:.2}MB/s)",
- bytes,
- elapsed.as_secs_f64(),
- bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
- );
+ let end_time = std::time::Instant::now();
+ let elapsed = end_time.duration_since(start_time);
+ eprintln!("restore image complete (bytes={},
duration={:.2}s, speed={:.2}MB/s)",
+ bytes,
+ elapsed.as_secs_f64(),
+ bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
+ );
+
+ Ok::<_, Error>(())
+ });
+
+ for pos in 0..index.index_count() {
+ let digest = index.index_digest(pos).unwrap().clone();
+ let offset = (pos*index.chunk_size) as u64;
+ let chunk_size = index.chunk_size;
+
+ proxmox_backup::tools::runtime::block_in_place(|| channel.send((digest, offset, chunk_size as u64)))?;
+ }
- Ok(())
+ output.await?
}
pub fn get_image_length(&self, aid: u8) -> Result<u64, Error> {