[pbs-devel] [PATCH proxmox 14/17] proxmox-rest-server: update to hyper 1.0
Fabian Grünbichler
f.gruenbichler at proxmox.com
Wed Mar 26 16:23:18 CET 2025
and switch to proxmox-http's Body implementation.
hyper now has a special (opaque) Body implementation called Incoming
which is used for incoming request bodies on the server side, and
incoming response bodies on the client side, so our API handler's now
consume an instance of this type.
the Accept trait previously offered by hyper is gone in 1.0, and needs
to be replaced with an accept loop. our corresponding Acceptor
implementations have been dropped as well.
hyper now has its own Service and async Read/Write traits, but helpfully
provides wrappers for tower's Service and tokio's AsyncRead/AsyncWrite
variants.
graceful shutdown handling is now exposed differently on the hyper side,
and to allow usage with upgradable connections the variant form
hyper-util needs to be used, as the one straight from hyper doesn't
support it (yet).
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
proxmox-rest-server/Cargo.toml | 7 +-
.../examples/minimal-rest-server.rs | 5 +-
proxmox-rest-server/src/api_config.rs | 44 ++---
proxmox-rest-server/src/connection.rs | 14 +-
proxmox-rest-server/src/formatter.rs | 8 +-
proxmox-rest-server/src/h2service.rs | 15 +-
proxmox-rest-server/src/lib.rs | 2 +-
proxmox-rest-server/src/rest.rs | 164 +++++++++++-------
8 files changed, 142 insertions(+), 117 deletions(-)
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index ffbd925a..ee253b4f 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -20,7 +20,9 @@ anyhow.workspace = true
futures.workspace = true
handlebars = { workspace = true, optional = true }
http.workspace = true
+http-body-util.workspace = true
hyper = { workspace = true, features = [ "full" ] }
+hyper-util = { workspace = true, features = [ "client", "client-legacy", "http1", "server", "server-auto", "server-graceful", "service", "tokio" ]}
libc.workspace = true
log.workspace = true
nix.workspace = true
@@ -39,7 +41,7 @@ url.workspace = true
proxmox-async.workspace = true
proxmox-compression.workspace = true
proxmox-daemon.workspace = true
-proxmox-http = { workspace = true, optional = true }
+proxmox-http = { workspace = true, features = ["body"] }
proxmox-lang.workspace = true
proxmox-log.workspace = true
proxmox-router.workspace = true
@@ -52,6 +54,5 @@ proxmox-worker-task.workspace = true
default = []
templates = ["dep:handlebars"]
rate-limited-stream = [
- "dep:proxmox-http",
- "proxmox-http?/rate-limited-stream",
+ "proxmox-http/rate-limited-stream",
]
diff --git a/proxmox-rest-server/examples/minimal-rest-server.rs b/proxmox-rest-server/examples/minimal-rest-server.rs
index 23be586c..454430fb 100644
--- a/proxmox-rest-server/examples/minimal-rest-server.rs
+++ b/proxmox-rest-server/examples/minimal-rest-server.rs
@@ -6,8 +6,9 @@ use std::sync::{LazyLock, Mutex};
use anyhow::{bail, format_err, Error};
use http::request::Parts;
use http::HeaderMap;
-use hyper::{Body, Method, Response};
+use hyper::{Method, Response};
+use proxmox_http::Body;
use proxmox_router::{
list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
};
@@ -57,7 +58,7 @@ fn get_index(
Box::pin(async move {
// build an index page
http::Response::builder()
- .body("hello world".into())
+ .body("hello world".to_owned().into_bytes().into())
.unwrap()
})
}
diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
index b20b2da0..0b847a0c 100644
--- a/proxmox-rest-server/src/api_config.rs
+++ b/proxmox-rest-server/src/api_config.rs
@@ -9,10 +9,12 @@ use std::task::{Context, Poll};
use anyhow::{format_err, Error};
use http::{HeaderMap, Method, Uri};
use hyper::http::request::Parts;
-use hyper::{Body, Response};
+use hyper::Response;
+use hyper_util::rt::TokioIo;
use tower_service::Service;
use proxmox_daemon::command_socket::CommandSocket;
+use proxmox_http::Body;
use proxmox_log::{FileLogOptions, FileLogger};
use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
use proxmox_sys::fs::{create_path, CreateOptions};
@@ -107,7 +109,7 @@ impl ApiConfig {
) -> Response<Body> {
match self.index_handler.as_ref() {
Some(handler) => (handler.func)(rest_env, parts).await,
- None => Response::builder().status(404).body("".into()).unwrap(),
+ None => Response::builder().status(404).body(Body::empty()).unwrap(),
}
}
@@ -511,7 +513,7 @@ impl From<std::os::unix::net::SocketAddr> for PrivilegedAddr {
}
impl Service<Uri> for PrivilegedAddr {
- type Response = PrivilegedSocket;
+ type Response = TokioIo<PrivilegedSocket>;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
@@ -527,6 +529,7 @@ impl Service<Uri> for PrivilegedAddr {
tokio::net::TcpStream::connect(addr)
.await
.map(PrivilegedSocket::Tcp)
+ .map(TokioIo::new)
})
}
PrivilegedAddr::Unix(addr) => {
@@ -537,6 +540,7 @@ impl Service<Uri> for PrivilegedAddr {
})?)
.await
.map(PrivilegedSocket::Unix)
+ .map(TokioIo::new)
})
}
}
@@ -607,39 +611,11 @@ impl tokio::io::AsyncWrite for PrivilegedSocket {
}
}
-impl hyper::client::connect::Connection for PrivilegedSocket {
- fn connected(&self) -> hyper::client::connect::Connected {
+impl hyper_util::client::legacy::connect::Connection for PrivilegedSocket {
+ fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
match self {
Self::Tcp(s) => s.connected(),
- Self::Unix(_) => hyper::client::connect::Connected::new(),
+ Self::Unix(_) => hyper_util::client::legacy::connect::Connected::new(),
}
}
}
-
-/// Implements hyper's `Accept` for `UnixListener`s.
-pub struct UnixAcceptor {
- listener: tokio::net::UnixListener,
-}
-
-impl From<tokio::net::UnixListener> for UnixAcceptor {
- fn from(listener: tokio::net::UnixListener) -> Self {
- Self { listener }
- }
-}
-
-impl hyper::server::accept::Accept for UnixAcceptor {
- type Conn = tokio::net::UnixStream;
- type Error = io::Error;
-
- fn poll_accept(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<io::Result<Self::Conn>>> {
- Pin::new(&mut self.get_mut().listener)
- .poll_accept(cx)
- .map(|res| match res {
- Ok((stream, _addr)) => Some(Ok(stream)),
- Err(err) => Some(Err(err)),
- })
- }
-}
diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
index 526555ae..a65ef398 100644
--- a/proxmox-rest-server/src/connection.rs
+++ b/proxmox-rest-server/src/connection.rs
@@ -14,7 +14,6 @@ use std::time::Duration;
use anyhow::{format_err, Context, Error};
use futures::FutureExt;
-use hyper::server::accept;
use openssl::ec::{EcGroup, EcKey};
use openssl::nid::Nid;
use openssl::pkey::{PKey, Private};
@@ -226,12 +225,13 @@ impl AcceptBuilder {
self,
listener: TcpListener,
acceptor: Arc<Mutex<SslAcceptor>>,
- ) -> impl accept::Accept<Conn = ClientStreamResult, Error = Error> {
+ // FIXME: replace return value with own trait? see now removed UnixAcceptor
+ ) -> ReceiverStream<Result<ClientStreamResult, Error>> {
let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
tokio::spawn(self.accept_connections(listener, acceptor, secure_sender.into()));
- accept::from_stream(ReceiverStream::new(secure_receiver))
+ ReceiverStream::new(secure_receiver)
}
pub fn accept_tls_optional(
@@ -239,8 +239,8 @@ impl AcceptBuilder {
listener: TcpListener,
acceptor: Arc<Mutex<SslAcceptor>>,
) -> (
- impl accept::Accept<Conn = ClientStreamResult, Error = Error>,
- impl accept::Accept<Conn = InsecureClientStreamResult, Error = Error>,
+ ReceiverStream<Result<ClientStreamResult, Error>>,
+ ReceiverStream<Result<InsecureClientStreamResult, Error>>,
) {
let (secure_sender, secure_receiver) = mpsc::channel(self.max_pending_accepts);
let (insecure_sender, insecure_receiver) = mpsc::channel(self.max_pending_accepts);
@@ -252,8 +252,8 @@ impl AcceptBuilder {
));
(
- accept::from_stream(ReceiverStream::new(secure_receiver)),
- accept::from_stream(ReceiverStream::new(insecure_receiver)),
+ ReceiverStream::new(secure_receiver),
+ ReceiverStream::new(insecure_receiver),
)
}
}
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index 32ca9936..9ce87205 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -5,12 +5,14 @@ use anyhow::Error;
use serde_json::{json, Value};
use hyper::header;
-use hyper::{Body, Response, StatusCode};
+use hyper::{Response, StatusCode};
+use proxmox_http::Body;
use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
use proxmox_schema::ParameterError;
/// Extension to set error message for server side logging
+#[derive(Clone)]
pub(crate) struct ErrorMessageExtension(pub String);
/// Methods to format data and errors
@@ -168,11 +170,11 @@ impl OutputFormatter for JsonFormatter {
pub(crate) fn error_to_response(err: Error) -> Response<Body> {
let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
- let mut resp = Response::new(Body::from(apierr.message.clone()));
+ let mut resp = Response::new(apierr.message.clone().into());
*resp.status_mut() = apierr.code;
resp
} else {
- let mut resp = Response::new(Body::from(err.to_string()));
+ let mut resp = Response::new(err.to_string().into());
*resp.status_mut() = StatusCode::BAD_REQUEST;
resp
};
diff --git a/proxmox-rest-server/src/h2service.rs b/proxmox-rest-server/src/h2service.rs
index db6e3b0a..18258e14 100644
--- a/proxmox-rest-server/src/h2service.rs
+++ b/proxmox-rest-server/src/h2service.rs
@@ -6,8 +6,10 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use futures::*;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::body::Incoming;
+use hyper::{Request, Response, StatusCode};
+use proxmox_http::Body;
use proxmox_router::http_err;
use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
@@ -19,6 +21,7 @@ use crate::{normalize_path_with_components, WorkerTask};
/// We use this kind of service to handle backup protocol
/// connections. State is stored inside the generic ``rpcenv``. Logs
/// goes into the ``WorkerTask`` log.
+#[derive(Clone)]
pub struct H2Service<E> {
router: &'static Router,
rpcenv: E,
@@ -42,7 +45,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
}
}
- fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
+ fn handle_request(&self, req: Request<Incoming>) -> ApiResponseFuture {
let (parts, body) = req.into_parts();
let method = parts.method.clone();
@@ -103,7 +106,7 @@ impl<E: RpcEnvironment + Clone> H2Service<E> {
}
}
-impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
+impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Incoming>> for H2Service<E> {
type Response = Response<Body>;
type Error = Error;
#[allow(clippy::type_complexity)]
@@ -113,7 +116,7 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
Poll::Ready(Ok(()))
}
- fn call(&mut self, req: Request<Body>) -> Self::Future {
+ fn call(&mut self, req: Request<Incoming>) -> Self::Future {
let path = req.uri().path().to_owned();
let method = req.method().clone();
let worker = self.worker.clone();
@@ -126,14 +129,14 @@ impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Serv
}
Err(err) => {
if let Some(apierr) = err.downcast_ref::<HttpError>() {
- let mut resp = Response::new(Body::from(apierr.message.clone()));
+ let mut resp = Response::new(apierr.message.clone().into());
resp.extensions_mut()
.insert(ErrorMessageExtension(apierr.message.clone()));
*resp.status_mut() = apierr.code;
Self::log_response(worker, method, &path, &resp);
Ok(resp)
} else {
- let mut resp = Response::new(Body::from(err.to_string()));
+ let mut resp = Response::new(err.to_string().into());
resp.extensions_mut()
.insert(ErrorMessageExtension(err.to_string()));
*resp.status_mut() = StatusCode::BAD_REQUEST;
diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
index 43dafa91..5ddd3667 100644
--- a/proxmox-rest-server/src/lib.rs
+++ b/proxmox-rest-server/src/lib.rs
@@ -34,7 +34,7 @@ mod environment;
pub use environment::*;
mod api_config;
-pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};
+pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler};
mod rest;
pub use rest::{Redirector, RestServer};
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index f5a72052..f902592d 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -10,17 +10,25 @@ use std::task::{Context, Poll};
use anyhow::{bail, format_err, Error};
use futures::future::FutureExt;
use futures::stream::TryStreamExt;
-use hyper::body::HttpBody;
+use http_body_util::{BodyDataStream, BodyStream};
+use hyper::body::{Body as HyperBody, Incoming};
use hyper::header::{self, HeaderMap};
use hyper::http::request::Parts;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
+use hyper_util::rt::{TokioExecutor, TokioIo};
+use hyper_util::server::conn;
+use hyper_util::server::graceful::GracefulShutdown;
+use hyper_util::service::TowerToHyperService;
use regex::Regex;
use serde_json::Value;
use tokio::fs::File;
+use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::Instant;
+use tokio_stream::wrappers::ReceiverStream;
use tower_service::Service;
use url::form_urlencoded;
+use proxmox_http::Body;
use proxmox_router::{
check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
RpcEnvironmentType, UserInformation,
@@ -40,6 +48,7 @@ unsafe extern "C" {
fn tzset();
}
+#[derive(Clone)]
struct AuthStringExtension(String);
pub(crate) struct EmptyUserInformation {}
@@ -74,24 +83,11 @@ impl RestServer {
api_config: Arc::new(api_config),
}
}
-}
-impl<T: PeerAddress> Service<&T> for RestServer {
- type Response = ApiService;
- type Error = Error;
- type Future = std::future::Ready<Result<ApiService, Error>>;
-
- fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn call(&mut self, ctx: &T) -> Self::Future {
- std::future::ready(match ctx.peer_addr() {
- Err(err) => Err(format_err!("unable to get peer address - {}", err)),
- Ok(peer) => Ok(ApiService {
- peer,
- api_config: Arc::clone(&self.api_config),
- }),
+ pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
+ Ok(ApiService {
+ peer: peer.peer_addr()?,
+ api_config: Arc::clone(&self.api_config),
})
}
}
@@ -108,25 +104,40 @@ impl Redirector {
pub fn new() -> Self {
Self {}
}
-}
-impl<T> Service<&T> for Redirector {
- type Response = RedirectService;
- type Error = Error;
- type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
-
- fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn call(&mut self, _ctx: &T) -> Self::Future {
- std::future::ready(Ok(RedirectService {}))
+ pub fn redirect_service(&self) -> RedirectService {
+ RedirectService {}
}
}
+#[derive(Clone)]
pub struct RedirectService;
-impl Service<Request<Body>> for RedirectService {
+impl RedirectService {
+ pub async fn serve<S>(
+ self,
+ conn: S,
+ mut graceful: Option<Arc<GracefulShutdown>>,
+ ) -> Result<(), Error>
+ where
+ S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ {
+ let api_service = TowerToHyperService::new(self);
+ let io = TokioIo::new(conn);
+ let api_conn = conn::auto::Builder::new(TokioExecutor::new());
+ let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
+ if let Some(graceful) = graceful.take() {
+ let api_conn = graceful.watch(api_conn);
+ drop(graceful);
+ api_conn.await
+ } else {
+ api_conn.await
+ }
+ .map_err(|err| format_err!("error serving redirect connection: {err}"))
+ }
+}
+
+impl Service<Request<Incoming>> for RedirectService {
type Response = Response<Body>;
type Error = anyhow::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
@@ -135,7 +146,7 @@ impl Service<Request<Body>> for RedirectService {
Poll::Ready(Ok(()))
}
- fn call(&mut self, req: Request<Body>) -> Self::Future {
+ fn call(&mut self, req: Request<Incoming>) -> Self::Future {
let future = async move {
let header_host_value = req
.headers()
@@ -194,12 +205,6 @@ impl PeerAddress for tokio::net::TcpStream {
}
}
-impl PeerAddress for hyper::server::conn::AddrStream {
- fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
- Ok(self.remote_addr())
- }
-}
-
impl PeerAddress for tokio::net::UnixStream {
fn peer_addr(&self) -> Result<std::net::SocketAddr, Error> {
// TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now
@@ -223,11 +228,36 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
// Rust wants this type 'pub' here (else we get 'private type `ApiService`
// in public interface'). The type is still private because the crate does
// not export it.
+#[derive(Clone)]
pub struct ApiService {
pub peer: std::net::SocketAddr,
pub api_config: Arc<ApiConfig>,
}
+impl ApiService {
+ pub async fn serve<S>(
+ self,
+ conn: S,
+ mut graceful: Option<Arc<GracefulShutdown>>,
+ ) -> Result<(), Error>
+ where
+ S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ {
+ let api_service = TowerToHyperService::new(self);
+ let io = TokioIo::new(conn);
+ let api_conn = conn::auto::Builder::new(TokioExecutor::new());
+ let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
+ if let Some(graceful) = graceful.take() {
+ let api_conn = graceful.watch(api_conn);
+ drop(graceful);
+ api_conn.await
+ } else {
+ api_conn.await
+ }
+ .map_err(|err| format_err!("error serving connection: {err}"))
+ }
+}
+
fn log_response(
logfile: Option<&Arc<Mutex<FileLogger>>>,
peer: &std::net::SocketAddr,
@@ -307,7 +337,7 @@ fn get_user_agent(headers: &HeaderMap) -> Option<String> {
.ok()
}
-impl Service<Request<Body>> for ApiService {
+impl Service<Request<Incoming>> for ApiService {
type Response = Response<Body>;
type Error = Error;
#[allow(clippy::type_complexity)]
@@ -317,7 +347,7 @@ impl Service<Request<Body>> for ApiService {
Poll::Ready(Ok(()))
}
- fn call(&mut self, req: Request<Body>) -> Self::Future {
+ fn call(&mut self, req: Request<Incoming>) -> Self::Future {
let path = req.uri().path_and_query().unwrap().as_str().to_owned();
let method = req.method().clone();
let user_agent = get_user_agent(req.headers());
@@ -384,7 +414,7 @@ fn parse_query_parameters<S: 'static + BuildHasher + Send>(
async fn get_request_parameters<S: 'static + BuildHasher + Send>(
param_schema: ParameterSchema,
parts: &Parts,
- req_body: Body,
+ req_body: Incoming,
uri_param: HashMap<String, String, S>,
) -> Result<Value, Error> {
let mut is_json = false;
@@ -401,13 +431,17 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
}
}
- let body = TryStreamExt::map_err(req_body, |err| {
+ let stream_body = BodyStream::new(req_body);
+ let body = TryStreamExt::map_err(stream_body, |err| {
http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
})
- .try_fold(Vec::new(), |mut acc, chunk| async move {
+ .try_fold(Vec::new(), |mut acc, frame| async move {
// FIXME: max request body size?
- if acc.len() + chunk.len() < 64 * 1024 {
- acc.extend_from_slice(&chunk);
+ let frame = frame
+ .into_data()
+ .map_err(|err| format_err!("Failed to read request body frame - {err:?}"))?;
+ if acc.len() + frame.len() < 64 * 1024 {
+ acc.extend_from_slice(&frame);
Ok(acc)
} else {
Err(http_err!(BAD_REQUEST, "Request body too large"))
@@ -437,13 +471,14 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
}
}
+#[derive(Clone)]
struct NoLogExtension();
async fn proxy_protected_request(
config: &ApiConfig,
info: &ApiMethod,
mut parts: Parts,
- req_body: Body,
+ req_body: Incoming,
peer: &std::net::SocketAddr,
) -> Result<Response<Body>, Error> {
let mut uri_parts = parts.uri.clone().into_parts();
@@ -463,9 +498,14 @@ async fn proxy_protected_request(
let reload_timezone = info.reload_timezone;
let mut resp = match config.privileged_addr.clone() {
- None => hyper::client::Client::new().request(request).await?,
+ None => {
+ hyper_util::client::legacy::Client::builder(TokioExecutor::new())
+ .build_http()
+ .request(request)
+ .await?
+ }
Some(addr) => {
- hyper::client::Client::builder()
+ hyper_util::client::legacy::Client::builder(TokioExecutor::new())
.build(addr)
.request(request)
.await?
@@ -479,7 +519,7 @@ async fn proxy_protected_request(
}
}
- Ok(resp)
+ Ok(resp.map(|b| Body::wrap_stream(BodyDataStream::new(b))))
}
fn delay_unauth_time() -> std::time::Instant {
@@ -491,22 +531,23 @@ fn access_forbidden_time() -> std::time::Instant {
}
fn handle_stream_as_json_seq(stream: proxmox_router::Stream) -> Result<Response<Body>, Error> {
- let (mut send, body) = hyper::Body::channel();
+ let (send, body) = tokio::sync::mpsc::channel::<Result<Vec<u8>, Error>>(1);
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = stream.into_inner();
while let Some(record) = stream.next().await {
- if send.send_data(record.to_bytes().into()).await.is_err() {
+ if send.send(Ok(record.to_bytes())).await.is_err() {
break;
}
}
});
- Ok(Response::builder()
+ Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json-seq")
- .body(body)?)
+ .body(Body::wrap_stream(ReceiverStream::new(body)))
+ .map_err(Error::from)
}
fn handle_sync_stream_as_json_seq(
@@ -527,7 +568,7 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
info: &'static ApiMethod,
formatter: Option<&'static dyn OutputFormatter>,
parts: Parts,
- req_body: Body,
+ req_body: Incoming,
uri_param: HashMap<String, String, S>,
) -> Result<Response<Body>, Error> {
let formatter = formatter.unwrap_or(crate::formatter::DIRECT_JSON_FORMATTER);
@@ -630,9 +671,10 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
);
resp.map(|body| {
Body::wrap_stream(
- DeflateEncoder::builder(TryStreamExt::map_err(body, |err| {
- proxmox_lang::io_format_err!("error during compression: {}", err)
- }))
+ DeflateEncoder::builder(TryStreamExt::map_err(
+ BodyDataStream::new(body),
+ |err| proxmox_lang::io_format_err!("error during compression: {}", err),
+ ))
.zlib(true)
.flush_window(is_streaming.then_some(64 * 1024))
.build(),
@@ -796,7 +838,7 @@ fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMe
impl ApiConfig {
pub async fn handle_request(
self: Arc<ApiConfig>,
- req: Request<Body>,
+ req: Request<Incoming>,
peer: &std::net::SocketAddr,
) -> Result<Response<Body>, Error> {
let (parts, body) = req.into_parts();
@@ -808,7 +850,7 @@ impl ApiConfig {
if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
return Ok(Response::builder()
.status(StatusCode::URI_TOO_LONG)
- .body("".into())
+ .body(Body::empty())
.unwrap());
}
@@ -907,7 +949,7 @@ impl Action {
pub struct ApiRequestData<'a> {
parts: Parts,
- body: Body,
+ body: Incoming,
peer: &'a std::net::SocketAddr,
config: &'a ApiConfig,
full_path: &'a str,
--
2.39.5
More information about the pbs-devel
mailing list