[pbs-devel] [PATCH proxmox-backup v2 1/2] tools/zip: compress zips with deflate
Dominik Csapak
d.csapak at proxmox.com
Tue Mar 16 13:37:26 CET 2021
Compress the file contents with deflate to get smaller zip files.
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
changes from v1:
* factor out the compression call and use block_in_place
Cargo.toml | 1 +
src/tools/zip.rs | 74 ++++++++++++++++++++++++++++++++++++------------
2 files changed, 57 insertions(+), 18 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 79945312..06967c20 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,7 @@ crc32fast = "1"
endian_trait = { version = "0.6", features = ["arrays"] }
anyhow = "1.0"
futures = "0.3"
+flate2 = "1.0"
h2 = { version = "0.3", features = [ "stream" ] }
handlebars = "3.0"
http = "0.2"
diff --git a/src/tools/zip.rs b/src/tools/zip.rs
index 55f2a24a..d7a09d1c 100644
--- a/src/tools/zip.rs
+++ b/src/tools/zip.rs
@@ -11,9 +11,10 @@ use std::mem::size_of;
use std::os::unix::ffi::OsStrExt;
use std::path::{Component, Path, PathBuf};
-use anyhow::{Error, Result};
+use anyhow::{bail, Error, Result};
use endian_trait::Endian;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
+use flate2::{Compress, Compression, FlushCompress};
use crc32fast::Hasher;
use proxmox::tools::time::gmtime;
@@ -245,7 +246,7 @@ impl ZipEntry {
signature: LOCAL_FH_SIG,
version_needed: 0x2d,
flags: 1 << 3,
- compression: 0,
+ compression: 0x8,
time,
date,
crc32: 0,
@@ -328,7 +329,7 @@ impl ZipEntry {
version_made_by: VERSION_MADE_BY,
version_needed: VERSION_NEEDED,
flags: 1 << 3,
- compression: 0,
+ compression: 0x8,
time,
date,
crc32: self.crc32,
@@ -402,6 +403,7 @@ where
files: Vec<ZipEntry>,
target: W,
buf: ByteBuffer,
+ outbuf: ByteBuffer,
}
impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
@@ -410,10 +412,24 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
byte_count: 0,
files: Vec::new(),
target,
- buf: ByteBuffer::with_capacity(1024*1024),
+ buf: ByteBuffer::with_capacity(1024 * 1024),
+ outbuf: ByteBuffer::with_capacity(1024 * 1024),
}
}
+ fn compress(&mut self, encoder: &mut Compress, mode: FlushCompress) -> Result<usize, Error> {
+ let old_read = encoder.total_in();
+ let old_write = encoder.total_out();
+ crate::tools::runtime::block_in_place(|| {
+ encoder.compress(&self.buf, &mut self.outbuf.get_free_mut_slice(), mode)
+ })?;
+ let read = (encoder.total_in() - old_read) as usize;
+ let write = (encoder.total_out() - old_write) as usize;
+
+ self.outbuf.add_size(write);
+ Ok(read)
+ }
+
pub async fn add_entry<R: AsyncRead + Unpin>(
&mut self,
mut entry: ZipEntry,
@@ -423,25 +439,47 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
self.byte_count += entry.write_local_header(&mut self.target).await?;
if let Some(mut content) = content {
let mut hasher = Hasher::new();
- let mut size = 0;
- loop {
+ let mut deflate_encoder = Compress::new(Compression::fast(), false);
- let count = self.buf.read_from_async(&mut content).await?;
-
- // end of file
- if count == 0 {
- break;
+ loop {
+ let syncmode = if self.buf.is_full() {
+ FlushCompress::Sync
+ } else {
+ let old_pos = self.buf.len();
+ let count = self.buf.read_from_async(&mut content).await?;
+ // end of file
+ if count == 0 {
+ break;
+ }
+
+ hasher.update(&self.buf[old_pos..]);
+ FlushCompress::None
+ };
+
+ let read = self.compress(&mut deflate_encoder, syncmode)?;
+
+ if read == 0 {
+ bail!("did not consume any data!");
}
- size += count;
- hasher.update(&self.buf);
- self.target.write_all(&self.buf).await?;
- self.buf.consume(count);
+ self.target.write_all(&self.outbuf).await?;
+ self.buf.consume(read);
+ self.outbuf.clear();
}
- self.byte_count += size;
- entry.compressed_size = size.try_into()?;
- entry.uncompressed_size = size.try_into()?;
+ let read = self.compress(&mut deflate_encoder, FlushCompress::Finish)?;
+ if read != self.buf.len() {
+ bail!("deflate did not use all input bytes!");
+ }
+
+ self.target.write_all(&self.outbuf).await?;
+ self.buf.clear();
+ self.outbuf.clear();
+
+ self.byte_count += deflate_encoder.total_out() as usize;
+ entry.compressed_size = deflate_encoder.total_out();
+ entry.uncompressed_size = deflate_encoder.total_in();
+
entry.crc32 = hasher.finalize();
}
self.byte_count += entry.write_data_descriptor(&mut self.target).await?;
--
2.20.1
More information about the pbs-devel mailing list