[pbs-devel] parallelize restore.rs fn restore_image: problems in async move

Niko Fellner n.fellner at logics.de
Sun Dec 6 00:39:51 CET 2020


Update: I was able to implement an unsafe SendRawPointer, but still have problems with the parallelization. 

I have two different versions:

Version ASYNC_CRASH:
- push all tasks into Vector tokio_handles and call try_join_all(tokio_handles).await later on
- It runs in parallel, but quickly leads to an out-of-memory error, because my implementation spawns all tasks at once, instead of only (number of CPU cores) tasks at a time...
- Syslog even showed a segfault:
> Dec  5 22:50:49 pve kernel: [13437.190348] proxmox-restore[26081]: segfault at 8 ip 000055643d435b37 sp 00007f94d0f78c20 error 4 in pbs-restore[55643d34c000+104000]
> Dec  5 22:50:49 pve kernel: [13437.190357] Code: 48 85 ff 75 b2 48 8b 4c 24 28 64 48 33 0c 25 28 00 00 00 44 89 e8 75 43 48 83 c4 38 5b 5d 41 5c 41 5d c3 48 8b 85 b0 00 00 00 <48> 8b 50 08 48 89 95 b0 00 00 00 48 85 d2 74 11 48 c7 40 08 00 00
- Is there a way to run and await just a group of maybe 5 or 10 tokio handles at a time? Or what am I doing wrong here in Version ASYNC_CRASH?

Version ASYNC_SINGLE: 
- use the async functions, but directly await after spawning the tokio task. 
- Restore works, it looks good
- But no parallelization
- Will do some performance tests tomorrow of ASYNC_SINGLE vs. the original version.

Original version:
commit 447552da4af1c7f0553873e4fd21335dab8fe029 (HEAD -> master, origin/master, origin/HEAD)
Author: Fabian Grünbichler <f.gruenbichler at proxmox.com>
Date:   Mon Nov 30 13:41:45 2020 +0100


Maybe ASYNC_CRASH crashes because of how I use the chunk reader?
   - I couldn't use "ReadChunk::read_chunk(&chunk_reader_clone, &digest)?", because that one never finished reading in my async block...?!
   - So I tried with "AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?" which works fine in Version ASYNC_SINGLE, but within ASYNC_CRASH it leads to chaos.


@Dietmar: unfortunately I still couldn't use much of pull.rs - I still don't understand it well enough.


VERSION ASYNC_CRASH:

diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
 use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
 use std::ffi::CString;
 
 pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
 /// Opaque handle for backups jobs
 #[repr(C)]
 pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+    pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+    pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+    pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+        return (self.callback)(self.callback_data, offset, data, len);
+    }
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
 
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+            
+        let send_raw_pointer = SendRawPointer { callback, callback_data };
 
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..c0f0bf8 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -106,12 +106,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+    
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -151,38 +151,66 @@ impl RestoreTask {
         let mut per = 0;
         let mut bytes = 0;
         let mut zeroes = 0;
+        
+        let mut tokio_handles = vec![];
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+        eprintln!("BEGIN: push tasks");
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
-                }
-            }
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            tokio_handles.push(
+                tokio::spawn(
+                    async move {
+                        let digest = &index_digest;
+                        let offset = (pos*index_chunk_size) as u64;
+                        //eprintln!("pos: {}, offset: {}", pos, offset);
+                        if digest == &zero_chunk_digest {
+                            let res = write_zero_callback(offset, index_chunk_size as u64);
+                            //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes += index_chunk_size;
+                            zeroes += index_chunk_size;
+                        } else {
+                            //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                            //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                            let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                            //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                            let res = write_data_callback(offset, &raw_data);
+                            //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes += raw_data.len();
+                        }
+                        if verbose {
+                            let next_per = ((pos+1)*100)/index_count;
+                            //if per != next_per {
+                            if per < next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                      next_per, bytes,
+                                      zeroes*100/bytes, zeroes,
+                                      start_time.elapsed().as_secs());
+                                per = next_per;
+                            }
+                        }
+                        Ok(())
+                    }
+                )
+            );
+        }
+        eprintln!("END: push tasks");
+        eprintln!("BEGIN: await");
+        if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+            eprintln!("Error during await: {}", e);
         }
+        eprintln!("END: await");
 
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);





VERSION ASYNC_SINGLE:
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..9d4fb4d 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -106,12 +106,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+    
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -148,39 +148,45 @@ impl RestoreTask {
             most_used,
         );
 
-        let mut per = 0;
         let mut bytes = 0;
-        let mut zeroes = 0;
+        
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            if let Err(e) = tokio::spawn(
+                async move {
+                    let digest = &index_digest;
+                    let offset = (pos*index_chunk_size) as u64;
+                    //eprintln!("pos: {}, offset: {}", pos, offset);
+                    if digest == &zero_chunk_digest {
+                        let res = write_zero_callback(offset, index_chunk_size as u64);
+                        //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                        bytes += index_chunk_size;
+                    } else {
+                        //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                        //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                        let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                        //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                        let res = write_data_callback(offset, &raw_data);
+                        //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        bytes += raw_data.len();
+                    }
+                    Ok(())
                 }
+            ).await {
+                eprintln!("Error during await: {}", e);
             }
         }





More information about the pbs-devel mailing list