[pbs-devel] [RFC proxmox-backup 1/5] http: rename EitherStream to MaybeTlsStream
Dietmar Maurer
dietmar at proxmox.com
Wed Apr 21 13:16:58 CEST 2021
And rename the enum values. Added an additional enum value called Proxied.
The enum is now more specialized, but we only use it for the http client anyways.
---
src/tools/async_io.rs | 64 +++++++++++++++++++++++++++----------------
src/tools/http.rs | 20 ++++++--------
2 files changed, 48 insertions(+), 36 deletions(-)
diff --git a/src/tools/async_io.rs b/src/tools/async_io.rs
index 844afaa9..963f6fdd 100644
--- a/src/tools/async_io.rs
+++ b/src/tools/async_io.rs
@@ -1,4 +1,4 @@
-//! Generic AsyncRead/AsyncWrite utilities.
+//! AsyncRead/AsyncWrite utilities.
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
@@ -9,41 +9,52 @@ use futures::stream::{Stream, TryStream};
use futures::ready;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpListener;
-use hyper::client::connect::Connection;
-
-pub enum EitherStream<L, R> {
- Left(L),
- Right(R),
+use tokio_openssl::SslStream;
+use hyper::client::connect::{Connection, Connected};
+
+/// Asynchronous stream, possibly encrypted and proxied
+///
+/// Useful for HTTP client implementations using hyper.
+pub enum MaybeTlsStream<S> {
+ Normal(S),
+ Proxied(S),
+ Secured(SslStream<S>),
}
-impl<L: AsyncRead + Unpin, R: AsyncRead + Unpin> AsyncRead for EitherStream<L, R> {
+impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for MaybeTlsStream<S> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut ReadBuf,
) -> Poll<Result<(), io::Error>> {
match self.get_mut() {
- EitherStream::Left(ref mut s) => {
+ MaybeTlsStream::Normal(ref mut s) => {
+ Pin::new(s).poll_read(cx, buf)
+ }
+ MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_read(cx, buf)
}
- EitherStream::Right(ref mut s) => {
+ MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_read(cx, buf)
}
}
}
}
-impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L, R> {
+impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for MaybeTlsStream<S> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
match self.get_mut() {
- EitherStream::Left(ref mut s) => {
+ MaybeTlsStream::Normal(ref mut s) => {
+ Pin::new(s).poll_write(cx, buf)
+ }
+ MaybeTlsStream::Proxied(ref mut s) => {
Pin::new(s).poll_write(cx, buf)
}
- EitherStream::Right(ref mut s) => {
+ MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_write(cx, buf)
}
}
@@ -51,10 +62,13 @@ impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
match self.get_mut() {
- EitherStream::Left(ref mut s) => {
+ MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_flush(cx)
}
- EitherStream::Right(ref mut s) => {
+ MaybeTlsStream::Proxied(ref mut s) => {
+ Pin::new(s).poll_flush(cx)
+ }
+ MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_flush(cx)
}
}
@@ -62,25 +76,27 @@ impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
match self.get_mut() {
- EitherStream::Left(ref mut s) => {
+ MaybeTlsStream::Normal(ref mut s) => {
Pin::new(s).poll_shutdown(cx)
}
- EitherStream::Right(ref mut s) => {
+ MaybeTlsStream::Proxied(ref mut s) => {
+ Pin::new(s).poll_shutdown(cx)
+ }
+ MaybeTlsStream::Secured(ref mut s) => {
Pin::new(s).poll_shutdown(cx)
}
}
}
}
-// we need this for crate::client::http_client:
-impl Connection for EitherStream<
- tokio::net::TcpStream,
- Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
-> {
- fn connected(&self) -> hyper::client::connect::Connected {
+// we need this for the hyper http client
+impl <S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for MaybeTlsStream<S>
+{
+ fn connected(&self) -> Connected {
match self {
- EitherStream::Left(s) => s.connected(),
- EitherStream::Right(s) => s.get_ref().connected(),
+ MaybeTlsStream::Normal(s) => s.connected(),
+ MaybeTlsStream::Proxied(s) => s.connected().proxy(true),
+ MaybeTlsStream::Secured(s) => s.get_ref().connected(),
}
}
}
diff --git a/src/tools/http.rs b/src/tools/http.rs
index d08ce451..3cd3af4e 100644
--- a/src/tools/http.rs
+++ b/src/tools/http.rs
@@ -10,9 +10,11 @@ use hyper::client::{Client, HttpConnector};
use http::{Request, Response};
use openssl::ssl::{SslConnector, SslMethod};
use futures::*;
+use tokio::net::TcpStream;
+use tokio_openssl::SslStream;
use crate::tools::{
- async_io::EitherStream,
+ async_io::MaybeTlsStream,
socket::{
set_tcp_keepalive,
PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
@@ -100,13 +102,8 @@ impl HttpsConnector {
}
}
-type MaybeTlsStream = EitherStream<
- tokio::net::TcpStream,
- Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
->;
-
impl hyper::service::Service<Uri> for HttpsConnector {
- type Response = MaybeTlsStream;
+ type Response = MaybeTlsStream<TcpStream>;
type Error = Error;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
@@ -140,12 +137,11 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
if is_https {
- let conn: tokio_openssl::SslStream<tokio::net::TcpStream> = tokio_openssl::SslStream::new(config?.into_ssl(&host)?, conn)?;
- let mut conn = Box::pin(conn);
- conn.as_mut().connect().await?;
- Ok(MaybeTlsStream::Right(conn))
+ let mut conn: SslStream<TcpStream> = SslStream::new(config?.into_ssl(&host)?, conn)?;
+ Pin::new(&mut conn).connect().await?;
+ Ok(MaybeTlsStream::Secured(conn))
} else {
- Ok(MaybeTlsStream::Left(conn))
+ Ok(MaybeTlsStream::Normal(conn))
}
}.boxed()
}
--
2.20.1
More information about the pbs-devel
mailing list