[pbs-devel] [PATCH proxmox-backup v2] api2/reader: asyncify the reader worker task

Dominik Csapak d.csapak at proxmox.com
Fri Jan 29 08:01:15 CET 2021


On 1/29/21 7:05 AM, Dietmar Maurer wrote:
> Why did you change the _guard lifetime?

how so?

Before this patch, the guard was moved into the 'map' closure of
the future::select call,
so it went out of scope as soon as either future (req_fut, abort_future)
resolved.

Now I moved it into the async move block of the worker, so it goes out of
scope once that block's future resolves.

The only difference is that the 'env.log(...)' and 'Ok(())' lines now run
while the guard is still alive, but
that should not make any difference for the guard?

> 
> 
>> On 01/26/2021 11:17 AM Dominik Csapak <d.csapak at proxmox.com> wrote:
>>
>>   
>> this way, the code is much more readable
>>
>> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
>> ---
>> changes from v1:
>> * rebased on master
>>
>>   src/api2/reader.rs | 68 +++++++++++++++++++++-------------------------
>>   1 file changed, 31 insertions(+), 37 deletions(-)
>>
>> diff --git a/src/api2/reader.rs b/src/api2/reader.rs
>> index 43d832ce..ae936184 100644
>> --- a/src/api2/reader.rs
>> +++ b/src/api2/reader.rs
>> @@ -115,7 +115,9 @@ fn upgrade_to_backup_reader_protocol(
>>   
>>           let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
>>   
>> -        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| {
>> +        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
>> +            let _guard = _guard;
>> +
>>               let mut env = ReaderEnvironment::new(
>>                   env_type,
>>                   auth_id,
>> @@ -130,42 +132,34 @@ fn upgrade_to_backup_reader_protocol(
>>   
>>               let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
>>   
>> -            let abort_future = worker.abort_future();
>> -
>> -            let req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body))
>> -                .map_err(Error::from)
>> -                .and_then({
>> -                    let env = env.clone();
>> -                    move |conn| {
>> -                        env.debug("protocol upgrade done");
>> -
>> -                        let mut http = hyper::server::conn::Http::new();
>> -                        http.http2_only(true);
>> -                        // increase window size: todo - find optiomal size
>> -                        let window_size = 32*1024*1024; // max = (1 << 31) - 2
>> -                        http.http2_initial_stream_window_size(window_size);
>> -                        http.http2_initial_connection_window_size(window_size);
>> -                        http.http2_max_frame_size(4*1024*1024);
>> -
>> -                        http.serve_connection(conn, service)
>> -                            .map_err(Error::from)
>> -                    }
>> -                });
>> -            let abort_future = abort_future
>> -                .map(|_| -> Result<(), anyhow::Error> { Err(format_err!("task aborted")) });
>> -
>> -            use futures::future::Either;
>> -            futures::future::select(req_fut, abort_future)
>> -                .map(move |res| {
>> -                    let _guard = _guard;
>> -                    match res {
>> -                        Either::Left((Ok(_), _)) => Ok(()),
>> -                        Either::Left((Err(err), _)) => Err(err),
>> -                        Either::Right((Ok(_), _)) => Ok(()),
>> -                        Either::Right((Err(err), _)) => Err(err),
>> -                    }
>> -                })
>> -                .map_ok(move |_| env.log("reader finished successfully"))
>> +            let mut abort_future = worker.abort_future()
>> +                .map(|_| Err(format_err!("task aborted")));
>> +
>> +            let env2 = env.clone();
>> +            let req_fut = async move {
>> +                let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
>> +                env2.debug("protocol upgrade done");
>> +
>> +                let mut http = hyper::server::conn::Http::new();
>> +                http.http2_only(true);
>> +                // increase window size: todo - find optiomal size
>> +                let window_size = 32*1024*1024; // max = (1 << 31) - 2
>> +                http.http2_initial_stream_window_size(window_size);
>> +                http.http2_initial_connection_window_size(window_size);
>> +                http.http2_max_frame_size(4*1024*1024);
>> +
>> +                http.serve_connection(conn, service)
>> +                    .map_err(Error::from).await
>> +            };
>> +
>> +            futures::select!{
>> +                req = req_fut.fuse() => req?,
>> +                abort = abort_future => abort?,
>> +            };
>> +
>> +            env.log("reader finished successfully");
>> +
>> +            Ok(())
>>           })?;
>>   
>>           let response = Response::builder()
>> -- 
>> 2.20.1
>>
>>
>>
>> _______________________________________________
>> pbs-devel mailing list
>> pbs-devel at lists.proxmox.com
>> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel






More information about the pbs-devel mailing list