[pbs-devel] parallelize restore.rs fn restore_image: problems in

Niko Fellner n.fellner at logics.de
Sun Dec 6 03:51:55 CET 2020


Another update: 
- It now works when awaiting (`.await.unwrap()`) the futures in pairs of 2
- Using atomics so that the percentage, bytes, and zeroes counters work
- Sometimes the program runs and finishes OK. First impression: performance looks good (about 230 seconds vs. 380 seconds for the sync version, for a 32 GiB VM), but I can't really measure performance right now because the server is busy.
- But sometimes the program segfaults, and I'm not sure why. Maybe someone has an idea?
- The more futures I let run in parallel before awaiting them (see "if futs.len() >= N ..."), the sooner and the more likely the segfaults occur.

diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
 use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
 use std::ffi::CString;
 
 pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
 /// Opaque handle for backups jobs
 #[repr(C)]
 pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+    pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+    pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+    pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+        return (self.callback)(self.callback_data, offset, data, len);
+    }
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
 
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+            
+        let send_raw_pointer = SendRawPointer { callback, callback_data };
 
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..f7aa564 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -66,8 +66,10 @@ impl RestoreTask {
             let mut builder = tokio::runtime::Builder::new();
             builder.threaded_scheduler();
             builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            //builder.max_threads(6);
+            //builder.core_threads(4);
+            builder.max_threads(12);
+            builder.core_threads(6);
             builder.thread_name("proxmox-restore-worker");
             builder
         });
@@ -106,12 +108,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+    
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -148,46 +150,108 @@ impl RestoreTask {
             most_used,
         );
 
-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
+        let per = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let bytes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let zeroes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        
+        //let mut tokio_handles = vec![];
+        //let mut futs = futures::stream::FuturesUnordered::new();
+        //use futures::stream::{self, StreamExt, TryStreamExt};
+        use futures::stream::{StreamExt};
+        //let futs = tokio::stream::iter;
+        let mut futs = futures::stream::FuturesUnordered::new();
+        
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+        eprintln!("BEGIN: push and await tasks");
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            let per = std::sync::Arc::clone(&per);
+            let bytes = std::sync::Arc::clone(&bytes);
+            let zeroes = std::sync::Arc::clone(&zeroes);
+            futs.push(
+                tokio::spawn(
+                    async move {
+                        let digest = &index_digest;
+                        let offset = (pos*index_chunk_size) as u64;
+                        //eprintln!("pos: {}, offset: {}", pos, offset);
+                        if digest == &zero_chunk_digest {
+                            let res = write_zero_callback(offset, index_chunk_size as u64);
+                            //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+                            zeroes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+                        } else {
+                            //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                            //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                            let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                            //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                            let res = write_data_callback(offset, &raw_data);
+                            //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes.fetch_add(raw_data.len(), std::sync::atomic::Ordering::SeqCst);
+                        }
+                        if verbose {
+                            let next_per = ((pos+1)*100)/index_count;
+                            let currPer = per.load(std::sync::atomic::Ordering::SeqCst);
+                            let currBytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+                            let currZeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
+                            //if per != next_per {
+                            if currPer < next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                      next_per, currBytes,
+                                      currZeroes*100/currBytes, currZeroes,
+                                      start_time.elapsed().as_secs());
+                                per.store(next_per, std::sync::atomic::Ordering::SeqCst);
+                            }
+                        }
+                        Ok(())
+                    }
+                )
+            );
+            
+            //if futs.len() >= 2 {
+            if futs.len() >= 2 {
+                let response = futs.next().await.unwrap();
+                if let Err(e) = response {
+                    eprintln!("Error during await: {}", e);
                 }
-                bytes += raw_data.len();
             }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
-                }
+        }
+        eprintln!("END: push tasks");
+        eprintln!("BEGIN: await remaining");
+        // Wait for the remaining to finish.
+        while let Some(response) = futs.next().await {
+            if let Err(e) = response {
+                eprintln!("Error during await: {}", e);
             }
         }
+        eprintln!("END: await remaining");
+
+        //futs.try_buffer_unordered(20)
+        //.try_for_each(|_res| futures::future::ok(()))
+        //.await?;
+        //if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+        //    eprintln!("Error during await: {}", e);
+        //}
+        let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+        let zeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
 
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+        eprintln!("restore image complete (bytes={}, zeroes={}, duration={:.2}s, speed={:.2}MB/s)",
                   bytes,
+                  zeroes,
                   elapsed.as_secs_f64(),
                   bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
         );



More information about the pbs-devel mailing list