[pbs-devel] [RFC v2 pxar 02/36] encoder: add optional output writer for file payloads
Fabian Grünbichler
f.gruenbichler at proxmox.com
Mon Mar 11 14:21:25 CET 2024
On March 5, 2024 10:26 am, Christian Ebner wrote:
> During regular pxar archive encoding, the payload of regular files is
> written as part of the archive.
>
> This patch introduces functionality to attach an optional, dedicated
> writer instance to redirect the payload to a different output.
> The intention for this change is to allow to separate data and metadata
> streams in order to allow the reuse of payload data by referencing the
> payload writer byte offset, without having to re-encode it.
>
> Whenever the payload of regular files is redirected to a dedicated
> output writer, encode a payload reference header followed by the
> required data to locate the data, instead of adding the regular payload
> header followed by the encoded payload to the archive.
>
> This is in preparation for reusing payload chunks for unchanged files
> of backups created via the proxmox-backup-client.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 1:
> - no changes
>
> src/encoder/aio.rs | 7 +++++
> src/encoder/mod.rs | 75 +++++++++++++++++++++++++++++++++++++++------
> src/encoder/sync.rs | 7 +++++
> 3 files changed, 79 insertions(+), 10 deletions(-)
>
> diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
> index ad25fea..82b9ab2 100644
> --- a/src/encoder/aio.rs
> +++ b/src/encoder/aio.rs
> @@ -52,6 +52,13 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
> })
> }
>
> + /// Attach a dedicated writer to redirect the payloads of regular files to a separate output
> + pub fn attach_payload_output(self, payload_output: T) -> Self {
> + Self {
> + inner: self.inner.attach_payload_output(payload_output),
see below
> + }
> + }
> +
> /// Create a new regular file in the archive. This returns a `File` object to which the
> /// contents have to be written out *completely*. Failing to do so will put the encoder into an
> /// error state.
> diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
> index 0d342ec..e4ea69b 100644
> --- a/src/encoder/mod.rs
> +++ b/src/encoder/mod.rs
> @@ -221,6 +221,9 @@ struct EncoderState {
>
> /// We need to keep track how much we have written to get offsets.
> write_position: u64,
> +
> + /// Track the bytes written to the payload writer
> + payload_write_position: u64,
> }
>
> impl EncoderState {
> @@ -278,6 +281,7 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
> /// synchronous or `async` I/O objects in as output.
> pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
> output: EncoderOutput<'a, T>,
> + payload_output: EncoderOutput<'a, Option<T>>,
> state: EncoderState,
> parent: Option<&'a mut EncoderState>,
> finished: bool,
> @@ -312,6 +316,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> }
> let mut this = Self {
> output,
> + payload_output: EncoderOutput::Owned(None),
> state: EncoderState::default(),
> parent: None,
> finished: false,
> @@ -326,6 +331,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> Ok(this)
> }
>
> + pub fn attach_payload_output(mut self, payload_output: T) -> Self {
> + self.payload_output = EncoderOutput::Owned(Some(payload_output));
should we prevent/catch this being called multiple times?
> + self
> + }
> +
> fn check(&self) -> io::Result<()> {
> match self.state.encode_error {
> Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
> @@ -361,10 +371,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> let file_offset = self.position();
> self.start_file_do(Some(metadata), file_name).await?;
>
> - let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
> - header.check_header_size()?;
> -
> - seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
> + if self.payload_output.as_mut().is_some() {
> + let mut data = self.payload_position().to_le_bytes().to_vec();
> + data.append(&mut file_size.to_le_bytes().to_vec());
> + seq_write_pxar_entry(
> + self.output.as_mut(),
> + format::PXAR_PAYLOAD_REF,
> + &data,
> + &mut self.state.write_position,
> + )
> + .await?;
this part here and the read counter-part in the next commit basically
hard-code the format of this entry type, maybe that could be handled
nicer? e.g., construct a PayloadRef here, and let that implement the
conversion to/from data?
it is a pre-existing pattern here though ;)
> + } else {
> + let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
> + header.check_header_size()?;
> + seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
> + };
>
> let payload_data_offset = self.position();
>
> @@ -372,6 +393,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>
> Ok(FileImpl {
> output: self.output.as_mut(),
> + payload_output: self.payload_output.as_mut().as_mut(),
> goodbye_item: GoodbyeItem {
> hash: format::hash_filename(file_name),
> offset: file_offset,
> @@ -564,6 +586,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> self.state.write_position
> }
>
> + #[inline]
> + fn payload_position(&mut self) -> u64 {
> + self.state.payload_write_position
> + }
> +
> pub async fn create_directory(
> &mut self,
> file_name: &Path,
> @@ -588,18 +615,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>
> // the child will write to OUR state now:
> let write_position = self.position();
> + let payload_write_position = self.payload_position();
>
> let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
>
> Ok(EncoderImpl {
> // always forward as Borrowed(), to avoid stacking references on nested calls
> output: self.output.to_borrowed_mut(),
> + payload_output: self.payload_output.to_borrowed_mut(),
> state: EncoderState {
> entry_offset,
> files_offset,
> file_offset: Some(file_offset),
> file_hash,
> write_position,
> + payload_write_position,
> ..Default::default()
> },
> parent: Some(&mut self.state),
> @@ -764,15 +794,23 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> )
> .await?;
>
> + if let EncoderOutput::Owned(output) = &mut self.payload_output {
> + if let Some(output) = output {
> + flush(output).await?;
> + }
> + }
nit: the two if-lets could be combined:
if let EncoderOutput::Owned(Some(output)) = &mut self.payload_output {
..
}
> +
> if let EncoderOutput::Owned(output) = &mut self.output {
> flush(output).await?;
> }
>
> // done up here because of the self-borrow and to propagate
> let end_offset = self.position();
> + let payload_end_offset = self.payload_position();
>
> if let Some(parent) = &mut self.parent {
> parent.write_position = end_offset;
> + parent.payload_write_position = payload_end_offset;
>
> let file_offset = self
> .state
> @@ -837,6 +875,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> pub(crate) struct FileImpl<'a, S: SeqWrite> {
> output: &'a mut S,
>
> + /// Optional write redirection of file payloads to this sequential stream
> + payload_output: Option<&'a mut S>,
> +
> /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
> /// directly instead of on Drop of FileImpl?
> goodbye_item: GoodbyeItem,
> @@ -916,19 +957,33 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
> /// for convenience.
> pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
> self.check_remaining(data.len())?;
> - let put =
> - poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
> - .await?;
> - //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
> + let put = if let Some(mut output) = self.payload_output.as_mut() {
> + let put =
> + poll_fn(|cx| unsafe { Pin::new_unchecked(&mut output).poll_seq_write(cx, data) })
> + .await?;
> + self.parent.payload_write_position += put as u64;
> + put
> + } else {
> + let put = poll_fn(|cx| unsafe {
> + Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data)
> + })
> + .await?;
> + self.parent.write_position += put as u64;
> + put
> + };
> +
> self.remaining_size -= put as u64;
> - self.parent.write_position += put as u64;
> Ok(put)
> }
>
> /// Completely write file data for the current file entry in a pxar archive.
> pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
> self.check_remaining(data.len())?;
> - seq_write_all(self.output, data, &mut self.parent.write_position).await?;
> + if let Some(ref mut output) = self.payload_output {
> + seq_write_all(output, data, &mut self.parent.payload_write_position).await?;
> + } else {
> + seq_write_all(self.output, data, &mut self.parent.write_position).await?;
> + }
> self.remaining_size -= data.len() as u64;
> Ok(())
> }
> diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
> index 1ec91b8..28981df 100644
> --- a/src/encoder/sync.rs
> +++ b/src/encoder/sync.rs
> @@ -56,6 +56,13 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
> })
> }
>
> + /// Attach a dedicated writer to redirect the payloads of regular files to a separate output
> + pub fn attach_payload_output(self, payload_output: T) -> Self {
> + Self {
> + inner: self.inner.attach_payload_output(payload_output),
same question as above here..
> + }
> + }
> +
> /// Create a new regular file in the archive. This returns a `File` object to which the
> /// contents have to be written out *completely*. Failing to do so will put the encoder into an
> /// error state.
> --
> 2.39.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
More information about the pbs-devel
mailing list