[pbs-devel] applied: [PATCH proxmox-backup 2/2] asyncify pxar create_archive

Wolfgang Bumiller w.bumiller at proxmox.com
Wed Feb 17 10:02:41 CET 2021


applied

On Tue, Feb 09, 2021 at 01:03:48PM +0100, Stefan Reiter wrote:
> ...to take advantage of the aio::Encoder from the pxar crate.
> 
> Rather straightforward conversion, but it does require getting rid of the
> references in the Archiver struct, which therefore has to be given the
> Mutex for the catalog directly. The callback is boxed.
> 
> archive_dir_contents can call itself recursively, and thus needs to
> return a boxed future.
> 
> Users are adjusted, namely PxarBackupStream is converted to use an
> Abortable future instead of a thread so it supports async in its handler
> function, and the pxar bin create_archive is converted to an async API
> function. One test case is made to just use 'block_on'.
> 
> Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
> ---
> 
> Requires updated pxar crate.
> 
> Long patch, but a lot of changes are just
> -call();
> +call().await;
> or using the catalog mutex.
> 
> Probably looks better with -w
> 
>  src/bin/pxar.rs                  |   6 +-
>  src/client/pxar_backup_stream.rs |  65 +++++-----
>  src/pxar/create.rs               | 207 ++++++++++++++++---------------
>  tests/catar.rs                   |   5 +-
>  4 files changed, 143 insertions(+), 140 deletions(-)
> 
> diff --git a/src/bin/pxar.rs b/src/bin/pxar.rs
> index 814b3346..d830c570 100644
> --- a/src/bin/pxar.rs
> +++ b/src/bin/pxar.rs
> @@ -295,7 +295,7 @@ fn extract_archive(
>  )]
>  /// Create a new .pxar archive.
>  #[allow(clippy::too_many_arguments)]
> -fn create_archive(
> +async fn create_archive(
>      archive: String,
>      source: String,
>      verbose: bool,
> @@ -376,7 +376,7 @@ fn create_archive(
>          dir,
>          writer,
>          feature_flags,
> -        |path| {
> +        move |path| {
>              if verbose {
>                  println!("{:?}", path);
>              }
> @@ -384,7 +384,7 @@ fn create_archive(
>          },
>          None,
>          options,
> -    )?;
> +    ).await?;
>  
>      Ok(())
>  }
> diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs
> index 5fb28fd5..b57061a3 100644
> --- a/src/client/pxar_backup_stream.rs
> +++ b/src/client/pxar_backup_stream.rs
> @@ -4,10 +4,10 @@ use std::path::Path;
>  use std::pin::Pin;
>  use std::sync::{Arc, Mutex};
>  use std::task::{Context, Poll};
> -use std::thread;
>  
>  use anyhow::{format_err, Error};
>  use futures::stream::Stream;
> +use futures::future::{Abortable, AbortHandle};
>  use nix::dir::Dir;
>  use nix::fcntl::OFlag;
>  use nix::sys::stat::Mode;
> @@ -21,14 +21,14 @@ use crate::backup::CatalogWriter;
>  /// consumer.
>  pub struct PxarBackupStream {
>      rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
> -    child: Option<thread::JoinHandle<()>>,
> +    handle: Option<AbortHandle>,
>      error: Arc<Mutex<Option<String>>>,
>  }
>  
>  impl Drop for PxarBackupStream {
>      fn drop(&mut self) {
>          self.rx = None;
> -        self.child.take().unwrap().join().unwrap();
> +        self.handle.take().unwrap().abort();
>      }
>  }
>  
> @@ -43,42 +43,41 @@ impl PxarBackupStream {
>          let buffer_size = 256 * 1024;
>  
>          let error = Arc::new(Mutex::new(None));
> -        let child = std::thread::Builder::new()
> -            .name("PxarBackupStream".to_string())
> -            .spawn({
> -                let error = Arc::clone(&error);
> -                move || {
> -                    let mut catalog_guard = catalog.lock().unwrap();
> -                    let writer = std::io::BufWriter::with_capacity(
> -                        buffer_size,
> -                        crate::tools::StdChannelWriter::new(tx),
> -                    );
> +        let error2 = Arc::clone(&error);
> +        let handler = async move {
> +            let writer = std::io::BufWriter::with_capacity(
> +                buffer_size,
> +                crate::tools::StdChannelWriter::new(tx),
> +            );
>  
> -                    let verbose = options.verbose;
> +            let verbose = options.verbose;
>  
> -                    let writer = pxar::encoder::sync::StandardWriter::new(writer);
> -                    if let Err(err) = crate::pxar::create_archive(
> -                        dir,
> -                        writer,
> -                        crate::pxar::Flags::DEFAULT,
> -                        |path| {
> -                            if verbose {
> -                                println!("{:?}", path);
> -                            }
> -                            Ok(())
> -                        },
> -                        Some(&mut *catalog_guard),
> -                        options,
> -                    ) {
> -                        let mut error = error.lock().unwrap();
> -                        *error = Some(err.to_string());
> +            let writer = pxar::encoder::sync::StandardWriter::new(writer);
> +            if let Err(err) = crate::pxar::create_archive(
> +                dir,
> +                writer,
> +                crate::pxar::Flags::DEFAULT,
> +                move |path| {
> +                    if verbose {
> +                        println!("{:?}", path);
>                      }
> -                }
> -            })?;
> +                    Ok(())
> +                },
> +                Some(catalog),
> +                options,
> +            ).await {
> +                let mut error = error2.lock().unwrap();
> +                *error = Some(err.to_string());
> +            }
> +        };
> +
> +        let (handle, registration) = AbortHandle::new_pair();
> +        let future = Abortable::new(handler, registration);
> +        tokio::spawn(future);
>  
>          Ok(Self {
>              rx: Some(rx),
> -            child: Some(child),
> +            handle: Some(handle),
>              error,
>          })
>      }
> diff --git a/src/pxar/create.rs b/src/pxar/create.rs
> index 36de87da..6950b396 100644
> --- a/src/pxar/create.rs
> +++ b/src/pxar/create.rs
> @@ -5,16 +5,19 @@ use std::io::{self, Read, Write};
>  use std::os::unix::ffi::OsStrExt;
>  use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
>  use std::path::{Path, PathBuf};
> +use std::sync::{Arc, Mutex};
>  
>  use anyhow::{bail, format_err, Error};
>  use nix::dir::Dir;
>  use nix::errno::Errno;
>  use nix::fcntl::OFlag;
>  use nix::sys::stat::{FileStat, Mode};
> +use futures::future::BoxFuture;
> +use futures::FutureExt;
>  
>  use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
>  use pxar::Metadata;
> -use pxar::encoder::LinkOffset;
> +use pxar::encoder::{SeqWrite, LinkOffset};
>  
>  use proxmox::c_str;
>  use proxmox::sys::error::SysError;
> @@ -129,13 +132,13 @@ impl std::io::Write for ErrorReporter {
>      }
>  }
>  
> -struct Archiver<'a, 'b> {
> +struct Archiver {
>      feature_flags: Flags,
>      fs_feature_flags: Flags,
>      fs_magic: i64,
>      patterns: Vec<MatchEntry>,
> -    callback: &'a mut dyn FnMut(&Path) -> Result<(), Error>,
> -    catalog: Option<&'b mut dyn BackupCatalogWriter>,
> +    callback: Box<dyn FnMut(&Path) -> Result<(), Error> + Send>,
> +    catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
>      path: PathBuf,
>      entry_counter: usize,
>      entry_limit: usize,
> @@ -147,19 +150,19 @@ struct Archiver<'a, 'b> {
>      file_copy_buffer: Vec<u8>,
>  }
>  
> -type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>;
> +type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
>  
> -pub fn create_archive<T, F>(
> +pub async fn create_archive<T, F>(
>      source_dir: Dir,
>      mut writer: T,
>      feature_flags: Flags,
> -    mut callback: F,
> -    catalog: Option<&mut dyn BackupCatalogWriter>,
> +    callback: F,
> +    catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
>      options: PxarCreateOptions,
>  ) -> Result<(), Error>
>  where
> -    T: pxar::encoder::SeqWrite,
> -    F: FnMut(&Path) -> Result<(), Error>,
> +    T: SeqWrite + Send,
> +    F: FnMut(&Path) -> Result<(), Error> + Send + 'static,
>  {
>      let fs_magic = detect_fs_type(source_dir.as_raw_fd())?;
>      if is_virtual_file_system(fs_magic) {
> @@ -182,8 +185,7 @@ where
>          set.insert(stat.st_dev);
>      }
>  
> -    let writer = &mut writer as &mut dyn pxar::encoder::SeqWrite;
> -    let mut encoder = Encoder::new(writer, &metadata)?;
> +    let mut encoder = Encoder::new(&mut writer, &metadata).await?;
>  
>      let mut patterns = options.patterns;
>  
> @@ -199,7 +201,7 @@ where
>          feature_flags,
>          fs_feature_flags,
>          fs_magic,
> -        callback: &mut callback,
> +        callback: Box::new(callback),
>          patterns,
>          catalog,
>          path: PathBuf::new(),
> @@ -213,8 +215,8 @@ where
>          file_copy_buffer: vec::undefined(4 * 1024 * 1024),
>      };
>  
> -    archiver.archive_dir_contents(&mut encoder, source_dir, true)?;
> -    encoder.finish()?;
> +    archiver.archive_dir_contents(&mut encoder, source_dir, true).await?;
> +    encoder.finish().await?;
>      Ok(())
>  }
>  
> @@ -224,7 +226,7 @@ struct FileListEntry {
>      stat: FileStat,
>  }
>  
> -impl<'a, 'b> Archiver<'a, 'b> {
> +impl Archiver {
>      /// Get the currently effective feature flags. (Requested flags masked by the file system
>      /// feature flags).
>      fn flags(&self) -> Flags {
> @@ -239,49 +241,51 @@ impl<'a, 'b> Archiver<'a, 'b> {
>          }
>      }
>  
> -    fn archive_dir_contents(
> -        &mut self,
> -        encoder: &mut Encoder,
> +    fn archive_dir_contents<'a, 'b, T: SeqWrite + Send>(
> +        &'a mut self,
> +        encoder: &'a mut Encoder<'b, T>,
>          mut dir: Dir,
>          is_root: bool,
> -    ) -> Result<(), Error> {
> -        let entry_counter = self.entry_counter;
> +    ) -> BoxFuture<'a, Result<(), Error>> {
> +        async move {
> +            let entry_counter = self.entry_counter;
>  
> -        let old_patterns_count = self.patterns.len();
> -        self.read_pxar_excludes(dir.as_raw_fd())?;
> +            let old_patterns_count = self.patterns.len();
> +            self.read_pxar_excludes(dir.as_raw_fd())?;
>  
> -        let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?;
> +            let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?;
>  
> -        if is_root && old_patterns_count > 0 {
> -            file_list.push(FileListEntry {
> -                name: CString::new(".pxarexclude-cli").unwrap(),
> -                path: PathBuf::new(),
> -                stat: unsafe { std::mem::zeroed() },
> -            });
> -        }
> -
> -        let dir_fd = dir.as_raw_fd();
> -
> -        let old_path = std::mem::take(&mut self.path);
> -
> -        for file_entry in file_list {
> -            let file_name = file_entry.name.to_bytes();
> -
> -            if is_root && file_name == b".pxarexclude-cli" {
> -                self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)?;
> -                continue;
> +            if is_root && old_patterns_count > 0 {
> +                file_list.push(FileListEntry {
> +                    name: CString::new(".pxarexclude-cli").unwrap(),
> +                    path: PathBuf::new(),
> +                    stat: unsafe { std::mem::zeroed() },
> +                });
>              }
>  
> -            (self.callback)(&file_entry.path)?;
> -            self.path = file_entry.path;
> -            self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
> -                .map_err(|err| self.wrap_err(err))?;
> -        }
> -        self.path = old_path;
> -        self.entry_counter = entry_counter;
> -        self.patterns.truncate(old_patterns_count);
> +            let dir_fd = dir.as_raw_fd();
>  
> -        Ok(())
> +            let old_path = std::mem::take(&mut self.path);
> +
> +            for file_entry in file_list {
> +                let file_name = file_entry.name.to_bytes();
> +
> +                if is_root && file_name == b".pxarexclude-cli" {
> +                    self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count).await?;
> +                    continue;
> +                }
> +
> +                (self.callback)(&file_entry.path)?;
> +                self.path = file_entry.path;
> +                self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat).await
> +                    .map_err(|err| self.wrap_err(err))?;
> +            }
> +            self.path = old_path;
> +            self.entry_counter = entry_counter;
> +            self.patterns.truncate(old_patterns_count);
> +
> +            Ok(())
> +        }.boxed()
>      }
>  
>      /// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`.
> @@ -396,23 +400,22 @@ impl<'a, 'b> Archiver<'a, 'b> {
>          Ok(())
>      }
>  
> -    fn encode_pxarexclude_cli(
> +    async fn encode_pxarexclude_cli<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          file_name: &CStr,
>          patterns_count: usize,
>      ) -> Result<(), Error> {
>          let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]);
> -
> -        if let Some(ref mut catalog) = self.catalog {
> -            catalog.add_file(file_name, content.len() as u64, 0)?;
> +        if let Some(ref catalog) = self.catalog {
> +            catalog.lock().unwrap().add_file(file_name, content.len() as u64, 0)?;
>          }
>  
>          let mut metadata = Metadata::default();
>          metadata.stat.mode = pxar::format::mode::IFREG | 0o600;
>  
> -        let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64)?;
> -        file.write_all(&content)?;
> +        let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64).await?;
> +        file.write_all(&content).await?;
>  
>          Ok(())
>      }
> @@ -502,9 +505,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
>          Ok(())
>      }
>  
> -    fn add_entry(
> +    async fn add_entry<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          parent: RawFd,
>          c_file_name: &CStr,
>          stat: &FileStat,
> @@ -550,23 +553,23 @@ impl<'a, 'b> Archiver<'a, 'b> {
>  
>                  if stat.st_nlink > 1 {
>                      if let Some((path, offset)) = self.hardlinks.get(&link_info) {
> -                        if let Some(ref mut catalog) = self.catalog {
> -                            catalog.add_hardlink(c_file_name)?;
> +                        if let Some(ref catalog) = self.catalog {
> +                            catalog.lock().unwrap().add_hardlink(c_file_name)?;
>                          }
>  
> -                        encoder.add_hardlink(file_name, path, *offset)?;
> +                        encoder.add_hardlink(file_name, path, *offset).await?;
>  
>                          return Ok(());
>                      }
>                  }
>  
>                  let file_size = stat.st_size as u64;
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_file(c_file_name, file_size, stat.st_mtime)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_file(c_file_name, file_size, stat.st_mtime)?;
>                  }
>  
>                  let offset: LinkOffset =
> -                    self.add_regular_file(encoder, fd, file_name, &metadata, file_size)?;
> +                    self.add_regular_file(encoder, fd, file_name, &metadata, file_size).await?;
>  
>                  if stat.st_nlink > 1 {
>                      self.hardlinks.insert(link_info, (self.path.clone(), offset));
> @@ -577,49 +580,49 @@ impl<'a, 'b> Archiver<'a, 'b> {
>              mode::IFDIR => {
>                  let dir = Dir::from_fd(fd.into_raw_fd())?;
>  
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.start_directory(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().start_directory(c_file_name)?;
>                  }
> -                let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat);
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.end_directory()?;
> +                let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat).await;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().end_directory()?;
>                  }
>                  result
>              }
>              mode::IFSOCK => {
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_socket(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_socket(c_file_name)?;
>                  }
>  
> -                Ok(encoder.add_socket(&metadata, file_name)?)
> +                Ok(encoder.add_socket(&metadata, file_name).await?)
>              }
>              mode::IFIFO => {
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_fifo(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_fifo(c_file_name)?;
>                  }
>  
> -                Ok(encoder.add_fifo(&metadata, file_name)?)
> +                Ok(encoder.add_fifo(&metadata, file_name).await?)
>              }
>              mode::IFLNK => {
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_symlink(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_symlink(c_file_name)?;
>                  }
>  
> -                self.add_symlink(encoder, fd, file_name, &metadata)
> +                self.add_symlink(encoder, fd, file_name, &metadata).await
>              }
>              mode::IFBLK => {
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_block_device(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_block_device(c_file_name)?;
>                  }
>  
> -                self.add_device(encoder, file_name, &metadata, &stat)
> +                self.add_device(encoder, file_name, &metadata, &stat).await
>              }
>              mode::IFCHR => {
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_char_device(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_char_device(c_file_name)?;
>                  }
>  
> -                self.add_device(encoder, file_name, &metadata, &stat)
> +                self.add_device(encoder, file_name, &metadata, &stat).await
>              }
>              other => bail!(
>                  "encountered unknown file type: 0x{:x} (0o{:o})",
> @@ -629,9 +632,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
>          }
>      }
>  
> -    fn add_directory(
> +    async fn add_directory<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          dir: Dir,
>          dir_name: &CStr,
>          metadata: &Metadata,
> @@ -639,7 +642,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
>      ) -> Result<(), Error> {
>          let dir_name = OsStr::from_bytes(dir_name.to_bytes());
>  
> -        let mut encoder = encoder.create_directory(dir_name, &metadata)?;
> +        let mut encoder = encoder.create_directory(dir_name, &metadata).await?;
>  
>          let old_fs_magic = self.fs_magic;
>          let old_fs_feature_flags = self.fs_feature_flags;
> @@ -662,20 +665,20 @@ impl<'a, 'b> Archiver<'a, 'b> {
>              writeln!(self.logger, "skipping mount point: {:?}", self.path)?;
>              Ok(())
>          } else {
> -            self.archive_dir_contents(&mut encoder, dir, false)
> +            self.archive_dir_contents(&mut encoder, dir, false).await
>          };
>  
>          self.fs_magic = old_fs_magic;
>          self.fs_feature_flags = old_fs_feature_flags;
>          self.current_st_dev = old_st_dev;
>  
> -        encoder.finish()?;
> +        encoder.finish().await?;
>          result
>      }
>  
> -    fn add_regular_file(
> +    async fn add_regular_file<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          fd: Fd,
>          file_name: &Path,
>          metadata: &Metadata,
> @@ -683,7 +686,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
>      ) -> Result<LinkOffset, Error> {
>          let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
>          let mut remaining = file_size;
> -        let mut out = encoder.create_file(metadata, file_name, file_size)?;
> +        let mut out = encoder.create_file(metadata, file_name, file_size).await?;
>          while remaining != 0 {
>              let mut got = match file.read(&mut self.file_copy_buffer[..]) {
>                  Ok(0) => break,
> @@ -695,7 +698,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
>                  self.report_file_grew_while_reading()?;
>                  got = remaining as usize;
>              }
> -            out.write_all(&self.file_copy_buffer[..got])?;
> +            out.write_all(&self.file_copy_buffer[..got]).await?;
>              remaining -= got as u64;
>          }
>          if remaining > 0 {
> @@ -704,7 +707,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
>              vec::clear(&mut self.file_copy_buffer[..to_zero]);
>              while remaining != 0 {
>                  let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize;
> -                out.write_all(&self.file_copy_buffer[..fill])?;
> +                out.write_all(&self.file_copy_buffer[..fill]).await?;
>                  remaining -= fill as u64;
>              }
>          }
> @@ -712,21 +715,21 @@ impl<'a, 'b> Archiver<'a, 'b> {
>          Ok(out.file_offset())
>      }
>  
> -    fn add_symlink(
> +    async fn add_symlink<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          fd: Fd,
>          file_name: &Path,
>          metadata: &Metadata,
>      ) -> Result<(), Error> {
>          let dest = nix::fcntl::readlinkat(fd.as_raw_fd(), &b""[..])?;
> -        encoder.add_symlink(metadata, file_name, dest)?;
> +        encoder.add_symlink(metadata, file_name, dest).await?;
>          Ok(())
>      }
>  
> -    fn add_device(
> +    async fn add_device<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          file_name: &Path,
>          metadata: &Metadata,
>          stat: &FileStat,
> @@ -735,7 +738,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
>              metadata,
>              file_name,
>              pxar::format::Device::from_dev_t(stat.st_rdev),
> -        )?)
> +        ).await?)
>      }
>  }
>  
> diff --git a/tests/catar.rs b/tests/catar.rs
> index 2d9dea71..550600c6 100644
> --- a/tests/catar.rs
> +++ b/tests/catar.rs
> @@ -30,14 +30,15 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
>          ..PxarCreateOptions::default()
>      };
>  
> -    create_archive(
> +    let rt = tokio::runtime::Runtime::new().unwrap();
> +    rt.block_on(create_archive(
>          dir,
>          writer,
>          Flags::DEFAULT,
>          |_| Ok(()),
>          None,
>          options,
> -    )?;
> +    ))?;
>  
>      Command::new("cmp")
>          .arg("--verbose")
> -- 
> 2.20.1





More information about the pbs-devel mailing list