[pbs-devel] parallelize restore.rs fn restore_image: problems in async move

Niko Fellner n.fellner at logics.de
Fri Dec 4 15:14:33 CET 2020

In order to parallelize restore_image within restore.rs (#3163), I tried to make use of tokio:

let mut my_handles = vec![];
for pos in 0..100 {
            async move {
                println!("Task: {}", pos)

This simple code works and prints all tasks (in some random order), but when I replace the body of the "async move" block with the original loop body from restore_image, I get some build errors:

cargo build --release
   Compiling proxmox-backup-qemu v1.0.2 (/root/proxmox-backup-qemu)
error[E0308]: mismatched types
   --> src/restore.rs:181:48
181 |   ...                   if per != next_per {
    |  __________________________________________^
182 | | ...                       eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
183 | | ...                                 next_per, bytes,
184 | | ...                                 zeroes*100/bytes, zeroes,
185 | | ...                                 start_time.elapsed().as_secs());
186 | | ...                       per = next_per;
187 | | ...                   }
    | |_______________________^ expected enum `std::result::Result`, found `()`
    = note:   expected enum `std::result::Result<_, anyhow::Error>`
            found unit type `()`

error[E0308]: mismatched types
   --> src/restore.rs:181:29
181 | / ...                   if per != next_per {
182 | | ...                       eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
183 | | ...                                 next_per, bytes,
184 | | ...                                 zeroes*100/bytes, zeroes,
185 | | ...                                 start_time.elapsed().as_secs());
186 | | ...                       per = next_per;
187 | | ...                   }
    | |_______________________^ expected enum `std::result::Result`, found `()`
    = note:   expected enum `std::result::Result<_, anyhow::Error>`
            found unit type `()`

error[E0308]: mismatched types
   --> src/restore.rs:179:25
179 | /                         if verbose {
180 | |                             let next_per = ((pos+1)*100)/index.index_count();
181 | |                             if per != next_per {
182 | |                                 eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
...   |
187 | |                             }
188 | |                         }
    | |_________________________^ expected enum `std::result::Result`, found `()`
    = note:   expected enum `std::result::Result<_, anyhow::Error>`
            found unit type `()`

error: aborting due to 3 previous errors

For more information about this error, try `rustc --explain E0308`.
error: could not compile `proxmox-backup-qemu`.

To learn more, run the command again with --verbose.
make: *** [Makefile:22: all] Fehler 101

Do you have any clue how to fix this?
I guess I am making rookie mistakes.

Another question: Do you think it would be easier to use rayon instead of tokio to find out whether parallelization is worth it here?
The rayon::iter::ParallelIterator looks promising, but I realized that the rayon 1.5 crate is currently not included in http://download.proxmox.com/debian/devel/ ...

diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..4d1df9c 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -151,38 +151,46 @@ impl RestoreTask {
         let mut per = 0;
         let mut bytes = 0;
         let mut zeroes = 0;
+        let mut my_handles = vec![];
         let start_time = std::time::Instant::now();
         for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
-                }
-            }
+            my_handles.push(
+                tokio::spawn(
+                    async move {
+                        let digest = index.index_digest(pos).unwrap();
+                        let offset = (pos*index.chunk_size) as u64;
+                        if digest == &zero_chunk_digest {
+                            let res = write_zero_callback(offset, index.chunk_size as u64);
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes += index.chunk_size;
+                            zeroes += index.chunk_size;
+                        } else {
+                            let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                            let res = write_data_callback(offset, &raw_data);
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes += raw_data.len();
+                        }
+                        if verbose {
+                            let next_per = ((pos+1)*100)/index.index_count();
+                            if per != next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                          next_per, bytes,
+                                          zeroes*100/bytes, zeroes,
+                                          start_time.elapsed().as_secs());
+                                per = next_per;
+                            }
+                        }
+                    }
+                )
+            );
+        futures::future::join_all(my_handles).await;
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);

More information about the pbs-devel mailing list