[pbs-devel] [PATCH proxmox-backup 5/6] adapt to hyper/http 1.0

Fabian Grünbichler f.gruenbichler at proxmox.com
Wed Mar 26 16:23:26 CET 2025


similar to the other changes:
- Body to Incoming or proxmox-http's Body
- use adapters between hyper<->tower and hyper<->tokio
- adapt to new proxmox-rest-server interfaces

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
 proxmox-backup-client/Cargo.toml      |  1 +
 proxmox-backup-client/src/snapshot.rs |  2 +-
 src/acme/client.rs                    |  6 ++-
 src/acme/plugin.rs                    | 62 ++++++++++++++++--------
 src/api2/admin/datastore.rs           | 20 +++-----
 src/api2/backup/environment.rs        |  3 +-
 src/api2/backup/mod.rs                | 10 ++--
 src/api2/backup/upload_chunk.rs       | 47 ++++++++++--------
 src/api2/helpers.rs                   |  3 +-
 src/api2/node/mod.rs                  |  7 +--
 src/api2/node/tasks.rs                |  7 +--
 src/api2/reader/mod.rs                | 17 ++++---
 src/bin/proxmox-backup-api.rs         | 40 ++++++++++-----
 src/bin/proxmox-backup-proxy.rs       | 70 ++++++++++++++++++++++-----
 14 files changed, 197 insertions(+), 98 deletions(-)

diff --git a/proxmox-backup-client/Cargo.toml b/proxmox-backup-client/Cargo.toml
index a91a4908b..5f0140e78 100644
--- a/proxmox-backup-client/Cargo.toml
+++ b/proxmox-backup-client/Cargo.toml
@@ -24,6 +24,7 @@ pxar.workspace = true
 
 proxmox-async.workspace = true
 proxmox-human-byte.workspace = true
+proxmox-http = { workspace = true, features = [ "body" ] }
 proxmox-log.workspace = true
 proxmox-io.workspace = true
 proxmox-router = { workspace = true, features = [ "cli" ] }
diff --git a/proxmox-backup-client/src/snapshot.rs b/proxmox-backup-client/src/snapshot.rs
index f195c23b7..f1569db2e 100644
--- a/proxmox-backup-client/src/snapshot.rs
+++ b/proxmox-backup-client/src/snapshot.rs
@@ -271,7 +271,7 @@ async fn upload_log(param: Value) -> Result<Value, Error> {
     );
 
     let args = snapshot_args(&backup_ns, &snapshot)?;
-    let body = hyper::Body::from(raw_data);
+    let body = proxmox_http::Body::from(raw_data);
 
     client
         .upload("application/octet-stream", body, &path, Some(args))
diff --git a/src/acme/client.rs b/src/acme/client.rs
index 97f628e37..4e55393e4 100644
--- a/src/acme/client.rs
+++ b/src/acme/client.rs
@@ -6,8 +6,10 @@ use std::os::unix::fs::OpenOptionsExt;
 
 use anyhow::{bail, format_err};
 use bytes::Bytes;
-use hyper::{body::HttpBody, Body, Request};
+use http_body_util::BodyExt;
+use hyper::Request;
 use nix::sys::stat::Mode;
+use proxmox_http::Body;
 use serde::{Deserialize, Serialize};
 
 use proxmox_acme::account::AccountCreator;
@@ -618,7 +620,7 @@ impl AcmeClient {
             response.json()?,
         ));
 
-        Ok((directory.as_ref().unwrap(), nonce.as_deref()))
+        Ok((directory.as_mut().unwrap(), nonce.as_deref()))
     }
 
     /// Like `get_directory`, but if the directory provides no nonce, also performs a `HEAD`
diff --git a/src/acme/plugin.rs b/src/acme/plugin.rs
index c33cfe405..9141670e7 100644
--- a/src/acme/plugin.rs
+++ b/src/acme/plugin.rs
@@ -1,12 +1,21 @@
 use std::future::Future;
+use std::net::{IpAddr, SocketAddr};
 use std::pin::Pin;
 use std::process::Stdio;
 use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{bail, format_err, Error};
-use hyper::{Body, Request, Response};
+use bytes::Bytes;
+use futures::TryFutureExt;
+use http_body_util::Full;
+use hyper::body::Incoming;
+use hyper::server::conn::http1;
+use hyper::service::service_fn;
+use hyper::{Request, Response};
+use hyper_util::rt::TokioIo;
 use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
+use tokio::net::TcpListener;
 use tokio::process::Command;
 
 use proxmox_acme::{Authorization, Challenge};
@@ -235,10 +244,10 @@ impl StandaloneServer {
 }
 
 async fn standalone_respond(
-    req: Request<Body>,
+    req: Request<Incoming>,
     path: Arc<String>,
     key_auth: Arc<String>,
-) -> Result<Response<Body>, hyper::Error> {
+) -> Result<Response<Full<Bytes>>, hyper::Error> {
     if req.method() == hyper::Method::GET && req.uri().path() == path.as_str() {
         Ok(Response::builder()
             .status(hyper::http::StatusCode::OK)
@@ -260,9 +269,6 @@ impl AcmePlugin for StandaloneServer {
         _domain: &'d AcmeDomain,
         _task: Arc<WorkerTask>,
     ) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>> {
-        use hyper::server::conn::AddrIncoming;
-        use hyper::service::{make_service_fn, service_fn};
-
         Box::pin(async move {
             self.stop();
 
@@ -273,22 +279,40 @@ impl AcmePlugin for StandaloneServer {
             let key_auth = Arc::new(client.key_authorization(token)?);
             let path = Arc::new(format!("/.well-known/acme-challenge/{}", token));
 
-            let service = make_service_fn(move |_| {
-                let path = Arc::clone(&path);
-                let key_auth = Arc::clone(&key_auth);
-                async move {
-                    Ok::<_, hyper::Error>(service_fn(move |request| {
-                        standalone_respond(request, Arc::clone(&path), Arc::clone(&key_auth))
-                    }))
-                }
-            });
-
             // `[::]:80` first, then `*:80`
-            let incoming = AddrIncoming::bind(&(([0u16; 8], 80).into()))
-                .or_else(|_| AddrIncoming::bind(&(([0u8; 4], 80).into())))?;
+            let dual = SocketAddr::new(IpAddr::from([0u16; 8]), 80);
+            let ipv4 = SocketAddr::new(IpAddr::from([0u8; 4]), 80);
+            let incoming = TcpListener::bind(dual)
+                .or_else(|_| TcpListener::bind(ipv4))
+                .await?;
 
-            let server = hyper::Server::builder(incoming).serve(service);
+            let server = async move {
+                loop {
+                    let key_auth = Arc::clone(&key_auth);
+                    let path = Arc::clone(&path);
+                    match incoming.accept().await {
+                        Ok((tcp, _)) => {
+                            let io = TokioIo::new(tcp);
+                            let service = service_fn(move |request| {
+                                standalone_respond(
+                                    request,
+                                    Arc::clone(&path),
+                                    Arc::clone(&key_auth),
+                                )
+                            });
 
+                            tokio::task::spawn(async move {
+                                if let Err(err) =
+                                    http1::Builder::new().serve_connection(io, service).await
+                                {
+                                    println!("Error serving connection: {err:?}");
+                                }
+                            });
+                        }
+                        Err(err) => println!("Error accepting connection: {err:?}"),
+                    }
+                }
+            };
             let (future, abort) = futures::future::abortable(server);
             self.abort_handle = Some(abort);
             tokio::spawn(future);
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 483e595c1..7aba5d313 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -9,8 +9,10 @@ use std::sync::Arc;
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
+use http_body_util::BodyExt;
 use hyper::http::request::Parts;
-use hyper::{header, Body, Response, StatusCode};
+use hyper::{body::Incoming, header, Response, StatusCode};
+use proxmox_http::Body;
 use serde::Deserialize;
 use serde_json::{json, Value};
 use tokio_stream::wrappers::ReceiverStream;
@@ -1387,7 +1389,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
 
 pub fn download_file(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -1472,7 +1474,7 @@ pub const API_METHOD_DOWNLOAD_FILE_DECODED: ApiMethod = ApiMethod::new(
 
 pub fn download_file_decoded(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -1598,7 +1600,7 @@ pub const API_METHOD_UPLOAD_BACKUP_LOG: ApiMethod = ApiMethod::new(
 
 pub fn upload_backup_log(
     _parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -1636,13 +1638,7 @@ pub fn upload_backup_log(
             file_name = file_name.deref(),
         );
 
-        let data = req_body
-            .map_err(Error::from)
-            .try_fold(Vec::new(), |mut acc, chunk| {
-                acc.extend_from_slice(&chunk);
-                future::ok::<_, Error>(acc)
-            })
-            .await?;
+        let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
 
         // always verify blob/CRC at server side
         let blob = DataBlob::load_from_reader(&mut &data[..])?;
@@ -1815,7 +1811,7 @@ fn get_local_pxar_reader(
 
 pub fn pxar_file_download(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 99d885e2e..8a2e9ddcb 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -7,6 +7,7 @@ use tracing::info;
 use ::serde::Serialize;
 use serde_json::{json, Value};
 
+use proxmox_http::Body;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
 use proxmox_sys::fs::{lock_dir_noblock_shared, replace_file, CreateOptions};
 
@@ -19,7 +20,7 @@ use proxmox_rest_server::{formatter::*, WorkerTask};
 
 use crate::backup::verify_backup_dir_with_lock;
 
-use hyper::{Body, Response};
+use hyper::Response;
 
 #[derive(Copy, Clone, Serialize)]
 struct UploadStatistic {
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index efc97a1fb..f4378e185 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -5,11 +5,13 @@ use futures::*;
 use hex::FromHex;
 use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
 use hyper::http::request::Parts;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming, Request, Response, StatusCode};
+use hyper_util::service::TowerToHyperService;
 use serde::Deserialize;
 use serde_json::{json, Value};
 use tracing::warn;
 
+use proxmox_http::Body;
 use proxmox_rest_server::{H2Service, WorkerTask};
 use proxmox_router::{http_err, list_subdirs_api_method};
 use proxmox_router::{
@@ -70,7 +72,7 @@ pub(crate) fn optional_ns_param(param: &Value) -> Result<BackupNamespace, Error>
 
 fn upgrade_to_backup_protocol(
     parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -247,7 +249,7 @@ fn upgrade_to_backup_protocol(
                         http.max_frame_size(4 * 1024 * 1024);
 
                         let env3 = env2.clone();
-                        http.serve_connection(conn, service).map(move |result| {
+                        http.serve_connection(conn, TowerToHyperService::new(service)).map(move |result| {
                             match result {
                                 Err(err) => {
                                     // Avoid  Transport endpoint is not connected (os error 107)
@@ -824,7 +826,7 @@ pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
 
 fn download_previous(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660a..2c66c2855 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -5,8 +5,9 @@ use std::task::{Context, Poll};
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use hex::FromHex;
+use http_body_util::{BodyDataStream, BodyExt};
+use hyper::body::Incoming;
 use hyper::http::request::Parts;
-use hyper::Body;
 use serde_json::{json, Value};
 
 use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
@@ -21,7 +22,7 @@ use pbs_tools::json::{required_integer_param, required_string_param};
 use super::environment::*;
 
 pub struct UploadChunk {
-    stream: Body,
+    stream: BodyDataStream<Incoming>,
     store: Arc<DataStore>,
     digest: [u8; 32],
     size: u32,
@@ -31,7 +32,7 @@ pub struct UploadChunk {
 
 impl UploadChunk {
     pub fn new(
-        stream: Body,
+        stream: BodyDataStream<Incoming>,
         store: Arc<DataStore>,
         digest: [u8; 32],
         size: u32,
@@ -146,7 +147,7 @@ pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
 
 fn upload_fixed_chunk(
     _parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -161,8 +162,14 @@ fn upload_fixed_chunk(
 
         let env: &BackupEnvironment = rpcenv.as_ref();
 
-        let (digest, size, compressed_size, is_duplicate) =
-            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
+        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
+            BodyDataStream::new(req_body),
+            env.datastore.clone(),
+            digest,
+            size,
+            encoded_size,
+        )
+        .await?;
 
         env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
         let digest_str = hex::encode(digest);
@@ -215,7 +222,7 @@ pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
 
 fn upload_dynamic_chunk(
     _parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -230,8 +237,14 @@ fn upload_dynamic_chunk(
 
         let env: &BackupEnvironment = rpcenv.as_ref();
 
-        let (digest, size, compressed_size, is_duplicate) =
-            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
+        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
+            BodyDataStream::new(req_body),
+            env.datastore.clone(),
+            digest,
+            size,
+            encoded_size,
+        )
+        .await?;
 
         env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
         let digest_str = hex::encode(digest);
@@ -250,13 +263,13 @@ pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
 
 fn upload_speedtest(
     _parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     _param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
     async move {
-        let result = req_body
+        let result = BodyDataStream::new(req_body)
             .map_err(Error::from)
             .try_fold(0, |size: usize, chunk| {
                 let sum = size + chunk.len();
@@ -303,7 +316,7 @@ pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
 
 fn upload_blob(
     _parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -318,13 +331,7 @@ fn upload_blob(
             bail!("wrong blob file extension: '{}'", file_name);
         }
 
-        let data = req_body
-            .map_err(Error::from)
-            .try_fold(Vec::new(), |mut acc, chunk| {
-                acc.extend_from_slice(&chunk);
-                future::ok::<_, Error>(acc)
-            })
-            .await?;
+        let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
 
         if encoded_size != data.len() {
             bail!(
@@ -334,7 +341,7 @@ fn upload_blob(
             );
         }
 
-        env.add_blob(&file_name, data)?;
+        env.add_blob(&file_name, data.to_vec())?;
 
         Ok(env.format_response(Ok(Value::Null)))
     }
diff --git a/src/api2/helpers.rs b/src/api2/helpers.rs
index 3dc1befc1..f346b0cca 100644
--- a/src/api2/helpers.rs
+++ b/src/api2/helpers.rs
@@ -2,8 +2,9 @@ use std::path::PathBuf;
 
 use anyhow::Error;
 use futures::stream::TryStreamExt;
-use hyper::{header, Body, Response, StatusCode};
+use hyper::{header, Response, StatusCode};
 
+use proxmox_http::Body;
 use proxmox_router::http_bail;
 
 pub async fn create_download_response(path: PathBuf) -> Result<Response<Body>, Error> {
diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
index 62b447096..e7c6213c1 100644
--- a/src/api2/node/mod.rs
+++ b/src/api2/node/mod.rs
@@ -5,10 +5,11 @@ use std::os::unix::io::AsRawFd;
 
 use anyhow::{bail, format_err, Error};
 use futures::future::{FutureExt, TryFutureExt};
-use hyper::body::Body;
+use hyper::body::Incoming;
 use hyper::http::request::Parts;
 use hyper::upgrade::Upgraded;
 use hyper::Request;
+use hyper_util::rt::TokioIo;
 use serde_json::{json, Value};
 use tokio::io::{AsyncBufReadExt, BufReader};
 
@@ -267,7 +268,7 @@ pub const API_METHOD_WEBSOCKET: ApiMethod = ApiMethod::new(
 
 fn upgrade_to_websocket(
     parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -306,7 +307,7 @@ fn upgrade_to_websocket(
             };
 
             let local = tokio::net::TcpStream::connect(format!("localhost:{}", port)).await?;
-            ws.serve_connection(conn, local).await
+            ws.serve_connection(TokioIo::new(conn), local).await
         });
 
         Ok(response)
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index cad740559..bd6763069 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -3,9 +3,10 @@ use std::io::{BufRead, BufReader};
 
 use anyhow::{bail, Error};
 use futures::FutureExt;
+use hyper::body::Incoming;
 use hyper::http::request::Parts;
 use hyper::http::{header, Response, StatusCode};
-use hyper::Body;
+use proxmox_http::Body;
 use serde_json::{json, Value};
 
 use proxmox_async::stream::AsyncReaderStream;
@@ -321,7 +322,7 @@ pub const API_METHOD_READ_TASK_LOG: ApiMethod = ApiMethod::new(
 );
 fn read_task_log(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -404,7 +405,7 @@ fn read_task_log(
         Ok(Response::builder()
             .status(StatusCode::OK)
             .header(header::CONTENT_TYPE, "application/json")
-            .body(Body::from(json.to_string()))
+            .body(json.to_string().into())
             .unwrap())
     }
     .boxed()
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 1713f182b..b69000087 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -3,12 +3,15 @@
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use hex::FromHex;
+use hyper::body::Incoming;
 use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
 use hyper::http::request::Parts;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
+use hyper_util::service::TowerToHyperService;
 use serde::Deserialize;
 use serde_json::Value;
 
+use proxmox_http::Body;
 use proxmox_rest_server::{H2Service, WorkerTask};
 use proxmox_router::{
     http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
@@ -68,7 +71,7 @@ pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
 
 fn upgrade_to_backup_reader_protocol(
     parts: Parts,
-    req_body: Body,
+    req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -190,7 +193,7 @@ fn upgrade_to_backup_reader_protocol(
                     http.initial_connection_window_size(window_size);
                     http.max_frame_size(4 * 1024 * 1024);
 
-                    http.serve_connection(conn, service)
+                    http.serve_connection(conn, TowerToHyperService::new(service))
                         .map_err(Error::from)
                         .await
                 };
@@ -244,7 +247,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
 
 fn download_file(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -300,7 +303,7 @@ pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
 
 fn download_chunk(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -348,7 +351,7 @@ fn download_chunk(
 /* this is too slow
 fn download_chunk_old(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
@@ -393,7 +396,7 @@ pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
 
 fn speedtest(
     _parts: Parts,
-    _req_body: Body,
+    _req_body: Incoming,
     _param: Value,
     _info: &ApiMethod,
     _rpcenv: Box<dyn RpcEnvironment>,
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 7b4187550..438fd9d7e 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -1,12 +1,15 @@
 use std::future::Future;
 use std::pin::{pin, Pin};
+use std::sync::Arc;
 
 use anyhow::{bail, Error};
-use futures::*;
 use hyper::http::Response;
-use hyper::{Body, StatusCode};
+use hyper::StatusCode;
+use hyper_util::server::graceful::GracefulShutdown;
+use tokio::net::TcpListener;
 use tracing::level_filters::LevelFilter;
 
+use proxmox_http::Body;
 use proxmox_lang::try_block;
 use proxmox_rest_server::{ApiConfig, RestServer};
 use proxmox_router::RpcEnvironmentType;
@@ -34,7 +37,7 @@ fn get_index() -> Pin<Box<dyn Future<Output = Response<Body>> + Send>> {
         Response::builder()
             .status(StatusCode::OK)
             .header(hyper::header::CONTENT_TYPE, "text/html")
-            .body(index.into())
+            .body(index.to_string().into())
             .unwrap()
     })
 }
@@ -108,17 +111,28 @@ async fn run() -> Result<(), Error> {
     // http server future:
     let server = proxmox_daemon::server::create_daemon(
         ([127, 0, 0, 1], 82).into(),
-        move |listener| {
-            let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
-
-            Ok(async {
+        move |listener: TcpListener| {
+            Ok(async move {
                 proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
-
-                hyper::Server::builder(incoming)
-                    .serve(rest_server)
-                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
-                    .map_err(Error::from)
-                    .await
+                let graceful = Arc::new(GracefulShutdown::new());
+                loop {
+                    let graceful2 = Arc::clone(&graceful);
+                    tokio::select! {
+                        incoming = listener.accept() => {
+                            let (conn, _) = incoming?;
+                            let api_service = rest_server.api_service(&conn)?;
+                            tokio::spawn(async move { api_service.serve(conn, Some(graceful2)).await });
+                        },
+                        _shutdown = proxmox_daemon::shutdown_future() => {
+                            break;
+                        },
+                    }
+                }
+                if let Some(shutdown) = Arc::into_inner(graceful) {
+                    log::info!("shutting down..");
+                    shutdown.shutdown().await
+                }
+                Ok(())
             })
         },
         Some(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN),
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index c9a6032e6..8ee537207 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -7,7 +7,8 @@ use futures::*;
 use hyper::header;
 use hyper::http::request::Parts;
 use hyper::http::Response;
-use hyper::{Body, StatusCode};
+use hyper::StatusCode;
+use hyper_util::server::graceful::GracefulShutdown;
 use tracing::level_filters::LevelFilter;
 use tracing::{info, warn};
 use url::form_urlencoded;
@@ -15,6 +16,7 @@ use url::form_urlencoded;
 use openssl::ssl::SslAcceptor;
 use serde_json::{json, Value};
 
+use proxmox_http::Body;
 use proxmox_lang::try_block;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
 use proxmox_sys::fs::CreateOptions;
@@ -289,27 +291,71 @@ async fn run() -> Result<(), Error> {
     let server = proxmox_daemon::server::create_daemon(
         ([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
         move |listener| {
-            let (secure_connections, insecure_connections) =
+            let (mut secure_connections, mut insecure_connections) =
                 connections.accept_tls_optional(listener, acceptor);
 
             Ok(async {
                 proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
 
-                let secure_server = hyper::Server::builder(secure_connections)
-                    .serve(rest_server)
-                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
-                    .map_err(Error::from);
+                let secure_server = async move {
+                    let graceful = Arc::new(GracefulShutdown::new());
+                    loop {
+                        let graceful2 = Arc::clone(&graceful);
+                        tokio::select! {
+                            Some(conn) = secure_connections.next() => {
+                                match conn {
+                                    Ok(conn) => {
+                                        let api_service = rest_server.api_service(&conn)?;
+                                        tokio::spawn(async move {
+                                            api_service.serve(conn, Some(graceful2)).await
+                                        });
+                                    },
+                                    Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
+                                }
+                            },
+                            _shutdown = proxmox_daemon::shutdown_future() => {
+                                break;
+                            }
+                        }
+                    }
+                    if let Some(shutdown) = Arc::into_inner(graceful) {
+                        shutdown.shutdown().await
+                    }
+                    Ok::<(), Error>(())
+                };
 
-                let insecure_server = hyper::Server::builder(insecure_connections)
-                    .serve(redirector)
-                    .with_graceful_shutdown(proxmox_daemon::shutdown_future())
-                    .map_err(Error::from);
+                let insecure_server = async move {
+                    let graceful = Arc::new(GracefulShutdown::new());
+                    loop {
+                        let graceful2 = Arc::clone(&graceful);
+                        tokio::select! {
+                            Some(conn) = insecure_connections.next() => {
+                                match conn {
+                                    Ok(conn) => {
+                                        let redirect_service = redirector.redirect_service();
+                                        tokio::spawn(async move {
+                                            redirect_service.serve(conn, Some(graceful2)).await
+                                        });
+                                    },
+                                    Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
+                                }
+                            },
+                            _shutdown = proxmox_daemon::shutdown_future() => {
+                                break;
+                            }
+                        }
+                    }
+                    if let Some(shutdown) = Arc::into_inner(graceful) {
+                        shutdown.shutdown().await
+                    }
+                    Ok::<(), Error>(())
+                };
 
                 let (secure_res, insecure_res) =
                     try_join!(tokio::spawn(secure_server), tokio::spawn(insecure_server))
                         .context("failed to complete REST server task")?;
 
-                let results = [secure_res, insecure_res];
+                let results: [Result<(), Error>; 2] = [secure_res, insecure_res];
 
                 if results.iter().any(Result::is_err) {
                     let cat_errors = results
@@ -321,7 +367,7 @@ async fn run() -> Result<(), Error> {
                     bail!(cat_errors);
                 }
 
-                Ok(())
+                Ok::<(), Error>(())
             })
         },
         Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),
-- 
2.39.5





More information about the pbs-devel mailing list