[pbs-devel] [PATCH proxmox 04/17] http: adapt connector to hyper 1.x
Max Carrara
m.carrara at proxmox.com
Wed Apr 2 15:31:17 CEST 2025
On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> by switching to tower's Service and wrapping in TokioIo as needed. hyper
> now uses their own Service type to not expose tower in their public API,
> and their own Async IO traits, but they provide wrappers to not require
> too many changes for crates like ours here that already used hyper 0.14.
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
> ---
> proxmox-http/Cargo.toml | 9 ++++--
> proxmox-http/src/client/connector.rs | 44 ++++++++++++++++++----------
> 2 files changed, 36 insertions(+), 17 deletions(-)
>
> diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
> index c5137e2a..4ec142c9 100644
> --- a/proxmox-http/Cargo.toml
> +++ b/proxmox-http/Cargo.toml
> @@ -25,6 +25,7 @@ tokio = { workspace = true, features = [], optional = true }
> tokio-openssl = { workspace = true, optional = true }
> ureq = { version = "2.4", features = ["native-certs", "native-tls"], optional = true, default-features = false }
> url = { workspace = true, optional = true }
> +tower-service = { workspace = true, optional = true }
>
> proxmox-async = { workspace = true, optional = true }
> proxmox-sys = { workspace = true, optional = true }
> @@ -52,15 +53,19 @@ rate-limited-stream = [
> client = [
> "dep:futures",
> "dep:hyper",
> + "dep:hyper-util",
> "dep:openssl",
> "dep:proxmox-compression",
> "dep:tokio",
> "dep:tokio-openssl",
> + "dep:tower-service",
> "hyper?/client",
> "hyper?/http1",
> "hyper?/http2",
> - "hyper?/stream",
> - "hyper?/tcp",
> + "hyper-util?/client",
> + "hyper-util?/client-legacy",
> + "hyper-util?/http1",
> + "hyper-util?/tokio",
> "tokio?/io-util",
> "http-helpers",
> "rate-limited-stream",
> diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
> index 63b9d10c..70421793 100644
> --- a/proxmox-http/src/client/connector.rs
> +++ b/proxmox-http/src/client/connector.rs
> @@ -6,7 +6,8 @@ use std::task::{Context, Poll};
>
> use futures::*;
> use http::Uri;
> -use hyper::client::HttpConnector;
> +use hyper_util::client::legacy::connect::HttpConnector;
> +use hyper_util::rt::TokioIo;
> use openssl::ssl::SslConnector;
> use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
> use tokio::net::TcpStream;
> @@ -122,8 +123,8 @@ impl HttpsConnector {
> }
> }
>
> -impl hyper::service::Service<Uri> for HttpsConnector {
> - type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
> +impl tower_service::Service<Uri> for HttpsConnector {
> + type Response = TokioIo<MaybeTlsStream<RateLimitedStream<TcpStream>>>;
> type Error = Error;
> #[allow(clippy::type_complexity)]
> type Future =
> @@ -171,9 +172,13 @@ impl hyper::service::Service<Uri> for HttpsConnector {
> if use_connect {
> async move {
> use std::fmt::Write as _;
> - let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
> - format_err!("error connecting to {} - {}", proxy_authority, err)
The call to `format_err!` above can be shortened by inlining the
parameters, i.e.:
format_err!("error connecting to {proxy_authority} - {err}")
> - })?;
> + let tcp_stream = connector
> + .call(proxy_uri)
> + .await
> + .map_err(|err| {
> + format_err!("error connecting to {} - {}", proxy_authority, err)
Same here.
> + })?
> + .into_inner();
>
> let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
>
> @@ -196,24 +201,30 @@ impl hyper::service::Service<Uri> for HttpsConnector {
> Self::parse_connect_response(&mut tcp_stream).await?;
>
> if is_https {
> - Self::secure_stream(tcp_stream, &ssl_connector, &host).await
> + Self::secure_stream(tcp_stream, &ssl_connector, &host)
> + .await
> + .map(TokioIo::new)
> } else {
> - Ok(MaybeTlsStream::Normal(tcp_stream))
> + Ok(TokioIo::new(MaybeTlsStream::Normal(tcp_stream)))
> }
> }
> .boxed()
> } else {
> async move {
> - let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
> - format_err!("error connecting to {} - {}", proxy_authority, err)
As well as here.
> - })?;
> + let tcp_stream = connector
> + .call(proxy_uri)
> + .await
> + .map_err(|err| {
> + format_err!("error connecting to {} - {}", proxy_authority, err)
And here.
> + })?
> + .into_inner();
>
> let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
>
> let tcp_stream =
> RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
>
> - Ok(MaybeTlsStream::Proxied(tcp_stream))
> + Ok(TokioIo::new(MaybeTlsStream::Proxied(tcp_stream)))
> }
> .boxed()
> }
> @@ -223,7 +234,8 @@ impl hyper::service::Service<Uri> for HttpsConnector {
> let tcp_stream = connector
> .call(dst)
> .await
> - .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
> + .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?
And here too.
> + .into_inner();
>
> let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
>
> @@ -231,9 +243,11 @@ impl hyper::service::Service<Uri> for HttpsConnector {
> RateLimitedStream::with_limiter(tcp_stream, read_limiter, write_limiter);
>
> if is_https {
> - Self::secure_stream(tcp_stream, &ssl_connector, &host).await
> + Self::secure_stream(tcp_stream, &ssl_connector, &host)
> + .await
> + .map(TokioIo::new)
> } else {
> - Ok(MaybeTlsStream::Normal(tcp_stream))
> + Ok(TokioIo::new(MaybeTlsStream::Normal(tcp_stream)))
> }
> }
> .boxed()
More information about the pbs-devel
mailing list