[pbs-devel] [PATCH proxmox 05/17] http: add Body implementation

Max Carrara m.carrara at proxmox.com
Wed Apr 2 15:31:24 CEST 2025


On Wed Mar 26, 2025 at 4:23 PM CET, Fabian Grünbichler wrote:
> hyper/http 1.0 now only have a Body trait and some implementations for
> specific use cases. following reqwest's lead (and copying some parts of
> its implementation), implement our own Body struct for the two common
> use cases:
> - a body instance containing the full body data as Bytes
> - a streaming body instance
>
> together with the most common helper methods (empty body, convert, wrap
> existing stream as body) this should make the rest of the upgrade fairly
> straight-forward.
>
> Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
> ---
>  proxmox-http/Cargo.toml  |  10 +++
>  proxmox-http/src/body.rs | 133 +++++++++++++++++++++++++++++++++++++++
>  proxmox-http/src/lib.rs  |   5 ++
>  3 files changed, 148 insertions(+)
>  create mode 100644 proxmox-http/src/body.rs
>
> diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
> index 4ec142c9..1fbc70a8 100644
> --- a/proxmox-http/Cargo.toml
> +++ b/proxmox-http/Cargo.toml
> @@ -40,6 +40,15 @@ flate2 = { workspace = true }
>  [features]
>  default = []
>  
> +body = [
> +    "dep:bytes",
> +    "dep:futures",
> +    "dep:http-body",
> +    "dep:http-body-util",
> +    "dep:hyper",
> +    "dep:sync_wrapper",
> +    "sync_wrapper?/futures",
> +]
>  rate-limiter = ["dep:hyper"]
>  rate-limited-stream = [
>      "dep:tokio",
> @@ -67,6 +76,7 @@ client = [
>      "hyper-util?/http1",
>      "hyper-util?/tokio",
>      "tokio?/io-util",
> +    "body",
>      "http-helpers",
>      "rate-limited-stream",
>  ]
> diff --git a/proxmox-http/src/body.rs b/proxmox-http/src/body.rs
> new file mode 100644
> index 00000000..3eb17355
> --- /dev/null
> +++ b/proxmox-http/src/body.rs
> @@ -0,0 +1,133 @@
> +use std::{pin::Pin, task::Poll};
> +
> +use anyhow::Error;

Eh, not really too much of a fan of anyhow here because the pedant in me
doesn't like it in library-esque code, but given that `anyhow::Error`
has proliferated so much in our remaining code, adapting all the call
sites is a lot of churn. So, this is fine by me, esp. since this is a
sort of "compat layer" for hyper/1.0 and our existing code anyways.

For future stuff that's more isolated we should restrict ourselves to
using `Box<dyn std::error::Error + Send + Sync + 'static>` and
variations thereof, as that can just be tossed to anyhow while also
remaining agnostic to it. That's off-topic, though; I digress. :P

> +use bytes::Bytes;
> +
> +use futures::ready;
> +use http_body_util::combinators::BoxBody;
> +use hyper::body::{Body as HyperBody, Frame, SizeHint};
> +
> +// Partially copied and heavily based on reqwest 0.12 Body implementation from src/async_impl/body.rs
> +// Copyright (c) 2016-2025 Sean McArthur
> +
> +/// Custom implementation of hyper::body::Body supporting either a "full" body that can return its
> +/// contents as byte sequence in one go, or "streaming" body that can be polled.
> +pub struct Body {
> +    inner: InnerBody,
> +}
> +
> +enum InnerBody {
> +    Full(Bytes),
> +    Streaming(BoxBody<Bytes, Error>),
> +}
> +
> +impl Body {
> +    /// Shortcut for creating an empty body instance with no data.
> +    pub fn empty() -> Self {
> +        Bytes::new().into()
> +    }
> +
> +    /// Returns the body contents if it is a "full" body, None otherwise.
> +    pub fn as_bytes(&self) -> Option<&[u8]> {
> +        match self.inner {
> +            InnerBody::Full(ref bytes) => Some(bytes),
> +            InnerBody::Streaming(_) => None,
> +        }
> +    }
> +
> +    pub fn wrap_stream<S>(stream: S) -> Body
> +    where
> +        S: futures::stream::TryStream + Send + 'static,
> +
> +        S::Error: Into<Error>,
> +
> +        Bytes: From<S::Ok>,
> +    {
> +        Body::stream(stream)
> +    }
> +
> +    pub(crate) fn stream<S>(stream: S) -> Body
> +    where
> +        S: futures::stream::TryStream + Send + 'static,
> +
> +        S::Error: Into<Error>,
> +
> +        Bytes: From<S::Ok>,
> +    {
> +        use futures::TryStreamExt;
> +
> +        use http_body::Frame;
> +
> +        use http_body_util::StreamBody;
> +
> +        let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
> +            stream
> +                .map_ok(|d| Frame::data(Bytes::from(d)))
> +                .map_err(Into::into),
> +        )));
> +
> +        Body {
> +            inner: InnerBody::Streaming(body),
> +        }
> +    }
> +}
> +
> +impl HyperBody for Body {
> +    type Data = Bytes;
> +
> +    type Error = Error;
> +
> +    fn poll_frame(
> +        mut self: std::pin::Pin<&mut Self>,
> +        cx: &mut std::task::Context<'_>,
> +    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
> +        match self.inner {
> +            InnerBody::Full(ref mut bytes) => {
> +                let res = bytes.split_off(0);
> +                if res.is_empty() {
> +                    return Poll::Ready(None);
> +                } else {
> +                    return Poll::Ready(Some(Ok(Frame::data(res))));
> +                }

The `return` statements above are superfluous; you can just

    if res.is_empty() {
        return Poll::Ready(None);
    } else {
        return Poll::Ready(Some(Ok(Frame::data(res))));
    }

> +            }
> +            InnerBody::Streaming(ref mut body) => Poll::Ready(
> +                ready!(Pin::new(body).poll_frame(cx))
> +                    .map(|opt_chunk| opt_chunk.map_err(Error::from)),

The `map_err` call here is redundant.

> +            ),
> +        }
> +    }
> +
> +    fn is_end_stream(&self) -> bool {
> +        match self.inner {
> +            InnerBody::Full(ref bytes) => bytes.is_empty(),
> +            InnerBody::Streaming(ref box_body) => box_body.is_end_stream(),
> +        }
> +    }
> +
> +    fn size_hint(&self) -> hyper::body::SizeHint {
> +        match self.inner {
> +            InnerBody::Full(ref bytes) => SizeHint::with_exact(bytes.len() as u64),
> +            InnerBody::Streaming(ref box_body) => box_body.size_hint(),
> +        }
> +    }
> +}
> +
> +impl From<Bytes> for Body {
> +    fn from(value: Bytes) -> Self {
> +        Self {
> +            inner: InnerBody::Full(value),
> +        }
> +    }
> +}
> +
> +impl From<Vec<u8>> for Body {
> +    fn from(value: Vec<u8>) -> Self {
> +        Bytes::from(value).into()
> +    }
> +}
> +
> +impl From<String> for Body {
> +    fn from(value: String) -> Self {
> +        Bytes::copy_from_slice(value.as_bytes()).into()
> +    }
> +}
> diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
> index 4770aaf4..8b6953b0 100644
> --- a/proxmox-http/src/lib.rs
> +++ b/proxmox-http/src/lib.rs
> @@ -35,3 +35,8 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
>  mod rate_limited_stream;
>  #[cfg(feature = "rate-limited-stream")]
>  pub use rate_limited_stream::RateLimitedStream;
> +
> +#[cfg(feature = "body")]
> +mod body;
> +#[cfg(feature = "body")]
> +pub use body::Body;





More information about the pbs-devel mailing list