[pbs-devel] [PATCH proxmox 06/17] http: adapt simple client to hyper 1.x
Fabian Grünbichler
f.gruenbichler at proxmox.com
Wed Mar 26 16:23:10 CET 2025
The main change requiring adaptation here is that hyper 1.x no longer
provides a concrete Body struct and has switched to a Body trait instead,
so we use our own Body type, which was introduced by the previous commit.
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
Cargo.toml | 8 ++-
proxmox-http/Cargo.toml | 8 ++-
proxmox-http/src/client/simple.rs | 93 ++++++++++++++++++++++---------
3 files changed, 81 insertions(+), 28 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 268b39eb..202cfd1e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -84,8 +84,11 @@ form_urlencoded = "1.1"
futures = "0.3"
handlebars = "3.0"
hex = "0.4"
-http = "0.2"
-hyper = "0.14.5"
+http = "1"
+http-body = "1"
+http-body-util = "0.1"
+hyper-util = "0.1"
+hyper = "1"
ldap3 = { version = "0.11", default-features = false }
lettre = "0.11.1"
libc = "0.2.107"
@@ -105,6 +108,7 @@ serde_cbor = "0.11.1"
serde_json = "1.0"
serde_plain = "1.0"
syn = { version = "2", features = [ "full", "visit-mut" ] }
+sync_wrapper = "1.0.2"
tar = "0.4"
tokio = "1.6"
tokio-openssl = "0.6.1"
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 1fbc70a8..7668c6ee 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -14,13 +14,17 @@ rust-version.workspace = true
[dependencies]
anyhow.workspace = true
base64 = { workspace = true, optional = true }
+bytes = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
http = { workspace = true, optional = true }
+http-body = { workspace = true, optional = true }
+http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, optional = true }
hyper-util = { workspace = true, optional = true }
native-tls = { workspace = true, optional = true }
openssl = { version = "0.10", optional = true }
serde_json = { workspace = true, optional = true }
+sync_wrapper = { workspace = true, optional = true }
tokio = { workspace = true, features = [], optional = true }
tokio-openssl = { workspace = true, optional = true }
ureq = { version = "2.4", features = ["native-certs", "native-tls"], optional = true, default-features = false }
@@ -60,8 +64,10 @@ rate-limited-stream = [
"rate-limiter",
]
client = [
+ "dep:bytes",
"dep:futures",
- "dep:hyper",
+ "dep:http-body",
+ "dep:http-body-util",
"dep:hyper-util",
"dep:openssl",
"dep:proxmox-compression",
diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs
index 062889ac..b5318f19 100644
--- a/proxmox-http/src/client/simple.rs
+++ b/proxmox-http/src/client/simple.rs
@@ -1,19 +1,24 @@
use anyhow::{bail, format_err, Error};
use std::collections::HashMap;
+use std::fmt::Display;
+
+#[cfg(all(feature = "client-trait", feature = "proxmox-async"))]
+use http::header::HeaderName;
#[cfg(all(feature = "client-trait", feature = "proxmox-async"))]
use std::str::FromStr;
use futures::*;
-#[cfg(all(feature = "client-trait", feature = "proxmox-async"))]
-use http::header::HeaderName;
+
use http::{HeaderValue, Request, Response};
-use hyper::client::Client as HyperClient;
-use hyper::client::HttpConnector;
-use hyper::Body;
+use http_body_util::{BodyDataStream, BodyExt};
+use hyper_util::client::legacy::connect::HttpConnector;
+use hyper_util::client::legacy::Client as HyperClient;
+use hyper_util::rt::TokioExecutor;
use openssl::ssl::{SslConnector, SslMethod};
use crate::client::HttpsConnector;
+use crate::Body;
use crate::HttpOptions;
/// Asynchronous HTTP client implementation
@@ -44,7 +49,9 @@ impl Client {
if let Some(ref proxy_config) = options.proxy_config {
https.set_proxy(proxy_config.clone());
}
- let client = HyperClient::builder().build(https);
+
+ let client =
+ HyperClient::builder(TokioExecutor::new()).build::<HttpsConnector, Body>(https);
Self { client, options }
}
@@ -74,7 +81,7 @@ impl Client {
request
.headers_mut()
- .insert(hyper::header::USER_AGENT, user_agent);
+ .insert(http::header::USER_AGENT, user_agent);
self.add_proxy_headers(&mut request)?;
@@ -94,7 +101,7 @@ impl Client {
let mut request = Request::builder()
.method("POST")
.uri(uri)
- .header(hyper::header::CONTENT_TYPE, content_type);
+ .header(http::header::CONTENT_TYPE, content_type);
if let Some(extra_headers) = extra_headers {
for (header, value) in extra_headers {
@@ -102,9 +109,8 @@ impl Client {
}
}
- let request = request.body(body.unwrap_or_default())?;
-
- self.request(request).await
+ let body = body.unwrap_or(Body::empty());
+ self.request(request.body(body)?).await
}
pub async fn get_string(
@@ -145,7 +151,7 @@ impl Client {
Ok(res) => {
let (parts, body) = res.into_parts();
- let buf = hyper::body::to_bytes(body).await?;
+ let buf = body.collect().await?.to_bytes();
let new_body = String::from_utf8(buf.to_vec())
.map_err(|err| format_err!("Error converting HTTP result data: {}", err))?;
@@ -154,6 +160,25 @@ impl Client {
Err(err) => Err(err),
}
}
+
+ pub async fn response_body_bytes(res: Response<Body>) -> Result<Body, Error> {
+ Self::convert_body_to_bytes(Ok(res))
+ .await
+ .map(|res| res.into_body())
+ }
+
+ async fn convert_body_to_bytes(
+ response: Result<Response<Body>, Error>,
+ ) -> Result<Response<Body>, Error> {
+ match response {
+ Ok(res) => {
+ let (parts, body) = res.into_parts();
+ let buf = body.collect().await?.to_bytes();
+ Ok(Response::from_parts(parts, buf.into()))
+ }
+ Err(err) => Err(err),
+ }
+ }
}
impl Default for Client {
@@ -181,7 +206,9 @@ impl crate::HttpClient<Body, Body> for Client {
}
}
- proxmox_async::runtime::block_on(self.request(req))
+ proxmox_async::runtime::block_on(async move {
+ Self::convert_body_to_bytes(self.request(req).await).await
+ })
}
fn post(
@@ -191,11 +218,16 @@ impl crate::HttpClient<Body, Body> for Client {
content_type: Option<&str>,
extra_headers: Option<&HashMap<String, String>>,
) -> Result<Response<Body>, Error> {
- proxmox_async::runtime::block_on(self.post(uri, body, content_type, extra_headers))
+ proxmox_async::runtime::block_on(async move {
+ Self::convert_body_to_bytes(self.post(uri, body, content_type, extra_headers).await)
+ .await
+ })
}
fn request(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
- proxmox_async::runtime::block_on(async move { self.request(request).await })
+ proxmox_async::runtime::block_on(async move {
+ Self::convert_body_to_bytes(self.request(request).await).await
+ })
}
}
@@ -231,7 +263,7 @@ impl crate::HttpClient<String, String> for Client {
extra_headers: Option<&HashMap<String, String>>,
) -> Result<Response<String>, Error> {
proxmox_async::runtime::block_on(async move {
- let body = body.map(|s| Body::from(s.into_bytes()));
+ let body = body.map(|s| s.into());
Self::convert_body_to_string(self.post(uri, body, content_type, extra_headers).await)
.await
})
@@ -240,25 +272,34 @@ impl crate::HttpClient<String, String> for Client {
fn request(&self, request: Request<String>) -> Result<Response<String>, Error> {
proxmox_async::runtime::block_on(async move {
let (parts, body) = request.into_parts();
- let body = Body::from(body);
+ let body = body.into();
let request = Request::from_parts(parts, body);
Self::convert_body_to_string(self.request(request).await).await
})
}
}
-/// Wraps the `Body` stream in a DeflateDecoder stream if the `Content-Encoding`
+/// Wraps the `Response` contents in a DeflateDecoder stream if the `Content-Encoding`
/// header of the response is `deflate`, otherwise returns the original
/// response.
-async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Error> {
- let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else {
- return Ok(res);
+async fn decode_response<B>(mut res: Response<B>) -> Result<Response<Body>, Error>
+where
+ B: http_body::Body<Data = bytes::Bytes> + Send + Unpin + 'static,
+ <B as http_body::Body>::Error: Into<Error> + Display,
+{
+ let Some(content_encoding) = res.headers_mut().remove(&http::header::CONTENT_ENCODING) else {
+ let (parts, body) = res.into_parts();
+ let stream = BodyDataStream::new(body);
+ let body = Body::wrap_stream(stream);
+ return Ok(Response::from_parts(parts, body));
};
let encodings = content_encoding.to_str()?;
if encodings == "deflate" {
let (parts, body) = res.into_parts();
- let decoder = proxmox_compression::DeflateDecoder::builder(body)
+
+ let stream = BodyDataStream::new(body);
+ let decoder = proxmox_compression::DeflateDecoder::builder(stream)
.zlib(true)
.build();
let decoded_body = Body::wrap_stream(decoder);
@@ -270,6 +311,8 @@ async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Erro
#[cfg(test)]
mod test {
+ use bytes::Bytes;
+
use super::*;
use std::io::Write;
@@ -282,10 +325,10 @@ si aliquod aeternum et infinitum impendere."#;
#[tokio::test]
async fn test_parse_response_deflate() {
let encoded = encode_deflate(BODY.as_bytes()).unwrap();
- let encoded_body = Body::from(encoded);
+ let encoded_body = Body::from(Bytes::from(encoded));
let encoded_response = Response::builder()
- .header(hyper::header::CONTENT_ENCODING, "deflate")
- .body(encoded_body)
+ .header(http::header::CONTENT_ENCODING, "deflate")
+ .body::<Body>(encoded_body)
.unwrap();
let decoded_response = decode_response(encoded_response).await.unwrap();
--
2.39.5
More information about the pbs-devel
mailing list