[pbs-devel] [PATCH v2 proxmox-backup 20/20] file-restore: add 'extract' command for VM file restore

Stefan Reiter s.reiter at proxmox.com
Wed Mar 24 16:18:27 CET 2021


The restore daemon either encodes the requested data into a pxar archive,
to provide the most accurate data for local restore, or encodes it directly
into a zip file (a single file is written out unprocessed), depending on the
'pxar' argument to the 'extract' API call.

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---

v2:
* add 'pxar' property to VM API call to allow encoding files as zip/raw directly
  in the VM - this avoids the re-encoding in proxmox-file-restore

 Cargo.toml                                    |   2 +-
 debian/control                                |   1 +
 src/bin/proxmox-file-restore.rs               | 112 +++++++----
 src/bin/proxmox_file_restore/block_driver.rs  |  24 +++
 .../proxmox_file_restore/block_driver_qemu.rs |  32 ++++
 src/bin/proxmox_restore_daemon/api.rs         | 176 +++++++++++++++++-
 6 files changed, 307 insertions(+), 40 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 2ffda29f..80d691d4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,7 +64,7 @@ syslog = "4.0"
 tokio = { version = "1.0", features = [ "fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "time" ] }
 tokio-openssl = "0.6.1"
 tokio-stream = "0.1.0"
-tokio-util = { version = "0.6", features = [ "codec" ] }
+tokio-util = { version = "0.6", features = [ "codec", "io" ] }
 tower-service = "0.3.0"
 udev = ">= 0.3, <0.5"
 url = "2.1"
diff --git a/debian/control b/debian/control
index de1aa616..1184c340 100644
--- a/debian/control
+++ b/debian/control
@@ -67,6 +67,7 @@ Build-Depends: debhelper (>= 11),
  librust-tokio-stream-0.1+default-dev,
  librust-tokio-util-0.6+codec-dev,
  librust-tokio-util-0.6+default-dev,
+ librust-tokio-util-0.6+io-dev,
  librust-tower-service-0.3+default-dev,
  librust-udev-0.4+default-dev | librust-udev-0.3+default-dev,
  librust-url-2+default-dev (>= 2.1-~~),
diff --git a/src/bin/proxmox-file-restore.rs b/src/bin/proxmox-file-restore.rs
index 258db40d..e6d01a62 100644
--- a/src/bin/proxmox-file-restore.rs
+++ b/src/bin/proxmox-file-restore.rs
@@ -14,6 +14,7 @@ use proxmox::api::{
     },
 };
 use pxar::accessor::aio::Accessor;
+use pxar::decoder::aio::Decoder;
 
 use proxmox_backup::api2::{helpers, types::ArchiveEntry};
 use proxmox_backup::backup::{
@@ -21,7 +22,7 @@ use proxmox_backup::backup::{
     DirEntryAttribute, IndexFile, LocalDynamicReadAt, CATALOG_NAME,
 };
 use proxmox_backup::client::{BackupReader, RemoteChunkReader};
-use proxmox_backup::pxar::{create_zip, extract_sub_dir};
+use proxmox_backup::pxar::{create_zip, extract_sub_dir, extract_sub_dir_seq};
 use proxmox_backup::tools;
 
 // use "pub" so rust doesn't complain about "unused" functions in the module
@@ -276,7 +277,11 @@ async fn list(param: Value) -> Result<Value, Error> {
                description: "Print verbose information",
                optional: true,
                default: false,
-           }
+           },
+           "driver": {
+               type: BlockDriverType,
+               optional: true,
+           },
        }
    }
 )]
@@ -309,20 +314,21 @@ async fn extract(param: Value) -> Result<Value, Error> {
         }
     };
 
+    let client = connect(&repo)?;
+    let client = BackupReader::start(
+        client,
+        crypt_config.clone(),
+        repo.store(),
+        &snapshot.group().backup_type(),
+        &snapshot.group().backup_id(),
+        snapshot.backup_time(),
+        true,
+    )
+    .await?;
+    let (manifest, _) = client.download_manifest().await?;
+
     match path {
         ExtractPath::Pxar(archive_name, path) => {
-            let client = connect(&repo)?;
-            let client = BackupReader::start(
-                client,
-                crypt_config.clone(),
-                repo.store(),
-                &snapshot.group().backup_type(),
-                &snapshot.group().backup_id(),
-                snapshot.backup_time(),
-                true,
-            )
-            .await?;
-            let (manifest, _) = client.download_manifest().await?;
             let file_info = manifest.lookup_file_info(&archive_name)?;
             let index = client
                 .download_dynamic_index(&manifest, &archive_name)
@@ -339,31 +345,28 @@ async fn extract(param: Value) -> Result<Value, Error> {
             let archive_size = reader.archive_size();
             let reader = LocalDynamicReadAt::new(reader);
             let decoder = Accessor::new(reader, archive_size).await?;
-
-            let root = decoder.open_root().await?;
-            let file = root
-                .lookup(OsStr::from_bytes(&path))
-                .await?
-                .ok_or_else(|| format_err!("error opening '{:?}'", path))?;
+            extract_to_target(decoder, &path, target, verbose).await?;
+        }
+        ExtractPath::VM(file, path) => {
+            let details = SnapRestoreDetails {
+                manifest,
+                repo,
+                snapshot,
+            };
+            let driver: Option<BlockDriverType> = match param.get("driver") {
+                Some(drv) => Some(serde_json::from_value(drv.clone())?),
+                None => None,
+            };
 
             if let Some(target) = target {
-                extract_sub_dir(target, decoder, OsStr::from_bytes(&path), verbose).await?;
+                let reader = data_extract(driver, details, file, path.clone(), true).await?;
+                let mut decoder = Decoder::from_tokio(reader).await?;
+                // skip root dir
+                decoder.next().await.transpose()?;
+                extract_sub_dir_seq(target, decoder, verbose).await?;
             } else {
-                match file.kind() {
-                    pxar::EntryKind::File { .. } => {
-                        tokio::io::copy(&mut file.contents().await?, &mut tokio::io::stdout())
-                            .await?;
-                    }
-                    _ => {
-                        create_zip(
-                            tokio::io::stdout(),
-                            decoder,
-                            OsStr::from_bytes(&path),
-                            verbose,
-                        )
-                        .await?;
-                    }
-                }
+                let mut reader = data_extract(driver, details, file, path.clone(), false).await?;
+                tokio::io::copy(&mut reader, &mut tokio::io::stdout()).await?;
             }
         }
         _ => {
@@ -374,6 +377,43 @@ async fn extract(param: Value) -> Result<Value, Error> {
     Ok(Value::Null)
 }
 
+async fn extract_to_target<T>(
+    decoder: Accessor<T>,
+    path: &[u8],
+    target: Option<PathBuf>,
+    verbose: bool,
+) -> Result<(), Error>
+where
+    T: pxar::accessor::ReadAt + Clone + Send + Sync + Unpin + 'static,
+{
+    let root = decoder.open_root().await?;
+    let file = root
+        .lookup(OsStr::from_bytes(&path))
+        .await?
+        .ok_or_else(|| format_err!("error opening '{:?}'", path))?;
+
+    if let Some(target) = target {
+        extract_sub_dir(target, decoder, OsStr::from_bytes(&path), verbose).await?;
+    } else {
+        match file.kind() {
+            pxar::EntryKind::File { .. } => {
+                tokio::io::copy(&mut file.contents().await?, &mut tokio::io::stdout()).await?;
+            }
+            _ => {
+                create_zip(
+                    tokio::io::stdout(),
+                    decoder,
+                    OsStr::from_bytes(&path),
+                    verbose,
+                )
+                .await?;
+            }
+        }
+    }
+
+    Ok(())
+}
+
 fn main() {
     let list_cmd_def = CliCommand::new(&API_METHOD_LIST)
         .arg_param(&["snapshot", "path"])
diff --git a/src/bin/proxmox_file_restore/block_driver.rs b/src/bin/proxmox_file_restore/block_driver.rs
index 35660443..38abf6c3 100644
--- a/src/bin/proxmox_file_restore/block_driver.rs
+++ b/src/bin/proxmox_file_restore/block_driver.rs
@@ -41,6 +41,19 @@ pub trait BlockRestoreDriver {
         path: Vec<u8>,
     ) -> Async<Result<Vec<ArchiveEntry>, Error>>;
 
+    /// pxar=true:
+    /// Attempt to create a pxar archive of the given file path and return a reader instance for it
+    /// pxar=false:
+    /// Attempt to read the file or folder at the given path and return the file content or a zip
+    /// file as a stream
+    fn data_extract(
+        &self,
+        details: SnapRestoreDetails,
+        img_file: String,
+        path: Vec<u8>,
+        pxar: bool,
+    ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>>;
+
     /// Return status of all running/mapped images, result value is (id, extra data), where id must
     /// match with the ones returned from list()
     fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>>;
@@ -79,6 +92,17 @@ pub async fn data_list(
     driver.data_list(details, img_file, path).await
 }
 
+pub async fn data_extract(
+    driver: Option<BlockDriverType>,
+    details: SnapRestoreDetails,
+    img_file: String,
+    path: Vec<u8>,
+    pxar: bool,
+) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>, Error> {
+    let driver = driver.unwrap_or(DEFAULT_DRIVER).resolve();
+    driver.data_extract(details, img_file, path, pxar).await
+}
+
 #[api(
    input: {
        properties: {
diff --git a/src/bin/proxmox_file_restore/block_driver_qemu.rs b/src/bin/proxmox_file_restore/block_driver_qemu.rs
index 1a96ef10..bb312747 100644
--- a/src/bin/proxmox_file_restore/block_driver_qemu.rs
+++ b/src/bin/proxmox_file_restore/block_driver_qemu.rs
@@ -236,6 +236,38 @@ impl BlockRestoreDriver for QemuBlockDriver {
         .boxed()
     }
 
+    fn data_extract(
+        &self,
+        details: SnapRestoreDetails,
+        img_file: String,
+        mut path: Vec<u8>,
+        pxar: bool,
+    ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> {
+        async move {
+            let client = ensure_running(&details).await?;
+            if !path.is_empty() && path[0] != b'/' {
+                path.insert(0, b'/');
+            }
+            let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
+            let (mut tx, rx) = tokio::io::duplex(1024 * 4096);
+            tokio::spawn(async move {
+                if let Err(err) = client
+                    .download(
+                        "api2/json/extract",
+                        Some(json!({ "path": path, "pxar": pxar })),
+                        &mut tx,
+                    )
+                    .await
+                {
+                    eprintln!("reading file extraction stream failed - {}", err);
+                }
+            });
+
+            Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
+        }
+        .boxed()
+    }
+
     fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> {
         async move {
             let mut state_map = VMStateMap::load()?;
diff --git a/src/bin/proxmox_restore_daemon/api.rs b/src/bin/proxmox_restore_daemon/api.rs
index 2f990f36..7ac70278 100644
--- a/src/bin/proxmox_restore_daemon/api.rs
+++ b/src/bin/proxmox_restore_daemon/api.rs
@@ -1,16 +1,29 @@
 ///! File-restore API running inside the restore VM
 use anyhow::{bail, Error};
+use futures::FutureExt;
+use hyper::http::request::Parts;
+use hyper::{header, Body, Response, StatusCode};
+use log::error;
+use pathpatterns::{MatchEntry, MatchPattern, MatchType, Pattern};
+use serde_json::Value;
+
 use std::ffi::OsStr;
 use std::fs;
 use std::os::unix::ffi::OsStrExt;
 use std::path::{Path, PathBuf};
 
-use proxmox::api::{api, ApiMethod, Permission, Router, RpcEnvironment, SubdirMap};
-use proxmox::list_subdirs_api_method;
+use proxmox::api::{
+    api, schema::*, ApiHandler, ApiMethod, ApiResponseFuture, Permission, Router, RpcEnvironment,
+    SubdirMap,
+};
+use proxmox::{identity, list_subdirs_api_method, sortable};
 
 use proxmox_backup::api2::types::*;
 use proxmox_backup::backup::DirEntryAttribute;
-use proxmox_backup::tools::fs::read_subdir;
+use proxmox_backup::pxar::{create_archive, Flags, PxarCreateOptions, ENCODER_MAX_ENTRIES};
+use proxmox_backup::tools::{self, fs::read_subdir, zip::zip_directory};
+
+use pxar::encoder::aio::TokioWriter;
 
 use super::{disk::ResolveResult, watchdog_remaining, watchdog_ping};
 
@@ -18,6 +31,7 @@ use super::{disk::ResolveResult, watchdog_remaining, watchdog_ping};
 // not exist within the restore VM. Safety is guaranteed by checking a ticket via a custom ApiAuth.
 
 const SUBDIRS: SubdirMap = &[
+    ("extract", &Router::new().get(&API_METHOD_EXTRACT)),
     ("list", &Router::new().get(&API_METHOD_LIST)),
     ("status", &Router::new().get(&API_METHOD_STATUS)),
     ("stop", &Router::new().get(&API_METHOD_STOP)),
@@ -197,3 +211,159 @@ fn list(
 
     Ok(res)
 }
+
+#[sortable]
+pub const API_METHOD_EXTRACT: ApiMethod = ApiMethod::new(
+    &ApiHandler::AsyncHttp(&extract),
+    &ObjectSchema::new(
+        "Extract a file or directory from the VM as a pxar archive.",
+        &sorted!([
+            (
+                "path",
+                false,
+                &StringSchema::new("base64-encoded path to list files and directories under")
+                    .schema()
+            ),
+            (
+                "pxar",
+                true,
+                &BooleanSchema::new(concat!(
+                    "if true, return a pxar archive, otherwise either the ",
+                    "file content or the directory as a zip file"
+                ))
+                .default(true)
+                .schema()
+            )
+        ]),
+    ),
+)
+.access(None, &Permission::Superuser);
+
+fn extract(
+    _parts: Parts,
+    _req_body: Body,
+    param: Value,
+    _info: &ApiMethod,
+    _rpcenv: Box<dyn RpcEnvironment>,
+) -> ApiResponseFuture {
+    watchdog_ping();
+    async move {
+        let path = tools::required_string_param(&param, "path")?;
+        let mut path = base64::decode(path)?;
+        if let Some(b'/') = path.last() {
+            path.pop();
+        }
+        let path = Path::new(OsStr::from_bytes(&path[..]));
+
+        let pxar = param["pxar"].as_bool().unwrap_or(true);
+
+        let query_result = {
+            let mut disk_state = crate::DISK_STATE.lock().unwrap();
+            disk_state.resolve(&path)?
+        };
+
+        let vm_path = match query_result {
+            ResolveResult::Path(vm_path) => vm_path,
+            _ => bail!("invalid path, cannot restore meta-directory: {:?}", path),
+        };
+
+        // check here so we can return a real error message; failing in the async task will stop
+        // the transfer, but not return a useful message
+        if !vm_path.exists() {
+            bail!("file or directory {:?} does not exist", path);
+        }
+
+        let (mut writer, reader) = tokio::io::duplex(1024 * 64);
+
+        if pxar {
+            tokio::spawn(async move {
+                let result = async move {
+                    // pxar always expects a directory as its root, so to accommodate files as
+                    // well we encode the parent dir with a filter only matching the target instead
+                    let mut patterns = vec![MatchEntry::new(
+                        MatchPattern::Pattern(Pattern::path(b"*").unwrap()),
+                        MatchType::Exclude,
+                    )];
+
+                    let name = match vm_path.file_name() {
+                        Some(name) => name,
+                        None => bail!("no file name found for path: {:?}", vm_path),
+                    };
+
+                    if vm_path.is_dir() {
+                        let mut pat = name.as_bytes().to_vec();
+                        patterns.push(MatchEntry::new(
+                            MatchPattern::Pattern(Pattern::path(pat.clone())?),
+                            MatchType::Include,
+                        ));
+                        pat.extend(b"/**/*".iter());
+                        patterns.push(MatchEntry::new(
+                            MatchPattern::Pattern(Pattern::path(pat)?),
+                            MatchType::Include,
+                        ));
+                    } else {
+                        patterns.push(MatchEntry::new(
+                            MatchPattern::Literal(name.as_bytes().to_vec()),
+                            MatchType::Include,
+                        ));
+                    }
+
+                    let dir_path = vm_path.parent().unwrap_or_else(|| Path::new("/"));
+                    let dir = nix::dir::Dir::open(
+                        dir_path,
+                        nix::fcntl::OFlag::O_NOFOLLOW,
+                        nix::sys::stat::Mode::empty(),
+                    )?;
+
+                    let options = PxarCreateOptions {
+                        entries_max: ENCODER_MAX_ENTRIES,
+                        device_set: None,
+                        patterns,
+                        verbose: false,
+                        skip_lost_and_found: false,
+                    };
+
+                    let pxar_writer = TokioWriter::new(writer);
+                    create_archive(dir, pxar_writer, Flags::DEFAULT, |_| Ok(()), None, options)
+                        .await
+                }
+                .await;
+                if let Err(err) = result {
+                    error!("pxar streaming task failed - {}", err);
+                }
+            });
+        } else {
+            tokio::spawn(async move {
+                let result = async move {
+                    if vm_path.is_dir() {
+                        zip_directory(&mut writer, &vm_path).await?;
+                        Ok(())
+                    } else if vm_path.is_file() {
+                        let mut file = tokio::fs::OpenOptions::new()
+                            .read(true)
+                            .open(vm_path)
+                            .await?;
+                        tokio::io::copy(&mut file, &mut writer).await?;
+                        Ok(())
+                    } else {
+                        bail!("invalid entry type for path: {:?}", vm_path);
+                    }
+                }
+                .await;
+                if let Err(err) = result {
+                    error!("file or dir streaming task failed - {}", err);
+                }
+            });
+        }
+
+        let stream = tokio_util::io::ReaderStream::new(reader);
+
+        let body = Body::wrap_stream(stream);
+        Ok(Response::builder()
+            .status(StatusCode::OK)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
+            .body(body)
+            .unwrap())
+    }
+    .boxed()
+}
-- 
2.20.1






More information about the pbs-devel mailing list