[pbs-devel] [PATCH proxmox-backup] fix #3106: correctly queue incoming connections
Dominik Csapak
d.csapak at proxmox.com
Mon Nov 2 16:10:05 CET 2020
For incoming connections, we mapped the results from TcpListeners
accept with 'try_filter_map', where we awaited tokio_openssl::accept
this resulted in blocking the incoming connection stream
to circumvent this, we accept the openssl connection in a seperate
tokio task (with timeout) and send the resulting connection to a
channel
hyper gets the wrapped receiver end of this channel
the tokio task accepting in a loop gets selected with the shutdown
future, to handle the shutdown gracefully
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
i am not sure if we need the select! at all, since on a shutdown, all
open futures get canceled anyway...
also, not sure here about logging, timeouts and channel size
i chose values that seemed sensible, but if anyone has suggestions
with actual reasoning, please say so
also the indentation seems weird, but rustfmt said this is the way..
src/bin/proxmox-backup-proxy.rs | 65 +++++++++++++++++++++++++--------
1 file changed, 49 insertions(+), 16 deletions(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 39254504..3eb92cbb 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -113,23 +113,56 @@ async fn run() -> Result<(), Error> {
let server = daemon::create_daemon(
([0,0,0,0,0,0,0,0], 8007).into(),
- |listener, ready| {
- let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
- .map_err(Error::from)
- .try_filter_map(move |(sock, _addr)| {
- let acceptor = Arc::clone(&acceptor);
- async move {
- sock.set_nodelay(true).unwrap();
-
- let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
-
- Ok(tokio_openssl::accept(&acceptor, sock)
- .await
- .ok() // handshake errors aren't be fatal, so return None to filter
- )
+ |mut listener, ready| {
+ let (sender, receiver) = tokio::sync::mpsc::channel(100);
+
+ let accept_future = async move {
+ loop {
+ match listener.accept().await {
+ Ok((sock, _)) => {
+ let mut sender2 = sender.clone();
+ let acceptor = Arc::clone(&acceptor);
+ tokio::spawn(async move {
+ sock.set_nodelay(true).unwrap();
+ let _ = set_tcp_keepalive(
+ sock.as_raw_fd(),
+ PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
+ );
+
+ if let Ok(connection) = tokio::time::timeout(
+ Duration::new(60, 0),
+ tokio_openssl::accept(&acceptor, sock),
+ )
+ .await
+ {
+ if connection.is_err() {
+ // ignore ssl connection errors
+ return;
+ }
+ if let Err(err) =
+ sender2.send_timeout(connection, Duration::new(60, 0)).await
+ {
+ eprintln!("send error: {}", err);
+ }
+ } // ignore ssl timeout errors
+ });
+ }
+ Err(err) => {
+ eprintln!("error accepting tcp connection: {}", err);
+ }
}
- });
- let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
+ }
+ };
+
+ // select with shutdown future for graceful shutdown
+ tokio::spawn(async move {
+ select! {
+ _ = accept_future.fuse() => {},
+ _ = server::shutdown_future().fuse() => {},
+ };
+ });
+
+ let connections = hyper::server::accept::from_stream(receiver);
Ok(ready
.and_then(|_| hyper::Server::builder(connections)
--
2.20.1
More information about the pbs-devel
mailing list