[pbs-devel] [PATCH proxmox 06/17] http: adapt simple client to hyper 1.x

Fabian Grünbichler f.gruenbichler at proxmox.com
Wed Mar 26 16:23:10 CET 2025


the main change requiring adaptations here is that hyper 1.x no longer
provides a concrete `Body` struct and relies on a body trait instead, so
we use our own `Body` type introduced by the previous commit.

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
 Cargo.toml                        |  8 ++-
 proxmox-http/Cargo.toml           |  8 ++-
 proxmox-http/src/client/simple.rs | 93 ++++++++++++++++++++++---------
 3 files changed, 81 insertions(+), 28 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 268b39eb..202cfd1e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -84,8 +84,11 @@ form_urlencoded = "1.1"
 futures = "0.3"
 handlebars = "3.0"
 hex = "0.4"
-http = "0.2"
-hyper = "0.14.5"
+http = "1"
+http-body = "1"
+http-body-util = "0.1"
+hyper-util = "0.1"
+hyper = "1"
 ldap3 = { version = "0.11", default-features = false }
 lettre = "0.11.1"
 libc = "0.2.107"
@@ -105,6 +108,7 @@ serde_cbor = "0.11.1"
 serde_json = "1.0"
 serde_plain = "1.0"
 syn = { version = "2", features = [ "full", "visit-mut" ] }
+sync_wrapper = "1.0.2"
 tar = "0.4"
 tokio = "1.6"
 tokio-openssl = "0.6.1"
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 1fbc70a8..7668c6ee 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -14,13 +14,17 @@ rust-version.workspace = true
 [dependencies]
 anyhow.workspace = true
 base64 = { workspace = true, optional = true }
+bytes = { workspace = true, optional = true }
 futures = { workspace = true, optional = true }
 http = { workspace = true, optional = true }
+http-body = { workspace = true, optional = true }
+http-body-util = { workspace = true, optional = true }
 hyper = { workspace = true, optional = true }
 hyper-util = { workspace = true, optional = true }
 native-tls = { workspace = true, optional = true }
 openssl =  { version = "0.10", optional = true }
 serde_json = { workspace = true, optional = true }
+sync_wrapper = { workspace = true, optional = true }
 tokio = { workspace = true, features = [], optional = true }
 tokio-openssl = { workspace = true, optional = true }
 ureq = { version = "2.4", features = ["native-certs", "native-tls"], optional = true, default-features = false }
@@ -60,8 +64,10 @@ rate-limited-stream = [
     "rate-limiter",
 ]
 client = [
+    "dep:bytes",
     "dep:futures",
-    "dep:hyper",
+    "dep:http-body",
+    "dep:http-body-util",
     "dep:hyper-util",
     "dep:openssl",
     "dep:proxmox-compression",
diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs
index 062889ac..b5318f19 100644
--- a/proxmox-http/src/client/simple.rs
+++ b/proxmox-http/src/client/simple.rs
@@ -1,19 +1,24 @@
 use anyhow::{bail, format_err, Error};
 use std::collections::HashMap;
 
+use std::fmt::Display;
+
+#[cfg(all(feature = "client-trait", feature = "proxmox-async"))]
+use http::header::HeaderName;
 #[cfg(all(feature = "client-trait", feature = "proxmox-async"))]
 use std::str::FromStr;
 
 use futures::*;
-#[cfg(all(feature = "client-trait", feature = "proxmox-async"))]
-use http::header::HeaderName;
+
 use http::{HeaderValue, Request, Response};
-use hyper::client::Client as HyperClient;
-use hyper::client::HttpConnector;
-use hyper::Body;
+use http_body_util::{BodyDataStream, BodyExt};
+use hyper_util::client::legacy::connect::HttpConnector;
+use hyper_util::client::legacy::Client as HyperClient;
+use hyper_util::rt::TokioExecutor;
 use openssl::ssl::{SslConnector, SslMethod};
 
 use crate::client::HttpsConnector;
+use crate::Body;
 use crate::HttpOptions;
 
 /// Asynchronous HTTP client implementation
@@ -44,7 +49,9 @@ impl Client {
         if let Some(ref proxy_config) = options.proxy_config {
             https.set_proxy(proxy_config.clone());
         }
-        let client = HyperClient::builder().build(https);
+
+        let client =
+            HyperClient::builder(TokioExecutor::new()).build::<HttpsConnector, Body>(https);
         Self { client, options }
     }
 
@@ -74,7 +81,7 @@ impl Client {
 
         request
             .headers_mut()
-            .insert(hyper::header::USER_AGENT, user_agent);
+            .insert(http::header::USER_AGENT, user_agent);
 
         self.add_proxy_headers(&mut request)?;
 
@@ -94,7 +101,7 @@ impl Client {
         let mut request = Request::builder()
             .method("POST")
             .uri(uri)
-            .header(hyper::header::CONTENT_TYPE, content_type);
+            .header(http::header::CONTENT_TYPE, content_type);
 
         if let Some(extra_headers) = extra_headers {
             for (header, value) in extra_headers {
@@ -102,9 +109,8 @@ impl Client {
             }
         }
 
-        let request = request.body(body.unwrap_or_default())?;
-
-        self.request(request).await
+        let body = body.unwrap_or(Body::empty());
+        self.request(request.body(body)?).await
     }
 
     pub async fn get_string(
@@ -145,7 +151,7 @@ impl Client {
             Ok(res) => {
                 let (parts, body) = res.into_parts();
 
-                let buf = hyper::body::to_bytes(body).await?;
+                let buf = body.collect().await?.to_bytes();
                 let new_body = String::from_utf8(buf.to_vec())
                     .map_err(|err| format_err!("Error converting HTTP result data: {}", err))?;
 
@@ -154,6 +160,25 @@ impl Client {
             Err(err) => Err(err),
         }
     }
+
+    pub async fn response_body_bytes(res: Response<Body>) -> Result<Body, Error> {
+        Self::convert_body_to_bytes(Ok(res))
+            .await
+            .map(|res| res.into_body())
+    }
+
+    async fn convert_body_to_bytes(
+        response: Result<Response<Body>, Error>,
+    ) -> Result<Response<Body>, Error> {
+        match response {
+            Ok(res) => {
+                let (parts, body) = res.into_parts();
+                let buf = body.collect().await?.to_bytes();
+                Ok(Response::from_parts(parts, buf.into()))
+            }
+            Err(err) => Err(err),
+        }
+    }
 }
 
 impl Default for Client {
@@ -181,7 +206,9 @@ impl crate::HttpClient<Body, Body> for Client {
             }
         }
 
-        proxmox_async::runtime::block_on(self.request(req))
+        proxmox_async::runtime::block_on(async move {
+            Self::convert_body_to_bytes(self.request(req).await).await
+        })
     }
 
     fn post(
@@ -191,11 +218,16 @@ impl crate::HttpClient<Body, Body> for Client {
         content_type: Option<&str>,
         extra_headers: Option<&HashMap<String, String>>,
     ) -> Result<Response<Body>, Error> {
-        proxmox_async::runtime::block_on(self.post(uri, body, content_type, extra_headers))
+        proxmox_async::runtime::block_on(async move {
+            Self::convert_body_to_bytes(self.post(uri, body, content_type, extra_headers).await)
+                .await
+        })
     }
 
     fn request(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
-        proxmox_async::runtime::block_on(async move { self.request(request).await })
+        proxmox_async::runtime::block_on(async move {
+            Self::convert_body_to_bytes(self.request(request).await).await
+        })
     }
 }
 
@@ -231,7 +263,7 @@ impl crate::HttpClient<String, String> for Client {
         extra_headers: Option<&HashMap<String, String>>,
     ) -> Result<Response<String>, Error> {
         proxmox_async::runtime::block_on(async move {
-            let body = body.map(|s| Body::from(s.into_bytes()));
+            let body = body.map(|s| s.into());
             Self::convert_body_to_string(self.post(uri, body, content_type, extra_headers).await)
                 .await
         })
@@ -240,25 +272,34 @@ impl crate::HttpClient<String, String> for Client {
     fn request(&self, request: Request<String>) -> Result<Response<String>, Error> {
         proxmox_async::runtime::block_on(async move {
             let (parts, body) = request.into_parts();
-            let body = Body::from(body);
+            let body = body.into();
             let request = Request::from_parts(parts, body);
             Self::convert_body_to_string(self.request(request).await).await
         })
     }
 }
 
-/// Wraps the `Body` stream in a DeflateDecoder stream if the `Content-Encoding`
+/// Wraps the `Response` contents in a DeflateDecoder stream if the `Content-Encoding`
 /// header of the response is `deflate`, otherwise returns the original
 /// response.
-async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Error> {
-    let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else {
-        return Ok(res);
+async fn decode_response<B>(mut res: Response<B>) -> Result<Response<Body>, Error>
+where
+    B: http_body::Body<Data = bytes::Bytes> + Send + Unpin + 'static,
+    <B as http_body::Body>::Error: Into<Error> + Display,
+{
+    let Some(content_encoding) = res.headers_mut().remove(&http::header::CONTENT_ENCODING) else {
+        let (parts, body) = res.into_parts();
+        let stream = BodyDataStream::new(body);
+        let body = Body::wrap_stream(stream);
+        return Ok(Response::from_parts(parts, body));
     };
 
     let encodings = content_encoding.to_str()?;
     if encodings == "deflate" {
         let (parts, body) = res.into_parts();
-        let decoder = proxmox_compression::DeflateDecoder::builder(body)
+
+        let stream = BodyDataStream::new(body);
+        let decoder = proxmox_compression::DeflateDecoder::builder(stream)
             .zlib(true)
             .build();
         let decoded_body = Body::wrap_stream(decoder);
@@ -270,6 +311,8 @@ async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Erro
 
 #[cfg(test)]
 mod test {
+    use bytes::Bytes;
+
     use super::*;
 
     use std::io::Write;
@@ -282,10 +325,10 @@ si aliquod aeternum et infinitum impendere."#;
     #[tokio::test]
     async fn test_parse_response_deflate() {
         let encoded = encode_deflate(BODY.as_bytes()).unwrap();
-        let encoded_body = Body::from(encoded);
+        let encoded_body = Body::from(Bytes::from(encoded));
         let encoded_response = Response::builder()
-            .header(hyper::header::CONTENT_ENCODING, "deflate")
-            .body(encoded_body)
+            .header(http::header::CONTENT_ENCODING, "deflate")
+            .body::<Body>(encoded_body)
             .unwrap();
 
         let decoded_response = decode_response(encoded_response).await.unwrap();
-- 
2.39.5





More information about the pbs-devel mailing list