[pbs-devel] parallelize restore.rs fn restore_image: problems in async move
Niko Fellner
n.fellner at logics.de
Fri Dec 4 15:14:33 CET 2020
In order to parallelize restore_image within restore.rs (#3163), I tried to make use of tokio:
let mut my_handles = vec![];
for pos in 0..100 {
my_handles.push(
tokio::spawn(
async move {
println!("Task: {}", pos)
}
)
);
}
futures::future::join_all(my_handles).await;
This simple code works and prints all tasks (in some random order), but when I replace the body of the "async move" block with the original loop body from restore_image, I get the following build errors:
cargo build --release
Compiling proxmox-backup-qemu v1.0.2 (/root/proxmox-backup-qemu)
error[E0308]: mismatched types
--> src/restore.rs:181:48
|
181 | ... if per != next_per {
| __________________________________________^
182 | | ... eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
183 | | ... next_per, bytes,
184 | | ... zeroes*100/bytes, zeroes,
185 | | ... start_time.elapsed().as_secs());
186 | | ... per = next_per;
187 | | ... }
| |_______________________^ expected enum `std::result::Result`, found `()`
|
= note: expected enum `std::result::Result<_, anyhow::Error>`
found unit type `()`
error[E0308]: mismatched types
--> src/restore.rs:181:29
|
181 | / ... if per != next_per {
182 | | ... eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
183 | | ... next_per, bytes,
184 | | ... zeroes*100/bytes, zeroes,
185 | | ... start_time.elapsed().as_secs());
186 | | ... per = next_per;
187 | | ... }
| |_______________________^ expected enum `std::result::Result`, found `()`
|
= note: expected enum `std::result::Result<_, anyhow::Error>`
found unit type `()`
error[E0308]: mismatched types
--> src/restore.rs:179:25
|
179 | / if verbose {
180 | | let next_per = ((pos+1)*100)/index.index_count();
181 | | if per != next_per {
182 | | eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
... |
187 | | }
188 | | }
| |_________________________^ expected enum `std::result::Result`, found `()`
|
= note: expected enum `std::result::Result<_, anyhow::Error>`
found unit type `()`
error: aborting due to 3 previous errors
For more information about this error, try `rustc --explain E0308`.
error: could not compile `proxmox-backup-qemu`.
To learn more, run the command again with --verbose.
make: *** [Makefile:22: all] Fehler 101
Do you have any clue how to fix this?
I guess I am making rookie mistakes.
Another question: do you think it would be easier to use rayon instead of tokio here, to find out whether parallelization is worthwhile at all?
The rayon::iter::ParallelIterator looks promising, but I realized that the rayon 1.5 crate is currently not available at http://download.proxmox.com/debian/devel/ ...
diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
libc = "0.2"
once_cell = "1.3.1"
openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
serde_json = "1.0"
tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
bincode = "1.0"
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..4d1df9c 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -151,38 +151,46 @@ impl RestoreTask {
let mut per = 0;
let mut bytes = 0;
let mut zeroes = 0;
+ let mut my_handles = vec![];
let start_time = std::time::Instant::now();
for pos in 0..index.index_count() {
- let digest = index.index_digest(pos).unwrap();
- let offset = (pos*index.chunk_size) as u64;
- if digest == &zero_chunk_digest {
- let res = write_zero_callback(offset, index.chunk_size as u64);
- if res < 0 {
- bail!("write_zero_callback failed ({})", res);
- }
- bytes += index.chunk_size;
- zeroes += index.chunk_size;
- } else {
- let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
- let res = write_data_callback(offset, &raw_data);
- if res < 0 {
- bail!("write_data_callback failed ({})", res);
- }
- bytes += raw_data.len();
- }
- if verbose {
- let next_per = ((pos+1)*100)/index.index_count();
- if per != next_per {
- eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
- next_per, bytes,
- zeroes*100/bytes, zeroes,
- start_time.elapsed().as_secs());
- per = next_per;
- }
- }
+ my_handles.push(
+ tokio::spawn(
+ async move {
+ let digest = index.index_digest(pos).unwrap();
+ let offset = (pos*index.chunk_size) as u64;
+ if digest == &zero_chunk_digest {
+ let res = write_zero_callback(offset, index.chunk_size as u64);
+ if res < 0 {
+ bail!("write_zero_callback failed ({})", res);
+ }
+ bytes += index.chunk_size;
+ zeroes += index.chunk_size;
+ } else {
+ let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+ let res = write_data_callback(offset, &raw_data);
+ if res < 0 {
+ bail!("write_data_callback failed ({})", res);
+ }
+ bytes += raw_data.len();
+ }
+ if verbose {
+ let next_per = ((pos+1)*100)/index.index_count();
+ if per != next_per {
+ eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+ next_per, bytes,
+ zeroes*100/bytes, zeroes,
+ start_time.elapsed().as_secs());
+ per = next_per;
+ }
+ }
+ }
+ )
+ );
}
+ futures::future::join_all(my_handles).await;
let end_time = std::time::Instant::now();
let elapsed = end_time.duration_since(start_time);
More information about the pbs-devel
mailing list