[pbs-devel] parallelize restore.rs fn restore_image: problems in async move
Niko Fellner
n.fellner at logics.de
Sun Dec 6 00:39:51 CET 2020
Update: I was able to implement an unsafe SendRawPointer, but still have problems with the parallelization.
I have two different versions:
Version ASYNC_CRASH:
- push all tasks into a Vec tokio_handles and call try_join_all(tokio_handles).await afterwards
- It runs in parallel, but quickly leads to an out-of-memory error, because my implementation spawns all tasks at once instead of limiting the in-flight tasks to roughly the number of CPU cores...
- Syslog even showed a segfault:
> Dec 5 22:50:49 pve kernel: [13437.190348] proxmox-restore[26081]: segfault at 8 ip 000055643d435b37 sp 00007f94d0f78c20 error 4 in pbs-restore[55643d34c000+104000]
> Dec 5 22:50:49 pve kernel: [13437.190357] Code: 48 85 ff 75 b2 48 8b 4c 24 28 64 48 33 0c 25 28 00 00 00 44 89 e8 75 43 48 83 c4 38 5b 5d 41 5c 41 5d c3 48 8b 85 b0 00 00 00 <48> 8b 50 08 48 89 95 b0 00 00 00 48 85 d2 74 11 48 c7 40 08 00 00
- Is there a way to await just a group of maybe 5 or 10 tokio handles at a time? Or what am I doing wrong in version ASYNC_CRASH?
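A sketch of what I have in mind, using futures' buffer_unordered to cap the number of in-flight futures (restore_chunk is only a hypothetical stand-in for the per-chunk async block from the diff below, not actual pbs code; assumes futures 0.3):

use anyhow::Error;
use futures::stream::{StreamExt, TryStreamExt};

// Hypothetical stand-in for the per-chunk async block from the diff below.
async fn restore_chunk(_pos: usize) -> Result<(), Error> {
    Ok(())
}

// Create the chunk futures lazily and poll at most `max_in_flight` of them
// at a time, instead of spawning all index_count tasks up front.
async fn restore_all(index_count: usize, max_in_flight: usize) -> Result<(), Error> {
    futures::stream::iter(0..index_count)
        .map(restore_chunk)
        .buffer_unordered(max_in_flight)
        .try_collect::<Vec<()>>() // short-circuits on the first error
        .await?;
    Ok(())
}

As far as I understand, buffer_unordered alone only interleaves the futures within one task; for true parallelism across worker threads each future would additionally have to go through tokio::spawn.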
Version ASYNC_SINGLE:
- uses the async functions, but awaits directly after spawning each tokio task.
- Restore works, it looks good
- But no parallelization
- Will do some performance tests tomorrow comparing ASYNC_SINGLE with the original version.
Original version:
commit 447552da4af1c7f0553873e4fd21335dab8fe029 (HEAD -> master, origin/master, origin/HEAD)
Author: Fabian Grünbichler <f.gruenbichler at proxmox.com>
Date: Mon Nov 30 13:41:45 2020 +0100
Maybe ASYNC_CRASH crashes because of how I use the chunk reader?
- I couldn't use "ReadChunk::read_chunk(&chunk_reader_clone, &digest)?", because it never finished reading inside my async block.
- So I tried "AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?", which works fine in version ASYNC_SINGLE, but within ASYNC_CRASH it leads to chaos.
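If the synchronous reader simply blocks the executor thread (which would explain why it never finishes), I guess the usual escape hatch would be tokio's blocking pool. A sketch, assuming the reader clone and digest can be moved into the closure:

// Move the blocking read onto tokio's dedicated blocking thread pool so it
// cannot stall the async executor threads.
let raw_data = tokio::task::spawn_blocking(move || {
    ReadChunk::read_chunk(&chunk_reader_clone, &digest)
})
.await??; // first ? for the JoinError, second ? for the read error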
@Dietmar: unfortunately I still couldn't use much of pull.rs - I still don't understand it well enough.
VERSION ASYNC_CRASH:
diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
libc = "0.2"
once_cell = "1.3.1"
openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
serde_json = "1.0"
tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
use std::ffi::CString;
pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
/// Opaque handle for backups jobs
#[repr(C)]
pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+ pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+ pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+ pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+ return (self.callback)(self.callback_data, offset, data, len);
+ }
+}
\ No newline at end of file
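(One caveat I'm aware of: the unsafe impl Send above only promises the compiler that the raw pointers may be moved to another thread. Whether the C callback behind them actually tolerates being invoked from several threads at once is a separate question that the compiler cannot check for me.)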
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
let archive_name = tools::utf8_c_string(archive_name)?
.ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+
+ let send_raw_pointer = SendRawPointer { callback, callback_data };
let write_data_callback = move |offset: u64, data: &[u8]| {
- callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+ return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
};
let write_zero_callback = move |offset: u64, len: u64| {
- callback(callback_data, offset, std::ptr::null(), len)
+ return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
};
proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..c0f0bf8 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -106,12 +106,12 @@ impl RestoreTask {
pub fn runtime(&self) -> tokio::runtime::Handle {
self.runtime.handle().clone()
}
-
- pub async fn restore_image(
+
+ pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
&self,
archive_name: String,
- write_data_callback: impl Fn(u64, &[u8]) -> i32,
- write_zero_callback: impl Fn(u64, u64) -> i32,
+ write_data_callback: A,
+ write_zero_callback: B,
verbose: bool,
) -> Result<(), Error> {
@@ -151,38 +151,66 @@ impl RestoreTask {
let mut per = 0;
let mut bytes = 0;
let mut zeroes = 0;
+
+ let mut tokio_handles = vec![];
+ let index_chunk_size = index.chunk_size;
+ let index_count = index.index_count();
+ eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+ eprintln!("BEGIN: push tasks");
let start_time = std::time::Instant::now();
- for pos in 0..index.index_count() {
- let digest = index.index_digest(pos).unwrap();
- let offset = (pos*index.chunk_size) as u64;
- if digest == &zero_chunk_digest {
- let res = write_zero_callback(offset, index.chunk_size as u64);
- if res < 0 {
- bail!("write_zero_callback failed ({})", res);
- }
- bytes += index.chunk_size;
- zeroes += index.chunk_size;
- } else {
- let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
- let res = write_data_callback(offset, &raw_data);
- if res < 0 {
- bail!("write_data_callback failed ({})", res);
- }
- bytes += raw_data.len();
- }
- if verbose {
- let next_per = ((pos+1)*100)/index.index_count();
- if per != next_per {
- eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
- next_per, bytes,
- zeroes*100/bytes, zeroes,
- start_time.elapsed().as_secs());
- per = next_per;
- }
- }
+ for pos in 0..index_count {
+ let chunk_reader_clone = chunk_reader.clone();
+ let index_digest = index.index_digest(pos).unwrap().clone();
+ tokio_handles.push(
+ tokio::spawn(
+ async move {
+ let digest = &index_digest;
+ let offset = (pos*index_chunk_size) as u64;
+ //eprintln!("pos: {}, offset: {}", pos, offset);
+ if digest == &zero_chunk_digest {
+ let res = write_zero_callback(offset, index_chunk_size as u64);
+ //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+ if res < 0 {
+ bail!("write_zero_callback failed ({})", res);
+ }
+ bytes += index_chunk_size;
+ zeroes += index_chunk_size;
+ } else {
+ //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+ //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+ let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+ //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+ let res = write_data_callback(offset, &raw_data);
+ //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+ if res < 0 {
+ bail!("write_data_callback failed ({})", res);
+ }
+ bytes += raw_data.len();
+ }
+ if verbose {
+ let next_per = ((pos+1)*100)/index_count;
+ //if per != next_per {
+ if per < next_per {
+ eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+ next_per, bytes,
+ zeroes*100/bytes, zeroes,
+ start_time.elapsed().as_secs());
+ per = next_per;
+ }
+ }
+ Ok(())
+ }
+ )
+ );
+ }
+ eprintln!("END: push tasks");
+ eprintln!("BEGIN: await");
+ if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+ eprintln!("Error during await: {}", e);
}
+ eprintln!("END: await");
let end_time = std::time::Instant::now();
let elapsed = end_time.duration_since(start_time);
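One more thing I noticed while re-reading this diff: bytes, zeroes and per are Copy, so every async move block captures its own private copy and the progress output cannot be right. For genuinely shared counters I would probably need something like this (illustrative only, not part of the patch; must run inside the async context):

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// One shared counter, cloned into every spawned task.
let bytes = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for chunk_len in vec![4096usize; 8] {
    let bytes = Arc::clone(&bytes);
    handles.push(tokio::spawn(async move {
        bytes.fetch_add(chunk_len, Ordering::Relaxed);
    }));
}
futures::future::try_join_all(handles).await?;
eprintln!("total bytes: {}", bytes.load(Ordering::Relaxed));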
VERSION ASYNC_SINGLE:
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..9d4fb4d 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -106,12 +106,12 @@ impl RestoreTask {
pub fn runtime(&self) -> tokio::runtime::Handle {
self.runtime.handle().clone()
}
-
- pub async fn restore_image(
+
+ pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
&self,
archive_name: String,
- write_data_callback: impl Fn(u64, &[u8]) -> i32,
- write_zero_callback: impl Fn(u64, u64) -> i32,
+ write_data_callback: A,
+ write_zero_callback: B,
verbose: bool,
) -> Result<(), Error> {
@@ -148,39 +148,45 @@ impl RestoreTask {
most_used,
);
- let mut per = 0;
let mut bytes = 0;
- let mut zeroes = 0;
+
+ let index_chunk_size = index.chunk_size;
+ let index_count = index.index_count();
+ eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
let start_time = std::time::Instant::now();
- for pos in 0..index.index_count() {
- let digest = index.index_digest(pos).unwrap();
- let offset = (pos*index.chunk_size) as u64;
- if digest == &zero_chunk_digest {
- let res = write_zero_callback(offset, index.chunk_size as u64);
- if res < 0 {
- bail!("write_zero_callback failed ({})", res);
- }
- bytes += index.chunk_size;
- zeroes += index.chunk_size;
- } else {
- let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
- let res = write_data_callback(offset, &raw_data);
- if res < 0 {
- bail!("write_data_callback failed ({})", res);
- }
- bytes += raw_data.len();
- }
- if verbose {
- let next_per = ((pos+1)*100)/index.index_count();
- if per != next_per {
- eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
- next_per, bytes,
- zeroes*100/bytes, zeroes,
- start_time.elapsed().as_secs());
- per = next_per;
+ for pos in 0..index_count {
+ let chunk_reader_clone = chunk_reader.clone();
+ let index_digest = index.index_digest(pos).unwrap().clone();
+ if let Err(e) = tokio::spawn(
+ async move {
+ let digest = &index_digest;
+ let offset = (pos*index_chunk_size) as u64;
+ //eprintln!("pos: {}, offset: {}", pos, offset);
+ if digest == &zero_chunk_digest {
+ let res = write_zero_callback(offset, index_chunk_size as u64);
+ //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+ if res < 0 {
+ bail!("write_zero_callback failed ({})", res);
+ }
+ bytes += index_chunk_size;
+ } else {
+ //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+ //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+ let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+ //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+ let res = write_data_callback(offset, &raw_data);
+ //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+ if res < 0 {
+ bail!("write_data_callback failed ({})", res);
+ }
+ bytes += raw_data.len();
+ }
+ Ok(())
}
+ ).await {
+ eprintln!("Error during await: {}", e);
}
}
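So awaiting each JoinHandle directly after spawning it means there is never more than one chunk task in flight, which matches what I see: correct restores, but no parallelism. Bounding the in-flight tasks, as in the buffer_unordered sketch above, should be the middle ground between the two versions.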