[pbs-devel] [PATCH proxmox v3 3/5] compression: deflate: add a decoder
Maximiliano Sandoval
m.sandoval at proxmox.com
Wed Apr 10 16:30:34 CEST 2024
Signed-off-by: Maximiliano Sandoval <m.sandoval at proxmox.com>
---
.../src/deflate/decompression.rs | 141 ++++++++++++++++++
proxmox-compression/src/deflate/mod.rs | 2 +
proxmox-compression/src/lib.rs | 2 +-
3 files changed, 144 insertions(+), 1 deletion(-)
create mode 100644 proxmox-compression/src/deflate/decompression.rs
diff --git a/proxmox-compression/src/deflate/decompression.rs b/proxmox-compression/src/deflate/decompression.rs
new file mode 100644
index 00000000..45ed8579
--- /dev/null
+++ b/proxmox-compression/src/deflate/decompression.rs
@@ -0,0 +1,141 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+use flate2::{Decompress, FlushDecompress};
+use futures::ready;
+use futures::stream::Stream;
+
+use proxmox_io::ByteBuffer;
+
+#[derive(Eq, PartialEq)]
+enum DecoderState {
+ Reading,
+ Writing,
+ Flushing,
+ Finished,
+}
+
+pub struct DeflateDecoder<T> {
+ inner: T,
+ decompressor: Decompress,
+ buffer: ByteBuffer,
+ input_buffer: Bytes,
+ state: DecoderState,
+}
+
+pub struct DeflateDecoderBuilder<T> {
+ inner: T,
+ is_zlib: bool,
+ buffer_size: usize,
+}
+
+impl<T> DeflateDecoderBuilder<T> {
+ pub fn zlib(mut self, is_zlib: bool) -> Self {
+ self.is_zlib = is_zlib;
+ self
+ }
+
+ pub fn buffer_size(mut self, buffer_size: usize) -> Self {
+ self.buffer_size = buffer_size;
+ self
+ }
+
+ pub fn build(self) -> DeflateDecoder<T> {
+ DeflateDecoder {
+ inner: self.inner,
+ decompressor: Decompress::new(self.is_zlib),
+ buffer: ByteBuffer::with_capacity(self.buffer_size),
+ input_buffer: Bytes::new(),
+ state: DecoderState::Reading,
+ }
+ }
+}
+
+impl<T> DeflateDecoder<T> {
+ pub fn new(inner: T) -> Self {
+ Self::builder(inner).build()
+ }
+
+ pub fn builder(inner: T) -> DeflateDecoderBuilder<T> {
+ DeflateDecoderBuilder {
+ inner,
+ is_zlib: false,
+ buffer_size: super::BUFFER_SIZE,
+ }
+ }
+
+ fn decode(
+ &mut self,
+ inbuf: &[u8],
+ flush: FlushDecompress,
+ ) -> Result<(usize, flate2::Status), io::Error> {
+ let old_in = self.decompressor.total_in();
+ let old_out = self.decompressor.total_out();
+ let res = self
+ .decompressor
+ .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
+ let new_in = (self.decompressor.total_in() - old_in) as usize;
+ let new_out = (self.decompressor.total_out() - old_out) as usize;
+ self.buffer.add_size(new_out);
+
+ Ok((new_in, res))
+ }
+}
+
+impl<T, O, E> Stream for DeflateDecoder<T>
+where
+ T: Stream<Item = Result<O, E>> + Unpin,
+ O: Into<Bytes>,
+ E: Into<Error>,
+{
+ type Item = Result<Bytes, anyhow::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let this = self.get_mut();
+
+ loop {
+ match this.state {
+ DecoderState::Reading => {
+ if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+ let buf = res.map_err(Into::into)?;
+ this.input_buffer = buf.into();
+ this.state = DecoderState::Writing;
+ } else {
+ this.state = DecoderState::Flushing;
+ }
+ }
+ DecoderState::Writing => {
+ if this.input_buffer.is_empty() {
+ return Poll::Ready(Some(Err(anyhow::format_err!(
+ "empty input during write"
+ ))));
+ }
+ let mut buf = this.input_buffer.split_off(0);
+ let (read, res) = this.decode(&buf[..], FlushDecompress::None)?;
+ this.input_buffer = buf.split_off(read);
+ if this.input_buffer.is_empty() {
+ this.state = DecoderState::Reading;
+ }
+ if this.buffer.is_full() || res == flate2::Status::BufError {
+ let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+ return Poll::Ready(Some(Ok(bytes.into())));
+ }
+ }
+ DecoderState::Flushing => {
+ let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?;
+ if !this.buffer.is_empty() {
+ let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+ return Poll::Ready(Some(Ok(bytes.into())));
+ }
+ if res == flate2::Status::StreamEnd {
+ this.state = DecoderState::Finished;
+ }
+ }
+ DecoderState::Finished => return Poll::Ready(None),
+ }
+ }
+ }
+}
diff --git a/proxmox-compression/src/deflate/mod.rs b/proxmox-compression/src/deflate/mod.rs
index 514ccbdc..6867176c 100644
--- a/proxmox-compression/src/deflate/mod.rs
+++ b/proxmox-compression/src/deflate/mod.rs
@@ -1,5 +1,7 @@
mod compression;
+mod decompression;
pub use compression::{DeflateEncoder, Level};
+pub use decompression::DeflateDecoder;
const BUFFER_SIZE: usize = 8192;
diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
index 239de623..f33212e5 100644
--- a/proxmox-compression/src/lib.rs
+++ b/proxmox-compression/src/lib.rs
@@ -1,4 +1,4 @@
-pub use deflate::{DeflateEncoder, Level};
+pub use deflate::{DeflateDecoder, DeflateEncoder, Level};
pub(crate) mod deflate;
pub mod tar;
--
2.39.2
More information about the pbs-devel
mailing list