[pbs-devel] [PATCH proxmox-backup v2 6/8] api2/tape/restore: add 'restore-single' api path
Dominik Csapak
d.csapak at proxmox.com
Wed May 5 12:09:16 CEST 2021
this makes it possible to restore only some snapshots from a tape media-set
instead of the whole media-set. If the user selects only a small part, this
will probably be faster (and definitely uses less space on the target
datastores).
the user has to provide a datastore map (like on normal restore), and
a list of snapshots to restore in the form of 'store:type/group/id'
e.g. 'mystore:ct/100/2021-01-01T00:00:00Z'
we achieve this by first restoring the indexes to a temp dir, retrieving
the list of chunks from them, and then, using the catalog, generating a
list of media/files that we need to (partially) restore.
finally, we copy the snapshots to the correct dir in the datastore,
and clean up the temp dir
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
src/api2/tape/mod.rs | 1 +
src/api2/tape/restore.rs | 612 ++++++++++++++++++++++++++++++++++++++-
2 files changed, 612 insertions(+), 1 deletion(-)
diff --git a/src/api2/tape/mod.rs b/src/api2/tape/mod.rs
index 219a721b..4bf72d18 100644
--- a/src/api2/tape/mod.rs
+++ b/src/api2/tape/mod.rs
@@ -72,6 +72,7 @@ const SUBDIRS: SubdirMap = &[
("drive", &drive::ROUTER),
("media", &media::ROUTER),
("restore", &restore::ROUTER),
+ ("restore-single", &restore::ROUTER_SINGLE),
(
"scan-changers",
&Router::new()
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 9884b379..f6b2157e 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -1,4 +1,4 @@
-use std::path::Path;
+use std::path::{Path, PathBuf};
use std::ffi::OsStr;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
@@ -51,9 +51,14 @@ use crate::{
},
},
backup::{
+ ArchiveType,
+ archive_type,
+ IndexFile,
MANIFEST_BLOB_NAME,
CryptMode,
DataStore,
+ DynamicIndexReader,
+ FixedIndexReader,
BackupDir,
DataBlob,
BackupManifest,
@@ -69,6 +74,7 @@ use crate::{
MediaId,
MediaSet,
MediaCatalog,
+ MediaSetCatalog,
Inventory,
lock_media_set,
file_formats::{
@@ -95,6 +101,8 @@ use crate::{
},
};
+const RESTORE_TMP_DIR: &str = "/var/tmp/proxmox-backup";
+
pub struct DataStoreMap {
map: HashMap<String, Arc<DataStore>>,
default: Option<Arc<DataStore>>,
@@ -182,6 +190,608 @@ fn check_datastore_privs(
}
pub const ROUTER: Router = Router::new().post(&API_METHOD_RESTORE);
+pub const ROUTER_SINGLE: Router = Router::new().post(&API_METHOD_RESTORE_SINGLE);
+
+#[api(
+ input: {
+ properties: {
+ store: {
+ schema: DATASTORE_MAP_LIST_SCHEMA,
+ },
+ drive: {
+ schema: DRIVE_NAME_SCHEMA,
+ },
+ "media-set": {
+ description: "Media set UUID.",
+ type: String,
+ },
+ "snapshots": {
+ description: "List of snapshots.",
+ type: Array,
+ items: {
+ type: String,
+ description: "A single snapshot in format: 'store:type/group/id'."
+ },
+ },
+ "notify-user": {
+ type: Userid,
+ optional: true,
+ },
+ owner: {
+ type: Authid,
+ optional: true,
+ },
+ },
+ },
+ returns: {
+ schema: UPID_SCHEMA,
+ },
+ access: {
+ // Note: parameters are not URI parameters, so we need to check them inside the function body
+ description: "The user needs Tape.Read privilege on /tape/pool/{pool} \
+ and /tape/drive/{drive}, Datastore.Backup privilege on /datastore/{store}.",
+ permission: &Permission::Anybody,
+ },
+)]
+/// Restore single snapshots from a media-set
+pub fn restore_single(
+ store: String,
+ drive: String,
+ media_set: String,
+ snapshots: Vec<String>,
+ notify_user: Option<Userid>,
+ owner: Option<Authid>,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Value, Error> {
+ let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+ let user_info = CachedUserInfo::new()?;
+
+ let store_map = DataStoreMap::try_from(store)
+ .map_err(|err| format_err!("cannot parse store mapping: {}", err))?;
+ let used_datastores = store_map.used_datastores();
+ if used_datastores.len() == 0 {
+ bail!("no datastores given");
+ }
+
+ for store in used_datastores.iter() {
+ check_datastore_privs(&user_info, &store, &auth_id, &owner)?;
+ }
+
+ let privs = user_info.lookup_privs(&auth_id, &["tape", "drive", &drive]);
+ if (privs & PRIV_TAPE_READ) == 0 {
+ bail!("no permissions on /tape/drive/{}", drive);
+ }
+
+ let media_set_uuid = media_set.parse()?;
+
+ let status_path = Path::new(TAPE_STATUS_DIR);
+
+ // test what catalog files we have
+ let _catalogs = MediaCatalog::media_with_catalogs(status_path)?;
+
+ let _lock = lock_media_set(status_path, &media_set_uuid, None)?;
+
+ let inventory = Inventory::load(status_path)?;
+
+ let pool = inventory.lookup_media_set_pool(&media_set_uuid)?;
+
+ let privs = user_info.lookup_privs(&auth_id, &["tape", "pool", &pool]);
+ if (privs & PRIV_TAPE_READ) == 0 {
+ bail!("no permissions on /tape/pool/{}", pool);
+ }
+
+ let (drive_config, _digest) = config::drive::config()?;
+
+ // early check/lock before starting worker
+ let drive_lock = lock_tape_device(&drive_config, &drive)?;
+
+ let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
+
+ let upid_str = WorkerTask::new_thread(
+ "tape-restore-single",
+ None,
+ auth_id.clone(),
+ to_stdout,
+ move |worker| {
+ let _drive_lock = drive_lock; // keep lock guard
+
+ set_tape_device_state(&drive, &worker.upid().to_string())?;
+
+
+ let restore_owner = owner.as_ref().unwrap_or(&auth_id);
+
+ let email = notify_user
+ .as_ref()
+ .and_then(|userid| lookup_user_email(userid))
+ .or_else(|| lookup_user_email(&auth_id.clone().into()));
+
+ let res = restore_single_worker(
+ worker.clone(),
+ snapshots,
+ inventory,
+ media_set_uuid,
+ drive_config,
+ &drive,
+ store_map,
+ restore_owner,
+ email,
+ );
+
+ if let Err(err) = set_tape_device_state(&drive, "") {
+ task_log!(worker, "could not unset drive state for {}: {}", drive, err);
+ }
+
+ res
+ },
+ )?;
+
+ Ok(upid_str.into())
+}
+
+fn restore_single_worker(
+ worker: Arc<WorkerTask>,
+ snapshots: Vec<String>,
+ inventory: Inventory,
+ media_set_uuid: Uuid,
+ drive_config: SectionConfigData,
+ drive_name: &str,
+ store_map: DataStoreMap,
+ restore_owner: &Authid,
+ email: Option<String>,
+) -> Result<(), Error> {
+ let base_path: PathBuf = format!("{}/{}", RESTORE_TMP_DIR, media_set_uuid).into();
+ std::fs::create_dir_all(&base_path)?;
+
+ let catalog = get_media_set_catalog(&inventory, &media_set_uuid)?;
+
+ let mut datastore_locks = Vec::new();
+ let mut snapshot_file_list: HashMap<Uuid, Vec<u64>> = HashMap::new();
+ let mut snapshot_locks = HashMap::new();
+
+ let res = proxmox::try_block!({
+ // assemble snapshot files/locks
+ for i in 0..snapshots.len() {
+ let store_snapshot = &snapshots[i];
+ let mut split = snapshots[i].splitn(2, ':');
+ let source_datastore = split
+ .next()
+ .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
+ let snapshot = split
+ .next()
+ .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
+ let backup_dir: BackupDir = snapshot.parse()?;
+
+ let datastore = store_map.get_datastore(source_datastore).ok_or_else(|| {
+ format_err!(
+ "could not find mapping for source datastore: {}",
+ source_datastore
+ )
+ })?;
+
+ let (owner, _group_lock) =
+ datastore.create_locked_backup_group(backup_dir.group(), &restore_owner)?;
+ if restore_owner != &owner {
+ // only the owner is allowed to create additional snapshots
+ bail!(
+ "restore '{}' failed - owner check failed ({} != {})",
+ snapshot,
+ restore_owner,
+ owner
+ );
+ }
+
+ let (media_id, file_num) = if let Some((media_uuid, nr)) =
+ catalog.lookup_snapshot(&source_datastore, &snapshot)
+ {
+ let media_id = inventory.lookup_media(media_uuid).unwrap();
+ (media_id, nr)
+ } else {
+ task_warn!(
+ worker,
+ "did not find snapshot '{}' in media set {}",
+ snapshot,
+ media_set_uuid
+ );
+ continue;
+ };
+
+ let (_rel_path, is_new, snap_lock) = datastore.create_locked_backup_dir(&backup_dir)?;
+
+ if !is_new {
+ task_log!(
+ worker,
+ "found snapshot {} on target datastore, skipping...",
+ snapshot
+ );
+ continue;
+ }
+
+ snapshot_locks.insert(store_snapshot.to_string(), snap_lock);
+
+ let shared_store_lock = datastore.try_shared_chunk_store_lock()?;
+ datastore_locks.push(shared_store_lock);
+
+ let file_list = snapshot_file_list
+ .entry(media_id.label.uuid.clone())
+ .or_insert_with(Vec::new);
+ file_list.push(file_num);
+
+ task_log!(
+ worker,
+ "found snapshot {} on {}: file {}",
+ snapshot,
+ media_id.label.label_text,
+ file_num
+ );
+ }
+
+ if snapshot_file_list.is_empty() {
+ task_log!(worker, "nothing to restore, skipping remaining phases...");
+ return Ok(());
+ }
+
+ task_log!(worker, "Phase 1: temporarily restore snapshots to temp dir");
+ let mut chunks_list: HashMap<String, HashSet<[u8; 32]>> = HashMap::new();
+ for (media_uuid, file_list) in snapshot_file_list.iter_mut() {
+ let media_id = inventory.lookup_media(media_uuid).unwrap();
+ let (drive, info) = request_and_load_media(
+ &worker,
+ &drive_config,
+ &drive_name,
+ &media_id.label,
+ &email,
+ )?;
+ file_list.sort_unstable();
+ restore_snapshots_to_tmpdir(
+ worker.clone(),
+ &base_path,
+ file_list,
+ drive,
+ &info,
+ &media_set_uuid,
+ &mut chunks_list,
+ )?;
+ }
+
+ let mut media_list: HashMap<Uuid, HashMap<u64, HashSet<[u8; 32]>>> = HashMap::new();
+
+ for (source_datastore, chunks) in chunks_list.into_iter() {
+ let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
+ format_err!(
+ "could not find mapping for source datastore: {}",
+ source_datastore
+ )
+ })?;
+ for digest in chunks.into_iter() {
+ // we only want to restore chunks that we do not have yet
+ if !datastore.cond_touch_chunk(&digest, false)? {
+ if let Some((uuid, nr)) = catalog.lookup_chunk(&source_datastore, &digest) {
+ let file = media_list.entry(uuid.clone()).or_insert_with(HashMap::new);
+ let chunks = file.entry(nr).or_insert_with(HashSet::new);
+ chunks.insert(digest);
+ }
+ }
+ }
+ }
+
+ // we do not need it anymore, saves memory
+ drop(catalog);
+
+ if !media_list.is_empty() {
+ task_log!(worker, "Phase 2: restore chunks to datastores");
+ } else {
+ task_log!(worker, "all chunks exist already, skipping phase 2...");
+ }
+
+ for (media_uuid, file_list) in media_list.iter_mut() {
+ let media_id = inventory.lookup_media(media_uuid).unwrap();
+ let (mut drive, _info) = request_and_load_media(
+ &worker,
+ &drive_config,
+ &drive_name,
+ &media_id.label,
+ &email,
+ )?;
+ let mut files: Vec<u64> = file_list.keys().map(|v| *v).collect();
+ files.sort();
+ restore_chunk_file_list(worker.clone(), &mut drive, &files[..], &store_map, file_list)?;
+ }
+
+ task_log!(
+ worker,
+ "Phase 3: copy snapshots from temp dir to datastores"
+ );
+ for (store_snapshot, _lock) in snapshot_locks.into_iter() {
+ let mut split = store_snapshot.splitn(2, ':');
+ let source_datastore = split
+ .next()
+ .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
+ let snapshot = split
+ .next()
+ .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
+ let backup_dir: BackupDir = snapshot.parse()?;
+
+ let datastore = store_map
+ .get_datastore(&source_datastore)
+ .ok_or_else(|| format_err!("unexpected source datastore: {}", source_datastore))?;
+
+ let mut tmp_path = base_path.clone();
+ tmp_path.push(&source_datastore);
+ tmp_path.push(snapshot);
+
+ let path = datastore.snapshot_path(&backup_dir);
+
+ for entry in std::fs::read_dir(tmp_path)? {
+ let entry = entry?;
+ let mut new_path = path.clone();
+ new_path.push(entry.file_name());
+ std::fs::copy(entry.path(), new_path)?;
+ }
+ task_log!(worker, "Restore snapshot '{}' done", snapshot);
+ }
+ Ok(())
+ });
+
+ match std::fs::remove_dir_all(&base_path) {
+ Ok(()) => {}
+ Err(err) => task_warn!(worker, "error cleaning up: {}", err),
+ }
+
+ res
+}
+
+fn get_media_set_catalog(
+ inventory: &Inventory,
+ media_set_uuid: &Uuid,
+) -> Result<MediaSetCatalog, Error> {
+ let status_path = Path::new(TAPE_STATUS_DIR);
+
+ let members = inventory.compute_media_set_members(media_set_uuid)?;
+ let media_list = members.media_list();
+ let mut catalog = MediaSetCatalog::new();
+
+ for (seq_nr, media_uuid) in media_list.iter().enumerate() {
+ match media_uuid {
+ None => {
+ bail!(
+ "media set {} is incomplete (missing member {}).",
+ media_set_uuid,
+ seq_nr
+ );
+ }
+ Some(media_uuid) => {
+ let media_id = inventory.lookup_media(media_uuid).unwrap();
+ let media_catalog = MediaCatalog::open(status_path, &media_id, false, false)?;
+ catalog.append_catalog(media_catalog)?;
+ }
+ }
+ }
+
+ Ok(catalog)
+}
+
+fn restore_snapshots_to_tmpdir(
+ worker: Arc<WorkerTask>,
+ path: &PathBuf,
+ file_list: &[u64],
+ mut drive: Box<dyn TapeDriver>,
+ media_id: &MediaId,
+ media_set_uuid: &Uuid,
+ chunks_list: &mut HashMap<String, HashSet<[u8; 32]>>,
+) -> Result<(), Error> {
+ match media_id.media_set_label {
+ None => {
+ bail!(
+ "missing media set label on media {} ({})",
+ media_id.label.label_text,
+ media_id.label.uuid
+ );
+ }
+ Some(ref set) => {
+ if set.uuid != *media_set_uuid {
+ bail!(
+ "wrong media set label on media {} ({} != {})",
+ media_id.label.label_text,
+ media_id.label.uuid,
+ media_set_uuid
+ );
+ }
+ let encrypt_fingerprint = set.encryption_key_fingerprint.clone().map(|fp| {
+ task_log!(worker, "Encryption key fingerprint: {}", fp);
+ (fp, set.uuid.clone())
+ });
+
+ drive.set_encryption(encrypt_fingerprint)?;
+ }
+ }
+
+ for file_num in file_list {
+ drive.move_to_file(*file_num)?;
+ let mut reader = drive.read_next_file()?;
+
+ let header: MediaContentHeader = unsafe { reader.read_le_value()? };
+ if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
+ bail!("missing MediaContentHeader");
+ }
+
+ match header.content_magic {
+ PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1 => {
+ let header_data = reader.read_exact_allocated(header.size as usize)?;
+
+ let archive_header: SnapshotArchiveHeader = serde_json::from_slice(&header_data)
+ .map_err(|err| {
+ format_err!("unable to parse snapshot archive header - {}", err)
+ })?;
+
+ let source_datastore = archive_header.store;
+ let snapshot = archive_header.snapshot;
+
+ task_log!(
+ worker,
+ "File {}: snapshot archive {}:{}",
+ file_num,
+ source_datastore,
+ snapshot
+ );
+
+ let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
+
+ let mut tmp_path = path.clone();
+ tmp_path.push(&source_datastore);
+ tmp_path.push(snapshot);
+ std::fs::create_dir_all(&tmp_path)?;
+
+ let chunks = chunks_list
+ .entry(source_datastore)
+ .or_insert_with(HashSet::new);
+ let manifest = try_restore_snapshot_archive(worker.clone(), &mut decoder, &tmp_path)?;
+ for item in manifest.files() {
+ let mut archive_path = tmp_path.to_owned();
+ archive_path.push(&item.filename);
+
+ let index: Box<dyn IndexFile> = match archive_type(&item.filename)? {
+ ArchiveType::DynamicIndex => {
+ Box::new(DynamicIndexReader::open(&archive_path)?)
+ }
+ ArchiveType::FixedIndex => {
+ Box::new(FixedIndexReader::open(&archive_path)?)
+ }
+ ArchiveType::Blob => continue,
+ };
+ for i in 0..index.index_count() {
+ if let Some(digest) = index.index_digest(i) {
+ chunks.insert(*digest);
+ }
+ }
+ }
+ }
+ _ => bail!("unexpected file type"),
+ }
+ }
+
+ Ok(())
+}
+
+fn restore_chunk_file_list(
+ worker: Arc<WorkerTask>,
+ drive: &mut Box<dyn TapeDriver>,
+ file_list: &[u64],
+ store_map: &DataStoreMap,
+ chunk_list: &mut HashMap<u64, HashSet<[u8; 32]>>,
+) -> Result<(), Error> {
+ for nr in file_list {
+ let current_file_number = drive.current_file_number()?;
+ if current_file_number != *nr {
+ drive.move_to_file(*nr)?;
+ }
+ let mut reader = drive.read_next_file()?;
+ let header: MediaContentHeader = unsafe { reader.read_le_value()? };
+ if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
+ bail!("missing MediaContentHeader");
+ }
+
+ match header.content_magic {
+ PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
+ let header_data = reader.read_exact_allocated(header.size as usize)?;
+
+ let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
+ .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;
+
+ let source_datastore = archive_header.store;
+
+ task_log!(
+ worker,
+ "File {}: chunk archive for datastore '{}'",
+ nr,
+ source_datastore
+ );
+
+ let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
+ format_err!("unexpected chunk archive for store: {}", source_datastore)
+ })?;
+
+ let chunks = chunk_list
+ .get_mut(nr)
+ .ok_or_else(|| format_err!("undexpected file nr: {}", nr))?;
+
+ let count = restore_partial_chunk_archive(worker.clone(), reader, datastore.clone(), chunks)?;
+ task_log!(worker, "restored {} chunks", count);
+ }
+ _ => bail!("unexpected content magic {:?}", header.content_magic),
+ }
+ }
+
+ Ok(())
+}
+
+fn restore_partial_chunk_archive<'a>(
+ worker: Arc<WorkerTask>,
+ reader: Box<dyn 'a + TapeRead>,
+ datastore: Arc<DataStore>,
+ chunk_list: &mut HashSet<[u8; 32]>,
+) -> Result<usize, Error> {
+ let mut decoder = ChunkArchiveDecoder::new(reader);
+
+ let mut count = 0;
+
+ let start_time = std::time::SystemTime::now();
+ let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
+ let bytes2 = bytes.clone();
+
+ let writer_pool = ParallelHandler::new(
+ "tape restore chunk writer",
+ 4,
+ move |(chunk, digest): (DataBlob, [u8; 32])| {
+ if !datastore.cond_touch_chunk(&digest, false)? {
+ bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
+ chunk.verify_crc()?;
+ if chunk.crypt_mode()? == CryptMode::None {
+ chunk.decode(None, Some(&digest))?; // verify digest
+ }
+
+ datastore.insert_chunk(&chunk, &digest)?;
+ }
+ Ok(())
+ },
+ );
+
+ let verify_and_write_channel = writer_pool.channel();
+
+ loop {
+ let (digest, blob) = match decoder.next_chunk()? {
+ Some((digest, blob)) => (digest, blob),
+ None => break,
+ };
+
+ worker.check_abort()?;
+
+ if chunk_list.remove(&digest) {
+ verify_and_write_channel.send((blob, digest.clone()))?;
+ count += 1;
+ }
+
+ if chunk_list.is_empty() {
+ break;
+ }
+ }
+
+ drop(verify_and_write_channel);
+
+ writer_pool.complete()?;
+
+ let elapsed = start_time.elapsed()?.as_secs_f64();
+
+ let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+
+ task_log!(
+ worker,
+ "restored {} bytes ({:.2} MB/s)",
+ bytes,
+ (bytes as f64) / (1_000_000.0 * elapsed)
+ );
+
+ Ok(count)
+}
#[api(
input: {
--
2.20.1
More information about the pbs-devel
mailing list