[pbs-devel] [PATCH proxmox 05/17] http: add Body implementation
Fabian Grünbichler
f.gruenbichler at proxmox.com
Wed Mar 26 16:23:09 CET 2025
hyper/http 1.0 now only have a Body trait and some implementations for
specific use cases. Following reqwest's lead (and copying some parts of
its implementation), implement our own Body struct for the two common
use cases:
- a body instance containing the full body data as Bytes
- a streaming body instance
Together with the most common helper methods (creating an empty body,
converting from common types, wrapping an existing stream as a body), this
should make the rest of the upgrade fairly straightforward.
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
proxmox-http/Cargo.toml | 10 +++
proxmox-http/src/body.rs | 133 +++++++++++++++++++++++++++++++++++++++
proxmox-http/src/lib.rs | 5 ++
3 files changed, 148 insertions(+)
create mode 100644 proxmox-http/src/body.rs
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 4ec142c9..1fbc70a8 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -40,6 +40,15 @@ flate2 = { workspace = true }
[features]
default = []
+body = [
+ "dep:bytes",
+ "dep:futures",
+ "dep:http-body",
+ "dep:http-body-util",
+ "dep:hyper",
+ "dep:sync_wrapper",
+ "sync_wrapper?/futures",
+]
rate-limiter = ["dep:hyper"]
rate-limited-stream = [
"dep:tokio",
@@ -67,6 +76,7 @@ client = [
"hyper-util?/http1",
"hyper-util?/tokio",
"tokio?/io-util",
+ "body",
"http-helpers",
"rate-limited-stream",
]
diff --git a/proxmox-http/src/body.rs b/proxmox-http/src/body.rs
new file mode 100644
index 00000000..3eb17355
--- /dev/null
+++ b/proxmox-http/src/body.rs
@@ -0,0 +1,133 @@
+use std::{pin::Pin, task::Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+
+use futures::ready;
+use http_body_util::combinators::BoxBody;
+use hyper::body::{Body as HyperBody, Frame, SizeHint};
+
+// Partially copied and heavily based on reqwest 0.12 Body implementation from src/async_impl/body.rs
+// Copyright (c) 2016-2025 Sean McArthur
+
+/// Custom implementation of [`hyper::body::Body`] supporting either a "full" body that can return
+/// its contents as a byte sequence in one go, or a "streaming" body that can be polled.
+///
+/// Instances are obtained through [`Body::empty`], [`Body::wrap_stream`] or the `From`
+/// conversions from [`Bytes`], `Vec<u8>` and `String`.
+pub struct Body {
+ // Private, so the full/streaming representation is not part of the public API.
+ inner: InnerBody,
+}
+
+/// Internal representation backing [`Body`].
+enum InnerBody {
+ /// The complete body data, held as a single [`Bytes`] buffer.
+ Full(Bytes),
+ /// A boxed body yielding [`Bytes`] data and using [`Error`] (`anyhow::Error`) as error type.
+ Streaming(BoxBody<Bytes, Error>),
+}
+
+impl Body {
+    /// Creates a "full" body that carries no data.
+    pub fn empty() -> Self {
+        Self::from(Bytes::new())
+    }
+
+    /// Borrows the contents of a "full" body.
+    ///
+    /// Returns `None` for a "streaming" body.
+    pub fn as_bytes(&self) -> Option<&[u8]> {
+        match &self.inner {
+            InnerBody::Full(bytes) => Some(&bytes[..]),
+            InnerBody::Streaming(_) => None,
+        }
+    }
+
+    /// Wraps a fallible stream of chunks into a "streaming" body.
+    ///
+    /// Each `Ok` item of `stream` is converted into [`Bytes`] and emitted as a data frame, each
+    /// error is converted into [`Error`].
+    pub fn wrap_stream<S>(stream: S) -> Body
+    where
+        S: futures::stream::TryStream + Send + 'static,
+        S::Error: Into<Error>,
+        Bytes: From<S::Ok>,
+    {
+        Self::stream(stream)
+    }
+
+    /// Crate-internal constructor doing the actual work for [`Body::wrap_stream`].
+    ///
+    /// Maps the items of `stream` to data frames and its errors to [`Error`], then boxes the
+    /// result as the "streaming" variant.
+    pub(crate) fn stream<S>(stream: S) -> Body
+    where
+        S: futures::stream::TryStream + Send + 'static,
+        S::Error: Into<Error>,
+        Bytes: From<S::Ok>,
+    {
+        use futures::TryStreamExt;
+        use http_body_util::StreamBody;
+
+        // Turn `Result<S::Ok, S::Error>` items into `Result<Frame<Bytes>, Error>` items.
+        let frames = stream
+            .map_ok(|chunk| http_body::Frame::data(Bytes::from(chunk)))
+            .map_err(Into::into);
+
+        // The stream is wrapped in `SyncStream` before boxing - presumably because `S` is only
+        // required to be `Send`, while the boxed body also has to be `Sync`.
+        let streaming = StreamBody::new(sync_wrapper::SyncStream::new(frames));
+
+        Body {
+            inner: InnerBody::Streaming(http_body_util::BodyExt::boxed(streaming)),
+        }
+    }
+}
+
+impl HyperBody for Body {
+    type Data = Bytes;
+
+    type Error = Error;
+
+    /// Polls for the next frame of the body.
+    ///
+    /// A "full" body yields all of its data as a single data frame on the first poll and `None`
+    /// afterwards (an empty "full" body yields `None` right away). A "streaming" body forwards
+    /// the poll to the wrapped body.
+    fn poll_frame(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+        match &mut self.inner {
+            InnerBody::Full(bytes) => {
+                // Move the whole buffer out and leave an empty one behind, so that any later
+                // poll (and `is_end_stream`) sees the body as exhausted.
+                let data = std::mem::take(bytes);
+                if data.is_empty() {
+                    Poll::Ready(None)
+                } else {
+                    Poll::Ready(Some(Ok(Frame::data(data))))
+                }
+            }
+            InnerBody::Streaming(body) => {
+                // `ready!` returns `Poll::Pending` early; the inner body already uses `Bytes`
+                // and `Error`, so the frame can be passed on without any conversion.
+                let frame = ready!(Pin::new(body).poll_frame(cx));
+                Poll::Ready(frame)
+            }
+        }
+    }
+
+    /// Returns `true` once no further data will be produced: a "full" body whose buffer is
+    /// empty, or a "streaming" body whose wrapped body reports the end of its stream.
+    fn is_end_stream(&self) -> bool {
+        match &self.inner {
+            InnerBody::Full(bytes) => bytes.is_empty(),
+            InnerBody::Streaming(body) => body.is_end_stream(),
+        }
+    }
+
+    /// Returns the exact number of remaining bytes for a "full" body, or whatever hint the
+    /// wrapped body provides for a "streaming" body.
+    fn size_hint(&self) -> SizeHint {
+        match &self.inner {
+            InnerBody::Full(bytes) => SizeHint::with_exact(bytes.len() as u64),
+            InnerBody::Streaming(body) => body.size_hint(),
+        }
+    }
+}
+
+/// Creates a "full" body from an existing [`Bytes`] buffer.
+impl From<Bytes> for Body {
+    fn from(value: Bytes) -> Self {
+        let inner = InnerBody::Full(value);
+        Body { inner }
+    }
+}
+
+/// Creates a "full" body from a byte vector by first converting it into [`Bytes`].
+impl From<Vec<u8>> for Body {
+    fn from(value: Vec<u8>) -> Self {
+        let bytes = Bytes::from(value);
+        Self::from(bytes)
+    }
+}
+
+/// Creates a "full" body holding the UTF-8 bytes of the given string.
+impl From<String> for Body {
+    fn from(value: String) -> Self {
+        // The string is owned, so hand its buffer over to `Bytes` directly (as done for
+        // `Vec<u8>`) rather than allocating a second buffer and copying the data into it.
+        Bytes::from(value).into()
+    }
+}
diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 4770aaf4..8b6953b0 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -35,3 +35,8 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
mod rate_limited_stream;
#[cfg(feature = "rate-limited-stream")]
pub use rate_limited_stream::RateLimitedStream;
+
+// The `body` module and its `Body` re-export are only compiled with the `body` feature enabled.
+#[cfg(feature = "body")]
+mod body;
+#[cfg(feature = "body")]
+pub use body::Body;
--
2.39.5
More information about the pbs-devel
mailing list