[pbs-devel] [PATCH proxmox-backup RFC 09/10] api: add generics and separate functions into impl blocks

Hannes Laimer h.laimer at proxmox.com
Tue Sep 3 14:34:00 CEST 2024


Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 src/api2/admin/datastore.rs     |  27 ++---
 src/api2/backup/environment.rs  | 176 +++++++++++++++++---------------
 src/api2/backup/mod.rs          |  21 ++--
 src/api2/backup/upload_chunk.rs |  19 ++--
 src/api2/reader/environment.rs  |  31 +++---
 src/api2/reader/mod.rs          |   5 +-
 src/api2/tape/backup.rs         |  21 ++--
 src/api2/tape/drive.rs          |   2 +-
 src/api2/tape/restore.rs        |  69 +++++++------
 9 files changed, 196 insertions(+), 175 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index f1eed9fc..e1124284 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -10,6 +10,7 @@ use anyhow::{bail, format_err, Error};
 use futures::*;
 use hyper::http::request::Parts;
 use hyper::{header, Body, Response, StatusCode};
+use pbs_datastore::chunk_store::{CanRead, Read};
 use serde::Deserialize;
 use serde_json::{json, Value};
 use tokio_stream::wrappers::ReceiverStream;
@@ -73,8 +74,8 @@ use crate::server::jobstate::{compute_schedule_status, Job, JobState};
 
 const GROUP_NOTES_FILE_NAME: &str = "notes";
 
-fn get_group_note_path(
-    store: &DataStore,
+fn get_group_note_path<T>(
+    store: &DataStore<T>,
     ns: &BackupNamespace,
     group: &pbs_api_types::BackupGroup,
 ) -> PathBuf {
@@ -111,8 +112,8 @@ fn check_privs<T: CanRead>(
     Ok(())
 }
 
-fn read_backup_index(
-    backup_dir: &BackupDir,
+fn read_backup_index<T: CanRead>(
+    backup_dir: &BackupDir<T>,
 ) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
     let (manifest, index_size) = backup_dir.load_manifest()?;
 
@@ -137,8 +138,8 @@ fn read_backup_index(
     Ok((manifest, result))
 }
 
-fn get_all_snapshot_files(
-    info: &BackupInfo,
+fn get_all_snapshot_files<T: CanRead>(
+    info: &BackupInfo<T>,
 ) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
     let (manifest, mut files) = read_backup_index(&info.backup_dir)?;
 
@@ -502,7 +503,7 @@ unsafe fn list_snapshots_blocking(
         (None, None) => datastore.list_backup_groups(ns.clone())?,
     };
 
-    let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
+    let info_to_snapshot_list_item = |group: &BackupGroup<Read>, owner, info: BackupInfo<Read>| {
         let backup = pbs_api_types::BackupDir {
             group: group.into(),
             time: info.backup_dir.backup_time(),
@@ -604,8 +605,8 @@ unsafe fn list_snapshots_blocking(
     })
 }
 
-async fn get_snapshots_count(
-    store: &Arc<DataStore>,
+async fn get_snapshots_count<T: CanRead + Send + Sync + 'static>(
+    store: &Arc<DataStore<T>>,
     owner: Option<&Authid>,
 ) -> Result<Counts, Error> {
     let store = Arc::clone(store);
@@ -1771,12 +1772,12 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
     &Permission::Anybody,
 );
 
-fn get_local_pxar_reader(
-    datastore: Arc<DataStore>,
+fn get_local_pxar_reader<T: CanRead>(
+    datastore: Arc<DataStore<T>>,
     manifest: &BackupManifest,
-    backup_dir: &BackupDir,
+    backup_dir: &BackupDir<T>,
     pxar_name: &str,
-) -> Result<(LocalDynamicReadAt<LocalChunkReader>, u64), Error> {
+) -> Result<(LocalDynamicReadAt<LocalChunkReader<T>>, u64), Error> {
     let mut path = datastore.base_path();
     path.push(backup_dir.relative_path());
     path.push(pxar_name);
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 99d885e2..f222da76 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -1,5 +1,6 @@
 use anyhow::{bail, format_err, Error};
 use nix::dir::Dir;
+use pbs_datastore::chunk_store::CanWrite;
 use std::collections::HashMap;
 use std::sync::{Arc, Mutex};
 use tracing::info;
@@ -53,17 +54,17 @@ impl std::ops::Add for UploadStatistic {
     }
 }
 
-struct DynamicWriterState {
+struct DynamicWriterState<T> {
     name: String,
-    index: DynamicIndexWriter,
+    index: DynamicIndexWriter<T>,
     offset: u64,
     chunk_count: u64,
     upload_stat: UploadStatistic,
 }
 
-struct FixedWriterState {
+struct FixedWriterState<T> {
     name: String,
-    index: FixedIndexWriter,
+    index: FixedIndexWriter<T>,
     size: usize,
     chunk_size: u32,
     chunk_count: u64,
@@ -75,18 +76,18 @@ struct FixedWriterState {
 // key=digest, value=length
 type KnownChunksMap = HashMap<[u8; 32], u32>;
 
-struct SharedBackupState {
+struct SharedBackupState<T> {
     finished: bool,
     uid_counter: usize,
     file_counter: usize, // successfully uploaded files
-    dynamic_writers: HashMap<usize, DynamicWriterState>,
-    fixed_writers: HashMap<usize, FixedWriterState>,
+    dynamic_writers: HashMap<usize, DynamicWriterState<T>>,
+    fixed_writers: HashMap<usize, FixedWriterState<T>>,
     known_chunks: KnownChunksMap,
     backup_size: u64, // sums up size of all files
     backup_stat: UploadStatistic,
 }
 
-impl SharedBackupState {
+impl<T> SharedBackupState<T> {
     // Raise error if finished flag is set
     fn ensure_unfinished(&self) -> Result<(), Error> {
         if self.finished {
@@ -104,26 +105,26 @@ impl SharedBackupState {
 
 /// `RpcEnvironment` implementation for backup service
 #[derive(Clone)]
-pub struct BackupEnvironment {
+pub struct BackupEnvironment<T> {
     env_type: RpcEnvironmentType,
     result_attributes: Value,
     auth_id: Authid,
     pub debug: bool,
     pub formatter: &'static dyn OutputFormatter,
     pub worker: Arc<WorkerTask>,
-    pub datastore: Arc<DataStore>,
-    pub backup_dir: BackupDir,
-    pub last_backup: Option<BackupInfo>,
-    state: Arc<Mutex<SharedBackupState>>,
+    pub datastore: Arc<DataStore<T>>,
+    pub backup_dir: BackupDir<T>,
+    pub last_backup: Option<BackupInfo<T>>,
+    state: Arc<Mutex<SharedBackupState<T>>>,
 }
 
-impl BackupEnvironment {
+impl<T> BackupEnvironment<T> {
     pub fn new(
         env_type: RpcEnvironmentType,
         auth_id: Authid,
         worker: Arc<WorkerTask>,
-        datastore: Arc<DataStore>,
-        backup_dir: BackupDir,
+        datastore: Arc<DataStore<T>>,
+        backup_dir: BackupDir<T>,
     ) -> Self {
         let state = SharedBackupState {
             finished: false,
@@ -149,7 +150,69 @@ impl BackupEnvironment {
             state: Arc::new(Mutex::new(state)),
         }
     }
+}
+
+impl<T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static> BackupEnvironment<T> {
+    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
+        self.formatter.format_result(result, self)
+    }
+
+    /// If verify-new is set on the datastore, this will run a new verify task
+    /// for the backup. If not, this will return and also drop the passed lock
+    /// immediately.
+    pub fn verify_after_complete(&self, excl_snap_lock: Dir) -> Result<(), Error> {
+        self.ensure_finished()?;
+
+        if !self.datastore.verify_new() {
+            // no verify requested, do nothing
+            return Ok(());
+        }
+
+        // Downgrade to shared lock, the backup itself is finished
+        drop(excl_snap_lock);
+        let snap_lock = lock_dir_noblock_shared(
+            &self.backup_dir.full_path(),
+            "snapshot",
+            "snapshot is already locked by another operation",
+        )?;
+
+        let worker_id = format!(
+            "{}:{}/{}/{:08X}",
+            self.datastore.name(),
+            self.backup_dir.backup_type(),
+            self.backup_dir.backup_id(),
+            self.backup_dir.backup_time()
+        );
+
+        let datastore = self.datastore.clone();
+        let backup_dir = self.backup_dir.clone();
+
+        WorkerTask::new_thread(
+            "verify",
+            Some(worker_id),
+            self.auth_id.to_string(),
+            false,
+            move |worker| {
+                worker.log_message("Automatically verifying newly added snapshot");
+
+                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+                if !verify_backup_dir_with_lock(
+                    &verify_worker,
+                    &backup_dir,
+                    worker.upid().clone(),
+                    None,
+                    snap_lock,
+                )? {
+                    bail!("verification failed - please check the log for details");
+                }
 
+                Ok(())
+            },
+        )
+        .map(|_| ())
+    }
+}
+impl<T: CanWrite> BackupEnvironment<T> {
     /// Register a Chunk with associated length.
     ///
     /// We do not fully trust clients, so a client may only use registered
@@ -262,7 +325,7 @@ impl BackupEnvironment {
     /// Store the writer with an unique ID
     pub fn register_dynamic_writer(
         &self,
-        index: DynamicIndexWriter,
+        index: DynamicIndexWriter<T>,
         name: String,
     ) -> Result<usize, Error> {
         let mut state = self.state.lock().unwrap();
@@ -288,7 +351,7 @@ impl BackupEnvironment {
     /// Store the writer with an unique ID
     pub fn register_fixed_writer(
         &self,
-        index: FixedIndexWriter,
+        index: FixedIndexWriter<T>,
         name: String,
         size: usize,
         chunk_size: u32,
@@ -632,61 +695,6 @@ impl BackupEnvironment {
         Ok(())
     }
 
-    /// If verify-new is set on the datastore, this will run a new verify task
-    /// for the backup. If not, this will return and also drop the passed lock
-    /// immediately.
-    pub fn verify_after_complete(&self, excl_snap_lock: Dir) -> Result<(), Error> {
-        self.ensure_finished()?;
-
-        if !self.datastore.verify_new() {
-            // no verify requested, do nothing
-            return Ok(());
-        }
-
-        // Downgrade to shared lock, the backup itself is finished
-        drop(excl_snap_lock);
-        let snap_lock = lock_dir_noblock_shared(
-            &self.backup_dir.full_path(),
-            "snapshot",
-            "snapshot is already locked by another operation",
-        )?;
-
-        let worker_id = format!(
-            "{}:{}/{}/{:08X}",
-            self.datastore.name(),
-            self.backup_dir.backup_type(),
-            self.backup_dir.backup_id(),
-            self.backup_dir.backup_time()
-        );
-
-        let datastore = self.datastore.clone();
-        let backup_dir = self.backup_dir.clone();
-
-        WorkerTask::new_thread(
-            "verify",
-            Some(worker_id),
-            self.auth_id.to_string(),
-            false,
-            move |worker| {
-                worker.log_message("Automatically verifying newly added snapshot");
-
-                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
-                if !verify_backup_dir_with_lock(
-                    &verify_worker,
-                    &backup_dir,
-                    worker.upid().clone(),
-                    None,
-                    snap_lock,
-                )? {
-                    bail!("verification failed - please check the log for details");
-                }
-
-                Ok(())
-            },
-        )
-        .map(|_| ())
-    }
-
     pub fn log<S: AsRef<str>>(&self, msg: S) {
         info!("{}", msg.as_ref());
     }
@@ -701,10 +709,6 @@ impl BackupEnvironment {
         }
     }
 
-    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
-        self.formatter.format_result(result, self)
-    }
-
     /// Raise error if finished flag is not set
     pub fn ensure_finished(&self) -> Result<(), Error> {
         let state = self.state.lock().unwrap();
@@ -735,7 +739,7 @@ impl BackupEnvironment {
     }
 }
 
-impl RpcEnvironment for BackupEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for BackupEnvironment<T> {
     fn result_attrib_mut(&mut self) -> &mut Value {
         &mut self.result_attributes
     }
@@ -757,14 +761,18 @@ impl RpcEnvironment for BackupEnvironment {
     }
 }
 
-impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
-    fn as_ref(&self) -> &BackupEnvironment {
-        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for dyn RpcEnvironment {
+    fn as_ref(&self) -> &BackupEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<BackupEnvironment<T>>()
+            .unwrap()
     }
 }
 
-impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
-    fn as_ref(&self) -> &BackupEnvironment {
-        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for Box<dyn RpcEnvironment> {
+    fn as_ref(&self) -> &BackupEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<BackupEnvironment<T>>()
+            .unwrap()
     }
 }
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index e6a92117..fda7f41d 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -6,6 +6,7 @@ use hex::FromHex;
 use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
 use hyper::http::request::Parts;
 use hyper::{Body, Request, Response, StatusCode};
+use pbs_datastore::chunk_store::{Lookup, Write};
 use serde::Deserialize;
 use serde_json::{json, Value};
 
@@ -281,7 +282,7 @@ fn upgrade_to_backup_protocol(
                         return Ok(());
                     }
 
-                    let verify = |env: BackupEnvironment| {
+                    let verify = |env: BackupEnvironment<pbs_datastore::chunk_store::Write>| {
                         if let Err(err) = env.verify_after_complete(snap_guard) {
                             env.log(format!(
                                 "backup finished, but starting the requested verify task failed: {}",
@@ -402,7 +403,7 @@ fn create_dynamic_index(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
 
     let name = required_string_param(&param, "archive-name")?.to_owned();
 
@@ -452,7 +453,7 @@ fn create_fixed_index(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
 
     let name = required_string_param(&param, "archive-name")?.to_owned();
     let size = required_integer_param(&param, "size")? as usize;
@@ -567,7 +568,7 @@ fn dynamic_append(
         );
     }
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
 
     env.debug(format!("dynamic_append {} chunks", digest_list.len()));
 
@@ -641,7 +642,7 @@ fn fixed_append(
         );
     }
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
 
     env.debug(format!("fixed_append {} chunks", digest_list.len()));
 
@@ -716,7 +717,7 @@ fn close_dynamic_index(
     let csum_str = required_string_param(&param, "csum")?;
     let csum = <[u8; 32]>::from_hex(csum_str)?;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
 
     env.dynamic_writer_close(wid, chunk_count, size, csum)?;
 
@@ -769,7 +770,7 @@ fn close_fixed_index(
     let csum_str = required_string_param(&param, "csum")?;
     let csum = <[u8; 32]>::from_hex(csum_str)?;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
 
     env.fixed_writer_close(wid, chunk_count, size, csum)?;
 
@@ -783,7 +784,7 @@ fn finish_backup(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<Write> = rpcenv.as_ref();
 
     env.finish_backup()?;
     env.log("successfully finished backup");
@@ -802,7 +803,7 @@ fn get_previous_backup_time(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<Lookup> = rpcenv.as_ref();
 
     let backup_time = env
         .last_backup
@@ -829,7 +830,7 @@ fn download_previous(
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
     async move {
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
 
         let archive_name = required_string_param(&param, "archive-name")?.to_owned();
 
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660..57b30dec 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -7,6 +7,7 @@ use futures::*;
 use hex::FromHex;
 use hyper::http::request::Parts;
 use hyper::Body;
+use pbs_datastore::chunk_store::{CanWrite, Write as StoreWrite};
 use serde_json::{json, Value};
 
 use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
@@ -20,19 +21,19 @@ use pbs_tools::json::{required_integer_param, required_string_param};
 
 use super::environment::*;
 
-pub struct UploadChunk {
+pub struct UploadChunk<T> {
     stream: Body,
-    store: Arc<DataStore>,
+    store: Arc<DataStore<T>>,
     digest: [u8; 32],
     size: u32,
     encoded_size: u32,
     raw_data: Option<Vec<u8>>,
 }
 
-impl UploadChunk {
+impl<T: CanWrite> UploadChunk<T> {
     pub fn new(
         stream: Body,
-        store: Arc<DataStore>,
+        store: Arc<DataStore<T>>,
         digest: [u8; 32],
         size: u32,
         encoded_size: u32,
@@ -48,7 +49,7 @@ impl UploadChunk {
     }
 }
 
-impl Future for UploadChunk {
+impl<T: CanWrite> Future for UploadChunk<T> {
     type Output = Result<([u8; 32], u32, u32, bool), Error>;
 
     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -159,7 +160,7 @@ fn upload_fixed_chunk(
         let digest_str = required_string_param(&param, "digest")?;
         let digest = <[u8; 32]>::from_hex(digest_str)?;
 
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
 
         let (digest, size, compressed_size, is_duplicate) =
             UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -228,7 +229,7 @@ fn upload_dynamic_chunk(
         let digest_str = required_string_param(&param, "digest")?;
         let digest = <[u8; 32]>::from_hex(digest_str)?;
 
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
 
         let (digest, size, compressed_size, is_duplicate) =
             UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -273,7 +274,7 @@ fn upload_speedtest(
                 println!("Upload error: {}", err);
             }
         }
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
         Ok(env.format_response(Ok(Value::Null)))
     }
     .boxed()
@@ -312,7 +313,7 @@ fn upload_blob(
         let file_name = required_string_param(&param, "file-name")?.to_owned();
         let encoded_size = required_integer_param(&param, "encoded-size")? as usize;
 
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
 
         if !file_name.ends_with(".blob") {
             bail!("wrong blob file extension: '{}'", file_name);
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f4..13d346eb 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -1,6 +1,7 @@
 use std::collections::HashSet;
 use std::sync::{Arc, RwLock};
 
+use pbs_datastore::chunk_store::CanRead;
 use serde_json::{json, Value};
 
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
@@ -14,25 +15,25 @@ use tracing::info;
 
 /// `RpcEnvironment` implementation for backup reader service
 #[derive(Clone)]
-pub struct ReaderEnvironment {
+pub struct ReaderEnvironment<T> {
     env_type: RpcEnvironmentType,
     result_attributes: Value,
     auth_id: Authid,
     pub debug: bool,
     pub formatter: &'static dyn OutputFormatter,
     pub worker: Arc<WorkerTask>,
-    pub datastore: Arc<DataStore>,
-    pub backup_dir: BackupDir,
+    pub datastore: Arc<DataStore<T>>,
+    pub backup_dir: BackupDir<T>,
     allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
 }
 
-impl ReaderEnvironment {
+impl<T: CanRead> ReaderEnvironment<T> {
     pub fn new(
         env_type: RpcEnvironmentType,
         auth_id: Authid,
         worker: Arc<WorkerTask>,
-        datastore: Arc<DataStore>,
-        backup_dir: BackupDir,
+        datastore: Arc<DataStore<T>>,
+        backup_dir: BackupDir<T>,
     ) -> Self {
         Self {
             result_attributes: json!({}),
@@ -71,7 +72,7 @@ impl ReaderEnvironment {
     }
 }
 
-impl RpcEnvironment for ReaderEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for ReaderEnvironment<T> {
     fn result_attrib_mut(&mut self) -> &mut Value {
         &mut self.result_attributes
     }
@@ -93,14 +94,18 @@ impl RpcEnvironment for ReaderEnvironment {
     }
 }
 
-impl AsRef<ReaderEnvironment> for dyn RpcEnvironment {
-    fn as_ref(&self) -> &ReaderEnvironment {
-        self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for dyn RpcEnvironment {
+    fn as_ref(&self) -> &ReaderEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<ReaderEnvironment<T>>()
+            .unwrap()
     }
 }
 
-impl AsRef<ReaderEnvironment> for Box<dyn RpcEnvironment> {
-    fn as_ref(&self) -> &ReaderEnvironment {
-        self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for Box<dyn RpcEnvironment> {
+    fn as_ref(&self) -> &ReaderEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<ReaderEnvironment<T>>()
+            .unwrap()
     }
 }
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 48a8c5fc..cf20addc 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -6,6 +6,7 @@ use hex::FromHex;
 use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
 use hyper::http::request::Parts;
 use hyper::{Body, Request, Response, StatusCode};
+use pbs_datastore::chunk_store::Read;
 use serde::Deserialize;
 use serde_json::Value;
 
@@ -253,7 +254,7 @@ fn download_file(
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
     async move {
-        let env: &ReaderEnvironment = rpcenv.as_ref();
+        let env: &ReaderEnvironment<Read> = rpcenv.as_ref();
 
         let file_name = required_string_param(&param, "file-name")?.to_owned();
 
@@ -309,7 +310,7 @@ fn download_chunk(
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
     async move {
-        let env: &ReaderEnvironment = rpcenv.as_ref();
+        let env: &ReaderEnvironment<Read> = rpcenv.as_ref();
 
         let digest_str = required_string_param(&param, "digest")?;
         let digest = <[u8; 32]>::from_hex(digest_str)?;
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index cf5a0189..662b953e 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -1,6 +1,7 @@
 use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Error};
+use pbs_datastore::chunk_store::CanWrite;
 use serde_json::Value;
 use tracing::{info, warn};
 
@@ -11,9 +12,9 @@ use proxmox_schema::api;
 use proxmox_worker_task::WorkerTaskContext;
 
 use pbs_api_types::{
-    print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, Operation,
-    TapeBackupJobConfig, TapeBackupJobSetup, TapeBackupJobStatus, JOB_ID_SCHEMA,
-    PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, PRIV_TAPE_WRITE, UPID_SCHEMA,
+    print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, TapeBackupJobConfig,
+    TapeBackupJobSetup, TapeBackupJobStatus, JOB_ID_SCHEMA, PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT,
+    PRIV_TAPE_WRITE, UPID_SCHEMA,
 };
 
 use pbs_config::CachedUserInfo;
@@ -150,7 +151,7 @@ pub fn do_tape_backup_job(
 
     let worker_type = job.jobtype().to_string();
 
-    let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?;
+    let datastore = DataStore::lookup_datastore_write(&setup.store)?;
 
     let (config, _digest) = pbs_config::media_pool::config()?;
     let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;
@@ -306,7 +307,7 @@ pub fn backup(
 
     check_backup_permission(&auth_id, &setup.store, &setup.pool, &setup.drive)?;
 
-    let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?;
+    let datastore = DataStore::lookup_datastore_write(&setup.store)?;
 
     let (config, _digest) = pbs_config::media_pool::config()?;
     let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;
@@ -360,9 +361,9 @@ enum SnapshotBackupResult {
     Ignored,
 }
 
-fn backup_worker(
+fn backup_worker<T: CanWrite + Send + Sync + 'static>(
     worker: &WorkerTask,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     pool_config: &MediaPoolConfig,
     setup: &TapeBackupJobSetup,
     summary: &mut TapeBackupJobSummary,
@@ -560,11 +561,11 @@ fn update_media_online_status(drive: &str) -> Result<Option<String>, Error> {
     }
 }
 
-fn backup_snapshot(
+fn backup_snapshot<T: CanWrite + Send + Sync + 'static>(
     worker: &WorkerTask,
     pool_writer: &mut PoolWriter,
-    datastore: Arc<DataStore>,
-    snapshot: BackupDir,
+    datastore: Arc<DataStore<T>>,
+    snapshot: BackupDir<T>,
 ) -> Result<SnapshotBackupResult, Error> {
     let snapshot_path = snapshot.relative_path();
     info!("backup snapshot {snapshot_path:?}");
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index ba9051de..342406c6 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -1342,7 +1342,7 @@ pub fn catalog_media(
             drive.read_label()?; // skip over labels - we already read them above
 
             let mut checked_chunks = HashMap::new();
-            restore_media(
+            restore_media::<pbs_datastore::chunk_store::Write>(
                 worker,
                 &mut drive,
                 &media_id,
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 95ce13c7..20558670 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
 use anyhow::{bail, format_err, Error};
+use pbs_datastore::chunk_store::{CanRead, CanWrite, Write as StoreWrite};
 use serde_json::Value;
 use tracing::{info, warn};
 
@@ -120,13 +121,13 @@ impl NamespaceMap {
     }
 }
 
-pub struct DataStoreMap {
-    map: HashMap<String, Arc<DataStore>>,
-    default: Option<Arc<DataStore>>,
+pub struct DataStoreMap<T> {
+    map: HashMap<String, Arc<DataStore<T>>>,
+    default: Option<Arc<DataStore<T>>>,
     ns_map: Option<NamespaceMap>,
 }
 
-impl TryFrom<String> for DataStoreMap {
+impl TryFrom<String> for DataStoreMap<StoreWrite> {
     type Error = Error;
 
     fn try_from(value: String) -> Result<Self, Error> {
@@ -161,7 +162,7 @@ impl TryFrom<String> for DataStoreMap {
     }
 }
 
-impl DataStoreMap {
+impl<T> DataStoreMap<T> {
     fn add_namespaces_maps(&mut self, mappings: Vec<String>) -> Result<bool, Error> {
         let count = mappings.len();
         let ns_map = NamespaceMap::try_from(mappings)?;
@@ -169,7 +170,9 @@ impl DataStoreMap {
         Ok(count > 0)
     }
 
-    fn used_datastores(&self) -> HashMap<&str, (Arc<DataStore>, Option<HashSet<BackupNamespace>>)> {
+    fn used_datastores(
+        &self,
+    ) -> HashMap<&str, (Arc<DataStore<T>>, Option<HashSet<BackupNamespace>>)> {
         let mut map = HashMap::new();
         for (source, target) in self.map.iter() {
             let ns = self.ns_map.as_ref().map(|map| map.used_namespaces(source));
@@ -189,7 +192,7 @@ impl DataStoreMap {
             .map(|mapping| mapping.get_namespaces(datastore, ns))
     }
 
-    fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore>> {
+    fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore<T>>> {
         self.map
             .get(source_datastore)
             .or(self.default.as_ref())
@@ -200,7 +203,7 @@ impl DataStoreMap {
         &self,
         source_datastore: &str,
         source_ns: &BackupNamespace,
-    ) -> Option<(Arc<DataStore>, Option<Vec<BackupNamespace>>)> {
+    ) -> Option<(Arc<DataStore<T>>, Option<Vec<BackupNamespace>>)> {
         self.target_store(source_datastore)
             .map(|store| (store, self.target_ns(source_datastore, source_ns)))
     }
@@ -237,9 +240,9 @@ fn check_datastore_privs(
     Ok(())
 }
 
-fn check_and_create_namespaces(
+fn check_and_create_namespaces<T: CanWrite>(
     user_info: &CachedUserInfo,
-    store: &Arc<DataStore>,
+    store: &Arc<DataStore<T>>,
     ns: &BackupNamespace,
     auth_id: &Authid,
     owner: Option<&Authid>,
@@ -449,13 +452,13 @@ pub fn restore(
 }
 
 #[allow(clippy::too_many_arguments)]
-fn restore_full_worker(
+fn restore_full_worker<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     inventory: Inventory,
     media_set_uuid: Uuid,
     drive_config: SectionConfigData,
     drive_name: &str,
-    store_map: DataStoreMap,
+    store_map: DataStoreMap<T>,
     restore_owner: &Authid,
     notification_mode: &TapeNotificationMode,
     auth_id: &Authid,
@@ -529,8 +532,8 @@ fn restore_full_worker(
 }
 
 #[allow(clippy::too_many_arguments)]
-fn check_snapshot_restorable(
-    store_map: &DataStoreMap,
+fn check_snapshot_restorable<T: CanRead>(
+    store_map: &DataStoreMap<T>,
     store: &str,
     snapshot: &str,
     ns: &BackupNamespace,
@@ -618,14 +621,14 @@ fn log_required_tapes<'a>(inventory: &Inventory, list: impl Iterator<Item = &'a
 }
 
 #[allow(clippy::too_many_arguments)]
-fn restore_list_worker(
+fn restore_list_worker<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     snapshots: Vec<String>,
     inventory: Inventory,
     media_set_uuid: Uuid,
     drive_config: SectionConfigData,
     drive_name: &str,
-    store_map: DataStoreMap,
+    store_map: DataStoreMap<T>,
     restore_owner: &Authid,
     notification_mode: &TapeNotificationMode,
     user_info: Arc<CachedUserInfo>,
@@ -955,16 +958,16 @@ fn get_media_set_catalog(
     Ok(catalog)
 }
 
-fn media_set_tmpdir(datastore: &DataStore, media_set_uuid: &Uuid) -> PathBuf {
+fn media_set_tmpdir<T>(datastore: &DataStore<T>, media_set_uuid: &Uuid) -> PathBuf {
     let mut path = datastore.base_path();
     path.push(".tmp");
     path.push(media_set_uuid.to_string());
     path
 }
 
-fn snapshot_tmpdir(
+fn snapshot_tmpdir<T>(
     source_datastore: &str,
-    datastore: &DataStore,
+    datastore: &DataStore<T>,
     snapshot: &str,
     media_set_uuid: &Uuid,
 ) -> PathBuf {
@@ -974,9 +977,9 @@ fn snapshot_tmpdir(
     path
 }
 
-fn restore_snapshots_to_tmpdir(
+fn restore_snapshots_to_tmpdir<T>(
     worker: Arc<WorkerTask>,
-    store_map: &DataStoreMap,
+    store_map: &DataStoreMap<T>,
     file_list: &[u64],
     mut drive: Box<dyn TapeDriver>,
     media_id: &MediaId,
@@ -1083,10 +1086,10 @@ fn restore_snapshots_to_tmpdir(
     Ok(tmp_paths)
 }
 
-fn restore_file_chunk_map(
+fn restore_file_chunk_map<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     drive: &mut Box<dyn TapeDriver>,
-    store_map: &DataStoreMap,
+    store_map: &DataStoreMap<T>,
     file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
 ) -> Result<(), Error> {
     for (nr, chunk_map) in file_chunk_map.iter_mut() {
@@ -1133,10 +1136,10 @@ fn restore_file_chunk_map(
     Ok(())
 }
 
-fn restore_partial_chunk_archive<'a>(
+fn restore_partial_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     reader: Box<dyn 'a + TapeRead>,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     chunk_list: &mut HashSet<[u8; 32]>,
 ) -> Result<usize, Error> {
     let mut decoder = ChunkArchiveDecoder::new(reader);
@@ -1195,12 +1198,12 @@ fn restore_partial_chunk_archive<'a>(
 
 /// Request and restore complete media without using existing catalog (create catalog instead)
 #[allow(clippy::too_many_arguments)]
-pub fn request_and_restore_media(
+pub fn request_and_restore_media<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     media_id: &MediaId,
     drive_config: &SectionConfigData,
     drive_name: &str,
-    store_map: &DataStoreMap,
+    store_map: &DataStoreMap<T>,
     checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
     restore_owner: &Authid,
     notification_mode: &TapeNotificationMode,
@@ -1253,11 +1256,11 @@ pub fn request_and_restore_media(
 /// Restore complete media content and catalog
 ///
 /// Only create the catalog if target is None.
-pub fn restore_media(
+pub fn restore_media<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     drive: &mut Box<dyn TapeDriver>,
     media_id: &MediaId,
-    target: Option<(&DataStoreMap, &Authid)>,
+    target: Option<(&DataStoreMap<T>, &Authid)>,
     checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
     verbose: bool,
     auth_id: &Authid,
@@ -1301,11 +1304,11 @@ pub fn restore_media(
 }
 
 #[allow(clippy::too_many_arguments)]
-fn restore_archive<'a>(
+fn restore_archive<'a, T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     mut reader: Box<dyn 'a + TapeRead>,
     current_file_number: u64,
-    target: Option<(&DataStoreMap, &Authid)>,
+    target: Option<(&DataStoreMap<T>, &Authid)>,
     catalog: &mut MediaCatalog,
     checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
     verbose: bool,
@@ -1525,10 +1528,10 @@ fn scan_chunk_archive<'a>(
     Ok(Some(chunks))
 }
 
-fn restore_chunk_archive<'a>(
+fn restore_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     reader: Box<dyn 'a + TapeRead>,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     checked_chunks: &mut HashSet<[u8; 32]>,
     verbose: bool,
 ) -> Result<Option<Vec<[u8; 32]>>, Error> {
-- 
2.39.2





More information about the pbs-devel mailing list