[pbs-devel] [PATCH proxmox-backup v2 3/4] datastore: data blob: increase compression throughput

Dominik Csapak d.csapak at proxmox.com
Wed Jul 31 11:36:03 CEST 2024


by not using `zstd::stream::copy_encode`, because it has an allocation
pattern that reduces throughput when the target/source storage and the
network are faster than chunk creation.

Instead, use `zstd::bulk::compress_to_buffer`, which shouldn't do any
big allocations, since we provide the target buffer.
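
As a rough illustration (a minimal sketch, not the patch code; it only
assumes the `zstd` crate's `bulk` module as used in the diff below):

    use std::io;

    /// Compress `data` at level 1 into a caller-provided buffer, as the
    /// patch below does, instead of letting `zstd::stream::copy_encode`
    /// grow its own buffers on every call.
    fn compress_chunk(data: &[u8]) -> io::Result<Vec<u8>> {
        let mut buf = vec![0u8; data.len()];
        // fails with "Destination buffer is too small" if the compressed
        // output would be larger than the input (incompressible data)
        let size = zstd::bulk::compress_to_buffer(data, &mut buf, 1)?;
        buf.truncate(size);
        Ok(buf)
    }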

To handle the case where the target buffer is too small, we now ignore
all zstd errors and continue with the uncompressed data, logging the
error unless it indicates that the target buffer is too small.

For now, we have to parse the error string for that, as `zstd` maps all
errors to `io::ErrorKind::Other`. Until that gets changed, there is no
other way to differentiate between the kinds of errors.
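
Roughly (a sketch of the workaround, assuming the error message text
stays stable across `zstd` versions):

    match zstd::bulk::compress_to_buffer(data, &mut buf, 1) {
        Ok(size) => { /* use the first `size` bytes of `buf` */ }
        // zstd surfaces everything as io::ErrorKind::Other, so the
        // expected "buffer too small" case can only be told apart from
        // real failures by matching the message text
        Err(err) if err.to_string().contains("Destination buffer is too small") => {}
        Err(err) => log::warn!("zstd compression error: {err}"),
    }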

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
changes from v1:
* fixed commit message
* reduced log severity to `warn`
* use vec![0; size]
* omit unnecessary buffer allocation in the unencrypted,uncompressed case
  by reusing the initial buffer that was tried for compression
 pbs-datastore/src/data_blob.rs | 37 +++++++++++++++++++---------------
 1 file changed, 21 insertions(+), 16 deletions(-)

diff --git a/pbs-datastore/src/data_blob.rs b/pbs-datastore/src/data_blob.rs
index 8715afef..2a528204 100644
--- a/pbs-datastore/src/data_blob.rs
+++ b/pbs-datastore/src/data_blob.rs
@@ -136,39 +136,44 @@ impl DataBlob {
 
             DataBlob { raw_data }
         } else {
-            let max_data_len = data.len() + std::mem::size_of::<DataBlobHeader>();
+            let header_len = std::mem::size_of::<DataBlobHeader>();
+            let max_data_len = data.len() + header_len;
+            let mut raw_data = vec![0; max_data_len];
             if compress {
-                let mut comp_data = Vec::with_capacity(max_data_len);
-
                 let head = DataBlobHeader {
                     magic: COMPRESSED_BLOB_MAGIC_1_0,
                     crc: [0; 4],
                 };
                 unsafe {
-                    comp_data.write_le_value(head)?;
+                    (&mut raw_data[0..header_len]).write_le_value(head)?;
                 }
 
-                zstd::stream::copy_encode(data, &mut comp_data, 1)?;
-
-                if comp_data.len() < max_data_len {
-                    let mut blob = DataBlob {
-                        raw_data: comp_data,
-                    };
-                    blob.set_crc(blob.compute_crc());
-                    return Ok(blob);
+                match zstd::bulk::compress_to_buffer(data, &mut raw_data[header_len..], 1) {
+                    Ok(size) if size <= data.len() => {
+                        raw_data.truncate(header_len + size);
+                        let mut blob = DataBlob { raw_data };
+                        blob.set_crc(blob.compute_crc());
+                        return Ok(blob);
+                    }
+                    // if the compressed size is bigger than the data, or any error is
+                    // returned, fall back to the uncompressed blob, logging all errors
+                    Ok(_) => {}
+                    Err(err) => {
+                        if !err.to_string().contains("Destination buffer is too small") {
+                            log::warn!("zstd compression error: {err}");
+                        }
+                    }
                 }
             }
 
-            let mut raw_data = Vec::with_capacity(max_data_len);
-
             let head = DataBlobHeader {
                 magic: UNCOMPRESSED_BLOB_MAGIC_1_0,
                 crc: [0; 4],
             };
             unsafe {
-                raw_data.write_le_value(head)?;
+                (&mut raw_data[0..header_len]).write_le_value(head)?;
             }
-            raw_data.extend_from_slice(data);
+            (&mut raw_data[header_len..]).write_all(data)?;
 
             DataBlob { raw_data }
         };
-- 
2.39.2