[pbs-devel] [PATCH proxmox-backup v2 08/12] api/backup/bin/server/tape: add missing generics
Hannes Laimer
h.laimer at proxmox.com
Mon May 26 16:14:41 CEST 2025
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
src/api2/admin/datastore.rs | 27 ++++----
src/api2/backup/mod.rs | 21 +++---
src/api2/backup/upload_chunk.rs | 19 +++---
src/api2/config/datastore.rs | 5 +-
src/api2/reader/environment.rs | 30 +++++----
src/api2/reader/mod.rs | 5 +-
src/api2/tape/backup.rs | 11 ++--
src/api2/tape/drive.rs | 3 +-
src/api2/tape/restore.rs | 71 +++++++++++----------
src/backup/hierarchy.rs | 23 +++----
src/backup/verify.rs | 53 +++++++--------
src/bin/proxmox-backup-proxy.rs | 7 +-
src/server/gc_job.rs | 7 +-
src/server/prune_job.rs | 5 +-
src/server/pull.rs | 23 +++----
src/server/push.rs | 3 +-
src/server/sync.rs | 13 ++--
src/tape/file_formats/snapshot_archive.rs | 5 +-
src/tape/pool_writer/mod.rs | 11 ++--
src/tape/pool_writer/new_chunks_iterator.rs | 7 +-
20 files changed, 189 insertions(+), 160 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 39249448..e3f93cdd 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -54,6 +54,7 @@ use pbs_config::CachedUserInfo;
use pbs_datastore::backup_info::BackupInfo;
use pbs_datastore::cached_chunk_reader::CachedChunkReader;
use pbs_datastore::catalog::{ArchiveEntry, CatalogReader};
+use pbs_datastore::chunk_store::{CanRead, Read as R};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::data_blob_reader::DataBlobReader;
use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader, LocalDynamicReadAt};
@@ -79,8 +80,8 @@ use crate::server::jobstate::{compute_schedule_status, Job, JobState};
const GROUP_NOTES_FILE_NAME: &str = "notes";
-fn get_group_note_path(
- store: &DataStore,
+fn get_group_note_path<T>(
+ store: &DataStore<T>,
ns: &BackupNamespace,
group: &pbs_api_types::BackupGroup,
) -> PathBuf {
@@ -114,8 +115,8 @@ fn check_privs_and_load_store(
Ok(datastore)
}
-fn read_backup_index(
- backup_dir: &BackupDir,
+fn read_backup_index<T: CanRead>(
+ backup_dir: &BackupDir<T>,
) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
let (manifest, index_size) = backup_dir.load_manifest()?;
@@ -140,8 +141,8 @@ fn read_backup_index(
Ok((manifest, result))
}
-fn get_all_snapshot_files(
- info: &BackupInfo,
+fn get_all_snapshot_files<T: CanRead>(
+ info: &BackupInfo<T>,
) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
let (manifest, mut files) = read_backup_index(&info.backup_dir)?;
@@ -529,7 +530,7 @@ unsafe fn list_snapshots_blocking(
(None, None) => datastore.list_backup_groups(ns.clone())?,
};
- let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
+ let info_to_snapshot_list_item = |group: &BackupGroup<R>, owner, info: BackupInfo<R>| {
let backup = pbs_api_types::BackupDir {
group: group.into(),
time: info.backup_dir.backup_time(),
@@ -629,8 +630,8 @@ unsafe fn list_snapshots_blocking(
})
}
-async fn get_snapshots_count(
- store: &Arc<DataStore>,
+async fn get_snapshots_count<T: CanRead + Send + Sync + 'static>(
+ store: &Arc<DataStore<T>>,
owner: Option<&Authid>,
) -> Result<Counts, Error> {
let store = Arc::clone(store);
@@ -1796,12 +1797,12 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
&Permission::Anybody,
);
-fn get_local_pxar_reader(
- datastore: Arc<DataStore>,
+fn get_local_pxar_reader<T: CanRead>(
+ datastore: Arc<DataStore<T>>,
manifest: &BackupManifest,
- backup_dir: &BackupDir,
+ backup_dir: &BackupDir<T>,
pxar_name: &BackupArchiveName,
-) -> Result<(LocalDynamicReadAt<LocalChunkReader>, u64), Error> {
+) -> Result<(LocalDynamicReadAt<LocalChunkReader<T>>, u64), Error> {
let mut path = datastore.base_path();
path.push(backup_dir.relative_path());
path.push(pxar_name.as_ref());
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index 629df933..79354dbf 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA, DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP,
};
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{Read as R, Write as W};
use pbs_datastore::index::IndexFile;
use pbs_datastore::{DataStore, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_tools::json::{required_array_param, required_integer_param, required_string_param};
@@ -279,7 +280,7 @@ fn upgrade_to_backup_protocol(
return Ok(());
}
- let verify = |env: BackupEnvironment| {
+ let verify = |env: BackupEnvironment<W>| {
if let Err(err) = env.verify_after_complete(snap_guard) {
env.log(format!(
"backup finished, but starting the requested verify task failed: {}",
@@ -400,7 +401,7 @@ fn create_dynamic_index(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
let name = required_string_param(¶m, "archive-name")?.to_owned();
@@ -450,7 +451,7 @@ fn create_fixed_index(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
let name = required_string_param(¶m, "archive-name")?.to_owned();
let size = required_integer_param(¶m, "size")? as usize;
@@ -565,7 +566,7 @@ fn dynamic_append(
);
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
env.debug(format!("dynamic_append {} chunks", digest_list.len()));
@@ -639,7 +640,7 @@ fn fixed_append(
);
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
env.debug(format!("fixed_append {} chunks", digest_list.len()));
@@ -714,7 +715,7 @@ fn close_dynamic_index(
let csum_str = required_string_param(¶m, "csum")?;
let csum = <[u8; 32]>::from_hex(csum_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
env.dynamic_writer_close(wid, chunk_count, size, csum)?;
@@ -767,7 +768,7 @@ fn close_fixed_index(
let csum_str = required_string_param(¶m, "csum")?;
let csum = <[u8; 32]>::from_hex(csum_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
env.fixed_writer_close(wid, chunk_count, size, csum)?;
@@ -781,7 +782,7 @@ fn finish_backup(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
env.finish_backup()?;
env.log("successfully finished backup");
@@ -800,7 +801,7 @@ fn get_previous_backup_time(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<R> = rpcenv.as_ref();
let backup_time = env
.last_backup
@@ -827,7 +828,7 @@ fn download_previous(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<R> = rpcenv.as_ref();
let archive_name = required_string_param(¶m, "archive-name")?.to_owned();
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660..bb1566ae 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -14,25 +14,26 @@ use proxmox_schema::*;
use proxmox_sortable_macro::sortable;
use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
+use pbs_datastore::chunk_store::{CanWrite, Lookup as L, Write as W};
use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
use pbs_datastore::{DataBlob, DataStore};
use pbs_tools::json::{required_integer_param, required_string_param};
use super::environment::*;
-pub struct UploadChunk {
+pub struct UploadChunk<T> {
stream: Body,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
digest: [u8; 32],
size: u32,
encoded_size: u32,
raw_data: Option<Vec<u8>>,
}
-impl UploadChunk {
+impl<T> UploadChunk<T> {
pub fn new(
stream: Body,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
digest: [u8; 32],
size: u32,
encoded_size: u32,
@@ -48,7 +49,7 @@ impl UploadChunk {
}
}
-impl Future for UploadChunk {
+impl<T: CanWrite> Future for UploadChunk<T> {
type Output = Result<([u8; 32], u32, u32, bool), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -159,7 +160,7 @@ fn upload_fixed_chunk(
let digest_str = required_string_param(¶m, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
let (digest, size, compressed_size, is_duplicate) =
UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -228,7 +229,7 @@ fn upload_dynamic_chunk(
let digest_str = required_string_param(¶m, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
let (digest, size, compressed_size, is_duplicate) =
UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -273,7 +274,7 @@ fn upload_speedtest(
println!("Upload error: {}", err);
}
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<L> = rpcenv.as_ref();
Ok(env.format_response(Ok(Value::Null)))
}
.boxed()
@@ -312,7 +313,7 @@ fn upload_blob(
let file_name = required_string_param(¶m, "file-name")?.to_owned();
let encoded_size = required_integer_param(¶m, "encoded-size")? as usize;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
if !file_name.ends_with(".blob") {
bail!("wrong blob file extension: '{}'", file_name);
diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
index b133be70..52fa6db1 100644
--- a/src/api2/config/datastore.rs
+++ b/src/api2/config/datastore.rs
@@ -30,6 +30,7 @@ use crate::api2::config::tape_backup_job::{delete_tape_backup_job, list_tape_bac
use crate::api2::config::verify::delete_verification_job;
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{Read as R, Write as W};
use pbs_datastore::get_datastore_mount_status;
use proxmox_rest_server::WorkerTask;
@@ -124,7 +125,7 @@ pub(crate) fn do_create_datastore(
};
let chunk_store = if reuse_datastore {
- ChunkStore::verify_chunkstore(&path).and_then(|_| {
+ ChunkStore::<R>::verify_chunkstore(&path).and_then(|_| {
// Must be the only instance accessing and locking the chunk store,
// dropping will close all other locks from this process on the lockfile as well.
ChunkStore::open(
@@ -666,7 +667,7 @@ pub async fn delete_datastore(
auth_id.to_string(),
to_stdout,
move |_worker| {
- pbs_datastore::DataStore::destroy(&name, destroy_data)?;
+ pbs_datastore::DataStore::<W>::destroy(&name, destroy_data)?;
// ignore errors
let _ = jobstate::remove_state_file("prune", &name);
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f4..26f5bec6 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -14,25 +14,25 @@ use tracing::info;
/// `RpcEnvironment` implementation for backup reader service
#[derive(Clone)]
-pub struct ReaderEnvironment {
+pub struct ReaderEnvironment<T> {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Authid,
pub debug: bool,
pub formatter: &'static dyn OutputFormatter,
pub worker: Arc<WorkerTask>,
- pub datastore: Arc<DataStore>,
- pub backup_dir: BackupDir,
+ pub datastore: Arc<DataStore<T>>,
+ pub backup_dir: BackupDir<T>,
allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
}
-impl ReaderEnvironment {
+impl<T> ReaderEnvironment<T> {
pub fn new(
env_type: RpcEnvironmentType,
auth_id: Authid,
worker: Arc<WorkerTask>,
- datastore: Arc<DataStore>,
- backup_dir: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ backup_dir: BackupDir<T>,
) -> Self {
Self {
result_attributes: json!({}),
@@ -71,7 +71,7 @@ impl ReaderEnvironment {
}
}
-impl RpcEnvironment for ReaderEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for ReaderEnvironment<T> {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
@@ -93,14 +93,18 @@ impl RpcEnvironment for ReaderEnvironment {
}
}
-impl AsRef<ReaderEnvironment> for dyn RpcEnvironment {
- fn as_ref(&self) -> &ReaderEnvironment {
- self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for dyn RpcEnvironment {
+ fn as_ref(&self) -> &ReaderEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<ReaderEnvironment<T>>()
+ .unwrap()
}
}
-impl AsRef<ReaderEnvironment> for Box<dyn RpcEnvironment> {
- fn as_ref(&self) -> &ReaderEnvironment {
- self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for Box<dyn RpcEnvironment> {
+ fn as_ref(&self) -> &ReaderEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<ReaderEnvironment<T>>()
+ .unwrap()
}
}
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index cc791299..52f0953a 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -23,6 +23,7 @@ use pbs_api_types::{
DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::Read as R;
use pbs_datastore::index::IndexFile;
use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
use pbs_tools::json::required_string_param;
@@ -247,7 +248,7 @@ fn download_file(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &ReaderEnvironment = rpcenv.as_ref();
+ let env: &ReaderEnvironment<R> = rpcenv.as_ref();
let file_name = required_string_param(¶m, "file-name")?.to_owned();
@@ -303,7 +304,7 @@ fn download_chunk(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &ReaderEnvironment = rpcenv.as_ref();
+ let env: &ReaderEnvironment<R> = rpcenv.as_ref();
let digest_str = required_string_param(¶m, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index 31293a9a..306d5936 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
use pbs_config::CachedUserInfo;
use pbs_datastore::backup_info::{BackupDir, BackupInfo};
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::{DataStore, StoreProgress};
use crate::tape::TapeNotificationMode;
@@ -360,9 +361,9 @@ enum SnapshotBackupResult {
Ignored,
}
-fn backup_worker(
+fn backup_worker<T: CanRead + Send + Sync + 'static>(
worker: &WorkerTask,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
pool_config: &MediaPoolConfig,
setup: &TapeBackupJobSetup,
summary: &mut TapeBackupJobSummary,
@@ -564,11 +565,11 @@ fn update_media_online_status(drive: &str) -> Result<Option<String>, Error> {
}
}
-fn backup_snapshot(
+fn backup_snapshot<T: CanRead + Send + Sync + 'static>(
worker: &WorkerTask,
pool_writer: &mut PoolWriter,
- datastore: Arc<DataStore>,
- snapshot: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ snapshot: BackupDir<T>,
) -> Result<SnapshotBackupResult, Error> {
let snapshot_path = snapshot.relative_path();
info!("backup snapshot {snapshot_path:?}");
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index ba9051de..47fa06dc 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
use pbs_api_types::{PRIV_TAPE_AUDIT, PRIV_TAPE_READ, PRIV_TAPE_WRITE};
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::Write as W;
use pbs_tape::{
linux_list_drives::{lookup_device_identification, lto_tape_device_list, open_lto_tape_device},
sg_tape::tape_alert_flags_critical,
@@ -1342,7 +1343,7 @@ pub fn catalog_media(
drive.read_label()?; // skip over labels - we already read them above
let mut checked_chunks = HashMap::new();
- restore_media(
+ restore_media::<W>(
worker,
&mut drive,
&media_id,
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 2cc1baab..8f089c20 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -27,6 +27,7 @@ use pbs_api_types::{
};
use pbs_client::pxar::tools::handle_root_with_optional_format_version_prelude;
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{CanRead, CanWrite, Write as W};
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
@@ -120,13 +121,13 @@ impl NamespaceMap {
}
}
-pub struct DataStoreMap {
- map: HashMap<String, Arc<DataStore>>,
- default: Option<Arc<DataStore>>,
+pub struct DataStoreMap<T> {
+ map: HashMap<String, Arc<DataStore<T>>>,
+ default: Option<Arc<DataStore<T>>>,
ns_map: Option<NamespaceMap>,
}
-impl TryFrom<String> for DataStoreMap {
+impl TryFrom<String> for DataStoreMap<W> {
type Error = Error;
fn try_from(value: String) -> Result<Self, Error> {
@@ -161,7 +162,7 @@ impl TryFrom<String> for DataStoreMap {
}
}
-impl DataStoreMap {
+impl<T> DataStoreMap<T> {
fn add_namespaces_maps(&mut self, mappings: Vec<String>) -> Result<bool, Error> {
let count = mappings.len();
let ns_map = NamespaceMap::try_from(mappings)?;
@@ -169,7 +170,10 @@ impl DataStoreMap {
Ok(count > 0)
}
- fn used_datastores(&self) -> HashMap<&str, (Arc<DataStore>, Option<HashSet<BackupNamespace>>)> {
+ #[allow(clippy::type_complexity)]
+ fn used_datastores(
+ &self,
+ ) -> HashMap<&str, (Arc<DataStore<T>>, Option<HashSet<BackupNamespace>>)> {
let mut map = HashMap::new();
for (source, target) in self.map.iter() {
let ns = self.ns_map.as_ref().map(|map| map.used_namespaces(source));
@@ -189,18 +193,19 @@ impl DataStoreMap {
.map(|mapping| mapping.get_namespaces(datastore, ns))
}
- fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore>> {
+ fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore<T>>> {
self.map
.get(source_datastore)
.or(self.default.as_ref())
.map(Arc::clone)
}
+ #[allow(clippy::type_complexity)]
fn get_targets(
&self,
source_datastore: &str,
source_ns: &BackupNamespace,
- ) -> Option<(Arc<DataStore>, Option<Vec<BackupNamespace>>)> {
+ ) -> Option<(Arc<DataStore<T>>, Option<Vec<BackupNamespace>>)> {
self.target_store(source_datastore)
.map(|store| (store, self.target_ns(source_datastore, source_ns)))
}
@@ -237,9 +242,9 @@ fn check_datastore_privs(
Ok(())
}
-fn check_and_create_namespaces(
+fn check_and_create_namespaces<T: CanWrite>(
user_info: &CachedUserInfo,
- store: &Arc<DataStore>,
+ store: &Arc<DataStore<T>>,
ns: &BackupNamespace,
auth_id: &Authid,
owner: Option<&Authid>,
@@ -449,13 +454,13 @@ pub fn restore(
}
#[allow(clippy::too_many_arguments)]
-fn restore_full_worker(
+fn restore_full_worker<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
inventory: Inventory,
media_set_uuid: Uuid,
drive_config: SectionConfigData,
drive_name: &str,
- store_map: DataStoreMap,
+ store_map: DataStoreMap<T>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
auth_id: &Authid,
@@ -529,8 +534,8 @@ fn restore_full_worker(
}
#[allow(clippy::too_many_arguments)]
-fn check_snapshot_restorable(
- store_map: &DataStoreMap,
+fn check_snapshot_restorable<T: CanRead>(
+ store_map: &DataStoreMap<T>,
store: &str,
snapshot: &str,
ns: &BackupNamespace,
@@ -618,14 +623,14 @@ fn log_required_tapes<'a>(inventory: &Inventory, list: impl Iterator<Item = &'a
}
#[allow(clippy::too_many_arguments)]
-fn restore_list_worker(
+fn restore_list_worker<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
snapshots: Vec<String>,
inventory: Inventory,
media_set_uuid: Uuid,
drive_config: SectionConfigData,
drive_name: &str,
- store_map: DataStoreMap,
+ store_map: DataStoreMap<T>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
user_info: Arc<CachedUserInfo>,
@@ -955,16 +960,16 @@ fn get_media_set_catalog(
Ok(catalog)
}
-fn media_set_tmpdir(datastore: &DataStore, media_set_uuid: &Uuid) -> PathBuf {
+fn media_set_tmpdir<T>(datastore: &DataStore<T>, media_set_uuid: &Uuid) -> PathBuf {
let mut path = datastore.base_path();
path.push(".tmp");
path.push(media_set_uuid.to_string());
path
}
-fn snapshot_tmpdir(
+fn snapshot_tmpdir<T>(
source_datastore: &str,
- datastore: &DataStore,
+ datastore: &DataStore<T>,
snapshot: &str,
media_set_uuid: &Uuid,
) -> PathBuf {
@@ -974,9 +979,9 @@ fn snapshot_tmpdir(
path
}
-fn restore_snapshots_to_tmpdir(
+fn restore_snapshots_to_tmpdir<T>(
worker: Arc<WorkerTask>,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
file_list: &[u64],
mut drive: Box<dyn TapeDriver>,
media_id: &MediaId,
@@ -1083,10 +1088,10 @@ fn restore_snapshots_to_tmpdir(
Ok(tmp_paths)
}
-fn restore_file_chunk_map(
+fn restore_file_chunk_map<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
drive: &mut Box<dyn TapeDriver>,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
) -> Result<(), Error> {
for (nr, chunk_map) in file_chunk_map.iter_mut() {
@@ -1133,10 +1138,10 @@ fn restore_file_chunk_map(
Ok(())
}
-fn restore_partial_chunk_archive<'a>(
+fn restore_partial_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
reader: Box<dyn 'a + TapeRead>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
chunk_list: &mut HashSet<[u8; 32]>,
) -> Result<usize, Error> {
let mut decoder = ChunkArchiveDecoder::new(reader);
@@ -1195,12 +1200,12 @@ fn restore_partial_chunk_archive<'a>(
/// Request and restore complete media without using existing catalog (create catalog instead)
#[allow(clippy::too_many_arguments)]
-pub fn request_and_restore_media(
+pub fn request_and_restore_media<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
media_id: &MediaId,
drive_config: &SectionConfigData,
drive_name: &str,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
@@ -1253,11 +1258,11 @@ pub fn request_and_restore_media(
/// Restore complete media content and catalog
///
/// Only create the catalog if target is None.
-pub fn restore_media(
+pub fn restore_media<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
drive: &mut Box<dyn TapeDriver>,
media_id: &MediaId,
- target: Option<(&DataStoreMap, &Authid)>,
+ target: Option<(&DataStoreMap<T>, &Authid)>,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
verbose: bool,
auth_id: &Authid,
@@ -1301,11 +1306,11 @@ pub fn restore_media(
}
#[allow(clippy::too_many_arguments)]
-fn restore_archive<'a>(
+fn restore_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
mut reader: Box<dyn 'a + TapeRead>,
current_file_number: u64,
- target: Option<(&DataStoreMap, &Authid)>,
+ target: Option<(&DataStoreMap<T>, &Authid)>,
catalog: &mut MediaCatalog,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
verbose: bool,
@@ -1525,10 +1530,10 @@ fn scan_chunk_archive<'a>(
Ok(Some(chunks))
}
-fn restore_chunk_archive<'a>(
+fn restore_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
reader: Box<dyn 'a + TapeRead>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
checked_chunks: &mut HashSet<[u8; 32]>,
verbose: bool,
) -> Result<Option<Vec<[u8; 32]>>, Error> {
diff --git a/src/backup/hierarchy.rs b/src/backup/hierarchy.rs
index 8dd71fcf..039e32a6 100644
--- a/src/backup/hierarchy.rs
+++ b/src/backup/hierarchy.rs
@@ -7,6 +7,7 @@ use pbs_api_types::{
PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_READ,
};
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::{backup_info::BackupGroup, DataStore, ListGroups, ListNamespacesRecursive};
/// Asserts that `privs` are fulfilled on datastore + (optional) namespace.
@@ -68,8 +69,8 @@ pub fn check_ns_privs_full(
);
}
-pub fn can_access_any_namespace(
- store: Arc<DataStore>,
+pub fn can_access_any_namespace<T: CanRead + 'static>(
+ store: Arc<DataStore<T>>,
auth_id: &Authid,
user_info: &CachedUserInfo,
) -> bool {
@@ -95,8 +96,8 @@ pub fn can_access_any_namespace(
///
/// Is basically just a filter-iter for pbs_datastore::ListNamespacesRecursive including access and
/// optional owner checks.
-pub struct ListAccessibleBackupGroups<'a> {
- store: &'a Arc<DataStore>,
+pub struct ListAccessibleBackupGroups<'a, T> {
+ store: &'a Arc<DataStore<T>>,
auth_id: Option<&'a Authid>,
user_info: Arc<CachedUserInfo>,
/// The priv on NS level that allows auth_id trump the owner check
@@ -104,15 +105,15 @@ pub struct ListAccessibleBackupGroups<'a> {
/// The priv that auth_id is required to have on NS level additionally to being owner
owner_and_priv: u64,
/// Contains the intertnal state, group iter and a bool flag for override_owner_priv
- state: Option<(ListGroups, bool)>,
- ns_iter: ListNamespacesRecursive,
+ state: Option<(ListGroups<T>, bool)>,
+ ns_iter: ListNamespacesRecursive<T>,
}
-impl<'a> ListAccessibleBackupGroups<'a> {
+impl<'a, T: CanRead> ListAccessibleBackupGroups<'a, T> {
// TODO: builder pattern
pub fn new_owned(
- store: &'a Arc<DataStore>,
+ store: &'a Arc<DataStore<T>>,
ns: BackupNamespace,
max_depth: usize,
auth_id: Option<&'a Authid>,
@@ -122,7 +123,7 @@ impl<'a> ListAccessibleBackupGroups<'a> {
}
pub fn new_with_privs(
- store: &'a Arc<DataStore>,
+ store: &'a Arc<DataStore<T>>,
ns: BackupNamespace,
max_depth: usize,
override_owner_priv: Option<u64>,
@@ -145,8 +146,8 @@ impl<'a> ListAccessibleBackupGroups<'a> {
pub static NS_PRIVS_OK: u64 =
PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT;
-impl Iterator for ListAccessibleBackupGroups<'_> {
- type Item = Result<BackupGroup, Error>;
+impl<T: CanRead> Iterator for ListAccessibleBackupGroups<'_, T> {
+ type Item = Result<BackupGroup<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 3d2cba8a..15c2e9e4 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -15,6 +15,7 @@ use pbs_api_types::{
UPID,
};
use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
+use pbs_datastore::chunk_store::{CanRead, CanWrite};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::{DataBlob, DataStore, StoreProgress};
@@ -25,16 +26,16 @@ use crate::backup::hierarchy::ListAccessibleBackupGroups;
/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
/// already been verified or detected as corrupt.
-pub struct VerifyWorker {
+pub struct VerifyWorker<T> {
worker: Arc<dyn WorkerTaskContext>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
}
-impl VerifyWorker {
+impl<T> VerifyWorker<T> {
/// Creates a new VerifyWorker for a given task worker and datastore.
- pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
+ pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore<T>>) -> Self {
Self {
worker,
datastore,
@@ -46,7 +47,7 @@ impl VerifyWorker {
}
}
-fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
+fn verify_blob<T: CanRead>(backup_dir: &BackupDir<T>, info: &FileInfo) -> Result<(), Error> {
let blob = backup_dir.load_blob(&info.filename)?;
let raw_size = blob.raw_size();
@@ -70,7 +71,7 @@ fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
}
}
-fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
+fn rename_corrupted_chunk<T: CanWrite>(datastore: Arc<DataStore<T>>, digest: &[u8; 32]) {
let (path, digest_str) = datastore.chunk_path(digest);
let mut counter = 0;
@@ -97,8 +98,8 @@ fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
};
}
-fn verify_index_chunks(
- verify_worker: &VerifyWorker,
+fn verify_index_chunks<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
index: Box<dyn IndexFile + Send>,
crypt_mode: CryptMode,
) -> Result<(), Error> {
@@ -238,9 +239,9 @@ fn verify_index_chunks(
Ok(())
}
-fn verify_fixed_index(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+fn verify_fixed_index<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
info: &FileInfo,
) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
@@ -260,9 +261,9 @@ fn verify_fixed_index(
verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}
-fn verify_dynamic_index(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+fn verify_dynamic_index<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
info: &FileInfo,
) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
@@ -291,9 +292,9 @@ fn verify_dynamic_index(
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
-pub fn verify_backup_dir(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+pub fn verify_backup_dir<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
upid: UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
@@ -325,9 +326,9 @@ pub fn verify_backup_dir(
}
/// See verify_backup_dir
-pub fn verify_backup_dir_with_lock(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+pub fn verify_backup_dir_with_lock<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
upid: UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
_snap_lock: BackupLockGuard,
@@ -403,9 +404,9 @@ pub fn verify_backup_dir_with_lock(
/// Returns
/// - Ok((count, failed_dirs)) where failed_dirs had verification errors
/// - Err(_) if task was aborted
-pub fn verify_backup_group(
- verify_worker: &VerifyWorker,
- group: &BackupGroup,
+pub fn verify_backup_group<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ group: &BackupGroup<T>,
progress: &mut StoreProgress,
upid: &UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
@@ -455,8 +456,8 @@ pub fn verify_backup_group(
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
-pub fn verify_all_backups(
- verify_worker: &VerifyWorker,
+pub fn verify_all_backups<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
upid: &UPID,
ns: BackupNamespace,
max_depth: Option<usize>,
@@ -504,7 +505,7 @@ pub fn verify_all_backups(
.filter(|group| {
!(group.backup_type() == BackupType::Host && group.backup_id() == "benchmark")
})
- .collect::<Vec<BackupGroup>>(),
+ .collect::<Vec<BackupGroup<T>>>(),
Err(err) => {
info!("unable to list backups: {err}");
return Ok(errors);
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 1d4cf37c..bda2f17b 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -20,7 +20,8 @@ use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
use proxmox_sys::fs::CreateOptions;
use proxmox_sys::logrotate::LogRotate;
-use pbs_datastore::DataStore;
+use pbs_datastore::chunk_store::Lookup as L;
+use pbs_datastore::{is_garbage_collection_running, DataStore};
use proxmox_rest_server::{
cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, Redirector,
@@ -265,7 +266,7 @@ async fn run() -> Result<(), Error> {
// to remove references for not configured datastores
command_sock.register_command("datastore-removed".to_string(), |_value| {
- if let Err(err) = DataStore::remove_unused_datastores() {
+ if let Err(err) = DataStore::<L>::remove_unused_datastores() {
log::error!("could not refresh datastores: {err}");
}
Ok(Value::Null)
@@ -274,7 +275,7 @@ async fn run() -> Result<(), Error> {
// clear cache entry for datastore that is in a specific maintenance mode
command_sock.register_command("update-datastore-cache".to_string(), |value| {
if let Some(name) = value.and_then(Value::as_str) {
- if let Err(err) = DataStore::update_datastore_cache(name) {
+ if let Err(err) = DataStore::<L>::update_datastore_cache(name) {
log::error!("could not trigger update datastore cache: {err}");
}
}
diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
index 64835028..c2af6c67 100644
--- a/src/server/gc_job.rs
+++ b/src/server/gc_job.rs
@@ -4,15 +4,18 @@ use std::sync::Arc;
use tracing::info;
use pbs_api_types::Authid;
+use pbs_datastore::chunk_store::CanWrite;
use pbs_datastore::DataStore;
use proxmox_rest_server::WorkerTask;
use crate::server::{jobstate::Job, send_gc_status};
/// Runs a garbage collection job.
-pub fn do_garbage_collection_job(
+pub fn do_garbage_collection_job<
+ T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static,
+>(
mut job: Job,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
auth_id: &Authid,
schedule: Option<String>,
to_stdout: bool,
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index 1c86647a..395aaee4 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -7,6 +7,7 @@ use pbs_api_types::{
print_store_and_ns, Authid, KeepOptions, Operation, PruneJobOptions, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
};
+use pbs_datastore::chunk_store::CanWrite;
use pbs_datastore::prune::compute_prune_info;
use pbs_datastore::DataStore;
use proxmox_rest_server::WorkerTask;
@@ -14,10 +15,10 @@ use proxmox_rest_server::WorkerTask;
use crate::backup::ListAccessibleBackupGroups;
use crate::server::jobstate::Job;
-pub fn prune_datastore(
+pub fn prune_datastore<T: CanWrite>(
auth_id: Authid,
prune_options: PruneJobOptions,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
dry_run: bool,
) -> Result<(), Error> {
let store = &datastore.name();
diff --git a/src/server/pull.rs b/src/server/pull.rs
index b1724c14..573aa805 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
};
use pbs_client::BackupRepository;
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{CanWrite, Write as W};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
@@ -34,8 +35,8 @@ use super::sync::{
use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::tools::parallel_handler::ParallelHandler;
-pub(crate) struct PullTarget {
- store: Arc<DataStore>,
+pub(crate) struct PullTarget<T> {
+ store: Arc<DataStore<T>>,
ns: BackupNamespace,
}
@@ -44,7 +45,7 @@ pub(crate) struct PullParameters {
/// Where data is pulled from
source: Arc<dyn SyncSource>,
/// Where data should be pulled into
- target: PullTarget,
+ target: PullTarget<W>,
/// Owner of synced groups (needs to match local owner of pre-existing groups)
owner: Authid,
/// Whether to remove groups which exist locally, but not on the remote end
@@ -135,9 +136,9 @@ impl PullParameters {
}
}
-async fn pull_index_chunks<I: IndexFile>(
+async fn pull_index_chunks<I: IndexFile, T: CanWrite + Send + Sync + 'static>(
chunk_reader: Arc<dyn AsyncReadChunk>,
- target: Arc<DataStore>,
+ target: Arc<DataStore<T>>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
@@ -260,9 +261,9 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
-async fn pull_single_archive<'a>(
+async fn pull_single_archive<'a, T: CanWrite + Send + Sync + 'static>(
reader: Arc<dyn SyncSourceReader + 'a>,
- snapshot: &'a pbs_datastore::BackupDir,
+ snapshot: &'a pbs_datastore::BackupDir<T>,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
@@ -343,10 +344,10 @@ async fn pull_single_archive<'a>(
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
-async fn pull_snapshot<'a>(
+async fn pull_snapshot<'a, T: CanWrite + Send + Sync + 'static>(
params: &PullParameters,
reader: Arc<dyn SyncSourceReader + 'a>,
- snapshot: &'a pbs_datastore::BackupDir,
+ snapshot: &'a pbs_datastore::BackupDir<T>,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
corrupt: bool,
is_new: bool,
@@ -482,10 +483,10 @@ async fn pull_snapshot<'a>(
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
-async fn pull_snapshot_from<'a>(
+async fn pull_snapshot_from<'a, T: CanWrite + Send + Sync + 'static>(
params: &PullParameters,
reader: Arc<dyn SyncSourceReader + 'a>,
- snapshot: &'a pbs_datastore::BackupDir,
+ snapshot: &'a pbs_datastore::BackupDir<T>,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
corrupt: bool,
) -> Result<SyncStats, Error> {
diff --git a/src/server/push.rs b/src/server/push.rs
index e71012ed..ff9d9358 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
};
use pbs_client::{BackupRepository, BackupWriter, HttpClient, MergedChunkInfo, UploadOptions};
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::Read as R;
use pbs_datastore::data_blob::ChunkInfo;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
@@ -61,7 +62,7 @@ impl PushTarget {
/// Parameters for a push operation
pub(crate) struct PushParameters {
/// Source of backups to be pushed to remote
- source: Arc<LocalSource>,
+ source: Arc<LocalSource<R>>,
/// Target for backups to be pushed to
target: PushTarget,
/// User used for permission checks on the source side, including potentially filtering visible
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 09814ef0..96a73503 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{BackupManifest, DataStore, ListNamespacesRecursive, LocalChunkReader};
@@ -105,10 +106,10 @@ pub(crate) struct RemoteSourceReader {
pub(crate) dir: BackupDir,
}
-pub(crate) struct LocalSourceReader {
+pub(crate) struct LocalSourceReader<T> {
pub(crate) _dir_lock: Arc<Mutex<BackupLockGuard>>,
pub(crate) path: PathBuf,
- pub(crate) datastore: Arc<DataStore>,
+ pub(crate) datastore: Arc<DataStore<T>>,
}
#[async_trait::async_trait]
@@ -189,7 +190,7 @@ impl SyncSourceReader for RemoteSourceReader {
}
#[async_trait::async_trait]
-impl SyncSourceReader for LocalSourceReader {
+impl<T: CanRead + Send + Sync + 'static> SyncSourceReader for LocalSourceReader<T> {
fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
Arc::new(LocalChunkReader::new(
self.datastore.clone(),
@@ -266,8 +267,8 @@ pub(crate) struct RemoteSource {
pub(crate) client: HttpClient,
}
-pub(crate) struct LocalSource {
- pub(crate) store: Arc<DataStore>,
+pub(crate) struct LocalSource<T> {
+ pub(crate) store: Arc<DataStore<T>>,
pub(crate) ns: BackupNamespace,
}
@@ -415,7 +416,7 @@ impl SyncSource for RemoteSource {
}
#[async_trait::async_trait]
-impl SyncSource for LocalSource {
+impl<T: CanRead + Send + Sync + 'static> SyncSource for LocalSource<T> {
async fn list_namespaces(
&self,
max_depth: &mut Option<usize>,
diff --git a/src/tape/file_formats/snapshot_archive.rs b/src/tape/file_formats/snapshot_archive.rs
index 9d11c04b..7f4ef01f 100644
--- a/src/tape/file_formats/snapshot_archive.rs
+++ b/src/tape/file_formats/snapshot_archive.rs
@@ -5,6 +5,7 @@ use std::task::{Context, Poll};
use proxmox_sys::error::SysError;
use proxmox_uuid::Uuid;
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::SnapshotReader;
use pbs_tape::{MediaContentHeader, TapeWrite, PROXMOX_TAPE_BLOCK_SIZE};
@@ -21,9 +22,9 @@ use crate::tape::file_formats::{
/// `LEOM` was detected before all data was written. The stream is
/// marked inclomplete in that case and does not contain all data (The
/// backup task must rewrite the whole file on the next media).
-pub fn tape_write_snapshot_archive<'a>(
+pub fn tape_write_snapshot_archive<'a, T: CanRead>(
writer: &mut (dyn TapeWrite + 'a),
- snapshot_reader: &SnapshotReader,
+ snapshot_reader: &SnapshotReader<T>,
) -> Result<Option<Uuid>, std::io::Error> {
let backup_dir = snapshot_reader.snapshot();
let snapshot =
diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs
index 54084421..17c20add 100644
--- a/src/tape/pool_writer/mod.rs
+++ b/src/tape/pool_writer/mod.rs
@@ -15,6 +15,7 @@ use tracing::{info, warn};
use proxmox_uuid::Uuid;
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::{DataStore, SnapshotReader};
use pbs_tape::{sg_tape::tape_alert_flags_critical, TapeWrite};
use proxmox_rest_server::WorkerTask;
@@ -452,9 +453,9 @@ impl PoolWriter {
/// archive is marked incomplete, and we do not use it. The caller
/// should mark the media as full and try again using another
/// media.
- pub fn append_snapshot_archive(
+ pub fn append_snapshot_archive<T: CanRead>(
&mut self,
- snapshot_reader: &SnapshotReader,
+ snapshot_reader: &SnapshotReader<T>,
) -> Result<(bool, usize), Error> {
let status = match self.status {
Some(ref mut status) => status,
@@ -543,10 +544,10 @@ impl PoolWriter {
Ok((leom, bytes_written))
}
- pub fn spawn_chunk_reader_thread(
+ pub fn spawn_chunk_reader_thread<T: CanRead + Send + Sync + 'static>(
&self,
- datastore: Arc<DataStore>,
- snapshot_reader: Arc<Mutex<SnapshotReader>>,
+ datastore: Arc<DataStore<T>>,
+ snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
NewChunksIterator::spawn(
datastore,
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index e6f418df..e1a0da20 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex};
use anyhow::{format_err, Error};
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
use crate::tape::CatalogSet;
@@ -21,9 +22,9 @@ impl NewChunksIterator {
/// Creates the iterator, spawning a new thread
///
/// Make sure to join() the returned thread handle.
- pub fn spawn(
- datastore: Arc<DataStore>,
- snapshot_reader: Arc<Mutex<SnapshotReader>>,
+ pub fn spawn<T: CanRead + Send + Sync + 'static>(
+ datastore: Arc<DataStore<T>>,
+ snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
catalog_set: Arc<Mutex<CatalogSet>>,
read_threads: usize,
) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
--
2.39.5
More information about the pbs-devel
mailing list