[pbs-devel] [PATCH proxmox-backup 08/22] pxar/extract: add sequential variants to create_zip, extract_sub_dir
Stefan Reiter
s.reiter at proxmox.com
Tue Feb 16 18:06:56 CET 2021
For streaming pxar files directly from a restore source and extracting
them on the fly, we cannot create an Accessor, and instead have to live
with a sequential Decoder. This supports only the aio::Decoder variant,
since the functions are async anyway.
The original functionality remains in place, the new functions are
labelled with a _seq suffix. The recursive functions actually doing the
work are changed to take an EitherEntry enum value that can contain
either an Accessor (recursive operation) or a Decoder (linear
operation).
If the _seq variants are given a decoder where the current position
points to a file, they will only extract/encode this file; if it's a
directory, they will extract until they leave the directory they started
in.
Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
src/pxar/extract.rs | 388 ++++++++++++++++++++++++++++++++------------
src/pxar/mod.rs | 5 +-
2 files changed, 292 insertions(+), 101 deletions(-)
diff --git a/src/pxar/extract.rs b/src/pxar/extract.rs
index b673b4b8..66a5ed59 100644
--- a/src/pxar/extract.rs
+++ b/src/pxar/extract.rs
@@ -17,8 +17,9 @@ use nix::sys::stat::Mode;
use pathpatterns::{MatchEntry, MatchList, MatchType};
use pxar::format::Device;
-use pxar::Metadata;
+use pxar::{Entry, Metadata, EntryKind};
use pxar::accessor::aio::{Accessor, FileContents, FileEntry};
+use pxar::decoder::aio::Decoder;
use proxmox::c_result;
use proxmox::tools::fs::{create_path, CreateOptions};
@@ -90,8 +91,6 @@ where
let mut err_path_stack = vec![OsString::from("/")];
let mut current_match = options.extract_match_default;
while let Some(entry) = decoder.next() {
- use pxar::EntryKind;
-
let entry = entry.map_err(|err| format_err!("error reading pxar archive: {}", err))?;
let file_name_os = entry.file_name();
@@ -471,9 +470,23 @@ impl Extractor {
}
}
+enum EitherEntry<
+ 'a,
+ S: pxar::decoder::SeqRead + Unpin + Send + 'static,
+ T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
+> {
+ Entry(Entry, &'a mut Decoder<S>),
+ FileEntry(FileEntry<T>, &'a mut Accessor<T>),
+}
+
+// These types are never constructed, but we need some concrete type fulfilling S and T from
+// EitherEntry so rust is happy with its use in async fns
+type BogusSeqRead = pxar::decoder::sync::StandardReader<std::io::Empty>;
+type BogusReadAt = pxar::accessor::sync::FileRefReader<Arc<std::fs::File>>;
+
pub async fn create_zip<T, W, P>(
output: W,
- decoder: Accessor<T>,
+ mut decoder: Accessor<T>,
path: P,
verbose: bool,
) -> Result<(), Error>
@@ -484,96 +497,174 @@ where
{
let root = decoder.open_root().await?;
let file = root
- .lookup(&path).await?
- .ok_or(format_err!("error opening '{:?}'", path.as_ref()))?;
+ .lookup(&path)
+ .await?
+ .ok_or_else(|| format_err!("error opening '{:?}'", path.as_ref()))?;
let mut prefix = PathBuf::new();
let mut components = file.entry().path().components();
- components.next_back(); // discar last
+ components.next_back(); // discard last
for comp in components {
prefix.push(comp);
}
let mut zipencoder = ZipEncoder::new(output);
- let mut decoder = decoder;
- recurse_files_zip(&mut zipencoder, &mut decoder, &prefix, file, verbose)
+ let entry: EitherEntry<BogusSeqRead, T> = EitherEntry::FileEntry(file, &mut decoder);
+ add_entry_to_zip(&mut zipencoder, entry, &prefix, verbose)
.await
.map_err(|err| {
eprintln!("error during creating of zip: {}", err);
err
})?;
- zipencoder
- .finish()
- .await
- .map_err(|err| {
- eprintln!("error during finishing of zip: {}", err);
- err
- })
+ zipencoder.finish().await.map_err(|err| {
+ eprintln!("error during finishing of zip: {}", err);
+ err
+ })
}
-fn recurse_files_zip<'a, T, W>(
+pub async fn create_zip_seq<S, W>(
+ output: W,
+ mut decoder: Decoder<S>,
+ verbose: bool,
+) -> Result<(), Error>
+where
+ S: pxar::decoder::SeqRead + Unpin + Send + 'static,
+ W: tokio::io::AsyncWrite + Unpin + Send + 'static,
+{
+ decoder.enable_goodbye_entries(true);
+ let root = match decoder.peek().await {
+ Some(Ok(root)) => root,
+ Some(Err(err)) => bail!("error getting root entry from pxar: {}", err),
+ None => bail!("cannot extract empty archive"),
+ };
+
+ let mut prefix = PathBuf::new();
+ let mut components = root.path().components();
+ components.next_back(); // discard last
+ for comp in components {
+ prefix.push(comp);
+ }
+
+ let mut zipencoder = ZipEncoder::new(output);
+
+ let root_is_file = matches!(root.kind(), EntryKind::File { .. });
+ let mut dir_level = 0;
+
+ while let Some(file) = decoder.next().await {
+ match file {
+ Ok(file) => {
+ match file.kind() {
+ EntryKind::Directory => dir_level += 1,
+ EntryKind::GoodbyeTable => {
+ dir_level -= 1;
+ // only extract until we leave the directory we started in
+ if dir_level == 0 {
+ break;
+ }
+ }
+ _ => {}
+ }
+
+ let entry: EitherEntry<S, BogusReadAt> = EitherEntry::Entry(file, &mut decoder);
+ add_entry_to_zip(&mut zipencoder, entry, &prefix, verbose)
+ .await
+ .map_err(|err| {
+ eprintln!("error during creating of zip: {}", err);
+ err
+ })?;
+
+ if root_is_file {
+ break;
+ }
+ }
+ Err(err) => bail!("error in decoder: {}", err),
+ }
+ }
+
+ zipencoder.finish().await.map_err(|err| {
+ eprintln!("error during finishing of zip: {}", err);
+ err
+ })
+}
+
+fn add_entry_to_zip<'a, S, T, W>(
zip: &'a mut ZipEncoder<W>,
- decoder: &'a mut Accessor<T>,
+ file: EitherEntry<'a, S, T>,
prefix: &'a Path,
- file: FileEntry<T>,
verbose: bool,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>
where
+ S: pxar::decoder::SeqRead + Unpin + Send + 'static,
T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
W: tokio::io::AsyncWrite + Unpin + Send + 'static,
{
- use pxar::EntryKind;
Box::pin(async move {
- let metadata = file.entry().metadata();
- let path = file.entry().path().strip_prefix(&prefix)?.to_path_buf();
+ let (metadata, path, kind) = match file {
+ EitherEntry::Entry(ref e, _) => (e.metadata(), e.path(), e.kind()),
+ EitherEntry::FileEntry(ref fe, _) => (fe.metadata(), fe.path(), fe.kind()),
+ };
- match file.kind() {
+ if verbose && !matches!(kind, EntryKind::GoodbyeTable) {
+ eprintln!("adding '{}' to zip", path.display());
+ }
+
+ match kind {
EntryKind::File { .. } => {
- if verbose {
- eprintln!("adding '{}' to zip", path.display());
- }
let entry = ZipEntry::new(
path,
metadata.stat.mtime.secs,
metadata.stat.mode as u16,
true,
);
- zip.add_entry(entry, Some(file.contents().await?))
- .await
- .map_err(|err| format_err!("could not send file entry: {}", err))?;
+ let contents = match file {
+ EitherEntry::Entry(_, dec) => Box::new(match dec.contents() {
+ Some(con) => con,
+ None => bail!("file without contents found"),
+ })
+ as Box<dyn tokio::io::AsyncRead + Unpin + Send>,
+ EitherEntry::FileEntry(ref fe, _) => Box::new(
+ fe.contents()
+ .await
+ .map_err(|err| format_err!("file with bad contents found: {}", err))?,
+ )
+ as Box<dyn tokio::io::AsyncRead + Unpin + Send>,
+ };
+ zip.add_entry(entry, Some(contents))
+ .await
+ .map_err(|err| format_err!("could not send file entry: {}", err))?;
}
EntryKind::Hardlink(_) => {
- let realfile = decoder.follow_hardlink(&file).await?;
- if verbose {
- eprintln!("adding '{}' to zip", path.display());
+ // we can't extract hardlinks in sequential extraction
+ if let EitherEntry::FileEntry(ref fe, ref accessor) = file {
+ let realfile = accessor.follow_hardlink(&fe).await?;
+ let entry = ZipEntry::new(
+ path,
+ metadata.stat.mtime.secs,
+ metadata.stat.mode as u16,
+ true,
+ );
+ zip.add_entry(entry, Some(realfile.contents().await?))
+ .await
+ .map_err(|err| format_err!("could not send file entry: {}", err))?;
}
- let entry = ZipEntry::new(
- path,
- metadata.stat.mtime.secs,
- metadata.stat.mode as u16,
- true,
- );
- zip.add_entry(entry, Some(realfile.contents().await?))
- .await
- .map_err(|err| format_err!("could not send file entry: {}", err))?;
}
EntryKind::Directory => {
- let dir = file.enter_directory().await?;
- let mut readdir = dir.read_dir();
- if verbose {
- eprintln!("adding '{}' to zip", path.display());
- }
let entry = ZipEntry::new(
path,
metadata.stat.mtime.secs,
metadata.stat.mode as u16,
false,
);
- zip.add_entry::<FileContents<T>>(entry, None).await?;
- while let Some(entry) = readdir.next().await {
- let entry = entry?.decode_entry().await?;
- recurse_files_zip(zip, decoder, prefix, entry, verbose).await?;
+ if let EitherEntry::FileEntry(fe, a) = file {
+ let dir = fe.enter_directory().await?;
+ let mut readdir = dir.read_dir();
+ zip.add_entry::<FileContents<T>>(entry, None).await?;
+ while let Some(entry) = readdir.next().await {
+ let entry = entry?.decode_entry().await?;
+ let entry: EitherEntry<BogusSeqRead, T> = EitherEntry::FileEntry(entry, a);
+ add_entry_to_zip(zip, entry, prefix, verbose).await?;
+ }
}
}
_ => {} // ignore all else
@@ -583,6 +674,43 @@ where
})
}
+fn get_extractor<DEST>(destination: DEST, metadata: Metadata) -> Result<Extractor, Error>
+where
+ DEST: AsRef<Path>
+{
+ create_path(
+ &destination,
+ None,
+ Some(CreateOptions::new().perm(Mode::from_bits_truncate(0o700))),
+ )
+ .map_err(|err| {
+ format_err!(
+ "error creating directory {:?}: {}",
+ destination.as_ref(),
+ err
+ )
+ })?;
+
+ let dir = Dir::open(
+ destination.as_ref(),
+ OFlag::O_DIRECTORY | OFlag::O_CLOEXEC,
+ Mode::empty(),
+ )
+ .map_err(|err| {
+ format_err!(
+ "unable to open target directory {:?}: {}",
+ destination.as_ref(),
+ err,
+ )
+ })?;
+
+ Ok(Extractor::new(
+ dir,
+ metadata,
+ false,
+ Flags::DEFAULT,
+ ))
+}
pub async fn extract_sub_dir<T, DEST, PATH>(
destination: DEST,
@@ -597,47 +725,86 @@ where
{
let root = decoder.open_root().await?;
- create_path(
- &destination,
- None,
- Some(CreateOptions::new().perm(Mode::from_bits_truncate(0o700))),
- )
- .map_err(|err| format_err!("error creating directory {:?}: {}", destination.as_ref(), err))?;
-
- let dir = Dir::open(
- destination.as_ref(),
- OFlag::O_DIRECTORY | OFlag::O_CLOEXEC,
- Mode::empty(),
- )
- .map_err(|err| format_err!("unable to open target directory {:?}: {}", destination.as_ref(), err,))?;
-
- let mut extractor = Extractor::new(
- dir,
+ let mut extractor = get_extractor(
+ destination,
root.lookup_self().await?.entry().metadata().clone(),
- false,
- Flags::DEFAULT,
- );
+ )?;
let file = root
- .lookup(&path).await?
- .ok_or(format_err!("error opening '{:?}'", path.as_ref()))?;
+ .lookup(&path)
+ .await?
+ .ok_or_else(|| format_err!("error opening '{:?}'", path.as_ref()))?;
- recurse_files_extractor(&mut extractor, &mut decoder, file, verbose).await
+ let entry: EitherEntry<BogusSeqRead, T> = EitherEntry::FileEntry(file, &mut decoder);
+ do_extract_sub_dir(&mut extractor, entry, verbose).await
}
-fn recurse_files_extractor<'a, T>(
+pub async fn extract_sub_dir_seq<S, DEST>(
+ destination: DEST,
+ mut decoder: Decoder<S>,
+ verbose: bool,
+) -> Result<(), Error>
+where
+ S: pxar::decoder::SeqRead + Unpin + Send + 'static,
+ DEST: AsRef<Path>,
+{
+ decoder.enable_goodbye_entries(true);
+ let root = match decoder.peek().await {
+ Some(Ok(root)) => root,
+ Some(Err(err)) => bail!("error getting root entry from pxar: {}", err),
+ None => bail!("cannot extract empty archive"),
+ };
+
+ let mut extractor = get_extractor(destination, root.metadata().clone())?;
+ let root_is_file = matches!(root.kind(), EntryKind::File { .. });
+ let mut dir_level = 0;
+
+ while let Some(file) = decoder.next().await {
+ match file {
+ Ok(file) => {
+ match file.kind() {
+ EntryKind::Directory => dir_level += 1,
+ EntryKind::GoodbyeTable => {
+ dir_level -= 1;
+ // only extract until we leave the directory we started in
+ if dir_level == 0 {
+ break;
+ }
+ },
+ _ => {}
+ }
+
+ let path = file.path().to_owned();
+ let entry: EitherEntry<S, BogusReadAt> = EitherEntry::Entry(file, &mut decoder);
+ if let Err(err) = do_extract_sub_dir(&mut extractor, entry, verbose).await {
+ eprintln!("error extracting {}: {}", path.display(), err);
+ }
+
+ if root_is_file {
+ break;
+ }
+ }
+ Err(err) => bail!("error in decoder: {}", err),
+ }
+ }
+
+ Ok(())
+}
+
+fn do_extract_sub_dir<'a, S, T>(
extractor: &'a mut Extractor,
- decoder: &'a mut Accessor<T>,
- file: FileEntry<T>,
+ file: EitherEntry<'a, S, T>,
verbose: bool,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>
where
+ S: pxar::decoder::SeqRead + Unpin + Send,
T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
{
- use pxar::EntryKind;
Box::pin(async move {
- let metadata = file.entry().metadata();
- let file_name_os = file.file_name();
+ let (metadata, file_name_os, path, kind) = match file {
+ EitherEntry::Entry(ref e, _) => (e.metadata(), e.file_name(), e.path(), e.kind()),
+ EitherEntry::FileEntry(ref fe, _) => (fe.metadata(), fe.file_name(), fe.path(), fe.kind()),
+ };
// safety check: a file entry in an archive must never contain slashes:
if file_name_os.as_bytes().contains(&b'/') {
@@ -647,28 +814,32 @@ where
let file_name = CString::new(file_name_os.as_bytes())
.map_err(|_| format_err!("encountered file name with null-bytes"))?;
- if verbose {
- eprintln!("extracting: {}", file.path().display());
+ if verbose && !matches!(kind, EntryKind::GoodbyeTable) {
+ eprintln!("extracting: {}", path.display());
}
- match file.kind() {
+ match kind {
EntryKind::Directory => {
extractor
.enter_directory(file_name_os.to_owned(), metadata.clone(), true)
.map_err(|err| format_err!("error at entry {:?}: {}", file_name_os, err))?;
- let dir = file.enter_directory().await?;
- let mut readdir = dir.read_dir();
- while let Some(entry) = readdir.next().await {
- let entry = entry?.decode_entry().await?;
- let filename = entry.path().to_path_buf();
+ // for EitherEntry::Entry we detect directory end with GoodbyeTable
+ if let EitherEntry::FileEntry(file, a) = file {
+ let dir = file.enter_directory().await?;
+ let mut readdir = dir.read_dir();
+ while let Some(entry) = readdir.next().await {
+ let entry = entry?.decode_entry().await?;
+ let filename = entry.path().to_path_buf();
- // log errors and continue
- if let Err(err) = recurse_files_extractor(extractor, decoder, entry, verbose).await {
- eprintln!("error extracting {:?}: {}", filename.display(), err);
+ // log errors and continue
+ let entry: EitherEntry<BogusSeqRead, T> = EitherEntry::FileEntry(entry, a);
+ if let Err(err) = do_extract_sub_dir(extractor, entry, verbose).await {
+ eprintln!("error extracting {}: {}", filename.display(), err);
+ }
}
+ extractor.leave_directory()?;
}
- extractor.leave_directory()?;
}
EntryKind::Symlink(link) => {
extractor.extract_symlink(&file_name, metadata, link.as_ref())?;
@@ -691,17 +862,34 @@ where
extractor.extract_special(&file_name, metadata, 0)?;
}
}
- EntryKind::File { size, .. } => extractor.async_extract_file(
- &file_name,
- metadata,
- *size,
- &mut file.contents().await.map_err(|_| {
- format_err!("found regular file entry without contents in archive")
- })?,
- ).await?,
- EntryKind::GoodbyeTable => {}, // ignore
+ EntryKind::File { size, .. } => {
+ extractor
+ .async_extract_file(
+ &file_name,
+ metadata,
+ *size,
+ &mut match file {
+ EitherEntry::Entry(_, dec) => Box::new(match dec.contents() {
+ Some(con) => con,
+ None => bail!("file without contents found"),
+ })
+ as Box<dyn tokio::io::AsyncRead + Unpin + Send>,
+ EitherEntry::FileEntry(ref fe, _) => {
+ Box::new(fe.contents().await.map_err(|err| {
+ format_err!("file with bad contents found: {}", err)
+ })?)
+ as Box<dyn tokio::io::AsyncRead + Unpin + Send>
+ }
+ },
+ )
+ .await?
+ }
+ EntryKind::GoodbyeTable => {
+ if let EitherEntry::Entry(_, _) = file {
+ extractor.leave_directory()?;
+ }
+ }
}
Ok(())
})
}
-
diff --git a/src/pxar/mod.rs b/src/pxar/mod.rs
index d1302962..d5c42942 100644
--- a/src/pxar/mod.rs
+++ b/src/pxar/mod.rs
@@ -59,7 +59,10 @@ mod flags;
pub use flags::Flags;
pub use create::{create_archive, PxarCreateOptions};
-pub use extract::{create_zip, extract_archive, extract_sub_dir, ErrorHandler, PxarExtractOptions};
+pub use extract::{
+ create_zip, create_zip_seq, extract_archive, extract_sub_dir, extract_sub_dir_seq,
+ ErrorHandler, PxarExtractOptions,
+};
/// The format requires to build sorted directory lookup tables in
/// memory, so we restrict the number of allowed entries to limit
--
2.20.1
More information about the pbs-devel
mailing list