[pbs-devel] [RFC PATCH proxmox-backup 3/5] api2/admin/datastore: refactor create_zip into pxar/extract
Dominik Csapak
d.csapak at proxmox.com
Mon Dec 21 12:25:05 CET 2020
We will reuse that code in the client, so we need to move it to
a location that the client can access.
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
src/api2/admin/datastore.rs | 99 +++--------------------------
src/pxar/extract.rs | 120 +++++++++++++++++++++++++++++++++++-
src/pxar/mod.rs | 5 +-
3 files changed, 133 insertions(+), 91 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index ad66336c..84f5417a 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -2,8 +2,6 @@ use std::collections::HashSet;
use std::ffi::OsStr;
use std::os::unix::ffi::OsStrExt;
use std::sync::{Arc, Mutex};
-use std::path::{Path, PathBuf};
-use std::pin::Pin;
use anyhow::{bail, format_err, Error};
use futures::*;
@@ -20,7 +18,7 @@ use proxmox::api::schema::*;
use proxmox::tools::fs::{replace_file, CreateOptions};
use proxmox::{http_err, identity, list_subdirs_api_method, sortable};
-use pxar::accessor::aio::{Accessor, FileContents, FileEntry};
+use pxar::accessor::aio::Accessor;
use pxar::EntryKind;
use crate::api2::types::*;
@@ -28,11 +26,11 @@ use crate::api2::node::rrd::create_value_from_rrd;
use crate::backup::*;
use crate::config::datastore;
use crate::config::cached_user_info::CachedUserInfo;
+use crate::pxar::create_zip;
use crate::server::{jobstate::Job, WorkerTask};
use crate::tools::{
self,
- zip::{ZipEncoder, ZipEntry},
AsyncChannelWriter, AsyncReaderStream, WrappedReaderStream,
};
@@ -1344,66 +1342,6 @@ fn catalog(
catalog_reader.list_dir_content(&path)
}
-fn recurse_files<'a, T, W>(
- zip: &'a mut ZipEncoder<W>,
- decoder: &'a mut Accessor<T>,
- prefix: &'a Path,
- file: FileEntry<T>,
-) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>
-where
- T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
- W: tokio::io::AsyncWrite + Unpin + Send + 'static,
-{
- Box::pin(async move {
- let metadata = file.entry().metadata();
- let path = file.entry().path().strip_prefix(&prefix)?.to_path_buf();
-
- match file.kind() {
- EntryKind::File { .. } => {
- let entry = ZipEntry::new(
- path,
- metadata.stat.mtime.secs,
- metadata.stat.mode as u16,
- true,
- );
- zip.add_entry(entry, Some(file.contents().await?))
- .await
- .map_err(|err| format_err!("could not send file entry: {}", err))?;
- }
- EntryKind::Hardlink(_) => {
- let realfile = decoder.follow_hardlink(&file).await?;
- let entry = ZipEntry::new(
- path,
- metadata.stat.mtime.secs,
- metadata.stat.mode as u16,
- true,
- );
- zip.add_entry(entry, Some(realfile.contents().await?))
- .await
- .map_err(|err| format_err!("could not send file entry: {}", err))?;
- }
- EntryKind::Directory => {
- let dir = file.enter_directory().await?;
- let mut readdir = dir.read_dir();
- let entry = ZipEntry::new(
- path,
- metadata.stat.mtime.secs,
- metadata.stat.mode as u16,
- false,
- );
- zip.add_entry::<FileContents<T>>(entry, None).await?;
- while let Some(entry) = readdir.next().await {
- let entry = entry?.decode_entry().await?;
- recurse_files(zip, decoder, prefix, entry).await?;
- }
- }
- _ => {} // ignore all else
- };
-
- Ok(())
- })
-}
-
#[sortable]
pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
&ApiHandler::AsyncHttp(&pxar_file_download),
@@ -1479,9 +1417,10 @@ fn pxar_file_download(
let decoder = Accessor::new(reader, archive_size).await?;
let root = decoder.open_root().await?;
+ let path = OsStr::from_bytes(file_path).to_os_string();
let file = root
- .lookup(OsStr::from_bytes(file_path)).await?
- .ok_or(format_err!("error opening '{:?}'", file_path))?;
+ .lookup(&path).await?
+ .ok_or(format_err!("error opening '{:?}'", path))?;
let body = match file.kind() {
EntryKind::File { .. } => Body::wrap_stream(
@@ -1495,37 +1434,19 @@ fn pxar_file_download(
.map_err(move |err| {
eprintln!(
"error during streaming of hardlink '{:?}' - {}",
- filepath, err
+ path, err
);
err
}),
),
EntryKind::Directory => {
let (sender, receiver) = tokio::sync::mpsc::channel(100);
- let mut prefix = PathBuf::new();
- let mut components = file.entry().path().components();
- components.next_back(); // discar last
- for comp in components {
- prefix.push(comp);
- }
-
let channelwriter = AsyncChannelWriter::new(sender, 1024 * 1024);
-
- crate::server::spawn_internal_task(async move {
- let mut zipencoder = ZipEncoder::new(channelwriter);
- let mut decoder = decoder;
- recurse_files(&mut zipencoder, &mut decoder, &prefix, file)
- .await
- .map_err(|err| eprintln!("error during creating of zip: {}", err))?;
-
- zipencoder
- .finish()
- .await
- .map_err(|err| eprintln!("error during finishing of zip: {}", err))
- });
-
+ crate::server::spawn_internal_task(
+ create_zip(channelwriter, decoder, path.clone(), false)
+ );
Body::wrap_stream(receiver.map_err(move |err| {
- eprintln!("error during streaming of zip '{:?}' - {}", filepath, err);
+ eprintln!("error during streaming of zip '{:?}' - {}", path, err);
err
}))
}
diff --git a/src/pxar/extract.rs b/src/pxar/extract.rs
index ed238a2c..77472f56 100644
--- a/src/pxar/extract.rs
+++ b/src/pxar/extract.rs
@@ -5,9 +5,11 @@ use std::ffi::{CStr, CString, OsStr, OsString};
use std::io;
use std::os::unix::ffi::OsStrExt;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
-use std::path::Path;
+use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
+use std::pin::Pin;
+use futures::future::Future;
use anyhow::{bail, format_err, Error};
use nix::dir::Dir;
use nix::fcntl::OFlag;
@@ -16,6 +18,7 @@ use nix::sys::stat::Mode;
use pathpatterns::{MatchEntry, MatchList, MatchType};
use pxar::format::Device;
use pxar::Metadata;
+use pxar::accessor::aio::{Accessor, FileContents, FileEntry};
use proxmox::c_result;
use proxmox::tools::fs::{create_path, CreateOptions};
@@ -24,6 +27,8 @@ use crate::pxar::dir_stack::PxarDirStack;
use crate::pxar::metadata;
use crate::pxar::Flags;
+use crate::tools::zip::{ZipEncoder, ZipEntry};
+
pub fn extract_archive<T, F>(
mut decoder: pxar::decoder::Decoder<T>,
destination: &Path,
@@ -457,3 +462,116 @@ impl Extractor {
)
}
}
+
+pub async fn create_zip<T, W, P>(
+ output: W,
+ decoder: Accessor<T>,
+ path: P,
+ verbose: bool,
+) -> Result<(), Error>
+where
+ T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
+ W: tokio::io::AsyncWrite + Unpin + Send + 'static,
+ P: AsRef<Path>,
+{
+ let root = decoder.open_root().await?;
+ let file = root
+ .lookup(&path).await?
+ .ok_or(format_err!("error opening '{:?}'", path.as_ref()))?;
+
+ let mut prefix = PathBuf::new();
+ let mut components = file.entry().path().components();
+    components.next_back(); // discard last
+ for comp in components {
+ prefix.push(comp);
+ }
+
+ let mut zipencoder = ZipEncoder::new(output);
+ let mut decoder = decoder;
+ recurse_files_zip(&mut zipencoder, &mut decoder, &prefix, file, verbose)
+ .await
+ .map_err(|err| {
+ eprintln!("error during creating of zip: {}", err);
+ err
+ })?;
+
+ zipencoder
+ .finish()
+ .await
+ .map_err(|err| {
+ eprintln!("error during finishing of zip: {}", err);
+ err
+ })
+}
+
+fn recurse_files_zip<'a, T, W>(
+ zip: &'a mut ZipEncoder<W>,
+ decoder: &'a mut Accessor<T>,
+ prefix: &'a Path,
+ file: FileEntry<T>,
+ verbose: bool,
+) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>
+where
+ T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
+ W: tokio::io::AsyncWrite + Unpin + Send + 'static,
+{
+ use pxar::EntryKind;
+ Box::pin(async move {
+ let metadata = file.entry().metadata();
+ let path = file.entry().path().strip_prefix(&prefix)?.to_path_buf();
+
+ match file.kind() {
+ EntryKind::File { .. } => {
+ if verbose {
+ eprintln!("adding '{}' to zip", path.display());
+ }
+ let entry = ZipEntry::new(
+ path,
+ metadata.stat.mtime.secs,
+ metadata.stat.mode as u16,
+ true,
+ );
+ zip.add_entry(entry, Some(file.contents().await?))
+ .await
+ .map_err(|err| format_err!("could not send file entry: {}", err))?;
+ }
+ EntryKind::Hardlink(_) => {
+ let realfile = decoder.follow_hardlink(&file).await?;
+ if verbose {
+ eprintln!("adding '{}' to zip", path.display());
+ }
+ let entry = ZipEntry::new(
+ path,
+ metadata.stat.mtime.secs,
+ metadata.stat.mode as u16,
+ true,
+ );
+ zip.add_entry(entry, Some(realfile.contents().await?))
+ .await
+ .map_err(|err| format_err!("could not send file entry: {}", err))?;
+ }
+ EntryKind::Directory => {
+ let dir = file.enter_directory().await?;
+ let mut readdir = dir.read_dir();
+ if verbose {
+ eprintln!("adding '{}' to zip", path.display());
+ }
+ let entry = ZipEntry::new(
+ path,
+ metadata.stat.mtime.secs,
+ metadata.stat.mode as u16,
+ false,
+ );
+ zip.add_entry::<FileContents<T>>(entry, None).await?;
+ while let Some(entry) = readdir.next().await {
+ let entry = entry?.decode_entry().await?;
+ recurse_files_zip(zip, decoder, prefix, entry, verbose).await?;
+ }
+ }
+ _ => {} // ignore all else
+ };
+
+ Ok(())
+ })
+}
+
diff --git a/src/pxar/mod.rs b/src/pxar/mod.rs
index 6e910667..ba47e220 100644
--- a/src/pxar/mod.rs
+++ b/src/pxar/mod.rs
@@ -59,7 +59,10 @@ mod flags;
pub use flags::Flags;
pub use create::create_archive;
-pub use extract::extract_archive;
+pub use extract::{
+ extract_archive,
+ create_zip,
+};
/// The format requires to build sorted directory lookup tables in
/// memory, so we restrict the number of allowed entries to limit
--
2.20.1
More information about the pbs-devel
mailing list