[pbs-devel] [PATCH proxmox 04/17] http: adapt connector to hyper 1.x

Max Carrara m.carrara at proxmox.com
Wed Apr 2 15:31:17 CEST 2025


On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> by switching to tower's Service and wrapping in TokioIo as needed. hyper
> now uses their own Service type to not expose tower in their public API,
> and their own Async IO traits, but they provide wrappers to not require
> too many changes for crates like ours here that already used hyper 0.14.
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
> ---
>  proxmox-http/Cargo.toml              |  9 ++++--
>  proxmox-http/src/client/connector.rs | 44 ++++++++++++++++++----------
>  2 files changed, 36 insertions(+), 17 deletions(-)
>
> diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
> index c5137e2a..4ec142c9 100644
> --- a/proxmox-http/Cargo.toml
> +++ b/proxmox-http/Cargo.toml
> @@ -25,6 +25,7 @@ tokio = { workspace = true, features = [], optional = true }
>  tokio-openssl = { workspace = true, optional = true }
>  ureq = { version = "2.4", features = ["native-certs", "native-tls"], optional = true, default-features = false }
>  url = { workspace = true, optional = true }
> +tower-service = { workspace = true, optional = true }
>  
>  proxmox-async = { workspace = true, optional = true }
>  proxmox-sys = { workspace = true, optional = true }
> @@ -52,15 +53,19 @@ rate-limited-stream = [
>  client = [
>      "dep:futures",
>      "dep:hyper",
> +    "dep:hyper-util",
>      "dep:openssl",
>      "dep:proxmox-compression",
>      "dep:tokio",
>      "dep:tokio-openssl",
> +    "dep:tower-service",
>      "hyper?/client",
>      "hyper?/http1",
>      "hyper?/http2",
> -    "hyper?/stream",
> -    "hyper?/tcp",
> +    "hyper-util?/client",
> +    "hyper-util?/client-legacy",
> +    "hyper-util?/http1",
> +    "hyper-util?/tokio",
>      "tokio?/io-util",
>      "http-helpers",
>      "rate-limited-stream",
> diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
> index 63b9d10c..70421793 100644
> --- a/proxmox-http/src/client/connector.rs
> +++ b/proxmox-http/src/client/connector.rs
> @@ -6,7 +6,8 @@ use std::task::{Context, Poll};
>  
>  use futures::*;
>  use http::Uri;
> -use hyper::client::HttpConnector;
> +use hyper_util::client::legacy::connect::HttpConnector;
> +use hyper_util::rt::TokioIo;
>  use openssl::ssl::SslConnector;
>  use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
>  use tokio::net::TcpStream;
> @@ -122,8 +123,8 @@ impl HttpsConnector {
>      }
>  }
>  
> -impl hyper::service::Service<Uri> for HttpsConnector {
> -    type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
> +impl tower_service::Service<Uri> for HttpsConnector {
> +    type Response = TokioIo<MaybeTlsStream<RateLimitedStream<TcpStream>>>;
>      type Error = Error;
>      #[allow(clippy::type_complexity)]
>      type Future =
> @@ -171,9 +172,13 @@ impl hyper::service::Service<Uri> for HttpsConnector {
>              if use_connect {
>                  async move {
>                      use std::fmt::Write as _;
> -                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
> -                        format_err!("error connecting to {} - {}", proxy_authority, err)

The call to `format_err!` above can be shortened by inlining the
arguments directly into the format string, i.e.:

    format_err!("error connecting to {proxy_authority} - {err}")

> -                    })?;
> +                    let tcp_stream = connector
> +                        .call(proxy_uri)
> +                        .await
> +                        .map_err(|err| {
> +                            format_err!("error connecting to {} - {}", proxy_authority, err)

Same here.

> +                        })?
> +                        .into_inner();
>  
>                      let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
>  
> @@ -196,24 +201,30 @@ impl hyper::service::Service<Uri> for HttpsConnector {
>                      Self::parse_connect_response(&mut tcp_stream).await?;
>  
>                      if is_https {
> -                        Self::secure_stream(tcp_stream, &ssl_connector, &host).await
> +                        Self::secure_stream(tcp_stream, &ssl_connector, &host)
> +                            .await
> +                            .map(TokioIo::new)
>                      } else {
> -                        Ok(MaybeTlsStream::Normal(tcp_stream))
> +                        Ok(TokioIo::new(MaybeTlsStream::Normal(tcp_stream)))
>                      }
>                  }
>                  .boxed()
>              } else {
>                  async move {
> -                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
> -                        format_err!("error connecting to {} - {}", proxy_authority, err)

The same applies here.

> -                    })?;
> +                    let tcp_stream = connector
> +                        .call(proxy_uri)
> +                        .await
> +                        .map_err(|err| {
> +                            format_err!("error connecting to {} - {}", proxy_authority, err)

And here.

> +                        })?
> +                        .into_inner();
>  
>                      let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
>  
>                      let tcp_stream =
>                          RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
>  
> -                    Ok(MaybeTlsStream::Proxied(tcp_stream))
> +                    Ok(TokioIo::new(MaybeTlsStream::Proxied(tcp_stream)))
>                  }
>                  .boxed()
>              }
> @@ -223,7 +234,8 @@ impl hyper::service::Service<Uri> for HttpsConnector {
>                  let tcp_stream = connector
>                      .call(dst)
>                      .await
> -                    .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
> +                    .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?

And here too.

> +                    .into_inner();
>  
>                  let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
>  
> @@ -231,9 +243,11 @@ impl hyper::service::Service<Uri> for HttpsConnector {
>                      RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
>  
>                  if is_https {
> -                    Self::secure_stream(tcp_stream, &ssl_connector, &host).await
> +                    Self::secure_stream(tcp_stream, &ssl_connector, &host)
> +                        .await
> +                        .map(TokioIo::new)
>                  } else {
> -                    Ok(MaybeTlsStream::Normal(tcp_stream))
> +                    Ok(TokioIo::new(MaybeTlsStream::Normal(tcp_stream)))
>                  }
>              }
>              .boxed()





More information about the pbs-devel mailing list