[pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader

Stefan Reiter s.reiter at proxmox.com
Wed Jul 22 15:56:22 CEST 2020


Requires updating the AsyncRead implementation to cope with byte-wise
seeks to intra-chunk positions.

Uses chunk_from_offset to locate positions within chunks, but skips
that lookup for sequential reads so that performance does not regress.

AsyncSeek stores the requested target in the temporary seek_to_pos so
that the current position is left unchanged when an invalid seek is
given; the error is then reported from poll_complete.

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
 src/backup/async_index_reader.rs | 116 +++++++++++++++++++++++++------
 src/backup/index.rs              |   1 +
 2 files changed, 97 insertions(+), 20 deletions(-)

diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
index 0911375e..98372aa1 100644
--- a/src/backup/async_index_reader.rs
+++ b/src/backup/async_index_reader.rs
@@ -1,30 +1,35 @@
 use std::future::Future;
 use std::task::{Poll, Context};
 use std::pin::Pin;
+use std::io::SeekFrom;
 
 use anyhow::Error;
 use futures::future::FutureExt;
 use futures::ready;
-use tokio::io::AsyncRead;
+use tokio::io::{AsyncRead, AsyncSeek};
 
 use proxmox::sys::error::io_err_other;
 use proxmox::io_format_err;
 
 use super::IndexFile;
 use super::read_chunk::AsyncReadChunk;
+use super::index::ChunkReadInfo;
 
 enum AsyncIndexReaderState<S> {
     NoData,
     WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
-    HaveData(usize),
+    HaveData,
 }
 
 pub struct AsyncIndexReader<S, I: IndexFile> {
     store: Option<S>,
     index: I,
     read_buffer: Vec<u8>,
+    current_chunk_offset: u64,
     current_chunk_idx: usize,
-    current_chunk_digest: [u8; 32],
+    current_chunk_info: Option<ChunkReadInfo>,
+    position: u64,
+    seek_to_pos: i64,
     state: AsyncIndexReaderState<S>,
 }
 
@@ -37,8 +42,11 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
             store: Some(store),
             index,
             read_buffer: Vec::with_capacity(1024 * 1024),
+            current_chunk_offset: 0,
             current_chunk_idx: 0,
-            current_chunk_digest: [0u8; 32],
+            current_chunk_info: None,
+            position: 0,
+            seek_to_pos: 0,
             state: AsyncIndexReaderState::NoData,
         }
     }
@@ -58,23 +66,41 @@ where
         loop {
             match &mut this.state {
                 AsyncIndexReaderState::NoData => {
-                    if this.current_chunk_idx >= this.index.index_count() {
+                    let (idx, offset) = if this.current_chunk_info.is_some() &&
+                        this.position == this.current_chunk_info.as_ref().unwrap().range.end
+                    {
+                        // optimization for sequential chunk read
+                        let next_idx = this.current_chunk_idx + 1;
+                        (next_idx, 0)
+                    } else {
+                        match this.index.chunk_from_offset(this.position) {
+                            Some(res) => res,
+                            None => return Poll::Ready(Ok(0))
+                        }
+                    };
+
+                    if idx >= this.index.index_count() {
                         return Poll::Ready(Ok(0));
                     }
 
-                    let digest = this
+                    let info = this
                         .index
-                        .index_digest(this.current_chunk_idx)
-                        .ok_or(io_format_err!("could not get digest"))?
-                        .clone();
+                        .chunk_info(idx)
+                        .ok_or(io_format_err!("could not get digest"))?;
 
-                    if digest == this.current_chunk_digest {
-                        this.state = AsyncIndexReaderState::HaveData(0);
-                        continue;
+                    this.current_chunk_offset = offset;
+                    this.current_chunk_idx = idx;
+                    let old_info = this.current_chunk_info.replace(info.clone());
+
+                    if let Some(old_info) = old_info {
+                        if old_info.digest == info.digest {
+                            // hit, chunk is currently in cache
+                            this.state = AsyncIndexReaderState::HaveData;
+                            continue;
+                        }
                     }
 
-                    this.current_chunk_digest = digest;
-
+                    // miss, need to download new chunk
                     let store = match this.store.take() {
                         Some(store) => store,
                         None => {
@@ -83,7 +109,7 @@ where
                     };
 
                     let future = async move {
-                        store.read_chunk(&digest)
+                        store.read_chunk(&info.digest)
                             .await
                             .map(move |x| (store, x))
                     };
@@ -95,7 +121,7 @@ where
                         Ok((store, mut chunk_data)) => {
                             this.read_buffer.clear();
                             this.read_buffer.append(&mut chunk_data);
-                            this.state = AsyncIndexReaderState::HaveData(0);
+                            this.state = AsyncIndexReaderState::HaveData;
                             this.store = Some(store);
                         }
                         Err(err) => {
@@ -103,8 +129,8 @@ where
                         }
                     };
                 }
-                AsyncIndexReaderState::HaveData(offset) => {
-                    let offset = *offset;
+                AsyncIndexReaderState::HaveData => {
+                    let offset = this.current_chunk_offset as usize;
                     let len = this.read_buffer.len();
                     let n = if len - offset < buf.len() {
                         len - offset
@@ -113,11 +139,13 @@ where
                     };
 
                     buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
+                    this.position += n as u64;
+
                     if offset + n == len {
                         this.state = AsyncIndexReaderState::NoData;
-                        this.current_chunk_idx += 1;
                     } else {
-                        this.state = AsyncIndexReaderState::HaveData(offset + n);
+                        this.current_chunk_offset += n as u64;
+                        this.state = AsyncIndexReaderState::HaveData;
                     }
 
                     return Poll::Ready(Ok(n));
@@ -126,3 +154,51 @@ where
         }
     }
 }
+
+impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
+where
+    S: AsyncReadChunk + Unpin + Sync + 'static,
+    I: IndexFile + Unpin,
+{
+    fn start_seek(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+        this.seek_to_pos = match pos {
+            SeekFrom::Start(offset) => {
+                offset as i64
+            },
+            SeekFrom::End(offset) => {
+                this.index.index_bytes() as i64 + offset
+            },
+            SeekFrom::Current(offset) => {
+                this.position as i64 + offset
+            }
+        };
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_complete(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<tokio::io::Result<u64>> {
+        let this = Pin::get_mut(self);
+
+        let index_bytes = this.index.index_bytes();
+        if this.seek_to_pos < 0 {
+            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+        } else if this.seek_to_pos > index_bytes as i64 {
+            this.position = index_bytes;
+        } else {
+            this.position = this.seek_to_pos as u64;
+        }
+
+        // even if seeking within one chunk, we need to go to NoData to
+        // recalculate the current_chunk_offset (data is cached anyway)
+        this.state = AsyncIndexReaderState::NoData;
+
+        Poll::Ready(Ok(this.position))
+    }
+}
diff --git a/src/backup/index.rs b/src/backup/index.rs
index 2eab8524..c6bab56e 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::ops::Range;
 
+#[derive(Clone)]
 pub struct ChunkReadInfo {
     pub range: Range<u64>,
     pub digest: [u8; 32],
-- 
2.20.1






More information about the pbs-devel mailing list