[pbs-devel] applied: [PATCH proxmox-backup 2/2] asyncify pxar create_archive
Wolfgang Bumiller
w.bumiller at proxmox.com
Wed Feb 17 10:02:41 CET 2021
applied
On Tue, Feb 09, 2021 at 01:03:48PM +0100, Stefan Reiter wrote:
> ...to take advantage of the aio::Encoder from the pxar crate.
>
> This is a rather straightforward conversion, but it requires removing the
> borrowed references from the Archiver struct, so the archiver is now given
> the catalog's Mutex directly. The callback is boxed.
>
> archive_dir_contents can call itself recursively (indirectly, via
> add_entry -> add_directory), and thus needs to return a boxed future.
>
> Callers are adjusted accordingly: PxarBackupStream now uses an Abortable
> future instead of a thread, so that its handler function can be async, and
> the pxar binary's create_archive is converted to an async API function.
> One test case simply uses 'block_on'.
>
> Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
> ---
>
> Requires updated pxar crate.
>
> Long patch, but a lot of changes are just
> -call();
> +call().await;
> or using the catalog mutex.
>
> Probably looks better with -w
>
> src/bin/pxar.rs | 6 +-
> src/client/pxar_backup_stream.rs | 65 +++++-----
> src/pxar/create.rs | 207 ++++++++++++++++---------------
> tests/catar.rs | 5 +-
> 4 files changed, 143 insertions(+), 140 deletions(-)
>
> diff --git a/src/bin/pxar.rs b/src/bin/pxar.rs
> index 814b3346..d830c570 100644
> --- a/src/bin/pxar.rs
> +++ b/src/bin/pxar.rs
> @@ -295,7 +295,7 @@ fn extract_archive(
> )]
> /// Create a new .pxar archive.
> #[allow(clippy::too_many_arguments)]
> -fn create_archive(
> +async fn create_archive(
> archive: String,
> source: String,
> verbose: bool,
> @@ -376,7 +376,7 @@ fn create_archive(
> dir,
> writer,
> feature_flags,
> - |path| {
> + move |path| {
> if verbose {
> println!("{:?}", path);
> }
> @@ -384,7 +384,7 @@ fn create_archive(
> },
> None,
> options,
> - )?;
> + ).await?;
>
> Ok(())
> }
> diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs
> index 5fb28fd5..b57061a3 100644
> --- a/src/client/pxar_backup_stream.rs
> +++ b/src/client/pxar_backup_stream.rs
> @@ -4,10 +4,10 @@ use std::path::Path;
> use std::pin::Pin;
> use std::sync::{Arc, Mutex};
> use std::task::{Context, Poll};
> -use std::thread;
>
> use anyhow::{format_err, Error};
> use futures::stream::Stream;
> +use futures::future::{Abortable, AbortHandle};
> use nix::dir::Dir;
> use nix::fcntl::OFlag;
> use nix::sys::stat::Mode;
> @@ -21,14 +21,14 @@ use crate::backup::CatalogWriter;
> /// consumer.
> pub struct PxarBackupStream {
> rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
> - child: Option<thread::JoinHandle<()>>,
> + handle: Option<AbortHandle>,
> error: Arc<Mutex<Option<String>>>,
> }
>
> impl Drop for PxarBackupStream {
> fn drop(&mut self) {
> self.rx = None;
> - self.child.take().unwrap().join().unwrap();
> + self.handle.take().unwrap().abort();
> }
> }
>
> @@ -43,42 +43,41 @@ impl PxarBackupStream {
> let buffer_size = 256 * 1024;
>
> let error = Arc::new(Mutex::new(None));
> - let child = std::thread::Builder::new()
> - .name("PxarBackupStream".to_string())
> - .spawn({
> - let error = Arc::clone(&error);
> - move || {
> - let mut catalog_guard = catalog.lock().unwrap();
> - let writer = std::io::BufWriter::with_capacity(
> - buffer_size,
> - crate::tools::StdChannelWriter::new(tx),
> - );
> + let error2 = Arc::clone(&error);
> + let handler = async move {
> + let writer = std::io::BufWriter::with_capacity(
> + buffer_size,
> + crate::tools::StdChannelWriter::new(tx),
> + );
>
> - let verbose = options.verbose;
> + let verbose = options.verbose;
>
> - let writer = pxar::encoder::sync::StandardWriter::new(writer);
> - if let Err(err) = crate::pxar::create_archive(
> - dir,
> - writer,
> - crate::pxar::Flags::DEFAULT,
> - |path| {
> - if verbose {
> - println!("{:?}", path);
> - }
> - Ok(())
> - },
> - Some(&mut *catalog_guard),
> - options,
> - ) {
> - let mut error = error.lock().unwrap();
> - *error = Some(err.to_string());
> + let writer = pxar::encoder::sync::StandardWriter::new(writer);
> + if let Err(err) = crate::pxar::create_archive(
> + dir,
> + writer,
> + crate::pxar::Flags::DEFAULT,
> + move |path| {
> + if verbose {
> + println!("{:?}", path);
> }
> - }
> - })?;
> + Ok(())
> + },
> + Some(catalog),
> + options,
> + ).await {
> + let mut error = error2.lock().unwrap();
> + *error = Some(err.to_string());
> + }
> + };
> +
> + let (handle, registration) = AbortHandle::new_pair();
> + let future = Abortable::new(handler, registration);
> + tokio::spawn(future);
>
> Ok(Self {
> rx: Some(rx),
> - child: Some(child),
> + handle: Some(handle),
> error,
> })
> }
> diff --git a/src/pxar/create.rs b/src/pxar/create.rs
> index 36de87da..6950b396 100644
> --- a/src/pxar/create.rs
> +++ b/src/pxar/create.rs
> @@ -5,16 +5,19 @@ use std::io::{self, Read, Write};
> use std::os::unix::ffi::OsStrExt;
> use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
> use std::path::{Path, PathBuf};
> +use std::sync::{Arc, Mutex};
>
> use anyhow::{bail, format_err, Error};
> use nix::dir::Dir;
> use nix::errno::Errno;
> use nix::fcntl::OFlag;
> use nix::sys::stat::{FileStat, Mode};
> +use futures::future::BoxFuture;
> +use futures::FutureExt;
>
> use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
> use pxar::Metadata;
> -use pxar::encoder::LinkOffset;
> +use pxar::encoder::{SeqWrite, LinkOffset};
>
> use proxmox::c_str;
> use proxmox::sys::error::SysError;
> @@ -129,13 +132,13 @@ impl std::io::Write for ErrorReporter {
> }
> }
>
> -struct Archiver<'a, 'b> {
> +struct Archiver {
> feature_flags: Flags,
> fs_feature_flags: Flags,
> fs_magic: i64,
> patterns: Vec<MatchEntry>,
> - callback: &'a mut dyn FnMut(&Path) -> Result<(), Error>,
> - catalog: Option<&'b mut dyn BackupCatalogWriter>,
> + callback: Box<dyn FnMut(&Path) -> Result<(), Error> + Send>,
> + catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
> path: PathBuf,
> entry_counter: usize,
> entry_limit: usize,
> @@ -147,19 +150,19 @@ struct Archiver<'a, 'b> {
> file_copy_buffer: Vec<u8>,
> }
>
> -type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>;
> +type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
>
> -pub fn create_archive<T, F>(
> +pub async fn create_archive<T, F>(
> source_dir: Dir,
> mut writer: T,
> feature_flags: Flags,
> - mut callback: F,
> - catalog: Option<&mut dyn BackupCatalogWriter>,
> + callback: F,
> + catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
> options: PxarCreateOptions,
> ) -> Result<(), Error>
> where
> - T: pxar::encoder::SeqWrite,
> - F: FnMut(&Path) -> Result<(), Error>,
> + T: SeqWrite + Send,
> + F: FnMut(&Path) -> Result<(), Error> + Send + 'static,
> {
> let fs_magic = detect_fs_type(source_dir.as_raw_fd())?;
> if is_virtual_file_system(fs_magic) {
> @@ -182,8 +185,7 @@ where
> set.insert(stat.st_dev);
> }
>
> - let writer = &mut writer as &mut dyn pxar::encoder::SeqWrite;
> - let mut encoder = Encoder::new(writer, &metadata)?;
> + let mut encoder = Encoder::new(&mut writer, &metadata).await?;
>
> let mut patterns = options.patterns;
>
> @@ -199,7 +201,7 @@ where
> feature_flags,
> fs_feature_flags,
> fs_magic,
> - callback: &mut callback,
> + callback: Box::new(callback),
> patterns,
> catalog,
> path: PathBuf::new(),
> @@ -213,8 +215,8 @@ where
> file_copy_buffer: vec::undefined(4 * 1024 * 1024),
> };
>
> - archiver.archive_dir_contents(&mut encoder, source_dir, true)?;
> - encoder.finish()?;
> + archiver.archive_dir_contents(&mut encoder, source_dir, true).await?;
> + encoder.finish().await?;
> Ok(())
> }
>
> @@ -224,7 +226,7 @@ struct FileListEntry {
> stat: FileStat,
> }
>
> -impl<'a, 'b> Archiver<'a, 'b> {
> +impl Archiver {
> /// Get the currently effective feature flags. (Requested flags masked by the file system
> /// feature flags).
> fn flags(&self) -> Flags {
> @@ -239,49 +241,51 @@ impl<'a, 'b> Archiver<'a, 'b> {
> }
> }
>
> - fn archive_dir_contents(
> - &mut self,
> - encoder: &mut Encoder,
> + fn archive_dir_contents<'a, 'b, T: SeqWrite + Send>(
> + &'a mut self,
> + encoder: &'a mut Encoder<'b, T>,
> mut dir: Dir,
> is_root: bool,
> - ) -> Result<(), Error> {
> - let entry_counter = self.entry_counter;
> + ) -> BoxFuture<'a, Result<(), Error>> {
> + async move {
> + let entry_counter = self.entry_counter;
>
> - let old_patterns_count = self.patterns.len();
> - self.read_pxar_excludes(dir.as_raw_fd())?;
> + let old_patterns_count = self.patterns.len();
> + self.read_pxar_excludes(dir.as_raw_fd())?;
>
> - let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?;
> + let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?;
>
> - if is_root && old_patterns_count > 0 {
> - file_list.push(FileListEntry {
> - name: CString::new(".pxarexclude-cli").unwrap(),
> - path: PathBuf::new(),
> - stat: unsafe { std::mem::zeroed() },
> - });
> - }
> -
> - let dir_fd = dir.as_raw_fd();
> -
> - let old_path = std::mem::take(&mut self.path);
> -
> - for file_entry in file_list {
> - let file_name = file_entry.name.to_bytes();
> -
> - if is_root && file_name == b".pxarexclude-cli" {
> - self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)?;
> - continue;
> + if is_root && old_patterns_count > 0 {
> + file_list.push(FileListEntry {
> + name: CString::new(".pxarexclude-cli").unwrap(),
> + path: PathBuf::new(),
> + stat: unsafe { std::mem::zeroed() },
> + });
> }
>
> - (self.callback)(&file_entry.path)?;
> - self.path = file_entry.path;
> - self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
> - .map_err(|err| self.wrap_err(err))?;
> - }
> - self.path = old_path;
> - self.entry_counter = entry_counter;
> - self.patterns.truncate(old_patterns_count);
> + let dir_fd = dir.as_raw_fd();
>
> - Ok(())
> + let old_path = std::mem::take(&mut self.path);
> +
> + for file_entry in file_list {
> + let file_name = file_entry.name.to_bytes();
> +
> + if is_root && file_name == b".pxarexclude-cli" {
> + self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count).await?;
> + continue;
> + }
> +
> + (self.callback)(&file_entry.path)?;
> + self.path = file_entry.path;
> + self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat).await
> + .map_err(|err| self.wrap_err(err))?;
> + }
> + self.path = old_path;
> + self.entry_counter = entry_counter;
> + self.patterns.truncate(old_patterns_count);
> +
> + Ok(())
> + }.boxed()
> }
>
> /// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`.
> @@ -396,23 +400,22 @@ impl<'a, 'b> Archiver<'a, 'b> {
> Ok(())
> }
>
> - fn encode_pxarexclude_cli(
> + async fn encode_pxarexclude_cli<T: SeqWrite + Send>(
> &mut self,
> - encoder: &mut Encoder,
> + encoder: &mut Encoder<'_, T>,
> file_name: &CStr,
> patterns_count: usize,
> ) -> Result<(), Error> {
> let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]);
> -
> - if let Some(ref mut catalog) = self.catalog {
> - catalog.add_file(file_name, content.len() as u64, 0)?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_file(file_name, content.len() as u64, 0)?;
> }
>
> let mut metadata = Metadata::default();
> metadata.stat.mode = pxar::format::mode::IFREG | 0o600;
>
> - let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64)?;
> - file.write_all(&content)?;
> + let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64).await?;
> + file.write_all(&content).await?;
>
> Ok(())
> }
> @@ -502,9 +505,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
> Ok(())
> }
>
> - fn add_entry(
> + async fn add_entry<T: SeqWrite + Send>(
> &mut self,
> - encoder: &mut Encoder,
> + encoder: &mut Encoder<'_, T>,
> parent: RawFd,
> c_file_name: &CStr,
> stat: &FileStat,
> @@ -550,23 +553,23 @@ impl<'a, 'b> Archiver<'a, 'b> {
>
> if stat.st_nlink > 1 {
> if let Some((path, offset)) = self.hardlinks.get(&link_info) {
> - if let Some(ref mut catalog) = self.catalog {
> - catalog.add_hardlink(c_file_name)?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_hardlink(c_file_name)?;
> }
>
> - encoder.add_hardlink(file_name, path, *offset)?;
> + encoder.add_hardlink(file_name, path, *offset).await?;
>
> return Ok(());
> }
> }
>
> let file_size = stat.st_size as u64;
> - if let Some(ref mut catalog) = self.catalog {
> - catalog.add_file(c_file_name, file_size, stat.st_mtime)?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_file(c_file_name, file_size, stat.st_mtime)?;
> }
>
> let offset: LinkOffset =
> - self.add_regular_file(encoder, fd, file_name, &metadata, file_size)?;
> + self.add_regular_file(encoder, fd, file_name, &metadata, file_size).await?;
>
> if stat.st_nlink > 1 {
> self.hardlinks.insert(link_info, (self.path.clone(), offset));
> @@ -577,49 +580,49 @@ impl<'a, 'b> Archiver<'a, 'b> {
> mode::IFDIR => {
> let dir = Dir::from_fd(fd.into_raw_fd())?;
>
> - if let Some(ref mut catalog) = self.catalog {
> - catalog.start_directory(c_file_name)?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().start_directory(c_file_name)?;
> }
> - let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat);
> - if let Some(ref mut catalog) = self.catalog {
> - catalog.end_directory()?;
> + let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat).await;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().end_directory()?;
> }
> result
> }
> mode::IFSOCK => {
> - if let Some(ref mut catalog) = self.catalog {
> - catalog.add_socket(c_file_name)?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_socket(c_file_name)?;
> }
>
> - Ok(encoder.add_socket(&metadata, file_name)?)
> + Ok(encoder.add_socket(&metadata, file_name).await?)
> }
> mode::IFIFO => {
> - if let Some(ref mut catalog) = self.catalog {
> - catalog.add_fifo(c_file_name)?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_fifo(c_file_name)?;
> }
>
> - Ok(encoder.add_fifo(&metadata, file_name)?)
> + Ok(encoder.add_fifo(&metadata, file_name).await?)
> }
> mode::IFLNK => {
> - if let Some(ref mut catalog) = self.catalog {
> - catalog.add_symlink(c_file_name)?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_symlink(c_file_name)?;
> }
>
> - self.add_symlink(encoder, fd, file_name, &metadata)
> + self.add_symlink(encoder, fd, file_name, &metadata).await
> }
> mode::IFBLK => {
> - if let Some(ref mut catalog) = self.catalog {
> - catalog.add_block_device(c_file_name)?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_block_device(c_file_name)?;
> }
>
> - self.add_device(encoder, file_name, &metadata, &stat)
> + self.add_device(encoder, file_name, &metadata, &stat).await
> }
> mode::IFCHR => {
> - if let Some(ref mut catalog) = self.catalog {
> - catalog.add_char_device(c_file_name)?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().add_char_device(c_file_name)?;
> }
>
> - self.add_device(encoder, file_name, &metadata, &stat)
> + self.add_device(encoder, file_name, &metadata, &stat).await
> }
> other => bail!(
> "encountered unknown file type: 0x{:x} (0o{:o})",
> @@ -629,9 +632,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
> }
> }
>
> - fn add_directory(
> + async fn add_directory<T: SeqWrite + Send>(
> &mut self,
> - encoder: &mut Encoder,
> + encoder: &mut Encoder<'_, T>,
> dir: Dir,
> dir_name: &CStr,
> metadata: &Metadata,
> @@ -639,7 +642,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
> ) -> Result<(), Error> {
> let dir_name = OsStr::from_bytes(dir_name.to_bytes());
>
> - let mut encoder = encoder.create_directory(dir_name, &metadata)?;
> + let mut encoder = encoder.create_directory(dir_name, &metadata).await?;
>
> let old_fs_magic = self.fs_magic;
> let old_fs_feature_flags = self.fs_feature_flags;
> @@ -662,20 +665,20 @@ impl<'a, 'b> Archiver<'a, 'b> {
> writeln!(self.logger, "skipping mount point: {:?}", self.path)?;
> Ok(())
> } else {
> - self.archive_dir_contents(&mut encoder, dir, false)
> + self.archive_dir_contents(&mut encoder, dir, false).await
> };
>
> self.fs_magic = old_fs_magic;
> self.fs_feature_flags = old_fs_feature_flags;
> self.current_st_dev = old_st_dev;
>
> - encoder.finish()?;
> + encoder.finish().await?;
> result
> }
>
> - fn add_regular_file(
> + async fn add_regular_file<T: SeqWrite + Send>(
> &mut self,
> - encoder: &mut Encoder,
> + encoder: &mut Encoder<'_, T>,
> fd: Fd,
> file_name: &Path,
> metadata: &Metadata,
> @@ -683,7 +686,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
> ) -> Result<LinkOffset, Error> {
> let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
> let mut remaining = file_size;
> - let mut out = encoder.create_file(metadata, file_name, file_size)?;
> + let mut out = encoder.create_file(metadata, file_name, file_size).await?;
> while remaining != 0 {
> let mut got = match file.read(&mut self.file_copy_buffer[..]) {
> Ok(0) => break,
> @@ -695,7 +698,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
> self.report_file_grew_while_reading()?;
> got = remaining as usize;
> }
> - out.write_all(&self.file_copy_buffer[..got])?;
> + out.write_all(&self.file_copy_buffer[..got]).await?;
> remaining -= got as u64;
> }
> if remaining > 0 {
> @@ -704,7 +707,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
> vec::clear(&mut self.file_copy_buffer[..to_zero]);
> while remaining != 0 {
> let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize;
> - out.write_all(&self.file_copy_buffer[..fill])?;
> + out.write_all(&self.file_copy_buffer[..fill]).await?;
> remaining -= fill as u64;
> }
> }
> @@ -712,21 +715,21 @@ impl<'a, 'b> Archiver<'a, 'b> {
> Ok(out.file_offset())
> }
>
> - fn add_symlink(
> + async fn add_symlink<T: SeqWrite + Send>(
> &mut self,
> - encoder: &mut Encoder,
> + encoder: &mut Encoder<'_, T>,
> fd: Fd,
> file_name: &Path,
> metadata: &Metadata,
> ) -> Result<(), Error> {
> let dest = nix::fcntl::readlinkat(fd.as_raw_fd(), &b""[..])?;
> - encoder.add_symlink(metadata, file_name, dest)?;
> + encoder.add_symlink(metadata, file_name, dest).await?;
> Ok(())
> }
>
> - fn add_device(
> + async fn add_device<T: SeqWrite + Send>(
> &mut self,
> - encoder: &mut Encoder,
> + encoder: &mut Encoder<'_, T>,
> file_name: &Path,
> metadata: &Metadata,
> stat: &FileStat,
> @@ -735,7 +738,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
> metadata,
> file_name,
> pxar::format::Device::from_dev_t(stat.st_rdev),
> - )?)
> + ).await?)
> }
> }
>
> diff --git a/tests/catar.rs b/tests/catar.rs
> index 2d9dea71..550600c6 100644
> --- a/tests/catar.rs
> +++ b/tests/catar.rs
> @@ -30,14 +30,15 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
> ..PxarCreateOptions::default()
> };
>
> - create_archive(
> + let rt = tokio::runtime::Runtime::new().unwrap();
> + rt.block_on(create_archive(
> dir,
> writer,
> Flags::DEFAULT,
> |_| Ok(()),
> None,
> options,
> - )?;
> + ))?;
>
> Command::new("cmp")
> .arg("--verbose")
> --
> 2.20.1
More information about the pbs-devel
mailing list