[pbs-devel] [PATCH proxmox-backup] fix #3106: correctly queue incoming connections
Wolfgang Bumiller
w.bumiller at proxmox.com
Tue Nov 3 10:16:21 CET 2020
On Mon, Nov 02, 2020 at 04:10:05PM +0100, Dominik Csapak wrote:
> For incoming connections, we mapped the results from TcpListeners
> accept with 'try_filter_map', where we awaited tokio_openssl::accept
>
> this resulted in blocking the incoming connection stream
>
> to circumvent this, we accept the openssl connection in a separate
> tokio task (with timeout) and send the resulting connection to a
> channel
>
> hyper gets the wrapped receiver end of this channel
>
> the tokio task accepting in a loop gets selected with the shutdown
> future, to handle the shutdown gracefully
>
> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
> ---
> i am not sure if we need the select! at all, since on a shutdown, all
> open futures get canceled anyway...
>
> also, not sure here about logging, timeouts and channel size
> i chose values that seemed sensible, but if anyone has suggestions
> with actual reasoning, please say so
>
> also the indentation seems weird, but rustfmt said this is the way..
>
> src/bin/proxmox-backup-proxy.rs | 65 +++++++++++++++++++++++++--------
> 1 file changed, 49 insertions(+), 16 deletions(-)
>
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 39254504..3eb92cbb 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -113,23 +113,56 @@ async fn run() -> Result<(), Error> {
>
> let server = daemon::create_daemon(
> ([0,0,0,0,0,0,0,0], 8007).into(),
> - |listener, ready| {
> - let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
> - .map_err(Error::from)
> - .try_filter_map(move |(sock, _addr)| {
> - let acceptor = Arc::clone(&acceptor);
> - async move {
> - sock.set_nodelay(true).unwrap();
> -
> - let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
> -
> - Ok(tokio_openssl::accept(&acceptor, sock)
> - .await
> - .ok() // handshake errors aren't be fatal, so return None to filter
> - )
> + |mut listener, ready| {
> + let (sender, receiver) = tokio::sync::mpsc::channel(100);
> +
please factor the code below out into separate functions; this is too much
indentation
> + let accept_future = async move {
> + loop {
> + match listener.accept().await {
> + Ok((sock, _)) => {
> + let mut sender2 = sender.clone();
> + let acceptor = Arc::clone(&acceptor);
> + tokio::spawn(async move {
> + sock.set_nodelay(true).unwrap();
> + let _ = set_tcp_keepalive(
> + sock.as_raw_fd(),
> + PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
> + );
> +
> + if let Ok(connection) = tokio::time::timeout(
> + Duration::new(60, 0),
> + tokio_openssl::accept(&acceptor, sock),
> + )
> + .await
> + {
^ as that just gets too long
> + if connection.is_err() {
> + // ignore ssl connection errors
> + return;
> + }
> + if let Err(err) =
> + sender2.send_timeout(connection, Duration::new(60, 0)).await
> + {
> + eprintln!("send error: {}", err);
> + }
> + } // ignore ssl timeout errors
> + });
> + }
> + Err(err) => {
> + eprintln!("error accepting tcp connection: {}", err);
> + }
> }
> - });
> - let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
> + }
> + };
> +
> + // select with shutdown future for graceful shutdown
> + tokio::spawn(async move {
> + select! {
> + _ = accept_future.fuse() => {},
> + _ = server::shutdown_future().fuse() => {},
> + };
> + });
> +
> + let connections = hyper::server::accept::from_stream(receiver);
>
> Ok(ready
> .and_then(|_| hyper::Server::builder(connections)
> --
> 2.20.1
More information about the pbs-devel
mailing list