[pbs-devel] [PATCH proxmox-backup 5/6] adapt to hyper/http 1.0
Fabian Grünbichler
f.gruenbichler at proxmox.com
Wed Mar 26 16:23:26 CET 2025
Similar to the other changes in this series:
- replace hyper's Body with hyper's Incoming (for incoming request bodies) or proxmox-http's Body
- use adapters between hyper<->tower and hyper<->tokio
- adapt to new proxmox-rest-server interfaces
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
proxmox-backup-client/Cargo.toml | 1 +
proxmox-backup-client/src/snapshot.rs | 2 +-
src/acme/client.rs | 6 ++-
src/acme/plugin.rs | 62 ++++++++++++++++--------
src/api2/admin/datastore.rs | 20 +++-----
src/api2/backup/environment.rs | 3 +-
src/api2/backup/mod.rs | 10 ++--
src/api2/backup/upload_chunk.rs | 47 ++++++++++--------
src/api2/helpers.rs | 3 +-
src/api2/node/mod.rs | 7 +--
src/api2/node/tasks.rs | 7 +--
src/api2/reader/mod.rs | 17 ++++---
src/bin/proxmox-backup-api.rs | 40 ++++++++++-----
src/bin/proxmox-backup-proxy.rs | 70 ++++++++++++++++++++++-----
14 files changed, 197 insertions(+), 98 deletions(-)
diff --git a/proxmox-backup-client/Cargo.toml b/proxmox-backup-client/Cargo.toml
index a91a4908b..5f0140e78 100644
--- a/proxmox-backup-client/Cargo.toml
+++ b/proxmox-backup-client/Cargo.toml
@@ -24,6 +24,7 @@ pxar.workspace = true
proxmox-async.workspace = true
proxmox-human-byte.workspace = true
+proxmox-http = { workspace = true, features = [ "body" ] }
proxmox-log.workspace = true
proxmox-io.workspace = true
proxmox-router = { workspace = true, features = [ "cli" ] }
diff --git a/proxmox-backup-client/src/snapshot.rs b/proxmox-backup-client/src/snapshot.rs
index f195c23b7..f1569db2e 100644
--- a/proxmox-backup-client/src/snapshot.rs
+++ b/proxmox-backup-client/src/snapshot.rs
@@ -271,7 +271,7 @@ async fn upload_log(param: Value) -> Result<Value, Error> {
);
let args = snapshot_args(&backup_ns, &snapshot)?;
- let body = hyper::Body::from(raw_data);
+ let body = proxmox_http::Body::from(raw_data);
client
.upload("application/octet-stream", body, &path, Some(args))
diff --git a/src/acme/client.rs b/src/acme/client.rs
index 97f628e37..4e55393e4 100644
--- a/src/acme/client.rs
+++ b/src/acme/client.rs
@@ -6,8 +6,10 @@ use std::os::unix::fs::OpenOptionsExt;
use anyhow::{bail, format_err};
use bytes::Bytes;
-use hyper::{body::HttpBody, Body, Request};
+use http_body_util::BodyExt;
+use hyper::Request;
use nix::sys::stat::Mode;
+use proxmox_http::Body;
use serde::{Deserialize, Serialize};
use proxmox_acme::account::AccountCreator;
@@ -618,7 +620,7 @@ impl AcmeClient {
response.json()?,
));
- Ok((directory.as_ref().unwrap(), nonce.as_deref()))
+ Ok((directory.as_mut().unwrap(), nonce.as_deref()))
}
/// Like `get_directory`, but if the directory provides no nonce, also performs a `HEAD`
diff --git a/src/acme/plugin.rs b/src/acme/plugin.rs
index c33cfe405..9141670e7 100644
--- a/src/acme/plugin.rs
+++ b/src/acme/plugin.rs
@@ -1,12 +1,21 @@
use std::future::Future;
+use std::net::{IpAddr, SocketAddr};
use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{bail, format_err, Error};
-use hyper::{Body, Request, Response};
+use bytes::Bytes;
+use futures::TryFutureExt;
+use http_body_util::Full;
+use hyper::body::Incoming;
+use hyper::server::conn::http1;
+use hyper::service::service_fn;
+use hyper::{Request, Response};
+use hyper_util::rt::TokioIo;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
+use tokio::net::TcpListener;
use tokio::process::Command;
use proxmox_acme::{Authorization, Challenge};
@@ -235,10 +244,10 @@ impl StandaloneServer {
}
async fn standalone_respond(
- req: Request<Body>,
+ req: Request<Incoming>,
path: Arc<String>,
key_auth: Arc<String>,
-) -> Result<Response<Body>, hyper::Error> {
+) -> Result<Response<Full<Bytes>>, hyper::Error> {
if req.method() == hyper::Method::GET && req.uri().path() == path.as_str() {
Ok(Response::builder()
.status(hyper::http::StatusCode::OK)
@@ -260,9 +269,6 @@ impl AcmePlugin for StandaloneServer {
_domain: &'d AcmeDomain,
_task: Arc<WorkerTask>,
) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>> {
- use hyper::server::conn::AddrIncoming;
- use hyper::service::{make_service_fn, service_fn};
-
Box::pin(async move {
self.stop();
@@ -273,22 +279,40 @@ impl AcmePlugin for StandaloneServer {
let key_auth = Arc::new(client.key_authorization(token)?);
let path = Arc::new(format!("/.well-known/acme-challenge/{}", token));
- let service = make_service_fn(move |_| {
- let path = Arc::clone(&path);
- let key_auth = Arc::clone(&key_auth);
- async move {
- Ok::<_, hyper::Error>(service_fn(move |request| {
- standalone_respond(request, Arc::clone(&path), Arc::clone(&key_auth))
- }))
- }
- });
-
// `[::]:80` first, then `*:80`
- let incoming = AddrIncoming::bind(&(([0u16; 8], 80).into()))
- .or_else(|_| AddrIncoming::bind(&(([0u8; 4], 80).into())))?;
+ let dual = SocketAddr::new(IpAddr::from([0u16; 8]), 80);
+ let ipv4 = SocketAddr::new(IpAddr::from([0u8; 4]), 80);
+ let incoming = TcpListener::bind(dual)
+ .or_else(|_| TcpListener::bind(ipv4))
+ .await?;
- let server = hyper::Server::builder(incoming).serve(service);
+ let server = async move {
+ loop {
+ let key_auth = Arc::clone(&key_auth);
+ let path = Arc::clone(&path);
+ match incoming.accept().await {
+ Ok((tcp, _)) => {
+ let io = TokioIo::new(tcp);
+ let service = service_fn(move |request| {
+ standalone_respond(
+ request,
+ Arc::clone(&path),
+ Arc::clone(&key_auth),
+ )
+ });
+ tokio::task::spawn(async move {
+ if let Err(err) =
+ http1::Builder::new().serve_connection(io, service).await
+ {
+ println!("Error serving connection: {err:?}");
+ }
+ });
+ }
+ Err(err) => println!("Error accepting connection: {err:?}"),
+ }
+ }
+ };
let (future, abort) = futures::future::abortable(server);
self.abort_handle = Some(abort);
tokio::spawn(future);
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 483e595c1..7aba5d313 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -9,8 +9,10 @@ use std::sync::Arc;
use anyhow::{bail, format_err, Error};
use futures::*;
+use http_body_util::BodyExt;
use hyper::http::request::Parts;
-use hyper::{header, Body, Response, StatusCode};
+use hyper::{body::Incoming, header, Response, StatusCode};
+use proxmox_http::Body;
use serde::Deserialize;
use serde_json::{json, Value};
use tokio_stream::wrappers::ReceiverStream;
@@ -1387,7 +1389,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
pub fn download_file(
_parts: Parts,
- _req_body: Body,
+ _req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -1472,7 +1474,7 @@ pub const API_METHOD_DOWNLOAD_FILE_DECODED: ApiMethod = ApiMethod::new(
pub fn download_file_decoded(
_parts: Parts,
- _req_body: Body,
+ _req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -1598,7 +1600,7 @@ pub const API_METHOD_UPLOAD_BACKUP_LOG: ApiMethod = ApiMethod::new(
pub fn upload_backup_log(
_parts: Parts,
- req_body: Body,
+ req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -1636,13 +1638,7 @@ pub fn upload_backup_log(
file_name = file_name.deref(),
);
- let data = req_body
- .map_err(Error::from)
- .try_fold(Vec::new(), |mut acc, chunk| {
- acc.extend_from_slice(&chunk);
- future::ok::<_, Error>(acc)
- })
- .await?;
+ let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
// always verify blob/CRC at server side
let blob = DataBlob::load_from_reader(&mut &data[..])?;
@@ -1815,7 +1811,7 @@ fn get_local_pxar_reader(
pub fn pxar_file_download(
_parts: Parts,
- _req_body: Body,
+ _req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 99d885e2e..8a2e9ddcb 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -7,6 +7,7 @@ use tracing::info;
use ::serde::Serialize;
use serde_json::{json, Value};
+use proxmox_http::Body;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
use proxmox_sys::fs::{lock_dir_noblock_shared, replace_file, CreateOptions};
@@ -19,7 +20,7 @@ use proxmox_rest_server::{formatter::*, WorkerTask};
use crate::backup::verify_backup_dir_with_lock;
-use hyper::{Body, Response};
+use hyper::Response;
#[derive(Copy, Clone, Serialize)]
struct UploadStatistic {
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index efc97a1fb..f4378e185 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -5,11 +5,13 @@ use futures::*;
use hex::FromHex;
use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
use hyper::http::request::Parts;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{body::Incoming, Request, Response, StatusCode};
+use hyper_util::service::TowerToHyperService;
use serde::Deserialize;
use serde_json::{json, Value};
use tracing::warn;
+use proxmox_http::Body;
use proxmox_rest_server::{H2Service, WorkerTask};
use proxmox_router::{http_err, list_subdirs_api_method};
use proxmox_router::{
@@ -70,7 +72,7 @@ pub(crate) fn optional_ns_param(param: &Value) -> Result<BackupNamespace, Error>
fn upgrade_to_backup_protocol(
parts: Parts,
- req_body: Body,
+ req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -247,7 +249,7 @@ fn upgrade_to_backup_protocol(
http.max_frame_size(4 * 1024 * 1024);
let env3 = env2.clone();
- http.serve_connection(conn, service).map(move |result| {
+ http.serve_connection(conn, TowerToHyperService::new(service)).map(move |result| {
match result {
Err(err) => {
// Avoid Transport endpoint is not connected (os error 107)
@@ -824,7 +826,7 @@ pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
fn download_previous(
_parts: Parts,
- _req_body: Body,
+ _req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660a..2c66c2855 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -5,8 +5,9 @@ use std::task::{Context, Poll};
use anyhow::{bail, format_err, Error};
use futures::*;
use hex::FromHex;
+use http_body_util::{BodyDataStream, BodyExt};
+use hyper::body::Incoming;
use hyper::http::request::Parts;
-use hyper::Body;
use serde_json::{json, Value};
use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
@@ -21,7 +22,7 @@ use pbs_tools::json::{required_integer_param, required_string_param};
use super::environment::*;
pub struct UploadChunk {
- stream: Body,
+ stream: BodyDataStream<Incoming>,
store: Arc<DataStore>,
digest: [u8; 32],
size: u32,
@@ -31,7 +32,7 @@ pub struct UploadChunk {
impl UploadChunk {
pub fn new(
- stream: Body,
+ stream: BodyDataStream<Incoming>,
store: Arc<DataStore>,
digest: [u8; 32],
size: u32,
@@ -146,7 +147,7 @@ pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
fn upload_fixed_chunk(
_parts: Parts,
- req_body: Body,
+ req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -161,8 +162,14 @@ fn upload_fixed_chunk(
let env: &BackupEnvironment = rpcenv.as_ref();
- let (digest, size, compressed_size, is_duplicate) =
- UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
+ let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
+ BodyDataStream::new(req_body),
+ env.datastore.clone(),
+ digest,
+ size,
+ encoded_size,
+ )
+ .await?;
env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
let digest_str = hex::encode(digest);
@@ -215,7 +222,7 @@ pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
fn upload_dynamic_chunk(
_parts: Parts,
- req_body: Body,
+ req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -230,8 +237,14 @@ fn upload_dynamic_chunk(
let env: &BackupEnvironment = rpcenv.as_ref();
- let (digest, size, compressed_size, is_duplicate) =
- UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
+ let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
+ BodyDataStream::new(req_body),
+ env.datastore.clone(),
+ digest,
+ size,
+ encoded_size,
+ )
+ .await?;
env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
let digest_str = hex::encode(digest);
@@ -250,13 +263,13 @@ pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
fn upload_speedtest(
_parts: Parts,
- req_body: Body,
+ req_body: Incoming,
_param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let result = req_body
+ let result = BodyDataStream::new(req_body)
.map_err(Error::from)
.try_fold(0, |size: usize, chunk| {
let sum = size + chunk.len();
@@ -303,7 +316,7 @@ pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
fn upload_blob(
_parts: Parts,
- req_body: Body,
+ req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -318,13 +331,7 @@ fn upload_blob(
bail!("wrong blob file extension: '{}'", file_name);
}
- let data = req_body
- .map_err(Error::from)
- .try_fold(Vec::new(), |mut acc, chunk| {
- acc.extend_from_slice(&chunk);
- future::ok::<_, Error>(acc)
- })
- .await?;
+ let data = req_body.collect().await.map_err(Error::from)?.to_bytes();
if encoded_size != data.len() {
bail!(
@@ -334,7 +341,7 @@ fn upload_blob(
);
}
- env.add_blob(&file_name, data)?;
+ env.add_blob(&file_name, data.to_vec())?;
Ok(env.format_response(Ok(Value::Null)))
}
diff --git a/src/api2/helpers.rs b/src/api2/helpers.rs
index 3dc1befc1..f346b0cca 100644
--- a/src/api2/helpers.rs
+++ b/src/api2/helpers.rs
@@ -2,8 +2,9 @@ use std::path::PathBuf;
use anyhow::Error;
use futures::stream::TryStreamExt;
-use hyper::{header, Body, Response, StatusCode};
+use hyper::{header, Response, StatusCode};
+use proxmox_http::Body;
use proxmox_router::http_bail;
pub async fn create_download_response(path: PathBuf) -> Result<Response<Body>, Error> {
diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
index 62b447096..e7c6213c1 100644
--- a/src/api2/node/mod.rs
+++ b/src/api2/node/mod.rs
@@ -5,10 +5,11 @@ use std::os::unix::io::AsRawFd;
use anyhow::{bail, format_err, Error};
use futures::future::{FutureExt, TryFutureExt};
-use hyper::body::Body;
+use hyper::body::Incoming;
use hyper::http::request::Parts;
use hyper::upgrade::Upgraded;
use hyper::Request;
+use hyper_util::rt::TokioIo;
use serde_json::{json, Value};
use tokio::io::{AsyncBufReadExt, BufReader};
@@ -267,7 +268,7 @@ pub const API_METHOD_WEBSOCKET: ApiMethod = ApiMethod::new(
fn upgrade_to_websocket(
parts: Parts,
- req_body: Body,
+ req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -306,7 +307,7 @@ fn upgrade_to_websocket(
};
let local = tokio::net::TcpStream::connect(format!("localhost:{}", port)).await?;
- ws.serve_connection(conn, local).await
+ ws.serve_connection(TokioIo::new(conn), local).await
});
Ok(response)
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index cad740559..bd6763069 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -3,9 +3,10 @@ use std::io::{BufRead, BufReader};
use anyhow::{bail, Error};
use futures::FutureExt;
+use hyper::body::Incoming;
use hyper::http::request::Parts;
use hyper::http::{header, Response, StatusCode};
-use hyper::Body;
+use proxmox_http::Body;
use serde_json::{json, Value};
use proxmox_async::stream::AsyncReaderStream;
@@ -321,7 +322,7 @@ pub const API_METHOD_READ_TASK_LOG: ApiMethod = ApiMethod::new(
);
fn read_task_log(
_parts: Parts,
- _req_body: Body,
+ _req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -404,7 +405,7 @@ fn read_task_log(
Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json")
- .body(Body::from(json.to_string()))
+ .body(json.to_string().into())
.unwrap())
}
.boxed()
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 1713f182b..b69000087 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -3,12 +3,15 @@
use anyhow::{bail, format_err, Error};
use futures::*;
use hex::FromHex;
+use hyper::body::Incoming;
use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
use hyper::http::request::Parts;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
+use hyper_util::service::TowerToHyperService;
use serde::Deserialize;
use serde_json::Value;
+use proxmox_http::Body;
use proxmox_rest_server::{H2Service, WorkerTask};
use proxmox_router::{
http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
@@ -68,7 +71,7 @@ pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
fn upgrade_to_backup_reader_protocol(
parts: Parts,
- req_body: Body,
+ req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -190,7 +193,7 @@ fn upgrade_to_backup_reader_protocol(
http.initial_connection_window_size(window_size);
http.max_frame_size(4 * 1024 * 1024);
- http.serve_connection(conn, service)
+ http.serve_connection(conn, TowerToHyperService::new(service))
.map_err(Error::from)
.await
};
@@ -244,7 +247,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
fn download_file(
_parts: Parts,
- _req_body: Body,
+ _req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -300,7 +303,7 @@ pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
fn download_chunk(
_parts: Parts,
- _req_body: Body,
+ _req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -348,7 +351,7 @@ fn download_chunk(
/* this is too slow
fn download_chunk_old(
_parts: Parts,
- _req_body: Body,
+ _req_body: Incoming,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
@@ -393,7 +396,7 @@ pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
fn speedtest(
_parts: Parts,
- _req_body: Body,
+ _req_body: Incoming,
_param: Value,
_info: &ApiMethod,
_rpcenv: Box<dyn RpcEnvironment>,
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 7b4187550..438fd9d7e 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -1,12 +1,15 @@
use std::future::Future;
use std::pin::{pin, Pin};
+use std::sync::Arc;
use anyhow::{bail, Error};
-use futures::*;
use hyper::http::Response;
-use hyper::{Body, StatusCode};
+use hyper::StatusCode;
+use hyper_util::server::graceful::GracefulShutdown;
+use tokio::net::TcpListener;
use tracing::level_filters::LevelFilter;
+use proxmox_http::Body;
use proxmox_lang::try_block;
use proxmox_rest_server::{ApiConfig, RestServer};
use proxmox_router::RpcEnvironmentType;
@@ -34,7 +37,7 @@ fn get_index() -> Pin<Box<dyn Future<Output = Response<Body>> + Send>> {
Response::builder()
.status(StatusCode::OK)
.header(hyper::header::CONTENT_TYPE, "text/html")
- .body(index.into())
+ .body(index.to_string().into())
.unwrap()
})
}
@@ -108,17 +111,28 @@ async fn run() -> Result<(), Error> {
// http server future:
let server = proxmox_daemon::server::create_daemon(
([127, 0, 0, 1], 82).into(),
- move |listener| {
- let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
-
- Ok(async {
+ move |listener: TcpListener| {
+ Ok(async move {
proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
-
- hyper::Server::builder(incoming)
- .serve(rest_server)
- .with_graceful_shutdown(proxmox_daemon::shutdown_future())
- .map_err(Error::from)
- .await
+ let graceful = Arc::new(GracefulShutdown::new());
+ loop {
+ let graceful2 = Arc::clone(&graceful);
+ tokio::select! {
+ incoming = listener.accept() => {
+ let (conn, _) = incoming?;
+ let api_service = rest_server.api_service(&conn)?;
+ tokio::spawn(async move { api_service.serve(conn, Some(graceful2)).await });
+ },
+ _shutdown = proxmox_daemon::shutdown_future() => {
+ break;
+ },
+ }
+ }
+ if let Some(shutdown) = Arc::into_inner(graceful) {
+ log::info!("shutting down..");
+ shutdown.shutdown().await
+ }
+ Ok(())
})
},
Some(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN),
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index c9a6032e6..8ee537207 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -7,7 +7,8 @@ use futures::*;
use hyper::header;
use hyper::http::request::Parts;
use hyper::http::Response;
-use hyper::{Body, StatusCode};
+use hyper::StatusCode;
+use hyper_util::server::graceful::GracefulShutdown;
use tracing::level_filters::LevelFilter;
use tracing::{info, warn};
use url::form_urlencoded;
@@ -15,6 +16,7 @@ use url::form_urlencoded;
use openssl::ssl::SslAcceptor;
use serde_json::{json, Value};
+use proxmox_http::Body;
use proxmox_lang::try_block;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
use proxmox_sys::fs::CreateOptions;
@@ -289,27 +291,71 @@ async fn run() -> Result<(), Error> {
let server = proxmox_daemon::server::create_daemon(
([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
move |listener| {
- let (secure_connections, insecure_connections) =
+ let (mut secure_connections, mut insecure_connections) =
connections.accept_tls_optional(listener, acceptor);
Ok(async {
proxmox_systemd::notify::SystemdNotify::Ready.notify()?;
- let secure_server = hyper::Server::builder(secure_connections)
- .serve(rest_server)
- .with_graceful_shutdown(proxmox_daemon::shutdown_future())
- .map_err(Error::from);
+ let secure_server = async move {
+ let graceful = Arc::new(GracefulShutdown::new());
+ loop {
+ let graceful2 = Arc::clone(&graceful);
+ tokio::select! {
+ Some(conn) = secure_connections.next() => {
+ match conn {
+ Ok(conn) => {
+ let api_service = rest_server.api_service(&conn)?;
+ tokio::spawn(async move {
+ api_service.serve(conn, Some(graceful2)).await
+ });
+ },
+ Err(err) => { log::warn!("Failed to accept secure connection: {err:?}"); }
+ }
+ },
+ _shutdown = proxmox_daemon::shutdown_future() => {
+ break;
+ }
+ }
+ }
+ if let Some(shutdown) = Arc::into_inner(graceful) {
+ shutdown.shutdown().await
+ }
+ Ok::<(), Error>(())
+ };
- let insecure_server = hyper::Server::builder(insecure_connections)
- .serve(redirector)
- .with_graceful_shutdown(proxmox_daemon::shutdown_future())
- .map_err(Error::from);
+ let insecure_server = async move {
+ let graceful = Arc::new(GracefulShutdown::new());
+ loop {
+ let graceful2 = Arc::clone(&graceful);
+ tokio::select! {
+ Some(conn) = insecure_connections.next() => {
+ match conn {
+ Ok(conn) => {
+ let redirect_service = redirector.redirect_service();
+ tokio::spawn(async move {
+ redirect_service.serve(conn, Some(graceful2)).await
+ });
+ },
+ Err(err) => { log::warn!("Failed to accept insecure connection: {err:?}"); }
+ }
+ },
+ _shutdown = proxmox_daemon::shutdown_future() => {
+ break;
+ }
+ }
+ }
+ if let Some(shutdown) = Arc::into_inner(graceful) {
+ shutdown.shutdown().await
+ }
+ Ok::<(), Error>(())
+ };
let (secure_res, insecure_res) =
try_join!(tokio::spawn(secure_server), tokio::spawn(insecure_server))
.context("failed to complete REST server task")?;
- let results = [secure_res, insecure_res];
+ let results: [Result<(), Error>; 2] = [secure_res, insecure_res];
if results.iter().any(Result::is_err) {
let cat_errors = results
@@ -321,7 +367,7 @@ async fn run() -> Result<(), Error> {
bail!(cat_errors);
}
- Ok(())
+ Ok::<(), Error>(())
})
},
Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),
--
2.39.5
More information about the pbs-devel
mailing list