[pbs-devel] [PATCH proxmox-backup v2 08/12] api/backup/bin/server/tape: add missing generics

Hannes Laimer h.laimer at proxmox.com
Mon May 26 16:14:41 CEST 2025


Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 src/api2/admin/datastore.rs                 | 27 ++++----
 src/api2/backup/mod.rs                      | 21 +++---
 src/api2/backup/upload_chunk.rs             | 19 +++---
 src/api2/config/datastore.rs                |  5 +-
 src/api2/reader/environment.rs              | 30 +++++----
 src/api2/reader/mod.rs                      |  5 +-
 src/api2/tape/backup.rs                     | 11 ++--
 src/api2/tape/drive.rs                      |  3 +-
 src/api2/tape/restore.rs                    | 71 +++++++++++----------
 src/backup/hierarchy.rs                     | 23 +++----
 src/backup/verify.rs                        | 53 +++++++--------
 src/bin/proxmox-backup-proxy.rs             |  7 +-
 src/server/gc_job.rs                        |  7 +-
 src/server/prune_job.rs                     |  5 +-
 src/server/pull.rs                          | 23 +++----
 src/server/push.rs                          |  3 +-
 src/server/sync.rs                          | 13 ++--
 src/tape/file_formats/snapshot_archive.rs   |  5 +-
 src/tape/pool_writer/mod.rs                 | 11 ++--
 src/tape/pool_writer/new_chunks_iterator.rs |  7 +-
 20 files changed, 189 insertions(+), 160 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 39249448..e3f93cdd 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -54,6 +54,7 @@ use pbs_config::CachedUserInfo;
 use pbs_datastore::backup_info::BackupInfo;
 use pbs_datastore::cached_chunk_reader::CachedChunkReader;
 use pbs_datastore::catalog::{ArchiveEntry, CatalogReader};
+use pbs_datastore::chunk_store::{CanRead, Read as R};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::data_blob_reader::DataBlobReader;
 use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader, LocalDynamicReadAt};
@@ -79,8 +80,8 @@ use crate::server::jobstate::{compute_schedule_status, Job, JobState};
 
 const GROUP_NOTES_FILE_NAME: &str = "notes";
 
-fn get_group_note_path(
-    store: &DataStore,
+fn get_group_note_path<T>(
+    store: &DataStore<T>,
     ns: &BackupNamespace,
     group: &pbs_api_types::BackupGroup,
 ) -> PathBuf {
@@ -114,8 +115,8 @@ fn check_privs_and_load_store(
     Ok(datastore)
 }
 
-fn read_backup_index(
-    backup_dir: &BackupDir,
+fn read_backup_index<T: CanRead>(
+    backup_dir: &BackupDir<T>,
 ) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
     let (manifest, index_size) = backup_dir.load_manifest()?;
 
@@ -140,8 +141,8 @@ fn read_backup_index(
     Ok((manifest, result))
 }
 
-fn get_all_snapshot_files(
-    info: &BackupInfo,
+fn get_all_snapshot_files<T: CanRead>(
+    info: &BackupInfo<T>,
 ) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
     let (manifest, mut files) = read_backup_index(&info.backup_dir)?;
 
@@ -529,7 +530,7 @@ unsafe fn list_snapshots_blocking(
         (None, None) => datastore.list_backup_groups(ns.clone())?,
     };
 
-    let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
+    let info_to_snapshot_list_item = |group: &BackupGroup<R>, owner, info: BackupInfo<R>| {
         let backup = pbs_api_types::BackupDir {
             group: group.into(),
             time: info.backup_dir.backup_time(),
@@ -629,8 +630,8 @@ unsafe fn list_snapshots_blocking(
     })
 }
 
-async fn get_snapshots_count(
-    store: &Arc<DataStore>,
+async fn get_snapshots_count<T: CanRead + Send + Sync + 'static>(
+    store: &Arc<DataStore<T>>,
     owner: Option<&Authid>,
 ) -> Result<Counts, Error> {
     let store = Arc::clone(store);
@@ -1796,12 +1797,12 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
     &Permission::Anybody,
 );
 
-fn get_local_pxar_reader(
-    datastore: Arc<DataStore>,
+fn get_local_pxar_reader<T: CanRead>(
+    datastore: Arc<DataStore<T>>,
     manifest: &BackupManifest,
-    backup_dir: &BackupDir,
+    backup_dir: &BackupDir<T>,
     pxar_name: &BackupArchiveName,
-) -> Result<(LocalDynamicReadAt<LocalChunkReader>, u64), Error> {
+) -> Result<(LocalDynamicReadAt<LocalChunkReader<T>>, u64), Error> {
     let mut path = datastore.base_path();
     path.push(backup_dir.relative_path());
     path.push(pxar_name.as_ref());
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index 629df933..79354dbf 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
     BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA, DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP,
 };
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{Read as R, Write as W};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::{DataStore, PROXMOX_BACKUP_PROTOCOL_ID_V1};
 use pbs_tools::json::{required_array_param, required_integer_param, required_string_param};
@@ -279,7 +280,7 @@ fn upgrade_to_backup_protocol(
                         return Ok(());
                     }
 
-                    let verify = |env: BackupEnvironment| {
+                    let verify = |env: BackupEnvironment<W>| {
                         if let Err(err) = env.verify_after_complete(snap_guard) {
                             env.log(format!(
                                 "backup finished, but starting the requested verify task failed: {}",
@@ -400,7 +401,7 @@ fn create_dynamic_index(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     let name = required_string_param(&param, "archive-name")?.to_owned();
 
@@ -450,7 +451,7 @@ fn create_fixed_index(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     let name = required_string_param(&param, "archive-name")?.to_owned();
     let size = required_integer_param(&param, "size")? as usize;
@@ -565,7 +566,7 @@ fn dynamic_append(
         );
     }
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     env.debug(format!("dynamic_append {} chunks", digest_list.len()));
 
@@ -639,7 +640,7 @@ fn fixed_append(
         );
     }
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     env.debug(format!("fixed_append {} chunks", digest_list.len()));
 
@@ -714,7 +715,7 @@ fn close_dynamic_index(
     let csum_str = required_string_param(&param, "csum")?;
     let csum = <[u8; 32]>::from_hex(csum_str)?;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     env.dynamic_writer_close(wid, chunk_count, size, csum)?;
 
@@ -767,7 +768,7 @@ fn close_fixed_index(
     let csum_str = required_string_param(&param, "csum")?;
     let csum = <[u8; 32]>::from_hex(csum_str)?;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     env.fixed_writer_close(wid, chunk_count, size, csum)?;
 
@@ -781,7 +782,7 @@ fn finish_backup(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     env.finish_backup()?;
     env.log("successfully finished backup");
@@ -800,7 +801,7 @@ fn get_previous_backup_time(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<R> = rpcenv.as_ref();
 
     let backup_time = env
         .last_backup
@@ -827,7 +828,7 @@ fn download_previous(
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
     async move {
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<R> = rpcenv.as_ref();
 
         let archive_name = required_string_param(&param, "archive-name")?.to_owned();
 
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660..bb1566ae 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -14,25 +14,26 @@ use proxmox_schema::*;
 use proxmox_sortable_macro::sortable;
 
 use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
+use pbs_datastore::chunk_store::{CanWrite, Lookup as L, Write as W};
 use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
 use pbs_datastore::{DataBlob, DataStore};
 use pbs_tools::json::{required_integer_param, required_string_param};
 
 use super::environment::*;
 
-pub struct UploadChunk {
+pub struct UploadChunk<T> {
     stream: Body,
-    store: Arc<DataStore>,
+    store: Arc<DataStore<T>>,
     digest: [u8; 32],
     size: u32,
     encoded_size: u32,
     raw_data: Option<Vec<u8>>,
 }
 
-impl UploadChunk {
+impl<T> UploadChunk<T> {
     pub fn new(
         stream: Body,
-        store: Arc<DataStore>,
+        store: Arc<DataStore<T>>,
         digest: [u8; 32],
         size: u32,
         encoded_size: u32,
@@ -48,7 +49,7 @@ impl UploadChunk {
     }
 }
 
-impl Future for UploadChunk {
+impl<T: CanWrite> Future for UploadChunk<T> {
     type Output = Result<([u8; 32], u32, u32, bool), Error>;
 
     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -159,7 +160,7 @@ fn upload_fixed_chunk(
         let digest_str = required_string_param(&param, "digest")?;
         let digest = <[u8; 32]>::from_hex(digest_str)?;
 
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
         let (digest, size, compressed_size, is_duplicate) =
             UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -228,7 +229,7 @@ fn upload_dynamic_chunk(
         let digest_str = required_string_param(&param, "digest")?;
         let digest = <[u8; 32]>::from_hex(digest_str)?;
 
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
         let (digest, size, compressed_size, is_duplicate) =
             UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -273,7 +274,7 @@ fn upload_speedtest(
                 println!("Upload error: {}", err);
             }
         }
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<L> = rpcenv.as_ref();
         Ok(env.format_response(Ok(Value::Null)))
     }
     .boxed()
@@ -312,7 +313,7 @@ fn upload_blob(
         let file_name = required_string_param(&param, "file-name")?.to_owned();
         let encoded_size = required_integer_param(&param, "encoded-size")? as usize;
 
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
         if !file_name.ends_with(".blob") {
             bail!("wrong blob file extension: '{}'", file_name);
diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
index b133be70..52fa6db1 100644
--- a/src/api2/config/datastore.rs
+++ b/src/api2/config/datastore.rs
@@ -30,6 +30,7 @@ use crate::api2::config::tape_backup_job::{delete_tape_backup_job, list_tape_bac
 use crate::api2::config::verify::delete_verification_job;
 use pbs_config::CachedUserInfo;
 
+use pbs_datastore::chunk_store::{Read as R, Write as W};
 use pbs_datastore::get_datastore_mount_status;
 use proxmox_rest_server::WorkerTask;
 
@@ -124,7 +125,7 @@ pub(crate) fn do_create_datastore(
     };
 
     let chunk_store = if reuse_datastore {
-        ChunkStore::verify_chunkstore(&path).and_then(|_| {
+        ChunkStore::<R>::verify_chunkstore(&path).and_then(|_| {
             // Must be the only instance accessing and locking the chunk store,
             // dropping will close all other locks from this process on the lockfile as well.
             ChunkStore::open(
@@ -666,7 +667,7 @@ pub async fn delete_datastore(
         auth_id.to_string(),
         to_stdout,
         move |_worker| {
-            pbs_datastore::DataStore::destroy(&name, destroy_data)?;
+            pbs_datastore::DataStore::<W>::destroy(&name, destroy_data)?;
 
             // ignore errors
             let _ = jobstate::remove_state_file("prune", &name);
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f4..26f5bec6 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -14,25 +14,25 @@ use tracing::info;
 
 /// `RpcEnvironment` implementation for backup reader service
 #[derive(Clone)]
-pub struct ReaderEnvironment {
+pub struct ReaderEnvironment<T> {
     env_type: RpcEnvironmentType,
     result_attributes: Value,
     auth_id: Authid,
     pub debug: bool,
     pub formatter: &'static dyn OutputFormatter,
     pub worker: Arc<WorkerTask>,
-    pub datastore: Arc<DataStore>,
-    pub backup_dir: BackupDir,
+    pub datastore: Arc<DataStore<T>>,
+    pub backup_dir: BackupDir<T>,
     allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
 }
 
-impl ReaderEnvironment {
+impl<T> ReaderEnvironment<T> {
     pub fn new(
         env_type: RpcEnvironmentType,
         auth_id: Authid,
         worker: Arc<WorkerTask>,
-        datastore: Arc<DataStore>,
-        backup_dir: BackupDir,
+        datastore: Arc<DataStore<T>>,
+        backup_dir: BackupDir<T>,
     ) -> Self {
         Self {
             result_attributes: json!({}),
@@ -71,7 +71,7 @@ impl ReaderEnvironment {
     }
 }
 
-impl RpcEnvironment for ReaderEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for ReaderEnvironment<T> {
     fn result_attrib_mut(&mut self) -> &mut Value {
         &mut self.result_attributes
     }
@@ -93,14 +93,18 @@ impl RpcEnvironment for ReaderEnvironment {
     }
 }
 
-impl AsRef<ReaderEnvironment> for dyn RpcEnvironment {
-    fn as_ref(&self) -> &ReaderEnvironment {
-        self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for dyn RpcEnvironment {
+    fn as_ref(&self) -> &ReaderEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<ReaderEnvironment<T>>()
+            .unwrap()
     }
 }
 
-impl AsRef<ReaderEnvironment> for Box<dyn RpcEnvironment> {
-    fn as_ref(&self) -> &ReaderEnvironment {
-        self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for Box<dyn RpcEnvironment> {
+    fn as_ref(&self) -> &ReaderEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<ReaderEnvironment<T>>()
+            .unwrap()
     }
 }
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index cc791299..52f0953a 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -23,6 +23,7 @@ use pbs_api_types::{
     DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
 };
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::Read as R;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
 use pbs_tools::json::required_string_param;
@@ -247,7 +248,7 @@ fn download_file(
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
     async move {
-        let env: &ReaderEnvironment = rpcenv.as_ref();
+        let env: &ReaderEnvironment<R> = rpcenv.as_ref();
 
         let file_name = required_string_param(&param, "file-name")?.to_owned();
 
@@ -303,7 +304,7 @@ fn download_chunk(
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
     async move {
-        let env: &ReaderEnvironment = rpcenv.as_ref();
+        let env: &ReaderEnvironment<R> = rpcenv.as_ref();
 
         let digest_str = required_string_param(&param, "digest")?;
         let digest = <[u8; 32]>::from_hex(digest_str)?;
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index 31293a9a..306d5936 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
 
 use pbs_config::CachedUserInfo;
 use pbs_datastore::backup_info::{BackupDir, BackupInfo};
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::{DataStore, StoreProgress};
 
 use crate::tape::TapeNotificationMode;
@@ -360,9 +361,9 @@ enum SnapshotBackupResult {
     Ignored,
 }
 
-fn backup_worker(
+fn backup_worker<T: CanRead + Send + Sync + 'static>(
     worker: &WorkerTask,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     pool_config: &MediaPoolConfig,
     setup: &TapeBackupJobSetup,
     summary: &mut TapeBackupJobSummary,
@@ -564,11 +565,11 @@ fn update_media_online_status(drive: &str) -> Result<Option<String>, Error> {
     }
 }
 
-fn backup_snapshot(
+fn backup_snapshot<T: CanRead + Send + Sync + 'static>(
     worker: &WorkerTask,
     pool_writer: &mut PoolWriter,
-    datastore: Arc<DataStore>,
-    snapshot: BackupDir,
+    datastore: Arc<DataStore<T>>,
+    snapshot: BackupDir<T>,
 ) -> Result<SnapshotBackupResult, Error> {
     let snapshot_path = snapshot.relative_path();
     info!("backup snapshot {snapshot_path:?}");
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index ba9051de..47fa06dc 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
 use pbs_api_types::{PRIV_TAPE_AUDIT, PRIV_TAPE_READ, PRIV_TAPE_WRITE};
 
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::Write as W;
 use pbs_tape::{
     linux_list_drives::{lookup_device_identification, lto_tape_device_list, open_lto_tape_device},
     sg_tape::tape_alert_flags_critical,
@@ -1342,7 +1343,7 @@ pub fn catalog_media(
             drive.read_label()?; // skip over labels - we already read them above
 
             let mut checked_chunks = HashMap::new();
-            restore_media(
+            restore_media::<W>(
                 worker,
                 &mut drive,
                 &media_id,
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 2cc1baab..8f089c20 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -27,6 +27,7 @@ use pbs_api_types::{
 };
 use pbs_client::pxar::tools::handle_root_with_optional_format_version_prelude;
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{CanRead, CanWrite, Write as W};
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::index::IndexFile;
@@ -120,13 +121,13 @@ impl NamespaceMap {
     }
 }
 
-pub struct DataStoreMap {
-    map: HashMap<String, Arc<DataStore>>,
-    default: Option<Arc<DataStore>>,
+pub struct DataStoreMap<T> {
+    map: HashMap<String, Arc<DataStore<T>>>,
+    default: Option<Arc<DataStore<T>>>,
     ns_map: Option<NamespaceMap>,
 }
 
-impl TryFrom<String> for DataStoreMap {
+impl TryFrom<String> for DataStoreMap<W> {
     type Error = Error;
 
     fn try_from(value: String) -> Result<Self, Error> {
@@ -161,7 +162,7 @@ impl TryFrom<String> for DataStoreMap {
     }
 }
 
-impl DataStoreMap {
+impl<T> DataStoreMap<T> {
     fn add_namespaces_maps(&mut self, mappings: Vec<String>) -> Result<bool, Error> {
         let count = mappings.len();
         let ns_map = NamespaceMap::try_from(mappings)?;
@@ -169,7 +170,10 @@ impl DataStoreMap {
         Ok(count > 0)
     }
 
-    fn used_datastores(&self) -> HashMap<&str, (Arc<DataStore>, Option<HashSet<BackupNamespace>>)> {
+    #[allow(clippy::type_complexity)]
+    fn used_datastores(
+        &self,
+    ) -> HashMap<&str, (Arc<DataStore<T>>, Option<HashSet<BackupNamespace>>)> {
         let mut map = HashMap::new();
         for (source, target) in self.map.iter() {
             let ns = self.ns_map.as_ref().map(|map| map.used_namespaces(source));
@@ -189,18 +193,19 @@ impl DataStoreMap {
             .map(|mapping| mapping.get_namespaces(datastore, ns))
     }
 
-    fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore>> {
+    fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore<T>>> {
         self.map
             .get(source_datastore)
             .or(self.default.as_ref())
             .map(Arc::clone)
     }
 
+    #[allow(clippy::type_complexity)]
     fn get_targets(
         &self,
         source_datastore: &str,
         source_ns: &BackupNamespace,
-    ) -> Option<(Arc<DataStore>, Option<Vec<BackupNamespace>>)> {
+    ) -> Option<(Arc<DataStore<T>>, Option<Vec<BackupNamespace>>)> {
         self.target_store(source_datastore)
             .map(|store| (store, self.target_ns(source_datastore, source_ns)))
     }
@@ -237,9 +242,9 @@ fn check_datastore_privs(
     Ok(())
 }
 
-fn check_and_create_namespaces(
+fn check_and_create_namespaces<T: CanWrite>(
     user_info: &CachedUserInfo,
-    store: &Arc<DataStore>,
+    store: &Arc<DataStore<T>>,
     ns: &BackupNamespace,
     auth_id: &Authid,
     owner: Option<&Authid>,
@@ -449,13 +454,13 @@ pub fn restore(
 }
 
 #[allow(clippy::too_many_arguments)]
-fn restore_full_worker(
+fn restore_full_worker<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     inventory: Inventory,
     media_set_uuid: Uuid,
     drive_config: SectionConfigData,
     drive_name: &str,
-    store_map: DataStoreMap,
+    store_map: DataStoreMap<T>,
     restore_owner: &Authid,
     notification_mode: &TapeNotificationMode,
     auth_id: &Authid,
@@ -529,8 +534,8 @@ fn restore_full_worker(
 }
 
 #[allow(clippy::too_many_arguments)]
-fn check_snapshot_restorable(
-    store_map: &DataStoreMap,
+fn check_snapshot_restorable<T: CanRead>(
+    store_map: &DataStoreMap<T>,
     store: &str,
     snapshot: &str,
     ns: &BackupNamespace,
@@ -618,14 +623,14 @@ fn log_required_tapes<'a>(inventory: &Inventory, list: impl Iterator<Item = &'a
 }
 
 #[allow(clippy::too_many_arguments)]
-fn restore_list_worker(
+fn restore_list_worker<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     snapshots: Vec<String>,
     inventory: Inventory,
     media_set_uuid: Uuid,
     drive_config: SectionConfigData,
     drive_name: &str,
-    store_map: DataStoreMap,
+    store_map: DataStoreMap<T>,
     restore_owner: &Authid,
     notification_mode: &TapeNotificationMode,
     user_info: Arc<CachedUserInfo>,
@@ -955,16 +960,16 @@ fn get_media_set_catalog(
     Ok(catalog)
 }
 
-fn media_set_tmpdir(datastore: &DataStore, media_set_uuid: &Uuid) -> PathBuf {
+fn media_set_tmpdir<T>(datastore: &DataStore<T>, media_set_uuid: &Uuid) -> PathBuf {
     let mut path = datastore.base_path();
     path.push(".tmp");
     path.push(media_set_uuid.to_string());
     path
 }
 
-fn snapshot_tmpdir(
+fn snapshot_tmpdir<T>(
     source_datastore: &str,
-    datastore: &DataStore,
+    datastore: &DataStore<T>,
     snapshot: &str,
     media_set_uuid: &Uuid,
 ) -> PathBuf {
@@ -974,9 +979,9 @@ fn snapshot_tmpdir(
     path
 }
 
-fn restore_snapshots_to_tmpdir(
+fn restore_snapshots_to_tmpdir<T>(
     worker: Arc<WorkerTask>,
-    store_map: &DataStoreMap,
+    store_map: &DataStoreMap<T>,
     file_list: &[u64],
     mut drive: Box<dyn TapeDriver>,
     media_id: &MediaId,
@@ -1083,10 +1088,10 @@ fn restore_snapshots_to_tmpdir(
     Ok(tmp_paths)
 }
 
-fn restore_file_chunk_map(
+fn restore_file_chunk_map<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     drive: &mut Box<dyn TapeDriver>,
-    store_map: &DataStoreMap,
+    store_map: &DataStoreMap<T>,
     file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
 ) -> Result<(), Error> {
     for (nr, chunk_map) in file_chunk_map.iter_mut() {
@@ -1133,10 +1138,10 @@ fn restore_file_chunk_map(
     Ok(())
 }
 
-fn restore_partial_chunk_archive<'a>(
+fn restore_partial_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     reader: Box<dyn 'a + TapeRead>,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     chunk_list: &mut HashSet<[u8; 32]>,
 ) -> Result<usize, Error> {
     let mut decoder = ChunkArchiveDecoder::new(reader);
@@ -1195,12 +1200,12 @@ fn restore_partial_chunk_archive<'a>(
 
 /// Request and restore complete media without using existing catalog (create catalog instead)
 #[allow(clippy::too_many_arguments)]
-pub fn request_and_restore_media(
+pub fn request_and_restore_media<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     media_id: &MediaId,
     drive_config: &SectionConfigData,
     drive_name: &str,
-    store_map: &DataStoreMap,
+    store_map: &DataStoreMap<T>,
     checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
     restore_owner: &Authid,
     notification_mode: &TapeNotificationMode,
@@ -1253,11 +1258,11 @@ pub fn request_and_restore_media(
 /// Restore complete media content and catalog
 ///
 /// Only create the catalog if target is None.
-pub fn restore_media(
+pub fn restore_media<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     drive: &mut Box<dyn TapeDriver>,
     media_id: &MediaId,
-    target: Option<(&DataStoreMap, &Authid)>,
+    target: Option<(&DataStoreMap<T>, &Authid)>,
     checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
     verbose: bool,
     auth_id: &Authid,
@@ -1301,11 +1306,11 @@ pub fn restore_media(
 }
 
 #[allow(clippy::too_many_arguments)]
-fn restore_archive<'a>(
+fn restore_archive<'a, T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     mut reader: Box<dyn 'a + TapeRead>,
     current_file_number: u64,
-    target: Option<(&DataStoreMap, &Authid)>,
+    target: Option<(&DataStoreMap<T>, &Authid)>,
     catalog: &mut MediaCatalog,
     checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
     verbose: bool,
@@ -1525,10 +1530,10 @@ fn scan_chunk_archive<'a>(
     Ok(Some(chunks))
 }
 
-fn restore_chunk_archive<'a>(
+fn restore_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     reader: Box<dyn 'a + TapeRead>,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     checked_chunks: &mut HashSet<[u8; 32]>,
     verbose: bool,
 ) -> Result<Option<Vec<[u8; 32]>>, Error> {
diff --git a/src/backup/hierarchy.rs b/src/backup/hierarchy.rs
index 8dd71fcf..039e32a6 100644
--- a/src/backup/hierarchy.rs
+++ b/src/backup/hierarchy.rs
@@ -7,6 +7,7 @@ use pbs_api_types::{
     PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_READ,
 };
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::{backup_info::BackupGroup, DataStore, ListGroups, ListNamespacesRecursive};
 
 /// Asserts that `privs` are fulfilled on datastore + (optional) namespace.
@@ -68,8 +69,8 @@ pub fn check_ns_privs_full(
     );
 }
 
-pub fn can_access_any_namespace(
-    store: Arc<DataStore>,
+pub fn can_access_any_namespace<T: CanRead + 'static>(
+    store: Arc<DataStore<T>>,
     auth_id: &Authid,
     user_info: &CachedUserInfo,
 ) -> bool {
@@ -95,8 +96,8 @@ pub fn can_access_any_namespace(
 ///
 /// Is basically just a filter-iter for pbs_datastore::ListNamespacesRecursive including access and
 /// optional owner checks.
-pub struct ListAccessibleBackupGroups<'a> {
-    store: &'a Arc<DataStore>,
+pub struct ListAccessibleBackupGroups<'a, T> {
+    store: &'a Arc<DataStore<T>>,
     auth_id: Option<&'a Authid>,
     user_info: Arc<CachedUserInfo>,
     /// The priv on NS level that allows auth_id trump the owner check
@@ -104,15 +105,15 @@ pub struct ListAccessibleBackupGroups<'a> {
     /// The priv that auth_id is required to have on NS level additionally to being owner
     owner_and_priv: u64,
     /// Contains the intertnal state, group iter and a bool flag for override_owner_priv
-    state: Option<(ListGroups, bool)>,
-    ns_iter: ListNamespacesRecursive,
+    state: Option<(ListGroups<T>, bool)>,
+    ns_iter: ListNamespacesRecursive<T>,
 }
 
-impl<'a> ListAccessibleBackupGroups<'a> {
+impl<'a, T: CanRead> ListAccessibleBackupGroups<'a, T> {
     // TODO: builder pattern
 
     pub fn new_owned(
-        store: &'a Arc<DataStore>,
+        store: &'a Arc<DataStore<T>>,
         ns: BackupNamespace,
         max_depth: usize,
         auth_id: Option<&'a Authid>,
@@ -122,7 +123,7 @@ impl<'a> ListAccessibleBackupGroups<'a> {
     }
 
     pub fn new_with_privs(
-        store: &'a Arc<DataStore>,
+        store: &'a Arc<DataStore<T>>,
         ns: BackupNamespace,
         max_depth: usize,
         override_owner_priv: Option<u64>,
@@ -145,8 +146,8 @@ impl<'a> ListAccessibleBackupGroups<'a> {
 pub static NS_PRIVS_OK: u64 =
     PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT;
 
-impl Iterator for ListAccessibleBackupGroups<'_> {
-    type Item = Result<BackupGroup, Error>;
+impl<T: CanRead> Iterator for ListAccessibleBackupGroups<'_, T> {
+    type Item = Result<BackupGroup<T>, Error>;
 
     fn next(&mut self) -> Option<Self::Item> {
         loop {
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 3d2cba8a..15c2e9e4 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -15,6 +15,7 @@ use pbs_api_types::{
     UPID,
 };
 use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
+use pbs_datastore::chunk_store::{CanRead, CanWrite};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::{DataBlob, DataStore, StoreProgress};
@@ -25,16 +26,16 @@ use crate::backup::hierarchy::ListAccessibleBackupGroups;
 
 /// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
 /// already been verified or detected as corrupt.
-pub struct VerifyWorker {
+pub struct VerifyWorker<T> {
     worker: Arc<dyn WorkerTaskContext>,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 }
 
-impl VerifyWorker {
+impl<T> VerifyWorker<T> {
     /// Creates a new VerifyWorker for a given task worker and datastore.
-    pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
+    pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore<T>>) -> Self {
         Self {
             worker,
             datastore,
@@ -46,7 +47,7 @@ impl VerifyWorker {
     }
 }
 
-fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
+fn verify_blob<T: CanRead>(backup_dir: &BackupDir<T>, info: &FileInfo) -> Result<(), Error> {
     let blob = backup_dir.load_blob(&info.filename)?;
 
     let raw_size = blob.raw_size();
@@ -70,7 +71,7 @@ fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
     }
 }
 
-fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
+fn rename_corrupted_chunk<T: CanWrite>(datastore: Arc<DataStore<T>>, digest: &[u8; 32]) {
     let (path, digest_str) = datastore.chunk_path(digest);
 
     let mut counter = 0;
@@ -97,8 +98,8 @@ fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
     };
 }
 
-fn verify_index_chunks(
-    verify_worker: &VerifyWorker,
+fn verify_index_chunks<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
     index: Box<dyn IndexFile + Send>,
     crypt_mode: CryptMode,
 ) -> Result<(), Error> {
@@ -238,9 +239,9 @@ fn verify_index_chunks(
     Ok(())
 }
 
-fn verify_fixed_index(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+fn verify_fixed_index<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     info: &FileInfo,
 ) -> Result<(), Error> {
     let mut path = backup_dir.relative_path();
@@ -260,9 +261,9 @@ fn verify_fixed_index(
     verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
 }
 
-fn verify_dynamic_index(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+fn verify_dynamic_index<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     info: &FileInfo,
 ) -> Result<(), Error> {
     let mut path = backup_dir.relative_path();
@@ -291,9 +292,9 @@ fn verify_dynamic_index(
 /// - Ok(true) if verify is successful
 /// - Ok(false) if there were verification errors
 /// - Err(_) if task was aborted
-pub fn verify_backup_dir(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+pub fn verify_backup_dir<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     upid: UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<bool, Error> {
@@ -325,9 +326,9 @@ pub fn verify_backup_dir(
 }
 
 /// See verify_backup_dir
-pub fn verify_backup_dir_with_lock(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+pub fn verify_backup_dir_with_lock<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     upid: UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
     _snap_lock: BackupLockGuard,
@@ -403,9 +404,9 @@ pub fn verify_backup_dir_with_lock(
 /// Returns
 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_backup_group(
-    verify_worker: &VerifyWorker,
-    group: &BackupGroup,
+pub fn verify_backup_group<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    group: &BackupGroup<T>,
     progress: &mut StoreProgress,
     upid: &UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
@@ -455,8 +456,8 @@ pub fn verify_backup_group(
 /// Returns
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_all_backups(
-    verify_worker: &VerifyWorker,
+pub fn verify_all_backups<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
     upid: &UPID,
     ns: BackupNamespace,
     max_depth: Option<usize>,
@@ -504,7 +505,7 @@ pub fn verify_all_backups(
             .filter(|group| {
                 !(group.backup_type() == BackupType::Host && group.backup_id() == "benchmark")
             })
-            .collect::<Vec<BackupGroup>>(),
+            .collect::<Vec<BackupGroup<T>>>(),
         Err(err) => {
             info!("unable to list backups: {err}");
             return Ok(errors);
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 1d4cf37c..bda2f17b 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -20,7 +20,8 @@ use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
 use proxmox_sys::fs::CreateOptions;
 use proxmox_sys::logrotate::LogRotate;
 
-use pbs_datastore::DataStore;
+use pbs_datastore::chunk_store::Lookup as L;
+use pbs_datastore::{is_garbage_collection_running, DataStore};
 
 use proxmox_rest_server::{
     cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, Redirector,
@@ -265,7 +266,7 @@ async fn run() -> Result<(), Error> {
 
     // to remove references for not configured datastores
     command_sock.register_command("datastore-removed".to_string(), |_value| {
-        if let Err(err) = DataStore::remove_unused_datastores() {
+        if let Err(err) = DataStore::<L>::remove_unused_datastores() {
             log::error!("could not refresh datastores: {err}");
         }
         Ok(Value::Null)
@@ -274,7 +275,7 @@ async fn run() -> Result<(), Error> {
     // clear cache entry for datastore that is in a specific maintenance mode
     command_sock.register_command("update-datastore-cache".to_string(), |value| {
         if let Some(name) = value.and_then(Value::as_str) {
-            if let Err(err) = DataStore::update_datastore_cache(name) {
+            if let Err(err) = DataStore::<L>::update_datastore_cache(name) {
                 log::error!("could not trigger update datastore cache: {err}");
             }
         }
diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
index 64835028..c2af6c67 100644
--- a/src/server/gc_job.rs
+++ b/src/server/gc_job.rs
@@ -4,15 +4,18 @@ use std::sync::Arc;
 use tracing::info;
 
 use pbs_api_types::Authid;
+use pbs_datastore::chunk_store::CanWrite;
 use pbs_datastore::DataStore;
 use proxmox_rest_server::WorkerTask;
 
 use crate::server::{jobstate::Job, send_gc_status};
 
 /// Runs a garbage collection job.
-pub fn do_garbage_collection_job(
+pub fn do_garbage_collection_job<
+    T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static,
+>(
     mut job: Job,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     auth_id: &Authid,
     schedule: Option<String>,
     to_stdout: bool,
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index 1c86647a..395aaee4 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -7,6 +7,7 @@ use pbs_api_types::{
     print_store_and_ns, Authid, KeepOptions, Operation, PruneJobOptions, MAX_NAMESPACE_DEPTH,
     PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
 };
+use pbs_datastore::chunk_store::CanWrite;
 use pbs_datastore::prune::compute_prune_info;
 use pbs_datastore::DataStore;
 use proxmox_rest_server::WorkerTask;
@@ -14,10 +15,10 @@ use proxmox_rest_server::WorkerTask;
 use crate::backup::ListAccessibleBackupGroups;
 use crate::server::jobstate::Job;
 
-pub fn prune_datastore(
+pub fn prune_datastore<T: CanWrite>(
     auth_id: Authid,
     prune_options: PruneJobOptions,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     dry_run: bool,
 ) -> Result<(), Error> {
     let store = &datastore.name();
diff --git a/src/server/pull.rs b/src/server/pull.rs
index b1724c14..573aa805 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
 };
 use pbs_client::BackupRepository;
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{CanWrite, Write as W};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -34,8 +35,8 @@ use super::sync::{
 use crate::backup::{check_ns_modification_privs, check_ns_privs};
 use crate::tools::parallel_handler::ParallelHandler;
 
-pub(crate) struct PullTarget {
-    store: Arc<DataStore>,
+pub(crate) struct PullTarget<T> {
+    store: Arc<DataStore<T>>,
     ns: BackupNamespace,
 }
 
@@ -44,7 +45,7 @@ pub(crate) struct PullParameters {
     /// Where data is pulled from
     source: Arc<dyn SyncSource>,
     /// Where data should be pulled into
-    target: PullTarget,
+    target: PullTarget<W>,
     /// Owner of synced groups (needs to match local owner of pre-existing groups)
     owner: Authid,
     /// Whether to remove groups which exist locally, but not on the remote end
@@ -135,9 +136,9 @@ impl PullParameters {
     }
 }
 
-async fn pull_index_chunks<I: IndexFile>(
+async fn pull_index_chunks<I: IndexFile, T: CanWrite + Send + Sync + 'static>(
     chunk_reader: Arc<dyn AsyncReadChunk>,
-    target: Arc<DataStore>,
+    target: Arc<DataStore<T>>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<SyncStats, Error> {
@@ -260,9 +261,9 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 ///   -- Verify tmp file checksum
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
-async fn pull_single_archive<'a>(
+async fn pull_single_archive<'a, T: CanWrite + Send + Sync + 'static>(
     reader: Arc<dyn SyncSourceReader + 'a>,
-    snapshot: &'a pbs_datastore::BackupDir,
+    snapshot: &'a pbs_datastore::BackupDir<T>,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<SyncStats, Error> {
@@ -343,10 +344,10 @@ async fn pull_single_archive<'a>(
 ///   -- if file already exists, verify contents
 ///   -- if not, pull it from the remote
 /// - Download log if not already existing
-async fn pull_snapshot<'a>(
+async fn pull_snapshot<'a, T: CanWrite + Send + Sync + 'static>(
     params: &PullParameters,
     reader: Arc<dyn SyncSourceReader + 'a>,
-    snapshot: &'a pbs_datastore::BackupDir,
+    snapshot: &'a pbs_datastore::BackupDir<T>,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt: bool,
     is_new: bool,
@@ -482,10 +483,10 @@ async fn pull_snapshot<'a>(
 ///
 /// The `reader` is configured to read from the source backup directory, while the
 /// `snapshot` is pointing to the local datastore and target namespace.
-async fn pull_snapshot_from<'a>(
+async fn pull_snapshot_from<'a, T: CanWrite + Send + Sync + 'static>(
     params: &PullParameters,
     reader: Arc<dyn SyncSourceReader + 'a>,
-    snapshot: &'a pbs_datastore::BackupDir,
+    snapshot: &'a pbs_datastore::BackupDir<T>,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt: bool,
 ) -> Result<SyncStats, Error> {
diff --git a/src/server/push.rs b/src/server/push.rs
index e71012ed..ff9d9358 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
 };
 use pbs_client::{BackupRepository, BackupWriter, HttpClient, MergedChunkInfo, UploadOptions};
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::Read as R;
 use pbs_datastore::data_blob::ChunkInfo;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -61,7 +62,7 @@ impl PushTarget {
 /// Parameters for a push operation
 pub(crate) struct PushParameters {
     /// Source of backups to be pushed to remote
-    source: Arc<LocalSource>,
+    source: Arc<LocalSource<R>>,
     /// Target for backups to be pushed to
     target: PushTarget,
     /// User used for permission checks on the source side, including potentially filtering visible
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 09814ef0..96a73503 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
 };
 use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{BackupManifest, DataStore, ListNamespacesRecursive, LocalChunkReader};
@@ -105,10 +106,10 @@ pub(crate) struct RemoteSourceReader {
     pub(crate) dir: BackupDir,
 }
 
-pub(crate) struct LocalSourceReader {
+pub(crate) struct LocalSourceReader<T> {
     pub(crate) _dir_lock: Arc<Mutex<BackupLockGuard>>,
     pub(crate) path: PathBuf,
-    pub(crate) datastore: Arc<DataStore>,
+    pub(crate) datastore: Arc<DataStore<T>>,
 }
 
 #[async_trait::async_trait]
@@ -189,7 +190,7 @@ impl SyncSourceReader for RemoteSourceReader {
 }
 
 #[async_trait::async_trait]
-impl SyncSourceReader for LocalSourceReader {
+impl<T: CanRead + Send + Sync + 'static> SyncSourceReader for LocalSourceReader<T> {
     fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
         Arc::new(LocalChunkReader::new(
             self.datastore.clone(),
@@ -266,8 +267,8 @@ pub(crate) struct RemoteSource {
     pub(crate) client: HttpClient,
 }
 
-pub(crate) struct LocalSource {
-    pub(crate) store: Arc<DataStore>,
+pub(crate) struct LocalSource<T> {
+    pub(crate) store: Arc<DataStore<T>>,
     pub(crate) ns: BackupNamespace,
 }
 
@@ -415,7 +416,7 @@ impl SyncSource for RemoteSource {
 }
 
 #[async_trait::async_trait]
-impl SyncSource for LocalSource {
+impl<T: CanRead + Send + Sync + 'static> SyncSource for LocalSource<T> {
     async fn list_namespaces(
         &self,
         max_depth: &mut Option<usize>,
diff --git a/src/tape/file_formats/snapshot_archive.rs b/src/tape/file_formats/snapshot_archive.rs
index 9d11c04b..7f4ef01f 100644
--- a/src/tape/file_formats/snapshot_archive.rs
+++ b/src/tape/file_formats/snapshot_archive.rs
@@ -5,6 +5,7 @@ use std::task::{Context, Poll};
 use proxmox_sys::error::SysError;
 use proxmox_uuid::Uuid;
 
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::SnapshotReader;
 use pbs_tape::{MediaContentHeader, TapeWrite, PROXMOX_TAPE_BLOCK_SIZE};
 
@@ -21,9 +22,9 @@ use crate::tape::file_formats::{
 /// `LEOM` was detected before all data was written. The stream is
 /// marked inclomplete in that case and does not contain all data (The
 /// backup task must rewrite the whole file on the next media).
-pub fn tape_write_snapshot_archive<'a>(
+pub fn tape_write_snapshot_archive<'a, T: CanRead>(
     writer: &mut (dyn TapeWrite + 'a),
-    snapshot_reader: &SnapshotReader,
+    snapshot_reader: &SnapshotReader<T>,
 ) -> Result<Option<Uuid>, std::io::Error> {
     let backup_dir = snapshot_reader.snapshot();
     let snapshot =
diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs
index 54084421..17c20add 100644
--- a/src/tape/pool_writer/mod.rs
+++ b/src/tape/pool_writer/mod.rs
@@ -15,6 +15,7 @@ use tracing::{info, warn};
 
 use proxmox_uuid::Uuid;
 
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::{DataStore, SnapshotReader};
 use pbs_tape::{sg_tape::tape_alert_flags_critical, TapeWrite};
 use proxmox_rest_server::WorkerTask;
@@ -452,9 +453,9 @@ impl PoolWriter {
     /// archive is marked incomplete, and we do not use it. The caller
     /// should mark the media as full and try again using another
     /// media.
-    pub fn append_snapshot_archive(
+    pub fn append_snapshot_archive<T: CanRead>(
         &mut self,
-        snapshot_reader: &SnapshotReader,
+        snapshot_reader: &SnapshotReader<T>,
     ) -> Result<(bool, usize), Error> {
         let status = match self.status {
             Some(ref mut status) => status,
@@ -543,10 +544,10 @@ impl PoolWriter {
         Ok((leom, bytes_written))
     }
 
-    pub fn spawn_chunk_reader_thread(
+    pub fn spawn_chunk_reader_thread<T: CanRead + Send + Sync + 'static>(
         &self,
-        datastore: Arc<DataStore>,
-        snapshot_reader: Arc<Mutex<SnapshotReader>>,
+        datastore: Arc<DataStore<T>>,
+        snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
     ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
         NewChunksIterator::spawn(
             datastore,
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index e6f418df..e1a0da20 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex};
 
 use anyhow::{format_err, Error};
 
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
 
 use crate::tape::CatalogSet;
@@ -21,9 +22,9 @@ impl NewChunksIterator {
     /// Creates the iterator, spawning a new thread
     ///
     /// Make sure to join() the returned thread handle.
-    pub fn spawn(
-        datastore: Arc<DataStore>,
-        snapshot_reader: Arc<Mutex<SnapshotReader>>,
+    pub fn spawn<T: CanRead + Send + Sync + 'static>(
+        datastore: Arc<DataStore<T>>,
+        snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
         catalog_set: Arc<Mutex<CatalogSet>>,
         read_threads: usize,
     ) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
-- 
2.39.5





More information about the pbs-devel mailing list