[pve-devel] [RFC PATCH] fix #3106: correctly queue incoming connections
Dietmar Maurer
dietmar at proxmox.com
Tue Nov 3 14:25:21 CET 2020
> > - let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
> > + let connections = accept_connections(listener, acceptor);
> > + let connections = hyper::server::accept::from_stream(connections);
>
> If we move the `from_stream` into the function below...
I have tried to do that for 2 hours, then gave up....
So please tell me how to make that work!
> >
> > Ok(ready
> > - .and_then(|_| hyper::Server::builder(connections)
> > + .and_then(|_| hyper::Server::builder(connections)
> > .serve(rest_server)
> > .with_graceful_shutdown(server::shutdown_future())
> > .map_err(Error::from)
> > @@ -170,6 +157,66 @@ async fn run() -> Result<(), Error> {
> > Ok(())
> > }
> >
> > +fn accept_connections(
> > + mut listener: tokio::net::TcpListener,
> > + acceptor: Arc<openssl::ssl::SslAcceptor>,
> > +) -> tokio::sync::mpsc::Receiver<Result<tokio_openssl::SslStream<tokio::net::TcpStream>, Error>> {
>
> ... then this could probably be shortened to
>
> ) -> impl Accept {
>
> shortens the line by 80 ;-)
>
> > +
> > + let (sender, receiver) = tokio::sync::mpsc::channel(100);
> > +
> > + let accept_counter = Arc::new(AtomicUsize::new(0));
> > +
> > + const MAX_PENDING_ACCEPTS: usize = 100;
> > +
> > + tokio::spawn(async move {
> > + loop {
> > + match listener.accept().await {
> > + Err(err) => {
> > + eprintln!("error accepting tcp connection: {}", err);
> > + }
> > + Ok((sock, _addr)) => {
> > + sock.set_nodelay(true).unwrap();
> > + let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
> > + let acceptor = Arc::clone(&acceptor);
> > + let mut sender = sender.clone();
> > +
> > + if accept_counter.load(Ordering::SeqCst) > MAX_PENDING_ACCEPTS {
> > + eprintln!("connection rejected - to many open connections");
> > + continue;
> > + }
> > + accept_counter.fetch_add(1, Ordering::SeqCst);
>
> We should think about making a counter guard for this sort of thing,
> because from this point onward we're not allowed to use `?` anywhere,
> which is quite annoying.
yes
>
> > +
> > + let accept_counter = accept_counter.clone();
> > + tokio::spawn(async move {
> > + let accept_future = tokio::time::timeout(
> > + Duration::new(10, 0), tokio_openssl::accept(&acceptor, sock));
> > +
> > + let result = accept_future.await;
> > +
> > + match result {
> > + Ok(Ok(connection)) => {
> > + if let Err(_) = sender.send(Ok(connection)).await {
> > + eprintln!("detect closed connection channel");
> > + }
> > + }
> > + Ok(Err(err)) => {
> > + eprintln!("https handshakeX failed - {}", err);
> > + }
> > + Err(_) => {
> > + eprintln!("https handshake timeout");
> > + }
> > + }
>
> which is why I'd rather have the part above in its own `async fn`
> followed by the `fetch_sub` below, followed by the `eprintln!()`s.
>
> > +
> > + accept_counter.fetch_sub(1, Ordering::SeqCst);
> > + });
> > + }
> > + }
> > + }
> > + });
> > +
> > + receiver
> > +}
> > +
> > fn start_stat_generator() {
> > let abort_future = server::shutdown_future();
> > let future = Box::pin(run_stat_generator());
> > --
> > 2.20.1
More information about the pve-devel
mailing list