[pbs-devel] [PATCH proxmox-backup 3/6] pbs-client: vsock: adapt to hyper/http 1.0
Fabian Grünbichler
f.gruenbichler at proxmox.com
Wed Mar 26 16:23:24 CET 2025
Similar to the http one:
- Body to Incoming for incoming bodies (here: responses)
- Body to proxmox-http's Body for everything else
- use hyper-util's legacy client
- use wrappers for hyper<->tower and hyper<->tokio
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
pbs-client/src/vsock_client.rs | 27 +++++++++++++++------------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git a/pbs-client/src/vsock_client.rs b/pbs-client/src/vsock_client.rs
index 5c18c6f3b..578433b79 100644
--- a/pbs-client/src/vsock_client.rs
+++ b/pbs-client/src/vsock_client.rs
@@ -3,17 +3,20 @@ use std::task::{Context, Poll};
use anyhow::{bail, format_err, Error};
use futures::*;
-use hyper::client::connect::{Connected, Connection};
-use hyper::client::Client;
+use http_body_util::{BodyDataStream, BodyExt};
+use hyper::body::Incoming;
use hyper::http::Uri;
use hyper::http::{Request, Response};
-use hyper::{body::HttpBody, Body};
+use hyper_util::client::legacy::connect::{Connected, Connection};
+use hyper_util::client::legacy::Client;
+use hyper_util::rt::{TokioExecutor, TokioIo};
use pin_project_lite::pin_project;
use serde_json::Value;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::net::UnixStream;
use proxmox_http::uri::json_object_to_query;
+use proxmox_http::Body;
use proxmox_router::HttpError;
pub const DEFAULT_VSOCK_PORT: u16 = 807;
@@ -30,9 +33,9 @@ pin_project! {
}
impl tower_service::Service<Uri> for VsockConnector {
- type Response = UnixConnection;
+ type Response = TokioIo<UnixConnection>;
type Error = Error;
- type Future = Pin<Box<dyn Future<Output = Result<UnixConnection, Error>> + Send>>;
+ type Future = Pin<Box<dyn Future<Output = Result<TokioIo<UnixConnection>, Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@@ -82,7 +85,7 @@ impl tower_service::Service<Uri> for VsockConnector {
let stream = tokio::net::UnixStream::from_std(std_stream)?;
let connection = UnixConnection { stream };
- Ok(connection)
+ Ok(TokioIo::new(connection))
})
// unravel the thread JoinHandle to a usable future
.map(|res| match res {
@@ -133,7 +136,7 @@ impl AsyncWrite for UnixConnection {
/// Slimmed down version of HttpClient for virtio-vsock connections (file restore daemon)
pub struct VsockClient {
- client: Client<VsockConnector>,
+ client: Client<VsockConnector, Body>,
cid: i32,
port: u16,
auth: Option<String>,
@@ -142,7 +145,7 @@ pub struct VsockClient {
impl VsockClient {
pub fn new(cid: i32, port: u16, auth: Option<String>) -> Self {
let conn = VsockConnector {};
- let client = Client::builder().build::<_, Body>(conn);
+ let client = Client::builder(TokioExecutor::new()).build::<_, Body>(conn);
Self {
client,
cid,
@@ -179,7 +182,7 @@ impl VsockClient {
if !status.is_success() {
Self::api_response(resp).await.map(|_| ())?
} else {
- futures::TryStreamExt::map_err(resp.into_body(), Error::from)
+ futures::TryStreamExt::map_err(BodyDataStream::new(resp.into_body()), Error::from)
.try_fold(output, move |acc, chunk| async move {
acc.write_all(&chunk).await?;
Ok::<_, Error>(acc)
@@ -189,9 +192,9 @@ impl VsockClient {
Ok(())
}
- async fn api_response(response: Response<Body>) -> Result<Value, Error> {
+ async fn api_response(response: Response<Incoming>) -> Result<Value, Error> {
let status = response.status();
- let data = HttpBody::collect(response.into_body()).await?.to_bytes();
+ let data = response.into_body().collect().await?.to_bytes();
let text = String::from_utf8(data.to_vec()).unwrap();
if status.is_success() {
@@ -237,7 +240,7 @@ impl VsockClient {
if let Some(data) = data {
if method == "POST" {
let builder = make_builder("application/json", &url);
- let request = builder.body(Body::from(data.to_string()))?;
+ let request = builder.body(data.to_string().into())?;
return Ok(request);
} else {
let query = json_object_to_query(data)?;
--
2.39.5
More information about the pbs-devel mailing list