[pbs-devel] [PATCH v2 proxmox 1/3] rest-server: connection: clean up accept data flow

Max Carrara m.carrara at proxmox.com
Mon Jul 8 18:48:15 CEST 2024


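This adds the structs `AcceptState` and `AcceptFlags` and adapts the
signatures of `AcceptBuilder::do_accept_tls` and
`AcceptBuilder::do_accept_tls_optional` to take them instead of the
separate `socket`, `acceptor`, `accept_counter` and `debug` arguments.
This makes it easier to add further parameters in the future.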

Signed-off-by: Max Carrara <m.carrara at proxmox.com>
---
Changes v1 --> v2:
  * none

 proxmox-rest-server/src/connection.rs | 72 ++++++++++++++-------------
 1 file changed, 38 insertions(+), 34 deletions(-)

diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
index 34b585cb..243348c0 100644
--- a/proxmox-rest-server/src/connection.rs
+++ b/proxmox-rest-server/src/connection.rs
@@ -255,6 +255,16 @@ impl From<(ClientSender, InsecureClientSender)> for Sender {
     }
 }
 
+struct AcceptState {
+    pub socket: InsecureClientStream,
+    pub acceptor: Arc<Mutex<SslAcceptor>>,
+    pub accept_counter: Arc<()>,
+}
+
+struct AcceptFlags {
+    pub is_debug: bool,
+}
+
 impl AcceptBuilder {
     async fn accept_connections(
         self,
@@ -285,24 +295,26 @@ impl AcceptBuilder {
                 continue;
             }
 
+            let state = AcceptState {
+                socket,
+                acceptor,
+                accept_counter,
+            };
+
+            let flags = AcceptFlags {
+                is_debug: self.debug,
+            };
+
             match sender {
                 Sender::Secure(ref secure_sender) => {
-                    let accept_future = Self::do_accept_tls(
-                        socket,
-                        acceptor,
-                        accept_counter,
-                        self.debug,
-                        secure_sender.clone(),
-                    );
+                    let accept_future = Self::do_accept_tls(state, flags, secure_sender.clone());
 
                     tokio::spawn(accept_future);
                 }
                 Sender::SecureAndInsecure(ref secure_sender, ref insecure_sender) => {
                     let accept_future = Self::do_accept_tls_optional(
-                        socket,
-                        acceptor,
-                        accept_counter,
-                        self.debug,
+                        state,
+                        flags,
                         secure_sender.clone(),
                         insecure_sender.clone(),
                     );
@@ -343,17 +355,11 @@ impl AcceptBuilder {
         Ok(socket)
     }
 
-    async fn do_accept_tls(
-        socket: InsecureClientStream,
-        acceptor: Arc<Mutex<SslAcceptor>>,
-        accept_counter: Arc<()>,
-        debug: bool,
-        secure_sender: ClientSender,
-    ) {
+    async fn do_accept_tls(state: AcceptState, flags: AcceptFlags, secure_sender: ClientSender) {
         let ssl = {
             // limit acceptor_guard scope
             // Acceptor can be reloaded using the command socket "reload-certificate" command
-            let acceptor_guard = acceptor.lock().unwrap();
+            let acceptor_guard = state.acceptor.lock().unwrap();
 
             match openssl::ssl::Ssl::new(acceptor_guard.context()) {
                 Ok(ssl) => ssl,
@@ -364,7 +370,7 @@ impl AcceptBuilder {
             }
         };
 
-        let secure_stream = match tokio_openssl::SslStream::new(ssl, socket) {
+        let secure_stream = match tokio_openssl::SslStream::new(ssl, state.socket) {
             Ok(stream) => stream,
             Err(err) => {
                 log::error!("failed to create SslStream using ssl and connection socket - {err}");
@@ -381,41 +387,39 @@ impl AcceptBuilder {
 
         match result {
             Ok(Ok(())) => {
-                if secure_sender.send(Ok(secure_stream)).await.is_err() && debug {
+                if secure_sender.send(Ok(secure_stream)).await.is_err() && flags.is_debug {
                     log::error!("detected closed connection channel");
                 }
             }
             Ok(Err(err)) => {
-                if debug {
+                if flags.is_debug {
                     log::error!("https handshake failed - {err}");
                 }
             }
             Err(_) => {
-                if debug {
+                if flags.is_debug {
                     log::error!("https handshake timeout");
                 }
             }
         }
 
-        drop(accept_counter); // decrease reference count
+        drop(state.accept_counter); // decrease reference count
     }
 
     async fn do_accept_tls_optional(
-        socket: InsecureClientStream,
-        acceptor: Arc<Mutex<SslAcceptor>>,
-        accept_counter: Arc<()>,
-        debug: bool,
+        state: AcceptState,
+        flags: AcceptFlags,
         secure_sender: ClientSender,
         insecure_sender: InsecureClientSender,
     ) {
         let client_initiates_handshake = {
             #[cfg(feature = "rate-limited-stream")]
-            let socket = socket.inner();
+            let socket_ref = state.socket.inner();
 
             #[cfg(not(feature = "rate-limited-stream"))]
-            let socket = &socket;
+            let socket_ref = &state.socket;
 
-            match Self::wait_for_client_tls_handshake(socket).await {
+            match Self::wait_for_client_tls_handshake(socket_ref).await {
                 Ok(initiates_handshake) => initiates_handshake,
                 Err(err) => {
                     log::error!("error checking for TLS handshake: {err}");
@@ -425,16 +429,16 @@ impl AcceptBuilder {
         };
 
         if !client_initiates_handshake {
-            let insecure_stream = Box::pin(socket);
+            let insecure_stream = Box::pin(state.socket);
 
-            if insecure_sender.send(Ok(insecure_stream)).await.is_err() && debug {
+            if insecure_sender.send(Ok(insecure_stream)).await.is_err() && flags.is_debug {
                 log::error!("detected closed connection channel")
             }
 
             return;
         }
 
-        Self::do_accept_tls(socket, acceptor, accept_counter, debug, secure_sender).await
+        Self::do_accept_tls(state, flags, secure_sender).await
     }
 
     async fn wait_for_client_tls_handshake(incoming_stream: &TcpStream) -> Result<bool, Error> {
-- 
2.39.2





More information about the pbs-devel mailing list