[pbs-devel] [PATCH proxmox-backup 3/6] pbs-client: vsock: adapt to hyper/http 1.0

Fabian Grünbichler f.gruenbichler at proxmox.com
Wed Mar 26 16:23:24 CET 2025


similar to the http one:
- Body to Incoming for incoming responses
- Body to proxmox-http's Body for everything else
- use the legacy client from hyper-util
- use wrappers for hyper<->tower and hyper<->tokio

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
 pbs-client/src/vsock_client.rs | 27 +++++++++++++++------------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/pbs-client/src/vsock_client.rs b/pbs-client/src/vsock_client.rs
index 5c18c6f3b..578433b79 100644
--- a/pbs-client/src/vsock_client.rs
+++ b/pbs-client/src/vsock_client.rs
@@ -3,17 +3,20 @@ use std::task::{Context, Poll};
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
-use hyper::client::connect::{Connected, Connection};
-use hyper::client::Client;
+use http_body_util::{BodyDataStream, BodyExt};
+use hyper::body::Incoming;
 use hyper::http::Uri;
 use hyper::http::{Request, Response};
-use hyper::{body::HttpBody, Body};
+use hyper_util::client::legacy::connect::{Connected, Connection};
+use hyper_util::client::legacy::Client;
+use hyper_util::rt::{TokioExecutor, TokioIo};
 use pin_project_lite::pin_project;
 use serde_json::Value;
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
 use tokio::net::UnixStream;
 
 use proxmox_http::uri::json_object_to_query;
+use proxmox_http::Body;
 use proxmox_router::HttpError;
 
 pub const DEFAULT_VSOCK_PORT: u16 = 807;
@@ -30,9 +33,9 @@ pin_project! {
 }
 
 impl tower_service::Service<Uri> for VsockConnector {
-    type Response = UnixConnection;
+    type Response = TokioIo<UnixConnection>;
     type Error = Error;
-    type Future = Pin<Box<dyn Future<Output = Result<UnixConnection, Error>> + Send>>;
+    type Future = Pin<Box<dyn Future<Output = Result<TokioIo<UnixConnection>, Error>> + Send>>;
 
     fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(Ok(()))
@@ -82,7 +85,7 @@ impl tower_service::Service<Uri> for VsockConnector {
             let stream = tokio::net::UnixStream::from_std(std_stream)?;
             let connection = UnixConnection { stream };
 
-            Ok(connection)
+            Ok(TokioIo::new(connection))
         })
         // unravel the thread JoinHandle to a usable future
         .map(|res| match res {
@@ -133,7 +136,7 @@ impl AsyncWrite for UnixConnection {
 
 /// Slimmed down version of HttpClient for virtio-vsock connections (file restore daemon)
 pub struct VsockClient {
-    client: Client<VsockConnector>,
+    client: Client<VsockConnector, Body>,
     cid: i32,
     port: u16,
     auth: Option<String>,
@@ -142,7 +145,7 @@ pub struct VsockClient {
 impl VsockClient {
     pub fn new(cid: i32, port: u16, auth: Option<String>) -> Self {
         let conn = VsockConnector {};
-        let client = Client::builder().build::<_, Body>(conn);
+        let client = Client::builder(TokioExecutor::new()).build::<_, Body>(conn);
         Self {
             client,
             cid,
@@ -179,7 +182,7 @@ impl VsockClient {
         if !status.is_success() {
             Self::api_response(resp).await.map(|_| ())?
         } else {
-            futures::TryStreamExt::map_err(resp.into_body(), Error::from)
+            futures::TryStreamExt::map_err(BodyDataStream::new(resp.into_body()), Error::from)
                 .try_fold(output, move |acc, chunk| async move {
                     acc.write_all(&chunk).await?;
                     Ok::<_, Error>(acc)
@@ -189,9 +192,9 @@ impl VsockClient {
         Ok(())
     }
 
-    async fn api_response(response: Response<Body>) -> Result<Value, Error> {
+    async fn api_response(response: Response<Incoming>) -> Result<Value, Error> {
         let status = response.status();
-        let data = HttpBody::collect(response.into_body()).await?.to_bytes();
+        let data = response.into_body().collect().await?.to_bytes();
 
         let text = String::from_utf8(data.to_vec()).unwrap();
         if status.is_success() {
@@ -237,7 +240,7 @@ impl VsockClient {
         if let Some(data) = data {
             if method == "POST" {
                 let builder = make_builder("application/json", &url);
-                let request = builder.body(Body::from(data.to_string()))?;
+                let request = builder.body(data.to_string().into())?;
                 return Ok(request);
             } else {
                 let query = json_object_to_query(data)?;
-- 
2.39.5





More information about the pbs-devel mailing list