[pbs-devel] [RFC PATCH proxmox-backup 1/2] pbs-client: pxar: Add prototype implementation of `AsyncExtractor<T>`
Max Carrara
m.carrara at proxmox.com
Mon Aug 28 16:42:03 CEST 2023
This preliminary commit adds a fully functioning prototype
implementation of `AsyncExtractor<T>`, which is - unsurprisingly -
a complete `async` implementation of the original `Extractor` (and its
accompanying `ExtractorIter`).
In order to limit the scope of this RFC and focus on its intent,
the `pbs_client::pxar::aio` module is introduced, which contains all
relevant changes (instead of refactoring half of the crate in a dozen
or two commits).
The design of the new extractor is split into three main parts:
1. the higher-level `AsyncExtractor<T>` which exposes the public
extraction API
2. the lower-level `RawAsyncExtractor` which serves as the
"workhorse" of its implementors, including `AsyncExtractor<T>`
3. the `SyncTaskWorker` which receives blocking / synchronous tasks
via a queue and runs them in parallel - this allows the
`RawAsyncExtractor` to remain fully cooperative and non-blocking
Furthermore, by moving everything that can be considered an
*extraction option* into the `PxarExtractionOptions` `struct`, the
overall control flow and usage of `AsyncExtractor<T>` and
`RawAsyncExtractor` is much simpler to follow.
The `ErrorHandler` type alias is made more flexible by requiring
an `Fn` instead of an `FnMut`; the callback is no longer generic but
instead becomes its own threadsafe alias named `Callback`.
The following modules are adapted in a similar fashion, but remain
identical in their function:
`pbs_client::pxar::dir_stack` --> `pbs_client::pxar::aio::dir_stack`
* The original `PxarDirStack` is split in two parts; it now uses
the new `DirStack<T>` in order to be somewhat more testable
* Synchronization mechanisms and guarantees are put in place
in regards to the new extractor implementation, as the
`PxarDirStack` must now be shared between multiple threads or
tasks
`pbs_client::pxar::metadata` --> `pbs_client::pxar::aio::metadata`
* Function signatures now take an `&Arc<pxar::Entry>` in order to
make entries easier to pass between threads.
(There was no measurable difference between `Arc<pxar::Entry>`
vs `&Arc<pxar::Entry>` vs `&pxar::Entry`)
* The old `ErrorHandler` is replaced with the new version in
`pbs_client::pxar::aio`
Signed-off-by: Max Carrara <m.carrara at proxmox.com>
---
Cargo.toml | 1 +
pbs-client/Cargo.toml | 1 +
pbs-client/src/pxar/aio/dir_stack.rs | 543 +++++++++++++++++++
pbs-client/src/pxar/aio/extract/extractor.rs | 446 +++++++++++++++
pbs-client/src/pxar/aio/extract/mod.rs | 220 ++++++++
pbs-client/src/pxar/aio/extract/raw.rs | 503 +++++++++++++++++
pbs-client/src/pxar/aio/metadata.rs | 412 ++++++++++++++
pbs-client/src/pxar/aio/mod.rs | 11 +
pbs-client/src/pxar/aio/worker.rs | 167 ++++++
pbs-client/src/pxar/mod.rs | 1 +
10 files changed, 2305 insertions(+)
create mode 100644 pbs-client/src/pxar/aio/dir_stack.rs
create mode 100644 pbs-client/src/pxar/aio/extract/extractor.rs
create mode 100644 pbs-client/src/pxar/aio/extract/mod.rs
create mode 100644 pbs-client/src/pxar/aio/extract/raw.rs
create mode 100644 pbs-client/src/pxar/aio/metadata.rs
create mode 100644 pbs-client/src/pxar/aio/mod.rs
create mode 100644 pbs-client/src/pxar/aio/worker.rs
diff --git a/Cargo.toml b/Cargo.toml
index c7773f0e..ba6d874a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -122,6 +122,7 @@ hyper = { version = "0.14", features = [ "full" ] }
lazy_static = "1.4"
libc = "0.2"
log = "0.4.17"
+memchr = "2.5"
nix = "0.26.1"
nom = "7"
num-traits = "0.2"
diff --git a/pbs-client/Cargo.toml b/pbs-client/Cargo.toml
index ed7d651d..047efb41 100644
--- a/pbs-client/Cargo.toml
+++ b/pbs-client/Cargo.toml
@@ -17,6 +17,7 @@ hyper.workspace = true
lazy_static.workspace = true
libc.workspace = true
log.workspace = true
+memchr.workspace = true
nix.workspace = true
openssl.workspace = true
percent-encoding.workspace = true
diff --git a/pbs-client/src/pxar/aio/dir_stack.rs b/pbs-client/src/pxar/aio/dir_stack.rs
new file mode 100644
index 00000000..62cf9ee5
--- /dev/null
+++ b/pbs-client/src/pxar/aio/dir_stack.rs
@@ -0,0 +1,543 @@
+#![allow(unused)]
+
+use std::ffi::OsStr;
+use std::fmt::Debug;
+use std::os::fd::{AsRawFd, BorrowedFd, RawFd};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Context, Error};
+use nix::dir::Dir;
+use nix::fcntl::OFlag;
+use nix::sys::stat::{mkdirat, Mode};
+
+use proxmox_sys::error::SysError;
+
+// NOTE: This is essentially crate::pxar::tools::assert_single_path_component
+// but kept separate here for the time being
+pub fn assert_path_has_single_normal_component<P: AsRef<Path>>(path: P) -> Result<(), Error> {
+    let path = path.as_ref();
+
+    if !path.is_relative() {
+        bail!("path is absolute: {:?}", path)
+    }
+
+    let mut components = path.components();
+
+    if !matches!(components.next(), Some(std::path::Component::Normal(_))) {
+        bail!("invalid component in path: {:?}", path)
+    }
+
+    if components.next().is_some() {
+        bail!("path has multiple components: {:?}", path)
+    }
+
+    Ok(())
+}
+
+/// A stack which stores directory path components and associates each directory
+/// with some data.
+pub struct DirStack<T> {
+ path: PathBuf,
+ data_stack: Vec<T>,
+}
+
+impl<T> Default for DirStack<T> {
+ #[inline]
+ fn default() -> Self {
+ Self {
+ path: PathBuf::new(),
+ data_stack: vec![],
+ }
+ }
+}
+
+impl<T: Clone> Clone for DirStack<T> {
+ #[inline]
+ fn clone(&self) -> Self {
+ Self {
+ path: self.path.clone(),
+ data_stack: self.data_stack.clone(),
+ }
+ }
+}
+
+impl<T: Debug> Debug for DirStack<T> {
+ #[inline]
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("DirStack")
+ .field("path", &self.path)
+ .field("data_stack", &self.data_stack)
+ .finish()
+ }
+}
+
+impl<T> DirStack<T> {
+ /// Returns a [`Path`] slice of the entire path that's on the stack.
+ ///
+ /// The [dangling root](DirStack::with_dangling_root) will be included if it exists.
+ #[inline]
+ pub fn as_path(&self) -> &Path {
+ self.path.as_path()
+ }
+
+ /// Removes all data from the stack.
+ ///
+ /// The [dangling root](DirStack::with_dangling_root) will be preserved if it exists.
+ #[inline]
+ pub fn clear(&mut self) {
+ while self.pop().is_some() {}
+ }
+
+ /// Returns the [dangling root](DirStack::with_dangling_root) or an empty
+ /// path if it doesn't exist.
+ #[inline]
+ pub fn dangling_root(&self) -> &Path {
+ if self.data_stack.is_empty() {
+ self.path.as_ref()
+ } else {
+ let mut iter = self.path.iter();
+ iter.nth_back(self.data_stack.len() - 1);
+ iter.as_path()
+ }
+ }
+
+ /// Returns the first path component and its associated data.
+ ///
+ /// If the stack has a [dangling root](DirStack::with_dangling_root), it
+ /// will be ignored. Meaning, only the first *added* path component will
+ /// be returned.
+ #[inline]
+ pub fn first(&self) -> Option<(&Path, &T)> {
+ self.data_stack.first().and_then(|data| {
+ self.path
+ .iter()
+ .nth_back(self.data_stack.len() - 1)
+ .map(|component| (component.as_ref(), data))
+ })
+ }
+
+ /// Checks whether the stack is empty.
+ ///
+ /// The stack is considered empty if there are no path components with
+ /// associated data, even if it has a [dangling root](DirStack::with_dangling_root).
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.data_stack.is_empty()
+ }
+
+ /// Returns the last path component and its associated data.
+ #[inline]
+ pub fn last(&self) -> Option<(&Path, &T)> {
+ self.data_stack.last().and_then(|data| {
+ self.path
+ .file_name()
+ .map(|file_name| (file_name.as_ref(), data))
+ })
+ }
+
+ /// Returns the number of elements on the stack.
+ ///
+ /// Note that the root path (the very first path) may consist of multiple components,
+ /// but is still counted as one element.
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.data_stack.len()
+ }
+
+ /// Creates a new, empty [`DirStack`].
+ #[inline]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Returns the **full path** before [`DirStack::last()`] and the data associated
+ /// with the component it leads to, **if both exist.**
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use std::path::Path;
+ ///
+ /// # use tools::DirStack;
+ ///
+ /// let mut dir_stack: DirStack<u8> = DirStack::new();
+ ///
+ /// dir_stack.push("foo", 1);
+ /// dir_stack.push("bar", 2);
+ /// dir_stack.push("baz", 3);
+ ///
+ /// let expected = Path::new("foo/bar");
+ ///
+ /// assert_eq!(dir_stack.parent(), Some((expected, &2)));
+ ///
+ /// ```
+ #[inline]
+ pub fn parent(&self) -> Option<(&Path, &T)> {
+ if self.is_empty() {
+ None
+ } else {
+ self.path.parent().and_then(|parent| {
+ if self.data_stack.len() > 1 {
+ Some((parent, &self.data_stack[self.data_stack.len() - 2]))
+ } else {
+ None
+ }
+ })
+ }
+ }
+ /// Removes the last path component from the stack and returns its associated
+ /// data, or [`None`] if the stack is empty.
+ #[inline]
+ pub fn pop(&mut self) -> Option<T> {
+ self.data_stack.pop().map(|data| {
+ self.path.pop();
+ data
+ })
+ }
+
+ /// Pushes a path and its associated data onto the stack.
+ ///
+ /// The path must consist of a single normal component.
+ /// See [`Component::Normal`][std::path::Component::Normal].
+ #[inline]
+ pub fn push<P: AsRef<Path>>(&mut self, path: P, data: T) -> Result<(), Error> {
+ assert_path_has_single_normal_component(&path)?;
+
+ self.path.push(path.as_ref());
+ self.data_stack.push(data);
+
+ Ok(())
+ }
+
+ /// Clears the stack, replacing all elements with the given path and that path's
+ /// associated data.
+ ///
+ /// The [dangling root](DirStack::with_dangling_root) will be preserved if it exists.
+ ///
+ /// Contrary to [`DirStack::push()`], this can be any path and doesn't have to consist of
+ /// only a single path component. This is useful if you want to e.g. initialize the stack
+ /// with your current working directory or similar.
+ #[inline]
+ pub fn replace<P: AsRef<Path>>(&mut self, path: P, data: T) {
+ self.clear();
+ self.path.push(path);
+ self.data_stack.push(data);
+ }
+
+ /// Creates a new [`DirStack`] from a path without any associated data.
+ /// This stack has a **dangling root** which means that even though it has a
+ /// path, it is still considered to be empty, as it lacks any data.
+ ///
+ /// This is useful if you want to have a base directory that can never
+ /// be left through [`DirStack::pop()`], such as your current working directory,
+ /// for example.
+ #[inline]
+ pub fn with_dangling_root<P: AsRef<Path>>(path: P) -> Self {
+ Self {
+ path: path.as_ref().to_owned(),
+ data_stack: vec![],
+ }
+ }
+}
+
+impl<T, Idx> std::ops::Index<Idx> for DirStack<T>
+where
+ Idx: std::slice::SliceIndex<[T], Output = T>,
+{
+ type Output = T;
+
+ #[inline]
+ fn index(&self, index: Idx) -> &Self::Output {
+ &self.data_stack[index]
+ }
+}
+
+impl<T, Idx> std::ops::IndexMut<Idx> for DirStack<T>
+where
+ Idx: std::slice::SliceIndex<[T], Output = T>,
+{
+ #[inline]
+ fn index_mut(&mut self, index: Idx) -> &mut Self::Output {
+ &mut self.data_stack[index]
+ }
+}
+
+/// Helper struct that associates a [`Entry`][e] with a potentially opened directory [`Dir`].
+///
+/// It's the developer's responsibility that the contained [`Entry`][e]'s *kind*
+/// is actually a [`Directory`][d], and that the opened [`Dir`] corresponds to the
+/// provided [`Entry`][e].
+///
+/// [e]: pxar::Entry
+/// [d]: pxar::EntryKind::Directory
+#[derive(Debug)]
+pub(super) struct PxarDir {
+ entry: Arc<pxar::Entry>,
+ dir: Option<Dir>,
+}
+
+impl PxarDir {
+ /// Creates a new [`PxarDir`] from an [`Arc<pxar::Entry>`][pxar::Entry].
+ #[inline]
+ pub fn new(entry: Arc<pxar::Entry>) -> Self {
+ Self { entry, dir: None }
+ }
+
+ /// Creates a new [`PxarDir`] from an [`Arc<pxar::Entry>`][pxar::Entry]
+ /// which is associated with a [`Dir`].
+ #[inline]
+ pub fn with_dir(entry: Arc<pxar::Entry>, dir: Dir) -> Self {
+ Self {
+ entry,
+ dir: Some(dir),
+ }
+ }
+
+ /// Return the file name of the inner [`Entry`][pxar::Entry].
+ #[inline]
+ pub fn file_name(&self) -> &OsStr {
+ self.entry.file_name()
+ }
+
+ /// Return a [`BorrowedFd`] to the inner [`Dir`] if available.
+ #[inline]
+ pub fn try_as_borrowed_fd(&self) -> Option<BorrowedFd> {
+ // FIXME: Once `nix` adds `AsFd` support use `.as_fd()` instead.
+ self.dir
+ .as_ref()
+ .map(|dir| unsafe { BorrowedFd::borrow_raw(dir.as_raw_fd()) })
+ }
+
+ /// Moves the inner [`Entry`][pxar::Entry] and [`Option<Dir>`][dir] out of the [`PxarDir`].
+ ///
+ /// [dir]: nix::dir::Dir
+ #[inline]
+ pub fn into_inner(self) -> (Arc<pxar::Entry>, Option<Dir>) {
+ (self.entry, self.dir)
+ }
+
+    #[inline]
+    fn create_at(&mut self, parent: RawFd, allow_existing: bool) -> Result<BorrowedFd, Error> {
+        const PERMS: Mode = Mode::from_bits_truncate(0o700);
+
+        match mkdirat(parent, self.file_name(), PERMS) {
+            Ok(()) => (),
+            Err(error) => {
+                if !(allow_existing && error.already_exists()) {
+                    return Err(error).context("failed to create directory");
+                }
+            }
+        }
+
+        self.open_at(parent)
+    }
+
+ #[inline]
+ fn open_at(&mut self, parent: RawFd) -> Result<BorrowedFd, Error> {
+ Dir::openat(parent, self.file_name(), OFlag::O_DIRECTORY, Mode::empty())
+ .context("failed to open directory")
+ .map(|dir| {
+ // FIXME: Once `nix` adds `AsFd` support use `.as_fd()` instead.
+ let fd = unsafe { BorrowedFd::borrow_raw(dir.as_raw_fd()) };
+
+ self.dir = Some(dir);
+
+ fd
+ })
+ }
+}
+
+/// The [`PxarDirStack`] is used to keep track of traversed and created [`PxarDir`s][PxarDir].
+#[derive(Debug)]
+pub(super) struct PxarDirStack {
+ inner: DirStack<PxarDir>,
+ len_created: usize,
+}
+
+/// This struct may safely be `Sync` as it is only used in the context of the
+/// [`RawAsyncExtractor`][super::RawAsyncExtractor], which never accesses an
+/// underlying [`Dir`] concurrently or in parallel.
+unsafe impl Sync for PxarDirStack {}
+
+impl PxarDirStack {
+ #[inline]
+ pub fn new() -> Self {
+ Self {
+ inner: DirStack::new(),
+ len_created: 0,
+ }
+ }
+
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.inner.len()
+ }
+
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.inner.is_empty()
+ }
+
+ #[inline]
+ pub fn as_path(&self) -> &Path {
+ self.inner.as_path()
+ }
+
+ #[inline]
+ pub fn push<P: AsRef<Path>>(&mut self, path: P, dir: PxarDir) -> Result<(), Error> {
+ let path = path.as_ref();
+ self.inner.push(path, dir).map(|_| {
+ if self.inner[self.inner.len() - 1].dir.is_some() {
+ self.len_created += 1;
+ }
+ })
+ }
+
+ #[inline]
+ pub fn pop(&mut self) -> Option<PxarDir> {
+ self.inner.pop().map(|dir| {
+ self.len_created = self.len_created.min(self.len());
+ dir
+ })
+ }
+
+ #[inline]
+ pub fn root_dir_fd(&self) -> Result<BorrowedFd, Error> {
+ self.inner
+ .first()
+ .ok_or_else(|| format_err!("stack underrun"))
+ .map(|(_, pxar_dir)| {
+ pxar_dir
+ .try_as_borrowed_fd()
+ .context("lost track of directory file descriptors")
+ })?
+ }
+
+ /// Return the last [`Dir`]'s file descriptor as a [`BorrowedFd`]
+ /// or create it if it's not available.
+ #[inline]
+ pub fn last_dir_fd(&mut self, allow_existing: bool) -> Result<BorrowedFd, Error> {
+ if self.is_empty() {
+ bail!("no directory entries on the stack")
+ }
+
+ if self.len_created == 0 {
+ bail!("no created file descriptors on the stack")
+ }
+
+ let mut fd = self.inner[self.len_created - 1]
+ .try_as_borrowed_fd()
+ .context("lost track of directory file descriptors")?
+ .as_raw_fd();
+
+ while self.len_created < self.len() {
+ fd = self.inner[self.len_created]
+ .create_at(fd, allow_existing)
+ .map(|borrowed_fd| borrowed_fd.as_raw_fd())?;
+ self.len_created += 1;
+ }
+
+ self.inner[self.len_created - 1]
+ .try_as_borrowed_fd()
+ .context("lost track of directory file descriptors")
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use std::ffi::OsStr;
+ use std::path::Path;
+
+ use super::DirStack;
+
+ // helper extension trait to make asserts more concise
+ trait PathHelper {
+ fn as_path(&self) -> &Path;
+ }
+
+ impl<T> PathHelper for T
+ where
+ T: AsRef<OsStr>,
+ {
+ fn as_path(&self) -> &Path {
+ self.as_ref().as_ref()
+ }
+ }
+
+ #[test]
+ fn test_dir_stack_base() {
+ let mut dir_stack: DirStack<u8> = DirStack::new();
+
+ assert!(
+ dir_stack.push("foo", 1).is_ok(),
+ "couldn't push onto stack!"
+ );
+
+ dir_stack.push("bar", 2).unwrap();
+ dir_stack.push("baz", 3).unwrap();
+
+ assert_eq!(dir_stack.as_path(), "foo/bar/baz".as_path());
+
+ assert_eq!(dir_stack.first(), Some(("foo".as_path(), &1)));
+ assert_eq!(dir_stack.last(), Some(("baz".as_path(), &3)));
+ assert_eq!(dir_stack.parent(), Some(("foo/bar".as_path(), &2)));
+
+ assert_eq!(dir_stack.pop(), Some(3));
+
+ assert_eq!(dir_stack.last(), Some(("bar".as_path(), &2)));
+ assert_eq!(dir_stack.parent(), Some(("foo".as_path(), &1)));
+
+ assert_eq!(dir_stack.pop(), Some(2));
+
+ assert_eq!(dir_stack.first(), Some(("foo".as_path(), &1)));
+ assert_eq!(dir_stack.last(), Some(("foo".as_path(), &1)));
+ assert!(dir_stack.parent().is_none());
+
+ assert_eq!(dir_stack.pop(), Some(1));
+
+ assert!(dir_stack.first().is_none());
+ assert!(dir_stack.last().is_none());
+ assert!(dir_stack.parent().is_none());
+
+ assert_eq!(dir_stack.as_path(), "".as_path());
+
+ dir_stack.push("foo", 1).unwrap();
+ dir_stack.clear();
+
+ assert_eq!(dir_stack.as_path(), "".as_path());
+ }
+
+ #[test]
+ fn test_dir_stack_dangling_root() {
+ let mut dir_stack: DirStack<u8> = DirStack::with_dangling_root("foo/bar/baz");
+
+ assert!(dir_stack.is_empty());
+
+ assert!(dir_stack.push("qux", 1).is_ok());
+
+ let expected = Some(("qux".as_path(), &1u8));
+
+ assert_eq!(dir_stack.first(), expected);
+ assert_eq!(dir_stack.last(), expected);
+ assert!(dir_stack.parent().is_none());
+
+ assert_eq!(dir_stack.as_path(), "foo/bar/baz/qux".as_path());
+ assert_eq!(dir_stack.dangling_root(), "foo/bar/baz".as_path());
+ assert!(!dir_stack.is_empty());
+
+ assert_eq!(dir_stack.pop(), Some(1));
+
+ assert!(dir_stack.first().is_none());
+ assert!(dir_stack.last().is_none());
+ assert!(dir_stack.parent().is_none());
+
+ assert_eq!(dir_stack.as_path(), "foo/bar/baz".as_path());
+ assert_eq!(dir_stack.dangling_root(), "foo/bar/baz".as_path());
+ assert!(dir_stack.is_empty());
+
+ let empty_dir_stack: DirStack<u8> = DirStack::new();
+ assert!(empty_dir_stack.is_empty());
+ assert_eq!(empty_dir_stack.dangling_root(), "".as_path())
+ }
+}
diff --git a/pbs-client/src/pxar/aio/extract/extractor.rs b/pbs-client/src/pxar/aio/extract/extractor.rs
new file mode 100644
index 00000000..944ec157
--- /dev/null
+++ b/pbs-client/src/pxar/aio/extract/extractor.rs
@@ -0,0 +1,446 @@
+use std::os::unix::prelude::OsStrExt;
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Context, Error, Result};
+use memchr;
+use pathpatterns::{MatchList, MatchType};
+
+use pxar::accessor::aio::Accessor;
+use pxar::accessor::SeqReadAtAdapter;
+use pxar::decoder::aio::Decoder;
+use pxar::decoder::SeqRead;
+
+use super::raw::RawAsyncExtractor;
+use super::PxarExtractOptions;
+
+use crate::pxar::Flags;
+
+/// Helper enum that represents the state of the root [`Entry`][pxar::Entry]
+/// in the context of the [`AsyncExtractor<T>`].
+///
+/// For example: Because the [`Accessor<T>`] skips the archive's root directory once a
+/// [`Decoder<T>`] is instantiated from it, the root entry has to be decoded
+/// and provided beforehand.
+#[derive(Debug)]
+pub(crate) enum RootEntryState {
+ None,
+ Decoded { entry: Arc<pxar::Entry> },
+ Extracted,
+}
+
+pub struct AsyncExtractor<T> {
+ decoder: Box<Decoder<T>>,
+ inner: Box<RawAsyncExtractor>,
+
+ options: Arc<PxarExtractOptions>,
+
+ root_entry_state: RootEntryState,
+ end_reached: bool,
+
+ match_stack: Vec<bool>,
+ current_match: bool,
+}
+
+type FileReader = pxar::accessor::sync::FileRefReader<Arc<std::fs::File>>;
+
+impl AsyncExtractor<SeqReadAtAdapter<FileReader>> {
+ /// Create a new extractor from an existing pxar archive file.
+ ///
+ /// This is the preferred way to extract an archive that has been saved to
+ /// local storage, taking advantage of the fact that the archive is available on a drive.
+ pub async fn with_file<P>(src_path: P, options: PxarExtractOptions) -> Result<Self, Error>
+ where
+ P: AsRef<Path>,
+ {
+ let src_path = src_path.as_ref();
+
+ let file = std::fs::File::open(src_path)
+ .with_context(|| format!("failed to open file: {:#?}", src_path))?;
+
+ // allows us to toss the accessor away once we've gotten our decoder from it
+ let file = Arc::new(file);
+
+ let accessor = Accessor::from_file_ref(file)
+ .await
+ .context("failed to instantiate pxar accessor")?;
+
+ // root entry needs to be pre-decoded because `decode_full()` skips it
+ let root_dir = accessor.open_root().await?;
+ let root_entry = root_dir.lookup_self().await?.entry().clone();
+
+ let decoder = root_dir
+ .decode_full()
+ .await
+ .context("failed to instantiate decoder from accessor")?;
+
+ Ok(Self::with_root_entry(decoder, options, root_entry))
+ }
+}
+
+impl<T: SeqRead> AsyncExtractor<T> {
+ /// Create a new extractor from an existing [`Decoder`].
+ /// It is assumed that no entries were decoded prior to the creation of the extractor.
+ pub fn new(decoder: Decoder<T>, options: PxarExtractOptions) -> Self {
+ let root_entry_state = RootEntryState::None;
+ Self::new_impl(decoder, options, root_entry_state)
+ }
+
+ /// Create a new extractor from an existing [`Decoder`] and a pre-decoded root
+ /// [`Entry`][pxar::Entry]. This is useful if you need to handle the extraction
+ /// of the archive's root in a special manner.
+ pub fn with_root_entry(
+ decoder: Decoder<T>,
+ options: PxarExtractOptions,
+ root_entry: pxar::Entry,
+ ) -> Self {
+ let root_entry_state = RootEntryState::Decoded {
+ entry: Arc::new(root_entry),
+ };
+
+ Self::new_impl(decoder, options, root_entry_state)
+ }
+
+ /// Create a new extractor from an arbitrary input that implements [`SeqRead`].
+ pub async fn with_input(input: T, options: PxarExtractOptions) -> Result<Self, Error> {
+ let decoder = Decoder::new(input)
+ .await
+ .context("failed to instantiate decoder")?;
+
+ let root_entry_state = RootEntryState::None;
+
+ Ok(Self::new_impl(decoder, options, root_entry_state))
+ }
+
+ fn new_impl(
+ mut decoder: Decoder<T>,
+ options: PxarExtractOptions,
+ root_entry_state: RootEntryState,
+ ) -> Self {
+ decoder.enable_goodbye_entries(true);
+
+ let current_match = options.default_match;
+
+ Self {
+ decoder: Box::new(decoder),
+ inner: Box::new(RawAsyncExtractor::new()),
+ options: Arc::new(options),
+ root_entry_state,
+ end_reached: false,
+ match_stack: vec![],
+ current_match,
+ }
+ }
+}
+
+impl<T: SeqRead> AsyncExtractor<T> {
+ /// Decode and extract the next [`Entry`][pxar::Entry].
+ ///
+ /// Upon each call, this function will continue to extract entries until
+ /// either an error is encountered or the archive has been fully extracted.
+ #[inline]
+ pub async fn next(&mut self) -> Option<Result<(), Error>> {
+ if self.end_reached {
+ return None;
+ }
+
+ if !matches!(self.root_entry_state, RootEntryState::Extracted) {
+ return Some(self.handle_root_entry().await);
+ }
+
+ let entry = match self.decode_next().await {
+ Some(Ok(entry)) => entry,
+ // other cases are already handled in `decode_next()` so we can just return here
+ rval => {
+ return rval.map(|result| result.map(drop));
+ }
+ };
+
+ let mut result = match self.extract_next(entry).await {
+ Err(error) => self.exec_error_handler(error).await,
+ res => res,
+ };
+
+ if result.is_err() {
+ if let Err(stop_error) = self.stop_extraction() {
+ result = result
+ .context(stop_error)
+ .context("encountered another error during extractor shutdown");
+ }
+ }
+
+ Some(result)
+ }
+
+ /// Helper method that starts the extraction process by extracting the root
+ /// directory of the archive to the given destination.
+ #[inline]
+ async fn start_extraction(&mut self, entry: Arc<pxar::Entry>) -> Result<(), Error> {
+ if matches!(self.root_entry_state, RootEntryState::Extracted) {
+ bail!("root entry of archive was already extracted")
+ }
+
+ self.inner.start_worker_thread(&self.options)?;
+
+ self.inner
+ .extract_root_dir(entry, &self.options)
+ .await
+ .map(|_| {
+ self.root_entry_state = RootEntryState::Extracted;
+ })
+ }
+
+    /// Helper method that stops the extraction process by marking the end as
+    /// reached and joining the worker thread.
+    #[inline]
+    fn stop_extraction(&mut self) -> Result<(), Error> {
+        self.end_reached = true;
+        proxmox_async::runtime::block_in_place(|| self.inner.join_worker_thread())
+    }
+
+ /// Decodes the next [`Entry`][pxar::Entry] and wraps it in an [`Arc`] on success.
+ ///
+ /// If an error occurs while decoding an entry, the extractor is subsequently stopped.
+ #[inline(always)] // hot path
+ async fn decode_next(&mut self) -> Option<Result<Arc<pxar::Entry>, Error>> {
+ let entry = match self.decoder.next().await {
+ None => {
+ return if let Err(error) = self.stop_extraction() {
+ Some(Err(error))
+ } else {
+ None
+ };
+ }
+ Some(Err(error)) => {
+ let result = Err(error).context("error reading pxar archive");
+
+ let result = if let Err(stop_error) = self.stop_extraction() {
+ result
+ .context(stop_error)
+ .context("encountered another error during extractor shutdown")
+ } else {
+ result
+ };
+
+ return Some(result);
+ }
+ Some(Ok(entry)) => entry,
+ };
+
+ Some(Ok(Arc::new(entry)))
+ }
+
+ /// Helper method that extracts the archive's root entry depending on the
+ /// [`RootEntryState`].
+ ///
+ /// If the root pxar entry is decoded or was provided, the extraction process
+ /// is started via [`RawAsyncExtractor::start_extraction()`].
+ async fn handle_root_entry(&mut self) -> Result<(), Error> {
+ let decode_result = match self.root_entry_state {
+ RootEntryState::None => match self.decode_next().await {
+ Some(result) => result.context("error while decoding root entry"),
+ None => Err(format_err!("no root entry found - archive is empty")),
+ },
+ RootEntryState::Decoded { ref entry } => Ok(Arc::clone(entry)),
+ RootEntryState::Extracted => Err(format_err!("root entry was already extracted")),
+ };
+
+ let entry = decode_result.map_err(|error| {
+ let _ = self.stop_extraction();
+ error
+ })?;
+
+ self.start_extraction(entry).await.map_err(|error| {
+ let _ = self.stop_extraction();
+ error
+ })
+ }
+
+ /// Extract an [`Entry`][pxar::Entry] depending on its [`EntryKind`][pxar::EntryKind].
+ ///
+ ///
+ /// This method checks each entry's filename and matches it against the given
+ /// [`MatchList`]. Once an entry is safe to extract and matches, the
+ /// [`Callback`][super::Callback] is executed and the underlying [`RawAsyncExtractor`]
+ /// is used to perform the actual extraction operation.
+ #[inline(always)] // hot path
+ async fn extract_next(&mut self, entry: Arc<pxar::Entry>) -> Result<(), Error> {
+ Self::check_entry_filename(&entry)?;
+
+ let (match_type, did_match) = self.match_entry(&entry);
+
+ use pxar::EntryKind::*;
+
+ match (did_match, entry.kind()) {
+ (_, Directory) => {
+ self.exec_callback(&entry).await;
+
+ let do_create = self.current_match && match_type != Some(MatchType::Exclude);
+
+ let res = self
+ .inner
+ .enter_directory(entry, &self.options, do_create)
+ .await;
+
+ if res.is_ok() {
+ // We're starting a new directory, push our old matching
+ // state and replace it with our new one:
+ self.match_stack.push(self.current_match);
+ self.current_match = did_match;
+ }
+
+ res
+ }
+ (_, GoodbyeTable) => {
+ self.exec_callback(&entry).await;
+ let res = self.inner.leave_directory(&self.options).await;
+
+ if res.is_ok() {
+ // We left a directory, also get back our previous matching state. This is in sync
+ // with `dir_stack` so this should never be empty except for the final goodbye
+ // table, in which case we get back to the default of `true`.
+ self.current_match = self.match_stack.pop().unwrap_or(true);
+ }
+
+ res
+ }
+ (true, Symlink(_)) => {
+ self.exec_callback(&entry).await;
+
+ self.inner.extract_symlink(entry, &self.options).await
+ }
+ (true, Hardlink(_)) => {
+ self.exec_callback(&entry).await;
+
+ self.inner.extract_hardlink(entry, &self.options).await
+ }
+ (true, Device(dev)) => {
+ self.exec_callback(&entry).await;
+
+ if self.has_feature_flags(Flags::WITH_DEVICE_NODES) {
+ let dev = dev.to_owned();
+ self.inner.extract_device(entry, &self.options, dev).await
+ } else {
+ Ok(())
+ }
+ }
+ (true, Fifo) => {
+ self.exec_callback(&entry).await;
+
+ if self.has_feature_flags(Flags::WITH_FIFOS) {
+ self.inner.extract_fifo(entry, &self.options, 0).await
+ } else {
+ Ok(())
+ }
+ }
+ (true, Socket) => {
+ self.exec_callback(&entry).await;
+
+ if self.has_feature_flags(Flags::WITH_SOCKETS) {
+ self.inner.extract_socket(entry, &self.options, 0).await
+ } else {
+ Ok(())
+ }
+ }
+ (true, File { size, .. }) => {
+ self.exec_callback(&entry).await;
+
+ if let Some(ref mut contents) = self.decoder.contents() {
+ let size = size.to_owned();
+ self.inner
+ .extract_file(entry, &self.options, contents, size)
+ .await
+ } else {
+ Err(format_err!(
+ "found regular file entry without contents in archive"
+ ))
+ }
+ }
+ (false, _) => Ok(()),
+ }
+ }
+
+ /// Checks whether the [`Entry`'s][e] filename is valid.
+ ///
+ /// The filename is valid if and only if:
+ /// * it doesn't contain slashes `/`
+ /// * it doesn't contain null bytes `\0`
+ ///
+ /// [e]: pxar::Entry
+ fn check_entry_filename(entry: &pxar::Entry) -> Result<(), Error> {
+ let file_name_bytes = entry.file_name().as_bytes();
+
+ if let Some(pos) = memchr::memchr(b'/', file_name_bytes) {
+ bail!(
+ "archive entry filename contains slash at position {pos}, \
+ which is invalid and a security concern"
+ )
+ }
+
+ if let Some(pos) = memchr::memchr(0, file_name_bytes) {
+ bail!("archive entry filename contains NULL byte at position {pos}")
+ }
+
+ Ok(())
+ }
+
+    /// Helper method that checks whether the [`Entry`'s][pxar::Entry] path and file mode
+    /// match the provided [`MatchList`] in [`PxarExtractOptions`].
+    ///
+    /// Whether the entry actually matched is also returned together with the
+    /// [`Option<MatchType>`][MatchType]. If the latter is [`None`], the
+    /// **current match** is used as a fallback.
+    #[inline]
+    pub(crate) fn match_entry(&self, entry: &pxar::Entry) -> (Option<MatchType>, bool) {
+        let path_bytes = entry.path().as_os_str().as_bytes();
+        let file_mode = entry.metadata().file_type() as u32;
+
+        // NOTE: On large match lists this blocks for quite some time,
+        // so this could maybe be called in spawn_blocking() above a certain size
+        // ... but that depends on how a "large" `match_list` is defined
+
+        // We can `unwrap()` safely here because we get a `Result<_, Infallible>`
+        let match_type = self
+            .options
+            .match_list
+            .matches(path_bytes, Some(file_mode))
+            .unwrap();
+
+        let did_match = match match_type {
+            Some(MatchType::Include) => true,
+            Some(MatchType::Exclude) => false,
+            None => self.current_match,
+        };
+
+        (match_type, did_match)
+    }
+
+ #[inline]
+ fn has_feature_flags(&self, feature_flags: Flags) -> bool {
+ self.options.feature_flags.contains(feature_flags)
+ }
+
+ /// Helper method to spawn and await a task that executes the extractor's
+ /// [`ErrorHandler`][super::ErrorHandler].
+ #[inline]
+ async fn exec_error_handler(&self, error: Error) -> Result<(), Error> {
+ tokio::task::spawn_blocking({
+ let error_handler = Arc::clone(&self.options.error_handler);
+ move || error_handler(error)
+ })
+ .await
+ .context("failed to execute error handler")?
+ }
+
+ /// Helper method to spawn and await a task that executes the extractor's
+ /// [`Callback`][super::Callback].
+ #[inline]
+ async fn exec_callback(&self, entry: &Arc<pxar::Entry>) {
+ if let Some(ref callback) = self.options.callback {
+ tokio::task::spawn({
+ let callback = Arc::clone(callback);
+ let entry = Arc::clone(entry);
+ async move { callback(entry).await }
+ });
+ }
+ }
+}
diff --git a/pbs-client/src/pxar/aio/extract/mod.rs b/pbs-client/src/pxar/aio/extract/mod.rs
new file mode 100644
index 00000000..82d9a596
--- /dev/null
+++ b/pbs-client/src/pxar/aio/extract/mod.rs
@@ -0,0 +1,220 @@
+use std::borrow::Cow;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use futures::future::BoxFuture;
+use lazy_static::lazy_static;
+
+use pathpatterns::MatchEntry;
+
+use crate::pxar::Flags;
+
+pub(crate) mod extractor;
+pub use extractor::AsyncExtractor;
+
+pub(crate) mod raw;
+pub use raw::RawAsyncExtractor;
+
+// NOTE: These will be put into a separate module and are only used here
+// to avoid (too much) code duplication
+use crate::pxar::extract::OverwriteFlags;
+use crate::pxar::extract::PxarExtractContext;
+
+/// Handler invoked for every extraction error: returning `Ok(())` swallows
+/// the error and lets extraction continue, returning `Err` propagates it.
+pub type ErrorHandler =
+    Box<dyn Fn(anyhow::Error) -> Result<(), anyhow::Error> + Send + Sync + 'static>;
+
+/// Per-entry callback future; see [`PxarExtractOptions::callback`].
+pub type Callback = Box<dyn Fn(Arc<pxar::Entry>) -> BoxFuture<'static, ()> + Send + Sync + 'static>;
+
+lazy_static! {
+    // `Err` used as a function: the default handler propagates every error.
+    static ref DEFAULT_ERROR_HANDLER: Arc<ErrorHandler> = Arc::new(Box::new(Err));
+}
+
+/// Options for extracting a pxar archive.
+pub struct PxarExtractOptions {
+    /// The destination directory to which the archive should be extracted.
+    pub destination: PathBuf,
+
+    /// The flags that control what's extracted from the archive.
+    pub feature_flags: Flags,
+
+    /// The flags that control what kind of file system entries may or may not
+    /// be overwritten.
+    pub overwrite_flags: OverwriteFlags,
+
+    /// Whether to allow already existing directories or not.
+    pub allow_existing_dirs: bool,
+
+    /// The initial matching case for extracted entries.
+    pub default_match: bool,
+
+    /// A list of match entries. Each [`MatchEntry`] lets you control which files
+    /// should or shouldn't be extracted.
+    pub match_list: Vec<MatchEntry>,
+
+    /// A boxed closure that's used to handle errors throughout the entire
+    /// extraction process.
+    ///
+    /// The default handler (used by the builder when none is set) propagates
+    /// every error it is given.
+    pub error_handler: Arc<ErrorHandler>,
+
+    /// An optional future that is called whenever a [`pxar::Entry`] matches
+    /// and is about to be extracted.
+    ///
+    /// **Note that this future is spawned as a task and not awaited until completion.**
+    pub callback: Option<Arc<Callback>>,
+}
+
+impl PxarExtractOptions {
+    /// Convenience shorthand for [`PxarExtractOptionsBuilder::new()`].
+    pub fn builder<D>(destination: D) -> PxarExtractOptionsBuilder
+    where
+        D: Into<Cow<'static, Path>>,
+    {
+        PxarExtractOptionsBuilder::new(destination)
+    }
+}
+
+impl std::fmt::Debug for PxarExtractOptions {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // The handler closure itself isn't `Debug`; only report whether the
+        // default was replaced. NOTE: the default handler is *not* a no-op -
+        // it propagates every error (see `DEFAULT_ERROR_HANDLER`).
+        let error_handler_msg: &str = if Arc::ptr_eq(&self.error_handler, &DEFAULT_ERROR_HANDLER) {
+            "default (propagates errors)"
+        } else {
+            "custom"
+        };
+
+        f.debug_struct("PxarExtractOptions")
+            .field("destination", &self.destination)
+            .field("feature_flags", &self.feature_flags)
+            .field("overwrite_flags", &self.overwrite_flags)
+            .field("allow_existing_dirs", &self.allow_existing_dirs)
+            .field("default_match", &self.default_match)
+            .field("match_list", &self.match_list)
+            .field("error_handler", &error_handler_msg)
+            .field("callback", &self.callback.is_some())
+            .finish()
+    }
+}
+
+/// This builder is used to configure the behaviour of the [`AsyncExtractor`][extr].
+///
+/// See [`PxarExtractOptions`] for a complete description of all extraction options.
+///
+/// [extr]: extractor::AsyncExtractor
+pub struct PxarExtractOptionsBuilder {
+    // Required: where the archive gets extracted to.
+    destination: Cow<'static, Path>,
+
+    // Extraction behaviour; see the corresponding `PxarExtractOptions` fields.
+    feature_flags: Flags,
+    overwrite_flags: OverwriteFlags,
+    allow_existing_dirs: bool,
+
+    // Entry matching configuration.
+    default_match: bool,
+    match_list: Vec<MatchEntry>,
+
+    // Optional handlers; `build()` falls back to `DEFAULT_ERROR_HANDLER`.
+    error_handler: Option<Arc<ErrorHandler>>,
+    callback: Option<Arc<Callback>>,
+}
+
+impl std::fmt::Debug for PxarExtractOptionsBuilder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // The closures themselves can't be printed; only report whether
+        // they were configured at all.
+        let mut dbg = f.debug_struct("PxarExtractOptionsBuilder");
+
+        dbg.field("destination", &self.destination);
+        dbg.field("feature_flags", &self.feature_flags);
+        dbg.field("overwrite_flags", &self.overwrite_flags);
+        dbg.field("allow_existing_dirs", &self.allow_existing_dirs);
+        dbg.field("default_match", &self.default_match);
+        dbg.field("match_list", &self.match_list);
+        dbg.field("error_handler", &self.error_handler.is_some());
+        dbg.field("callback", &self.callback.is_some());
+
+        dbg.finish()
+    }
+}
+
+impl PxarExtractOptionsBuilder {
+    /// Create a builder for the given extraction `destination`.
+    pub fn new<D>(destination: D) -> Self
+    where
+        D: Into<Cow<'static, Path>>,
+    {
+        Self {
+            destination: destination.into(),
+            feature_flags: Flags::default(),
+            overwrite_flags: OverwriteFlags::empty(),
+            allow_existing_dirs: false,
+            default_match: true, // entries are considered as matching by default
+            match_list: Default::default(),
+            error_handler: None,
+            callback: None,
+        }
+    }
+
+    /// Assemble a [`PxarExtractOptions`] from the current builder state.
+    ///
+    /// The builder remains usable afterwards: all configured values are
+    /// cloned (or reference-counted) into the returned options.
+    pub fn build(&mut self) -> PxarExtractOptions {
+        // Fall back to the shared default handler (which propagates errors)
+        // when no custom handler was configured.
+        let error_handler = self
+            .error_handler
+            .as_ref()
+            .unwrap_or(&DEFAULT_ERROR_HANDLER);
+
+        PxarExtractOptions {
+            destination: self.destination.to_path_buf(),
+            feature_flags: self.feature_flags,
+            overwrite_flags: self.overwrite_flags,
+            allow_existing_dirs: self.allow_existing_dirs,
+            default_match: self.default_match,
+            match_list: self.match_list.clone(),
+            error_handler: Arc::clone(error_handler),
+            callback: self.callback.as_ref().map(Arc::clone),
+        }
+    }
+}
+
+impl PxarExtractOptionsBuilder {
+    /// Set the feature flags controlling what is extracted.
+    pub fn feature_flags(&mut self, flags: Flags) -> &mut Self {
+        self.feature_flags = flags;
+        self
+    }
+
+    /// Set which kinds of existing file system entries may be overwritten.
+    pub fn overwrite_flags(&mut self, flags: OverwriteFlags) -> &mut Self {
+        self.overwrite_flags = flags;
+        self
+    }
+
+    /// Allow (or disallow) extracting into already existing directories.
+    pub fn allow_existing_dirs(&mut self, value: bool) -> &mut Self {
+        self.allow_existing_dirs = value;
+        self
+    }
+
+    /// Set whether entries count as matching when no pattern applies.
+    pub fn default_match(&mut self, value: bool) -> &mut Self {
+        self.default_match = value;
+        self
+    }
+
+    /// Append a single [`MatchEntry`] to the match list.
+    pub fn push_match_entry(&mut self, match_entry: MatchEntry) -> &mut Self {
+        self.match_list.push(match_entry);
+        self
+    }
+
+    /// Append all entries of `match_list` to the match list.
+    pub fn push_match_list<T: ToOwned<Owned = Vec<MatchEntry>>>(
+        &mut self,
+        match_list: T,
+    ) -> &mut Self {
+        // Clippy sees `to_owned()` on an already-owned `Vec` as a redundant
+        // clone, but the generic bound requires the call for borrowed inputs.
+        #[allow(clippy::redundant_clone)]
+        self.match_list.extend(match_list.to_owned());
+        self
+    }
+
+    /// Set the error handler, wrapping it in an [`Arc`].
+    pub fn error_handler(&mut self, error_handler: ErrorHandler) -> &mut Self {
+        let error_handler = Arc::new(error_handler);
+        self.error_handler_ref(error_handler)
+    }
+
+    /// Set an already reference-counted error handler.
+    pub fn error_handler_ref(&mut self, error_handler: Arc<ErrorHandler>) -> &mut Self {
+        self.error_handler.replace(error_handler);
+        self
+    }
+
+    /// Set the per-entry callback, wrapping it in an [`Arc`].
+    pub fn callback(&mut self, callback: Callback) -> &mut Self {
+        let callback = Arc::new(callback);
+        self.callback_ref(callback)
+    }
+
+    /// Set an already reference-counted per-entry callback.
+    pub fn callback_ref(&mut self, callback: Arc<Callback>) -> &mut Self {
+        self.callback.replace(callback);
+        self
+    }
+}
+}
diff --git a/pbs-client/src/pxar/aio/extract/raw.rs b/pbs-client/src/pxar/aio/extract/raw.rs
new file mode 100644
index 00000000..d6e20981
--- /dev/null
+++ b/pbs-client/src/pxar/aio/extract/raw.rs
@@ -0,0 +1,503 @@
+use std::os::fd::{AsRawFd, FromRawFd, RawFd};
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::{bail, Context, Error, Result};
+use nix::sys::stat::SFlag;
+use nix::{dir::Dir, fcntl::OFlag, sys::stat::Mode};
+
+use pxar::format::Device;
+use tokio::io::AsyncRead;
+use tokio::sync::RwLock;
+
+use proxmox_sys::fs::CreateOptions;
+
+use super::OverwriteFlags;
+use super::PxarExtractContext;
+use super::PxarExtractOptions;
+
+use crate::pxar::aio::dir_stack::{PxarDir, PxarDirStack};
+use crate::pxar::aio::metadata;
+use crate::pxar::aio::worker::SyncTaskWorker;
+
+/// The [`RawAsyncExtractor`] is used to extract individual [`pxar::Entries`][pxar::Entry].
+/// Its purpose is to provide a common underlying mechanism with which an archive is
+/// extracted.
+///
+/// The tracking of directories is based on a stack. [Entering][enter] and [leaving][leave]
+/// directories simply corresponds to pushing onto and popping off the internal
+/// [`PxarDirStack`]. All remaining operations do not modify the raw extractor's
+/// stack otherwise.
+///
+/// In order to make extraction fully concurrent, an internal worker thread is used
+/// to which synchronous tasks are sent to. This worker thread has to be started manually.
+///
+/// No state is tracked otherwise, which means that it's your own responsibility
+/// to [extract the archive's root][extr_root], as well as [start][w_start] or [stop][w_stop]
+/// the [`RawAsyncExtractor`]'s internal worker thread.
+///
+///
+/// [enter]: Self::enter_directory()
+/// [leave]: Self::leave_directory()
+/// [extr_root]: Self::extract_root_dir
+/// [w_start]: Self::start_worker_thread
+/// [w_stop]: Self::join_worker_thread
+pub struct RawAsyncExtractor {
+    /// A stack of directories used to aid in traversing and extracting the pxar archive.
+    /// Shared with (and locked by) the tasks running on the worker thread.
+    dir_stack: Arc<RwLock<PxarDirStack>>,
+
+    /// Worker thread for synchronous tasks; `None` until started and after joining.
+    worker: Option<SyncTaskWorker>,
+}
+
+impl Default for RawAsyncExtractor {
+    // Same as `new()`: empty directory stack, worker thread not yet started.
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl RawAsyncExtractor {
+    /// Create a raw extractor with an empty directory stack and no worker thread.
+    #[inline]
+    pub fn new() -> Self {
+        let dir_stack = Arc::new(RwLock::new(PxarDirStack::new()));
+
+        Self {
+            dir_stack,
+            worker: None,
+        }
+    }
+
+    /// Extract the root directory of a pxar archive to the given [`destination`][dest].
+    ///
+    /// [dest]: PxarExtractOptions::destination
+    pub async fn extract_root_dir(
+        &mut self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+    ) -> Result<(), Error> {
+        let path = options.destination.clone();
+
+        // Directory creation is blocking file system I/O - run it on the
+        // blocking thread pool.
+        let root_pxar_dir =
+            tokio::task::spawn_blocking(move || Self::do_extract_root_dir(path, entry))
+                .await
+                .context("failed to execute task to create extraction directory")??;
+
+        let dir_name = options
+            .destination
+            .file_name()
+            .context("extraction destination directory has no name")?;
+
+        // The root becomes the bottom-most element of the directory stack.
+        self.dir_stack.write().await.push(dir_name, root_pxar_dir)
+    }
+
+    /// Synchronous part of [`extract_root_dir()`][Self::extract_root_dir]:
+    /// creates the destination directory and opens it.
+    #[inline]
+    fn do_extract_root_dir<P: AsRef<Path>>(
+        path: P,
+        entry: Arc<pxar::Entry>,
+    ) -> Result<PxarDir, Error> {
+        // Restrictive permissions for the freshly created destination.
+        const CREATE_OPTS: CreateOptions =
+            CreateOptions::new().perm(Mode::from_bits_truncate(0o700));
+
+        if !entry.is_dir() {
+            bail!("pxar archive does not start with a directory entry!");
+        }
+
+        let path = path.as_ref();
+
+        proxmox_sys::fs::create_path(path, None, Some(CREATE_OPTS))
+            .with_context(|| format!("error creating extraction directory {path:?}"))?;
+
+        let dir = Dir::open(path, OFlag::O_DIRECTORY | OFlag::O_CLOEXEC, Mode::empty())
+            .with_context(|| format!("unable to open extraction directory {path:?}"))?;
+
+        Ok(PxarDir::with_dir(entry, dir))
+    }
+
+    /// Start the internal worker that runs the blocking extraction tasks.
+    ///
+    /// Fails if the worker was already started.
+    #[inline]
+    pub fn start_worker_thread(&mut self, options: &PxarExtractOptions) -> Result<(), Error> {
+        // When encountering lots of symlinks or hardlinks, the queue may become quite large
+        const WORKER_QUEUE_SIZE: usize = 5000;
+
+        if self.worker.is_some() {
+            bail!("worker thread already started")
+        }
+
+        let error_handler = Arc::clone(&options.error_handler);
+
+        self.worker.replace(SyncTaskWorker::with_error_handler(
+            WORKER_QUEUE_SIZE,
+            error_handler,
+        ));
+
+        Ok(())
+    }
+
+    /// Shut down and join the internal worker thread.
+    ///
+    /// Fails if the worker was already joined (or never started).
+    #[inline]
+    pub fn join_worker_thread(&mut self) -> Result<(), Error> {
+        if let Some(mut worker) = self.worker.take() {
+            worker.join()
+        } else {
+            bail!("worker thread already joined")
+        }
+    }
+
+    /// Convenience wrapper to send a task to the [`SyncTaskWorker`].
+    #[inline]
+    async fn send_to_worker<F>(&self, task: F) -> Result<(), Error>
+    where
+        F: FnOnce() -> Result<(), Error> + Send + 'static,
+    {
+        if let Some(worker) = self.worker.as_ref() {
+            worker.send(task).await
+        } else {
+            bail!("failed to send task to worker - worker already finished")
+        }
+    }
+}
+
+// Extraction operations for each kind of pxar entry.
+impl RawAsyncExtractor {
+    /// Get the raw fd of the directory currently on top of the stack.
+    ///
+    /// Must only be called from the worker thread: `blocking_write()` blocks.
+    /// NOTE(review): a *write* lock is taken here, presumably because
+    /// `last_dir_fd()` may lazily create missing directories - confirm
+    /// against `PxarDirStack`.
+    #[inline]
+    fn parent_fd(dir_stack: &RwLock<PxarDirStack>, allow_existing: bool) -> Result<RawFd, Error> {
+        dir_stack
+            .blocking_write()
+            .last_dir_fd(allow_existing)
+            .map(|fd| fd.as_raw_fd())
+    }
+
+    /// Push the directory `entry` onto the stack; when `do_create` is set,
+    /// the directory is also created on disk right away.
+    pub async fn enter_directory(
+        &mut self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        do_create: bool,
+    ) -> Result<(), Error> {
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let allow_existing = options.allow_existing_dirs;
+
+        let task = move || {
+            {
+                let pxar_dir = PxarDir::new(Arc::clone(&entry));
+                let mut locked_dir_stack = dir_stack.blocking_write();
+
+                locked_dir_stack.push(entry.file_name(), pxar_dir)?;
+
+                if do_create {
+                    // Requesting the fd forces creation; the fd itself is not
+                    // needed here, so it is dropped immediately.
+                    locked_dir_stack.last_dir_fd(allow_existing).map(drop)?;
+                }
+
+                Ok::<(), Error>(())
+            }
+            .context(PxarExtractContext::EnterDirectory)
+        };
+
+        self.send_to_worker(task).await
+    }
+
+    /// Pop the top directory off the stack, applying its metadata before the
+    /// directory's file descriptor is closed.
+    pub async fn leave_directory(&mut self, options: &PxarExtractOptions) -> Result<(), Error> {
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let error_handler = Arc::clone(&options.error_handler);
+        let flags = options.feature_flags;
+
+        let task = move || {
+            {
+                let pxar_dir = dir_stack
+                    .blocking_write()
+                    .pop()
+                    .context("unexpected end of directory stack")?;
+
+                // No fd means the directory was presumably never created on
+                // disk - in that case there is nothing to apply metadata to.
+                let fd = pxar_dir.try_as_borrowed_fd().map(|fd| fd.as_raw_fd());
+                let (entry, dir) = pxar_dir.into_inner();
+
+                if let Some(fd) = fd {
+                    metadata::apply(flags, &entry, fd, &error_handler)?;
+                }
+
+                // dropping `dir` closes it - we want that to happen after metadata was applied
+                drop(dir);
+                Ok::<(), Error>(())
+            }
+            .context(PxarExtractContext::LeaveDirectory)
+        };
+
+        self.send_to_worker(task).await
+    }
+
+    /// Extract a symlink entry via `symlinkat(2)` on the worker thread.
+    ///
+    /// An already existing (non-directory) entry is replaced when
+    /// [`OverwriteFlags::SYMLINK`] is set.
+    pub async fn extract_symlink(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+    ) -> Result<(), Error> {
+        use pxar::EntryKind::Symlink;
+
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let error_handler = Arc::clone(&options.error_handler);
+
+        let allow_existing = options.allow_existing_dirs;
+        let feature_flags = options.feature_flags;
+
+        let do_overwrite = options.overwrite_flags.contains(OverwriteFlags::SYMLINK);
+
+        let task = move || {
+            {
+                let link = match entry.kind() {
+                    Symlink(link) => link,
+                    _ => bail!("received invalid entry kind while trying to extract symlink"),
+                };
+
+                let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+                let result =
+                    nix::unistd::symlinkat(link.as_os_str(), Some(parent_fd), entry.file_name());
+
+                match result {
+                    Ok(()) => {}
+                    Err(nix::errno::Errno::EEXIST) if do_overwrite => {
+                        // Never unlink directories
+                        let flag = nix::unistd::UnlinkatFlags::NoRemoveDir;
+                        nix::unistd::unlinkat(Some(parent_fd), entry.file_name(), flag)?;
+
+                        nix::unistd::symlinkat(
+                            link.as_os_str(),
+                            Some(parent_fd),
+                            entry.file_name(),
+                        )?;
+                    }
+                    Err(error) => return Err(error.into()),
+                }
+
+                metadata::apply_at(feature_flags, &entry, parent_fd, &error_handler)
+            }
+            .context(PxarExtractContext::ExtractSymlink)
+        };
+
+        // The context inside the task covers its execution (reported through
+        // the worker's error handler); this one covers enqueueing failures.
+        self.send_to_worker(task)
+            .await
+            .context(PxarExtractContext::ExtractSymlink)
+    }
+
+    /// Extract a hardlink entry via `linkat(2)` (link target resolved relative
+    /// to the archive root) on the worker thread.
+    ///
+    /// Absolute link targets are rejected; an already existing (non-directory)
+    /// entry is replaced when [`OverwriteFlags::HARDLINK`] is set.
+    pub async fn extract_hardlink(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+    ) -> Result<(), Error> {
+        use pxar::EntryKind::Hardlink;
+
+        let dir_stack = Arc::clone(&self.dir_stack);
+
+        let allow_existing = options.allow_existing_dirs;
+        let do_overwrite = options.overwrite_flags.contains(OverwriteFlags::HARDLINK);
+
+        let task = move || {
+            {
+                let link = match entry.kind() {
+                    Hardlink(link) => {
+                        if !AsRef::<Path>::as_ref(link.as_os_str()).is_relative() {
+                            bail!("received absolute path while trying to extract hardlink")
+                        } else {
+                            link.as_os_str().to_owned()
+                        }
+                    }
+                    _ => bail!("received invalid entry kind while trying to extract hardlink"),
+                };
+
+                let root_fd = dir_stack.blocking_read().root_dir_fd()?.as_raw_fd();
+                let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+                let make_link = || {
+                    nix::unistd::linkat(
+                        Some(root_fd),
+                        link.as_os_str(),
+                        Some(parent_fd),
+                        entry.file_name(),
+                        nix::unistd::LinkatFlags::NoSymlinkFollow,
+                    )
+                };
+
+                match make_link() {
+                    // Retry once after unlinking the existing entry.
+                    Err(nix::Error::EEXIST) if do_overwrite => {
+                        // Never unlink directories
+                        let flag = nix::unistd::UnlinkatFlags::NoRemoveDir;
+                        nix::unistd::unlinkat(Some(parent_fd), entry.file_name(), flag)?;
+                        make_link()
+                    }
+                    result => result,
+                }
+            }
+            .context(PxarExtractContext::ExtractHardlink)
+        };
+
+        // The context inside the task covers its execution; this one covers
+        // enqueueing failures.
+        self.send_to_worker(task)
+            .await
+            .context(PxarExtractContext::ExtractHardlink)
+    }
+
+    /// Extract a block/character device entry; thin wrapper around
+    /// [`extract_special()`][Self::extract_special] using the archived device number.
+    pub async fn extract_device(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: Device,
+    ) -> Result<(), Error> {
+        self.extract_special(entry, options, device.to_dev_t())
+            .await
+            .context(PxarExtractContext::ExtractDevice)
+    }
+
+    /// Extract a FIFO entry; thin wrapper around
+    /// [`extract_special()`][Self::extract_special].
+    pub async fn extract_fifo(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: nix::sys::stat::dev_t,
+    ) -> Result<(), Error> {
+        self.extract_special(entry, options, device)
+            .await
+            .context(PxarExtractContext::ExtractFifo)
+    }
+
+    /// Extract a socket entry; thin wrapper around
+    /// [`extract_special()`][Self::extract_special].
+    pub async fn extract_socket(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: nix::sys::stat::dev_t,
+    ) -> Result<(), Error> {
+        self.extract_special(entry, options, device)
+            .await
+            .context(PxarExtractContext::ExtractSocket)
+    }
+
+    /// Shared implementation for devices, FIFOs and sockets: a `mknodat(2)`
+    /// call followed by metadata application, run on the worker thread.
+    ///
+    /// NOTE(review): `SFlag::empty()` is passed as the file type - this relies
+    /// on the type being encoded elsewhere (mode/device); confirm this matches
+    /// the behaviour of the synchronous extractor.
+    #[inline(always)]
+    async fn extract_special(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: nix::sys::stat::dev_t,
+    ) -> Result<(), Error> {
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let allow_existing = options.allow_existing_dirs;
+
+        let feature_flags = options.feature_flags;
+        let error_handler = Arc::clone(&options.error_handler);
+
+        let task = move || {
+            let mode = metadata::perms_from_metadata(entry.metadata())?;
+
+            let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+            nix::sys::stat::mknodat(parent_fd, entry.file_name(), SFlag::empty(), mode, device)
+                .context("failed to create special device")?;
+
+            metadata::apply_at(feature_flags, &entry, parent_fd, &error_handler)
+                .context("failed to apply metadata to special device")
+        };
+
+        self.send_to_worker(task).await
+    }
+
+    /// Extract a regular file entry, streaming `contents` (expected to be
+    /// exactly `size` bytes) into the newly created file.
+    ///
+    /// File creation and metadata application happen on the worker thread;
+    /// the actual (sparse-aware) data transfer runs in this future, so the
+    /// caller's executor stays fully cooperative.
+    pub async fn extract_file<C>(
+        &mut self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        contents: C,
+        size: u64,
+    ) -> Result<(), Error>
+    where
+        C: AsyncRead + Unpin,
+    {
+        // Used by the worker to hand the freshly opened file back to us.
+        let (sender, receiver) = tokio::sync::oneshot::channel();
+
+        let file_creation_task = {
+            let mut oflags = OFlag::O_CREAT | OFlag::O_WRONLY | OFlag::O_CLOEXEC;
+            if options.overwrite_flags.contains(OverwriteFlags::FILE) {
+                oflags |= OFlag::O_TRUNC;
+            } else {
+                oflags |= OFlag::O_EXCL;
+            }
+
+            let entry = Arc::clone(&entry);
+            let dir_stack = Arc::clone(&self.dir_stack);
+            let error_handler = Arc::clone(&options.error_handler);
+
+            let allow_existing = options.allow_existing_dirs;
+            let feature_flags = options.feature_flags;
+
+            move || {
+                {
+                    let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+                    let raw_fd = nix::fcntl::openat(
+                        parent_fd,
+                        entry.file_name(),
+                        oflags,
+                        Mode::from_bits(0o700).unwrap(),
+                    )
+                    .context("failed to create file")?;
+
+                    metadata::apply_initial_flags(feature_flags, &entry, raw_fd, &error_handler)?;
+
+                    // SAFETY: `raw_fd` is a freshly opened, owned fd that is
+                    // handed over to the `File` exclusively.
+                    let file = unsafe { tokio::fs::File::from_raw_fd(raw_fd) };
+
+                    match sender.send(file) {
+                        Ok(()) => Ok::<(), Error>(()),
+                        Err(_) => bail!("failed to send file descriptor to origin future"),
+                    }
+                }
+                .context(PxarExtractContext::ExtractFile)
+            }
+        };
+
+        // The task is queued for the worker ...
+        self.send_to_worker(file_creation_task)
+            .await
+            .context(PxarExtractContext::ExtractFile)?;
+
+        // ... and its response is then awaited, so other things
+        // may be done in the meantime
+        let mut file = receiver.await.context(
+            "failed to receive raw file descriptor from worker thread - did the thread die?",
+        )?;
+
+        let mut contents = tokio::io::BufReader::new(contents);
+        let copy_result = proxmox_io::sparse_copy_async(&mut contents, &mut file)
+            .await
+            .context(PxarExtractContext::ExtractFile)?;
+
+        if size != copy_result.written {
+            bail!(
+                "extracted {} bytes of a file of {} bytes",
+                copy_result.written,
+                size
+            );
+        }
+
+        let task = {
+            let raw_fd = file.as_raw_fd();
+            let feature_flags = options.feature_flags;
+            let error_handler = Arc::clone(&options.error_handler);
+
+            move || {
+                {
+                    if copy_result.seeked_last {
+                        // A trailing hole was seeked over; `ftruncate()` sets
+                        // the final file size. Guard the cast instead of
+                        // silently wrapping, and retry on EINTR.
+                        let len = i64::try_from(size)
+                            .context("file size exceeds the supported range")?;
+                        while match nix::unistd::ftruncate(raw_fd, len) {
+                            Ok(_) => false,
+                            Err(errno) if errno == nix::errno::Errno::EINTR => true,
+                            Err(err) => return Err(err).context("error setting file size"),
+                        } {}
+                    }
+
+                    metadata::apply(feature_flags, &entry, raw_fd, &error_handler)
+                        .context("failed to apply metadata to file")?;
+
+                    // 'file' closes itself when dropped, so we move and drop it here explicitly
+                    // after the remaining work on it was done
+                    drop(file);
+
+                    Ok::<(), Error>(())
+                }
+                .context(PxarExtractContext::ExtractFile)
+            }
+        };
+
+        self.send_to_worker(task)
+            .await
+            .context(PxarExtractContext::ExtractFile)
+    }
+}
diff --git a/pbs-client/src/pxar/aio/metadata.rs b/pbs-client/src/pxar/aio/metadata.rs
new file mode 100644
index 00000000..06927498
--- /dev/null
+++ b/pbs-client/src/pxar/aio/metadata.rs
@@ -0,0 +1,412 @@
+use std::ffi::{CStr, CString};
+use std::os::fd::{AsRawFd, RawFd};
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Context, Error};
+use nix::errno::Errno;
+use nix::fcntl::OFlag;
+use nix::sys::stat::{Mode, UtimensatFlags};
+
+use proxmox_sys::c_result;
+use proxmox_sys::error::SysError;
+use proxmox_sys::fs::{self, acl, xattr};
+use pxar::Metadata;
+
+use super::ErrorHandler;
+use crate::pxar::Flags;
+
+// NOTE: The functions here are equivalent to crate::pxar::metadata and have
+// only been adapted to take a pxar::Entry directly.
+
+//
+// utility functions
+//
+
+fn allow_notsupp<E: SysError>(err: E) -> Result<(), E> {
+ if err.is_errno(Errno::EOPNOTSUPP) {
+ Ok(())
+ } else {
+ Err(err)
+ }
+}
+
+/// Like [`allow_notsupp`], but additionally records in `not_supp` that the
+/// operation is unsupported, so follow-up attempts can be skipped.
+fn allow_notsupp_remember<E: SysError>(err: E, not_supp: &mut bool) -> Result<(), E> {
+    if !err.is_errno(Errno::EOPNOTSUPP) {
+        return Err(err);
+    }
+    *not_supp = true;
+    Ok(())
+}
+
+/// Build the `times` array for `utimensat(2)` that restores only the mtime:
+/// the atime slot is set to `UTIME_OMIT`, which tells the kernel to leave
+/// the access time untouched.
+fn timestamp_to_update_timespec(mtime: &pxar::format::StatxTimestamp) -> [libc::timespec; 2] {
+    [
+        // atime: leave unchanged (use the libc constant instead of
+        // hand-rolling the magic value `(1 << 30) - 2`)
+        libc::timespec {
+            tv_sec: 0,
+            tv_nsec: libc::UTIME_OMIT,
+        },
+        // mtime: restore from the archive metadata
+        libc::timespec {
+            tv_sec: mtime.secs,
+            tv_nsec: mtime.nanos as _,
+        },
+    ]
+}
+
+/// Get the file permissions as `nix::Mode`
+///
+/// Fails if the stored permission bits do not fit into a `u32` or contain
+/// bits that `Mode` does not recognize.
+pub fn perms_from_metadata(meta: &Metadata) -> Result<Mode, Error> {
+    let mode = meta.stat.get_permission_bits();
+
+    u32::try_from(mode)
+        .context("couldn't narrow permission bits")
+        .and_then(|mode| {
+            Mode::from_bits(mode)
+                .with_context(|| format!("mode contains illegal bits: 0x{:x} (0o{:o})", mode, mode))
+        })
+}
+
+//
+// metadata application:
+//
+
+/// Apply the entry's initial chattr flags to a freshly created `fd`,
+/// restricted by the enabled feature flags.
+///
+/// Failures are routed through the provided `error_handler`.
+pub fn apply_initial_flags(
+    feature_flags: Flags,
+    entry: &Arc<pxar::Entry>,
+    fd: RawFd,
+    error_handler: &Arc<ErrorHandler>,
+) -> Result<(), Error> {
+    let entry_flags = Flags::from_bits_truncate(entry.metadata().stat.flags);
+
+    apply_chattr(
+        fd,
+        entry_flags.to_initial_chattr(),
+        feature_flags.to_initial_chattr(),
+    )
+    .or_else(&**error_handler)
+}
+
+/// Apply metadata to the file `entry.file_name()` located inside the
+/// directory referred to by `parent`.
+///
+/// The entry itself is opened via `O_PATH` (without following symlinks) and
+/// metadata is then applied through that descriptor via [`apply()`].
+pub fn apply_at(
+    feature_flags: Flags,
+    entry: &Arc<pxar::Entry>,
+    parent: RawFd,
+    error_handler: &Arc<ErrorHandler>,
+) -> Result<(), Error> {
+    // NOTE: this opens the *entry* relative to `parent`, not the parent
+    // itself - the previous error message claimed otherwise.
+    let fd = proxmox_sys::fd::openat(
+        &parent,
+        entry.file_name(),
+        OFlag::O_PATH | OFlag::O_CLOEXEC | OFlag::O_NOFOLLOW,
+        Mode::empty(),
+    )
+    .context("failed to open file to apply metadata to")?;
+
+    apply(feature_flags, entry, fd.as_raw_fd(), error_handler)
+}
+
+/// Apply all of `entry`'s metadata (ownership, xattrs, fcaps, ACLs, quota
+/// project id, mode, mtime and chattr flags) to the file behind `fd`.
+///
+/// Each individual failure is routed through `error_handler`, so extraction
+/// can continue on a best-effort basis if the handler swallows errors.
+pub fn apply(
+    feature_flags: Flags,
+    entry: &Arc<pxar::Entry>,
+    fd: RawFd,
+    error_handler: &Arc<ErrorHandler>,
+) -> Result<(), Error> {
+    let metadata = entry.metadata();
+    // Path-based syscalls below go through /proc/self/fd/<fd>.
+    // The format string never contains a NUL byte, so `unwrap()` is safe.
+    let c_proc_path = CString::new(format!("/proc/self/fd/{}", fd)).unwrap();
+
+    apply_ownership(feature_flags, &c_proc_path, metadata).or_else(&**error_handler)?;
+
+    let mut skip_xattrs = false;
+    apply_xattrs(feature_flags, &c_proc_path, metadata, &mut skip_xattrs)
+        .or_else(&**error_handler)?;
+
+    add_fcaps(feature_flags, &c_proc_path, metadata, &mut skip_xattrs).or_else(&**error_handler)?;
+
+    apply_acls(feature_flags, &c_proc_path, metadata).or_else(&**error_handler)?;
+
+    apply_quota_project_id(feature_flags, fd, metadata).or_else(&**error_handler)?;
+
+    // Finally mode and time. We may lose access with mode, but changing the
+    // mode also affects times.
+    if !metadata.is_symlink() && feature_flags.contains(Flags::WITH_PERMISSIONS) {
+        let mode = perms_from_metadata(metadata)?;
+        nix::sys::stat::fchmod(fd, mode)
+            .or_else(allow_notsupp)
+            .context("failed to change file mode")
+            .or_else(&**error_handler)?;
+    }
+
+    // Restore only the mtime; the atime slot carries UTIME_OMIT.
+    let [atime, mtime] = timestamp_to_update_timespec(&metadata.stat.mtime);
+    let res = nix::sys::stat::utimensat(
+        None,
+        c_proc_path.as_ref(),
+        &atime.into(),
+        &mtime.into(),
+        UtimensatFlags::FollowSymlink,
+    );
+
+    match res {
+        Ok(_) => (),
+        // Unsupported on this file system - silently skip.
+        Err(ref err) if err.is_errno(Errno::EOPNOTSUPP) => (),
+        Err(err) => {
+            let err = format_err!(err).context("failed to restore mtime attribute");
+            error_handler(err)?;
+        }
+    }
+
+    if metadata.stat.flags != 0 {
+        apply_flags(feature_flags, fd, metadata.stat.flags).or_else(&**error_handler)?;
+    }
+
+    Ok(())
+}
+
+/// Restore UID/GID on the file behind `c_proc_path` when `WITH_OWNER` is
+/// enabled; `EOPNOTSUPP` from the underlying `chown()` is tolerated.
+fn apply_ownership(
+    feature_flags: Flags,
+    c_proc_path: &CStr,
+    metadata: &Metadata,
+) -> Result<(), Error> {
+    if !feature_flags.contains(Flags::WITH_OWNER) {
+        return Ok(());
+    }
+
+    let uid = Some(metadata.stat.uid.into());
+    let gid = Some(metadata.stat.gid.into());
+
+    // UID and GID first, as this fails if we lose access anyway.
+    match nix::unistd::chown(c_proc_path, uid, gid) {
+        Ok(()) => Ok(()),
+        Err(err) => allow_notsupp(err),
+    }
+    .context("failed to apply ownership")
+}
+
+/// Restore file capabilities (security.capability xattr) when `WITH_FCAPS`
+/// is enabled and the entry carries any; remembers in `skip_xattrs` when
+/// xattrs turn out to be unsupported.
+fn add_fcaps(
+    feature_flags: Flags,
+    c_proc_path: &CStr,
+    metadata: &Metadata,
+    skip_xattrs: &mut bool,
+) -> Result<(), Error> {
+    if *skip_xattrs || !feature_flags.contains(Flags::WITH_FCAPS) {
+        return Ok(());
+    }
+    let fcaps = match metadata.fcaps.as_ref() {
+        Some(fcaps) => fcaps,
+        None => return Ok(()),
+    };
+
+    // SAFETY: `c_proc_path` is a valid NUL-terminated string and the
+    // pointer/length pair refers to the live fcaps data buffer.
+    c_result!(unsafe {
+        libc::setxattr(
+            c_proc_path.as_ptr(),
+            xattr::xattr_name_fcaps().as_ptr(),
+            fcaps.data.as_ptr() as *const libc::c_void,
+            fcaps.data.len(),
+            0,
+        )
+    })
+    .map(drop)
+    .or_else(|err| allow_notsupp_remember(err, skip_xattrs))
+    .context("failed to apply file capabilities")
+}
+
+/// Restore all extended attributes when `WITH_XATTRS` is enabled; invalid
+/// attribute names are logged and skipped, and `skip_xattrs` is set once
+/// the file system reports them as unsupported.
+fn apply_xattrs(
+    feature_flags: Flags,
+    c_proc_path: &CStr,
+    metadata: &Metadata,
+    skip_xattrs: &mut bool,
+) -> Result<(), Error> {
+    if *skip_xattrs || !feature_flags.contains(Flags::WITH_XATTRS) {
+        return Ok(());
+    }
+
+    for xattr in &metadata.xattrs {
+        // A previous attempt may have flagged xattrs as unsupported.
+        if *skip_xattrs {
+            return Ok(());
+        }
+
+        if !xattr::is_valid_xattr_name(xattr.name()) {
+            log::info!("skipping invalid xattr named {:?}", xattr.name());
+            continue;
+        }
+
+        // SAFETY: `c_proc_path` is a valid NUL-terminated string and the
+        // name/value pointers refer to the live xattr data of `metadata`.
+        c_result!(unsafe {
+            libc::setxattr(
+                c_proc_path.as_ptr(),
+                xattr.name().as_ptr() as *const libc::c_char,
+                xattr.value().as_ptr() as *const libc::c_void,
+                xattr.value().len(),
+                0,
+            )
+        })
+        .map(drop)
+        .or_else(|err| allow_notsupp_remember(err, &mut *skip_xattrs))
+        .context("failed to apply extended attributes")?;
+    }
+
+    Ok(())
+}
+
+/// Restore POSIX ACLs (both the access and the default ACL) from the archive
+/// metadata when `WITH_ACL` is enabled and the entry actually carries ACLs.
+fn apply_acls(feature_flags: Flags, c_proc_path: &CStr, metadata: &Metadata) -> Result<(), Error> {
+    if !feature_flags.contains(Flags::WITH_ACL) || metadata.acl.is_empty() {
+        return Ok(());
+    }
+
+    let mut acl = acl::ACL::init(5)?;
+
+    // acl type access:
+    acl.add_entry_full(
+        acl::ACL_USER_OBJ,
+        None,
+        acl::mode_user_to_acl_permissions(metadata.stat.mode),
+    )?;
+
+    acl.add_entry_full(
+        acl::ACL_OTHER,
+        None,
+        acl::mode_other_to_acl_permissions(metadata.stat.mode),
+    )?;
+
+    match metadata.acl.group_obj.as_ref() {
+        Some(group_obj) => {
+            // With an explicit GROUP_OBJ entry, the mode's group bits become
+            // the ACL mask.
+            acl.add_entry_full(
+                acl::ACL_MASK,
+                None,
+                acl::mode_group_to_acl_permissions(metadata.stat.mode),
+            )?;
+            acl.add_entry_full(acl::ACL_GROUP_OBJ, None, group_obj.permissions.0)?;
+        }
+        None => {
+            let mode = acl::mode_group_to_acl_permissions(metadata.stat.mode);
+
+            acl.add_entry_full(acl::ACL_GROUP_OBJ, None, mode)?;
+
+            if !metadata.acl.users.is_empty() || !metadata.acl.groups.is_empty() {
+                log::warn!("Warning: Missing GROUP_OBJ entry in ACL, resetting to value of MASK");
+                acl.add_entry_full(acl::ACL_MASK, None, mode)?;
+            }
+        }
+    }
+
+    for user in &metadata.acl.users {
+        acl.add_entry_full(acl::ACL_USER, Some(user.uid), user.permissions.0)?;
+    }
+
+    for group in &metadata.acl.groups {
+        acl.add_entry_full(acl::ACL_GROUP, Some(group.gid), group.permissions.0)?;
+    }
+
+    if !acl.is_valid() {
+        bail!("Error while restoring ACL - ACL invalid");
+    }
+
+    acl.set_file(c_proc_path, acl::ACL_TYPE_ACCESS)?;
+    drop(acl);
+
+    // acl type default:
+    if let Some(default) = metadata.acl.default.as_ref() {
+        let mut acl = acl::ACL::init(5)?;
+
+        acl.add_entry_full(acl::ACL_USER_OBJ, None, default.user_obj_permissions.0)?;
+
+        acl.add_entry_full(acl::ACL_GROUP_OBJ, None, default.group_obj_permissions.0)?;
+
+        acl.add_entry_full(acl::ACL_OTHER, None, default.other_permissions.0)?;
+
+        if default.mask_permissions != pxar::format::acl::Permissions::NO_MASK {
+            acl.add_entry_full(acl::ACL_MASK, None, default.mask_permissions.0)?;
+        }
+
+        for user in &metadata.acl.default_users {
+            acl.add_entry_full(acl::ACL_USER, Some(user.uid), user.permissions.0)?;
+        }
+
+        for group in &metadata.acl.default_groups {
+            acl.add_entry_full(acl::ACL_GROUP, Some(group.gid), group.permissions.0)?;
+        }
+
+        if !acl.is_valid() {
+            bail!("Error while restoring ACL - ACL invalid");
+        }
+
+        acl.set_file(c_proc_path, acl::ACL_TYPE_DEFAULT)?;
+    }
+
+    Ok(())
+}
+
+/// Restore the quota project id (FSXAttr) when `WITH_QUOTA_PROJID` is enabled
+/// and the entry carries one.
+fn apply_quota_project_id(
+    feature_flags: Flags,
+    fd: RawFd,
+    metadata: &Metadata,
+) -> Result<(), Error> {
+    if !feature_flags.contains(Flags::WITH_QUOTA_PROJID) {
+        return Ok(());
+    }
+
+    let projid = match metadata.quota_project_id {
+        Some(projid) => projid,
+        None => return Ok(()),
+    };
+
+    // Read-modify-write: fetch the current fsxattr, patch only the project
+    // id, and write it back.
+    let mut fsxattr = fs::FSXAttr::default();
+    // SAFETY: `fd` is an open file descriptor and `fsxattr` is a properly
+    // initialized struct the ioctls may read from / write into.
+    unsafe {
+        fs::fs_ioc_fsgetxattr(fd, &mut fsxattr)
+            .context("error while getting fsxattr to restore quota project id")?;
+
+        fsxattr.fsx_projid = projid.projid as u32;
+
+        fs::fs_ioc_fssetxattr(fd, &fsxattr)
+            .context("error while setting fsxattr to restore quota project id")?;
+    }
+
+    Ok(())
+}
+
+/// Errno values indicating that the file-attribute ioctls are simply not
+/// supported by the kernel or the underlying file system.
+fn errno_is_unsupported(errno: Errno) -> bool {
+    const UNSUPPORTED: [Errno; 5] = [
+        Errno::ENOTTY,
+        Errno::ENOSYS,
+        Errno::EBADF,
+        Errno::EOPNOTSUPP,
+        Errno::EINVAL,
+    ];
+
+    UNSUPPORTED.contains(&errno)
+}
+
+/// Merge the requested `chattr` bits (restricted by `mask`) into the file's
+/// current attribute flags; a no-op on file systems that don't support
+/// attribute ioctls.
+fn apply_chattr(fd: RawFd, chattr: libc::c_long, mask: libc::c_long) -> Result<(), Error> {
+    if chattr == 0 {
+        return Ok(());
+    }
+
+    // Read the current attributes first so unmasked bits are preserved.
+    let mut fattr: libc::c_long = 0;
+    match unsafe { fs::read_attr_fd(fd, &mut fattr) } {
+        Ok(_) => (),
+        Err(errno) if errno_is_unsupported(errno) => {
+            return Ok(());
+        }
+        Err(err) => return Err(err).context("failed to read file attributes"),
+    }
+
+    // Take the requested bits within `mask`, keep everything else as-is.
+    let attr = (chattr & mask) | (fattr & !mask);
+
+    if attr == fattr {
+        return Ok(());
+    }
+
+    match unsafe { fs::write_attr_fd(fd, &attr) } {
+        Ok(_) => Ok(()),
+        Err(errno) if errno_is_unsupported(errno) => Ok(()),
+        Err(err) => Err(err).context("failed to set file attributes"),
+    }
+}
+
+/// Apply the entry's stored flags to `fd`: chattr attributes first, then any
+/// FAT attributes, both restricted by the enabled feature flags.
+fn apply_flags(feature_flags: Flags, fd: RawFd, entry_flags: u64) -> Result<(), Error> {
+    let entry_flags = Flags::from_bits_truncate(entry_flags);
+
+    apply_chattr(fd, entry_flags.to_chattr(), feature_flags.to_chattr())?;
+
+    let fatattr = (feature_flags & entry_flags).to_fat_attr();
+    if fatattr != 0 {
+        match unsafe { fs::write_fat_attr_fd(fd, &fatattr) } {
+            Ok(_) => (),
+            // Not supported here - silently skip, consistent with chattr.
+            Err(errno) if errno_is_unsupported(errno) => (),
+            Err(err) => return Err(err).context("failed to set file FAT attributes"),
+        }
+    }
+
+    Ok(())
+}
diff --git a/pbs-client/src/pxar/aio/mod.rs b/pbs-client/src/pxar/aio/mod.rs
new file mode 100644
index 00000000..dfa8d84a
--- /dev/null
+++ b/pbs-client/src/pxar/aio/mod.rs
@@ -0,0 +1,11 @@
+// Directory stack used by the extractor implementations.
+mod dir_stack;
+
+pub(crate) mod extract;
+pub use extract::{
+    AsyncExtractor, Callback, ErrorHandler, PxarExtractOptions, PxarExtractOptionsBuilder,
+    RawAsyncExtractor,
+};
+
+// Metadata application helpers (adapted copies of `crate::pxar::metadata`).
+mod metadata;
+
+// Worker thread for running blocking tasks off the async executor.
+mod worker;
diff --git a/pbs-client/src/pxar/aio/worker.rs b/pbs-client/src/pxar/aio/worker.rs
new file mode 100644
index 00000000..791e3129
--- /dev/null
+++ b/pbs-client/src/pxar/aio/worker.rs
@@ -0,0 +1,167 @@
+#![allow(unused)]
+use std::sync::{Arc, Mutex};
+use std::thread::{self, JoinHandle};
+
+use anyhow::{bail, Error};
+use tokio::sync::mpsc;
+
+use super::ErrorHandler;
+
+/// A boxed synchronous task executed on the worker thread; any error it
+/// returns is routed through the worker's [`ErrorHandler`].
+type WorkerTask = Box<dyn FnOnce() -> Result<(), Error> + Send>;
+
+/// [`SyncTaskWorker`] is a wrapper around a [`Thread`][std::thread::Thread]
+/// with the purpose of running synchronous tasks provided by a queue.
+///
+/// A task is anything that implements [`FnOnce() -> Result<(), Error>`]
+/// and can be sent asynchronously to the worker via [`send()`].
+///
+/// As soon as a task returns an [`Error`], the worker will drop its channel,
+/// discarding any remaining tasks. Errors therefore either ought to be handled
+/// inside tasks themselves, or with an [explicitly provided][eh] [`ErrorHandler`].
+///
+/// [eh]: Self::with_error_handler
+pub(crate) struct SyncTaskWorker {
+    // Queue endpoint used by `send()`; dropped (set to `None`) by `join()`
+    // to signal the worker loop that no further tasks will arrive.
+    sender: Option<mpsc::Sender<WorkerTask>>,
+    // Handle of the spawned worker thread; taken exactly once when joining.
+    join_handle: Option<Box<JoinHandle<()>>>,
+    // First error produced by a task (after the error handler ran); shared
+    // with the worker loop and consumed by `check_for_error()`.
+    task_error: Arc<Mutex<Option<Error>>>,
+}
+
+impl SyncTaskWorker {
+    /// Create a worker whose error handler propagates every task error
+    /// unchanged (the default handler is the `Err` constructor itself).
+    pub fn new(queue_size: usize) -> Self {
+        Self::new_inner(queue_size, Arc::new(Box::new(Err)))
+    }
+
+    /// Create a worker that passes each task error through `error_handler`
+    /// before deciding whether to stop.
+    pub fn with_error_handler(queue_size: usize, error_handler: Arc<ErrorHandler>) -> Self {
+        Self::new_inner(queue_size, error_handler)
+    }
+
+    /// Shared constructor: set up the channel, the shared error slot and
+    /// spawn the worker thread running the task loop.
+    fn new_inner(queue_size: usize, error_handler: Arc<ErrorHandler>) -> Self {
+        let (sender, mut receiver) = mpsc::channel::<WorkerTask>(queue_size);
+
+        let task_error = Arc::new(Mutex::new(None));
+
+        let shared_error = Arc::clone(&task_error);
+        let handler = Arc::clone(&error_handler);
+
+        let join_handle = thread::spawn(move || {
+            // Run tasks until the channel is closed (all senders dropped) ...
+            while let Some(task) = receiver.blocking_recv() {
+                // ... or until a task error survives the error handler.
+                if let Err(error) = task().or_else(|error| handler(error)) {
+                    let mut guard = shared_error.lock().unwrap();
+                    // Only the first error is recorded.
+                    if guard.is_none() {
+                        *guard = Some(
+                            error.context("extractor worker thread encountered unexpected error"),
+                        );
+                    }
+                    // Returning drops the receiver, discarding queued tasks.
+                    return;
+                }
+            }
+        });
+
+        Self {
+            sender: Some(sender),
+            join_handle: Some(Box::new(join_handle)),
+            task_error,
+        }
+    }
+
+    /// Take (and thereby clear) the first recorded task error, if any.
+    #[inline]
+    fn check_for_error(&self) -> Result<(), Error> {
+        match self.task_error.lock().unwrap().take() {
+            Some(error) => Err(error),
+            None => Ok(()),
+        }
+    }
+
+    /// Send a task to the worker to execute.
+    ///
+    /// In case any of the worker thread's previous tasks failed, its error
+    /// will be returned. Subsequent calls will not return the same error.
+    ///
+    /// Otherwise, this will fail if the worker's channel is already closed or
+    /// if its internal sender has already been dropped.
+    #[inline]
+    pub async fn send<F>(&self, task: F) -> Result<(), Error>
+    where
+        F: FnOnce() -> Result<(), Error> + Send + 'static,
+    {
+        self.check_for_error()?;
+
+        let sender = match self.sender.as_ref() {
+            Some(sender) => sender,
+            None => bail!("failed to send task input to worker - sender already dropped"),
+        };
+
+        if sender.send(Box::new(task)).await.is_err() {
+            bail!("failed to send task input to worker - channel closed");
+        }
+
+        Ok(())
+    }
+
+    /// Get the current available capacity of the worker thread's queue.
+    ///
+    /// Returns `None` once the sender has been dropped (after joining).
+    #[inline]
+    pub fn queue_capacity(&self) -> Option<usize> {
+        Some(self.sender.as_ref()?.capacity())
+    }
+
+    /// Get the maximum capacity of the worker thread's queue.
+    ///
+    /// Returns `None` once the sender has been dropped (after joining).
+    #[inline]
+    pub fn max_queue_capacity(&self) -> Option<usize> {
+        Some(self.sender.as_ref()?.max_capacity())
+    }
+
+    /// Run all outstanding tasks until completion or until a task returns an error.
+    ///
+    /// The first encountered error will be returned, if it exists.
+    ///
+    /// Subsequent calls to [`join()`] will not have any effect.
+    #[inline]
+    pub fn join(&mut self) -> Result<(), Error> {
+        // Dropping the sender closes the channel; the worker loop finishes
+        // whatever is still queued and then exits.
+        self.sender = None;
+
+        // The panic payload returned by `JoinHandle::join` is a `Box<dyn Any>`
+        // which implements neither `Display` nor `Error`, so `anyhow::Context`
+        // cannot attach to it - map it to a plain message instead.
+        if let Some(handle) = self.join_handle.take() {
+            if handle.join().is_err() {
+                bail!("extractor worker thread panicked");
+            }
+        }
+
+        self.check_for_error()
+    }
+}
+
+// Joining on drop ensures the worker thread never outlives its handle.
+//
+// Note that this consumes any still-recorded task error, so callers who care
+// about errors should invoke `join()` themselves before dropping the worker.
+impl Drop for SyncTaskWorker {
+    fn drop(&mut self) {
+        // Errors cannot be propagated out of `drop`; discard them here.
+        drop(self.join());
+    }
+}
diff --git a/pbs-client/src/pxar/mod.rs b/pbs-client/src/pxar/mod.rs
index 14674b9b..9060bc0e 100644
--- a/pbs-client/src/pxar/mod.rs
+++ b/pbs-client/src/pxar/mod.rs
@@ -47,6 +47,7 @@
//! (user, group, acl, ...) because this is already defined by the
//! linked `ENTRY`.
+pub mod aio;
pub(crate) mod create;
pub(crate) mod dir_stack;
pub(crate) mod extract;
--
2.39.2
More information about the pbs-devel
mailing list