[pbs-devel] [PATCH proxmox-backup 08/22] pxar/extract: add sequential variants to create_zip, extract_sub_dir

Stefan Reiter s.reiter at proxmox.com
Tue Feb 16 18:06:56 CET 2021


For streaming pxar files directly from a restore source and extracting
them on the fly, we cannot create an Accessor, and instead have to live
with a sequential Decoder. This supports only the aio::Decoder variant,
since the functions are async anyway.

The original functionality remains in place, the new functions are
labelled with a _seq suffix. The recursive functions actually doing the
work are changed to take an EitherEntry enum variant that can contain
either an Accessor (recursive operation) or a Decoder (linear
operation).

If the _seq variants are given a decoder whose current position
points to a file, they will only extract/encode that file; if it points to
a directory, they will extract until they leave the directory they started
in.

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
 src/pxar/extract.rs | 388 ++++++++++++++++++++++++++++++++------------
 src/pxar/mod.rs     |   5 +-
 2 files changed, 292 insertions(+), 101 deletions(-)

diff --git a/src/pxar/extract.rs b/src/pxar/extract.rs
index b673b4b8..66a5ed59 100644
--- a/src/pxar/extract.rs
+++ b/src/pxar/extract.rs
@@ -17,8 +17,9 @@ use nix::sys::stat::Mode;
 
 use pathpatterns::{MatchEntry, MatchList, MatchType};
 use pxar::format::Device;
-use pxar::Metadata;
+use pxar::{Entry, Metadata, EntryKind};
 use pxar::accessor::aio::{Accessor, FileContents, FileEntry};
+use pxar::decoder::aio::Decoder;
 
 use proxmox::c_result;
 use proxmox::tools::fs::{create_path, CreateOptions};
@@ -90,8 +91,6 @@ where
     let mut err_path_stack = vec![OsString::from("/")];
     let mut current_match = options.extract_match_default;
     while let Some(entry) = decoder.next() {
-        use pxar::EntryKind;
-
         let entry = entry.map_err(|err| format_err!("error reading pxar archive: {}", err))?;
 
         let file_name_os = entry.file_name();
@@ -471,9 +470,23 @@ impl Extractor {
     }
 }
 
+enum EitherEntry<
+    'a,
+    S: pxar::decoder::SeqRead + Unpin + Send + 'static,
+    T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
+> {
+    Entry(Entry, &'a mut Decoder<S>),
+    FileEntry(FileEntry<T>, &'a mut Accessor<T>),
+}
+
+// These types are never constructed, but we need some concrete type fulfilling S and T from
+// EitherEntry so rust is happy with its use in async fns
+type BogusSeqRead = pxar::decoder::sync::StandardReader<std::io::Empty>;
+type BogusReadAt = pxar::accessor::sync::FileRefReader<Arc<std::fs::File>>;
+
 pub async fn create_zip<T, W, P>(
     output: W,
-    decoder: Accessor<T>,
+    mut decoder: Accessor<T>,
     path: P,
     verbose: bool,
 ) -> Result<(), Error>
@@ -484,96 +497,174 @@ where
 {
     let root = decoder.open_root().await?;
     let file = root
-        .lookup(&path).await?
-        .ok_or(format_err!("error opening '{:?}'", path.as_ref()))?;
+        .lookup(&path)
+        .await?
+        .ok_or_else(|| format_err!("error opening '{:?}'", path.as_ref()))?;
 
     let mut prefix = PathBuf::new();
     let mut components = file.entry().path().components();
-    components.next_back(); // discar last
+    components.next_back(); // discard last
     for comp in components {
         prefix.push(comp);
     }
 
     let mut zipencoder = ZipEncoder::new(output);
-    let mut decoder = decoder;
-    recurse_files_zip(&mut zipencoder, &mut decoder, &prefix, file, verbose)
+    let entry: EitherEntry<BogusSeqRead, T> = EitherEntry::FileEntry(file, &mut decoder);
+    add_entry_to_zip(&mut zipencoder, entry, &prefix, verbose)
         .await
         .map_err(|err| {
             eprintln!("error during creating of zip: {}", err);
             err
         })?;
 
-    zipencoder
-        .finish()
-        .await
-        .map_err(|err| {
-            eprintln!("error during finishing of zip: {}", err);
-            err
-        })
+    zipencoder.finish().await.map_err(|err| {
+        eprintln!("error during finishing of zip: {}", err);
+        err
+    })
 }
 
-fn recurse_files_zip<'a, T, W>(
+pub async fn create_zip_seq<S, W>(
+    output: W,
+    mut decoder: Decoder<S>,
+    verbose: bool,
+) -> Result<(), Error>
+where
+    S: pxar::decoder::SeqRead + Unpin + Send + 'static,
+    W: tokio::io::AsyncWrite + Unpin + Send + 'static,
+{
+    decoder.enable_goodbye_entries(true);
+    let root = match decoder.peek().await {
+        Some(Ok(root)) => root,
+        Some(Err(err)) => bail!("error getting root entry from pxar: {}", err),
+        None => bail!("cannot extract empty archive"),
+    };
+
+    let mut prefix = PathBuf::new();
+    let mut components = root.path().components();
+    components.next_back(); // discard last
+    for comp in components {
+        prefix.push(comp);
+    }
+
+    let mut zipencoder = ZipEncoder::new(output);
+
+    let root_is_file = matches!(root.kind(), EntryKind::File { .. });
+    let mut dir_level = 0;
+
+    while let Some(file) = decoder.next().await {
+        match file {
+            Ok(file) => {
+                match file.kind() {
+                    EntryKind::Directory => dir_level += 1,
+                    EntryKind::GoodbyeTable => {
+                        dir_level -= 1;
+                        // only extract until we leave the directory we started in
+                        if dir_level == 0 {
+                            break;
+                        }
+                    }
+                    _ => {}
+                }
+
+                let entry: EitherEntry<S, BogusReadAt> = EitherEntry::Entry(file, &mut decoder);
+                add_entry_to_zip(&mut zipencoder, entry, &prefix, verbose)
+                    .await
+                    .map_err(|err| {
+                        eprintln!("error during creating of zip: {}", err);
+                        err
+                    })?;
+
+                if root_is_file {
+                    break;
+                }
+            }
+            Err(err) => bail!("error in decoder: {}", err),
+        }
+    }
+
+    zipencoder.finish().await.map_err(|err| {
+        eprintln!("error during finishing of zip: {}", err);
+        err
+    })
+}
+
+fn add_entry_to_zip<'a, S, T, W>(
     zip: &'a mut ZipEncoder<W>,
-    decoder: &'a mut Accessor<T>,
+    file: EitherEntry<'a, S, T>,
     prefix: &'a Path,
-    file: FileEntry<T>,
     verbose: bool,
 ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>
 where
+    S: pxar::decoder::SeqRead + Unpin + Send + 'static,
     T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
     W: tokio::io::AsyncWrite + Unpin + Send + 'static,
 {
-    use pxar::EntryKind;
     Box::pin(async move {
-        let metadata = file.entry().metadata();
-        let path = file.entry().path().strip_prefix(&prefix)?.to_path_buf();
+        let (metadata, path, kind) = match file {
+            EitherEntry::Entry(ref e, _) => (e.metadata(), e.path(), e.kind()),
+            EitherEntry::FileEntry(ref fe, _) => (fe.metadata(), fe.path(), fe.kind()),
+        };
 
-        match file.kind() {
+        if verbose && !matches!(kind, EntryKind::GoodbyeTable) {
+            eprintln!("adding '{}' to zip", path.display());
+        }
+
+        match kind {
             EntryKind::File { .. } => {
-                if verbose {
-                    eprintln!("adding '{}' to zip", path.display());
-                }
                 let entry = ZipEntry::new(
                     path,
                     metadata.stat.mtime.secs,
                     metadata.stat.mode as u16,
                     true,
                 );
-                zip.add_entry(entry, Some(file.contents().await?))
-                   .await
-                   .map_err(|err| format_err!("could not send file entry: {}", err))?;
+                let contents = match file {
+                    EitherEntry::Entry(_, dec) => Box::new(match dec.contents() {
+                        Some(con) => con,
+                        None => bail!("file without contents found"),
+                    })
+                        as Box<dyn tokio::io::AsyncRead + Unpin + Send>,
+                    EitherEntry::FileEntry(ref fe, _) => Box::new(
+                        fe.contents()
+                            .await
+                            .map_err(|err| format_err!("file with bad contents found: {}", err))?,
+                    )
+                        as Box<dyn tokio::io::AsyncRead + Unpin + Send>,
+                };
+                zip.add_entry(entry, Some(contents))
+                    .await
+                    .map_err(|err| format_err!("could not send file entry: {}", err))?;
             }
             EntryKind::Hardlink(_) => {
-                let realfile = decoder.follow_hardlink(&file).await?;
-                if verbose {
-                    eprintln!("adding '{}' to zip", path.display());
+                // we can't extract hardlinks in sequential extraction
+                if let EitherEntry::FileEntry(ref fe, ref accessor) = file {
+                    let realfile = accessor.follow_hardlink(&fe).await?;
+                    let entry = ZipEntry::new(
+                        path,
+                        metadata.stat.mtime.secs,
+                        metadata.stat.mode as u16,
+                        true,
+                    );
+                    zip.add_entry(entry, Some(realfile.contents().await?))
+                        .await
+                        .map_err(|err| format_err!("could not send file entry: {}", err))?;
                 }
-                let entry = ZipEntry::new(
-                    path,
-                    metadata.stat.mtime.secs,
-                    metadata.stat.mode as u16,
-                    true,
-                );
-                zip.add_entry(entry, Some(realfile.contents().await?))
-                   .await
-                   .map_err(|err| format_err!("could not send file entry: {}", err))?;
             }
             EntryKind::Directory => {
-                let dir = file.enter_directory().await?;
-                let mut readdir = dir.read_dir();
-                if verbose {
-                    eprintln!("adding '{}' to zip", path.display());
-                }
                 let entry = ZipEntry::new(
                     path,
                     metadata.stat.mtime.secs,
                     metadata.stat.mode as u16,
                     false,
                 );
-                zip.add_entry::<FileContents<T>>(entry, None).await?;
-                while let Some(entry) = readdir.next().await {
-                    let entry = entry?.decode_entry().await?;
-                    recurse_files_zip(zip, decoder, prefix, entry, verbose).await?;
+                if let EitherEntry::FileEntry(fe, a) = file {
+                    let dir = fe.enter_directory().await?;
+                    let mut readdir = dir.read_dir();
+                    zip.add_entry::<FileContents<T>>(entry, None).await?;
+                    while let Some(entry) = readdir.next().await {
+                        let entry = entry?.decode_entry().await?;
+                        let entry: EitherEntry<BogusSeqRead, T> = EitherEntry::FileEntry(entry, a);
+                        add_entry_to_zip(zip, entry, prefix, verbose).await?;
+                    }
                 }
             }
             _ => {} // ignore all else
@@ -583,6 +674,43 @@ where
     })
 }
 
+fn get_extractor<DEST>(destination: DEST, metadata: Metadata) -> Result<Extractor, Error>
+where
+    DEST: AsRef<Path>
+{
+    create_path(
+        &destination,
+        None,
+        Some(CreateOptions::new().perm(Mode::from_bits_truncate(0o700))),
+    )
+    .map_err(|err| {
+        format_err!(
+            "error creating directory {:?}: {}",
+            destination.as_ref(),
+            err
+        )
+    })?;
+
+    let dir = Dir::open(
+        destination.as_ref(),
+        OFlag::O_DIRECTORY | OFlag::O_CLOEXEC,
+        Mode::empty(),
+    )
+    .map_err(|err| {
+        format_err!(
+            "unable to open target directory {:?}: {}",
+            destination.as_ref(),
+            err,
+        )
+    })?;
+
+    Ok(Extractor::new(
+        dir,
+        metadata,
+        false,
+        Flags::DEFAULT,
+    ))
+}
 
 pub async fn extract_sub_dir<T, DEST, PATH>(
     destination: DEST,
@@ -597,47 +725,86 @@ where
 {
     let root = decoder.open_root().await?;
 
-    create_path(
-        &destination,
-        None,
-        Some(CreateOptions::new().perm(Mode::from_bits_truncate(0o700))),
-    )
-    .map_err(|err| format_err!("error creating directory {:?}: {}", destination.as_ref(), err))?;
-
-    let dir = Dir::open(
-        destination.as_ref(),
-        OFlag::O_DIRECTORY | OFlag::O_CLOEXEC,
-        Mode::empty(),
-    )
-    .map_err(|err| format_err!("unable to open target directory {:?}: {}", destination.as_ref(), err,))?;
-
-    let mut extractor =  Extractor::new(
-        dir,
+    let mut extractor = get_extractor(
+        destination,
         root.lookup_self().await?.entry().metadata().clone(),
-        false,
-        Flags::DEFAULT,
-    );
+    )?;
 
     let file = root
-        .lookup(&path).await?
-        .ok_or(format_err!("error opening '{:?}'", path.as_ref()))?;
+        .lookup(&path)
+        .await?
+        .ok_or_else(|| format_err!("error opening '{:?}'", path.as_ref()))?;
 
-    recurse_files_extractor(&mut extractor, &mut decoder, file, verbose).await
+    let entry: EitherEntry<BogusSeqRead, T> = EitherEntry::FileEntry(file, &mut decoder);
+    do_extract_sub_dir(&mut extractor, entry, verbose).await
 }
 
-fn recurse_files_extractor<'a, T>(
+pub async fn extract_sub_dir_seq<S, DEST>(
+    destination: DEST,
+    mut decoder: Decoder<S>,
+    verbose: bool,
+) -> Result<(), Error>
+where
+    S: pxar::decoder::SeqRead + Unpin + Send + 'static,
+    DEST: AsRef<Path>,
+{
+    decoder.enable_goodbye_entries(true);
+    let root = match decoder.peek().await {
+        Some(Ok(root)) => root,
+        Some(Err(err)) => bail!("error getting root entry from pxar: {}", err),
+        None => bail!("cannot extract empty archive"),
+    };
+
+    let mut extractor = get_extractor(destination, root.metadata().clone())?;
+    let root_is_file = matches!(root.kind(), EntryKind::File { .. });
+    let mut dir_level = 0;
+
+    while let Some(file) = decoder.next().await {
+        match file {
+            Ok(file) => {
+                match file.kind() {
+                    EntryKind::Directory => dir_level += 1,
+                    EntryKind::GoodbyeTable => {
+                        dir_level -= 1;
+                        // only extract until we leave the directory we started in
+                        if dir_level == 0 {
+                            break;
+                        }
+                    },
+                    _ => {}
+                }
+
+                let path = file.path().to_owned();
+                let entry: EitherEntry<S, BogusReadAt> = EitherEntry::Entry(file, &mut decoder);
+                if let Err(err) = do_extract_sub_dir(&mut extractor, entry, verbose).await {
+                    eprintln!("error extracting {}: {}", path.display(), err);
+                }
+
+                if root_is_file {
+                    break;
+                }
+            }
+            Err(err) => bail!("error in decoder: {}", err),
+        }
+    }
+
+    Ok(())
+}
+
+fn do_extract_sub_dir<'a, S, T>(
     extractor: &'a mut Extractor,
-    decoder: &'a mut Accessor<T>,
-    file: FileEntry<T>,
+    file: EitherEntry<'a, S, T>,
     verbose: bool,
 ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>
 where
+    S: pxar::decoder::SeqRead + Unpin + Send,
     T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
 {
-    use pxar::EntryKind;
     Box::pin(async move {
-        let metadata = file.entry().metadata();
-        let file_name_os = file.file_name();
+        let (metadata, file_name_os, path, kind) = match file {
+            EitherEntry::Entry(ref e, _) => (e.metadata(), e.file_name(), e.path(), e.kind()),
+            EitherEntry::FileEntry(ref fe, _) => (fe.metadata(), fe.file_name(), fe.path(), fe.kind()),
+        };
 
         // safety check: a file entry in an archive must never contain slashes:
         if file_name_os.as_bytes().contains(&b'/') {
@@ -647,28 +814,32 @@ where
         let file_name = CString::new(file_name_os.as_bytes())
             .map_err(|_| format_err!("encountered file name with null-bytes"))?;
 
-        if verbose {
-            eprintln!("extracting: {}", file.path().display());
+        if verbose && !matches!(kind, EntryKind::GoodbyeTable) {
+            eprintln!("extracting: {}", path.display());
         }
 
-        match file.kind() {
+        match kind {
             EntryKind::Directory => {
                 extractor
                     .enter_directory(file_name_os.to_owned(), metadata.clone(), true)
                     .map_err(|err| format_err!("error at entry {:?}: {}", file_name_os, err))?;
 
-                let dir = file.enter_directory().await?;
-                let mut readdir = dir.read_dir();
-                while let Some(entry) = readdir.next().await {
-                    let entry = entry?.decode_entry().await?;
-                    let filename = entry.path().to_path_buf();
+                // for EitherEntry::Entry we detect directory end with GoodbyeTable
+                if let EitherEntry::FileEntry(file, a) = file {
+                    let dir = file.enter_directory().await?;
+                    let mut readdir = dir.read_dir();
+                    while let Some(entry) = readdir.next().await {
+                        let entry = entry?.decode_entry().await?;
+                        let filename = entry.path().to_path_buf();
 
-                    // log errors and continue
-                    if let Err(err) = recurse_files_extractor(extractor, decoder, entry, verbose).await {
-                        eprintln!("error extracting {:?}: {}", filename.display(), err);
+                        // log errors and continue
+                        let entry: EitherEntry<BogusSeqRead, T> = EitherEntry::FileEntry(entry, a);
+                        if let Err(err) = do_extract_sub_dir(extractor, entry, verbose).await {
+                            eprintln!("error extracting {}: {}", filename.display(), err);
+                        }
                     }
+                    extractor.leave_directory()?;
                 }
-                extractor.leave_directory()?;
             }
             EntryKind::Symlink(link) => {
                 extractor.extract_symlink(&file_name, metadata, link.as_ref())?;
@@ -691,17 +862,34 @@ where
                     extractor.extract_special(&file_name, metadata, 0)?;
                 }
             }
-            EntryKind::File { size, .. } => extractor.async_extract_file(
-                &file_name,
-                metadata,
-                *size,
-                &mut file.contents().await.map_err(|_| {
-                    format_err!("found regular file entry without contents in archive")
-                })?,
-            ).await?,
-            EntryKind::GoodbyeTable => {}, // ignore
+            EntryKind::File { size, .. } => {
+                extractor
+                    .async_extract_file(
+                        &file_name,
+                        metadata,
+                        *size,
+                        &mut match file {
+                            EitherEntry::Entry(_, dec) => Box::new(match dec.contents() {
+                                Some(con) => con,
+                                None => bail!("file without contents found"),
+                            })
+                                as Box<dyn tokio::io::AsyncRead + Unpin + Send>,
+                            EitherEntry::FileEntry(ref fe, _) => {
+                                Box::new(fe.contents().await.map_err(|err| {
+                                    format_err!("file with bad contents found: {}", err)
+                                })?)
+                                    as Box<dyn tokio::io::AsyncRead + Unpin + Send>
+                            }
+                        },
+                    )
+                    .await?
+            }
+            EntryKind::GoodbyeTable => {
+                if let EitherEntry::Entry(_, _) = file {
+                    extractor.leave_directory()?;
+                }
+            }
         }
         Ok(())
     })
 }
-
diff --git a/src/pxar/mod.rs b/src/pxar/mod.rs
index d1302962..d5c42942 100644
--- a/src/pxar/mod.rs
+++ b/src/pxar/mod.rs
@@ -59,7 +59,10 @@ mod flags;
 pub use flags::Flags;
 
 pub use create::{create_archive, PxarCreateOptions};
-pub use extract::{create_zip, extract_archive, extract_sub_dir, ErrorHandler, PxarExtractOptions};
+pub use extract::{
+    create_zip, create_zip_seq, extract_archive, extract_sub_dir, extract_sub_dir_seq,
+    ErrorHandler, PxarExtractOptions,
+};
 
 /// The format requires to build sorted directory lookup tables in
 /// memory, so we restrict the number of allowed entries to limit
-- 
2.20.1






More information about the pbs-devel mailing list