[pbs-devel] [PATCH proxmox v2] http: teach the Client how to decode deflate content

Maximiliano Sandoval m.sandoval at proxmox.com
Thu Mar 28 14:40:31 CET 2024


The Backup Server can compress the content using deflate so we teach the
client how to decode it.

If a request is sent with the `Accept-Encoding` [2] header set to
`deflate`, and the response's `Content-Encoding` [1] header is equal to
`deflate` we wrap the Body stream with a stream that can decode `zlib`
on the run.

Note that from the `Accept-Encoding` docs [2], the `deflate` encoding is
actually `zlib`.

The new `ZlibDecoder` Stream is basically the same as the
`DeflateEncoder` struct in the proxmox-compress crate.

This can be also tested against
http://eu.httpbin.org/#/Response_formats/get_deflate by adding the
following test:

```rust
    #[tokio::test]
    async fn test_client() {
        let client = Client::new();
        let headers = HashMap::from([(
            hyper::header::ACCEPT_ENCODING.to_string(),
            "deflate".to_string(),
        )]);
        let response = client
            .get_string("https://eu.httpbin.org/deflate", Some(&headers))
            .await;
        assert!(response.is_ok());
    }
```

at `proxmox-http/src/client/simple.rs` and running

```
cargo test --features=client,client-trait
```

[1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
[2] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding

Suggested-by: Lukas Wagner <l.wagner at proxmox.com>
Signed-off-by: Maximiliano Sandoval <m.sandoval at proxmox.com>
---
Differences from v1:
 - Only implement deflate for the moment
 - Turn decoder into a stream
 - Do not set Accept-Encoding

The API for setting the Accept-Encoding needs to be figured out in a follow-up.

 proxmox-compression/Cargo.toml          |   3 +-
 proxmox-compression/src/lib.rs          |   3 +
 proxmox-compression/src/zlib_decoder.rs | 161 ++++++++++++++++++++++++
 proxmox-http/Cargo.toml                 |   7 ++
 proxmox-http/src/client/simple.rs       |  63 +++++++++-
 5 files changed, 234 insertions(+), 3 deletions(-)
 create mode 100644 proxmox-compression/src/zlib_decoder.rs

diff --git a/proxmox-compression/Cargo.toml b/proxmox-compression/Cargo.toml
index 49735cbe..3879ed16 100644
--- a/proxmox-compression/Cargo.toml
+++ b/proxmox-compression/Cargo.toml
@@ -27,5 +27,4 @@ proxmox-io = { workspace = true, features = [ "tokio" ] }
 proxmox-lang.workspace = true
 
 [dev-dependencies]
-tokio = { workspace = true, features = [ "macros" ] }
-
+tokio = { workspace = true, features = [ "macros", "rt-multi-thread" ] }
diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
index 1fcfb977..f5d6e269 100644
--- a/proxmox-compression/src/lib.rs
+++ b/proxmox-compression/src/lib.rs
@@ -1,6 +1,9 @@
 mod compression;
 pub use compression::*;
 
+mod zlib_decoder;
+pub use zlib_decoder::*;
+
 pub mod tar;
 pub mod zip;
 pub mod zstd;
diff --git a/proxmox-compression/src/zlib_decoder.rs b/proxmox-compression/src/zlib_decoder.rs
new file mode 100644
index 00000000..fdf9682b
--- /dev/null
+++ b/proxmox-compression/src/zlib_decoder.rs
@@ -0,0 +1,161 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+use flate2::{Decompress, FlushDecompress};
+use futures::ready;
+use futures::stream::Stream;
+
+use proxmox_io::ByteBuffer;
+
+const BUFFER_SIZE: usize = 8192;
+
+#[derive(Eq, PartialEq)]
+enum EncoderState {
+    Reading,
+    Writing,
+    Flushing,
+    Finished,
+}
+
+pub struct ZlibDecoder<T> {
+    inner: T,
+    decompressor: Decompress,
+    buffer: ByteBuffer,
+    input_buffer: Bytes,
+    state: EncoderState,
+}
+
+impl<T> ZlibDecoder<T> {
+    pub fn new(inner: T) -> Self {
+        Self::with_buffer_size(inner, BUFFER_SIZE)
+    }
+
+    fn with_buffer_size(inner: T, buffer_size: usize) -> Self {
+        Self {
+            inner,
+            decompressor: Decompress::new(true),
+            buffer: ByteBuffer::with_capacity(buffer_size),
+            input_buffer: Bytes::new(),
+            state: EncoderState::Reading,
+        }
+    }
+
+    fn decode(
+        &mut self,
+        inbuf: &[u8],
+        flush: FlushDecompress,
+    ) -> Result<(usize, flate2::Status), io::Error> {
+        let old_in = self.decompressor.total_in();
+        let old_out = self.decompressor.total_out();
+        let res = self
+            .decompressor
+            .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
+        let new_in = (self.decompressor.total_in() - old_in) as usize;
+        let new_out = (self.decompressor.total_out() - old_out) as usize;
+        self.buffer.add_size(new_out);
+
+        Ok((new_in, res))
+    }
+}
+
+impl<T, O, E> Stream for ZlibDecoder<T>
+where
+    T: Stream<Item = Result<O, E>> + Unpin,
+    O: Into<Bytes>,
+    E: Into<Error>,
+{
+    type Item = Result<Bytes, anyhow::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.state {
+                EncoderState::Reading => {
+                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+                        let buf = res.map_err(Into::into)?;
+                        this.input_buffer = buf.into();
+                        this.state = EncoderState::Writing;
+                    } else {
+                        this.state = EncoderState::Flushing;
+                    }
+                }
+                EncoderState::Writing => {
+                    if this.input_buffer.is_empty() {
+                        return Poll::Ready(Some(Err(anyhow::format_err!(
+                            "empty input during write"
+                        ))));
+                    }
+                    let mut buf = this.input_buffer.split_off(0);
+                    let (read, res) = this.decode(&buf[..], FlushDecompress::None)?;
+                    this.input_buffer = buf.split_off(read);
+                    if this.input_buffer.is_empty() {
+                        this.state = EncoderState::Reading;
+                    }
+                    if this.buffer.is_full() || res == flate2::Status::BufError {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                }
+                EncoderState::Flushing => {
+                    let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?;
+                    if !this.buffer.is_empty() {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                    if res == flate2::Status::StreamEnd {
+                        this.state = EncoderState::Finished;
+                    }
+                }
+                EncoderState::Finished => return Poll::Ready(None),
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use futures::StreamExt;
+    use std::io::Write;
+
+    const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
+eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
+enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
+si aliquod aeternum et infinitum impendere."#;
+
+    fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
+        use flate2::Compression;
+        use std::io::Write;
+
+        let mut e = flate2::write::ZlibEncoder::new(Vec::new(), Compression::default());
+        e.write_all(bytes).unwrap();
+
+        e.finish()
+    }
+
+    #[tokio::test]
+    async fn test_decompression() {
+        const BUFFER_SIZE: usize = 5;
+        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+        let chunks: Vec<Result<_, std::io::Error>> = vec![
+            Ok(encoded[..10].to_vec()),
+            Ok(encoded[10..20].to_vec()),
+            Ok(encoded[20..30].to_vec()),
+            Ok(encoded[30..40].to_vec()),
+            Ok(encoded[40..].to_vec()),
+        ];
+        let stream = futures::stream::iter(chunks);
+        let mut decoder = ZlibDecoder::with_buffer_size(stream, BUFFER_SIZE);
+        let mut buf = Vec::with_capacity(BODY.len());
+
+        while let Some(Ok(res)) = decoder.next().await {
+            buf.write_all(&res).unwrap();
+        }
+        assert_eq!(buf, BODY.as_bytes());
+    }
+}
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 9ece24eb..4455ba85 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -26,6 +26,11 @@ proxmox-async = { workspace = true, optional = true }
 proxmox-sys = { workspace = true, optional = true }
 proxmox-io = { workspace = true, optional = true }
 proxmox-lang = { workspace = true, optional = true }
+proxmox-compression = { workspace = true, optional = true }
+
+[dev-dependencies]
+tokio = { workspace = true, features = [ "macros" ] }
+flate2 = { workspace = true }
 
 [features]
 default = []
@@ -42,12 +47,14 @@ client = [
     "dep:futures",
     "dep:hyper",
     "dep:openssl",
+    "dep:proxmox-compression",
     "dep:tokio",
     "dep:tokio-openssl",
     "http-helpers",
     "hyper?/client",
     "hyper?/http1",
     "hyper?/http2",
+    "hyper?/stream",
     "hyper?/tcp",
     "rate-limited-stream",
     "tokio?/io-util",
diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs
index e9910802..b9e240d2 100644
--- a/proxmox-http/src/client/simple.rs
+++ b/proxmox-http/src/client/simple.rs
@@ -78,7 +78,8 @@ impl Client {
 
         self.add_proxy_headers(&mut request)?;
 
-        self.client.request(request).map_err(Error::from).await
+        let encoded_response = self.client.request(request).map_err(Error::from).await?;
+        decode_response(encoded_response).await
     }
 
     pub async fn post(
@@ -245,3 +246,63 @@ impl crate::HttpClient<String, String> for Client {
         })
     }
 }
+
+/// Wraps the `Body` stream in a ZlibDecoder stream if the `Content-Encoding`
+/// header of the response is `deflate`, otherwise returns the original
+/// response.
+async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Error> {
+    let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else {
+        return Ok(res);
+    };
+
+    let encodings = content_encoding.to_str()?;
+    if encodings == "deflate" {
+        let (parts, body) = res.into_parts();
+        let decoder = proxmox_compression::ZlibDecoder::new(body);
+        let decoded_body = Body::wrap_stream(decoder);
+        Ok(Response::from_parts(parts, decoded_body))
+    } else {
+        bail!("Unknown encoding format: {encodings}");
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use std::io::Write;
+
+    const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
+eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
+enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
+si aliquod aeternum et infinitum impendere."#;
+
+    #[tokio::test]
+    async fn test_parse_response_deflate() {
+        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+        let encoded_body = Body::from(encoded);
+        let encoded_response = Response::builder()
+            .header(hyper::header::CONTENT_ENCODING, "deflate")
+            .body(encoded_body)
+            .unwrap();
+
+        let decoded_response = decode_response(encoded_response).await.unwrap();
+
+        assert_eq!(
+            Client::response_body_string(decoded_response)
+                .await
+                .unwrap(),
+            BODY
+        );
+    }
+
+    fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
+        use flate2::write::ZlibEncoder;
+        use flate2::Compression;
+
+        let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
+        e.write_all(bytes).unwrap();
+
+        e.finish()
+    }
+}
-- 
2.39.2





More information about the pbs-devel mailing list