[pbs-devel] [PATCH v2 proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader
Stefan Reiter
s.reiter at proxmox.com
Mon Jun 7 17:35:27 CEST 2021
Implemented as a seperate struct SeekableCachedChunkReader that contains
the original as an Arc, since the read_at future captures the
CachedChunkReader, which would otherwise not work with the lifetimes
required by AsyncRead. This is also the reason we cannot use a shared
read buffer and have to allocate a new one for every read. It also means
that the struct items required for AsyncRead/Seek do not need to be
included in a regular CachedChunkReader.
This is intended as a replacement for AsyncIndexReader, so we have less
code duplication and can utilize the LRU cache there too (even though
actual request concurrency is not supported in these traits).
Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
v2:
* drop 'seek_to_pos' from struct and implement error handling directly in
start_seek
* simplify future handling in poll_read with Option::get_or_insert_with
src/backup/cached_chunk_reader.rs | 104 +++++++++++++++++++++++++++++-
1 file changed, 102 insertions(+), 2 deletions(-)
diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs
index ff476e37..c9ca4773 100644
--- a/src/backup/cached_chunk_reader.rs
+++ b/src/backup/cached_chunk_reader.rs
@@ -1,12 +1,19 @@
//! An async and concurrency safe data reader backed by a local LRU cache.
use anyhow::Error;
+use futures::future::Future;
+use futures::ready;
+use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
-use std::future::Future;
+use std::io::SeekFrom;
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::{Context, Poll};
-use crate::backup::{AsyncReadChunk, IndexFile};
+use super::{AsyncReadChunk, IndexFile};
use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
+use proxmox::io_format_err;
+use proxmox::sys::error::io_err_other;
struct AsyncChunkCacher<T> {
reader: Arc<T>,
@@ -87,3 +94,96 @@ impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<
Ok(read)
}
}
+
+impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + 'static>
+ CachedChunkReader<I, R>
+{
+ /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and
+ /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred
+ /// otherwise.
+ pub fn seekable(self) -> SeekableCachedChunkReader<I, R> {
+ SeekableCachedChunkReader {
+ index_bytes: self.index.index_bytes(),
+ reader: Arc::new(self),
+ position: 0,
+ read_future: None,
+ }
+ }
+}
+
+pub struct SeekableCachedChunkReader<
+ I: IndexFile + Send + Sync + 'static,
+ R: AsyncReadChunk + Send + Sync + 'static,
+> {
+ reader: Arc<CachedChunkReader<I, R>>,
+ index_bytes: u64,
+ position: u64,
+ read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), Error>> + Send>>>,
+}
+
+impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R>
+where
+ I: IndexFile + Send + Sync + 'static,
+ R: AsyncReadChunk + Send + Sync + 'static,
+{
+ fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> {
+ let this = Pin::get_mut(self);
+ let seek_to_pos = match pos {
+ SeekFrom::Start(offset) => offset as i64,
+ SeekFrom::End(offset) => this.index_bytes as i64 + offset,
+ SeekFrom::Current(offset) => this.position as i64 + offset,
+ };
+ if seek_to_pos < 0 {
+ return Err(io_format_err!("cannot seek to negative values"));
+ } else if seek_to_pos > this.index_bytes as i64 {
+ this.position = this.index_bytes;
+ } else {
+ this.position = seek_to_pos as u64;
+ }
+ Ok(())
+ }
+
+ fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> {
+ Poll::Ready(Ok(self.position))
+ }
+}
+
+impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R>
+where
+ I: IndexFile + Send + Sync + 'static,
+ R: AsyncReadChunk + Send + Sync + 'static,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut ReadBuf,
+ ) -> Poll<tokio::io::Result<()>> {
+ let this = Pin::get_mut(self);
+
+ let offset = this.position;
+ let wanted = buf.capacity();
+ let reader = Arc::clone(&this.reader);
+
+ let fut = this.read_future.get_or_insert_with(|| {
+ Box::pin(async move {
+ let mut read_buf = vec![0u8; wanted];
+ let read = reader.read_at(&mut read_buf[..wanted], offset).await?;
+ Ok((read_buf, read))
+ })
+ });
+
+ let ret = match ready!(fut.as_mut().poll(cx)) {
+ Ok((read_buf, read)) => {
+ buf.put_slice(&read_buf[..read]);
+ this.position += read as u64;
+ Ok(())
+ }
+ Err(err) => Err(io_err_other(err)),
+ };
+
+ // future completed, drop
+ this.read_future = None;
+
+ Poll::Ready(ret)
+ }
+}
--
2.30.2
More information about the pbs-devel
mailing list