[pbs-devel] [RFC PATCH proxmox-backup 1/2] pbs-client: pxar: Add prototype implementation of `AsyncExtractor<T>`

Max Carrara m.carrara at proxmox.com
Mon Aug 28 16:42:03 CEST 2023


This preliminary commit adds a fully functioning prototype
implementation of `AsyncExtractor<T>`, which is, unsurprisingly,
a complete `async` implementation of the original `Extractor` (and its
accompanying `ExtractorIter`).

In order to limit the scope of this RFC and keep it focused on its
intent, the new `pbs_client::pxar::aio` module is introduced, which
contains all relevant changes (instead of refactoring half of the
crate across a dozen or two commits).

The design of the new extractor is split into three main parts:

  1. the higher-level `AsyncExtractor<T>`, which exposes the public
     extraction API
  2. the lower-level `RawAsyncExtractor`, which serves as the
     "workhorse" for higher-level extractors such as `AsyncExtractor<T>`
  3. the `SyncTaskWorker`, which receives blocking / synchronous tasks
     via a queue and runs them in parallel; this allows the
     `RawAsyncExtractor` to remain fully cooperative and non-blocking

Furthermore, moving everything that can be considered an
*extraction option* into the `PxarExtractOptions` `struct` makes the
overall control flow and usage of `AsyncExtractor<T>` and
`RawAsyncExtractor` much simpler to follow.

The `ErrorHandler` type alias is made more flexible by requiring
an `Fn` instead of an `FnMut`; the callback is no longer generic, but
instead becomes its own thread-safe alias named `Callback`.

The following modules are adapted in a similar fashion, but remain
identical in their function:

  `pbs_client::pxar::dir_stack` --> `pbs_client::pxar::aio::dir_stack`

  * The original `PxarDirStack` is split into two parts; it now uses
    the new `DirStack<T>` in order to be somewhat more testable
  * Synchronization mechanisms and guarantees are put in place
    with regard to the new extractor implementation, as the
    `PxarDirStack` must now be shared between multiple threads or
    tasks

  `pbs_client::pxar::metadata`  --> `pbs_client::pxar::aio::metadata`

  * Function signatures now take an `&Arc<pxar::Entry>` in order to
    make entries easier to pass between threads.
    (There was no measurable difference between `Arc<pxar::Entry>`
    vs `&Arc<pxar::Entry>` vs `&pxar::Entry`)
  * The old `ErrorHandler` is replaced with the new version in
    `pbs_client::pxar::aio`

Signed-off-by: Max Carrara <m.carrara at proxmox.com>
---
 Cargo.toml                                   |   1 +
 pbs-client/Cargo.toml                        |   1 +
 pbs-client/src/pxar/aio/dir_stack.rs         | 543 +++++++++++++++++++
 pbs-client/src/pxar/aio/extract/extractor.rs | 446 +++++++++++++++
 pbs-client/src/pxar/aio/extract/mod.rs       | 220 ++++++++
 pbs-client/src/pxar/aio/extract/raw.rs       | 503 +++++++++++++++++
 pbs-client/src/pxar/aio/metadata.rs          | 412 ++++++++++++++
 pbs-client/src/pxar/aio/mod.rs               |  11 +
 pbs-client/src/pxar/aio/worker.rs            | 167 ++++++
 pbs-client/src/pxar/mod.rs                   |   1 +
 10 files changed, 2305 insertions(+)
 create mode 100644 pbs-client/src/pxar/aio/dir_stack.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/extractor.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/mod.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/raw.rs
 create mode 100644 pbs-client/src/pxar/aio/metadata.rs
 create mode 100644 pbs-client/src/pxar/aio/mod.rs
 create mode 100644 pbs-client/src/pxar/aio/worker.rs

diff --git a/Cargo.toml b/Cargo.toml
index c7773f0e..ba6d874a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -122,6 +122,7 @@ hyper = { version = "0.14", features = [ "full" ] }
 lazy_static = "1.4"
 libc = "0.2"
 log = "0.4.17"
+memchr = "2.5"
 nix = "0.26.1"
 nom = "7"
 num-traits = "0.2"
diff --git a/pbs-client/Cargo.toml b/pbs-client/Cargo.toml
index ed7d651d..047efb41 100644
--- a/pbs-client/Cargo.toml
+++ b/pbs-client/Cargo.toml
@@ -17,6 +17,7 @@ hyper.workspace = true
 lazy_static.workspace = true
 libc.workspace = true
 log.workspace = true
+memchr.workspace = true
 nix.workspace = true
 openssl.workspace = true
 percent-encoding.workspace = true
diff --git a/pbs-client/src/pxar/aio/dir_stack.rs b/pbs-client/src/pxar/aio/dir_stack.rs
new file mode 100644
index 00000000..62cf9ee5
--- /dev/null
+++ b/pbs-client/src/pxar/aio/dir_stack.rs
@@ -0,0 +1,543 @@
+#![allow(unused)]
+
+use std::ffi::OsStr;
+use std::fmt::Debug;
+use std::os::fd::{AsRawFd, BorrowedFd, RawFd};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Context, Error};
+use nix::dir::Dir;
+use nix::fcntl::OFlag;
+use nix::sys::stat::{mkdirat, Mode};
+
+use proxmox_sys::error::SysError;
+
+// NOTE: This is essentially crate::pxar::tools::assert_single_path_component,
+// but kept separate here for the time being
+pub fn assert_path_has_single_normal_component<P: AsRef<Path>>(path: P) -> Result<(), Error> {
+    let path = path.as_ref();
+
+    if !path.is_relative() {
+        bail!("path is absolute: {:?}", path)
+    }
+
+    let mut components = path.components();
+
+    if !matches!(components.next(), Some(std::path::Component::Normal(_))) {
+        bail!("invalid component in path: {:?}", path)
+    }
+
+    if components.next().is_some() {
+        bail!("path has multiple components: {:?}", path)
+    }
+
+    Ok(())
+}
+
+/// A stack which stores directory path components and associates each directory
+/// with some data.
+pub struct DirStack<T> {
+    path: PathBuf,
+    data_stack: Vec<T>,
+}
+
+impl<T> Default for DirStack<T> {
+    #[inline]
+    fn default() -> Self {
+        Self {
+            path: PathBuf::new(),
+            data_stack: vec![],
+        }
+    }
+}
+
+impl<T: Clone> Clone for DirStack<T> {
+    #[inline]
+    fn clone(&self) -> Self {
+        Self {
+            path: self.path.clone(),
+            data_stack: self.data_stack.clone(),
+        }
+    }
+}
+
+impl<T: Debug> Debug for DirStack<T> {
+    #[inline]
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DirStack")
+            .field("path", &self.path)
+            .field("data_stack", &self.data_stack)
+            .finish()
+    }
+}
+
+impl<T> DirStack<T> {
+    /// Returns a [`Path`] slice of the entire path that's on the stack.
+    ///
+    /// The [dangling root](DirStack::with_dangling_root) will be included if it exists.
+    #[inline]
+    pub fn as_path(&self) -> &Path {
+        self.path.as_path()
+    }
+
+    /// Removes all data from the stack.
+    ///
+    /// The [dangling root](DirStack::with_dangling_root) will be preserved if it exists.
+    #[inline]
+    pub fn clear(&mut self) {
+        while self.pop().is_some() {}
+    }
+
+    /// Returns the [dangling root](DirStack::with_dangling_root) or an empty
+    /// path if it doesn't exist.
+    #[inline]
+    pub fn dangling_root(&self) -> &Path {
+        if self.data_stack.is_empty() {
+            self.path.as_ref()
+        } else {
+            let mut iter = self.path.iter();
+            iter.nth_back(self.data_stack.len() - 1);
+            iter.as_path()
+        }
+    }
+
+    /// Returns the first path component and its associated data.
+    ///
+    /// If the stack has a [dangling root](DirStack::with_dangling_root), it
+    /// will be ignored. Meaning, only the first *added* path component will
+    /// be returned.
+    #[inline]
+    pub fn first(&self) -> Option<(&Path, &T)> {
+        self.data_stack.first().and_then(|data| {
+            self.path
+                .iter()
+                .nth_back(self.data_stack.len() - 1)
+                .map(|component| (component.as_ref(), data))
+        })
+    }
+
+    /// Checks whether the stack is empty.
+    ///
+    /// The stack is considered empty if there are no path components with
+    /// associated data, even if it has a [dangling root](DirStack::with_dangling_root).
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.data_stack.is_empty()
+    }
+
+    /// Returns the last path component and its associated data.
+    #[inline]
+    pub fn last(&self) -> Option<(&Path, &T)> {
+        self.data_stack.last().and_then(|data| {
+            self.path
+                .file_name()
+                .map(|file_name| (file_name.as_ref(), data))
+        })
+    }
+
+    /// Returns the number of elements on the stack.
+    ///
+    /// Note that the root path (the very first path) may consist of multiple components,
+    /// but is still counted as one element.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.data_stack.len()
+    }
+
+    /// Creates a new, empty [`DirStack`].
+    #[inline]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Returns the **full path** before [`DirStack::last()`] and the data associated
+    /// with the component it leads to, **if both exist.**
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// use std::path::Path;
+    ///
+    /// # use pbs_client::pxar::aio::dir_stack::DirStack;
+    ///
+    /// let mut dir_stack: DirStack<u8> = DirStack::new();
+    ///
+    /// dir_stack.push("foo", 1);
+    /// dir_stack.push("bar", 2);
+    /// dir_stack.push("baz", 3);
+    ///
+    /// let expected = Path::new("foo/bar");
+    ///
+    /// assert_eq!(dir_stack.parent(), Some((expected, &2)));
+    ///
+    /// ```
+    #[inline]
+    pub fn parent(&self) -> Option<(&Path, &T)> {
+        if self.is_empty() {
+            None
+        } else {
+            self.path.parent().and_then(|parent| {
+                if self.data_stack.len() > 1 {
+                    Some((parent, &self.data_stack[self.data_stack.len() - 2]))
+                } else {
+                    None
+                }
+            })
+        }
+    }
+    /// Removes the last path component from the stack and returns its associated
+    /// data, or [`None`] if the stack is empty.
+    #[inline]
+    pub fn pop(&mut self) -> Option<T> {
+        self.data_stack.pop().map(|data| {
+            self.path.pop();
+            data
+        })
+    }
+
+    /// Pushes a path and its associated data onto the stack.
+    ///
+    /// The path must consist of a single normal component.
+    /// See [`Component::Normal`][std::path::Component::Normal].
+    #[inline]
+    pub fn push<P: AsRef<Path>>(&mut self, path: P, data: T) -> Result<(), Error> {
+        assert_path_has_single_normal_component(&path)?;
+
+        self.path.push(path.as_ref());
+        self.data_stack.push(data);
+
+        Ok(())
+    }
+
+    /// Clears the stack, replacing all elements with the given path and that path's
+    /// associated data.
+    ///
+    /// The [dangling root](DirStack::with_dangling_root) will be preserved if it exists.
+    ///
+    /// Contrary to [`DirStack::push()`], this can be any path and doesn't have to consist of
+    /// only a single path component. This is useful if you want to e.g. initialize the stack
+    /// with your current working directory or similar.
+    #[inline]
+    pub fn replace<P: AsRef<Path>>(&mut self, path: P, data: T) {
+        self.clear();
+        self.path.push(path);
+        self.data_stack.push(data);
+    }
+
+    /// Creates a new [`DirStack`] from a path without any associated data.
+    /// This stack has a **dangling root** which means that even though it has a
+    /// path, it is still considered to be empty, as it lacks any data.
+    ///
+    /// This is useful if you want to have a base directory that can never
+    /// be left through [`DirStack::pop()`], such as your current working directory,
+    /// for example.
+    #[inline]
+    pub fn with_dangling_root<P: AsRef<Path>>(path: P) -> Self {
+        Self {
+            path: path.as_ref().to_owned(),
+            data_stack: vec![],
+        }
+    }
+}
+
+impl<T, Idx> std::ops::Index<Idx> for DirStack<T>
+where
+    Idx: std::slice::SliceIndex<[T], Output = T>,
+{
+    type Output = T;
+
+    #[inline]
+    fn index(&self, index: Idx) -> &Self::Output {
+        &self.data_stack[index]
+    }
+}
+
+impl<T, Idx> std::ops::IndexMut<Idx> for DirStack<T>
+where
+    Idx: std::slice::SliceIndex<[T], Output = T>,
+{
+    #[inline]
+    fn index_mut(&mut self, index: Idx) -> &mut Self::Output {
+        &mut self.data_stack[index]
+    }
+}
+
+/// Helper struct that associates a [`Entry`][e] with a potentially opened directory [`Dir`].
+///
+/// It's the developer's responsibility that the contained [`Entry`][e]'s *kind*
+/// is actually a [`Directory`][d], and that the opened [`Dir`] corresponds to the
+/// provided [`Entry`][e].
+///
+/// [e]: pxar::Entry
+/// [d]: pxar::EntryKind::Directory
+#[derive(Debug)]
+pub(super) struct PxarDir {
+    entry: Arc<pxar::Entry>,
+    dir: Option<Dir>,
+}
+
+impl PxarDir {
+    /// Creates a new [`PxarDir`] from an [`Arc<pxar::Entry>`][pxar::Entry].
+    #[inline]
+    pub fn new(entry: Arc<pxar::Entry>) -> Self {
+        Self { entry, dir: None }
+    }
+
+    /// Creates a new [`PxarDir`] from an [`Arc<pxar::Entry>`][pxar::Entry]
+    /// which is associated with a [`Dir`].
+    #[inline]
+    pub fn with_dir(entry: Arc<pxar::Entry>, dir: Dir) -> Self {
+        Self {
+            entry,
+            dir: Some(dir),
+        }
+    }
+
+    /// Return the file name of the inner [`Entry`][pxar::Entry].
+    #[inline]
+    pub fn file_name(&self) -> &OsStr {
+        self.entry.file_name()
+    }
+
+    /// Return a [`BorrowedFd`] to the inner [`Dir`] if available.
+    #[inline]
+    pub fn try_as_borrowed_fd(&self) -> Option<BorrowedFd> {
+        // FIXME: Once `nix` adds `AsFd` support use `.as_fd()` instead.
+        self.dir
+            .as_ref()
+            .map(|dir| unsafe { BorrowedFd::borrow_raw(dir.as_raw_fd()) })
+    }
+
+    /// Moves the inner [`Entry`][pxar::Entry] and [`Option<Dir>`][dir] out of the [`PxarDir`].
+    ///
+    /// [dir]: nix::dir::Dir
+    #[inline]
+    pub fn into_inner(self) -> (Arc<pxar::Entry>, Option<Dir>) {
+        (self.entry, self.dir)
+    }
+
+    #[inline]
+    fn create_at(&mut self, parent: RawFd, allow_existing: bool) -> Result<BorrowedFd, Error> {
+        const PERMS: Mode = Mode::from_bits_truncate(0o700);
+
+        match mkdirat(parent, self.file_name(), PERMS) {
+            Ok(()) => (),
+            Err(error) => {
+                if !(allow_existing && error.already_exists()) {
+                    return Err(error).context("failed to create directory");
+                }
+            }
+        }
+
+        self.open_at(parent)
+    }
+
+    #[inline]
+    fn open_at(&mut self, parent: RawFd) -> Result<BorrowedFd, Error> {
+        Dir::openat(parent, self.file_name(), OFlag::O_DIRECTORY, Mode::empty())
+            .context("failed to open directory")
+            .map(|dir| {
+                // FIXME: Once `nix` adds `AsFd` support use `.as_fd()` instead.
+                let fd = unsafe { BorrowedFd::borrow_raw(dir.as_raw_fd()) };
+
+                self.dir = Some(dir);
+
+                fd
+            })
+    }
+}
+
+/// The [`PxarDirStack`] is used to keep track of traversed and created [`PxarDir`s][PxarDir].
+#[derive(Debug)]
+pub(super) struct PxarDirStack {
+    inner: DirStack<PxarDir>,
+    len_created: usize,
+}
+
+/// This struct may safely be `Sync` as it is only used in the context of the
+/// [`RawAsyncExtractor`][super::RawAsyncExtractor], which never accesses an
+/// underlying [`Dir`] concurrently or in parallel.
+unsafe impl Sync for PxarDirStack {}
+
+impl PxarDirStack {
+    #[inline]
+    pub fn new() -> Self {
+        Self {
+            inner: DirStack::new(),
+            len_created: 0,
+        }
+    }
+
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    #[inline]
+    pub fn as_path(&self) -> &Path {
+        self.inner.as_path()
+    }
+
+    #[inline]
+    pub fn push<P: AsRef<Path>>(&mut self, path: P, dir: PxarDir) -> Result<(), Error> {
+        let path = path.as_ref();
+        self.inner.push(path, dir).map(|_| {
+            if self.inner[self.inner.len() - 1].dir.is_some() {
+                self.len_created += 1;
+            }
+        })
+    }
+
+    #[inline]
+    pub fn pop(&mut self) -> Option<PxarDir> {
+        self.inner.pop().map(|dir| {
+            self.len_created = self.len_created.min(self.len());
+            dir
+        })
+    }
+
+    #[inline]
+    pub fn root_dir_fd(&self) -> Result<BorrowedFd, Error> {
+        self.inner
+            .first()
+            .ok_or_else(|| format_err!("stack underrun"))
+            .map(|(_, pxar_dir)| {
+                pxar_dir
+                    .try_as_borrowed_fd()
+                    .context("lost track of directory file descriptors")
+            })?
+    }
+
+    /// Return the last [`Dir`]'s file descriptor as a [`BorrowedFd`]
+    /// or create it if it's not available.
+    #[inline]
+    pub fn last_dir_fd(&mut self, allow_existing: bool) -> Result<BorrowedFd, Error> {
+        if self.is_empty() {
+            bail!("no directory entries on the stack")
+        }
+
+        if self.len_created == 0 {
+            bail!("no created file descriptors on the stack")
+        }
+
+        let mut fd = self.inner[self.len_created - 1]
+            .try_as_borrowed_fd()
+            .context("lost track of directory file descriptors")?
+            .as_raw_fd();
+
+        while self.len_created < self.len() {
+            fd = self.inner[self.len_created]
+                .create_at(fd, allow_existing)
+                .map(|borrowed_fd| borrowed_fd.as_raw_fd())?;
+            self.len_created += 1;
+        }
+
+        self.inner[self.len_created - 1]
+            .try_as_borrowed_fd()
+            .context("lost track of directory file descriptors")
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::ffi::OsStr;
+    use std::path::Path;
+
+    use super::DirStack;
+
+    // helper extension trait to make asserts more concise
+    trait PathHelper {
+        fn as_path(&self) -> &Path;
+    }
+
+    impl<T> PathHelper for T
+    where
+        T: AsRef<OsStr>,
+    {
+        fn as_path(&self) -> &Path {
+            self.as_ref().as_ref()
+        }
+    }
+
+    #[test]
+    fn test_dir_stack_base() {
+        let mut dir_stack: DirStack<u8> = DirStack::new();
+
+        assert!(
+            dir_stack.push("foo", 1).is_ok(),
+            "couldn't push onto stack!"
+        );
+
+        dir_stack.push("bar", 2).unwrap();
+        dir_stack.push("baz", 3).unwrap();
+
+        assert_eq!(dir_stack.as_path(), "foo/bar/baz".as_path());
+
+        assert_eq!(dir_stack.first(), Some(("foo".as_path(), &1)));
+        assert_eq!(dir_stack.last(), Some(("baz".as_path(), &3)));
+        assert_eq!(dir_stack.parent(), Some(("foo/bar".as_path(), &2)));
+
+        assert_eq!(dir_stack.pop(), Some(3));
+
+        assert_eq!(dir_stack.last(), Some(("bar".as_path(), &2)));
+        assert_eq!(dir_stack.parent(), Some(("foo".as_path(), &1)));
+
+        assert_eq!(dir_stack.pop(), Some(2));
+
+        assert_eq!(dir_stack.first(), Some(("foo".as_path(), &1)));
+        assert_eq!(dir_stack.last(), Some(("foo".as_path(), &1)));
+        assert!(dir_stack.parent().is_none());
+
+        assert_eq!(dir_stack.pop(), Some(1));
+
+        assert!(dir_stack.first().is_none());
+        assert!(dir_stack.last().is_none());
+        assert!(dir_stack.parent().is_none());
+
+        assert_eq!(dir_stack.as_path(), "".as_path());
+
+        dir_stack.push("foo", 1).unwrap();
+        dir_stack.clear();
+
+        assert_eq!(dir_stack.as_path(), "".as_path());
+    }
+
+    #[test]
+    fn test_dir_stack_dangling_root() {
+        let mut dir_stack: DirStack<u8> = DirStack::with_dangling_root("foo/bar/baz");
+
+        assert!(dir_stack.is_empty());
+
+        assert!(dir_stack.push("qux", 1).is_ok());
+
+        let expected = Some(("qux".as_path(), &1u8));
+
+        assert_eq!(dir_stack.first(), expected);
+        assert_eq!(dir_stack.last(), expected);
+        assert!(dir_stack.parent().is_none());
+
+        assert_eq!(dir_stack.as_path(), "foo/bar/baz/qux".as_path());
+        assert_eq!(dir_stack.dangling_root(), "foo/bar/baz".as_path());
+        assert!(!dir_stack.is_empty());
+
+        assert_eq!(dir_stack.pop(), Some(1));
+
+        assert!(dir_stack.first().is_none());
+        assert!(dir_stack.last().is_none());
+        assert!(dir_stack.parent().is_none());
+
+        assert_eq!(dir_stack.as_path(), "foo/bar/baz".as_path());
+        assert_eq!(dir_stack.dangling_root(), "foo/bar/baz".as_path());
+        assert!(dir_stack.is_empty());
+
+        let empty_dir_stack: DirStack<u8> = DirStack::new();
+        assert!(empty_dir_stack.is_empty());
+        assert_eq!(empty_dir_stack.dangling_root(), "".as_path())
+    }
+}
diff --git a/pbs-client/src/pxar/aio/extract/extractor.rs b/pbs-client/src/pxar/aio/extract/extractor.rs
new file mode 100644
index 00000000..944ec157
--- /dev/null
+++ b/pbs-client/src/pxar/aio/extract/extractor.rs
@@ -0,0 +1,446 @@
+use std::os::unix::prelude::OsStrExt;
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Context, Error, Result};
+use memchr;
+use pathpatterns::{MatchList, MatchType};
+
+use pxar::accessor::aio::Accessor;
+use pxar::accessor::SeqReadAtAdapter;
+use pxar::decoder::aio::Decoder;
+use pxar::decoder::SeqRead;
+
+use super::raw::RawAsyncExtractor;
+use super::PxarExtractOptions;
+
+use crate::pxar::Flags;
+
+/// Helper enum that represents the state of the root [`Entry`][pxar::Entry]
+/// in the context of the [`AsyncExtractor<T>`].
+///
+/// For example: Because the [`Accessor<T>`] skips the archive's root directory once a
+/// [`Decoder<T>`] is instantiated from it, the root entry has to be decoded
+/// and provided beforehand.
+#[derive(Debug)]
+pub(crate) enum RootEntryState {
+    None,
+    Decoded { entry: Arc<pxar::Entry> },
+    Extracted,
+}
+
+pub struct AsyncExtractor<T> {
+    decoder: Box<Decoder<T>>,
+    inner: Box<RawAsyncExtractor>,
+
+    options: Arc<PxarExtractOptions>,
+
+    root_entry_state: RootEntryState,
+    end_reached: bool,
+
+    match_stack: Vec<bool>,
+    current_match: bool,
+}
+
+type FileReader = pxar::accessor::sync::FileRefReader<Arc<std::fs::File>>;
+
+impl AsyncExtractor<SeqReadAtAdapter<FileReader>> {
+    /// Create a new extractor from an existing pxar archive file.
+    ///
+    /// This is the preferred way to extract an archive that has been saved to
+    /// local storage, taking advantage of the fact that the archive is available on a drive.
+    pub async fn with_file<P>(src_path: P, options: PxarExtractOptions) -> Result<Self, Error>
+    where
+        P: AsRef<Path>,
+    {
+        let src_path = src_path.as_ref();
+
+        let file = std::fs::File::open(src_path)
+            .with_context(|| format!("failed to open file: {:#?}", src_path))?;
+
+        // allows us to toss the accessor away once we've gotten our decoder from it
+        let file = Arc::new(file);
+
+        let accessor = Accessor::from_file_ref(file)
+            .await
+            .context("failed to instantiate pxar accessor")?;
+
+        // root entry needs to be pre-decoded because `decode_full()` skips it
+        let root_dir = accessor.open_root().await?;
+        let root_entry = root_dir.lookup_self().await?.entry().clone();
+
+        let decoder = root_dir
+            .decode_full()
+            .await
+            .context("failed to instantiate decoder from accessor")?;
+
+        Ok(Self::with_root_entry(decoder, options, root_entry))
+    }
+}
+
+impl<T: SeqRead> AsyncExtractor<T> {
+    /// Create a new extractor from an existing [`Decoder`].
+    /// It is assumed that no entries were decoded prior to the creation of the extractor.
+    pub fn new(decoder: Decoder<T>, options: PxarExtractOptions) -> Self {
+        let root_entry_state = RootEntryState::None;
+        Self::new_impl(decoder, options, root_entry_state)
+    }
+
+    /// Create a new extractor from an existing [`Decoder`] and a pre-decoded root
+    /// [`Entry`][pxar::Entry]. This is useful if you need to handle the extraction
+    /// of the archive's root in a special manner.
+    pub fn with_root_entry(
+        decoder: Decoder<T>,
+        options: PxarExtractOptions,
+        root_entry: pxar::Entry,
+    ) -> Self {
+        let root_entry_state = RootEntryState::Decoded {
+            entry: Arc::new(root_entry),
+        };
+
+        Self::new_impl(decoder, options, root_entry_state)
+    }
+
+    /// Create a new extractor from an arbitrary input that implements [`SeqRead`].
+    pub async fn with_input(input: T, options: PxarExtractOptions) -> Result<Self, Error> {
+        let decoder = Decoder::new(input)
+            .await
+            .context("failed to instantiate decoder")?;
+
+        let root_entry_state = RootEntryState::None;
+
+        Ok(Self::new_impl(decoder, options, root_entry_state))
+    }
+
+    fn new_impl(
+        mut decoder: Decoder<T>,
+        options: PxarExtractOptions,
+        root_entry_state: RootEntryState,
+    ) -> Self {
+        decoder.enable_goodbye_entries(true);
+
+        let current_match = options.default_match;
+
+        Self {
+            decoder: Box::new(decoder),
+            inner: Box::new(RawAsyncExtractor::new()),
+            options: Arc::new(options),
+            root_entry_state,
+            end_reached: false,
+            match_stack: vec![],
+            current_match,
+        }
+    }
+}
+
+impl<T: SeqRead> AsyncExtractor<T> {
+    /// Decode and extract the next [`Entry`][pxar::Entry].
+    ///
+    /// Upon each call, this function will continue to extract entries until
+    /// either an error is encountered or the archive has been fully extracted.
+    #[inline]
+    pub async fn next(&mut self) -> Option<Result<(), Error>> {
+        if self.end_reached {
+            return None;
+        }
+
+        if !matches!(self.root_entry_state, RootEntryState::Extracted) {
+            return Some(self.handle_root_entry().await);
+        }
+
+        let entry = match self.decode_next().await {
+            Some(Ok(entry)) => entry,
+            // other cases are already handled in `decode_next()` so we can just return here
+            rval => {
+                return rval.map(|result| result.map(drop));
+            }
+        };
+
+        let mut result = match self.extract_next(entry).await {
+            Err(error) => self.exec_error_handler(error).await,
+            res => res,
+        };
+
+        if result.is_err() {
+            if let Err(stop_error) = self.stop_extraction() {
+                result = result
+                    .context(stop_error)
+                    .context("encountered another error during extractor shutdown");
+            }
+        }
+
+        Some(result)
+    }
+
+    /// Helper method that starts the extraction process by extracting the root
+    /// directory of the archive to the given destination.
+    #[inline]
+    async fn start_extraction(&mut self, entry: Arc<pxar::Entry>) -> Result<(), Error> {
+        if matches!(self.root_entry_state, RootEntryState::Extracted) {
+            bail!("root entry of archive was already extracted")
+        }
+
+        self.inner.start_worker_thread(&self.options)?;
+
+        self.inner
+            .extract_root_dir(entry, &self.options)
+            .await
+            .map(|_| {
+                self.root_entry_state = RootEntryState::Extracted;
+            })
+    }
+
+    /// Helper method that stops the extraction process by joining the worker thread.
+    #[inline]
+    fn stop_extraction(&mut self) -> Result<(), Error> {
+        self.end_reached = true;
+        proxmox_async::runtime::block_in_place(|| self.inner.join_worker_thread())
+    }
+
+    /// Decodes the next [`Entry`][pxar::Entry] and wraps it in an [`Arc`] on success.
+    ///
+    /// If an error occurs while decoding an entry, the extractor is subsequently stopped.
+    #[inline(always)] // hot path
+    async fn decode_next(&mut self) -> Option<Result<Arc<pxar::Entry>, Error>> {
+        let entry = match self.decoder.next().await {
+            None => {
+                return if let Err(error) = self.stop_extraction() {
+                    Some(Err(error))
+                } else {
+                    None
+                };
+            }
+            Some(Err(error)) => {
+                let result = Err(error).context("error reading pxar archive");
+
+                let result = if let Err(stop_error) = self.stop_extraction() {
+                    result
+                        .context(stop_error)
+                        .context("encountered another error during extractor shutdown")
+                } else {
+                    result
+                };
+
+                return Some(result);
+            }
+            Some(Ok(entry)) => entry,
+        };
+
+        Some(Ok(Arc::new(entry)))
+    }
+
+    /// Helper method that extracts the archive's root entry depending on the
+    /// [`RootEntryState`].
+    ///
+    /// If the root pxar entry is decoded or was provided, the extraction process
+    /// is started via [`Self::start_extraction()`].
+    async fn handle_root_entry(&mut self) -> Result<(), Error> {
+        let decode_result = match self.root_entry_state {
+            RootEntryState::None => match self.decode_next().await {
+                Some(result) => result.context("error while decoding root entry"),
+                None => Err(format_err!("no root entry found - archive is empty")),
+            },
+            RootEntryState::Decoded { ref entry } => Ok(Arc::clone(entry)),
+            RootEntryState::Extracted => Err(format_err!("root entry was already extracted")),
+        };
+
+        let entry = decode_result.map_err(|error| {
+            let _ = self.stop_extraction();
+            error
+        })?;
+
+        self.start_extraction(entry).await.map_err(|error| {
+            let _ = self.stop_extraction();
+            error
+        })
+    }
+
+    /// Extract an [`Entry`][pxar::Entry] depending on its [`EntryKind`][pxar::EntryKind].
+    ///
+    ///
+    /// This method checks each entry's filename and matches it against the given
+    /// [`MatchList`]. Once an entry is safe to extract and matches, the
+    /// [`Callback`][super::Callback] is executed and the underlying [`RawAsyncExtractor`]
+    /// is used to perform the actual extraction operation.
+    #[inline(always)] // hot path
+    async fn extract_next(&mut self, entry: Arc<pxar::Entry>) -> Result<(), Error> {
+        Self::check_entry_filename(&entry)?;
+
+        let (match_type, did_match) = self.match_entry(&entry);
+
+        use pxar::EntryKind::*;
+
+        match (did_match, entry.kind()) {
+            (_, Directory) => {
+                self.exec_callback(&entry).await;
+
+                let do_create = self.current_match && match_type != Some(MatchType::Exclude);
+
+                let res = self
+                    .inner
+                    .enter_directory(entry, &self.options, do_create)
+                    .await;
+
+                if res.is_ok() {
+                    // We're starting a new directory, push our old matching
+                    // state and replace it with our new one:
+                    self.match_stack.push(self.current_match);
+                    self.current_match = did_match;
+                }
+
+                res
+            }
+            (_, GoodbyeTable) => {
+                self.exec_callback(&entry).await;
+                let res = self.inner.leave_directory(&self.options).await;
+
+                if res.is_ok() {
+                    // We left a directory, also get back our previous matching state. This is in sync
+                    // with `dir_stack` so this should never be empty except for the final goodbye
+                    // table, in which case we get back to the default of `true`.
+                    self.current_match = self.match_stack.pop().unwrap_or(true);
+                }
+
+                res
+            }
+            (true, Symlink(_)) => {
+                self.exec_callback(&entry).await;
+
+                self.inner.extract_symlink(entry, &self.options).await
+            }
+            (true, Hardlink(_)) => {
+                self.exec_callback(&entry).await;
+
+                self.inner.extract_hardlink(entry, &self.options).await
+            }
+            (true, Device(dev)) => {
+                self.exec_callback(&entry).await;
+
+                if self.has_feature_flags(Flags::WITH_DEVICE_NODES) {
+                    let dev = dev.to_owned();
+                    self.inner.extract_device(entry, &self.options, dev).await
+                } else {
+                    Ok(())
+                }
+            }
+            (true, Fifo) => {
+                self.exec_callback(&entry).await;
+
+                if self.has_feature_flags(Flags::WITH_FIFOS) {
+                    self.inner.extract_fifo(entry, &self.options, 0).await
+                } else {
+                    Ok(())
+                }
+            }
+            (true, Socket) => {
+                self.exec_callback(&entry).await;
+
+                if self.has_feature_flags(Flags::WITH_SOCKETS) {
+                    self.inner.extract_socket(entry, &self.options, 0).await
+                } else {
+                    Ok(())
+                }
+            }
+            (true, File { size, .. }) => {
+                self.exec_callback(&entry).await;
+
+                if let Some(ref mut contents) = self.decoder.contents() {
+                    let size = size.to_owned();
+                    self.inner
+                        .extract_file(entry, &self.options, contents, size)
+                        .await
+                } else {
+                    Err(format_err!(
+                        "found regular file entry without contents in archive"
+                    ))
+                }
+            }
+            (false, _) => Ok(()),
+        }
+    }
+
+    /// Checks whether the [`Entry`'s][e] filename is valid.
+    ///
+    /// The filename is valid if and only if:
+    /// * it doesn't contain slashes `/`
+    /// * it doesn't contain null bytes `\0`
+    ///
+    /// [e]: pxar::Entry
+    fn check_entry_filename(entry: &pxar::Entry) -> Result<(), Error> {
+        let file_name_bytes = entry.file_name().as_bytes();
+
+        if let Some(pos) = memchr::memchr(b'/', file_name_bytes) {
+            bail!(
+                "archive entry filename contains slash at position {pos}, \
+                which is invalid and a security concern"
+            )
+        }
+
+        if let Some(pos) = memchr::memchr(0, file_name_bytes) {
+            bail!("archive entry filename contains NULL byte at position {pos}")
+        }
+
+        Ok(())
+    }
+
+    /// Helper method that checks whether the [`Entry`'s][pxar::Entry] path and file mode
+    /// match the provided [`MatchList`] in [`PxarExtractOptions`].
+    ///
+    /// Whether the entry actually matched is also returned together with the
+    /// [`Option<MatchType>`][MatchType]. If the latter is [`None`], the
+    /// **current match** is used as a fallback.
+    #[inline]
+    pub(crate) fn match_entry(&self, entry: &pxar::Entry) -> (Option<MatchType>, bool) {
+        let path_bytes = entry.path().as_os_str().as_bytes();
+        let file_mode = entry.metadata().file_type() as u32;
+
+        // NOTE: On large match lists this blocks for quite some time, so above a
+        // certain size this could be moved into spawn_blocking() - but that depends
+        // on how we define what a "large" `match_list` actually is.
+
+        // We can `unwrap()` safely here because we get a `Result<_, Infallible>`
+        let match_type = self
+            .options
+            .match_list
+            .matches(path_bytes, Some(file_mode))
+            .unwrap();
+
+        let did_match = match match_type {
+            Some(MatchType::Include) => true,
+            Some(MatchType::Exclude) => false,
+            None => self.current_match,
+        };
+
+        (match_type, did_match)
+    }
+
+    #[inline]
+    fn has_feature_flags(&self, feature_flags: Flags) -> bool {
+        self.options.feature_flags.contains(feature_flags)
+    }
+
+    /// Helper method to spawn and await a task that executes the extractor's
+    /// [`ErrorHandler`][super::ErrorHandler].
+    #[inline]
+    async fn exec_error_handler(&self, error: Error) -> Result<(), Error> {
+        tokio::task::spawn_blocking({
+            let error_handler = Arc::clone(&self.options.error_handler);
+            move || error_handler(error)
+        })
+        .await
+        .context("failed to execute error handler")?
+    }
+
+    /// Helper method to spawn and await a task that executes the extractor's
+    /// [`Callback`][super::Callback].
+    #[inline]
+    async fn exec_callback(&self, entry: &Arc<pxar::Entry>) {
+        if let Some(ref callback) = self.options.callback {
+            tokio::task::spawn({
+                let callback = Arc::clone(callback);
+                let entry = Arc::clone(entry);
+                async move { callback(entry).await }
+            });
+        }
+    }
+}
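The match-state bookkeeping in `extract_next` above (push on `Directory`, pop on `GoodbyeTable`, defaulting to `true` for the final goodbye table) can be sketched standalone; the names below are illustrative and not the crate's actual API:

```rust
// Minimal sketch of the extractor's match-state stack (illustrative names):
// entering a directory saves the current match state and adopts the new one,
// leaving restores it; the final goodbye table has no parent directory, so
// we fall back to the default of `true`.
struct MatchState {
    current: bool,
    stack: Vec<bool>,
}

impl MatchState {
    fn new(default_match: bool) -> Self {
        Self { current: default_match, stack: Vec::new() }
    }

    fn enter_directory(&mut self, did_match: bool) {
        self.stack.push(self.current);
        self.current = did_match;
    }

    fn leave_directory(&mut self) {
        self.current = self.stack.pop().unwrap_or(true);
    }
}

fn main() {
    let mut state = MatchState::new(true);
    state.enter_directory(false); // e.g. an excluded subdirectory
    assert!(!state.current);
    state.enter_directory(true); // an explicitly included child
    assert!(state.current);
    state.leave_directory();
    assert!(!state.current); // back inside the excluded directory
    state.leave_directory();
    assert!(state.current); // back at the matching root
    state.leave_directory(); // final goodbye table falls back to `true`
    assert!(state.current);
}
```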
diff --git a/pbs-client/src/pxar/aio/extract/mod.rs b/pbs-client/src/pxar/aio/extract/mod.rs
new file mode 100644
index 00000000..82d9a596
--- /dev/null
+++ b/pbs-client/src/pxar/aio/extract/mod.rs
@@ -0,0 +1,220 @@
+use std::borrow::Cow;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use futures::future::BoxFuture;
+use lazy_static::lazy_static;
+
+use pathpatterns::MatchEntry;
+
+use crate::pxar::Flags;
+
+pub(crate) mod extractor;
+pub use extractor::AsyncExtractor;
+
+pub(crate) mod raw;
+pub use raw::RawAsyncExtractor;
+
+// NOTE: These will be put into a separate module and are only used here
+//       to avoid (too much) code duplication
+use crate::pxar::extract::OverwriteFlags;
+use crate::pxar::extract::PxarExtractContext;
+
+pub type ErrorHandler =
+    Box<dyn Fn(anyhow::Error) -> Result<(), anyhow::Error> + Send + Sync + 'static>;
+
+pub type Callback = Box<dyn Fn(Arc<pxar::Entry>) -> BoxFuture<'static, ()> + Send + Sync + 'static>;
+
+lazy_static! {
+    static ref DEFAULT_ERROR_HANDLER: Arc<ErrorHandler> = Arc::new(Box::new(Err));
+}
+
+/// Options for extracting a pxar archive.
+pub struct PxarExtractOptions {
+    /// The destination directory to which the archive should be extracted.
+    pub destination: PathBuf,
+
+    /// The flags that control what's extracted from the archive.
+    pub feature_flags: Flags,
+
+    /// The flags that control what kind of file system entries may or may not
+    /// be overwritten.
+    pub overwrite_flags: OverwriteFlags,
+
+    /// Whether to allow already existing directories or not.
+    pub allow_existing_dirs: bool,
+
+    /// The initial match state, i.e. whether entries are considered as matching by default.
+    pub default_match: bool,
+
+    /// A list of match entries. Each [`MatchEntry`] lets you control which files
+    /// should or shouldn't be extracted.
+    pub match_list: Vec<MatchEntry>,
+
+    /// A boxed closure that's used to handle errors throughout the entire
+    /// extraction process.
+    pub error_handler: Arc<ErrorHandler>,
+
+    /// An optional callback that is invoked whenever a [`pxar::Entry`] matches
+    /// and is about to be extracted.
+    ///
+    /// **Note that the returned future is spawned as a separate task and not awaited to completion.**
+    pub callback: Option<Arc<Callback>>,
+}
+
+impl PxarExtractOptions {
+    pub fn builder<D>(destination: D) -> PxarExtractOptionsBuilder
+    where
+        D: Into<Cow<'static, Path>>,
+    {
+        PxarExtractOptionsBuilder::new(destination)
+    }
+}
+
+impl std::fmt::Debug for PxarExtractOptions {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let error_handler_msg = if Arc::ptr_eq(&self.error_handler, &DEFAULT_ERROR_HANDLER) {
+            &"default (propagates errors)"
+        } else {
+            &"custom"
+        };
+
+        f.debug_struct("PxarExtractOptions")
+            .field("destination", &self.destination)
+            .field("feature_flags", &self.feature_flags)
+            .field("overwrite_flags", &self.overwrite_flags)
+            .field("allow_existing_dirs", &self.allow_existing_dirs)
+            .field("default_match", &self.default_match)
+            .field("match_list", &self.match_list)
+            .field("error_handler", error_handler_msg)
+            .field("callback", &self.callback.is_some())
+            .finish()
+    }
+}
+
+/// This builder is used to configure the behaviour of the [`AsyncExtractor`][extr].
+///
+/// See [`PxarExtractOptions`] for a complete description of all extraction options.
+///
+/// [extr]: extractor::AsyncExtractor
+pub struct PxarExtractOptionsBuilder {
+    destination: Cow<'static, Path>,
+
+    feature_flags: Flags,
+    overwrite_flags: OverwriteFlags,
+    allow_existing_dirs: bool,
+
+    default_match: bool,
+    match_list: Vec<MatchEntry>,
+
+    error_handler: Option<Arc<ErrorHandler>>,
+    callback: Option<Arc<Callback>>,
+}
+
+impl std::fmt::Debug for PxarExtractOptionsBuilder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PxarExtractOptionsBuilder")
+            .field("destination", &self.destination)
+            .field("feature_flags", &self.feature_flags)
+            .field("overwrite_flags", &self.overwrite_flags)
+            .field("allow_existing_dirs", &self.allow_existing_dirs)
+            .field("default_match", &self.default_match)
+            .field("match_list", &self.match_list)
+            .field("error_handler", &self.error_handler.is_some())
+            .field("callback", &self.callback.is_some())
+            .finish()
+    }
+}
+
+impl PxarExtractOptionsBuilder {
+    pub fn new<D>(destination: D) -> Self
+    where
+        D: Into<Cow<'static, Path>>,
+    {
+        Self {
+            destination: destination.into(),
+            feature_flags: Flags::default(),
+            overwrite_flags: OverwriteFlags::empty(),
+            allow_existing_dirs: false,
+            default_match: true, // entries are considered as matching by default
+            match_list: Default::default(),
+            error_handler: None,
+            callback: None,
+        }
+    }
+
+    pub fn build(&mut self) -> PxarExtractOptions {
+        let error_handler = self
+            .error_handler
+            .as_ref()
+            .unwrap_or(&DEFAULT_ERROR_HANDLER);
+
+        PxarExtractOptions {
+            destination: self.destination.to_path_buf(),
+            feature_flags: self.feature_flags,
+            overwrite_flags: self.overwrite_flags,
+            allow_existing_dirs: self.allow_existing_dirs,
+            default_match: self.default_match,
+            match_list: self.match_list.clone(),
+            error_handler: Arc::clone(error_handler),
+            callback: self.callback.as_ref().map(Arc::clone),
+        }
+    }
+}
+
+impl PxarExtractOptionsBuilder {
+    pub fn feature_flags(&mut self, flags: Flags) -> &mut Self {
+        self.feature_flags = flags;
+        self
+    }
+
+    pub fn overwrite_flags(&mut self, flags: OverwriteFlags) -> &mut Self {
+        self.overwrite_flags = flags;
+        self
+    }
+
+    pub fn allow_existing_dirs(&mut self, value: bool) -> &mut Self {
+        self.allow_existing_dirs = value;
+        self
+    }
+
+    pub fn default_match(&mut self, value: bool) -> &mut Self {
+        self.default_match = value;
+        self
+    }
+
+    pub fn push_match_entry(&mut self, match_entry: MatchEntry) -> &mut Self {
+        self.match_list.push(match_entry);
+        self
+    }
+
+    pub fn push_match_list<T: ToOwned<Owned = Vec<MatchEntry>>>(
+        &mut self,
+        match_list: T,
+    ) -> &mut Self {
+        // clippy's `redundant_clone` lint misfires on `to_owned()` here
+        #[allow(clippy::redundant_clone)]
+        self.match_list.extend(match_list.to_owned());
+        self
+    }
+
+    pub fn error_handler(&mut self, error_handler: ErrorHandler) -> &mut Self {
+        let error_handler = Arc::new(error_handler);
+        self.error_handler_ref(error_handler)
+    }
+
+    pub fn error_handler_ref(&mut self, error_handler: Arc<ErrorHandler>) -> &mut Self {
+        self.error_handler.replace(error_handler);
+        self
+    }
+
+    pub fn callback(&mut self, callback: Callback) -> &mut Self {
+        let callback = Arc::new(callback);
+        self.callback_ref(callback)
+    }
+
+    pub fn callback_ref(&mut self, callback: Arc<Callback>) -> &mut Self {
+        self.callback.replace(callback);
+        self
+    }
+}
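The `Debug` implementation above tells the shared default error handler apart from a custom one via `Arc::ptr_eq`. A self-contained sketch of that pattern, using `String` errors and `std::sync::OnceLock` in place of `anyhow` and `lazy_static` (purely an assumption for illustration):

```rust
use std::sync::{Arc, OnceLock};

// Same shape as the crate's `ErrorHandler`, but over `String` errors so the
// sketch stays dependency-free.
type ErrorHandler = Box<dyn Fn(String) -> Result<(), String> + Send + Sync>;

// `OnceLock` stands in for `lazy_static!`; the default handler is `Err`
// itself, i.e. it simply propagates the error back to the caller.
fn default_error_handler() -> Arc<ErrorHandler> {
    static DEFAULT: OnceLock<Arc<ErrorHandler>> = OnceLock::new();
    Arc::clone(DEFAULT.get_or_init(|| Arc::new(Box::new(Err))))
}

// Pointer identity is enough to tell "default" and "custom" apart, because
// every clone of the default shares the same allocation.
fn is_default(handler: &Arc<ErrorHandler>) -> bool {
    Arc::ptr_eq(handler, &default_error_handler())
}

fn main() {
    let default = default_error_handler();
    assert!(is_default(&default));
    // the default handler propagates the error unchanged
    assert_eq!((**default)("boom".into()), Err("boom".to_string()));

    let custom: Arc<ErrorHandler> = Arc::new(Box::new(|_err| Ok(())));
    assert!(!is_default(&custom));
}
```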
diff --git a/pbs-client/src/pxar/aio/extract/raw.rs b/pbs-client/src/pxar/aio/extract/raw.rs
new file mode 100644
index 00000000..d6e20981
--- /dev/null
+++ b/pbs-client/src/pxar/aio/extract/raw.rs
@@ -0,0 +1,503 @@
+use std::os::fd::{AsRawFd, FromRawFd, RawFd};
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::{bail, Context, Error, Result};
+use nix::sys::stat::SFlag;
+use nix::{dir::Dir, fcntl::OFlag, sys::stat::Mode};
+
+use pxar::format::Device;
+use tokio::io::AsyncRead;
+use tokio::sync::RwLock;
+
+use proxmox_sys::fs::CreateOptions;
+
+use super::OverwriteFlags;
+use super::PxarExtractContext;
+use super::PxarExtractOptions;
+
+use crate::pxar::aio::dir_stack::{PxarDir, PxarDirStack};
+use crate::pxar::aio::metadata;
+use crate::pxar::aio::worker::SyncTaskWorker;
+
+/// The [`RawAsyncExtractor`] is used to extract individual [`pxar::Entries`][pxar::Entry].
+/// Its purpose is to provide a common underlying mechanism with which an archive is
+/// extracted.
+///
+/// The tracking of directories is based on a stack. [Entering][enter] and [leaving][leave]
+/// directories simply correspond to [`push()`][push] and [`pop()`][pop]. All remaining
+/// operations do not modify the raw extractor's stack otherwise.
+///
+/// In order to make extraction fully concurrent, an internal worker thread is used
+/// to which synchronous tasks are sent. This worker thread has to be started manually.
+///
+/// No state is tracked otherwise, which means that it's your own responsibility
+/// to [extract the archive's root][extr_root], as well as [start][w_start] or [stop][w_stop]
+/// the [`RawAsyncExtractor`]'s internal worker thread.
+///
+/// [enter]: Self::enter_directory()
+/// [leave]: Self::leave_directory()
+/// [push]: std::path::PathBuf::push()
+/// [pop]: std::path::PathBuf::pop()
+/// [extr_root]: Self::extract_root_dir
+/// [w_start]: Self::start_worker_thread
+/// [w_stop]: Self::join_worker_thread
+pub struct RawAsyncExtractor {
+    /// A stack of directories used to aid in traversing and extracting the pxar archive.
+    dir_stack: Arc<RwLock<PxarDirStack>>,
+
+    /// Worker thread for synchronous tasks.
+    worker: Option<SyncTaskWorker>,
+}
+
+impl Default for RawAsyncExtractor {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl RawAsyncExtractor {
+    #[inline]
+    pub fn new() -> Self {
+        let dir_stack = Arc::new(RwLock::new(PxarDirStack::new()));
+
+        Self {
+            dir_stack,
+            worker: None,
+        }
+    }
+
+    /// Extract the root directory of a pxar archive to the given [`destination`][dest].
+    ///
+    /// [dest]: PxarExtractOptions::destination
+    pub async fn extract_root_dir(
+        &mut self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+    ) -> Result<(), Error> {
+        let path = options.destination.clone();
+
+        let root_pxar_dir =
+            tokio::task::spawn_blocking(move || Self::do_extract_root_dir(path, entry))
+                .await
+                .context("failed to execute task to create extraction directory")??;
+
+        let dir_name = options
+            .destination
+            .file_name()
+            .context("extraction destination directory has no name")?;
+
+        self.dir_stack.write().await.push(dir_name, root_pxar_dir)
+    }
+
+    #[inline]
+    fn do_extract_root_dir<P: AsRef<Path>>(
+        path: P,
+        entry: Arc<pxar::Entry>,
+    ) -> Result<PxarDir, Error> {
+        const CREATE_OPTS: CreateOptions =
+            CreateOptions::new().perm(Mode::from_bits_truncate(0o700));
+
+        if !entry.is_dir() {
+            bail!("pxar archive does not start with a directory entry!");
+        }
+
+        let path = path.as_ref();
+
+        proxmox_sys::fs::create_path(path, None, Some(CREATE_OPTS))
+            .with_context(|| format!("error creating extraction directory {path:?}"))?;
+
+        let dir = Dir::open(path, OFlag::O_DIRECTORY | OFlag::O_CLOEXEC, Mode::empty())
+            .with_context(|| format!("unable to open extraction directory {path:?}"))?;
+
+        Ok(PxarDir::with_dir(entry, dir))
+    }
+
+    #[inline]
+    pub fn start_worker_thread(&mut self, options: &PxarExtractOptions) -> Result<(), Error> {
+        // When encountering lots of symlinks or hardlinks, the queue may become quite large
+        const WORKER_QUEUE_SIZE: usize = 5000;
+
+        if self.worker.is_some() {
+            bail!("worker thread already started")
+        }
+
+        let error_handler = Arc::clone(&options.error_handler);
+
+        self.worker.replace(SyncTaskWorker::with_error_handler(
+            WORKER_QUEUE_SIZE,
+            error_handler,
+        ));
+
+        Ok(())
+    }
+
+    #[inline]
+    pub fn join_worker_thread(&mut self) -> Result<(), Error> {
+        if let Some(mut worker) = self.worker.take() {
+            worker.join()
+        } else {
+            bail!("worker thread already joined")
+        }
+    }
+
+    /// Convenience wrapper to send a task to the [`SyncTaskWorker`].
+    #[inline]
+    async fn send_to_worker<F>(&self, task: F) -> Result<(), Error>
+    where
+        F: FnOnce() -> Result<(), Error> + Send + 'static,
+    {
+        if let Some(worker) = self.worker.as_ref() {
+            worker.send(task).await
+        } else {
+            bail!("failed to send task to worker - worker already finished")
+        }
+    }
+}
+
+// Extraction operations for each kind of pxar entry.
+impl RawAsyncExtractor {
+    #[inline]
+    fn parent_fd(dir_stack: &RwLock<PxarDirStack>, allow_existing: bool) -> Result<RawFd, Error> {
+        dir_stack
+            .blocking_write()
+            .last_dir_fd(allow_existing)
+            .map(|fd| fd.as_raw_fd())
+    }
+
+    pub async fn enter_directory(
+        &mut self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        do_create: bool,
+    ) -> Result<(), Error> {
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let allow_existing = options.allow_existing_dirs;
+
+        let task = move || {
+            {
+                let pxar_dir = PxarDir::new(Arc::clone(&entry));
+                let mut locked_dir_stack = dir_stack.blocking_write();
+
+                locked_dir_stack.push(entry.file_name(), pxar_dir)?;
+
+                if do_create {
+                    locked_dir_stack.last_dir_fd(allow_existing).map(drop)?;
+                }
+
+                Ok::<(), Error>(())
+            }
+            .context(PxarExtractContext::EnterDirectory)
+        };
+
+        self.send_to_worker(task).await
+    }
+
+    pub async fn leave_directory(&mut self, options: &PxarExtractOptions) -> Result<(), Error> {
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let error_handler = Arc::clone(&options.error_handler);
+        let flags = options.feature_flags;
+
+        let task = move || {
+            {
+                let pxar_dir = dir_stack
+                    .blocking_write()
+                    .pop()
+                    .context("unexpected end of directory stack")?;
+
+                let fd = pxar_dir.try_as_borrowed_fd().map(|fd| fd.as_raw_fd());
+                let (entry, dir) = pxar_dir.into_inner();
+
+                if let Some(fd) = fd {
+                    metadata::apply(flags, &entry, fd, &error_handler)?;
+                }
+
+                // dropping `dir` closes it - we want that to happen after metadata was applied
+                drop(dir);
+                Ok::<(), Error>(())
+            }
+            .context(PxarExtractContext::LeaveDirectory)
+        };
+
+        self.send_to_worker(task).await
+    }
+
+    pub async fn extract_symlink(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+    ) -> Result<(), Error> {
+        use pxar::EntryKind::Symlink;
+
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let error_handler = Arc::clone(&options.error_handler);
+
+        let allow_existing = options.allow_existing_dirs;
+        let feature_flags = options.feature_flags;
+
+        let do_overwrite = options.overwrite_flags.contains(OverwriteFlags::SYMLINK);
+
+        let task = move || {
+            {
+                let link = match entry.kind() {
+                    Symlink(link) => link,
+                    _ => bail!("received invalid entry kind while trying to extract symlink"),
+                };
+
+                let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+                let result =
+                    nix::unistd::symlinkat(link.as_os_str(), Some(parent_fd), entry.file_name());
+
+                match result {
+                    Ok(()) => {}
+                    Err(nix::errno::Errno::EEXIST) if do_overwrite => {
+                        // Never unlink directories
+                        let flag = nix::unistd::UnlinkatFlags::NoRemoveDir;
+                        nix::unistd::unlinkat(Some(parent_fd), entry.file_name(), flag)?;
+
+                        nix::unistd::symlinkat(
+                            link.as_os_str(),
+                            Some(parent_fd),
+                            entry.file_name(),
+                        )?;
+                    }
+                    Err(error) => return Err(error.into()),
+                }
+
+                metadata::apply_at(feature_flags, &entry, parent_fd, &error_handler)
+            }
+            .context(PxarExtractContext::ExtractSymlink)
+        };
+
+        self.send_to_worker(task)
+            .await
+            .context(PxarExtractContext::ExtractSymlink)
+    }
+
+    pub async fn extract_hardlink(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+    ) -> Result<(), Error> {
+        use pxar::EntryKind::Hardlink;
+
+        let dir_stack = Arc::clone(&self.dir_stack);
+
+        let allow_existing = options.allow_existing_dirs;
+        let do_overwrite = options.overwrite_flags.contains(OverwriteFlags::HARDLINK);
+
+        let task = move || {
+            {
+                let link = match entry.kind() {
+                    Hardlink(link) => {
+                        if !AsRef::<Path>::as_ref(link.as_os_str()).is_relative() {
+                            bail!("received absolute path while trying to extract hardlink")
+                        } else {
+                            link.as_os_str().to_owned()
+                        }
+                    }
+                    _ => bail!("received invalid entry kind while trying to extract hardlink"),
+                };
+
+                let root_fd = dir_stack.blocking_read().root_dir_fd()?.as_raw_fd();
+                let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+                let make_link = || {
+                    nix::unistd::linkat(
+                        Some(root_fd),
+                        link.as_os_str(),
+                        Some(parent_fd),
+                        entry.file_name(),
+                        nix::unistd::LinkatFlags::NoSymlinkFollow,
+                    )
+                };
+
+                match make_link() {
+                    Err(nix::Error::EEXIST) if do_overwrite => {
+                        let flag = nix::unistd::UnlinkatFlags::NoRemoveDir;
+                        nix::unistd::unlinkat(Some(parent_fd), entry.file_name(), flag)?;
+                        make_link()
+                    }
+                    result => result,
+                }
+            }
+            .context(PxarExtractContext::ExtractHardlink)
+        };
+
+        self.send_to_worker(task)
+            .await
+            .context(PxarExtractContext::ExtractHardlink)
+    }
+
+    pub async fn extract_device(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: Device,
+    ) -> Result<(), Error> {
+        self.extract_special(entry, options, device.to_dev_t())
+            .await
+            .context(PxarExtractContext::ExtractDevice)
+    }
+
+    pub async fn extract_fifo(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: nix::sys::stat::dev_t,
+    ) -> Result<(), Error> {
+        self.extract_special(entry, options, device)
+            .await
+            .context(PxarExtractContext::ExtractFifo)
+    }
+
+    pub async fn extract_socket(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: nix::sys::stat::dev_t,
+    ) -> Result<(), Error> {
+        self.extract_special(entry, options, device)
+            .await
+            .context(PxarExtractContext::ExtractSocket)
+    }
+
+    #[inline(always)]
+    async fn extract_special(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: nix::sys::stat::dev_t,
+    ) -> Result<(), Error> {
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let allow_existing = options.allow_existing_dirs;
+
+        let feature_flags = options.feature_flags;
+        let error_handler = Arc::clone(&options.error_handler);
+
+        let task = move || {
+            let mode = metadata::perms_from_metadata(entry.metadata())?;
+
+            let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+            nix::sys::stat::mknodat(parent_fd, entry.file_name(), SFlag::empty(), mode, device)
+                .context("failed to create special device")?;
+
+            metadata::apply_at(feature_flags, &entry, parent_fd, &error_handler)
+                .context("failed to apply metadata to special device")
+        };
+
+        self.send_to_worker(task).await
+    }
+
+    pub async fn extract_file<C>(
+        &mut self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        contents: C,
+        size: u64,
+    ) -> Result<(), Error>
+    where
+        C: AsyncRead + Unpin,
+    {
+        let (sender, receiver) = tokio::sync::oneshot::channel();
+
+        let file_creation_task = {
+            let mut oflags = OFlag::O_CREAT | OFlag::O_WRONLY | OFlag::O_CLOEXEC;
+            if options.overwrite_flags.contains(OverwriteFlags::FILE) {
+                oflags |= OFlag::O_TRUNC;
+            } else {
+                oflags |= OFlag::O_EXCL;
+            }
+
+            let entry = Arc::clone(&entry);
+            let dir_stack = Arc::clone(&self.dir_stack);
+            let error_handler = Arc::clone(&options.error_handler);
+
+            let allow_existing = options.allow_existing_dirs;
+            let feature_flags = options.feature_flags;
+
+            move || {
+                {
+                    let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+                    let raw_fd = nix::fcntl::openat(
+                        parent_fd,
+                        entry.file_name(),
+                        oflags,
+                        Mode::from_bits(0o700).unwrap(),
+                    )
+                    .context("failed to create file")?;
+
+                    metadata::apply_initial_flags(feature_flags, &entry, raw_fd, &error_handler)?;
+
+                    let file = unsafe { tokio::fs::File::from_raw_fd(raw_fd) };
+
+                    match sender.send(file) {
+                        Ok(()) => Ok::<(), Error>(()),
+                        Err(_) => bail!("failed to send file descriptor to the originating future"),
+                    }
+                }
+                .context(PxarExtractContext::ExtractFile)
+            }
+        };
+
+        // The task is queued for the worker ...
+        self.send_to_worker(file_creation_task)
+            .await
+            .context(PxarExtractContext::ExtractFile)?;
+
+        // ... and its response is then awaited, so other things
+        // may be done in the meantime
+        let mut file = receiver.await.context(
+            "failed to receive raw file descriptor from worker thread - did the thread die?",
+        )?;
+
+        let mut contents = tokio::io::BufReader::new(contents);
+        let copy_result = proxmox_io::sparse_copy_async(&mut contents, &mut file)
+            .await
+            .context(PxarExtractContext::ExtractFile)?;
+
+        if size != copy_result.written {
+            bail!(
+                "extracted {} bytes of a file of {} bytes",
+                copy_result.written,
+                size
+            );
+        }
+
+        let task = {
+            let raw_fd = file.as_raw_fd();
+            let feature_flags = options.feature_flags;
+            let error_handler = Arc::clone(&options.error_handler);
+
+            move || {
+                {
+                    if copy_result.seeked_last {
+                        while match nix::unistd::ftruncate(raw_fd, size as i64) {
+                            Ok(_) => false,
+                            Err(errno) if errno == nix::errno::Errno::EINTR => true,
+                            Err(err) => return Err(err).context("error setting file size"),
+                        } {}
+                    }
+
+                    metadata::apply(feature_flags, &entry, raw_fd, &error_handler)
+                        .context("failed to apply metadata to file")?;
+
+                    // 'file' closes itself when dropped, so we move and drop it here explicitly
+                    // after the remaining work on it was done
+                    drop(file);
+
+                    Ok::<(), Error>(())
+                }
+                .context(PxarExtractContext::ExtractFile)
+            }
+        };
+
+        self.send_to_worker(task)
+            .await
+            .context(PxarExtractContext::ExtractFile)
+    }
+}
diff --git a/pbs-client/src/pxar/aio/metadata.rs b/pbs-client/src/pxar/aio/metadata.rs
new file mode 100644
index 00000000..06927498
--- /dev/null
+++ b/pbs-client/src/pxar/aio/metadata.rs
@@ -0,0 +1,412 @@
+use std::ffi::{CStr, CString};
+use std::os::fd::{AsRawFd, RawFd};
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Context, Error};
+use nix::errno::Errno;
+use nix::fcntl::OFlag;
+use nix::sys::stat::{Mode, UtimensatFlags};
+
+use proxmox_sys::c_result;
+use proxmox_sys::error::SysError;
+use proxmox_sys::fs::{self, acl, xattr};
+use pxar::Metadata;
+
+use super::ErrorHandler;
+use crate::pxar::Flags;
+
+// NOTE: The functions here are equivalent to crate::pxar::metadata and have
+// only been adapted to take a pxar::Entry directly.
+
+//
+// utility functions
+//
+
+fn allow_notsupp<E: SysError>(err: E) -> Result<(), E> {
+    if err.is_errno(Errno::EOPNOTSUPP) {
+        Ok(())
+    } else {
+        Err(err)
+    }
+}
+
+fn allow_notsupp_remember<E: SysError>(err: E, not_supp: &mut bool) -> Result<(), E> {
+    if err.is_errno(Errno::EOPNOTSUPP) {
+        *not_supp = true;
+        Ok(())
+    } else {
+        Err(err)
+    }
+}
+
+fn timestamp_to_update_timespec(mtime: &pxar::format::StatxTimestamp) -> [libc::timespec; 2] {
+    // leave the atime untouched (UTIME_OMIT); only the mtime is restored
+    const UTIME_OMIT: i64 = (1 << 30) - 2;
+
+    [
+        libc::timespec {
+            tv_sec: 0,
+            tv_nsec: UTIME_OMIT,
+        },
+        libc::timespec {
+            tv_sec: mtime.secs,
+            tv_nsec: mtime.nanos as _,
+        },
+    ]
+}
+
+/// Get the file permissions as `nix::Mode`
+pub fn perms_from_metadata(meta: &Metadata) -> Result<Mode, Error> {
+    let mode = meta.stat.get_permission_bits();
+
+    u32::try_from(mode)
+        .context("couldn't narrow permission bits")
+        .and_then(|mode| {
+            Mode::from_bits(mode)
+                .with_context(|| format!("mode contains illegal bits: 0x{:x} (0o{:o})", mode, mode))
+        })
+}
+
+//
+// metadata application:
+//
+
+pub fn apply_initial_flags(
+    feature_flags: Flags,
+    entry: &Arc<pxar::Entry>,
+    fd: RawFd,
+    error_handler: &Arc<ErrorHandler>,
+) -> Result<(), Error> {
+    let entry_flags = Flags::from_bits_truncate(entry.metadata().stat.flags);
+
+    apply_chattr(
+        fd,
+        entry_flags.to_initial_chattr(),
+        feature_flags.to_initial_chattr(),
+    )
+    .or_else(&**error_handler)
+}
+
+pub fn apply_at(
+    feature_flags: Flags,
+    entry: &Arc<pxar::Entry>,
+    parent: RawFd,
+    error_handler: &Arc<ErrorHandler>,
+) -> Result<(), Error> {
+    let fd = proxmox_sys::fd::openat(
+        &parent,
+        entry.file_name(),
+        OFlag::O_PATH | OFlag::O_CLOEXEC | OFlag::O_NOFOLLOW,
+        Mode::empty(),
+    )
+    .context("failed to open file descriptor relative to parent")?;
+
+    apply(feature_flags, entry, fd.as_raw_fd(), error_handler)
+}
+
+pub fn apply(
+    feature_flags: Flags,
+    entry: &Arc<pxar::Entry>,
+    fd: RawFd,
+    error_handler: &Arc<ErrorHandler>,
+) -> Result<(), Error> {
+    let metadata = entry.metadata();
+    let c_proc_path = CString::new(format!("/proc/self/fd/{}", fd)).unwrap();
+
+    apply_ownership(feature_flags, &c_proc_path, metadata).or_else(&**error_handler)?;
+
+    let mut skip_xattrs = false;
+    apply_xattrs(feature_flags, &c_proc_path, metadata, &mut skip_xattrs)
+        .or_else(&**error_handler)?;
+
+    add_fcaps(feature_flags, &c_proc_path, metadata, &mut skip_xattrs).or_else(&**error_handler)?;
+
+    apply_acls(feature_flags, &c_proc_path, metadata).or_else(&**error_handler)?;
+
+    apply_quota_project_id(feature_flags, fd, metadata).or_else(&**error_handler)?;
+
+    // Finally, mode and time. We may lose access by changing the mode, but
+    // changing the mode also affects the times.
+    if !metadata.is_symlink() && feature_flags.contains(Flags::WITH_PERMISSIONS) {
+        let mode = perms_from_metadata(metadata)?;
+        nix::sys::stat::fchmod(fd, mode)
+            .or_else(allow_notsupp)
+            .context("failed to change file mode")
+            .or_else(&**error_handler)?;
+    }
+
+    let [atime, mtime] = timestamp_to_update_timespec(&metadata.stat.mtime);
+    let res = nix::sys::stat::utimensat(
+        None,
+        c_proc_path.as_ref(),
+        &atime.into(),
+        &mtime.into(),
+        UtimensatFlags::FollowSymlink,
+    );
+
+    match res {
+        Ok(_) => (),
+        Err(ref err) if err.is_errno(Errno::EOPNOTSUPP) => (),
+        Err(err) => {
+            let err = format_err!(err).context("failed to restore mtime attribute");
+            error_handler(err)?;
+        }
+    }
+
+    if metadata.stat.flags != 0 {
+        apply_flags(feature_flags, fd, metadata.stat.flags).or_else(&**error_handler)?;
+    }
+
+    Ok(())
+}
+
+fn apply_ownership(
+    feature_flags: Flags,
+    c_proc_path: &CStr,
+    metadata: &Metadata,
+) -> Result<(), Error> {
+    if !feature_flags.contains(Flags::WITH_OWNER) {
+        return Ok(());
+    }
+
+    // UID and GID first, as this fails if we lose access anyway.
+    nix::unistd::chown(
+        c_proc_path,
+        Some(metadata.stat.uid.into()),
+        Some(metadata.stat.gid.into()),
+    )
+    .or_else(allow_notsupp)
+    .context("failed to apply ownership")
+}
+
+fn add_fcaps(
+    feature_flags: Flags,
+    c_proc_path: &CStr,
+    metadata: &Metadata,
+    skip_xattrs: &mut bool,
+) -> Result<(), Error> {
+    if *skip_xattrs || !feature_flags.contains(Flags::WITH_FCAPS) {
+        return Ok(());
+    }
+    let fcaps = match metadata.fcaps.as_ref() {
+        Some(fcaps) => fcaps,
+        None => return Ok(()),
+    };
+
+    c_result!(unsafe {
+        libc::setxattr(
+            c_proc_path.as_ptr(),
+            xattr::xattr_name_fcaps().as_ptr(),
+            fcaps.data.as_ptr() as *const libc::c_void,
+            fcaps.data.len(),
+            0,
+        )
+    })
+    .map(drop)
+    .or_else(|err| allow_notsupp_remember(err, skip_xattrs))
+    .context("failed to apply file capabilities")
+}
+
+fn apply_xattrs(
+    feature_flags: Flags,
+    c_proc_path: &CStr,
+    metadata: &Metadata,
+    skip_xattrs: &mut bool,
+) -> Result<(), Error> {
+    if *skip_xattrs || !feature_flags.contains(Flags::WITH_XATTRS) {
+        return Ok(());
+    }
+
+    for xattr in &metadata.xattrs {
+        if *skip_xattrs {
+            return Ok(());
+        }
+
+        if !xattr::is_valid_xattr_name(xattr.name()) {
+            log::info!("skipping invalid xattr named {:?}", xattr.name());
+            continue;
+        }
+
+        c_result!(unsafe {
+            libc::setxattr(
+                c_proc_path.as_ptr(),
+                xattr.name().as_ptr() as *const libc::c_char,
+                xattr.value().as_ptr() as *const libc::c_void,
+                xattr.value().len(),
+                0,
+            )
+        })
+        .map(drop)
+        .or_else(|err| allow_notsupp_remember(err, &mut *skip_xattrs))
+        .context("failed to apply extended attributes")?;
+    }
+
+    Ok(())
+}
+
+fn apply_acls(feature_flags: Flags, c_proc_path: &CStr, metadata: &Metadata) -> Result<(), Error> {
+    if !feature_flags.contains(Flags::WITH_ACL) || metadata.acl.is_empty() {
+        return Ok(());
+    }
+
+    let mut acl = acl::ACL::init(5)?;
+
+    // acl type access:
+    acl.add_entry_full(
+        acl::ACL_USER_OBJ,
+        None,
+        acl::mode_user_to_acl_permissions(metadata.stat.mode),
+    )?;
+
+    acl.add_entry_full(
+        acl::ACL_OTHER,
+        None,
+        acl::mode_other_to_acl_permissions(metadata.stat.mode),
+    )?;
+
+    match metadata.acl.group_obj.as_ref() {
+        Some(group_obj) => {
+            acl.add_entry_full(
+                acl::ACL_MASK,
+                None,
+                acl::mode_group_to_acl_permissions(metadata.stat.mode),
+            )?;
+            acl.add_entry_full(acl::ACL_GROUP_OBJ, None, group_obj.permissions.0)?;
+        }
+        None => {
+            let mode = acl::mode_group_to_acl_permissions(metadata.stat.mode);
+
+            acl.add_entry_full(acl::ACL_GROUP_OBJ, None, mode)?;
+
+            if !metadata.acl.users.is_empty() || !metadata.acl.groups.is_empty() {
+                log::warn!("Missing GROUP_OBJ entry in ACL, resetting to value of MASK");
+                acl.add_entry_full(acl::ACL_MASK, None, mode)?;
+            }
+        }
+    }
+
+    for user in &metadata.acl.users {
+        acl.add_entry_full(acl::ACL_USER, Some(user.uid), user.permissions.0)?;
+    }
+
+    for group in &metadata.acl.groups {
+        acl.add_entry_full(acl::ACL_GROUP, Some(group.gid), group.permissions.0)?;
+    }
+
+    if !acl.is_valid() {
+        bail!("Error while restoring ACL - ACL invalid");
+    }
+
+    acl.set_file(c_proc_path, acl::ACL_TYPE_ACCESS)?;
+    drop(acl);
+
+    // acl type default:
+    if let Some(default) = metadata.acl.default.as_ref() {
+        let mut acl = acl::ACL::init(5)?;
+
+        acl.add_entry_full(acl::ACL_USER_OBJ, None, default.user_obj_permissions.0)?;
+
+        acl.add_entry_full(acl::ACL_GROUP_OBJ, None, default.group_obj_permissions.0)?;
+
+        acl.add_entry_full(acl::ACL_OTHER, None, default.other_permissions.0)?;
+
+        if default.mask_permissions != pxar::format::acl::Permissions::NO_MASK {
+            acl.add_entry_full(acl::ACL_MASK, None, default.mask_permissions.0)?;
+        }
+
+        for user in &metadata.acl.default_users {
+            acl.add_entry_full(acl::ACL_USER, Some(user.uid), user.permissions.0)?;
+        }
+
+        for group in &metadata.acl.default_groups {
+            acl.add_entry_full(acl::ACL_GROUP, Some(group.gid), group.permissions.0)?;
+        }
+
+        if !acl.is_valid() {
+            bail!("Error while restoring ACL - ACL invalid");
+        }
+
+        acl.set_file(c_proc_path, acl::ACL_TYPE_DEFAULT)?;
+    }
+
+    Ok(())
+}
+
+fn apply_quota_project_id(
+    feature_flags: Flags,
+    fd: RawFd,
+    metadata: &Metadata,
+) -> Result<(), Error> {
+    if !feature_flags.contains(Flags::WITH_QUOTA_PROJID) {
+        return Ok(());
+    }
+
+    let projid = match metadata.quota_project_id {
+        Some(projid) => projid,
+        None => return Ok(()),
+    };
+
+    let mut fsxattr = fs::FSXAttr::default();
+    unsafe {
+        fs::fs_ioc_fsgetxattr(fd, &mut fsxattr)
+            .context("error while getting fsxattr to restore quota project id")?;
+
+        fsxattr.fsx_projid = projid.projid as u32;
+
+        fs::fs_ioc_fssetxattr(fd, &fsxattr)
+            .context("error while setting fsxattr to restore quota project id")?;
+    }
+
+    Ok(())
+}
+
+fn errno_is_unsupported(errno: Errno) -> bool {
+    matches!(
+        errno,
+        Errno::ENOTTY | Errno::ENOSYS | Errno::EBADF | Errno::EOPNOTSUPP | Errno::EINVAL
+    )
+}
+
+fn apply_chattr(fd: RawFd, chattr: libc::c_long, mask: libc::c_long) -> Result<(), Error> {
+    if chattr == 0 {
+        return Ok(());
+    }
+
+    let mut fattr: libc::c_long = 0;
+    match unsafe { fs::read_attr_fd(fd, &mut fattr) } {
+        Ok(_) => (),
+        Err(errno) if errno_is_unsupported(errno) => {
+            return Ok(());
+        }
+        Err(err) => return Err(err).context("failed to read file attributes"),
+    }
+
+    let attr = (chattr & mask) | (fattr & !mask);
+
+    if attr == fattr {
+        return Ok(());
+    }
+
+    match unsafe { fs::write_attr_fd(fd, &attr) } {
+        Ok(_) => Ok(()),
+        Err(errno) if errno_is_unsupported(errno) => Ok(()),
+        Err(err) => Err(err).context("failed to set file attributes"),
+    }
+}
+
+fn apply_flags(feature_flags: Flags, fd: RawFd, entry_flags: u64) -> Result<(), Error> {
+    let entry_flags = Flags::from_bits_truncate(entry_flags);
+
+    apply_chattr(fd, entry_flags.to_chattr(), feature_flags.to_chattr())?;
+
+    let fatattr = (feature_flags & entry_flags).to_fat_attr();
+    if fatattr != 0 {
+        match unsafe { fs::write_fat_attr_fd(fd, &fatattr) } {
+            Ok(_) => (),
+            Err(errno) if errno_is_unsupported(errno) => (),
+            Err(err) => return Err(err).context("failed to set file FAT attributes"),
+        }
+    }
+
+    Ok(())
+}
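
For reviewers: `apply_chattr` merges the requested attribute bits into the file's current ones under the feature mask via `(chattr & mask) | (fattr & !mask)`, and skips the `write_attr_fd` call when the merge is a no-op. A std-only sketch of just that bit arithmetic (the bit constants are made up for illustration):

```rust
fn merge_attrs(chattr: i64, mask: i64, fattr: i64) -> i64 {
    // Take the requested bits where the mask allows it,
    // keep the file's current bits everywhere else.
    (chattr & mask) | (fattr & !mask)
}

fn main() {
    // Request bit 0 and bit 2, but the mask only permits bit 0.
    assert_eq!(merge_attrs(0b101, 0b001, 0b000), 0b001);

    // Bits outside the mask are preserved from the current attributes.
    assert_eq!(merge_attrs(0b000, 0b001, 0b110), 0b110);

    // When the result equals the current attributes, apply_chattr
    // returns early without writing anything.
    assert_eq!(merge_attrs(0b010, 0b010, 0b010), 0b010);
}
```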
diff --git a/pbs-client/src/pxar/aio/mod.rs b/pbs-client/src/pxar/aio/mod.rs
new file mode 100644
index 00000000..dfa8d84a
--- /dev/null
+++ b/pbs-client/src/pxar/aio/mod.rs
@@ -0,0 +1,11 @@
+mod dir_stack;
+
+pub(crate) mod extract;
+pub use extract::{
+    AsyncExtractor, Callback, ErrorHandler, PxarExtractOptions, PxarExtractOptionsBuilder,
+    RawAsyncExtractor,
+};
+
+mod metadata;
+
+mod worker;
diff --git a/pbs-client/src/pxar/aio/worker.rs b/pbs-client/src/pxar/aio/worker.rs
new file mode 100644
index 00000000..791e3129
--- /dev/null
+++ b/pbs-client/src/pxar/aio/worker.rs
@@ -0,0 +1,167 @@
+#![allow(unused)]
+use std::sync::{Arc, Mutex};
+use std::thread::{self, JoinHandle};
+
+use anyhow::{bail, Error};
+use tokio::sync::mpsc;
+
+use super::ErrorHandler;
+
+type WorkerTask = Box<dyn FnOnce() -> Result<(), Error> + Send>;
+
+/// [`SyncTaskWorker`] is a wrapper around a [`Thread`][std::thread::Thread]
+/// that runs synchronous tasks received through a queue.
+///
+/// A task is anything that implements [`FnOnce() -> Result<(), Error>`]
+/// and can be sent asynchronously to the worker via [`send()`].
+///
+/// As soon as a task returns an [`Error`], the worker drops its channel,
+/// discarding any remaining tasks. Errors therefore ought to be handled
+/// either inside the tasks themselves or with an [explicitly provided][eh]
+/// [`ErrorHandler`].
+///
+/// [eh]: Self::with_error_handler
+pub(crate) struct SyncTaskWorker {
+    sender: Option<mpsc::Sender<WorkerTask>>,
+    join_handle: Option<Box<JoinHandle<()>>>,
+    task_error: Arc<Mutex<Option<Error>>>,
+}
+
+impl SyncTaskWorker {
+    pub fn new(queue_size: usize) -> Self {
+        Self::new_inner(queue_size, Arc::new(Box::new(Err)))
+    }
+
+    pub fn with_error_handler(queue_size: usize, error_handler: Arc<ErrorHandler>) -> Self {
+        Self::new_inner(queue_size, error_handler)
+    }
+
+    fn new_inner(queue_size: usize, error_handler: Arc<ErrorHandler>) -> Self {
+        let (sender, receiver) = mpsc::channel(queue_size);
+
+        let task_error = Arc::new(Mutex::new(None));
+
+        let worker_loop = {
+            let mut receiver = receiver;
+            let last_error = Arc::clone(&task_error);
+            let error_handler = Arc::clone(&error_handler);
+
+            move || loop {
+                let function: WorkerTask = if let Some(function) = receiver.blocking_recv() {
+                    function
+                } else {
+                    return;
+                };
+
+                let result = (function)().or_else(|error| error_handler(error));
+
+                match result {
+                    Ok(()) => (),
+                    Err(error) => {
+                        let mut guard = last_error.lock().unwrap();
+                        if guard.is_none() {
+                            let error = error
+                                .context("extractor worker thread encountered unexpected error");
+                            *guard = Some(error);
+                        }
+
+                        drop(receiver);
+                        return;
+                    }
+                }
+            }
+        };
+
+        let join_handle = thread::spawn(worker_loop);
+
+        Self {
+            sender: Some(sender),
+            join_handle: Some(Box::new(join_handle)),
+            task_error,
+        }
+    }
+
+    #[inline]
+    fn check_for_error(&self) -> Result<(), Error> {
+        let mut guard = self.task_error.lock().unwrap();
+        if let Some(error) = guard.take() {
+            Err(error)
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Send a task to the worker to execute.
+    ///
+    /// In case any of the worker thread's previous tasks failed, its error
+    /// will be returned. Subsequent calls will not return the same error.
+    ///
+    /// Otherwise, this will fail if the worker's channel is already closed or
+    /// if its internal sender has already been dropped.
+    #[inline]
+    pub async fn send<F>(&self, task: F) -> Result<(), Error>
+    where
+        F: FnOnce() -> Result<(), Error> + Send + 'static,
+    {
+        self.check_for_error()?;
+
+        if let Some(ref sender) = self.sender {
+            match sender.send(Box::new(task)).await {
+                Ok(()) => Ok(()),
+                Err(_) => {
+                    bail!("failed to send task input to worker - channel closed");
+                }
+            }
+        } else {
+            bail!("failed to send task input to worker - sender already dropped");
+        }
+    }
+
+    /// Get the current available capacity of the worker thread's queue.
+    #[inline]
+    pub fn queue_capacity(&self) -> Option<usize> {
+        self.sender.as_ref().map(|s| s.capacity())
+    }
+
+    /// Get the maximum capacity of the worker thread's queue.
+    #[inline]
+    pub fn max_queue_capacity(&self) -> Option<usize> {
+        self.sender.as_ref().map(|s| s.max_capacity())
+    }
+
+    /// Run all outstanding tasks until completion or until a task returns an error.
+    ///
+    /// The first encountered error, if any, will be returned.
+    ///
+    /// Subsequent calls to [`join()`] will not have any effect.
+    #[inline]
+    pub fn join(&mut self) -> Result<(), Error> {
+        drop(self.sender.take());
+
+        let join_res = self
+            .join_handle
+            .take()
+            .map(|handle| handle.join())
+            .unwrap_or(Ok(()));
+
+        // Can't use `anyhow::Context` here: the panic payload returned by
+        // `JoinHandle::join()` doesn't implement `std::error::Error`
+        match join_res {
+            Ok(()) => (),
+            Err(_) => bail!("extractor worker thread panicked"),
+        };
+
+        self.check_for_error()?;
+
+        Ok(())
+    }
+}
+
+// [`Drop`] is implemented to ensure the thread is joined appropriately when
+// the worker is dropped.
+//
+// Note, however, that this consumes the stored task error (if any), so it is
+// always better to check for errors explicitly before dropping.
+impl Drop for SyncTaskWorker {
+    fn drop(&mut self) {
+        let _ = self.join();
+    }
+}
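
For reviewers: stripped of the tokio channel and the configurable `ErrorHandler`, the worker loop above reduces to the following std-only sketch, where the thread drains a queue of boxed tasks and stops at the first error (names are illustrative, not the patch's API):

```rust
use std::sync::mpsc;
use std::thread;

type Task = Box<dyn FnOnce() -> Result<(), String> + Send>;

fn main() {
    let (tx, rx) = mpsc::channel::<Task>();

    // The worker drains the queue and stops on the first task error,
    // dropping the receiver so any further sends would fail fast.
    let handle = thread::spawn(move || -> Option<String> {
        while let Ok(task) = rx.recv() {
            if let Err(err) = task() {
                return Some(err);
            }
        }
        None
    });

    tx.send(Box::new(|| Ok(()))).unwrap();
    tx.send(Box::new(|| Err("boom".to_string()))).unwrap();

    // Dropping the sender closes the channel, like `join()` in the patch.
    drop(tx);

    // Only the first error is reported, mirroring `check_for_error()`.
    let first_error = handle.join().unwrap();
    assert_eq!(first_error.as_deref(), Some("boom"));
}
```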
diff --git a/pbs-client/src/pxar/mod.rs b/pbs-client/src/pxar/mod.rs
index 14674b9b..9060bc0e 100644
--- a/pbs-client/src/pxar/mod.rs
+++ b/pbs-client/src/pxar/mod.rs
@@ -47,6 +47,7 @@
 //! (user, group, acl, ...) because this is already defined by the
 //! linked `ENTRY`.

+pub mod aio;
 pub(crate) mod create;
 pub(crate) mod dir_stack;
 pub(crate) mod extract;
--
2.39.2





