[pbs-devel] [PATCH proxmox 04/17] http: adapt connector to hyper 1.x

Fabian Grünbichler f.gruenbichler at proxmox.com
Wed Mar 26 16:23:08 CET 2025


by switching to tower's Service and wrapping in TokioIo as needed. hyper
now uses their own Service type to not expose tower in their public API,
and their own Async IO traits, but they provide wrappers to not require
too many changes for crates like ours here that already used hyper 0.14.

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
 proxmox-http/Cargo.toml              |  9 ++++--
 proxmox-http/src/client/connector.rs | 44 ++++++++++++++++++----------
 2 files changed, 36 insertions(+), 17 deletions(-)

diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index c5137e2a..4ec142c9 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -25,6 +25,7 @@ tokio = { workspace = true, features = [], optional = true }
 tokio-openssl = { workspace = true, optional = true }
 ureq = { version = "2.4", features = ["native-certs", "native-tls"], optional = true, default-features = false }
 url = { workspace = true, optional = true }
+tower-service = { workspace = true, optional = true }
 
 proxmox-async = { workspace = true, optional = true }
 proxmox-sys = { workspace = true, optional = true }
@@ -52,15 +53,19 @@ rate-limited-stream = [
 client = [
     "dep:futures",
     "dep:hyper",
+    "dep:hyper-util",
     "dep:openssl",
     "dep:proxmox-compression",
     "dep:tokio",
     "dep:tokio-openssl",
+    "dep:tower-service",
     "hyper?/client",
     "hyper?/http1",
     "hyper?/http2",
-    "hyper?/stream",
-    "hyper?/tcp",
+    "hyper-util?/client",
+    "hyper-util?/client-legacy",
+    "hyper-util?/http1",
+    "hyper-util?/tokio",
     "tokio?/io-util",
     "http-helpers",
     "rate-limited-stream",
diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index 63b9d10c..70421793 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -6,7 +6,8 @@ use std::task::{Context, Poll};
 
 use futures::*;
 use http::Uri;
-use hyper::client::HttpConnector;
+use hyper_util::client::legacy::connect::HttpConnector;
+use hyper_util::rt::TokioIo;
 use openssl::ssl::SslConnector;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::net::TcpStream;
@@ -122,8 +123,8 @@ impl HttpsConnector {
     }
 }
 
-impl hyper::service::Service<Uri> for HttpsConnector {
-    type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
+impl tower_service::Service<Uri> for HttpsConnector {
+    type Response = TokioIo<MaybeTlsStream<RateLimitedStream<TcpStream>>>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
     type Future =
@@ -171,9 +172,13 @@ impl hyper::service::Service<Uri> for HttpsConnector {
             if use_connect {
                 async move {
                     use std::fmt::Write as _;
-                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
-                        format_err!("error connecting to {} - {}", proxy_authority, err)
-                    })?;
+                    let tcp_stream = connector
+                        .call(proxy_uri)
+                        .await
+                        .map_err(|err| {
+                            format_err!("error connecting to {} - {}", proxy_authority, err)
+                        })?
+                        .into_inner();
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
@@ -196,24 +201,30 @@ impl hyper::service::Service<Uri> for HttpsConnector {
                     Self::parse_connect_response(&mut tcp_stream).await?;
 
                     if is_https {
-                        Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+                        Self::secure_stream(tcp_stream, &ssl_connector, &host)
+                            .await
+                            .map(TokioIo::new)
                     } else {
-                        Ok(MaybeTlsStream::Normal(tcp_stream))
+                        Ok(TokioIo::new(MaybeTlsStream::Normal(tcp_stream)))
                     }
                 }
                 .boxed()
             } else {
                 async move {
-                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
-                        format_err!("error connecting to {} - {}", proxy_authority, err)
-                    })?;
+                    let tcp_stream = connector
+                        .call(proxy_uri)
+                        .await
+                        .map_err(|err| {
+                            format_err!("error connecting to {} - {}", proxy_authority, err)
+                        })?
+                        .into_inner();
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
                     let tcp_stream =
                         RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
 
-                    Ok(MaybeTlsStream::Proxied(tcp_stream))
+                    Ok(TokioIo::new(MaybeTlsStream::Proxied(tcp_stream)))
                 }
                 .boxed()
             }
@@ -223,7 +234,8 @@ impl hyper::service::Service<Uri> for HttpsConnector {
                 let tcp_stream = connector
                     .call(dst)
                     .await
-                    .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
+                    .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?
+                    .into_inner();
 
                 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
@@ -231,9 +243,11 @@ impl hyper::service::Service<Uri> for HttpsConnector {
                     RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
 
                 if is_https {
-                    Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+                    Self::secure_stream(tcp_stream, &ssl_connector, &host)
+                        .await
+                        .map(TokioIo::new)
                 } else {
-                    Ok(MaybeTlsStream::Normal(tcp_stream))
+                    Ok(TokioIo::new(MaybeTlsStream::Normal(tcp_stream)))
                 }
             }
             .boxed()
-- 
2.39.5





More information about the pbs-devel mailing list