[pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader
Stefan Reiter
s.reiter at proxmox.com
Wed Jul 22 15:56:22 CEST 2020
Requires updating the AsyncRead implementation to cope with byte-wise
seeks to intra-chunk positions.
Uses chunk_from_offset to locate positions within chunks, but avoids the
lookup for sequential reads so that performance does not regress.
AsyncSeek stores the target in the temporary seek_to_pos so that the
current position is left unchanged when an invalid seek is requested;
the error for such a seek is then reported from poll_complete.
Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
src/backup/async_index_reader.rs | 116 +++++++++++++++++++++++++------
src/backup/index.rs | 1 +
2 files changed, 97 insertions(+), 20 deletions(-)
diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
index 0911375e..98372aa1 100644
--- a/src/backup/async_index_reader.rs
+++ b/src/backup/async_index_reader.rs
@@ -1,30 +1,35 @@
use std::future::Future;
use std::task::{Poll, Context};
use std::pin::Pin;
+use std::io::SeekFrom;
use anyhow::Error;
use futures::future::FutureExt;
use futures::ready;
-use tokio::io::AsyncRead;
+use tokio::io::{AsyncRead, AsyncSeek};
use proxmox::sys::error::io_err_other;
use proxmox::io_format_err;
use super::IndexFile;
use super::read_chunk::AsyncReadChunk;
+use super::index::ChunkReadInfo;
enum AsyncIndexReaderState<S> {
NoData,
WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
- HaveData(usize),
+ HaveData,
}
pub struct AsyncIndexReader<S, I: IndexFile> {
store: Option<S>,
index: I,
read_buffer: Vec<u8>,
+ current_chunk_offset: u64,
current_chunk_idx: usize,
- current_chunk_digest: [u8; 32],
+ current_chunk_info: Option<ChunkReadInfo>,
+ position: u64,
+ seek_to_pos: i64,
state: AsyncIndexReaderState<S>,
}
@@ -37,8 +42,11 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
store: Some(store),
index,
read_buffer: Vec::with_capacity(1024 * 1024),
+ current_chunk_offset: 0,
current_chunk_idx: 0,
- current_chunk_digest: [0u8; 32],
+ current_chunk_info: None,
+ position: 0,
+ seek_to_pos: 0,
state: AsyncIndexReaderState::NoData,
}
}
@@ -58,23 +66,41 @@ where
loop {
match &mut this.state {
AsyncIndexReaderState::NoData => {
- if this.current_chunk_idx >= this.index.index_count() {
+ let (idx, offset) = if this.current_chunk_info.is_some() &&
+ this.position == this.current_chunk_info.as_ref().unwrap().range.end
+ {
+ // optimization for sequential chunk read
+ let next_idx = this.current_chunk_idx + 1;
+ (next_idx, 0)
+ } else {
+ match this.index.chunk_from_offset(this.position) {
+ Some(res) => res,
+ None => return Poll::Ready(Ok(0))
+ }
+ };
+
+ if idx >= this.index.index_count() {
return Poll::Ready(Ok(0));
}
- let digest = this
+ let info = this
.index
- .index_digest(this.current_chunk_idx)
- .ok_or(io_format_err!("could not get digest"))?
- .clone();
+ .chunk_info(idx)
+ .ok_or(io_format_err!("could not get digest"))?;
- if digest == this.current_chunk_digest {
- this.state = AsyncIndexReaderState::HaveData(0);
- continue;
+ this.current_chunk_offset = offset;
+ this.current_chunk_idx = idx;
+ let old_info = this.current_chunk_info.replace(info.clone());
+
+ if let Some(old_info) = old_info {
+ if old_info.digest == info.digest {
+ // hit, chunk is currently in cache
+ this.state = AsyncIndexReaderState::HaveData;
+ continue;
+ }
}
- this.current_chunk_digest = digest;
-
+ // miss, need to download new chunk
let store = match this.store.take() {
Some(store) => store,
None => {
@@ -83,7 +109,7 @@ where
};
let future = async move {
- store.read_chunk(&digest)
+ store.read_chunk(&info.digest)
.await
.map(move |x| (store, x))
};
@@ -95,7 +121,7 @@ where
Ok((store, mut chunk_data)) => {
this.read_buffer.clear();
this.read_buffer.append(&mut chunk_data);
- this.state = AsyncIndexReaderState::HaveData(0);
+ this.state = AsyncIndexReaderState::HaveData;
this.store = Some(store);
}
Err(err) => {
@@ -103,8 +129,8 @@ where
}
};
}
- AsyncIndexReaderState::HaveData(offset) => {
- let offset = *offset;
+ AsyncIndexReaderState::HaveData => {
+ let offset = this.current_chunk_offset as usize;
let len = this.read_buffer.len();
let n = if len - offset < buf.len() {
len - offset
@@ -113,11 +139,13 @@ where
};
buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
+ this.position += n as u64;
+
if offset + n == len {
this.state = AsyncIndexReaderState::NoData;
- this.current_chunk_idx += 1;
} else {
- this.state = AsyncIndexReaderState::HaveData(offset + n);
+ this.current_chunk_offset += n as u64;
+ this.state = AsyncIndexReaderState::HaveData;
}
return Poll::Ready(Ok(n));
@@ -126,3 +154,51 @@ where
}
}
}
+
+/// Byte-wise seeking; the target is staged in `seek_to_pos` by `start_seek`
+/// and only validated and applied to `position` in `poll_complete`.
+impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
+where
+    S: AsyncReadChunk + Unpin + Sync + 'static,
+    I: IndexFile + Unpin,
+{
+    /// Stage the absolute target of `pos`; `position` is left untouched.
+    fn start_seek(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+        // Clamp u64 inputs to i64::MAX and saturate the additions, so huge
+        // offsets cannot wrap around into a bogus (negative) target.
+        let clamp = |value: u64| value.min(std::i64::MAX as u64) as i64;
+        this.seek_to_pos = match pos {
+            SeekFrom::Start(offset) => clamp(offset),
+            SeekFrom::End(offset) => clamp(this.index.index_bytes()).saturating_add(offset),
+            SeekFrom::Current(offset) => clamp(this.position).saturating_add(offset),
+        };
+        Poll::Ready(Ok(()))
+    }
+
+    /// Apply the staged seek: negative targets are an error, targets past
+    /// the end are clamped to the end. Returns the new position.
+    fn poll_complete(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<tokio::io::Result<u64>> {
+        let this = Pin::get_mut(self);
+
+        if this.seek_to_pos < 0 {
+            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+        }
+        // non-negative here, so the cast is lossless; comparing as u64 means
+        // index_bytes never has to be squeezed into an i64
+        this.position = (this.seek_to_pos as u64).min(this.index.index_bytes());
+
+        // even if seeking within one chunk, we need to go to NoData to
+        // recalculate the current_chunk_offset (data is cached anyway)
+        this.state = AsyncIndexReaderState::NoData;
+
+        Poll::Ready(Ok(this.position))
+    }
+}
diff --git a/src/backup/index.rs b/src/backup/index.rs
index 2eab8524..c6bab56e 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::ops::Range;
+#[derive(Clone)]
pub struct ChunkReadInfo {
pub range: Range<u64>,
pub digest: [u8; 32],
--
2.20.1
More information about the pbs-devel
mailing list