[pbs-devel] [RFC pxar 05/36] accessor: add optional payload input stream
Christian Ebner
c.ebner at proxmox.com
Wed Feb 28 15:01:55 CET 2024
Allow reading regular file payloads from a split pxar archive, where
the payload stream was redirected to a different archive on
creation.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
src/accessor/aio.rs | 7 ++++
src/accessor/mod.rs | 85 +++++++++++++++++++++++++++++++++++++++------
2 files changed, 82 insertions(+), 10 deletions(-)
diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
index 98d7755..db6c5e4 100644
--- a/src/accessor/aio.rs
+++ b/src/accessor/aio.rs
@@ -91,6 +91,13 @@ impl<T: ReadAt> Accessor<T> {
})
}
+ /// Take the file payloads from the provided input stream rather than the regular pxar stream
+ pub fn redirect_payload_input(self, payload_input: T) -> Self {
+ Self {
+ inner: self.inner.redirect_payload_input(payload_input),
+ }
+ }
+
/// Open a directory handle to the root of the pxar archive.
pub async fn open_root_ref(&self) -> io::Result<Directory<&dyn ReadAt>> {
Ok(Directory::new(self.inner.open_root_ref().await?))
diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
index ed99c85..6b3dfd2 100644
--- a/src/accessor/mod.rs
+++ b/src/accessor/mod.rs
@@ -182,6 +182,7 @@ pub(crate) struct AccessorImpl<T> {
input: T,
size: u64,
caches: Arc<Caches>,
+ payload_input: Option<T>,
}
impl<T: ReadAt> AccessorImpl<T> {
@@ -194,9 +195,15 @@ impl<T: ReadAt> AccessorImpl<T> {
input,
size,
caches: Arc::new(Caches::default()),
+ payload_input: None,
})
}
+ pub fn redirect_payload_input(mut self, payload_input: T) -> Self {
+ self.payload_input = Some(payload_input);
+ self
+ }
+
pub fn size(&self) -> u64 {
self.size
}
@@ -207,6 +214,9 @@ impl<T: ReadAt> AccessorImpl<T> {
self.size,
"/".into(),
Arc::clone(&self.caches),
+ self.payload_input
+ .as_ref()
+ .map(|input| input as &dyn ReadAt),
)
.await
}
@@ -227,8 +237,21 @@ async fn get_decoder<T: ReadAt>(
input: T,
entry_range: Range<u64>,
path: PathBuf,
+ payload_input: Option<T>,
) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
- DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await
+ let mut decoder = DecoderImpl::new_full(
+ SeqReadAtAdapter::new(input, entry_range.clone()),
+ path,
+ true,
+ )
+ .await?;
+
+ if let Some(payload_input) = payload_input {
+ // Payload stream is just passed along, the range can therefore be zero
+ decoder = decoder.redirect_payload_input(SeqReadAtAdapter::new(payload_input, 0..0));
+ }
+
+ Ok(decoder)
}
// NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
@@ -236,6 +259,7 @@ async fn get_decoder_at_filename<T: ReadAt>(
input: T,
entry_range: Range<u64>,
path: PathBuf,
+ payload_input: Option<T>,
) -> io::Result<(DecoderImpl<SeqReadAtAdapter<T>>, u64)> {
// Read the header, it should be a FILENAME, then skip over it and its length:
let header: format::Header = read_entry_at(&input, entry_range.start).await?;
@@ -251,7 +275,7 @@ async fn get_decoder_at_filename<T: ReadAt>(
}
Ok((
- get_decoder(input, entry_offset..entry_range.end, path).await?,
+ get_decoder(input, entry_offset..entry_range.end, path, payload_input).await?,
entry_offset,
))
}
@@ -263,6 +287,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
self.size,
"/".into(),
Arc::clone(&self.caches),
+ self.payload_input.clone(),
)
.await
}
@@ -274,6 +299,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
offset,
"/".into(),
Arc::clone(&self.caches),
+ self.payload_input.clone(),
)
.await
}
@@ -287,23 +313,30 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
self.input.clone(),
entry_range_info.entry_range.clone(),
PathBuf::new(),
+ self.payload_input.clone(),
)
.await?;
let entry = decoder
.next()
.await
.ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
+
Ok(FileEntryImpl {
input: self.input.clone(),
entry,
entry_range_info: entry_range_info.clone(),
caches: Arc::clone(&self.caches),
+ payload_input: self.payload_input.clone(),
})
}
/// Allow opening arbitrary contents from a specific range.
pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
- FileContentsImpl::new(self.input.clone(), range)
+ if let Some(payload_input) = &self.payload_input {
+ FileContentsImpl::new(payload_input.clone(), range)
+ } else {
+ FileContentsImpl::new(self.input.clone(), range)
+ }
}
/// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will
@@ -326,9 +359,13 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
let link_offset = entry_file_offset - link_offset;
- let (mut decoder, entry_offset) =
- get_decoder_at_filename(self.input.clone(), link_offset..self.size, PathBuf::new())
- .await?;
+ let (mut decoder, entry_offset) = get_decoder_at_filename(
+ self.input.clone(),
+ link_offset..self.size,
+ PathBuf::new(),
+ self.payload_input.clone(),
+ )
+ .await?;
let entry = decoder
.next()
@@ -354,6 +391,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
entry_range: entry_offset..entry_end,
},
caches: Arc::clone(&self.caches),
+ payload_input: self.payload_input.clone(),
})
}
_ => io_bail!("hardlink does not point to a regular file"),
@@ -370,6 +408,7 @@ pub(crate) struct DirectoryImpl<T> {
table: Arc<[GoodbyeItem]>,
path: PathBuf,
caches: Arc<Caches>,
+ payload_input: Option<T>,
}
impl<T: Clone + ReadAt> DirectoryImpl<T> {
@@ -379,6 +418,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
end_offset: u64,
path: PathBuf,
caches: Arc<Caches>,
+ payload_input: Option<T>,
) -> io::Result<DirectoryImpl<T>> {
let tail = Self::read_tail_entry(&input, end_offset).await?;
@@ -408,6 +448,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone),
path,
caches,
+ payload_input,
};
// sanity check:
@@ -503,6 +544,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
None => self.path.clone(),
Some(file) => self.path.join(file),
},
+ self.payload_input.clone(),
)
.await
}
@@ -534,6 +576,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
entry_range: self.entry_range(),
},
caches: Arc::clone(&self.caches),
+ payload_input: self.payload_input.clone(),
})
}
@@ -686,6 +729,7 @@ pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
entry: Entry,
entry_range_info: EntryRangeInfo,
caches: Arc<Caches>,
+ payload_input: Option<T>,
}
impl<T: Clone + ReadAt> FileEntryImpl<T> {
@@ -699,6 +743,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
self.entry_range_info.entry_range.end,
self.entry.path.clone(),
Arc::clone(&self.caches),
+ self.payload_input.clone(),
)
.await
}
@@ -711,16 +756,35 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
}
EntryKind::File {
size,
- offset: Some(offset),
- ..
- } => Ok(Some(offset..(offset + size))),
+ offset,
+ payload_offset,
+ } => {
+ // Payload offset will be some for PXAR_PAYLOAD_REF's
+ // It should win over the regular offset, since the actual payloads
+ // are stored in the separated payload input stream
+ if let Some(payload_offset) = payload_offset {
+ return Ok(Some(payload_offset..(payload_offset + size)));
+ }
+
+ if let Some(offset) = offset {
+ return Ok(Some(offset..(offset + size)));
+ }
+
+ Ok(None)
+ }
_ => Ok(None),
}
}
pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
match self.content_range()? {
- Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
+ Some(range) => {
+ if let Some(ref payload_input) = self.payload_input {
+ Ok(FileContentsImpl::new(payload_input.clone(), range))
+ } else {
+ Ok(FileContentsImpl::new(self.input.clone(), range))
+ }
+ }
None => io_bail!("not a file"),
}
}
@@ -810,6 +874,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> {
entry,
entry_range_info: self.entry_range_info.clone(),
caches: Arc::clone(&self.caches),
+ payload_input: self.dir.payload_input.clone(),
})
}
--
2.39.2
More information about the pbs-devel
mailing list