[pbs-devel] [PATCH v6 proxmox-backup 65/65] chunk stream: tests: add regression tests for payload chunker
Dominik Csapak
d.csapak at proxmox.com
Tue May 21 13:21:06 CEST 2024
This patch adds some warnings:
warning: unused import: `futures::stream::StreamExt`
warning: associated function `new` is never used
You can fix that by putting
#[cfg(test)]
before `mod test`, so that the test module is only compiled when running `cargo test`
On 5/14/24 12:34, Christian Ebner wrote:
> Regression tests to cover suggested and forced boundaries as well as
> chunk injection.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> pbs-client/src/chunk_stream.rs | 87 ++++++++++++++++++++++++++++++++++
> 1 file changed, 87 insertions(+)
>
> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
> index 4d5431a2b..764ae9989 100644
> --- a/pbs-client/src/chunk_stream.rs
> +++ b/pbs-client/src/chunk_stream.rs
> @@ -238,3 +238,90 @@ where
> }
> }
> }
> +
> +mod test {
> + use futures::stream::StreamExt;
> +
> + use super::*;
> +
> + struct DummyInput {
> + data: Vec<u8>,
> + }
> +
> + impl DummyInput {
> + fn new(data: Vec<u8>) -> Self {
> + Self { data }
> + }
> + }
> +
> + impl Stream for DummyInput {
> + type Item = Result<Vec<u8>, Error>;
> +
> + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
> + let this = self.get_mut();
> + match this.data.len() {
> + 0 => Poll::Ready(None),
> + size if size > 10 => Poll::Ready(Some(Ok(this.data.split_off(10)))),
> + _ => Poll::Ready(Some(Ok(std::mem::take(&mut this.data)))),
> + }
> + }
> + }
> +
> + #[test]
> + fn test_chunk_stream_forced_boundaries() {
> + let mut data = Vec::new();
> + for i in 0..(256 * 1024) {
> + for j in 0..4 {
> + let byte = ((i >> (j << 3)) & 0xff) as u8;
> + data.push(byte);
> + }
> + }
> +
> + let mut input = DummyInput::new(data);
> + let input = Pin::new(&mut input);
> +
> + let (injections_tx, injections_rx) = mpsc::channel();
> + let (boundaries_tx, boundaries_rx) = mpsc::channel();
> + let (suggested_tx, suggested_rx) = mpsc::channel();
> + let injection_data = InjectionData::new(boundaries_rx, injections_tx);
> +
> + let mut chunk_stream = ChunkStream::new(input, Some(64 * 1024), Some(injection_data), Some(suggested_rx));
> + let chunks = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
> + let chunks_clone = chunks.clone();
> +
> + // Suggested boundary matching forced boundary
> + suggested_tx.send(32 * 1024).unwrap();
> + // Suggested boundary not matching forced boundary
> + suggested_tx.send(64 * 1024).unwrap();
> + // Force chunk boundary at suggested boundary
> + boundaries_tx.send(InjectChunks { boundary: 32 * 1024, chunks: Vec::new(), size: 1024 }).unwrap();
> + // Force chunk boundary within regular chunk
> + boundaries_tx.send(InjectChunks { boundary: 128 * 1024, chunks: Vec::new(), size: 2048 }).unwrap();
> + // Force chunk boundary aligned with regular boundary
> + boundaries_tx.send(InjectChunks { boundary: 657408, chunks: Vec::new(), size: 512 }).unwrap();
> + // Force chunk boundary within regular chunk, without injecting data
> + boundaries_tx.send(InjectChunks { boundary: 657408 + 1024, chunks: Vec::new(), size: 0 }).unwrap();
> +
> + let rt = tokio::runtime::Runtime::new().unwrap();
> + rt.block_on(async move {
> + while let Some(chunk) = chunk_stream.next().await {
> + let chunk = chunk.unwrap();
> + let mut chunks = chunks.lock().unwrap();
> + chunks.push(chunk);
> + }
> + });
> +
> + let mut total = 0;
> + let chunks = chunks_clone.lock().unwrap();
> + let expected = [32768, 31744, 65536, 262144, 262144, 512, 262144, 131584];
> + for (chunk, expected) in chunks.as_slice().iter().zip(expected.iter()) {
> + assert_eq!(chunk.len(), *expected);
> + total += chunk.len();
> + }
> + while let Ok(injection) = injections_rx.recv() {
> + total += injection.size;
> + }
> +
> + assert_eq!(total, 4 * 256 * 1024 + 1024 + 2048 + 512);
> + }
> +}
More information about the pbs-devel
mailing list