[pbs-devel] [PATCH proxmox-backup 2/2] asyncify pxar create_archive

Stefan Reiter s.reiter at proxmox.com
Tue Feb 9 13:03:48 CET 2021


...to take advantage of the aio::Encoder from the pxar crate.

This is a rather straightforward conversion, but it requires getting rid
of the references in the Archiver struct, so the Archiver now has to be
given the Mutex for the catalog directly. The callback is boxed.

archive_dir_contents can call itself recursively, and thus needs to
return a boxed future.

Callers are adjusted accordingly: PxarBackupStream is converted to use an
Abortable future instead of a thread, so that it supports async in its
handler function, and create_archive in the pxar binary is converted to an
async API function. One test case is changed to simply use 'block_on'.

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---

Requires updated pxar crate.

Long patch, but a lot of changes are just
-call();
+call().await;
or using the catalog mutex.

Probably looks better with -w (ignore whitespace changes), since much of
the diff is re-indentation.

 src/bin/pxar.rs                  |   6 +-
 src/client/pxar_backup_stream.rs |  65 +++++-----
 src/pxar/create.rs               | 207 ++++++++++++++++---------------
 tests/catar.rs                   |   5 +-
 4 files changed, 143 insertions(+), 140 deletions(-)

diff --git a/src/bin/pxar.rs b/src/bin/pxar.rs
index 814b3346..d830c570 100644
--- a/src/bin/pxar.rs
+++ b/src/bin/pxar.rs
@@ -295,7 +295,7 @@ fn extract_archive(
 )]
 /// Create a new .pxar archive.
 #[allow(clippy::too_many_arguments)]
-fn create_archive(
+async fn create_archive(
     archive: String,
     source: String,
     verbose: bool,
@@ -376,7 +376,7 @@ fn create_archive(
         dir,
         writer,
         feature_flags,
-        |path| {
+        move |path| {
             if verbose {
                 println!("{:?}", path);
             }
@@ -384,7 +384,7 @@ fn create_archive(
         },
         None,
         options,
-    )?;
+    ).await?;
 
     Ok(())
 }
diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs
index 5fb28fd5..b57061a3 100644
--- a/src/client/pxar_backup_stream.rs
+++ b/src/client/pxar_backup_stream.rs
@@ -4,10 +4,10 @@ use std::path::Path;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
-use std::thread;
 
 use anyhow::{format_err, Error};
 use futures::stream::Stream;
+use futures::future::{Abortable, AbortHandle};
 use nix::dir::Dir;
 use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
@@ -21,14 +21,14 @@ use crate::backup::CatalogWriter;
 /// consumer.
 pub struct PxarBackupStream {
     rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
-    child: Option<thread::JoinHandle<()>>,
+    handle: Option<AbortHandle>,
     error: Arc<Mutex<Option<String>>>,
 }
 
 impl Drop for PxarBackupStream {
     fn drop(&mut self) {
         self.rx = None;
-        self.child.take().unwrap().join().unwrap();
+        self.handle.take().unwrap().abort();
     }
 }
 
@@ -43,42 +43,41 @@ impl PxarBackupStream {
         let buffer_size = 256 * 1024;
 
         let error = Arc::new(Mutex::new(None));
-        let child = std::thread::Builder::new()
-            .name("PxarBackupStream".to_string())
-            .spawn({
-                let error = Arc::clone(&error);
-                move || {
-                    let mut catalog_guard = catalog.lock().unwrap();
-                    let writer = std::io::BufWriter::with_capacity(
-                        buffer_size,
-                        crate::tools::StdChannelWriter::new(tx),
-                    );
+        let error2 = Arc::clone(&error);
+        let handler = async move {
+            let writer = std::io::BufWriter::with_capacity(
+                buffer_size,
+                crate::tools::StdChannelWriter::new(tx),
+            );
 
-                    let verbose = options.verbose;
+            let verbose = options.verbose;
 
-                    let writer = pxar::encoder::sync::StandardWriter::new(writer);
-                    if let Err(err) = crate::pxar::create_archive(
-                        dir,
-                        writer,
-                        crate::pxar::Flags::DEFAULT,
-                        |path| {
-                            if verbose {
-                                println!("{:?}", path);
-                            }
-                            Ok(())
-                        },
-                        Some(&mut *catalog_guard),
-                        options,
-                    ) {
-                        let mut error = error.lock().unwrap();
-                        *error = Some(err.to_string());
+            let writer = pxar::encoder::sync::StandardWriter::new(writer);
+            if let Err(err) = crate::pxar::create_archive(
+                dir,
+                writer,
+                crate::pxar::Flags::DEFAULT,
+                move |path| {
+                    if verbose {
+                        println!("{:?}", path);
                     }
-                }
-            })?;
+                    Ok(())
+                },
+                Some(catalog),
+                options,
+            ).await {
+                let mut error = error2.lock().unwrap();
+                *error = Some(err.to_string());
+            }
+        };
+
+        let (handle, registration) = AbortHandle::new_pair();
+        let future = Abortable::new(handler, registration);
+        tokio::spawn(future);
 
         Ok(Self {
             rx: Some(rx),
-            child: Some(child),
+            handle: Some(handle),
             error,
         })
     }
diff --git a/src/pxar/create.rs b/src/pxar/create.rs
index 36de87da..6950b396 100644
--- a/src/pxar/create.rs
+++ b/src/pxar/create.rs
@@ -5,16 +5,19 @@ use std::io::{self, Read, Write};
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
 use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Error};
 use nix::dir::Dir;
 use nix::errno::Errno;
 use nix::fcntl::OFlag;
 use nix::sys::stat::{FileStat, Mode};
+use futures::future::BoxFuture;
+use futures::FutureExt;
 
 use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
 use pxar::Metadata;
-use pxar::encoder::LinkOffset;
+use pxar::encoder::{SeqWrite, LinkOffset};
 
 use proxmox::c_str;
 use proxmox::sys::error::SysError;
@@ -129,13 +132,13 @@ impl std::io::Write for ErrorReporter {
     }
 }
 
-struct Archiver<'a, 'b> {
+struct Archiver {
     feature_flags: Flags,
     fs_feature_flags: Flags,
     fs_magic: i64,
     patterns: Vec<MatchEntry>,
-    callback: &'a mut dyn FnMut(&Path) -> Result<(), Error>,
-    catalog: Option<&'b mut dyn BackupCatalogWriter>,
+    callback: Box<dyn FnMut(&Path) -> Result<(), Error> + Send>,
+    catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
     path: PathBuf,
     entry_counter: usize,
     entry_limit: usize,
@@ -147,19 +150,19 @@ struct Archiver<'a, 'b> {
     file_copy_buffer: Vec<u8>,
 }
 
-type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>;
+type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
 
-pub fn create_archive<T, F>(
+pub async fn create_archive<T, F>(
     source_dir: Dir,
     mut writer: T,
     feature_flags: Flags,
-    mut callback: F,
-    catalog: Option<&mut dyn BackupCatalogWriter>,
+    callback: F,
+    catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
     options: PxarCreateOptions,
 ) -> Result<(), Error>
 where
-    T: pxar::encoder::SeqWrite,
-    F: FnMut(&Path) -> Result<(), Error>,
+    T: SeqWrite + Send,
+    F: FnMut(&Path) -> Result<(), Error> + Send + 'static,
 {
     let fs_magic = detect_fs_type(source_dir.as_raw_fd())?;
     if is_virtual_file_system(fs_magic) {
@@ -182,8 +185,7 @@ where
         set.insert(stat.st_dev);
     }
 
-    let writer = &mut writer as &mut dyn pxar::encoder::SeqWrite;
-    let mut encoder = Encoder::new(writer, &metadata)?;
+    let mut encoder = Encoder::new(&mut writer, &metadata).await?;
 
     let mut patterns = options.patterns;
 
@@ -199,7 +201,7 @@ where
         feature_flags,
         fs_feature_flags,
         fs_magic,
-        callback: &mut callback,
+        callback: Box::new(callback),
         patterns,
         catalog,
         path: PathBuf::new(),
@@ -213,8 +215,8 @@ where
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
     };
 
-    archiver.archive_dir_contents(&mut encoder, source_dir, true)?;
-    encoder.finish()?;
+    archiver.archive_dir_contents(&mut encoder, source_dir, true).await?;
+    encoder.finish().await?;
     Ok(())
 }
 
@@ -224,7 +226,7 @@ struct FileListEntry {
     stat: FileStat,
 }
 
-impl<'a, 'b> Archiver<'a, 'b> {
+impl Archiver {
     /// Get the currently effective feature flags. (Requested flags masked by the file system
     /// feature flags).
     fn flags(&self) -> Flags {
@@ -239,49 +241,51 @@ impl<'a, 'b> Archiver<'a, 'b> {
         }
     }
 
-    fn archive_dir_contents(
-        &mut self,
-        encoder: &mut Encoder,
+    fn archive_dir_contents<'a, 'b, T: SeqWrite + Send>(
+        &'a mut self,
+        encoder: &'a mut Encoder<'b, T>,
         mut dir: Dir,
         is_root: bool,
-    ) -> Result<(), Error> {
-        let entry_counter = self.entry_counter;
+    ) -> BoxFuture<'a, Result<(), Error>> {
+        async move {
+            let entry_counter = self.entry_counter;
 
-        let old_patterns_count = self.patterns.len();
-        self.read_pxar_excludes(dir.as_raw_fd())?;
+            let old_patterns_count = self.patterns.len();
+            self.read_pxar_excludes(dir.as_raw_fd())?;
 
-        let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?;
+            let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?;
 
-        if is_root && old_patterns_count > 0 {
-            file_list.push(FileListEntry {
-                name: CString::new(".pxarexclude-cli").unwrap(),
-                path: PathBuf::new(),
-                stat: unsafe { std::mem::zeroed() },
-            });
-        }
-
-        let dir_fd = dir.as_raw_fd();
-
-        let old_path = std::mem::take(&mut self.path);
-
-        for file_entry in file_list {
-            let file_name = file_entry.name.to_bytes();
-
-            if is_root && file_name == b".pxarexclude-cli" {
-                self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)?;
-                continue;
+            if is_root && old_patterns_count > 0 {
+                file_list.push(FileListEntry {
+                    name: CString::new(".pxarexclude-cli").unwrap(),
+                    path: PathBuf::new(),
+                    stat: unsafe { std::mem::zeroed() },
+                });
             }
 
-            (self.callback)(&file_entry.path)?;
-            self.path = file_entry.path;
-            self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
-                .map_err(|err| self.wrap_err(err))?;
-        }
-        self.path = old_path;
-        self.entry_counter = entry_counter;
-        self.patterns.truncate(old_patterns_count);
+            let dir_fd = dir.as_raw_fd();
 
-        Ok(())
+            let old_path = std::mem::take(&mut self.path);
+
+            for file_entry in file_list {
+                let file_name = file_entry.name.to_bytes();
+
+                if is_root && file_name == b".pxarexclude-cli" {
+                    self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count).await?;
+                    continue;
+                }
+
+                (self.callback)(&file_entry.path)?;
+                self.path = file_entry.path;
+                self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat).await
+                    .map_err(|err| self.wrap_err(err))?;
+            }
+            self.path = old_path;
+            self.entry_counter = entry_counter;
+            self.patterns.truncate(old_patterns_count);
+
+            Ok(())
+        }.boxed()
     }
 
     /// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`.
@@ -396,23 +400,22 @@ impl<'a, 'b> Archiver<'a, 'b> {
         Ok(())
     }
 
-    fn encode_pxarexclude_cli(
+    async fn encode_pxarexclude_cli<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         file_name: &CStr,
         patterns_count: usize,
     ) -> Result<(), Error> {
         let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]);
-
-        if let Some(ref mut catalog) = self.catalog {
-            catalog.add_file(file_name, content.len() as u64, 0)?;
+        if let Some(ref catalog) = self.catalog {
+            catalog.lock().unwrap().add_file(file_name, content.len() as u64, 0)?;
         }
 
         let mut metadata = Metadata::default();
         metadata.stat.mode = pxar::format::mode::IFREG | 0o600;
 
-        let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64)?;
-        file.write_all(&content)?;
+        let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64).await?;
+        file.write_all(&content).await?;
 
         Ok(())
     }
@@ -502,9 +505,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
         Ok(())
     }
 
-    fn add_entry(
+    async fn add_entry<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         parent: RawFd,
         c_file_name: &CStr,
         stat: &FileStat,
@@ -550,23 +553,23 @@ impl<'a, 'b> Archiver<'a, 'b> {
 
                 if stat.st_nlink > 1 {
                     if let Some((path, offset)) = self.hardlinks.get(&link_info) {
-                        if let Some(ref mut catalog) = self.catalog {
-                            catalog.add_hardlink(c_file_name)?;
+                        if let Some(ref catalog) = self.catalog {
+                            catalog.lock().unwrap().add_hardlink(c_file_name)?;
                         }
 
-                        encoder.add_hardlink(file_name, path, *offset)?;
+                        encoder.add_hardlink(file_name, path, *offset).await?;
 
                         return Ok(());
                     }
                 }
 
                 let file_size = stat.st_size as u64;
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_file(c_file_name, file_size, stat.st_mtime)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_file(c_file_name, file_size, stat.st_mtime)?;
                 }
 
                 let offset: LinkOffset =
-                    self.add_regular_file(encoder, fd, file_name, &metadata, file_size)?;
+                    self.add_regular_file(encoder, fd, file_name, &metadata, file_size).await?;
 
                 if stat.st_nlink > 1 {
                     self.hardlinks.insert(link_info, (self.path.clone(), offset));
@@ -577,49 +580,49 @@ impl<'a, 'b> Archiver<'a, 'b> {
             mode::IFDIR => {
                 let dir = Dir::from_fd(fd.into_raw_fd())?;
 
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.start_directory(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().start_directory(c_file_name)?;
                 }
-                let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat);
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.end_directory()?;
+                let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat).await;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().end_directory()?;
                 }
                 result
             }
             mode::IFSOCK => {
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_socket(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_socket(c_file_name)?;
                 }
 
-                Ok(encoder.add_socket(&metadata, file_name)?)
+                Ok(encoder.add_socket(&metadata, file_name).await?)
             }
             mode::IFIFO => {
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_fifo(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_fifo(c_file_name)?;
                 }
 
-                Ok(encoder.add_fifo(&metadata, file_name)?)
+                Ok(encoder.add_fifo(&metadata, file_name).await?)
             }
             mode::IFLNK => {
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_symlink(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_symlink(c_file_name)?;
                 }
 
-                self.add_symlink(encoder, fd, file_name, &metadata)
+                self.add_symlink(encoder, fd, file_name, &metadata).await
             }
             mode::IFBLK => {
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_block_device(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_block_device(c_file_name)?;
                 }
 
-                self.add_device(encoder, file_name, &metadata, &stat)
+                self.add_device(encoder, file_name, &metadata, &stat).await
             }
             mode::IFCHR => {
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_char_device(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_char_device(c_file_name)?;
                 }
 
-                self.add_device(encoder, file_name, &metadata, &stat)
+                self.add_device(encoder, file_name, &metadata, &stat).await
             }
             other => bail!(
                 "encountered unknown file type: 0x{:x} (0o{:o})",
@@ -629,9 +632,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
         }
     }
 
-    fn add_directory(
+    async fn add_directory<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         dir: Dir,
         dir_name: &CStr,
         metadata: &Metadata,
@@ -639,7 +642,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
     ) -> Result<(), Error> {
         let dir_name = OsStr::from_bytes(dir_name.to_bytes());
 
-        let mut encoder = encoder.create_directory(dir_name, &metadata)?;
+        let mut encoder = encoder.create_directory(dir_name, &metadata).await?;
 
         let old_fs_magic = self.fs_magic;
         let old_fs_feature_flags = self.fs_feature_flags;
@@ -662,20 +665,20 @@ impl<'a, 'b> Archiver<'a, 'b> {
             writeln!(self.logger, "skipping mount point: {:?}", self.path)?;
             Ok(())
         } else {
-            self.archive_dir_contents(&mut encoder, dir, false)
+            self.archive_dir_contents(&mut encoder, dir, false).await
         };
 
         self.fs_magic = old_fs_magic;
         self.fs_feature_flags = old_fs_feature_flags;
         self.current_st_dev = old_st_dev;
 
-        encoder.finish()?;
+        encoder.finish().await?;
         result
     }
 
-    fn add_regular_file(
+    async fn add_regular_file<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         fd: Fd,
         file_name: &Path,
         metadata: &Metadata,
@@ -683,7 +686,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
     ) -> Result<LinkOffset, Error> {
         let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
         let mut remaining = file_size;
-        let mut out = encoder.create_file(metadata, file_name, file_size)?;
+        let mut out = encoder.create_file(metadata, file_name, file_size).await?;
         while remaining != 0 {
             let mut got = match file.read(&mut self.file_copy_buffer[..]) {
                 Ok(0) => break,
@@ -695,7 +698,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
                 self.report_file_grew_while_reading()?;
                 got = remaining as usize;
             }
-            out.write_all(&self.file_copy_buffer[..got])?;
+            out.write_all(&self.file_copy_buffer[..got]).await?;
             remaining -= got as u64;
         }
         if remaining > 0 {
@@ -704,7 +707,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
             vec::clear(&mut self.file_copy_buffer[..to_zero]);
             while remaining != 0 {
                 let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize;
-                out.write_all(&self.file_copy_buffer[..fill])?;
+                out.write_all(&self.file_copy_buffer[..fill]).await?;
                 remaining -= fill as u64;
             }
         }
@@ -712,21 +715,21 @@ impl<'a, 'b> Archiver<'a, 'b> {
         Ok(out.file_offset())
     }
 
-    fn add_symlink(
+    async fn add_symlink<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         fd: Fd,
         file_name: &Path,
         metadata: &Metadata,
     ) -> Result<(), Error> {
         let dest = nix::fcntl::readlinkat(fd.as_raw_fd(), &b""[..])?;
-        encoder.add_symlink(metadata, file_name, dest)?;
+        encoder.add_symlink(metadata, file_name, dest).await?;
         Ok(())
     }
 
-    fn add_device(
+    async fn add_device<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         file_name: &Path,
         metadata: &Metadata,
         stat: &FileStat,
@@ -735,7 +738,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
             metadata,
             file_name,
             pxar::format::Device::from_dev_t(stat.st_rdev),
-        )?)
+        ).await?)
     }
 }
 
diff --git a/tests/catar.rs b/tests/catar.rs
index 2d9dea71..550600c6 100644
--- a/tests/catar.rs
+++ b/tests/catar.rs
@@ -30,14 +30,15 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         ..PxarCreateOptions::default()
     };
 
-    create_archive(
+    let rt = tokio::runtime::Runtime::new().unwrap();
+    rt.block_on(create_archive(
         dir,
         writer,
         Flags::DEFAULT,
         |_| Ok(()),
         None,
         options,
-    )?;
+    ))?;
 
     Command::new("cmp")
         .arg("--verbose")
-- 
2.20.1






More information about the pbs-devel mailing list