[pbs-devel] parallelize restore.rs fn restore_image: problems in

Dominik Csapak d.csapak at proxmox.com
Mon Dec 7 14:24:07 CET 2020


hi,

i looked a bit more at this today

first, it seems that you are not that familiar with rust and
especially async rust (tokio, etc), please correct me
if i am wrong. it is a complicated topic, so i would suggest that you
make yourself familiar with the concepts, rusts 'reference 
documentation' can be found under

https://doc.rust-lang.org/book/
https://rust-lang.github.io/async-book/

and there are sure many other great resources out there.
ofc you can always ask here too, but it may take longer to explain.


On 12/7/20 3:51 AM, Niko Fellner wrote:
> By using mutex (AtomicBool - compare_exchange) I found out that only the write_zero_callback and write_data_callback are problematic (no mutex -> segfaults). Maybe anyone can find out why? >

here a atomicbool/thread::yield are all not necessary (i think it is 
even a hindrance, because you now pause tokio threads), all you need is 
rusts

std::sync::Mutex

the problem afaics, is that qemu cannot have multiple writers on the 
same block backend from multiple threads, and it seems
that is a general qemu limitation (you can even only have
one iothread per image) so i am not sure that it is possible
to implement what you want with a qemu layer

ofc you can ask the qemu people on their mailing list, maybe
i am simply not seeing how

so we have to synchronize on the writes, which makes the whole
patch less interesting since it will not solve your bottleneck
i assume?

the only thing we can parallelize is the download of the chunks,
but in my tests here, that did not improve restore speed at all

i attach my code (which is based on yours) that should be
ok i think,  but please do not use that as i only
tested very shortly... (it is more intended to be
a reference on how one could do such a thing)

i commented the mutex out for now, like this
it will crash with segfaults (like your code does)

with the mutex compiled in, it works but is
as slow as without the patch
(maybe playing around with the threadcount could still make a difference)

i hope this helps :)

------------

diff --git a/src/lib.rs b/src/lib.rs
index b755014..20650b7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -810,6 +810,14 @@ pub extern "C" fn proxmox_restore_image(
      verbose: bool,
  ) -> c_int {

+    #[derive(Clone,Copy)]
+    struct SafePointer {
+        pointer: *mut c_void,
+    }
+
+    unsafe impl Send for SafePointer {};
+    unsafe impl Sync for SafePointer {};
+
      let restore_task = restore_handle_to_task(handle);

      let result: Result<_, Error> = try_block!({
@@ -817,12 +825,13 @@ pub extern "C" fn proxmox_restore_image(
          let archive_name = tools::utf8_c_string(archive_name)?
              .ok_or_else(|| format_err!("archive_name must not be NULL"))?;

+        let foo = SafePointer { pointer: callback_data.clone() };
          let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() 
as u64)
+            callback(foo.pointer, offset, data.as_ptr(), data.len() as u64)
          };

          let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            callback(foo.pointer, offset, std::ptr::null(), len)
          };

          proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..caa5aeb 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
  use tokio::prelude::*;

  use proxmox_backup::tools::runtime::get_runtime_with_builder;
+use proxmox_backup::tools::ParallelHandler;
  use proxmox_backup::backup::*;
  use proxmox_backup::client::{HttpClient, HttpClientOptions, 
BackupReader, RemoteChunkReader};

@@ -66,8 +67,8 @@ impl RestoreTask {
              let mut builder = tokio::runtime::Builder::new();
              builder.threaded_scheduler();
              builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            builder.max_threads(8);
+            builder.core_threads(6);
              builder.thread_name("proxmox-restore-worker");
              builder
          });
@@ -110,8 +111,8 @@ impl RestoreTask {
      pub async fn restore_image(
          &self,
          archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: impl Fn(u64, &[u8]) -> i32 + Send + Copy + 
'static,
+        write_zero_callback: impl Fn(u64, u64) -> i32 + Send + Copy + 
'static,
          verbose: bool,
      ) -> Result<(), Error> {

@@ -148,51 +149,92 @@ impl RestoreTask {
              most_used,
          );

-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
-
          let start_time = std::time::Instant::now();

-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size 
as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, 
&digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+        let (sender, mut receiver) = 
tokio::sync::mpsc::unbounded_channel();
+
+//        let mutex = Arc::new(Mutex::new(()));
+
+        let index_count = index.index_count();
+        let pool = ParallelHandler::new(
+            "restore", 4,
+            move |(digest, offset, size): ([u8;32], u64, u64)| {
+//                let mutex = mutex.clone();
+                let chunk_reader = chunk_reader.clone();
+                let (bytes, zeroes) = if digest == zero_chunk_digest {
+                    {
+//                        let _guard = mutex.lock().unwrap();
+                        let res = write_zero_callback(offset, size);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                    }
+                    (size, size)
+                } else {
+                    let size = {
+//                        let _guard = mutex.lock().unwrap();
+                        let raw_data = 
ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let res = write_data_callback(offset, &raw_data);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        raw_data.len() as u64
+                    };
+                    (size, 0)
+                };
+
+                sender.send((bytes, zeroes))?;
+
+                Ok(())
              }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = 
{}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
+        );
+
+        let channel = pool.channel();
+
+        let output = tokio::spawn(async move {
+            let mut count = 0;
+            let mut per = 0;
+            let mut bytes = 0;
+            let mut zeroes = 0;
+            while let Some((new_bytes, new_zeroes)) = 
receiver.recv().await {
+                bytes += new_bytes;
+                zeroes += new_zeroes;
+                count += 1;
+                if verbose {
+                    let next_per = ((count)*100)/index_count;
+                    if per != next_per {
+                        eprintln!("progress {}% (read {} bytes, zeroes 
= {}% ({} bytes), duration {} sec)",
+                        next_per, bytes,
+                        zeroes*100/bytes, zeroes,
+                        start_time.elapsed().as_secs());
+                        per = next_per;
+                    }
+                }
+                if count >= index_count {
+                    break;
                  }
              }
-        }

-        let end_time = std::time::Instant::now();
-        let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, 
speed={:.2}MB/s)",
-                  bytes,
-                  elapsed.as_secs_f64(),
-                  bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
-        );
+            let end_time = std::time::Instant::now();
+            let elapsed = end_time.duration_since(start_time);
+            eprintln!("restore image complete (bytes={}, 
duration={:.2}s, speed={:.2}MB/s)",
+                bytes,
+                elapsed.as_secs_f64(),
+                bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
+            );
+
+            Ok::<_, Error>(())
+        });
+
+        for pos in 0..index.index_count() {
+            let digest = index.index_digest(pos).unwrap().clone();
+            let offset = (pos*index.chunk_size) as u64;
+            let chunk_size = index.chunk_size;
+
+            proxmox_backup::tools::runtime::block_in_place(|| 
channel.send((digest, offset, chunk_size as u64)))?;
+        }

-        Ok(())
+        output.await?
      }

      pub fn get_image_length(&self, aid: u8) -> Result<u64, Error> {




More information about the pbs-devel mailing list