[pbs-devel] [PATCH pxar 02/22] decoder: add peek()

Stefan Reiter s.reiter at proxmox.com
Wed Feb 17 09:38:58 CET 2021


On 17/02/2021 09:20, Wolfgang Bumiller wrote:
> On Tue, Feb 16, 2021 at 06:06:50PM +0100, Stefan Reiter wrote:
>> Allows peeking the current element, but will not advance the state
>> (except for contents() and content_size() functions).
>>
>> Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
>> ---
>>   src/accessor/mod.rs |  3 +++
>>   src/decoder/aio.rs  | 10 +++++++++-
>>   src/decoder/mod.rs  | 19 +++++++++++++++++--
>>   src/decoder/sync.rs | 10 +++++++++-
>>   4 files changed, 38 insertions(+), 4 deletions(-)
>>
>> diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
>> index d02dc13..aa1b3f6 100644
>> --- a/src/accessor/mod.rs
>> +++ b/src/accessor/mod.rs
>> @@ -293,6 +293,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>>           let entry = decoder
>>               .next()
>>               .await
>> +            .transpose()
>>               .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
>>           Ok(FileEntryImpl {
>>               input: self.input.clone(),
>> @@ -334,6 +335,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>>           let entry = decoder
>>               .next()
>>               .await
>> +            .transpose()
>>               .ok_or_else(|| io_format_err!("unexpected EOF while following a hardlink"))??;
>>   
>>           match entry.kind() {
>> @@ -516,6 +518,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
>>           let entry = decoder
>>               .next()
>>               .await
>> +            .transpose()
>>               .ok_or_else(|| io_format_err!("unexpected EOF while decoding directory entry"))??;
>>           Ok((entry, decoder))
>>       }
>> diff --git a/src/decoder/aio.rs b/src/decoder/aio.rs
>> index 5cc6694..c553d45 100644
>> --- a/src/decoder/aio.rs
>> +++ b/src/decoder/aio.rs
>> @@ -53,7 +53,15 @@ impl<T: SeqRead> Decoder<T> {
>>       #[allow(clippy::should_implement_trait)]
>>       /// If this is a directory entry, get the next item inside the directory.
>>       pub async fn next(&mut self) -> Option<io::Result<Entry>> {
>> -        self.inner.next_do().await.transpose()
>> +        self.inner.next().await.transpose()
>> +    }
>> +
>> +    /// If this is a directory entry, get the next item inside the directory.
>> +    /// Do not advance the cursor, so multiple calls to peek() will return the same entry,
>> +    /// and the next call to next() will read the item once again before moving on.
>> +    /// NOTE: This *will* advance the state for contents() and content_size()!
> 
> ^ Which is why I'm wondering whether we should maybe leave this up to
> the *user* rather than provide a sort-of broken API here?
> 
> I'd rather have this be guarded by a Seek trait, but that too is
> something we won't get from `std` and so we'd have to add one.
> 
> Why do we need this exactly?

See patches 8 and 22 of the series (specifically 'fn
extract_to_target_seq'). I didn't want to add more special casing to the
sequential extractors; they are "special-cased" enough as it is IMO.
They work on the assumption that they can just call "next()" and get the
root entry of what they want to extract. But I also need to check whether
that entry is a file or a dir before calling them, which I do with peek().
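
For illustration, the peek-then-dispatch pattern described above could be
sketched roughly like this. It is a toy model, not the pxar API: ToyEntry,
ToyDecoder, extract_file(), extract_dir() and extract() are hypothetical
names, and the decoder is synchronous over an in-memory queue.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a pxar entry; only the kind matters here.
#[derive(Clone, Debug, PartialEq)]
enum ToyEntry {
    File(String),
    Dir(String),
}

/// Hypothetical sequential decoder over an in-memory queue.
struct ToyDecoder {
    items: VecDeque<ToyEntry>,
    /// Outer Option: is something buffered? Inner Option: entry or EOF.
    peeked: Option<Option<ToyEntry>>,
}

impl ToyDecoder {
    fn new(items: Vec<ToyEntry>) -> Self {
        Self { items: items.into(), peeked: None }
    }

    /// Return the buffered entry if there is one, otherwise read a new one.
    fn next(&mut self) -> Option<ToyEntry> {
        if let Some(ent) = self.peeked.take() {
            return ent;
        }
        self.items.pop_front()
    }

    /// Read one entry ahead and keep it buffered for the following next().
    fn peek(&mut self) -> Option<ToyEntry> {
        let ent = self.next();
        self.peeked = Some(ent.clone());
        ent
    }
}

/// Extractors that assume their first next() call yields the root entry.
fn extract_file(dec: &mut ToyDecoder) -> String {
    match dec.next() {
        Some(ToyEntry::File(name)) => format!("file:{}", name),
        other => format!("unexpected:{:?}", other),
    }
}

fn extract_dir(dec: &mut ToyDecoder) -> String {
    match dec.next() {
        Some(ToyEntry::Dir(name)) => format!("dir:{}", name),
        other => format!("unexpected:{:?}", other),
    }
}

/// Check the kind with peek(), then hand the untouched decoder over.
fn extract(dec: &mut ToyDecoder) -> Option<String> {
    match dec.peek()? {
        ToyEntry::File(_) => Some(extract_file(dec)),
        ToyEntry::Dir(_) => Some(extract_dir(dec)),
    }
}
```

Since peek() leaves the entry buffered, the extractors need no extra
parameter for an already consumed root entry.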

> 
> And would this be solved by simply *generally* storing a
> "current_entry"? Then we can have a `.current_entry() -> Option<&Entry>`
> which works after at least one `next()` call, and `.next()` working as
> usual.  And we may just have `next()` also return a reference instead.
> The user can `.clone()` if necessary. Or we return a mutable reference
> and allow `.take()`, then the user is responsible for knowing whether
> calling `.current_entry()` makes sense ;-)
> 

current_entry() wouldn't help my use-case, and returning a reference is 
somewhat pointless since Entry is small and Clone anyway IIRC?
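
A minimal sketch of the current_entry() idea, again on a hypothetical toy
decoder (ToyEntry and ToyDecoder are not pxar types), shows one reading of
why it would not cover this case: the entry only becomes available after
next() has consumed it, so an extractor that starts with its own next()
call would receive the following entry.

```rust
/// Hypothetical stand-in for a pxar entry.
#[derive(Clone, Debug, PartialEq)]
struct ToyEntry(String);

/// Hypothetical decoder that remembers the entry returned by next().
struct ToyDecoder {
    items: std::vec::IntoIter<ToyEntry>,
    current: Option<ToyEntry>,
}

impl ToyDecoder {
    fn new(items: Vec<ToyEntry>) -> Self {
        Self { items: items.into_iter(), current: None }
    }

    /// Advance and return a reference to the newly stored entry.
    fn next(&mut self) -> Option<&ToyEntry> {
        self.current = self.items.next();
        self.current.as_ref()
    }

    /// Only meaningful after at least one next() call.
    fn current_entry(&self) -> Option<&ToyEntry> {
        self.current.as_ref()
    }
}
```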

I believe there might be a way to avoid this patch entirely, though, if I
give the sequential extractor API some more thought. If not, I'll think
about your proposals for a v2.

>> +    pub async fn peek(&mut self) -> Option<io::Result<Entry>> {
>> +        self.inner.peek().await.transpose()
>>       }
>>   
>>       /// Get a reader for the contents of the current entry, if the entry has contents.
>> diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
>> index 2a5e79a..041226d 100644
>> --- a/src/decoder/mod.rs
>> +++ b/src/decoder/mod.rs
>> @@ -155,6 +155,7 @@ pub(crate) struct DecoderImpl<T> {
>>       path_lengths: Vec<usize>,
>>       state: State,
>>       with_goodbye_tables: bool,
>> +    peeked: Option<io::Result<Option<Entry>>>,
>>   
>>       /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
>>       /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
>> @@ -218,6 +219,7 @@ impl<I: SeqRead> DecoderImpl<I> {
>>               path_lengths: Vec::new(),
>>               state: State::Begin,
>>               with_goodbye_tables: false,
>> +            peeked: None,
>>               eof_after_entry,
>>           };
>>   
>> @@ -227,8 +229,21 @@ impl<I: SeqRead> DecoderImpl<I> {
>>       }
>>   
>>       /// Get the next file entry, recursing into directories.
>> -    pub async fn next(&mut self) -> Option<io::Result<Entry>> {
>> -        self.next_do().await.transpose()
>> +    pub async fn next(&mut self) -> io::Result<Option<Entry>> {
>> +        if let Some(ent) = self.peeked.take() {
>> +            return ent;
>> +        }
>> +        self.next_do().await
>> +    }
>> +
>> +    pub async fn peek(&mut self) -> io::Result<Option<Entry>> {
>> +        self.peeked = Some(self.next().await);
>> +        match &self.peeked {
>> +            Some(Ok(ent)) => Ok(ent.clone()),
>> +            // io::Error does not implement Clone...
>> +            Some(Err(err)) => Err(io_format_err!("{}", err)),
>> +            None => unreachable!()
>> +        }
>>       }
>>   
>>       async fn next_do(&mut self) -> io::Result<Option<Entry>> {
>> diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs
>> index 85b4865..c6a1bc3 100644
>> --- a/src/decoder/sync.rs
>> +++ b/src/decoder/sync.rs
>> @@ -63,7 +63,15 @@ impl<T: SeqRead> Decoder<T> {
>>       #[allow(clippy::should_implement_trait)]
>>       /// If this is a directory entry, get the next item inside the directory.
>>       pub fn next(&mut self) -> Option<io::Result<Entry>> {
>> -        poll_result_once(self.inner.next_do()).transpose()
>> +        poll_result_once(self.inner.next()).transpose()
>> +    }
>> +
>> +    /// If this is a directory entry, get the next item inside the directory.
>> +    /// Do not advance the cursor, so multiple calls to peek() will return the same entry,
>> +    /// and the next call to next() will read the item once again before moving on.
>> +    /// NOTE: This *will* advance the state for contents() and content_size()!
>> +    pub fn peek(&mut self) -> Option<io::Result<Entry>> {
>> +        poll_result_once(self.inner.peek()).transpose()
>>       }
>>   
>>       /// Get a reader for the contents of the current entry, if the entry has contents.
>> -- 
>> 2.20.1
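
Regarding the "io::Error does not implement Clone" comment in the
decoder/mod.rs hunk above: a possible alternative to formatting the error
into a new format error, sketched here under the assumption that keeping
the ErrorKind matters to callers, is to rebuild the error from its kind
and message. duplicate_io_error() is a hypothetical helper, not part of
pxar.

```rust
use std::io;

/// Hypothetical helper: build a new io::Error that mirrors the kind and
/// the display message of `err`. The original source chain is not kept.
fn duplicate_io_error(err: &io::Error) -> io::Error {
    io::Error::new(err.kind(), err.to_string())
}
```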
