[pbs-devel] [RFC proxmox-backup 1/5] http: rename EitherStream to MaybeTlsStream

Dietmar Maurer dietmar at proxmox.com
Wed Apr 21 13:16:58 CEST 2021


And rename the enum values. Added an additional enum called Proxied.

The enum in now more specialized, but we only use it for the http client anyways.
---
 src/tools/async_io.rs | 64 +++++++++++++++++++++++++++----------------
 src/tools/http.rs     | 20 ++++++--------
 2 files changed, 48 insertions(+), 36 deletions(-)

diff --git a/src/tools/async_io.rs b/src/tools/async_io.rs
index 844afaa9..963f6fdd 100644
--- a/src/tools/async_io.rs
+++ b/src/tools/async_io.rs
@@ -1,4 +1,4 @@
-//! Generic AsyncRead/AsyncWrite utilities.
+//! AsyncRead/AsyncWrite utilities.
 
 use std::io;
 use std::os::unix::io::{AsRawFd, RawFd};
@@ -9,41 +9,52 @@ use futures::stream::{Stream, TryStream};
 use futures::ready;
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
 use tokio::net::TcpListener;
-use hyper::client::connect::Connection;
-
-pub enum EitherStream<L, R> {
-    Left(L),
-    Right(R),
+use tokio_openssl::SslStream;
+use hyper::client::connect::{Connection, Connected};
+
+/// Asynchronous stream, possibly encrypted and proxied
+///
+/// Usefule for HTTP client implementations using hyper.
+pub enum MaybeTlsStream<S> {
+    Normal(S),
+    Proxied(S),
+    Secured(SslStream<S>),
 }
 
-impl<L: AsyncRead + Unpin, R: AsyncRead + Unpin> AsyncRead for EitherStream<L, R> {
+impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for MaybeTlsStream<S> {
     fn poll_read(
         self: Pin<&mut Self>,
         cx: &mut Context,
         buf: &mut ReadBuf,
     ) -> Poll<Result<(), io::Error>> {
         match self.get_mut() {
-            EitherStream::Left(ref mut s) => {
+            MaybeTlsStream::Normal(ref mut s) => {
+                Pin::new(s).poll_read(cx, buf)
+            }
+            MaybeTlsStream::Proxied(ref mut s) => {
                 Pin::new(s).poll_read(cx, buf)
             }
-            EitherStream::Right(ref mut s) => {
+            MaybeTlsStream::Secured(ref mut s) => {
                 Pin::new(s).poll_read(cx, buf)
             }
         }
     }
 }
 
-impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L, R> {
+impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for MaybeTlsStream<S> {
     fn poll_write(
         self: Pin<&mut Self>,
         cx: &mut Context,
         buf: &[u8],
     ) -> Poll<Result<usize, io::Error>> {
         match self.get_mut() {
-            EitherStream::Left(ref mut s) => {
+            MaybeTlsStream::Normal(ref mut s) => {
+                Pin::new(s).poll_write(cx, buf)
+            }
+            MaybeTlsStream::Proxied(ref mut s) => {
                 Pin::new(s).poll_write(cx, buf)
             }
-            EitherStream::Right(ref mut s) => {
+            MaybeTlsStream::Secured(ref mut s) => {
                 Pin::new(s).poll_write(cx, buf)
             }
         }
@@ -51,10 +62,13 @@ impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L
 
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
         match self.get_mut() {
-            EitherStream::Left(ref mut s) => {
+            MaybeTlsStream::Normal(ref mut s) => {
                 Pin::new(s).poll_flush(cx)
             }
-            EitherStream::Right(ref mut s) => {
+            MaybeTlsStream::Proxied(ref mut s) => {
+                Pin::new(s).poll_flush(cx)
+            }
+            MaybeTlsStream::Secured(ref mut s) => {
                 Pin::new(s).poll_flush(cx)
             }
         }
@@ -62,25 +76,27 @@ impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L
 
     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
         match self.get_mut() {
-            EitherStream::Left(ref mut s) => {
+            MaybeTlsStream::Normal(ref mut s) => {
                 Pin::new(s).poll_shutdown(cx)
             }
-            EitherStream::Right(ref mut s) => {
+            MaybeTlsStream::Proxied(ref mut s) => {
+                Pin::new(s).poll_shutdown(cx)
+            }
+            MaybeTlsStream::Secured(ref mut s) => {
                 Pin::new(s).poll_shutdown(cx)
             }
         }
     }
 }
 
-// we need this for crate::client::http_client:
-impl Connection for EitherStream<
-    tokio::net::TcpStream,
-    Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
-> {
-    fn connected(&self) -> hyper::client::connect::Connected {
+// we need this for the hyper http client
+impl <S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for MaybeTlsStream<S>
+{
+    fn connected(&self) -> Connected {
         match self {
-            EitherStream::Left(s) => s.connected(),
-            EitherStream::Right(s) => s.get_ref().connected(),
+            MaybeTlsStream::Normal(s) => s.connected(),
+            MaybeTlsStream::Proxied(s) => s.connected().proxy(true),
+            MaybeTlsStream::Secured(s) => s.get_ref().connected(),
         }
     }
 }
diff --git a/src/tools/http.rs b/src/tools/http.rs
index d08ce451..3cd3af4e 100644
--- a/src/tools/http.rs
+++ b/src/tools/http.rs
@@ -10,9 +10,11 @@ use hyper::client::{Client, HttpConnector};
 use http::{Request, Response};
 use openssl::ssl::{SslConnector, SslMethod};
 use futures::*;
+use tokio::net::TcpStream;
+use tokio_openssl::SslStream;
 
 use crate::tools::{
-    async_io::EitherStream,
+    async_io::MaybeTlsStream,
     socket::{
         set_tcp_keepalive,
         PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
@@ -100,13 +102,8 @@ impl HttpsConnector {
     }
 }
 
-type MaybeTlsStream = EitherStream<
-    tokio::net::TcpStream,
-    Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
->;
-
 impl hyper::service::Service<Uri> for HttpsConnector {
-    type Response = MaybeTlsStream;
+    type Response = MaybeTlsStream<TcpStream>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
     type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
@@ -140,12 +137,11 @@ impl hyper::service::Service<Uri> for HttpsConnector {
             let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
 
             if is_https {
-                let conn: tokio_openssl::SslStream<tokio::net::TcpStream> = tokio_openssl::SslStream::new(config?.into_ssl(&host)?, conn)?;
-                let mut conn = Box::pin(conn);
-                conn.as_mut().connect().await?;
-                Ok(MaybeTlsStream::Right(conn))
+                let mut conn: SslStream<TcpStream> = SslStream::new(config?.into_ssl(&host)?, conn)?;
+                Pin::new(&mut conn).connect().await?;
+                Ok(MaybeTlsStream::Secured(conn))
             } else {
-                Ok(MaybeTlsStream::Left(conn))
+                Ok(MaybeTlsStream::Normal(conn))
             }
         }.boxed()
     }
-- 
2.20.1





More information about the pbs-devel mailing list