[pbs-devel] [PATCH proxmox-backup v5 2/5] api2/tape/restore: add optional snapshots to 'restore'

Dominik Csapak d.csapak at proxmox.com
Tue May 11 12:50:04 CEST 2021


this makes it possible to only restore some snapshots from a tape media-set
instead of the whole. If the user selects only a small part, this will
probably be faster (and definitely uses less space on the target
datastores).

the user has to provide a list of snapshots to restore in the form of
'store:type/group/id'
e.g. 'mystore:ct/100/2021-01-01T00:00:00Z'

we achieve this by first restoring the index to a temp dir, retrieving
a list of chunks, and using the catalog, we generate a list of
media/files that we need to (partially) restore.

finally, we copy the snapshots to the correct dir in the datastore,
and clean up the temp dir

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
 src/api2/tape/restore.rs | 539 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 523 insertions(+), 16 deletions(-)

diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index d1298434..14e20ee4 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -1,6 +1,6 @@
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::ffi::OsStr;
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, BTreeMap};
 use std::convert::TryFrom;
 use std::io::{Seek, SeekFrom};
 use std::sync::Arc;
@@ -40,6 +40,7 @@ use crate::{
         UPID_SCHEMA,
         Authid,
         Userid,
+        TAPE_RESTORE_SNAPSHOT_SCHEMA,
     },
     config::{
         self,
@@ -51,9 +52,14 @@ use crate::{
         },
     },
     backup::{
+        ArchiveType,
+        archive_type,
+        IndexFile,
         MANIFEST_BLOB_NAME,
         CryptMode,
         DataStore,
+        DynamicIndexReader,
+        FixedIndexReader,
         BackupDir,
         DataBlob,
         BackupManifest,
@@ -69,6 +75,7 @@ use crate::{
         MediaId,
         MediaSet,
         MediaCatalog,
+        MediaSetCatalog,
         Inventory,
         lock_media_set,
         file_formats::{
@@ -95,6 +102,8 @@ use crate::{
     },
 };
 
+const RESTORE_TMP_DIR: &str = "/var/tmp/proxmox-backup";
+
 pub struct DataStoreMap {
     map: HashMap<String, Arc<DataStore>>,
     default: Option<Arc<DataStore>>,
@@ -200,6 +209,14 @@ pub const ROUTER: Router = Router::new().post(&API_METHOD_RESTORE);
                 type: Userid,
                 optional: true,
             },
+            "snapshots": {
+                description: "List of snapshots.",
+                type: Array,
+                optional: true,
+                items: {
+                    schema: TAPE_RESTORE_SNAPSHOT_SCHEMA,
+                },
+            },
             owner: {
                 type: Authid,
                 optional: true,
@@ -222,6 +239,7 @@ pub fn restore(
     drive: String,
     media_set: String,
     notify_user: Option<Userid>,
+    snapshots: Option<Vec<String>>,
     owner: Option<Authid>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
@@ -271,6 +289,7 @@ pub fn restore(
         .map(|s| s.to_string())
         .collect::<Vec<String>>()
         .join(", ");
+
     let upid_str = WorkerTask::new_thread(
         "tape-restore",
         Some(taskid),
@@ -288,20 +307,37 @@ pub fn restore(
                 .and_then(|userid| lookup_user_email(userid))
                 .or_else(|| lookup_user_email(&auth_id.clone().into()));
 
-
-            task_log!(worker, "Restore mediaset '{}'", media_set);
+            task_log!(worker, "Mediaset '{}'", media_set);
             task_log!(worker, "Pool: {}", pool);
-            let res = restore_worker(
-                worker.clone(),
-                inventory,
-                media_set_uuid,
-                drive_config,
-                &drive,
-                store_map,
-                restore_owner,
-                email
-            );
-            task_log!(worker, "Restore mediaset '{}' done", media_set);
+
+            let res = if let Some(snapshots) = snapshots {
+                restore_list_worker(
+                    worker.clone(),
+                    snapshots,
+                    inventory,
+                    media_set_uuid,
+                    drive_config,
+                    &drive,
+                    store_map,
+                    restore_owner,
+                    email,
+                )
+            } else {
+                restore_full_worker(
+                    worker.clone(),
+                    inventory,
+                    media_set_uuid,
+                    drive_config,
+                    &drive,
+                    store_map,
+                    restore_owner,
+                    email,
+                )
+            };
+
+            if res.is_ok() {
+                task_log!(worker, "Restore mediaset '{}' done", media_set);
+            }
 
             if let Err(err) = set_tape_device_state(&drive, "") {
                 task_log!(
@@ -319,7 +355,7 @@ pub fn restore(
     Ok(upid_str.into())
 }
 
-fn restore_worker(
+fn restore_full_worker(
     worker: Arc<WorkerTask>,
     inventory: Inventory,
     media_set_uuid: Uuid,
@@ -406,6 +442,477 @@ fn restore_worker(
     Ok(())
 }
 
+fn restore_list_worker(
+    worker: Arc<WorkerTask>,
+    snapshots: Vec<String>,
+    inventory: Inventory,
+    media_set_uuid: Uuid,
+    drive_config: SectionConfigData,
+    drive_name: &str,
+    store_map: DataStoreMap,
+    restore_owner: &Authid,
+    email: Option<String>,
+) -> Result<(), Error> {
+    let base_path: PathBuf = format!("{}/{}", RESTORE_TMP_DIR, media_set_uuid).into();
+    std::fs::create_dir_all(&base_path)?;
+
+    let catalog = get_media_set_catalog(&inventory, &media_set_uuid)?;
+
+    let mut datastore_locks = Vec::new();
+    let mut snapshot_file_hash: BTreeMap<Uuid, Vec<u64>> = BTreeMap::new();
+    let mut snapshot_locks = HashMap::new();
+
+    let res = proxmox::try_block!({
+        // assemble snapshot files/locks
+        for store_snapshot in snapshots.iter() {
+            let mut split = store_snapshot.splitn(2, ':');
+            let source_datastore = split
+                .next()
+                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
+            let snapshot = split
+                .next()
+                .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
+            let backup_dir: BackupDir = snapshot.parse()?;
+
+            let datastore = store_map.get_datastore(source_datastore).ok_or_else(|| {
+                format_err!(
+                    "could not find mapping for source datastore: {}",
+                    source_datastore
+                )
+            })?;
+
+            let (owner, _group_lock) =
+                datastore.create_locked_backup_group(backup_dir.group(), &restore_owner)?;
+            if restore_owner != &owner {
+                // only the owner is allowed to create additional snapshots
+                bail!(
+                    "restore '{}' failed - owner check failed ({} != {})",
+                    snapshot,
+                    restore_owner,
+                    owner
+                );
+            }
+
+            let (media_id, file_num) = if let Some((media_uuid, file_num)) =
+                catalog.lookup_snapshot(&source_datastore, &snapshot)
+            {
+                let media_id = inventory.lookup_media(media_uuid).unwrap();
+                (media_id, file_num)
+            } else {
+                task_warn!(
+                    worker,
+                    "did not find snapshot '{}' in media set {}",
+                    snapshot,
+                    media_set_uuid
+                );
+                continue;
+            };
+
+            let (_rel_path, is_new, snap_lock) = datastore.create_locked_backup_dir(&backup_dir)?;
+
+            if !is_new {
+                task_log!(
+                    worker,
+                    "found snapshot {} on target datastore, skipping...",
+                    snapshot
+                );
+                continue;
+            }
+
+            snapshot_locks.insert(store_snapshot.to_string(), snap_lock);
+
+            let shared_store_lock = datastore.try_shared_chunk_store_lock()?;
+            datastore_locks.push(shared_store_lock);
+
+            let file_list = snapshot_file_hash
+                .entry(media_id.label.uuid.clone())
+                .or_insert_with(Vec::new);
+            file_list.push(file_num);
+
+            task_log!(
+                worker,
+                "found snapshot {} on {}: file {}",
+                snapshot,
+                media_id.label.label_text,
+                file_num
+            );
+        }
+
+        if snapshot_file_hash.is_empty() {
+            task_log!(worker, "nothing to restore, skipping remaining phases...");
+            return Ok(());
+        }
+
+        task_log!(worker, "Phase 1: temporarily restore snapshots to temp dir");
+        let mut datastore_chunk_map: HashMap<String, HashSet<[u8; 32]>> = HashMap::new();
+        for (media_uuid, file_list) in snapshot_file_hash.iter_mut() {
+            let media_id = inventory.lookup_media(media_uuid).unwrap();
+            let (drive, info) = request_and_load_media(
+                &worker,
+                &drive_config,
+                &drive_name,
+                &media_id.label,
+                &email,
+            )?;
+            file_list.sort_unstable();
+            restore_snapshots_to_tmpdir(
+                worker.clone(),
+                &base_path,
+                file_list,
+                drive,
+                &info,
+                &media_set_uuid,
+                &mut datastore_chunk_map,
+            ).map_err(|err| format_err!("could not restore snapshots to tmpdir: {}", err))?;
+        }
+
+        // sorted media_uuid => (sorted file_num => (set of digests)))
+        let mut media_file_chunk_map: BTreeMap<Uuid, BTreeMap<u64, HashSet<[u8; 32]>>> = BTreeMap::new();
+
+        for (source_datastore, chunks) in datastore_chunk_map.into_iter() {
+            let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
+                format_err!(
+                    "could not find mapping for source datastore: {}",
+                    source_datastore
+                )
+            })?;
+            for digest in chunks.into_iter() {
+                // we only want to restore chunks that we do not have yet
+                if !datastore.cond_touch_chunk(&digest, false)? {
+                    if let Some((uuid, nr)) = catalog.lookup_chunk(&source_datastore, &digest) {
+                        let file = media_file_chunk_map.entry(uuid.clone()).or_insert_with(BTreeMap::new);
+                        let chunks = file.entry(nr).or_insert_with(HashSet::new);
+                        chunks.insert(digest);
+                    }
+                }
+            }
+        }
+
+        // we do not need it anymore, saves memory
+        drop(catalog);
+
+        if !media_file_chunk_map.is_empty() {
+            task_log!(worker, "Phase 2: restore chunks to datastores");
+        } else {
+            task_log!(worker, "all chunks exist already, skipping phase 2...");
+        }
+
+        for (media_uuid, file_chunk_map) in media_file_chunk_map.iter_mut() {
+            let media_id = inventory.lookup_media(media_uuid).unwrap();
+            let (mut drive, _info) = request_and_load_media(
+                &worker,
+                &drive_config,
+                &drive_name,
+                &media_id.label,
+                &email,
+            )?;
+            restore_file_chunk_map(worker.clone(), &mut drive, &store_map, file_chunk_map)?;
+        }
+
+        task_log!(
+            worker,
+            "Phase 3: copy snapshots from temp dir to datastores"
+        );
+        for (store_snapshot, _lock) in snapshot_locks.into_iter() {
+            proxmox::try_block!({
+                let mut split = store_snapshot.splitn(2, ':');
+                let source_datastore = split
+                    .next()
+                    .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
+                let snapshot = split
+                    .next()
+                    .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
+                let backup_dir: BackupDir = snapshot.parse()?;
+
+                let datastore = store_map
+                    .get_datastore(&source_datastore)
+                    .ok_or_else(|| format_err!("unexpected source datastore: {}", source_datastore))?;
+
+                let mut tmp_path = base_path.clone();
+                tmp_path.push(&source_datastore);
+                tmp_path.push(snapshot);
+
+                let path = datastore.snapshot_path(&backup_dir);
+
+                for entry in std::fs::read_dir(tmp_path)? {
+                    let entry = entry?;
+                    let mut new_path = path.clone();
+                    new_path.push(entry.file_name());
+                    std::fs::copy(entry.path(), new_path)?;
+                }
+                task_log!(worker, "Restore snapshot '{}' done", snapshot);
+                Ok(())
+            }).map_err(|err: Error| format_err!("could not copy {}: {}", store_snapshot, err))?;
+        }
+        Ok(())
+    });
+
+    match std::fs::remove_dir_all(&base_path) {
+        Ok(()) => {}
+        Err(err) => task_warn!(worker, "error cleaning up: {}", err),
+    }
+
+    res
+}
+
+fn get_media_set_catalog(
+    inventory: &Inventory,
+    media_set_uuid: &Uuid,
+) -> Result<MediaSetCatalog, Error> {
+    let status_path = Path::new(TAPE_STATUS_DIR);
+
+    let members = inventory.compute_media_set_members(media_set_uuid)?;
+    let media_list = members.media_list();
+    let mut catalog = MediaSetCatalog::new();
+
+    for (seq_nr, media_uuid) in media_list.iter().enumerate() {
+        match media_uuid {
+            None => {
+                bail!(
+                    "media set {} is incomplete (missing member {}).",
+                    media_set_uuid,
+                    seq_nr
+                );
+            }
+            Some(media_uuid) => {
+                let media_id = inventory.lookup_media(media_uuid).unwrap();
+                let media_catalog = MediaCatalog::open(status_path, &media_id, false, false)?;
+                catalog.append_catalog(media_catalog)?;
+            }
+        }
+    }
+
+    Ok(catalog)
+}
+
+fn restore_snapshots_to_tmpdir(
+    worker: Arc<WorkerTask>,
+    path: &PathBuf,
+    file_list: &[u64],
+    mut drive: Box<dyn TapeDriver>,
+    media_id: &MediaId,
+    media_set_uuid: &Uuid,
+    chunks_list: &mut HashMap<String, HashSet<[u8; 32]>>,
+) -> Result<(), Error> {
+    match media_id.media_set_label {
+        None => {
+            bail!(
+                "missing media set label on media {} ({})",
+                media_id.label.label_text,
+                media_id.label.uuid
+            );
+        }
+        Some(ref set) => {
+            if set.uuid != *media_set_uuid {
+                bail!(
+                    "wrong media set label on media {} ({} != {})",
+                    media_id.label.label_text,
+                    media_id.label.uuid,
+                    media_set_uuid
+                );
+            }
+            let encrypt_fingerprint = set.encryption_key_fingerprint.clone().map(|fp| {
+                task_log!(worker, "Encryption key fingerprint: {}", fp);
+                (fp, set.uuid.clone())
+            });
+
+            drive.set_encryption(encrypt_fingerprint)?;
+        }
+    }
+
+    for file_num in file_list {
+        let current_file_number = drive.current_file_number()?;
+        if current_file_number != *file_num {
+            task_log!(worker, "was at file {}, moving to {}", current_file_number, file_num);
+            drive.move_to_file(*file_num)?;
+            let current_file_number = drive.current_file_number()?;
+            task_log!(worker, "now at file {}", current_file_number);
+        }
+        let mut reader = drive.read_next_file()?;
+
+        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
+        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
+            bail!("missing MediaContentHeader");
+        }
+
+        match header.content_magic {
+            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1 => {
+                let header_data = reader.read_exact_allocated(header.size as usize)?;
+
+                let archive_header: SnapshotArchiveHeader = serde_json::from_slice(&header_data)
+                    .map_err(|err| {
+                        format_err!("unable to parse snapshot archive header - {}", err)
+                    })?;
+
+                let source_datastore = archive_header.store;
+                let snapshot = archive_header.snapshot;
+
+                task_log!(
+                    worker,
+                    "File {}: snapshot archive {}:{}",
+                    file_num,
+                    source_datastore,
+                    snapshot
+                );
+
+                let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
+
+                let mut tmp_path = path.clone();
+                tmp_path.push(&source_datastore);
+                tmp_path.push(snapshot);
+                std::fs::create_dir_all(&tmp_path)?;
+
+                let chunks = chunks_list
+                    .entry(source_datastore)
+                    .or_insert_with(HashSet::new);
+                let manifest = try_restore_snapshot_archive(worker.clone(), &mut decoder, &tmp_path)?;
+                for item in manifest.files() {
+                    let mut archive_path = tmp_path.to_owned();
+                    archive_path.push(&item.filename);
+
+                    let index: Box<dyn IndexFile> = match archive_type(&item.filename)? {
+                        ArchiveType::DynamicIndex => {
+                            Box::new(DynamicIndexReader::open(&archive_path)?)
+                        }
+                        ArchiveType::FixedIndex => {
+                            Box::new(FixedIndexReader::open(&archive_path)?)
+                        }
+                        ArchiveType::Blob => continue,
+                    };
+                    for i in 0..index.index_count() {
+                        if let Some(digest) = index.index_digest(i) {
+                            chunks.insert(*digest);
+                        }
+                    }
+                }
+            }
+            other => bail!("unexpected file type: {:?}", other),
+        }
+    }
+
+    Ok(())
+}
+
+fn restore_file_chunk_map(
+    worker: Arc<WorkerTask>,
+    drive: &mut Box<dyn TapeDriver>,
+    store_map: &DataStoreMap,
+    file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
+) -> Result<(), Error> {
+    for (nr, chunk_map) in file_chunk_map.iter_mut() {
+        let current_file_number = drive.current_file_number()?;
+        if current_file_number != *nr {
+            task_log!(worker, "was at file {}, moving to {}", current_file_number, nr);
+            drive.move_to_file(*nr)?;
+            let current_file_number = drive.current_file_number()?;
+            task_log!(worker, "now at file {}", current_file_number);
+        }
+        let mut reader = drive.read_next_file()?;
+        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
+        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
+            bail!("missing MediaContentHeader");
+        }
+
+        match header.content_magic {
+            PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
+                let header_data = reader.read_exact_allocated(header.size as usize)?;
+
+                let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
+                    .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;
+
+                let source_datastore = archive_header.store;
+
+                task_log!(
+                    worker,
+                    "File {}: chunk archive for datastore '{}'",
+                    nr,
+                    source_datastore
+                );
+
+                let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
+                    format_err!("unexpected chunk archive for store: {}", source_datastore)
+                })?;
+
+                let count = restore_partial_chunk_archive(worker.clone(), reader, datastore.clone(), chunk_map)?;
+                task_log!(worker, "restored {} chunks", count);
+            }
+            _ => bail!("unexpected content magic {:?}", header.content_magic),
+        }
+    }
+
+    Ok(())
+}
+
+fn restore_partial_chunk_archive<'a>(
+    worker: Arc<WorkerTask>,
+    reader: Box<dyn 'a + TapeRead>,
+    datastore: Arc<DataStore>,
+    chunk_list: &mut HashSet<[u8; 32]>,
+) -> Result<usize, Error> {
+    let mut decoder = ChunkArchiveDecoder::new(reader);
+
+    let mut count = 0;
+
+    let start_time = std::time::SystemTime::now();
+    let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
+    let bytes2 = bytes.clone();
+
+    let writer_pool = ParallelHandler::new(
+        "tape restore chunk writer",
+        4,
+        move |(chunk, digest): (DataBlob, [u8; 32])| {
+            if !datastore.cond_touch_chunk(&digest, false)? {
+                bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
+                chunk.verify_crc()?;
+                if chunk.crypt_mode()? == CryptMode::None {
+                    chunk.decode(None, Some(&digest))?; // verify digest
+                }
+
+                datastore.insert_chunk(&chunk, &digest)?;
+            }
+            Ok(())
+        },
+    );
+
+    let verify_and_write_channel = writer_pool.channel();
+
+    loop {
+        let (digest, blob) = match decoder.next_chunk()? {
+            Some((digest, blob)) => (digest, blob),
+            None => break,
+        };
+
+        worker.check_abort()?;
+
+        if chunk_list.remove(&digest) {
+            verify_and_write_channel.send((blob, digest.clone()))?;
+            count += 1;
+        }
+
+        if chunk_list.is_empty() {
+            break;
+        }
+    }
+
+    drop(verify_and_write_channel);
+
+    writer_pool.complete()?;
+
+    let elapsed = start_time.elapsed()?.as_secs_f64();
+
+    let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+
+    task_log!(
+        worker,
+        "restored {} bytes ({:.2} MB/s)",
+        bytes,
+        (bytes as f64) / (1_000_000.0 * elapsed)
+    );
+
+    Ok(count)
+}
+
+
 /// Request and restore complete media without using existing catalog (create catalog instead)
 pub fn request_and_restore_media(
     worker: Arc<WorkerTask>,
-- 
2.20.1






More information about the pbs-devel mailing list