[pve-devel] [PATCH proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader

Stefan Reiter s.reiter at proxmox.com
Wed Jun 2 16:38:28 CEST 2021


Implemented as a separate struct SeekableCachedChunkReader that contains
the original as an Arc, since the read_at future captures the
CachedChunkReader, which would otherwise not work with the lifetimes
required by AsyncRead. This is also the reason we cannot use a shared
read buffer and have to allocate a new one for every read. It also means
that the struct items required for AsyncRead/Seek do not need to be
included in a regular CachedChunkReader.

This is intended as a replacement for AsyncIndexReader, so we have less
code duplication and can utilize the LRU cache there too (even though
actual request concurrency is not supported in these traits).

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
 src/backup/cached_chunk_reader.rs | 116 +++++++++++++++++++++++++++++-
 1 file changed, 114 insertions(+), 2 deletions(-)

diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs
index fd5a049f..9b56fd14 100644
--- a/src/backup/cached_chunk_reader.rs
+++ b/src/backup/cached_chunk_reader.rs
@@ -1,12 +1,19 @@
 //! An async and concurrency safe data reader backed by a local LRU cache.
 
 use anyhow::Error;
+use futures::future::Future;
+use futures::ready;
+use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
 
-use std::future::Future;
+use std::io::SeekFrom;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
-use crate::backup::{AsyncReadChunk, IndexFile};
+use super::{AsyncReadChunk, IndexFile};
 use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
+use proxmox::io_format_err;
+use proxmox::sys::error::io_err_other;
 
 struct AsyncChunkCacher<T> {
     reader: Arc<T>,
@@ -85,3 +92,108 @@ impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<
         Ok(read)
     }
 }
+
+impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + 'static>
+    CachedChunkReader<I, R>
+{
+    /// Converts this reader into a SeekableCachedChunkReader, which provides AsyncRead and
+    /// AsyncSeek for interfaces that need them. Prefer read_at directly wherever that is possible.
+    pub fn seekable(self) -> SeekableCachedChunkReader<I, R> {
+        let index_bytes = self.index.index_bytes();
+        SeekableCachedChunkReader {
+            reader: Arc::new(self),
+            index_bytes,
+            position: 0,
+            seek_to_pos: 0,
+            read_future: None,
+        }
+    }
+}
+
+pub struct SeekableCachedChunkReader<
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+> { // AsyncRead + AsyncSeek adapter, built by CachedChunkReader::seekable()
+    reader: Arc<CachedChunkReader<I, R>>, // poll_read clones this Arc into each read future
+    index_bytes: u64, // index.index_bytes() at construction; poll_complete clamps seeks to it
+    position: u64, // offset the next read starts at; advanced by reads, set by seeks
+    seek_to_pos: i64, // target stored by start_seek until poll_complete checks and applies it
+    read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), Error>> + Send>>>, // in-flight read, None when idle
+}
+
+impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R>
+where
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+{
+    /// Records the seek target, which `poll_complete` validates and applies afterwards.
+    fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> {
+        let this = Pin::get_mut(self);
+        // a read that is still in flight was started for the old position, discard it
+        this.read_future = None;
+        // saturate instead of wrapping, out-of-range targets get rejected or clamped later on
+        this.seek_to_pos = match pos {
+            SeekFrom::Start(offset) => offset.min(i64::MAX as u64) as i64,
+            SeekFrom::End(offset) => (this.index_bytes as i64).saturating_add(offset),
+            SeekFrom::Current(offset) => (this.position as i64).saturating_add(offset),
+        };
+        Ok(())
+    }
+
+    /// Applies the recorded target: a negative one is an error, one past the end is clamped.
+    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> {
+        let this = Pin::get_mut(self);
+        if this.seek_to_pos < 0 {
+            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+        }
+        // non-negative at this point, so the cast to u64 is lossless
+        this.position = (this.seek_to_pos as u64).min(this.index_bytes);
+        Poll::Ready(Ok(this.position))
+    }
+}
+
+impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R>
+where
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+{
+    /// Reads at the current position into the unfilled part of `buf`, advancing the position by
+    /// the bytes copied. The read runs as a boxed future kept in `read_future` until it is done.
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        buf: &mut ReadBuf,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+        let fut = match this.read_future {
+            Some(ref mut fut) => fut,
+            None => {
+                let offset = this.position;
+                // only the unfilled part of `buf` can take data, capacity() includes filled bytes
+                let wanted = buf.remaining();
+                let reader = Arc::clone(&this.reader);
+                let fut = Box::pin(async move {
+                    let mut read_buf = vec![0u8; wanted];
+                    let read = reader.read_at(&mut read_buf[..wanted], offset).await?;
+                    Ok((read_buf, read))
+                });
+                this.read_future = Some(fut);
+                this.read_future.as_mut().unwrap()
+            }
+        };
+
+        let ret = match ready!(fut.as_mut().poll(cx)) {
+            Ok((read_buf, read)) => {
+                // `buf` may be smaller than the one the read was sized for, never overfill it
+                let len = read.min(buf.remaining());
+                buf.put_slice(&read_buf[..len]);
+                this.position += len as u64;
+                Ok(())
+            }
+            Err(err) => Err(io_err_other(err)),
+        };
+        // the future has completed and must not be polled again, drop it
+        this.read_future = None;
+        Poll::Ready(ret)
+    }
+}
-- 
2.30.2






More information about the pve-devel mailing list