[pbs-devel] [PATCH proxmox-backup 22/22] file-restore: add 'extract' command for VM file restore
Stefan Reiter
s.reiter at proxmox.com
Tue Feb 16 18:07:10 CET 2021
Encodes the data into a streaming pxar archive on the restore VM, then
extracts it locally. This allows sharing most of the code with regular
pxar (container) restore.
Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
Cargo.toml | 2 +-
debian/control | 1 +
src/bin/proxmox-file-restore.rs | 157 +++++++++++++-----
src/bin/proxmox_file_restore/block_driver.rs | 17 ++
.../proxmox_file_restore/block_driver_qemu.rs | 27 +++
src/bin/proxmox_restore_daemon/api.rs | 140 +++++++++++++++-
6 files changed, 302 insertions(+), 42 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index de42c2ff..988496a4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,7 +64,7 @@ syslog = "4.0"
tokio = { version = "1.0", features = [ "fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "time" ] }
tokio-openssl = "0.6.1"
tokio-stream = "0.1.0"
-tokio-util = { version = "0.6", features = [ "codec" ] }
+tokio-util = { version = "0.6", features = [ "codec", "io" ] }
tower-service = "0.3.0"
udev = ">= 0.3, <0.5"
url = "2.1"
diff --git a/debian/control b/debian/control
index f4d81732..661c2894 100644
--- a/debian/control
+++ b/debian/control
@@ -67,6 +67,7 @@ Build-Depends: debhelper (>= 11),
librust-tokio-stream-0.1+default-dev,
librust-tokio-util-0.6+codec-dev,
librust-tokio-util-0.6+default-dev,
+ librust-tokio-util-0.6+io-dev,
librust-tower-service-0.3+default-dev,
librust-udev-0.4+default-dev | librust-udev-0.3+default-dev,
librust-url-2+default-dev (>= 2.1-~~),
diff --git a/src/bin/proxmox-file-restore.rs b/src/bin/proxmox-file-restore.rs
index 232931d9..48e4643f 100644
--- a/src/bin/proxmox-file-restore.rs
+++ b/src/bin/proxmox-file-restore.rs
@@ -14,6 +14,7 @@ use proxmox::api::{
},
};
use pxar::accessor::aio::Accessor;
+use pxar::decoder::aio::Decoder;
use proxmox_backup::api2::{helpers, types::ArchiveEntry};
use proxmox_backup::backup::{
@@ -21,7 +22,7 @@ use proxmox_backup::backup::{
DirEntryAttribute, IndexFile, LocalDynamicReadAt, CATALOG_NAME,
};
use proxmox_backup::client::{BackupReader, RemoteChunkReader};
-use proxmox_backup::pxar::{create_zip, extract_sub_dir};
+use proxmox_backup::pxar::{create_zip, create_zip_seq, extract_sub_dir, extract_sub_dir_seq};
use proxmox_backup::tools;
// use "pub" so rust doesn't complain about "unused" functions in the module
@@ -273,7 +274,11 @@ async fn list(param: Value) -> Result<Value, Error> {
description: "Print verbose information",
optional: true,
default: false,
- }
+ },
+ "driver": {
+ type: BlockDriverType,
+ optional: true,
+ },
}
}
)]
@@ -306,20 +311,21 @@ async fn extract(param: Value) -> Result<Value, Error> {
}
};
+ let client = connect(&repo)?;
+ let client = BackupReader::start(
+ client,
+ crypt_config.clone(),
+ repo.store(),
+ &snapshot.group().backup_type(),
+ &snapshot.group().backup_id(),
+ snapshot.backup_time(),
+ true,
+ )
+ .await?;
+ let (manifest, _) = client.download_manifest().await?;
+
match path {
ExtractPath::Pxar(archive_name, path) => {
- let client = connect(&repo)?;
- let client = BackupReader::start(
- client,
- crypt_config.clone(),
- repo.store(),
- &snapshot.group().backup_type(),
- &snapshot.group().backup_id(),
- snapshot.backup_time(),
- true,
- )
- .await?;
- let (manifest, _) = client.download_manifest().await?;
let file_info = manifest.lookup_file_info(&archive_name)?;
let index = client
.download_dynamic_index(&manifest, &archive_name)
@@ -336,32 +342,23 @@ async fn extract(param: Value) -> Result<Value, Error> {
let archive_size = reader.archive_size();
let reader = LocalDynamicReadAt::new(reader);
let decoder = Accessor::new(reader, archive_size).await?;
+ extract_to_target(decoder, &path, target, verbose).await?;
+ }
+ ExtractPath::VM(file, path) => {
+ let details = SnapRestoreDetails {
+ manifest,
+ repo,
+ snapshot,
+ };
+ let driver: Option<BlockDriverType> = match param.get("driver") {
+ Some(drv) => Some(serde_json::from_value(drv.clone())?),
+ None => None,
+ };
- let root = decoder.open_root().await?;
- let file = root
- .lookup(OsStr::from_bytes(&path))
- .await?
- .ok_or_else(|| format_err!("error opening '{:?}'", path))?;
+ let reader = data_extract(driver, details, file, path.clone()).await?;
+ let decoder = Decoder::from_tokio(reader).await?;
- if let Some(target) = target {
- extract_sub_dir(target, decoder, OsStr::from_bytes(&path), verbose).await?;
- } else {
- match file.kind() {
- pxar::EntryKind::File { .. } => {
- tokio::io::copy(&mut file.contents().await?, &mut tokio::io::stdout())
- .await?;
- }
- _ => {
- create_zip(
- tokio::io::stdout(),
- decoder,
- OsStr::from_bytes(&path),
- verbose,
- )
- .await?;
- }
- }
- }
+ extract_to_target_seq(decoder, target, verbose).await?;
}
_ => {
bail!("cannot extract '{}'", orig_path);
@@ -371,6 +368,90 @@ async fn extract(param: Value) -> Result<Value, Error> {
Ok(Value::Null)
}
+async fn extract_to_target_seq<T>(
+ mut decoder: Decoder<T>,
+ target: Option<PathBuf>,
+ verbose: bool,
+) -> Result<(), Error>
+where
+ T: pxar::decoder::SeqRead + Send + Unpin + 'static,
+{
+ // skip / directory for extraction
+ let _root = decoder.next().await.transpose()?;
+
+ // take a peek at the root of the data we want to extract - don't call next(), as that would
+ // mean it couldn't be read by the extraction functions below anymore
+ let mut data_root = match decoder.peek().await.transpose()? {
+ Some(r) => r,
+ None => bail!("no pxar entries found"),
+ };
+
+ // skip .pxarexclude-cli if it comes first for some reason
+ if matches!(data_root.kind(), pxar::EntryKind::File { .. })
+ && data_root.file_name().as_bytes() == b".pxarexclude-cli"
+ {
+ decoder.next().await;
+ data_root = match decoder.peek().await.transpose()? {
+ Some(r) => r,
+ None => bail!("no pxar entries found (after skipping .pxarexclude-cli)"),
+ };
+ }
+
+ if let Some(target) = target {
+ extract_sub_dir_seq(target, decoder, verbose).await?;
+ } else {
+ if matches!(data_root.kind(), pxar::EntryKind::File { .. }) {
+ match decoder.contents() {
+ Some(mut c) => {
+ tokio::io::copy(&mut c, &mut tokio::io::stdout()).await?;
+ }
+ None => bail!("cannot extract pxar file entry without content"),
+ }
+ } else {
+ create_zip_seq(tokio::io::stdout(), decoder, verbose).await?;
+ }
+ }
+
+ Ok(())
+}
+
+async fn extract_to_target<T>(
+ decoder: Accessor<T>,
+ path: &[u8],
+ target: Option<PathBuf>,
+ verbose: bool,
+) -> Result<(), Error>
+where
+ T: pxar::accessor::ReadAt + Clone + Send + Sync + Unpin + 'static,
+{
+ let root = decoder.open_root().await?;
+ let file = root
+ .lookup(OsStr::from_bytes(&path))
+ .await?
+ .ok_or_else(|| format_err!("error opening '{:?}'", path))?;
+
+ if let Some(target) = target {
+ extract_sub_dir(target, decoder, OsStr::from_bytes(&path), verbose).await?;
+ } else {
+ match file.kind() {
+ pxar::EntryKind::File { .. } => {
+ tokio::io::copy(&mut file.contents().await?, &mut tokio::io::stdout()).await?;
+ }
+ _ => {
+ create_zip(
+ tokio::io::stdout(),
+ decoder,
+ OsStr::from_bytes(&path),
+ verbose,
+ )
+ .await?;
+ }
+ }
+ }
+
+ Ok(())
+}
+
fn main() {
let list_cmd_def = CliCommand::new(&API_METHOD_LIST)
.arg_param(&["snapshot", "path"])
diff --git a/src/bin/proxmox_file_restore/block_driver.rs b/src/bin/proxmox_file_restore/block_driver.rs
index 5ed35f25..2815ab60 100644
--- a/src/bin/proxmox_file_restore/block_driver.rs
+++ b/src/bin/proxmox_file_restore/block_driver.rs
@@ -36,6 +36,13 @@ pub trait BlockRestoreDriver {
img_file: String,
path: Vec<u8>,
) -> Async<Result<Vec<ArchiveEntry>, Error>>;
+ /// Attempt to create a pxar archive of the given file path and return a reader instance for it
+ fn data_extract(
+ &self,
+ details: SnapRestoreDetails,
+ img_file: String,
+ path: Vec<u8>,
+ ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>>;
/// Return status of all running/mapped images, result value is (id, extra data), where id must
/// match with the ones returned from list()
@@ -75,6 +82,16 @@ pub async fn data_list(
driver.data_list(details, img_file, path).await
}
+pub async fn data_extract(
+ driver: Option<BlockDriverType>,
+ details: SnapRestoreDetails,
+ img_file: String,
+ path: Vec<u8>,
+) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>, Error> {
+ let driver = driver.unwrap_or(DEFAULT_DRIVER).resolve();
+ driver.data_extract(details, img_file, path).await
+}
+
#[api(
input: {
properties: {
diff --git a/src/bin/proxmox_file_restore/block_driver_qemu.rs b/src/bin/proxmox_file_restore/block_driver_qemu.rs
index 3277af5d..205d933c 100644
--- a/src/bin/proxmox_file_restore/block_driver_qemu.rs
+++ b/src/bin/proxmox_file_restore/block_driver_qemu.rs
@@ -369,6 +369,33 @@ impl BlockRestoreDriver for QemuBlockDriver {
.boxed()
}
+ fn data_extract(
+ &self,
+ details: SnapRestoreDetails,
+ img_file: String,
+ mut path: Vec<u8>,
+ ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> {
+ async move {
+ let mut client = ensure_running(&details).await?;
+ if !path.is_empty() && path[0] != b'/' {
+ path.insert(0, b'/');
+ }
+ let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
+ let (mut tx, rx) = tokio::io::duplex(1024 * 4096);
+ tokio::spawn(async move {
+ if let Err(err) = client
+ .download("api2/json/extract", Some(json!({ "path": path })), &mut tx)
+ .await
+ {
+ eprintln!("reading file extraction stream failed - {}", err);
+ }
+ });
+
+ Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
+ }
+ .boxed()
+ }
+
fn status(&self) -> Async<Result<Vec<(String, Value)>, Error>> {
async move {
let mut state_map = VMStateMap::load()?;
diff --git a/src/bin/proxmox_restore_daemon/api.rs b/src/bin/proxmox_restore_daemon/api.rs
index 125b5bfb..281f2121 100644
--- a/src/bin/proxmox_restore_daemon/api.rs
+++ b/src/bin/proxmox_restore_daemon/api.rs
@@ -1,16 +1,29 @@
///! File-restore API running inside the restore VM
use anyhow::{bail, Error};
+use futures::FutureExt;
+use hyper::http::request::Parts;
+use hyper::{header, Body, Response, StatusCode};
+use log::error;
+use pathpatterns::{MatchEntry, MatchPattern, MatchType, Pattern};
+use serde_json::Value;
+
use std::ffi::OsStr;
use std::fs;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
-use proxmox::api::{api, ApiMethod, Permission, Router, RpcEnvironment, SubdirMap};
-use proxmox::list_subdirs_api_method;
+use proxmox::api::{
+ api, schema::*, ApiHandler, ApiMethod, ApiResponseFuture, Permission, Router, RpcEnvironment,
+ SubdirMap,
+};
+use proxmox::{identity, list_subdirs_api_method, sortable};
use proxmox_backup::api2::types::*;
use proxmox_backup::backup::DirEntryAttribute;
-use proxmox_backup::tools::fs::read_subdir;
+use proxmox_backup::pxar::{create_archive, Flags, PxarCreateOptions, ENCODER_MAX_ENTRIES};
+use proxmox_backup::tools::{self, fs::read_subdir};
+
+use pxar::encoder::aio::TokioWriter;
use super::{disk::ResolveResult, watchdog_remaining, watchdog_undo_ping};
@@ -19,6 +32,7 @@ use super::{disk::ResolveResult, watchdog_remaining, watchdog_undo_ping};
// host can contact us - and there the proxmox-backup-client validates permissions already.
const SUBDIRS: SubdirMap = &[
+ ("extract", &Router::new().get(&API_METHOD_EXTRACT)),
("list", &Router::new().get(&API_METHOD_LIST)),
("status", &Router::new().get(&API_METHOD_STATUS)),
];
@@ -180,3 +194,123 @@ fn list(
Ok(res)
}
+
+#[sortable]
+pub const API_METHOD_EXTRACT: ApiMethod = ApiMethod::new(
+ &ApiHandler::AsyncHttp(&extract),
+ &ObjectSchema::new(
+ "Extract a file or directory from the VM as a pxar archive.",
+ &sorted!([(
+ "path",
+ false,
+ &StringSchema::new("base64-encoded path to list files and directories under").schema()
+ )]),
+ ),
+)
+.access(None, &Permission::World);
+
+fn extract(
+ _parts: Parts,
+ _req_body: Body,
+ param: Value,
+ _info: &ApiMethod,
+ _rpcenv: Box<dyn RpcEnvironment>,
+) -> ApiResponseFuture {
+ async move {
+ let path = tools::required_string_param(&param, "path")?;
+
+ let param_path = base64::decode(path)?;
+ let mut path = param_path.clone();
+ if let Some(b'/') = path.last() {
+ path.pop();
+ }
+ let path_str = OsStr::from_bytes(&path[..]);
+ let param_path_buf = Path::new(path_str);
+
+ let query_result = {
+ let mut disk_state = crate::DISK_STATE.lock().unwrap();
+ disk_state.resolve(&param_path_buf)?
+ };
+
+ let vm_path = match query_result {
+ ResolveResult::Path(vm_path) => vm_path,
+ _ => bail!(
+ "invalid path, cannot restore meta-directory: {:?}",
+ param_path_buf
+ ),
+ };
+
+ // check here so we can return a real error message, failing in the async task will stop
+ // the transfer, but not return a useful message
+ if !vm_path.exists() {
+ bail!("file or directory {:?} does not exist", param_path_buf);
+ }
+
+ let (writer, reader) = tokio::io::duplex(1024 * 64);
+ let pxar_writer = TokioWriter::new(writer);
+
+ tokio::spawn(async move {
+ let result = async move {
+ // pxar always expects a directory as its root, so to accommodate files as well we
+ // encode the parent dir with a filter only matching the target instead
+ let mut patterns = vec![MatchEntry::new(
+ MatchPattern::Pattern(Pattern::path(b"*").unwrap()),
+ MatchType::Exclude,
+ )];
+
+ let name = match vm_path.file_name() {
+ Some(name) => name,
+ None => bail!("no file name found for path: {:?}", vm_path),
+ };
+
+ if vm_path.is_dir() {
+ let mut pat = name.as_bytes().to_vec();
+ patterns.push(MatchEntry::new(
+ MatchPattern::Pattern(Pattern::path(pat.clone())?),
+ MatchType::Include,
+ ));
+ pat.extend(b"/**/*".iter());
+ patterns.push(MatchEntry::new(
+ MatchPattern::Pattern(Pattern::path(pat)?),
+ MatchType::Include,
+ ));
+ } else {
+ patterns.push(MatchEntry::new(
+ MatchPattern::Literal(name.as_bytes().to_vec()),
+ MatchType::Include,
+ ));
+ }
+
+ let dir_path = vm_path.parent().unwrap_or_else(|| Path::new("/"));
+ let dir = nix::dir::Dir::open(
+ dir_path,
+ nix::fcntl::OFlag::O_NOFOLLOW,
+ nix::sys::stat::Mode::empty(),
+ )?;
+
+ let options = PxarCreateOptions {
+ entries_max: ENCODER_MAX_ENTRIES,
+ device_set: None,
+ patterns,
+ verbose: false,
+ skip_lost_and_found: false,
+ };
+ create_archive(dir, pxar_writer, Flags::DEFAULT, |_| Ok(()), None, options).await
+ }
+ .await;
+ if let Err(err) = result {
+ error!("pxar streaming task failed - {}", err);
+ }
+ });
+
+ let stream = tokio_util::io::ReaderStream::new(reader);
+
+ let body = Body::wrap_stream(stream);
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .body(body)
+ .unwrap())
+ }
+ .boxed()
+}
--
2.20.1
More information about the pbs-devel
mailing list