[pbs-devel] [PATCH proxmox-backup RFC 10/10] backup/server/tape: add generics and separate functions into impl blocks

Hannes Laimer h.laimer at proxmox.com
Tue Sep 3 14:34:01 CEST 2024


Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 src/backup/hierarchy.rs                     | 26 +++++-----
 src/backup/verify.rs                        | 53 +++++++++++----------
 src/server/gc_job.rs                        |  8 ++--
 src/server/prune_job.rs                     |  5 +-
 src/server/pull.rs                          | 23 ++++-----
 src/tape/file_formats/snapshot_archive.rs   |  5 +-
 src/tape/pool_writer/mod.rs                 | 11 +++--
 src/tape/pool_writer/new_chunks_iterator.rs |  7 +--
 8 files changed, 74 insertions(+), 64 deletions(-)

diff --git a/src/backup/hierarchy.rs b/src/backup/hierarchy.rs
index 640a7762..29f05c9e 100644
--- a/src/backup/hierarchy.rs
+++ b/src/backup/hierarchy.rs
@@ -7,7 +7,9 @@ use pbs_api_types::{
     PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_READ,
 };
 use pbs_config::CachedUserInfo;
-use pbs_datastore::{backup_info::BackupGroup, DataStore, ListGroups, ListNamespacesRecursive};
+use pbs_datastore::{
+    backup_info::BackupGroup, chunk_store::CanRead, DataStore, ListGroups, ListNamespacesRecursive,
+};
 
 /// Asserts that `privs` are fulfilled on datastore + (optional) namespace.
 pub fn check_ns_privs(
@@ -68,8 +70,8 @@ pub fn check_ns_privs_full(
     );
 }
 
-pub fn can_access_any_namespace(
-    store: Arc<DataStore>,
+pub fn can_access_any_namespace<T: CanRead>(
+    store: Arc<DataStore<T>>,
     auth_id: &Authid,
     user_info: &CachedUserInfo,
 ) -> bool {
@@ -95,8 +97,8 @@ pub fn can_access_any_namespace(
 ///
 /// Is basically just a filter-iter for pbs_datastore::ListNamespacesRecursive including access and
 /// optional owner checks.
-pub struct ListAccessibleBackupGroups<'a> {
-    store: &'a Arc<DataStore>,
+pub struct ListAccessibleBackupGroups<'a, T> {
+    store: &'a Arc<DataStore<T>>,
     auth_id: Option<&'a Authid>,
     user_info: Arc<CachedUserInfo>,
     /// The priv on NS level that allows auth_id trump the owner check
@@ -104,15 +106,15 @@ pub struct ListAccessibleBackupGroups<'a> {
     /// The priv that auth_id is required to have on NS level additionally to being owner
     owner_and_priv: u64,
     /// Contains the intertnal state, group iter and a bool flag for override_owner_priv
-    state: Option<(ListGroups, bool)>,
-    ns_iter: ListNamespacesRecursive,
+    state: Option<(ListGroups<T>, bool)>,
+    ns_iter: ListNamespacesRecursive<T>,
 }
 
-impl<'a> ListAccessibleBackupGroups<'a> {
+impl<'a, T: CanRead> ListAccessibleBackupGroups<'a, T> {
     // TODO: builder pattern
 
     pub fn new_owned(
-        store: &'a Arc<DataStore>,
+        store: &'a Arc<DataStore<T>>,
         ns: BackupNamespace,
         max_depth: usize,
         auth_id: Option<&'a Authid>,
@@ -122,7 +124,7 @@ impl<'a> ListAccessibleBackupGroups<'a> {
     }
 
     pub fn new_with_privs(
-        store: &'a Arc<DataStore>,
+        store: &'a Arc<DataStore<T>>,
         ns: BackupNamespace,
         max_depth: usize,
         override_owner_priv: Option<u64>,
@@ -145,8 +147,8 @@ impl<'a> ListAccessibleBackupGroups<'a> {
 pub static NS_PRIVS_OK: u64 =
     PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT;
 
-impl<'a> Iterator for ListAccessibleBackupGroups<'a> {
-    type Item = Result<BackupGroup, Error>;
+impl<'a, T: CanRead> Iterator for ListAccessibleBackupGroups<'a, T> {
+    type Item = Result<BackupGroup<T>, Error>;
 
     fn next(&mut self) -> Option<Self::Item> {
         loop {
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 6ef7e8eb..1ede08ea 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -5,6 +5,7 @@ use std::time::Instant;
 
 use anyhow::{bail, format_err, Error};
 use nix::dir::Dir;
+use pbs_datastore::chunk_store::{CanRead, CanWrite};
 use tracing::{error, info};
 
 use proxmox_sys::fs::lock_dir_noblock_shared;
@@ -25,16 +26,16 @@ use crate::backup::hierarchy::ListAccessibleBackupGroups;
 
 /// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
 /// already been verified or detected as corrupt.
-pub struct VerifyWorker {
+pub struct VerifyWorker<T> {
     worker: Arc<dyn WorkerTaskContext>,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 }
 
-impl VerifyWorker {
+impl<T: CanWrite> VerifyWorker<T> {
     /// Creates a new VerifyWorker for a given task worker and datastore.
-    pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
+    pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore<T>>) -> Self {
         Self {
             worker,
             datastore,
@@ -46,7 +47,7 @@ impl VerifyWorker {
     }
 }
 
-fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
+fn verify_blob<T: CanRead>(backup_dir: &BackupDir<T>, info: &FileInfo) -> Result<(), Error> {
     let blob = backup_dir.load_blob(&info.filename)?;
 
     let raw_size = blob.raw_size();
@@ -70,7 +71,7 @@ fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
     }
 }
 
-fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
+fn rename_corrupted_chunk<T: CanWrite>(datastore: Arc<DataStore<T>>, digest: &[u8; 32]) {
     let (path, digest_str) = datastore.chunk_path(digest);
 
     let mut counter = 0;
@@ -97,8 +98,8 @@ fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
     };
 }
 
-fn verify_index_chunks(
-    verify_worker: &VerifyWorker,
+fn verify_index_chunks<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
     index: Box<dyn IndexFile + Send>,
     crypt_mode: CryptMode,
 ) -> Result<(), Error> {
@@ -238,9 +239,9 @@ fn verify_index_chunks(
     Ok(())
 }
 
-fn verify_fixed_index(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+fn verify_fixed_index<T: CanWrite + Sync + Send + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     info: &FileInfo,
 ) -> Result<(), Error> {
     let mut path = backup_dir.relative_path();
@@ -260,9 +261,9 @@ fn verify_fixed_index(
     verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
 }
 
-fn verify_dynamic_index(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+fn verify_dynamic_index<T: CanWrite + Sync + Send + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     info: &FileInfo,
 ) -> Result<(), Error> {
     let mut path = backup_dir.relative_path();
@@ -291,9 +292,9 @@ fn verify_dynamic_index(
 /// - Ok(true) if verify is successful
 /// - Ok(false) if there were verification errors
 /// - Err(_) if task was aborted
-pub fn verify_backup_dir(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+pub fn verify_backup_dir<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     upid: UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<bool, Error> {
@@ -328,9 +329,9 @@ pub fn verify_backup_dir(
 }
 
 /// See verify_backup_dir
-pub fn verify_backup_dir_with_lock(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+pub fn verify_backup_dir_with_lock<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     upid: UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
     _snap_lock: Dir,
@@ -415,9 +416,9 @@ pub fn verify_backup_dir_with_lock(
 /// Returns
 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_backup_group(
-    verify_worker: &VerifyWorker,
-    group: &BackupGroup,
+pub fn verify_backup_group<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    group: &BackupGroup<T>,
     progress: &mut StoreProgress,
     upid: &UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
@@ -467,8 +468,8 @@ pub fn verify_backup_group(
 /// Returns
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_all_backups(
-    verify_worker: &VerifyWorker,
+pub fn verify_all_backups<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
     upid: &UPID,
     ns: BackupNamespace,
     max_depth: Option<usize>,
@@ -516,7 +517,7 @@ pub fn verify_all_backups(
             .filter(|group| {
                 !(group.backup_type() == BackupType::Host && group.backup_id() == "benchmark")
             })
-            .collect::<Vec<BackupGroup>>(),
+            .collect::<Vec<BackupGroup<T>>>(),
         Err(err) => {
             info!("unable to list backups: {err}");
             return Ok(errors);
diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
index 64835028..4892430c 100644
--- a/src/server/gc_job.rs
+++ b/src/server/gc_job.rs
@@ -4,15 +4,17 @@ use std::sync::Arc;
 use tracing::info;
 
 use pbs_api_types::Authid;
-use pbs_datastore::DataStore;
+use pbs_datastore::{chunk_store::CanWrite, DataStore};
 use proxmox_rest_server::WorkerTask;
 
 use crate::server::{jobstate::Job, send_gc_status};
 
 /// Runs a garbage collection job.
-pub fn do_garbage_collection_job(
+pub fn do_garbage_collection_job<
+    T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static,
+>(
     mut job: Job,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     auth_id: &Authid,
     schedule: Option<String>,
     to_stdout: bool,
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index 546c0bbd..9024a61f 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 
 use anyhow::Error;
+use pbs_datastore::chunk_store::CanWrite;
 use tracing::{info, warn};
 
 use pbs_api_types::{
@@ -14,10 +15,10 @@ use proxmox_rest_server::WorkerTask;
 use crate::backup::ListAccessibleBackupGroups;
 use crate::server::jobstate::Job;
 
-pub fn prune_datastore(
+pub fn prune_datastore<T: CanWrite>(
     auth_id: Authid,
     prune_options: PruneJobOptions,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     dry_run: bool,
 ) -> Result<(), Error> {
     let store = &datastore.name();
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 41ab5e0e..a567c510 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -9,6 +9,7 @@ use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
+use pbs_datastore::chunk_store::{CanWrite, Read as StoreRead, Write as StoreWrite};
 use proxmox_human_byte::HumanByte;
 use proxmox_router::HttpError;
 use serde_json::json;
@@ -45,11 +46,11 @@ struct RemoteReader {
 struct LocalReader {
     _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
     path: PathBuf,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<StoreRead>>,
 }
 
 pub(crate) struct PullTarget {
-    store: Arc<DataStore>,
+    store: Arc<DataStore<StoreWrite>>,
     ns: BackupNamespace,
 }
 
@@ -60,7 +61,7 @@ pub(crate) struct RemoteSource {
 }
 
 pub(crate) struct LocalSource {
-    store: Arc<DataStore>,
+    store: Arc<DataStore<StoreRead>>,
     ns: BackupNamespace,
 }
 
@@ -571,9 +572,9 @@ impl PullParameters {
     }
 }
 
-async fn pull_index_chunks<I: IndexFile>(
+async fn pull_index_chunks<I: IndexFile, T: CanWrite + Send + Sync + 'static>(
     chunk_reader: Arc<dyn AsyncReadChunk>,
-    target: Arc<DataStore>,
+    target: Arc<DataStore<T>>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<PullStats, Error> {
@@ -696,9 +697,9 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 /// -- Verify tmp file checksum
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
-async fn pull_single_archive<'a>(
+async fn pull_single_archive<'a, T: CanWrite + Send + Sync + 'static>(
     reader: Arc<dyn PullReader + 'a>,
-    snapshot: &'a pbs_datastore::BackupDir,
+    snapshot: &'a pbs_datastore::BackupDir<T>,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<PullStats, Error> {
@@ -779,9 +780,9 @@ async fn pull_single_archive<'a>(
 /// -- if file already exists, verify contents
 /// -- if not, pull it from the remote
 /// - Download log if not already existing
-async fn pull_snapshot<'a>(
+async fn pull_snapshot<'a, T: CanWrite + Send + Sync + 'static>(
     reader: Arc<dyn PullReader + 'a>,
-    snapshot: &'a pbs_datastore::BackupDir,
+    snapshot: &'a pbs_datastore::BackupDir<T>,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<PullStats, Error> {
     let mut pull_stats = PullStats::default();
@@ -890,9 +891,9 @@ async fn pull_snapshot<'a>(
 ///
 /// The `reader` is configured to read from the source backup directory, while the
 /// `snapshot` is pointing to the local datastore and target namespace.
-async fn pull_snapshot_from<'a>(
+async fn pull_snapshot_from<'a, T: CanWrite + Send + Sync + 'static>(
     reader: Arc<dyn PullReader + 'a>,
-    snapshot: &'a pbs_datastore::BackupDir,
+    snapshot: &'a pbs_datastore::BackupDir<T>,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<PullStats, Error> {
     let (_path, is_new, _snap_lock) = snapshot
diff --git a/src/tape/file_formats/snapshot_archive.rs b/src/tape/file_formats/snapshot_archive.rs
index f5a588f4..526c69ef 100644
--- a/src/tape/file_formats/snapshot_archive.rs
+++ b/src/tape/file_formats/snapshot_archive.rs
@@ -2,6 +2,7 @@ use std::io::{Read, Write};
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
+use pbs_datastore::chunk_store::CanRead;
 use proxmox_sys::error::SysError;
 use proxmox_uuid::Uuid;
 
@@ -21,9 +22,9 @@ use crate::tape::file_formats::{
 /// `LEOM` was detected before all data was written. The stream is
 /// marked inclomplete in that case and does not contain all data (The
 /// backup task must rewrite the whole file on the next media).
-pub fn tape_write_snapshot_archive<'a>(
+pub fn tape_write_snapshot_archive<'a, T: CanRead>(
     writer: &mut (dyn TapeWrite + 'a),
-    snapshot_reader: &SnapshotReader,
+    snapshot_reader: &SnapshotReader<T>,
 ) -> Result<Option<Uuid>, std::io::Error> {
     let backup_dir = snapshot_reader.snapshot();
     let snapshot =
diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs
index 9731e1cc..54a8715e 100644
--- a/src/tape/pool_writer/mod.rs
+++ b/src/tape/pool_writer/mod.rs
@@ -3,6 +3,7 @@ pub use catalog_set::*;
 
 mod new_chunks_iterator;
 pub use new_chunks_iterator::*;
+use pbs_datastore::chunk_store::CanRead;
 
 use std::collections::HashSet;
 use std::fs::File;
@@ -445,9 +446,9 @@ impl PoolWriter {
     /// archive is marked incomplete, and we do not use it. The caller
     /// should mark the media as full and try again using another
     /// media.
-    pub fn append_snapshot_archive(
+    pub fn append_snapshot_archive<T: CanRead>(
         &mut self,
-        snapshot_reader: &SnapshotReader,
+        snapshot_reader: &SnapshotReader<T>,
     ) -> Result<(bool, usize), Error> {
         let status = match self.status {
             Some(ref mut status) => status,
@@ -536,10 +537,10 @@ impl PoolWriter {
         Ok((leom, bytes_written))
     }
 
-    pub fn spawn_chunk_reader_thread(
+    pub fn spawn_chunk_reader_thread<T: CanRead + Send + Sync + 'static>(
         &self,
-        datastore: Arc<DataStore>,
-        snapshot_reader: Arc<Mutex<SnapshotReader>>,
+        datastore: Arc<DataStore<T>>,
+        snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
     ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
         NewChunksIterator::spawn(datastore, snapshot_reader, Arc::clone(&self.catalog_set))
     }
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index 1454b33d..b83ddf3e 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex};
 
 use anyhow::{format_err, Error};
 
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
 
 use crate::tape::CatalogSet;
@@ -20,9 +21,9 @@ impl NewChunksIterator {
     /// Creates the iterator, spawning a new thread
     ///
     /// Make sure to join() the returned thread handle.
-    pub fn spawn(
-        datastore: Arc<DataStore>,
-        snapshot_reader: Arc<Mutex<SnapshotReader>>,
+    pub fn spawn<T: CanRead + Send + Sync + 'static>(
+        datastore: Arc<DataStore<T>>,
+        snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
         catalog_set: Arc<Mutex<CatalogSet>>,
     ) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
         let (tx, rx) = std::sync::mpsc::sync_channel(3);
-- 
2.39.2





More information about the pbs-devel mailing list