[pbs-devel] [PATCH proxmox-backup] api2/reader: asyncify the reader worker task

Dominik Csapak d.csapak at proxmox.com
Tue Jan 19 12:04:47 CET 2021


this way, the code is much more readable

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
 src/api2/reader.rs | 64 +++++++++++++++++++++-------------------------
 1 file changed, 29 insertions(+), 35 deletions(-)

diff --git a/src/api2/reader.rs b/src/api2/reader.rs
index 72b6e33a..224a78de 100644
--- a/src/api2/reader.rs
+++ b/src/api2/reader.rs
@@ -113,7 +113,9 @@ fn upgrade_to_backup_reader_protocol(
 
         let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
 
-        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| {
+        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
+            let _guard = _guard;
+
             let mut env = ReaderEnvironment::new(
                 env_type,
                 auth_id,
@@ -128,42 +130,34 @@ fn upgrade_to_backup_reader_protocol(
 
             let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
 
-            let abort_future = worker.abort_future();
-
-            let req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body))
-                .map_err(Error::from)
-                .and_then({
-                    let env = env.clone();
-                    move |conn| {
-                        env.debug("protocol upgrade done");
-
-                        let mut http = hyper::server::conn::Http::new();
-                        http.http2_only(true);
-                        // increase window size: todo - find optiomal size
-                        let window_size = 32*1024*1024; // max = (1 << 31) - 2
-                        http.http2_initial_stream_window_size(window_size);
-                        http.http2_initial_connection_window_size(window_size);
-                        http.http2_max_frame_size(4*1024*1024);
-
-                        http.serve_connection(conn, service)
-                            .map_err(Error::from)
-                    }
-                });
-            let abort_future = abort_future
+            let mut abort_future = worker.abort_future()
                 .map(|_| Err(format_err!("task aborted")));
 
-            use futures::future::Either;
-            futures::future::select(req_fut, abort_future)
-                .map(move |res| {
-                    let _guard = _guard;
-                    match res {
-                        Either::Left((Ok(res), _)) => Ok(res),
-                        Either::Left((Err(err), _)) => Err(err),
-                        Either::Right((Ok(res), _)) => Ok(res),
-                        Either::Right((Err(err), _)) => Err(err),
-                    }
-                })
-                .map_ok(move |_| env.log("reader finished successfully"))
+            let env2 = env.clone();
+            let req_fut = async move {
+                let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
+                env2.debug("protocol upgrade done");
+
+                let mut http = hyper::server::conn::Http::new();
+                http.http2_only(true);
+                // increase window size: todo - find optiomal size
+                let window_size = 32*1024*1024; // max = (1 << 31) - 2
+                http.http2_initial_stream_window_size(window_size);
+                http.http2_initial_connection_window_size(window_size);
+                http.http2_max_frame_size(4*1024*1024);
+
+                http.serve_connection(conn, service)
+                    .map_err(Error::from).await
+            };
+
+            futures::select!{
+                req = req_fut.fuse() => req?,
+                abort = abort_future => abort?,
+            };
+
+            env.log("reader finished successfully");
+
+            Ok(())
         })?;
 
         let response = Response::builder()
-- 
2.20.1






More information about the pbs-devel mailing list