[pbs-devel] [PATCH proxmox-backup RFC 09/10] api: add generics and separate functions into impl blocks
Hannes Laimer
h.laimer at proxmox.com
Tue Sep 3 14:34:00 CEST 2024
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
src/api2/admin/datastore.rs | 27 ++---
src/api2/backup/environment.rs | 176 +++++++++++++++++---------------
src/api2/backup/mod.rs | 21 ++--
src/api2/backup/upload_chunk.rs | 19 ++--
src/api2/reader/environment.rs | 31 +++---
src/api2/reader/mod.rs | 5 +-
src/api2/tape/backup.rs | 21 ++--
src/api2/tape/drive.rs | 2 +-
src/api2/tape/restore.rs | 69 +++++++------
9 files changed, 196 insertions(+), 175 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index f1eed9fc..e1124284 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -10,6 +10,7 @@ use anyhow::{bail, format_err, Error};
use futures::*;
use hyper::http::request::Parts;
use hyper::{header, Body, Response, StatusCode};
+use pbs_datastore::chunk_store::{CanRead, Read};
use serde::Deserialize;
use serde_json::{json, Value};
use tokio_stream::wrappers::ReceiverStream;
@@ -73,8 +74,8 @@ use crate::server::jobstate::{compute_schedule_status, Job, JobState};
const GROUP_NOTES_FILE_NAME: &str = "notes";
-fn get_group_note_path(
- store: &DataStore,
+fn get_group_note_path<T>(
+ store: &DataStore<T>,
ns: &BackupNamespace,
group: &pbs_api_types::BackupGroup,
) -> PathBuf {
@@ -111,8 +112,8 @@ fn check_privs<T: CanRead>(
Ok(())
}
-fn read_backup_index(
- backup_dir: &BackupDir,
+fn read_backup_index<T: CanRead>(
+ backup_dir: &BackupDir<T>,
) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
let (manifest, index_size) = backup_dir.load_manifest()?;
@@ -137,8 +138,8 @@ fn read_backup_index(
Ok((manifest, result))
}
-fn get_all_snapshot_files(
- info: &BackupInfo,
+fn get_all_snapshot_files<T: CanRead>(
+ info: &BackupInfo<T>,
) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
let (manifest, mut files) = read_backup_index(&info.backup_dir)?;
@@ -502,7 +503,7 @@ unsafe fn list_snapshots_blocking(
(None, None) => datastore.list_backup_groups(ns.clone())?,
};
- let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
+ let info_to_snapshot_list_item = |group: &BackupGroup<Read>, owner, info: BackupInfo<Read>| {
let backup = pbs_api_types::BackupDir {
group: group.into(),
time: info.backup_dir.backup_time(),
@@ -604,8 +605,8 @@ unsafe fn list_snapshots_blocking(
})
}
-async fn get_snapshots_count(
- store: &Arc<DataStore>,
+async fn get_snapshots_count<T: CanRead + Send + Sync + 'static>(
+ store: &Arc<DataStore<T>>,
owner: Option<&Authid>,
) -> Result<Counts, Error> {
let store = Arc::clone(store);
@@ -1771,12 +1772,12 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
&Permission::Anybody,
);
-fn get_local_pxar_reader(
- datastore: Arc<DataStore>,
+fn get_local_pxar_reader<T: CanRead>(
+ datastore: Arc<DataStore<T>>,
manifest: &BackupManifest,
- backup_dir: &BackupDir,
+ backup_dir: &BackupDir<T>,
pxar_name: &str,
-) -> Result<(LocalDynamicReadAt<LocalChunkReader>, u64), Error> {
+) -> Result<(LocalDynamicReadAt<LocalChunkReader<T>>, u64), Error> {
let mut path = datastore.base_path();
path.push(backup_dir.relative_path());
path.push(pxar_name);
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 99d885e2..f222da76 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -1,5 +1,6 @@
use anyhow::{bail, format_err, Error};
use nix::dir::Dir;
+use pbs_datastore::chunk_store::CanWrite;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tracing::info;
@@ -53,17 +54,17 @@ impl std::ops::Add for UploadStatistic {
}
}
-struct DynamicWriterState {
+struct DynamicWriterState<T> {
name: String,
- index: DynamicIndexWriter,
+ index: DynamicIndexWriter<T>,
offset: u64,
chunk_count: u64,
upload_stat: UploadStatistic,
}
-struct FixedWriterState {
+struct FixedWriterState<T> {
name: String,
- index: FixedIndexWriter,
+ index: FixedIndexWriter<T>,
size: usize,
chunk_size: u32,
chunk_count: u64,
@@ -75,18 +76,18 @@ struct FixedWriterState {
// key=digest, value=length
type KnownChunksMap = HashMap<[u8; 32], u32>;
-struct SharedBackupState {
+struct SharedBackupState<T> {
finished: bool,
uid_counter: usize,
file_counter: usize, // successfully uploaded files
- dynamic_writers: HashMap<usize, DynamicWriterState>,
- fixed_writers: HashMap<usize, FixedWriterState>,
+ dynamic_writers: HashMap<usize, DynamicWriterState<T>>,
+ fixed_writers: HashMap<usize, FixedWriterState<T>>,
known_chunks: KnownChunksMap,
backup_size: u64, // sums up size of all files
backup_stat: UploadStatistic,
}
-impl SharedBackupState {
+impl<T> SharedBackupState<T> {
// Raise error if finished flag is set
fn ensure_unfinished(&self) -> Result<(), Error> {
if self.finished {
@@ -104,26 +105,26 @@ impl SharedBackupState {
/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
-pub struct BackupEnvironment {
+pub struct BackupEnvironment<T> {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Authid,
pub debug: bool,
pub formatter: &'static dyn OutputFormatter,
pub worker: Arc<WorkerTask>,
- pub datastore: Arc<DataStore>,
- pub backup_dir: BackupDir,
- pub last_backup: Option<BackupInfo>,
- state: Arc<Mutex<SharedBackupState>>,
+ pub datastore: Arc<DataStore<T>>,
+ pub backup_dir: BackupDir<T>,
+ pub last_backup: Option<BackupInfo<T>>,
+ state: Arc<Mutex<SharedBackupState<T>>>,
}
-impl BackupEnvironment {
+impl<T> BackupEnvironment<T> {
pub fn new(
env_type: RpcEnvironmentType,
auth_id: Authid,
worker: Arc<WorkerTask>,
- datastore: Arc<DataStore>,
- backup_dir: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ backup_dir: BackupDir<T>,
) -> Self {
let state = SharedBackupState {
finished: false,
@@ -149,7 +150,69 @@ impl BackupEnvironment {
state: Arc::new(Mutex::new(state)),
}
}
+}
+
+impl<T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static> BackupEnvironment<T> {
+ pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
+ self.formatter.format_result(result, self)
+ }
+
+ /// If verify-new is set on the datastore, this will run a new verify task
+ /// for the backup. If not, this will return and also drop the passed lock
+ /// immediately.
+ pub fn verify_after_complete(&self, excl_snap_lock: Dir) -> Result<(), Error> {
+ self.ensure_finished()?;
+
+ if !self.datastore.verify_new() {
+ // no verify requested, do nothing
+ return Ok(());
+ }
+
+ // Downgrade to shared lock, the backup itself is finished
+ drop(excl_snap_lock);
+ let snap_lock = lock_dir_noblock_shared(
+ &self.backup_dir.full_path(),
+ "snapshot",
+ "snapshot is already locked by another operation",
+ )?;
+
+ let worker_id = format!(
+ "{}:{}/{}/{:08X}",
+ self.datastore.name(),
+ self.backup_dir.backup_type(),
+ self.backup_dir.backup_id(),
+ self.backup_dir.backup_time()
+ );
+
+ let datastore = self.datastore.clone();
+ let backup_dir = self.backup_dir.clone();
+
+ WorkerTask::new_thread(
+ "verify",
+ Some(worker_id),
+ self.auth_id.to_string(),
+ false,
+ move |worker| {
+ worker.log_message("Automatically verifying newly added snapshot");
+
+ let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+ if !verify_backup_dir_with_lock(
+ &verify_worker,
+ &backup_dir,
+ worker.upid().clone(),
+ None,
+ snap_lock,
+ )? {
+ bail!("verification failed - please check the log for details");
+ }
+ Ok(())
+ },
+ )
+ .map(|_| ())
+ }
+}
+impl<T: CanWrite> BackupEnvironment<T> {
/// Register a Chunk with associated length.
///
/// We do not fully trust clients, so a client may only use registered
@@ -262,7 +325,7 @@ impl BackupEnvironment {
/// Store the writer with an unique ID
pub fn register_dynamic_writer(
&self,
- index: DynamicIndexWriter,
+ index: DynamicIndexWriter<T>,
name: String,
) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
@@ -288,7 +351,7 @@ impl BackupEnvironment {
/// Store the writer with an unique ID
pub fn register_fixed_writer(
&self,
- index: FixedIndexWriter,
+ index: FixedIndexWriter<T>,
name: String,
size: usize,
chunk_size: u32,
@@ -632,61 +695,6 @@ impl BackupEnvironment {
Ok(())
}
- /// If verify-new is set on the datastore, this will run a new verify task
- /// for the backup. If not, this will return and also drop the passed lock
- /// immediately.
- pub fn verify_after_complete(&self, excl_snap_lock: Dir) -> Result<(), Error> {
- self.ensure_finished()?;
-
- if !self.datastore.verify_new() {
- // no verify requested, do nothing
- return Ok(());
- }
-
- // Downgrade to shared lock, the backup itself is finished
- drop(excl_snap_lock);
- let snap_lock = lock_dir_noblock_shared(
- &self.backup_dir.full_path(),
- "snapshot",
- "snapshot is already locked by another operation",
- )?;
-
- let worker_id = format!(
- "{}:{}/{}/{:08X}",
- self.datastore.name(),
- self.backup_dir.backup_type(),
- self.backup_dir.backup_id(),
- self.backup_dir.backup_time()
- );
-
- let datastore = self.datastore.clone();
- let backup_dir = self.backup_dir.clone();
-
- WorkerTask::new_thread(
- "verify",
- Some(worker_id),
- self.auth_id.to_string(),
- false,
- move |worker| {
- worker.log_message("Automatically verifying newly added snapshot");
-
- let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
- if !verify_backup_dir_with_lock(
- &verify_worker,
- &backup_dir,
- worker.upid().clone(),
- None,
- snap_lock,
- )? {
- bail!("verification failed - please check the log for details");
- }
-
- Ok(())
- },
- )
- .map(|_| ())
- }
-
pub fn log<S: AsRef<str>>(&self, msg: S) {
info!("{}", msg.as_ref());
}
@@ -701,10 +709,6 @@ impl BackupEnvironment {
}
}
- pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
- self.formatter.format_result(result, self)
- }
-
/// Raise error if finished flag is not set
pub fn ensure_finished(&self) -> Result<(), Error> {
let state = self.state.lock().unwrap();
@@ -735,7 +739,7 @@ impl BackupEnvironment {
}
}
-impl RpcEnvironment for BackupEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for BackupEnvironment<T> {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
@@ -757,14 +761,18 @@ impl RpcEnvironment for BackupEnvironment {
}
}
-impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
- fn as_ref(&self) -> &BackupEnvironment {
- self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for dyn RpcEnvironment {
+ fn as_ref(&self) -> &BackupEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<BackupEnvironment<T>>()
+ .unwrap()
}
}
-impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
- fn as_ref(&self) -> &BackupEnvironment {
- self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for Box<dyn RpcEnvironment> {
+ fn as_ref(&self) -> &BackupEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<BackupEnvironment<T>>()
+ .unwrap()
}
}
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index e6a92117..fda7f41d 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -6,6 +6,7 @@ use hex::FromHex;
use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Request, Response, StatusCode};
+use pbs_datastore::chunk_store::{Lookup, Write};
use serde::Deserialize;
use serde_json::{json, Value};
@@ -281,7 +282,7 @@ fn upgrade_to_backup_protocol(
return Ok(());
}
- let verify = |env: BackupEnvironment| {
+ let verify = |env: BackupEnvironment<pbs_datastore::chunk_store::Write>| {
if let Err(err) = env.verify_after_complete(snap_guard) {
env.log(format!(
"backup finished, but starting the requested verify task failed: {}",
@@ -402,7 +403,7 @@ fn create_dynamic_index(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
let name = required_string_param(¶m, "archive-name")?.to_owned();
@@ -452,7 +453,7 @@ fn create_fixed_index(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
let name = required_string_param(¶m, "archive-name")?.to_owned();
let size = required_integer_param(¶m, "size")? as usize;
@@ -567,7 +568,7 @@ fn dynamic_append(
);
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.debug(format!("dynamic_append {} chunks", digest_list.len()));
@@ -641,7 +642,7 @@ fn fixed_append(
);
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.debug(format!("fixed_append {} chunks", digest_list.len()));
@@ -716,7 +717,7 @@ fn close_dynamic_index(
let csum_str = required_string_param(¶m, "csum")?;
let csum = <[u8; 32]>::from_hex(csum_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.dynamic_writer_close(wid, chunk_count, size, csum)?;
@@ -769,7 +770,7 @@ fn close_fixed_index(
let csum_str = required_string_param(¶m, "csum")?;
let csum = <[u8; 32]>::from_hex(csum_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.fixed_writer_close(wid, chunk_count, size, csum)?;
@@ -783,7 +784,7 @@ fn finish_backup(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<Write> = rpcenv.as_ref();
env.finish_backup()?;
env.log("successfully finished backup");
@@ -802,7 +803,7 @@ fn get_previous_backup_time(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<Lookup> = rpcenv.as_ref();
let backup_time = env
.last_backup
@@ -829,7 +830,7 @@ fn download_previous(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
let archive_name = required_string_param(¶m, "archive-name")?.to_owned();
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660..57b30dec 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -7,6 +7,7 @@ use futures::*;
use hex::FromHex;
use hyper::http::request::Parts;
use hyper::Body;
+use pbs_datastore::chunk_store::{CanWrite, Write as StoreWrite};
use serde_json::{json, Value};
use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
@@ -20,19 +21,19 @@ use pbs_tools::json::{required_integer_param, required_string_param};
use super::environment::*;
-pub struct UploadChunk {
+pub struct UploadChunk<T> {
stream: Body,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
digest: [u8; 32],
size: u32,
encoded_size: u32,
raw_data: Option<Vec<u8>>,
}
-impl UploadChunk {
+impl<T: CanWrite> UploadChunk<T> {
pub fn new(
stream: Body,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
digest: [u8; 32],
size: u32,
encoded_size: u32,
@@ -48,7 +49,7 @@ impl UploadChunk {
}
}
-impl Future for UploadChunk {
+impl<T: CanWrite> Future for UploadChunk<T> {
type Output = Result<([u8; 32], u32, u32, bool), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -159,7 +160,7 @@ fn upload_fixed_chunk(
let digest_str = required_string_param(¶m, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
let (digest, size, compressed_size, is_duplicate) =
UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -228,7 +229,7 @@ fn upload_dynamic_chunk(
let digest_str = required_string_param(¶m, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
let (digest, size, compressed_size, is_duplicate) =
UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -273,7 +274,7 @@ fn upload_speedtest(
println!("Upload error: {}", err);
}
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
Ok(env.format_response(Ok(Value::Null)))
}
.boxed()
@@ -312,7 +313,7 @@ fn upload_blob(
let file_name = required_string_param(¶m, "file-name")?.to_owned();
let encoded_size = required_integer_param(¶m, "encoded-size")? as usize;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
if !file_name.ends_with(".blob") {
bail!("wrong blob file extension: '{}'", file_name);
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f4..13d346eb 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
+use pbs_datastore::chunk_store::CanRead;
use serde_json::{json, Value};
use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
@@ -14,25 +15,25 @@ use tracing::info;
/// `RpcEnvironment` implementation for backup reader service
#[derive(Clone)]
-pub struct ReaderEnvironment {
+pub struct ReaderEnvironment<T> {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Authid,
pub debug: bool,
pub formatter: &'static dyn OutputFormatter,
pub worker: Arc<WorkerTask>,
- pub datastore: Arc<DataStore>,
- pub backup_dir: BackupDir,
+ pub datastore: Arc<DataStore<T>>,
+ pub backup_dir: BackupDir<T>,
allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
}
-impl ReaderEnvironment {
+impl<T: CanRead> ReaderEnvironment<T> {
pub fn new(
env_type: RpcEnvironmentType,
auth_id: Authid,
worker: Arc<WorkerTask>,
- datastore: Arc<DataStore>,
- backup_dir: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ backup_dir: BackupDir<T>,
) -> Self {
Self {
result_attributes: json!({}),
@@ -71,7 +72,7 @@ impl ReaderEnvironment {
}
}
-impl RpcEnvironment for ReaderEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for ReaderEnvironment<T> {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
@@ -93,14 +94,18 @@ impl RpcEnvironment for ReaderEnvironment {
}
}
-impl AsRef<ReaderEnvironment> for dyn RpcEnvironment {
- fn as_ref(&self) -> &ReaderEnvironment {
- self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for dyn RpcEnvironment {
+ fn as_ref(&self) -> &ReaderEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<ReaderEnvironment<T>>()
+ .unwrap()
}
}
-impl AsRef<ReaderEnvironment> for Box<dyn RpcEnvironment> {
- fn as_ref(&self) -> &ReaderEnvironment {
- self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for Box<dyn RpcEnvironment> {
+ fn as_ref(&self) -> &ReaderEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<ReaderEnvironment<T>>()
+ .unwrap()
}
}
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 48a8c5fc..cf20addc 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -6,6 +6,7 @@ use hex::FromHex;
use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Request, Response, StatusCode};
+use pbs_datastore::chunk_store::Read;
use serde::Deserialize;
use serde_json::Value;
@@ -253,7 +254,7 @@ fn download_file(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &ReaderEnvironment = rpcenv.as_ref();
+ let env: &ReaderEnvironment<Read> = rpcenv.as_ref();
let file_name = required_string_param(¶m, "file-name")?.to_owned();
@@ -309,7 +310,7 @@ fn download_chunk(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &ReaderEnvironment = rpcenv.as_ref();
+ let env: &ReaderEnvironment<Read> = rpcenv.as_ref();
let digest_str = required_string_param(¶m, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index cf5a0189..662b953e 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -1,6 +1,7 @@
use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
+use pbs_datastore::chunk_store::CanWrite;
use serde_json::Value;
use tracing::{info, warn};
@@ -11,9 +12,9 @@ use proxmox_schema::api;
use proxmox_worker_task::WorkerTaskContext;
use pbs_api_types::{
- print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, Operation,
- TapeBackupJobConfig, TapeBackupJobSetup, TapeBackupJobStatus, JOB_ID_SCHEMA,
- PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, PRIV_TAPE_WRITE, UPID_SCHEMA,
+ print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, TapeBackupJobConfig,
+ TapeBackupJobSetup, TapeBackupJobStatus, JOB_ID_SCHEMA, PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT,
+ PRIV_TAPE_WRITE, UPID_SCHEMA,
};
use pbs_config::CachedUserInfo;
@@ -150,7 +151,7 @@ pub fn do_tape_backup_job(
let worker_type = job.jobtype().to_string();
- let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_write(&setup.store)?;
let (config, _digest) = pbs_config::media_pool::config()?;
let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;
@@ -306,7 +307,7 @@ pub fn backup(
check_backup_permission(&auth_id, &setup.store, &setup.pool, &setup.drive)?;
- let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_write(&setup.store)?;
let (config, _digest) = pbs_config::media_pool::config()?;
let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;
@@ -360,9 +361,9 @@ enum SnapshotBackupResult {
Ignored,
}
-fn backup_worker(
+fn backup_worker<T: CanWrite + Send + Sync + 'static>(
worker: &WorkerTask,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
pool_config: &MediaPoolConfig,
setup: &TapeBackupJobSetup,
summary: &mut TapeBackupJobSummary,
@@ -560,11 +561,11 @@ fn update_media_online_status(drive: &str) -> Result<Option<String>, Error> {
}
}
-fn backup_snapshot(
+fn backup_snapshot<T: CanWrite + Send + Sync + 'static>(
worker: &WorkerTask,
pool_writer: &mut PoolWriter,
- datastore: Arc<DataStore>,
- snapshot: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ snapshot: BackupDir<T>,
) -> Result<SnapshotBackupResult, Error> {
let snapshot_path = snapshot.relative_path();
info!("backup snapshot {snapshot_path:?}");
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index ba9051de..342406c6 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -1342,7 +1342,7 @@ pub fn catalog_media(
drive.read_label()?; // skip over labels - we already read them above
let mut checked_chunks = HashMap::new();
- restore_media(
+ restore_media::<pbs_datastore::chunk_store::Write>(
worker,
&mut drive,
&media_id,
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 95ce13c7..20558670 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{bail, format_err, Error};
+use pbs_datastore::chunk_store::{CanRead, CanWrite, Write as StoreWrite};
use serde_json::Value;
use tracing::{info, warn};
@@ -120,13 +121,13 @@ impl NamespaceMap {
}
}
-pub struct DataStoreMap {
- map: HashMap<String, Arc<DataStore>>,
- default: Option<Arc<DataStore>>,
+pub struct DataStoreMap<T> {
+ map: HashMap<String, Arc<DataStore<T>>>,
+ default: Option<Arc<DataStore<T>>>,
ns_map: Option<NamespaceMap>,
}
-impl TryFrom<String> for DataStoreMap {
+impl TryFrom<String> for DataStoreMap<StoreWrite> {
type Error = Error;
fn try_from(value: String) -> Result<Self, Error> {
@@ -161,7 +162,7 @@ impl TryFrom<String> for DataStoreMap {
}
}
-impl DataStoreMap {
+impl<T> DataStoreMap<T> {
fn add_namespaces_maps(&mut self, mappings: Vec<String>) -> Result<bool, Error> {
let count = mappings.len();
let ns_map = NamespaceMap::try_from(mappings)?;
@@ -169,7 +170,9 @@ impl DataStoreMap {
Ok(count > 0)
}
- fn used_datastores(&self) -> HashMap<&str, (Arc<DataStore>, Option<HashSet<BackupNamespace>>)> {
+ fn used_datastores(
+ &self,
+ ) -> HashMap<&str, (Arc<DataStore<T>>, Option<HashSet<BackupNamespace>>)> {
let mut map = HashMap::new();
for (source, target) in self.map.iter() {
let ns = self.ns_map.as_ref().map(|map| map.used_namespaces(source));
@@ -189,7 +192,7 @@ impl DataStoreMap {
.map(|mapping| mapping.get_namespaces(datastore, ns))
}
- fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore>> {
+ fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore<T>>> {
self.map
.get(source_datastore)
.or(self.default.as_ref())
@@ -200,7 +203,7 @@ impl DataStoreMap {
&self,
source_datastore: &str,
source_ns: &BackupNamespace,
- ) -> Option<(Arc<DataStore>, Option<Vec<BackupNamespace>>)> {
+ ) -> Option<(Arc<DataStore<T>>, Option<Vec<BackupNamespace>>)> {
self.target_store(source_datastore)
.map(|store| (store, self.target_ns(source_datastore, source_ns)))
}
@@ -237,9 +240,9 @@ fn check_datastore_privs(
Ok(())
}
-fn check_and_create_namespaces(
+fn check_and_create_namespaces<T: CanWrite>(
user_info: &CachedUserInfo,
- store: &Arc<DataStore>,
+ store: &Arc<DataStore<T>>,
ns: &BackupNamespace,
auth_id: &Authid,
owner: Option<&Authid>,
@@ -449,13 +452,13 @@ pub fn restore(
}
#[allow(clippy::too_many_arguments)]
-fn restore_full_worker(
+fn restore_full_worker<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
inventory: Inventory,
media_set_uuid: Uuid,
drive_config: SectionConfigData,
drive_name: &str,
- store_map: DataStoreMap,
+ store_map: DataStoreMap<T>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
auth_id: &Authid,
@@ -529,8 +532,8 @@ fn restore_full_worker(
}
#[allow(clippy::too_many_arguments)]
-fn check_snapshot_restorable(
- store_map: &DataStoreMap,
+fn check_snapshot_restorable<T: CanRead>(
+ store_map: &DataStoreMap<T>,
store: &str,
snapshot: &str,
ns: &BackupNamespace,
@@ -618,14 +621,14 @@ fn log_required_tapes<'a>(inventory: &Inventory, list: impl Iterator<Item = &'a
}
#[allow(clippy::too_many_arguments)]
-fn restore_list_worker(
+fn restore_list_worker<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
snapshots: Vec<String>,
inventory: Inventory,
media_set_uuid: Uuid,
drive_config: SectionConfigData,
drive_name: &str,
- store_map: DataStoreMap,
+ store_map: DataStoreMap<T>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
user_info: Arc<CachedUserInfo>,
@@ -955,16 +958,16 @@ fn get_media_set_catalog(
Ok(catalog)
}
-fn media_set_tmpdir(datastore: &DataStore, media_set_uuid: &Uuid) -> PathBuf {
+fn media_set_tmpdir<T>(datastore: &DataStore<T>, media_set_uuid: &Uuid) -> PathBuf {
let mut path = datastore.base_path();
path.push(".tmp");
path.push(media_set_uuid.to_string());
path
}
-fn snapshot_tmpdir(
+fn snapshot_tmpdir<T>(
source_datastore: &str,
- datastore: &DataStore,
+ datastore: &DataStore<T>,
snapshot: &str,
media_set_uuid: &Uuid,
) -> PathBuf {
@@ -974,9 +977,9 @@ fn snapshot_tmpdir(
path
}
-fn restore_snapshots_to_tmpdir(
+fn restore_snapshots_to_tmpdir<T>(
worker: Arc<WorkerTask>,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
file_list: &[u64],
mut drive: Box<dyn TapeDriver>,
media_id: &MediaId,
@@ -1083,10 +1086,10 @@ fn restore_snapshots_to_tmpdir(
Ok(tmp_paths)
}
-fn restore_file_chunk_map(
+fn restore_file_chunk_map<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
drive: &mut Box<dyn TapeDriver>,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
) -> Result<(), Error> {
for (nr, chunk_map) in file_chunk_map.iter_mut() {
@@ -1133,10 +1136,10 @@ fn restore_file_chunk_map(
Ok(())
}
-fn restore_partial_chunk_archive<'a>(
+fn restore_partial_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
reader: Box<dyn 'a + TapeRead>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
chunk_list: &mut HashSet<[u8; 32]>,
) -> Result<usize, Error> {
let mut decoder = ChunkArchiveDecoder::new(reader);
@@ -1195,12 +1198,12 @@ fn restore_partial_chunk_archive<'a>(
/// Request and restore complete media without using existing catalog (create catalog instead)
#[allow(clippy::too_many_arguments)]
-pub fn request_and_restore_media(
+pub fn request_and_restore_media<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
media_id: &MediaId,
drive_config: &SectionConfigData,
drive_name: &str,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
@@ -1253,11 +1256,11 @@ pub fn request_and_restore_media(
/// Restore complete media content and catalog
///
/// Only create the catalog if target is None.
-pub fn restore_media(
+pub fn restore_media<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
drive: &mut Box<dyn TapeDriver>,
media_id: &MediaId,
- target: Option<(&DataStoreMap, &Authid)>,
+ target: Option<(&DataStoreMap<T>, &Authid)>,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
verbose: bool,
auth_id: &Authid,
@@ -1301,11 +1304,11 @@ pub fn restore_media(
}
#[allow(clippy::too_many_arguments)]
-fn restore_archive<'a>(
+fn restore_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
mut reader: Box<dyn 'a + TapeRead>,
current_file_number: u64,
- target: Option<(&DataStoreMap, &Authid)>,
+ target: Option<(&DataStoreMap<T>, &Authid)>,
catalog: &mut MediaCatalog,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
verbose: bool,
@@ -1525,10 +1528,10 @@ fn scan_chunk_archive<'a>(
Ok(Some(chunks))
}
-fn restore_chunk_archive<'a>(
+fn restore_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
reader: Box<dyn 'a + TapeRead>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
checked_chunks: &mut HashSet<[u8; 32]>,
verbose: bool,
) -> Result<Option<Vec<[u8; 32]>>, Error> {
--
2.39.2
More information about the pbs-devel
mailing list