[pbs-devel] parallelize restore.rs fn restore_image: problems in
Niko Fellner
n.fellner at logics.de
Sun Dec 6 03:51:55 CET 2020
Another update:
- Now awaiting (and unwrapping) the spawned futures in pairs of 2, so at most two tasks are in flight at a time
- Using atomics so that the counting of percentage, bytes and zeroes works across tasks
- Sometimes the program runs and finishes OK. First impression: performance looks good (about 230 vs. 380 seconds (sync) for a 32 GiB VM), but I can't really measure performance right now because the server is busy
- But sometimes the program segfaults, and I am not sure why. Does anyone have an idea?
- The more futures I await/unwrap in parallel (see "if futs.len() >= N ..."), the sooner and more often the segfaults occur. A minimal standalone sketch of the pattern follows below.
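
For anyone who wants to experiment without the full QEMU plugin, here is a minimal standalone sketch of what the patch does: spawn tasks into a FuturesUnordered, await one as soon as N are in flight, and count via atomics. The workload is a dummy stand-in for the real chunk read + write callback:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let bytes = Arc::new(AtomicUsize::new(0));
    let mut futs = FuturesUnordered::new();

    for pos in 0..100usize {
        let bytes = Arc::clone(&bytes);
        futs.push(tokio::spawn(async move {
            // stand-in for AsyncReadChunk::read_chunk + write_data_callback
            bytes.fetch_add(pos, Ordering::SeqCst);
        }));

        // keep at most 2 tasks in flight; await the next one that finishes
        if futs.len() >= 2 {
            futs.next().await.unwrap().unwrap();
        }
    }

    // drain whatever is still running
    while let Some(res) = futs.next().await {
        res.unwrap();
    }

    eprintln!("total = {}", bytes.load(Ordering::SeqCst));
}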
diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
libc = "0.2"
once_cell = "1.3.1"
openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
serde_json = "1.0"
tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
use std::ffi::CString;
pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
/// Opaque handle for backups jobs
#[repr(C)]
pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+ pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+ pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+ pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+ return (self.callback)(self.callback_data, offset, data, len);
+ }
+}
\ No newline at end of file
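
One remark on this capi_types.rs part: the unsafe impl Send is only an unchecked promise to the compiler that the callback pointer may be used from other threads; it does not make the C callback itself thread-safe. Since two spawned tasks can now call into QEMU's write callback at the same time, a non-reentrant callback (or shared callback_data) would be one plausible explanation for the intermittent segfaults. A hypothetical way to test that theory would be to serialize all calls behind a Mutex (LockedCallback is made up for this experiment):

use std::os::raw::{c_int, c_uchar, c_void};
use std::sync::Mutex;

// SendRawPointer as introduced in the patch above.
#[derive(Copy, Clone)]
pub(crate) struct SendRawPointer {
    pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
    pub callback_data: *mut c_void,
}
// SAFETY: unchecked; only sound if the C side really tolerates other threads.
unsafe impl Send for SendRawPointer {}

impl SendRawPointer {
    pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
        (self.callback)(self.callback_data, offset, data, len)
    }
}

// Hypothetical experiment: let only one task at a time enter the C callback.
// If the segfaults disappear with this in place, the callback is being raced.
pub(crate) struct LockedCallback(pub Mutex<SendRawPointer>);

impl LockedCallback {
    pub fn call(&self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
        let ptr = *self.0.lock().unwrap();
        ptr.call_itself(offset, data, len)
    }
}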
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
let archive_name = tools::utf8_c_string(archive_name)?
.ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+
+ let send_raw_pointer = SendRawPointer { callback, callback_data };
let write_data_callback = move |offset: u64, data: &[u8]| {
- callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+ return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
};
let write_zero_callback = move |offset: u64, len: u64| {
- callback(callback_data, offset, std::ptr::null(), len)
+ return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
};
proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..f7aa564 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -66,8 +66,10 @@ impl RestoreTask {
let mut builder = tokio::runtime::Builder::new();
builder.threaded_scheduler();
builder.enable_all();
- builder.max_threads(6);
- builder.core_threads(4);
+ //builder.max_threads(6);
+ //builder.core_threads(4);
+ builder.max_threads(12);
+ builder.core_threads(6);
builder.thread_name("proxmox-restore-worker");
builder
});
@@ -106,12 +108,12 @@ impl RestoreTask {
pub fn runtime(&self) -> tokio::runtime::Handle {
self.runtime.handle().clone()
}
-
- pub async fn restore_image(
+
+ pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
&self,
archive_name: String,
- write_data_callback: impl Fn(u64, &[u8]) -> i32,
- write_zero_callback: impl Fn(u64, u64) -> i32,
+ write_data_callback: A,
+ write_zero_callback: B,
verbose: bool,
) -> Result<(), Error> {
@@ -148,46 +150,108 @@ impl RestoreTask {
most_used,
);
- let mut per = 0;
- let mut bytes = 0;
- let mut zeroes = 0;
+ let per = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let bytes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let zeroes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+
+ //let mut tokio_handles = vec![];
+ //let mut futs = futures::stream::FuturesUnordered::new();
+ //use futures::stream::{self, StreamExt, TryStreamExt};
+ use futures::stream::{StreamExt};
+ //let futs = tokio::stream::iter;
+ let mut futs = futures::stream::FuturesUnordered::new();
+
+ let index_chunk_size = index.chunk_size;
+ let index_count = index.index_count();
+ eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+ eprintln!("BEGIN: push and await tasks");
let start_time = std::time::Instant::now();
- for pos in 0..index.index_count() {
- let digest = index.index_digest(pos).unwrap();
- let offset = (pos*index.chunk_size) as u64;
- if digest == &zero_chunk_digest {
- let res = write_zero_callback(offset, index.chunk_size as u64);
- if res < 0 {
- bail!("write_zero_callback failed ({})", res);
- }
- bytes += index.chunk_size;
- zeroes += index.chunk_size;
- } else {
- let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
- let res = write_data_callback(offset, &raw_data);
- if res < 0 {
- bail!("write_data_callback failed ({})", res);
+ for pos in 0..index_count {
+ let chunk_reader_clone = chunk_reader.clone();
+ let index_digest = index.index_digest(pos).unwrap().clone();
+ let per = std::sync::Arc::clone(&per);
+ let bytes = std::sync::Arc::clone(&bytes);
+ let zeroes = std::sync::Arc::clone(&zeroes);
+ futs.push(
+ tokio::spawn(
+ async move {
+ let digest = &index_digest;
+ let offset = (pos*index_chunk_size) as u64;
+ //eprintln!("pos: {}, offset: {}", pos, offset);
+ if digest == &zero_chunk_digest {
+ let res = write_zero_callback(offset, index_chunk_size as u64);
+ //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+ if res < 0 {
+ bail!("write_zero_callback failed ({})", res);
+ }
+ bytes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+ zeroes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+ } else {
+ //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+ //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+ let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+ //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+ let res = write_data_callback(offset, &raw_data);
+ //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+ if res < 0 {
+ bail!("write_data_callback failed ({})", res);
+ }
+ bytes.fetch_add(raw_data.len(), std::sync::atomic::Ordering::SeqCst);
+ }
+ if verbose {
+ let next_per = ((pos+1)*100)/index_count;
+ let currPer = per.load(std::sync::atomic::Ordering::SeqCst);
+ let currBytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+ let currZeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
+ //if per != next_per {
+ if currPer < next_per {
+ eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+ next_per, currBytes,
+ currZeroes*100/currBytes, currZeroes,
+ start_time.elapsed().as_secs());
+ per.store(next_per, std::sync::atomic::Ordering::SeqCst);
+ }
+ }
+ Ok(())
+ }
+ )
+ );
+
+ //if futs.len() >= 2 {
+ if futs.len() >= 2 {
+ let response = futs.next().await.unwrap();
+ if let Err(e) = response {
+ eprintln!("Error during await: {}", e);
}
- bytes += raw_data.len();
}
- if verbose {
- let next_per = ((pos+1)*100)/index.index_count();
- if per != next_per {
- eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
- next_per, bytes,
- zeroes*100/bytes, zeroes,
- start_time.elapsed().as_secs());
- per = next_per;
- }
+ }
+ eprintln!("END: push tasks");
+ eprintln!("BEGIN: await remaining");
+ // Wait for the remaining to finish.
+ while let Some(response) = futs.next().await {
+ if let Err(e) = response {
+ eprintln!("Error during await: {}", e);
}
}
+ eprintln!("END: await remaining");
+
+ //futs.try_buffer_unordered(20)
+ //.try_for_each(|_res| futures::future::ok(()))
+ //.await?;
+ //if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+ // eprintln!("Error during await: {}", e);
+ //}
+ let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+ let zeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
let end_time = std::time::Instant::now();
let elapsed = end_time.duration_since(start_time);
- eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+ eprintln!("restore image complete (bytes={}, zeroes={}, duration={:.2}s, speed={:.2}MB/s)",
bytes,
+ zeroes,
elapsed.as_secs_f64(),
bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
);
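
For comparison, the try_buffer_unordered variant left commented out above would look roughly like this (a sketch only, with a dummy per-chunk body; assumes futures 0.3 and anyhow):

use anyhow::Error;
use futures::stream::{self, StreamExt, TryStreamExt};

// Sketch: drive up to 20 chunk restores concurrently via buffer_unordered
// instead of pushing into FuturesUnordered and polling by hand.
async fn restore_all(index_count: usize) -> Result<(), Error> {
    stream::iter(0..index_count)
        .map(|pos| async move {
            // stand-in for read_chunk + write_data_callback at this offset
            let _offset = pos as u64;
            Ok::<_, Error>(())
        })
        .buffer_unordered(20)
        .try_for_each(|_| futures::future::ok(()))
        .await
}

Note that without tokio::spawn this only interleaves the futures on one task; it does not by itself run the write callbacks on multiple threads, which may be exactly why the spawn-based version above behaves differently.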