[pbs-devel] parallelize restore.rs fn restore_image: problems in

Niko Fellner n.fellner at logics.de
Mon Dec 7 23:59:15 CET 2020


Hi Dominik!

Thanks for your quick feedback.

Indeed, I have never done Rust before and only have a limited background in parallel programming. Thanks a lot for your help!

Anyhow, yes, your code as is performs even worse than the original, no matter how many threads you throw at it.

- Your code, with activated mutex: restore image complete (bytes=34359738368, duration=235.59s, speed=139.09MB/s)
- Original sync code:  restore image complete (bytes=34359738368, duration=224.78s, speed=145.78MB/s)

But there is an easy fix for it! 

+//                        let _guard = mutex.lock().unwrap();
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;

Take the mutex just below the read_chunk line instead of above it, i.e. lock only after the chunk has been read. That way the reads are no longer serialized; only the write callback is.

Please run your restore performance test again and check whether it is faster for you now, too!

In my benchmark (32 GB VM), your solution with this small fix ran faster than my code (best I had was "duration=153.03s, speed=214.13MB/s")
- 4 threads: restore image complete (bytes=34359738368, duration=154.67s, speed=211.86MB/s)
- 12 threads: restore image complete (bytes=34359738368, duration=144.58s, speed=226.65MB/s)
- 12 threads: restore image complete (bytes=34359738368, duration=143.41s, speed=228.49MB/s) (just another run, to verify it) 

So the single-threaded CPU bottleneck is already removed to a certain degree, even without parallel writes. On my machine it ran about 57% faster than the original sync code (228.49 MB/s vs. 145.78 MB/s), as you can see.

Yes, I think we should ask the qemu guys about it. Maybe they can even provide a fix - who knows.

I'll also benchmark an Azure VM, to check out the speedup with a big number of CPU threads and NVMe disks. 

By the way: I also benchmarked a mix of your lib.rs with my restore.rs "pub async fn restore_image":
- 12 threads: restore image complete (bytes=34359738368, zeroes=22322085888, duration=155.07s, speed=211.31MB/s)
- 12 threads: restore image complete (bytes=34359738368, zeroes=22322085888, duration=156.37s, speed=209.56MB/s) (just another run, to verify it)

Therefore your code really is superior (maybe it was the std::sync::Mutex instead of my thread::yield, or something different), and it would be great if you could try it out again.

Here I automatically set the number of CPUs to use:


diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..4b42e02 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,10 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
+num_cpus = "1.13.0"
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..dee952e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -809,6 +809,14 @@ pub extern "C" fn proxmox_restore_image(
     error: * mut * mut c_char,
     verbose: bool,
 ) -> c_int {
+    
+    #[derive(Clone,Copy)]
+    struct SafePointer {
+        pointer: *mut c_void,
+    }
+
+    unsafe impl Send for SafePointer {};
+    unsafe impl Sync for SafePointer {};
 
     let restore_task = restore_handle_to_task(handle);
 
@@ -817,12 +825,13 @@ pub extern "C" fn proxmox_restore_image(
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
 
+        let foo = SafePointer { pointer: callback_data.clone() };
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            callback(foo.pointer, offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            callback(foo.pointer, offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..168853b 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
 use tokio::prelude::*;
 
 use proxmox_backup::tools::runtime::get_runtime_with_builder;
+use proxmox_backup::tools::ParallelHandler;
 use proxmox_backup::backup::*;
 use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
 
@@ -62,12 +63,14 @@ impl RestoreTask {
     }
 
     pub fn new(setup: BackupSetup) -> Result<Self, Error> {
+        let vcpus = num_cpus::get();
+        eprintln!("{} vCPUs detected", vcpus);
         let runtime = get_runtime_with_builder(|| {
             let mut builder = tokio::runtime::Builder::new();
             builder.threaded_scheduler();
             builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            builder.max_threads(2 * vcpus);
+            builder.core_threads(vcpus);
             builder.thread_name("proxmox-restore-worker");
             builder
         });
@@ -110,8 +113,8 @@ impl RestoreTask {
     pub async fn restore_image(
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: impl Fn(u64, &[u8]) -> i32 + Send + Copy + 'static,
+        write_zero_callback: impl Fn(u64, u64) -> i32 + Send + Copy + 'static,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -147,52 +150,96 @@ impl RestoreTask {
             file_info.chunk_crypt_mode(),
             most_used,
         );
-
-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
+        
+        let vcpus = num_cpus::get();
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
+
+        let mutex = Arc::new(Mutex::new(()));
+
+        let index_count = index.index_count();
+        let pool = ParallelHandler::new(
+            "restore", vcpus,
+            move |(digest, offset, size): ([u8;32], u64, u64)| {
+                let mutex = mutex.clone();
+                let chunk_reader = chunk_reader.clone();
+                let (bytes, zeroes) = if digest == zero_chunk_digest {
+                    {
+                        let _guard = mutex.lock().unwrap();
+                        let res = write_zero_callback(offset, size);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                    }
+                    (size, size)
+                } else {
+                    let size = {
+                        //let _guard = mutex.lock().unwrap(); // we don't want to sync too early here
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let _guard = mutex.lock().unwrap();
+                        let res = write_data_callback(offset, &raw_data);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        raw_data.len() as u64
+                    };
+                    (size, 0)
+                };
+
+                sender.send((bytes, zeroes))?;
+
+                Ok(())
             }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
+        );
+
+        let channel = pool.channel();
+
+        let output = tokio::spawn(async move {
+            let mut count = 0;
+            let mut per = 0;
+            let mut bytes = 0;
+            let mut zeroes = 0;
+            while let Some((new_bytes, new_zeroes)) = receiver.recv().await {
+                bytes += new_bytes;
+                zeroes += new_zeroes;
+                count += 1;
+                if verbose {
+                    let next_per = ((count)*100)/index_count;
+                    if per != next_per {
+                        eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                        next_per, bytes,
+                        zeroes*100/bytes, zeroes,
+                        start_time.elapsed().as_secs());
+                        per = next_per;
+                    }
+                }
+                if count >= index_count {
+                    break;
                 }
             }
-        }
 
-        let end_time = std::time::Instant::now();
-        let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
-                  bytes,
-                  elapsed.as_secs_f64(),
-                  bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
-        );
+            let end_time = std::time::Instant::now();
+            let elapsed = end_time.duration_since(start_time);
+            eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+                bytes,
+                elapsed.as_secs_f64(),
+                bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
+            );
+
+            Ok::<_, Error>(())
+        });
+
+        for pos in 0..index.index_count() {
+            let digest = index.index_digest(pos).unwrap().clone();
+            let offset = (pos*index.chunk_size) as u64;
+            let chunk_size = index.chunk_size;
+
+            proxmox_backup::tools::runtime::block_in_place(|| channel.send((digest, offset, chunk_size as u64)))?;
+        }
 
-        Ok(())
+        output.await?
     }
 
     pub fn get_image_length(&self, aid: u8) -> Result<u64, Error> {



More information about the pbs-devel mailing list