[pbs-devel] [PATCH v8 proxmox-backup 69/69] chunk stream: tests: add regression tests for payload chunker
Christian Ebner
c.ebner at proxmox.com
Tue May 28 11:43:03 CEST 2024
Regression tests to cover suggested and forced boundaries as well as
chunk injection.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 7:
- no changes
changes since version 6:
- add missing `#[cfg(test)]` attribute
- fix formatting
pbs-client/src/chunk_stream.rs | 117 +++++++++++++++++++++++++++++++++
1 file changed, 117 insertions(+)
diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
index de3e7bb5d..e3f0980c6 100644
--- a/pbs-client/src/chunk_stream.rs
+++ b/pbs-client/src/chunk_stream.rs
@@ -237,3 +237,120 @@ where
}
}
}
+
+#[cfg(test)]
+mod test {
+ use futures::stream::StreamExt;
+
+ use super::*;
+
+ struct DummyInput { // test-only stream source backed by an in-memory byte buffer
+ data: Vec<u8>, // bytes that `poll_next` has not handed out yet
+ }
+
+ impl DummyInput {
+ fn new(data: Vec<u8>) -> Self { // takes ownership of `data`; nothing is copied
+ Self { data }
+ }
+ }
+
+ impl Stream for DummyInput { // yields the buffered bytes as `Vec<u8>` items until `data` is empty
+ type Item = Result<Vec<u8>, Error>; // this impl only ever produces `Ok` items
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> { // every arm is `Poll::Ready`, so `_cx` is never used
+ let this = self.get_mut(); // plain `&mut Self` so `data` can be mutated
+ match this.data.len() {
+ 0 => Poll::Ready(None), // buffer drained: end of stream
+ size if size > 10 => Poll::Ready(Some(Ok(this.data.split_off(10)))), // NOTE(review): `split_off(10)` returns the tail `[10..]` and keeps the first 10 bytes in `data`, so the head is yielded last -- confirm this order is intended
+ _ => Poll::Ready(Some(Ok(std::mem::take(&mut this.data)))), // 10 bytes or fewer: hand out the rest and leave `data` empty
+ }
+ }
+ }
+
+ #[test]
+ fn test_chunk_stream_forced_boundaries() { // checks chunk lengths with suggested and forced boundaries, plus the byte total including injections
+ let mut data = Vec::new();
+ for i in 0..(256 * 1024) { // 256 Ki values, 4 bytes each => 1 MiB of input
+ for j in 0..4 {
+ let byte = ((i >> (j << 3)) & 0xff) as u8; // byte `j` of `i`, least significant byte first
+ data.push(byte);
+ }
+ }
+
+ let mut input = DummyInput::new(data);
+ let input = Pin::new(&mut input); // the stream is passed on as a pinned mutable reference
+
+ let (injections_tx, injections_rx) = mpsc::channel(); // the test keeps `injections_rx` to read back injections
+ let (boundaries_tx, boundaries_rx) = mpsc::channel(); // the test keeps `boundaries_tx` to queue `InjectChunks`
+ let (suggested_tx, suggested_rx) = mpsc::channel(); // the test keeps `suggested_tx` to queue suggested offsets
+ let injection_data = InjectionData::new(boundaries_rx, injections_tx);
+
+ let mut chunk_stream = ChunkStream::new(
+ input,
+ Some(64 * 1024), // presumably the target chunk size -- confirm against `ChunkStream::new`
+ Some(injection_data),
+ Some(suggested_rx),
+ );
+ let chunks = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); // shared so results stay reachable after the `async move` block takes `chunks`
+ let chunks_clone = chunks.clone();
+
+ // Suggested boundary matching forced boundary
+ suggested_tx.send(32 * 1024).unwrap();
+ // Suggested boundary not matching forced boundary
+ suggested_tx.send(64 * 1024).unwrap();
+ // Force chunk boundary at suggested boundary
+ boundaries_tx
+ .send(InjectChunks {
+ boundary: 32 * 1024,
+ chunks: Vec::new(),
+ size: 1024, // counted in the final total below
+ })
+ .unwrap();
+ // Force chunk boundary within regular chunk
+ boundaries_tx
+ .send(InjectChunks {
+ boundary: 128 * 1024,
+ chunks: Vec::new(),
+ size: 2048, // counted in the final total below
+ })
+ .unwrap();
+ // Force chunk boundary aligned with regular boundary
+ boundaries_tx
+ .send(InjectChunks {
+ boundary: 657408,
+ chunks: Vec::new(),
+ size: 512, // counted in the final total below
+ })
+ .unwrap();
+ // Force chunk boundary within regular chunk, without injecting data
+ boundaries_tx
+ .send(InjectChunks {
+ boundary: 657408 + 1024,
+ chunks: Vec::new(),
+ size: 0, // adds nothing to the final total
+ })
+ .unwrap();
+
+ let rt = tokio::runtime::Runtime::new().unwrap();
+ rt.block_on(async move { // `move` takes `chunk_stream` and `chunks`; `chunks_clone` stays usable afterwards
+ while let Some(chunk) = chunk_stream.next().await { // run the stream until it ends
+ let chunk = chunk.unwrap(); // any stream error fails the test
+ let mut chunks = chunks.lock().unwrap();
+ chunks.push(chunk);
+ }
+ });
+
+ let mut total = 0;
+ let chunks = chunks_clone.lock().unwrap();
+ let expected = [32768, 31744, 65536, 262144, 262144, 512, 262144, 131584]; // expected chunk lengths in stream order; they sum to 1 MiB
+ for (chunk, expected) in chunks.as_slice().iter().zip(expected.iter()) { // `zip` stops at the shorter side, so the chunk count itself is not asserted
+ assert_eq!(chunk.len(), *expected);
+ total += chunk.len();
+ }
+ while let Ok(injection) = injections_rx.recv() { // drain forwarded injections until `recv()` returns an error
+ total += injection.size;
+ }
+
+ assert_eq!(total, 4 * 256 * 1024 + 1024 + 2048 + 512); // input bytes plus the three non-zero injection sizes queued above
+ }
+}
--
2.39.2
More information about the pbs-devel
mailing list