[pbs-devel] [PATCH proxmox v2] http: teach the Client how to decode deflate content

Max Carrara m.carrara at proxmox.com
Wed Apr 3 18:44:39 CEST 2024


On Thu Mar 28, 2024 at 2:40 PM CET, Maximiliano Sandoval wrote:
> The Backup Server can compress the content using deflate so we teach the
> client how to decode it.
>
> If a request is sent with the `Accept-Encoding` [2] header set to
> `deflate`, and the response's `Content-Encoding` [1] header is equal to
> `deflate` we wrap the Body stream with a stream that can decode `zlib`
> on the run.
>
> Note that from the `Accept-Encoding` docs [2], the `deflate` encoding is
> actually `zlib`.
>
> The new `ZlibDecoder` Stream is basically the same as the
> `DeflateEncoder` struct in the proxmox-compress crate.
>
> This can be also tested against
> http://eu.httpbin.org/#/Response_formats/get_deflate by adding the
> following test:
>
> ```rust
>     #[tokio::test]
>     async fn test_client() {
>         let client = Client::new();
>         let headers = HashMap::from([(
>             hyper::header::ACCEPT_ENCODING.to_string(),
>             "deflate".to_string(),
>         )]);
>         let response = client
>             .get_string("https://eu.httpbin.org/deflate", Some(&headers))
>             .await;
>         assert!(response.is_ok());
>     }
> ```
>
> at `proxmox-http/src/client/simple.rs` and running
>
> ```
> cargo test --features=client,client-trait
> ```

Note that the tests you added (in both crates) also run if you invoke
`cargo test --all-features` in the workspace root.

Overall I'd prefer if this patch was split up, because you're touching
two crates at once - one adds a new public API, the other extends the
capability of an existing one. This would just make it easier to track
what parts are actually affected, as the changes to
`proxmox-compression` could be applied separately first (and those to
`proxmox-http` at a later point if necessary).

Apart from that, it seems you haven't checked whether we always compress
our responses server-side (if the client allows it) as I had mentioned
in my response to v1 (or at least you didn't mention here if we do).
Since you've now omitted sending `Accept-Encoding` on requests, it won't
make a difference - but perhaps this is something we should check out in
a later patch (series) after this one's done? IMO we shouldn't just
generically compress data everywhere even if the client accepts it (for
reasons see my response to v1).

Otherwise, looks pretty good to me! See some more comments inline.

>
> [1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
> [2] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding
>
> Suggested-by: Lukas Wagner <l.wagner at proxmox.com>
> Signed-off-by: Maximiliano Sandoval <m.sandoval at proxmox.com>
> ---
> Differences from v1:
>  - Only implement deflate for the moment
>  - Turn decoder into a stream
>  - Do not set Accept-Encoding
>
> The API for setting the Accept-Encoding needs to be figured out in a follow-up.
>
>  proxmox-compression/Cargo.toml          |   3 +-
>  proxmox-compression/src/lib.rs          |   3 +
>  proxmox-compression/src/zlib_decoder.rs | 161 ++++++++++++++++++++++++
>  proxmox-http/Cargo.toml                 |   7 ++
>  proxmox-http/src/client/simple.rs       |  63 +++++++++-
>  5 files changed, 234 insertions(+), 3 deletions(-)
>  create mode 100644 proxmox-compression/src/zlib_decoder.rs
>
> diff --git a/proxmox-compression/Cargo.toml b/proxmox-compression/Cargo.toml
> index 49735cbe..3879ed16 100644
> --- a/proxmox-compression/Cargo.toml
> +++ b/proxmox-compression/Cargo.toml
> @@ -27,5 +27,4 @@ proxmox-io = { workspace = true, features = [ "tokio" ] }
>  proxmox-lang.workspace = true
>  
>  [dev-dependencies]
> -tokio = { workspace = true, features = [ "macros" ] }
> -
> +tokio = { workspace = true, features = [ "macros", "rt-multi-thread" ] }
> diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
> index 1fcfb977..f5d6e269 100644
> --- a/proxmox-compression/src/lib.rs
> +++ b/proxmox-compression/src/lib.rs
> @@ -1,6 +1,9 @@
>  mod compression;
>  pub use compression::*;
>  
> +mod zlib_decoder;
> +pub use zlib_decoder::*;
> +
>  pub mod tar;
>  pub mod zip;
>  pub mod zstd;
> diff --git a/proxmox-compression/src/zlib_decoder.rs b/proxmox-compression/src/zlib_decoder.rs
> new file mode 100644
> index 00000000..fdf9682b
> --- /dev/null
> +++ b/proxmox-compression/src/zlib_decoder.rs
> @@ -0,0 +1,161 @@
> +use std::io;
> +use std::pin::Pin;
> +use std::task::{Context, Poll};
> +
> +use anyhow::Error;
> +use bytes::Bytes;
> +use flate2::{Decompress, FlushDecompress};
> +use futures::ready;
> +use futures::stream::Stream;
> +
> +use proxmox_io::ByteBuffer;
> +
> +const BUFFER_SIZE: usize = 8192;
> +
> +#[derive(Eq, PartialEq)]
> +enum EncoderState {
> +    Reading,
> +    Writing,
> +    Flushing,
> +    Finished,
> +}
> +
> +pub struct ZlibDecoder<T> {
> +    inner: T,
> +    decompressor: Decompress,
> +    buffer: ByteBuffer,
> +    input_buffer: Bytes,
> +    state: EncoderState,
> +}

The decoder is pretty fine as it is - though as we had discussed off
list a while ago, I wonder if there's a way to make this (and the
encoder) a little more generic.

> +
> +impl<T> ZlibDecoder<T> {
> +    pub fn new(inner: T) -> Self {
> +        Self::with_buffer_size(inner, BUFFER_SIZE)
> +    }
> +
> +    fn with_buffer_size(inner: T, buffer_size: usize) -> Self {
> +        Self {
> +            inner,
> +            decompressor: Decompress::new(true),

The encoder apparently passes `false` for specifying the zlib header -
theoretically, you could first extend the encoder with two more
constructors that allow you to specify that the output will include a
zlib header, e.g.:

(in proxmox-compression/src/compression.rs)

    pub fn new_zlib(inner: T) -> Self { ... }
    fn with_buffer_size_zlib(inner: T, buffer_size: usize) -> Self { ... }


Even though this feels a little clunky (I'd prefer to have a builder
here, but it's not [yet] worth it) it doesn't break the API - though I'm
of course open to other suggestions as well.

You could then provide matching constructor `fn`s for your
`ZlibDecoder<T>` - you might as well call it `DeflateDecoder<T>` in that
case, too.

That way we'd avoid having to introduce a separate en-/decoder `struct` 
with its own `En`-/`DecoderState` `enum`. To elaborate, instead of:

  * `DeflateEncoder`
  * `DeflateDecoder` (doesn't exist yet)
  * `ZlibEncoder` (doesn't exist yet)
  * `ZlibDecoder`

.. we'd end up with:

  * `DeflateEncoder` (now supports both DEFLATE and zlib)
  * `DeflateDecoder` (also supports both)

So, we'd avoid a lot of duplicated code.
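
To illustrate the idea, here is a minimal sketch of how one decoder type
could serve both formats. It is not the actual implementation: the local
`Decompress` type is only a stand-in that mirrors the
`flate2::Decompress::new(zlib_header: bool)` constructor so the snippet
compiles without external crates, and `build` as well as
`expects_zlib_header` are hypothetical names used for illustration only.

```rust
/// Stand-in for `flate2::Decompress` (assumption for this sketch): it only
/// remembers whether a zlib header is expected.
struct Decompress {
    zlib_header: bool,
}

impl Decompress {
    fn new(zlib_header: bool) -> Self {
        Self { zlib_header }
    }
}

const BUFFER_SIZE: usize = 8192;

/// One decoder type for both raw DEFLATE and zlib-wrapped streams.
#[allow(dead_code)] // `inner` and `buffer` are not read in this sketch
pub struct DeflateDecoder<T> {
    inner: T,
    decompressor: Decompress,
    buffer: Vec<u8>,
}

#[allow(dead_code)] // not every constructor is exercised in this sketch
impl<T> DeflateDecoder<T> {
    /// Decoder for raw DEFLATE data (no zlib header).
    pub fn new(inner: T) -> Self {
        Self::with_buffer_size(inner, BUFFER_SIZE)
    }

    /// Decoder for zlib-wrapped data.
    pub fn new_zlib(inner: T) -> Self {
        Self::with_buffer_size_zlib(inner, BUFFER_SIZE)
    }

    fn with_buffer_size(inner: T, buffer_size: usize) -> Self {
        Self::build(inner, buffer_size, false)
    }

    fn with_buffer_size_zlib(inner: T, buffer_size: usize) -> Self {
        Self::build(inner, buffer_size, true)
    }

    /// Hypothetical shared helper: the header flag is the only difference
    /// between the two modes.
    fn build(inner: T, buffer_size: usize, zlib_header: bool) -> Self {
        Self {
            inner,
            decompressor: Decompress::new(zlib_header),
            buffer: Vec::with_capacity(buffer_size),
        }
    }

    /// Hypothetical accessor, only here to make the mode observable.
    pub fn expects_zlib_header(&self) -> bool {
        self.decompressor.zlib_header
    }
}
```

The state machine and the `Stream` impl would stay shared between both
modes; only the flag handed to the (de)compressor differs.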

Additionally, you could rename the `compression.rs` module to `flate.rs`
or something like that - because the encoder is exported in `lib.rs`
anyway, this shouldn't break any existing usages (and if it does, you
could provide a small compatibility module in `lib.rs` for the time
being, if that's preferable).


Either way, even though this turned into a little wall of text, don't
fret - I have nothing to bicker about with regard to the code here ;)

LGTM!

> +            buffer: ByteBuffer::with_capacity(buffer_size),
> +            input_buffer: Bytes::new(),
> +            state: EncoderState::Reading,
> +        }
> +    }
> +
> +    fn decode(
> +        &mut self,
> +        inbuf: &[u8],
> +        flush: FlushDecompress,
> +    ) -> Result<(usize, flate2::Status), io::Error> {
> +        let old_in = self.decompressor.total_in();
> +        let old_out = self.decompressor.total_out();
> +        let res = self
> +            .decompressor
> +            .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
> +        let new_in = (self.decompressor.total_in() - old_in) as usize;
> +        let new_out = (self.decompressor.total_out() - old_out) as usize;
> +        self.buffer.add_size(new_out);
> +
> +        Ok((new_in, res))
> +    }
> +}
> +
> +impl<T, O, E> Stream for ZlibDecoder<T>
> +where
> +    T: Stream<Item = Result<O, E>> + Unpin,
> +    O: Into<Bytes>,
> +    E: Into<Error>,
> +{
> +    type Item = Result<Bytes, anyhow::Error>;
> +
> +    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
> +        let this = self.get_mut();
> +
> +        loop {
> +            match this.state {
> +                EncoderState::Reading => {
> +                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
> +                        let buf = res.map_err(Into::into)?;
> +                        this.input_buffer = buf.into();
> +                        this.state = EncoderState::Writing;
> +                    } else {
> +                        this.state = EncoderState::Flushing;
> +                    }
> +                }
> +                EncoderState::Writing => {
> +                    if this.input_buffer.is_empty() {
> +                        return Poll::Ready(Some(Err(anyhow::format_err!(
> +                            "empty input during write"
> +                        ))));
> +                    }
> +                    let mut buf = this.input_buffer.split_off(0);
> +                    let (read, res) = this.decode(&buf[..], FlushDecompress::None)?;
> +                    this.input_buffer = buf.split_off(read);
> +                    if this.input_buffer.is_empty() {
> +                        this.state = EncoderState::Reading;
> +                    }
> +                    if this.buffer.is_full() || res == flate2::Status::BufError {
> +                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
> +                        return Poll::Ready(Some(Ok(bytes.into())));
> +                    }
> +                }
> +                EncoderState::Flushing => {
> +                    let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?;
> +                    if !this.buffer.is_empty() {
> +                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
> +                        return Poll::Ready(Some(Ok(bytes.into())));
> +                    }
> +                    if res == flate2::Status::StreamEnd {
> +                        this.state = EncoderState::Finished;
> +                    }
> +                }
> +                EncoderState::Finished => return Poll::Ready(None),
> +            }
> +        }
> +    }
> +}
> +
> +#[cfg(test)]
> +mod test {
> +    use super::*;
> +
> +    use futures::StreamExt;
> +    use std::io::Write;
> +
> +    const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
> +eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
> +enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
> +si aliquod aeternum et infinitum impendere."#;
> +
> +    fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
> +        use flate2::Compression;
> +        use std::io::Write;
> +
> +        let mut e = flate2::write::ZlibEncoder::new(Vec::new(), Compression::default());
> +        e.write_all(bytes).unwrap();
> +
> +        e.finish()
> +    }
> +
> +    #[tokio::test]
> +    async fn test_decompression() {
> +        const BUFFER_SIZE: usize = 5;
> +        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
> +        let chunks: Vec<Result<_, std::io::Error>> = vec![
> +            Ok(encoded[..10].to_vec()),
> +            Ok(encoded[10..20].to_vec()),
> +            Ok(encoded[20..30].to_vec()),
> +            Ok(encoded[30..40].to_vec()),
> +            Ok(encoded[40..].to_vec()),
> +        ];
> +        let stream = futures::stream::iter(chunks);
> +        let mut decoder = ZlibDecoder::with_buffer_size(stream, BUFFER_SIZE);
> +        let mut buf = Vec::with_capacity(BODY.len());
> +
> +        while let Some(Ok(res)) = decoder.next().await {
> +            buf.write_all(&res).unwrap();
> +        }
> +        assert_eq!(buf, BODY.as_bytes());
> +    }

Once you've incorporated the changes I suggested above, you could then
do some more elaborate testing for both the en- and decoder - e.g. test
if the content compressed by the encoder gets correctly decompressed by
`flate2`'s decompressor and vice versa for the decoder (for both DEFLATE
and zlib modes).

While I trust that the encoder works as-is (not like we've been using it
all the time, duh), it doesn't hurt to have tests there too, just in
case.

> +}

As mentioned above, the changes below should be put into a separate
patch.

> diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
> index 9ece24eb..4455ba85 100644
> --- a/proxmox-http/Cargo.toml
> +++ b/proxmox-http/Cargo.toml
> @@ -26,6 +26,11 @@ proxmox-async = { workspace = true, optional = true }
>  proxmox-sys = { workspace = true, optional = true }
>  proxmox-io = { workspace = true, optional = true }
>  proxmox-lang = { workspace = true, optional = true }
> +proxmox-compression = { workspace = true, optional = true }
> +
> +[dev-dependencies]
> +tokio = { workspace = true, features = [ "macros" ] }
> +flate2 = { workspace = true }
>  
>  [features]
>  default = []
> @@ -42,12 +47,14 @@ client = [
>      "dep:futures",
>      "dep:hyper",
>      "dep:openssl",
> +    "dep:proxmox-compression",
>      "dep:tokio",
>      "dep:tokio-openssl",
>      "http-helpers",
>      "hyper?/client",
>      "hyper?/http1",
>      "hyper?/http2",
> +    "hyper?/stream",
>      "hyper?/tcp",
>      "rate-limited-stream",
>      "tokio?/io-util",
> diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs
> index e9910802..b9e240d2 100644
> --- a/proxmox-http/src/client/simple.rs
> +++ b/proxmox-http/src/client/simple.rs
> @@ -78,7 +78,8 @@ impl Client {
>  
>          self.add_proxy_headers(&mut request)?;
>  
> -        self.client.request(request).map_err(Error::from).await
> +        let encoded_response = self.client.request(request).map_err(Error::from).await?;
> +        decode_response(encoded_response).await
>      }
>  
>      pub async fn post(
> @@ -245,3 +246,63 @@ impl crate::HttpClient<String, String> for Client {
>          })
>      }
>  }
> +
> +/// Wraps the `Body` stream in a ZlibDecoder stream if the `Content-Encoding`
> +/// header of the response is `deflate`, otherwise returns the original
> +/// response.
> +async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Error> {
> +    let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else {
> +        return Ok(res);
> +    };
> +
> +    let encodings = content_encoding.to_str()?;
> +    if encodings == "deflate" {
> +        let (parts, body) = res.into_parts();
> +        let decoder = proxmox_compression::ZlibDecoder::new(body);
> +        let decoded_body = Body::wrap_stream(decoder);
> +        Ok(Response::from_parts(parts, decoded_body))
> +    } else {
> +        bail!("Unknown encoding format: {encodings}");
> +    }
> +}
> +
> +#[cfg(test)]
> +mod test {
> +    use super::*;
> +
> +    use std::io::Write;
> +
> +    const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
> +eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
> +enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
> +si aliquod aeternum et infinitum impendere."#;
> +
> +    #[tokio::test]
> +    async fn test_parse_response_deflate() {
> +        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
> +        let encoded_body = Body::from(encoded);
> +        let encoded_response = Response::builder()
> +            .header(hyper::header::CONTENT_ENCODING, "deflate")
> +            .body(encoded_body)
> +            .unwrap();
> +
> +        let decoded_response = decode_response(encoded_response).await.unwrap();
> +
> +        assert_eq!(
> +            Client::response_body_string(decoded_response)
> +                .await
> +                .unwrap(),
> +            BODY
> +        );
> +    }
> +
> +    fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
> +        use flate2::write::ZlibEncoder;
> +        use flate2::Compression;
> +
> +        let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
> +        e.write_all(bytes).unwrap();
> +
> +        e.finish()
> +    }
> +}




