[pbs-devel] [PATCH proxmox-backup v2] api2/reader: asyncify the reader worker task
Dietmar Maurer
dietmar at proxmox.com
Fri Jan 29 07:05:38 CET 2021
Why did you change the _guard lifetime?
> On 01/26/2021 11:17 AM Dominik Csapak <d.csapak at proxmox.com> wrote:
>
>
> This way, the code is much more readable.
>
> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
> ---
> changes from v1:
> * rebased on master
>
> src/api2/reader.rs | 68 +++++++++++++++++++++-------------------------
> 1 file changed, 31 insertions(+), 37 deletions(-)
>
> diff --git a/src/api2/reader.rs b/src/api2/reader.rs
> index 43d832ce..ae936184 100644
> --- a/src/api2/reader.rs
> +++ b/src/api2/reader.rs
> @@ -115,7 +115,9 @@ fn upgrade_to_backup_reader_protocol(
>
> let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
>
> - WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| {
> + WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
> + let _guard = _guard;
> +
> let mut env = ReaderEnvironment::new(
> env_type,
> auth_id,
> @@ -130,42 +132,34 @@ fn upgrade_to_backup_reader_protocol(
>
> let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
>
> - let abort_future = worker.abort_future();
> -
> - let req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body))
> - .map_err(Error::from)
> - .and_then({
> - let env = env.clone();
> - move |conn| {
> - env.debug("protocol upgrade done");
> -
> - let mut http = hyper::server::conn::Http::new();
> - http.http2_only(true);
> - // increase window size: todo - find optiomal size
> - let window_size = 32*1024*1024; // max = (1 << 31) - 2
> - http.http2_initial_stream_window_size(window_size);
> - http.http2_initial_connection_window_size(window_size);
> - http.http2_max_frame_size(4*1024*1024);
> -
> - http.serve_connection(conn, service)
> - .map_err(Error::from)
> - }
> - });
> - let abort_future = abort_future
> - .map(|_| -> Result<(), anyhow::Error> { Err(format_err!("task aborted")) });
> -
> - use futures::future::Either;
> - futures::future::select(req_fut, abort_future)
> - .map(move |res| {
> - let _guard = _guard;
> - match res {
> - Either::Left((Ok(_), _)) => Ok(()),
> - Either::Left((Err(err), _)) => Err(err),
> - Either::Right((Ok(_), _)) => Ok(()),
> - Either::Right((Err(err), _)) => Err(err),
> - }
> - })
> - .map_ok(move |_| env.log("reader finished successfully"))
> + let mut abort_future = worker.abort_future()
> + .map(|_| Err(format_err!("task aborted")));
> +
> + let env2 = env.clone();
> + let req_fut = async move {
> + let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
> + env2.debug("protocol upgrade done");
> +
> + let mut http = hyper::server::conn::Http::new();
> + http.http2_only(true);
> + // increase window size: todo - find optiomal size
> + let window_size = 32*1024*1024; // max = (1 << 31) - 2
> + http.http2_initial_stream_window_size(window_size);
> + http.http2_initial_connection_window_size(window_size);
> + http.http2_max_frame_size(4*1024*1024);
> +
> + http.serve_connection(conn, service)
> + .map_err(Error::from).await
> + };
> +
> + futures::select!{
> + req = req_fut.fuse() => req?,
> + abort = abort_future => abort?,
> + };
> +
> + env.log("reader finished successfully");
> +
> + Ok(())
> })?;
>
> let response = Response::builder()
> --
> 2.20.1
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
More information about the pbs-devel mailing list