[pbs-devel] [PATCH proxmox v2] http: teach the Client how to decode deflate content
Maximiliano Sandoval
m.sandoval at proxmox.com
Thu Mar 28 14:40:31 CET 2024
The Backup Server can compress the content using deflate so we teach the
client how to decode it.
If a request is sent with the `Accept-Encoding` [2] header set to
`deflate`, and the response's `Content-Encoding` [1] header is equal to
`deflate`, we wrap the Body stream with a stream that can decode `zlib`
on the fly.
Note that, according to the `Accept-Encoding` docs [2], the `deflate`
encoding is actually the `zlib` format.
The new `ZlibDecoder` Stream is basically the same as the
`DeflateEncoder` struct in the proxmox-compression crate.
This can also be tested against
http://eu.httpbin.org/#/Response_formats/get_deflate by adding the
following test:
```rust
/// Fetches a deflate-compressed document from httpbin and checks that the
/// client hands back the decoded body.
///
/// Requires network access to eu.httpbin.org.
#[tokio::test]
async fn test_client() {
    let client = Client::new();
    // The client does not set `Accept-Encoding` on its own, so request the
    // deflate encoding explicitly.
    let headers = HashMap::from([(
        hyper::header::ACCEPT_ENCODING.to_string(),
        "deflate".to_string(),
    )]);
    // `expect` keeps the underlying error in the failure output, which a
    // bare `assert!(response.is_ok())` would discard.
    let body = client
        .get_string("https://eu.httpbin.org/deflate", Some(&headers))
        .await
        .expect("request failed or the deflate body could not be decoded");
    // The endpoint answers with a JSON document that carries a `deflated`
    // marker; seeing it proves the body was actually decompressed.
    assert!(
        body.contains("deflated"),
        "unexpected response body: {body}"
    );
}
```
at `proxmox-http/src/client/simple.rs` and running
```
cargo test --features=client,client-trait
```
[1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
[2] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding
Suggested-by: Lukas Wagner <l.wagner at proxmox.com>
Signed-off-by: Maximiliano Sandoval <m.sandoval at proxmox.com>
---
Differences from v1:
- Only implement deflate for the moment
- Turn decoder into a stream
- Do not set Accept-Encoding
The API for setting the Accept-Encoding needs to be figured out in a follow-up.
proxmox-compression/Cargo.toml | 3 +-
proxmox-compression/src/lib.rs | 3 +
proxmox-compression/src/zlib_decoder.rs | 161 ++++++++++++++++++++++++
proxmox-http/Cargo.toml | 7 ++
proxmox-http/src/client/simple.rs | 63 +++++++++-
5 files changed, 234 insertions(+), 3 deletions(-)
create mode 100644 proxmox-compression/src/zlib_decoder.rs
diff --git a/proxmox-compression/Cargo.toml b/proxmox-compression/Cargo.toml
index 49735cbe..3879ed16 100644
--- a/proxmox-compression/Cargo.toml
+++ b/proxmox-compression/Cargo.toml
@@ -27,5 +27,4 @@ proxmox-io = { workspace = true, features = [ "tokio" ] }
proxmox-lang.workspace = true
[dev-dependencies]
-tokio = { workspace = true, features = [ "macros" ] }
-
+tokio = { workspace = true, features = [ "macros", "rt-multi-thread" ] }
diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
index 1fcfb977..f5d6e269 100644
--- a/proxmox-compression/src/lib.rs
+++ b/proxmox-compression/src/lib.rs
@@ -1,6 +1,9 @@
mod compression;
pub use compression::*;
+mod zlib_decoder;
+pub use zlib_decoder::*;
+
pub mod tar;
pub mod zip;
pub mod zstd;
diff --git a/proxmox-compression/src/zlib_decoder.rs b/proxmox-compression/src/zlib_decoder.rs
new file mode 100644
index 00000000..fdf9682b
--- /dev/null
+++ b/proxmox-compression/src/zlib_decoder.rs
@@ -0,0 +1,161 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+use flate2::{Decompress, FlushDecompress};
+use futures::ready;
+use futures::stream::Stream;
+
+use proxmox_io::ByteBuffer;
+
+const BUFFER_SIZE: usize = 8192;
+
+#[derive(Eq, PartialEq)]
+enum EncoderState {
+ Reading,
+ Writing,
+ Flushing,
+ Finished,
+}
+
+pub struct ZlibDecoder<T> {
+ inner: T,
+ decompressor: Decompress,
+ buffer: ByteBuffer,
+ input_buffer: Bytes,
+ state: EncoderState,
+}
+
+impl<T> ZlibDecoder<T> {
+ pub fn new(inner: T) -> Self {
+ Self::with_buffer_size(inner, BUFFER_SIZE)
+ }
+
+ fn with_buffer_size(inner: T, buffer_size: usize) -> Self {
+ Self {
+ inner,
+ decompressor: Decompress::new(true),
+ buffer: ByteBuffer::with_capacity(buffer_size),
+ input_buffer: Bytes::new(),
+ state: EncoderState::Reading,
+ }
+ }
+
+ fn decode(
+ &mut self,
+ inbuf: &[u8],
+ flush: FlushDecompress,
+ ) -> Result<(usize, flate2::Status), io::Error> {
+ let old_in = self.decompressor.total_in();
+ let old_out = self.decompressor.total_out();
+ let res = self
+ .decompressor
+ .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
+ let new_in = (self.decompressor.total_in() - old_in) as usize;
+ let new_out = (self.decompressor.total_out() - old_out) as usize;
+ self.buffer.add_size(new_out);
+
+ Ok((new_in, res))
+ }
+}
+
+impl<T, O, E> Stream for ZlibDecoder<T>
+where
+ T: Stream<Item = Result<O, E>> + Unpin,
+ O: Into<Bytes>,
+ E: Into<Error>,
+{
+ type Item = Result<Bytes, anyhow::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let this = self.get_mut();
+
+ loop {
+ match this.state {
+ EncoderState::Reading => {
+ if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+ let buf = res.map_err(Into::into)?;
+ this.input_buffer = buf.into();
+ this.state = EncoderState::Writing;
+ } else {
+ this.state = EncoderState::Flushing;
+ }
+ }
+ EncoderState::Writing => {
+ if this.input_buffer.is_empty() {
+ return Poll::Ready(Some(Err(anyhow::format_err!(
+ "empty input during write"
+ ))));
+ }
+ let mut buf = this.input_buffer.split_off(0);
+ let (read, res) = this.decode(&buf[..], FlushDecompress::None)?;
+ this.input_buffer = buf.split_off(read);
+ if this.input_buffer.is_empty() {
+ this.state = EncoderState::Reading;
+ }
+ if this.buffer.is_full() || res == flate2::Status::BufError {
+ let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+ return Poll::Ready(Some(Ok(bytes.into())));
+ }
+ }
+ EncoderState::Flushing => {
+ let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?;
+ if !this.buffer.is_empty() {
+ let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+ return Poll::Ready(Some(Ok(bytes.into())));
+ }
+ if res == flate2::Status::StreamEnd {
+ this.state = EncoderState::Finished;
+ }
+ }
+ EncoderState::Finished => return Poll::Ready(None),
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ use futures::StreamExt;
+ use std::io::Write;
+
+ const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
+eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
+enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
+si aliquod aeternum et infinitum impendere."#;
+
+ fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
+ use flate2::Compression;
+ use std::io::Write;
+
+ let mut e = flate2::write::ZlibEncoder::new(Vec::new(), Compression::default());
+ e.write_all(bytes).unwrap();
+
+ e.finish()
+ }
+
+ #[tokio::test]
+ async fn test_decompression() {
+ const BUFFER_SIZE: usize = 5;
+ let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+ let chunks: Vec<Result<_, std::io::Error>> = vec![
+ Ok(encoded[..10].to_vec()),
+ Ok(encoded[10..20].to_vec()),
+ Ok(encoded[20..30].to_vec()),
+ Ok(encoded[30..40].to_vec()),
+ Ok(encoded[40..].to_vec()),
+ ];
+ let stream = futures::stream::iter(chunks);
+ let mut decoder = ZlibDecoder::with_buffer_size(stream, BUFFER_SIZE);
+ let mut buf = Vec::with_capacity(BODY.len());
+
+ while let Some(Ok(res)) = decoder.next().await {
+ buf.write_all(&res).unwrap();
+ }
+ assert_eq!(buf, BODY.as_bytes());
+ }
+}
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 9ece24eb..4455ba85 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -26,6 +26,11 @@ proxmox-async = { workspace = true, optional = true }
proxmox-sys = { workspace = true, optional = true }
proxmox-io = { workspace = true, optional = true }
proxmox-lang = { workspace = true, optional = true }
+proxmox-compression = { workspace = true, optional = true }
+
+[dev-dependencies]
+tokio = { workspace = true, features = [ "macros" ] }
+flate2 = { workspace = true }
[features]
default = []
@@ -42,12 +47,14 @@ client = [
"dep:futures",
"dep:hyper",
"dep:openssl",
+ "dep:proxmox-compression",
"dep:tokio",
"dep:tokio-openssl",
"http-helpers",
"hyper?/client",
"hyper?/http1",
"hyper?/http2",
+ "hyper?/stream",
"hyper?/tcp",
"rate-limited-stream",
"tokio?/io-util",
diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs
index e9910802..b9e240d2 100644
--- a/proxmox-http/src/client/simple.rs
+++ b/proxmox-http/src/client/simple.rs
@@ -78,7 +78,8 @@ impl Client {
self.add_proxy_headers(&mut request)?;
- self.client.request(request).map_err(Error::from).await
+ let encoded_response = self.client.request(request).map_err(Error::from).await?;
+ decode_response(encoded_response).await
}
pub async fn post(
@@ -245,3 +246,63 @@ impl crate::HttpClient<String, String> for Client {
})
}
}
+
+/// Wraps the `Body` stream in a ZlibDecoder stream if the `Content-Encoding`
+/// header of the response is `deflate`. Responses without that header are
+/// returned unchanged; any other encoding value results in an error.
+async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Error> {
+ let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else {
+ return Ok(res);
+ };
+
+ let encodings = content_encoding.to_str()?;
+ if encodings == "deflate" {
+ let (parts, body) = res.into_parts();
+ let decoder = proxmox_compression::ZlibDecoder::new(body);
+ let decoded_body = Body::wrap_stream(decoder);
+ Ok(Response::from_parts(parts, decoded_body))
+ } else {
+ bail!("Unknown encoding format: {encodings}");
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ use std::io::Write;
+
+ const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
+eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
+enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
+si aliquod aeternum et infinitum impendere."#;
+
+ #[tokio::test]
+ async fn test_parse_response_deflate() {
+ let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+ let encoded_body = Body::from(encoded);
+ let encoded_response = Response::builder()
+ .header(hyper::header::CONTENT_ENCODING, "deflate")
+ .body(encoded_body)
+ .unwrap();
+
+ let decoded_response = decode_response(encoded_response).await.unwrap();
+
+ assert_eq!(
+ Client::response_body_string(decoded_response)
+ .await
+ .unwrap(),
+ BODY
+ );
+ }
+
+ fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
+ use flate2::write::ZlibEncoder;
+ use flate2::Compression;
+
+ let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
+ e.write_all(bytes).unwrap();
+
+ e.finish()
+ }
+}
--
2.39.2
More information about the pbs-devel
mailing list