[pbs-devel] [PATCH v7 proxmox-backup 69/69] chunk stream: tests: add regression tests for payload chunker

Christian Ebner c.ebner at proxmox.com
Mon May 27 16:33:23 CEST 2024


Add regression tests covering suggested and forced chunk boundaries as
well as chunk injection.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 6:
- add missing `#[cfg(test)]` attribute
- fix formatting

 pbs-client/src/chunk_stream.rs | 117 +++++++++++++++++++++++++++++++++
 1 file changed, 117 insertions(+)

diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
index de3e7bb5d..e3f0980c6 100644
--- a/pbs-client/src/chunk_stream.rs
+++ b/pbs-client/src/chunk_stream.rs
@@ -237,3 +237,120 @@ where
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    //! Regression tests for the chunk stream: suggested/forced boundaries and chunk injection.
+
+    use futures::stream::StreamExt;
+
+    use super::*;
+
+    /// In-memory byte source that stands in for the real input of a `ChunkStream`.
+    struct DummyInput {
+        /// Bytes that have not been handed out by `poll_next` yet.
+        data: Vec<u8>,
+    }
+
+    impl DummyInput {
+        /// Wraps `data` so that it can be polled as a stream of byte buffers.
+        fn new(data: Vec<u8>) -> Self {
+            Self { data }
+        }
+    }
+
+    impl Stream for DummyInput {
+        type Item = Result<Vec<u8>, Error>;
+
+        /// Always ready: yields buffered bytes while any remain, then ends the stream.
+        ///
+        /// With more than 10 bytes buffered, the item is everything *after* the first 10
+        /// bytes (`Vec::split_off` returns the tail); the remaining 10 bytes follow on the
+        /// next poll.
+        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+            let buffer = &mut self.get_mut().data;
+            let item = match buffer.len() {
+                0 => None,
+                len if len > 10 => Some(Ok(buffer.split_off(10))),
+                _ => Some(Ok(std::mem::take(buffer))),
+            };
+            Poll::Ready(item)
+        }
+    }
+
+    /// Chunks 1 MiB of generated data with suggested and forced boundaries queued up front.
+    ///
+    /// Checks every chunk size and the sum of chunked plus injected bytes afterwards.
+    #[test]
+    fn test_chunk_stream_forced_boundaries() {
+        // Each value in 0..256 * 1024 contributes its 4 little-endian bytes.
+        let data: Vec<u8> = (0..256 * 1024u32).flat_map(u32::to_le_bytes).collect();
+
+        let mut input = DummyInput::new(data);
+        let input = Pin::new(&mut input);
+
+        // The test keeps the boundary senders and the receiving end of the injections.
+        let (injections_tx, injections_rx) = mpsc::channel();
+        let (boundaries_tx, boundaries_rx) = mpsc::channel();
+        let (suggested_tx, suggested_rx) = mpsc::channel();
+        let injection_data = InjectionData::new(boundaries_rx, injections_tx);
+
+        let mut chunk_stream = ChunkStream::new(
+            input,
+            Some(64 * 1024),
+            Some(injection_data),
+            Some(suggested_rx),
+        );
+
+        // Suggested boundary matching forced boundary
+        suggested_tx.send(32 * 1024).unwrap();
+        // Suggested boundary not matching forced boundary
+        suggested_tx.send(64 * 1024).unwrap();
+
+        // Queues one forced boundary together with the size of the data injected there.
+        let force_boundary = |boundary, size| {
+            let request = InjectChunks {
+                boundary,
+                chunks: Vec::new(),
+                size,
+            };
+            boundaries_tx.send(request).unwrap();
+        };
+
+        // Force chunk boundary at suggested boundary
+        force_boundary(32 * 1024, 1024);
+        // Force chunk boundary within regular chunk
+        force_boundary(128 * 1024, 2048);
+        // Force chunk boundary aligned with regular boundary
+        force_boundary(657408, 512);
+        // Force chunk boundary within regular chunk, without injecting data
+        force_boundary(657408 + 1024, 0);
+
+        let rt = tokio::runtime::Runtime::new().unwrap();
+        // `async move` takes ownership of the stream, so it is dropped before the injection
+        // channel is drained below (presumably releasing the injection sender it was built
+        // with; otherwise that drain would block forever).
+        let chunks = rt.block_on(async move {
+            let mut collected = Vec::new();
+            while let Some(chunk) = chunk_stream.next().await {
+                collected.push(chunk.unwrap());
+            }
+            collected
+        });
+
+        // Expected chunk sizes, in stream order.
+        let expected = [32768, 31744, 65536, 262144, 262144, 512, 262144, 131584];
+        let mut total = 0;
+        for (chunk, &size) in chunks.iter().zip(&expected) {
+            assert_eq!(chunk.len(), size);
+            total += chunk.len();
+        }
+        // Blocks until every sender of the injection channel is gone.
+        for injection in injections_rx.iter() {
+            total += injection.size;
+        }
+
+        // All input bytes plus the sizes announced by the forced boundaries.
+        assert_eq!(total, 4 * 256 * 1024 + 1024 + 2048 + 512);
+    }
+}
-- 
2.39.2





More information about the pbs-devel mailing list