[pbs-devel] [PATCH proxmox-backup v2 07/12] api: backup: env: add generics and separate functions into impl blocks

Hannes Laimer h.laimer@proxmox.com
Mon May 26 16:14:40 CEST 2025


... based on whether they read or write.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
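Note: the split follows the capability markers used by this series: methods
that only inspect state live in a plain impl<T> block, while methods that
touch the chunk store are gated behind T: CanWrite. A minimal sketch of the
pattern (only the CanWrite marker trait is from this series; Env, its method
names, and the bodies below are illustrative):

    use std::marker::PhantomData;

    /// Capability marker: implemented by chunk store handle types that
    /// may write (illustrative stand-in for
    /// pbs_datastore::chunk_store::CanWrite).
    pub trait CanWrite {}

    pub struct Env<T> {
        _capability: PhantomData<T>,
    }

    // Methods that only inspect state: available for any marker T.
    impl<T> Env<T> {
        pub fn finished(&self) -> bool {
            false
        }
    }

    // Methods that mutate the datastore: only callable when the
    // handle's marker proves write access.
    impl<T: CanWrite> Env<T> {
        pub fn register_writer(&self) {
            // would touch the chunk store here
        }
    }

Calling register_writer() on an Env<T> whose T does not implement CanWrite
is rejected at compile time, which is what lets read-only and writing API
handlers share one environment type.
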
 src/api2/backup/environment.rs | 337 +++++++++++++++++----------------
 1 file changed, 174 insertions(+), 163 deletions(-)

diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 3d541b46..a1620fb9 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -13,6 +13,7 @@ use proxmox_sys::fs::{replace_file, CreateOptions};
 
 use pbs_api_types::Authid;
 use pbs_datastore::backup_info::{BackupDir, BackupInfo};
+use pbs_datastore::chunk_store::CanWrite;
 use pbs_datastore::dynamic_index::DynamicIndexWriter;
 use pbs_datastore::fixed_index::FixedIndexWriter;
 use pbs_datastore::{DataBlob, DataStore};
@@ -54,17 +55,17 @@ impl std::ops::Add for UploadStatistic {
     }
 }
 
-struct DynamicWriterState {
+struct DynamicWriterState<T> {
     name: String,
-    index: DynamicIndexWriter,
+    index: DynamicIndexWriter<T>,
     offset: u64,
     chunk_count: u64,
     upload_stat: UploadStatistic,
 }
 
-struct FixedWriterState {
+struct FixedWriterState<T> {
     name: String,
-    index: FixedIndexWriter,
+    index: FixedIndexWriter<T>,
     size: usize,
     chunk_size: u32,
     chunk_count: u64,
@@ -76,18 +77,18 @@ struct FixedWriterState {
 // key=digest, value=length
 type KnownChunksMap = HashMap<[u8; 32], u32>;
 
-struct SharedBackupState {
+struct SharedBackupState<T> {
     finished: bool,
     uid_counter: usize,
     file_counter: usize, // successfully uploaded files
-    dynamic_writers: HashMap<usize, DynamicWriterState>,
-    fixed_writers: HashMap<usize, FixedWriterState>,
+    dynamic_writers: HashMap<usize, DynamicWriterState<T>>,
+    fixed_writers: HashMap<usize, FixedWriterState<T>>,
     known_chunks: KnownChunksMap,
     backup_size: u64, // sums up size of all files
     backup_stat: UploadStatistic,
 }
 
-impl SharedBackupState {
+impl<T> SharedBackupState<T> {
     // Raise error if finished flag is set
     fn ensure_unfinished(&self) -> Result<(), Error> {
         if self.finished {
@@ -105,26 +106,32 @@ impl SharedBackupState {
 
 /// `RpcEnvironment` implementation for backup service
 #[derive(Clone)]
-pub struct BackupEnvironment {
+pub struct BackupEnvironment<T> {
     env_type: RpcEnvironmentType,
     result_attributes: Value,
     auth_id: Authid,
     pub debug: bool,
     pub formatter: &'static dyn OutputFormatter,
     pub worker: Arc<WorkerTask>,
-    pub datastore: Arc<DataStore>,
-    pub backup_dir: BackupDir,
-    pub last_backup: Option<BackupInfo>,
-    state: Arc<Mutex<SharedBackupState>>,
+    pub datastore: Arc<DataStore<T>>,
+    pub backup_dir: BackupDir<T>,
+    pub last_backup: Option<BackupInfo<T>>,
+    state: Arc<Mutex<SharedBackupState<T>>>,
 }
 
-impl BackupEnvironment {
+impl<T: Send + Sync + 'static> BackupEnvironment<T> {
+    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
+        self.formatter.format_result(result, self)
+    }
+}
+
+impl<T> BackupEnvironment<T> {
     pub fn new(
         env_type: RpcEnvironmentType,
         auth_id: Authid,
         worker: Arc<WorkerTask>,
-        datastore: Arc<DataStore>,
-        backup_dir: BackupDir,
+        datastore: Arc<DataStore<T>>,
+        backup_dir: BackupDir<T>,
     ) -> Self {
         let state = SharedBackupState {
             finished: false,
@@ -260,10 +267,148 @@ impl BackupEnvironment {
         state.known_chunks.get(digest).copied()
     }
 
+    fn log_upload_stat(
+        &self,
+        archive_name: &str,
+        csum: &[u8; 32],
+        uuid: &[u8; 16],
+        size: u64,
+        chunk_count: u64,
+        upload_stat: &UploadStatistic,
+    ) {
+        self.log(format!("Upload statistics for '{}'", archive_name));
+        self.log(format!("UUID: {}", hex::encode(uuid)));
+        self.log(format!("Checksum: {}", hex::encode(csum)));
+        self.log(format!("Size: {}", size));
+        self.log(format!("Chunk count: {}", chunk_count));
+
+        if size == 0 || chunk_count == 0 {
+            return;
+        }
+
+        self.log(format!(
+            "Upload size: {} ({}%)",
+            upload_stat.size,
+            (upload_stat.size * 100) / size
+        ));
+
+        // account for zero chunk, which might be uploaded but never used
+        let client_side_duplicates = if chunk_count < upload_stat.count {
+            0
+        } else {
+            chunk_count - upload_stat.count
+        };
+
+        let server_side_duplicates = upload_stat.duplicates;
+
+        if (client_side_duplicates + server_side_duplicates) > 0 {
+            let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
+            self.log(format!(
+                "Duplicates: {}+{} ({}%)",
+                client_side_duplicates, server_side_duplicates, per
+            ));
+        }
+
+        if upload_stat.size > 0 {
+            self.log(format!(
+                "Compression: {}%",
+                (upload_stat.compressed_size * 100) / upload_stat.size
+            ));
+        }
+    }
+
+    pub fn log<S: AsRef<str>>(&self, msg: S) {
+        info!("{}", msg.as_ref());
+    }
+
+    pub fn debug<S: AsRef<str>>(&self, msg: S) {
+        if self.debug {
+            // This is kinda weird, we would like to use tracing::debug! here and automatically
+            // filter it, but self.debug is set from the client-side and the logs are printed on
+            // client and server side. This means that if the client sets the log level to debug,
+            // both server and client need to have 'debug' logs printed.
+            self.log(msg);
+        }
+    }
+
+    /// Raise error if finished flag is not set
+    pub fn ensure_finished(&self) -> Result<(), Error> {
+        let state = self.state.lock().unwrap();
+        if !state.finished {
+            bail!("backup ended but finished flag is not set.");
+        }
+        Ok(())
+    }
+
+    /// Return true if the finished flag is set
+    pub fn finished(&self) -> bool {
+        let state = self.state.lock().unwrap();
+        state.finished
+    }
+}
+
+impl<T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static> BackupEnvironment<T> {
+    /// If verify-new is set on the datastore, this will run a new verify task
+    /// for the backup. If not, this will return and also drop the passed lock
+    /// immediately.
+    pub fn verify_after_complete(&self, excl_snap_lock: BackupLockGuard) -> Result<(), Error> {
+        self.ensure_finished()?;
+
+        if !self.datastore.verify_new() {
+            // no verify requested, do nothing
+            return Ok(());
+        }
+
+        // Downgrade to shared lock, the backup itself is finished
+        drop(excl_snap_lock);
+        let snap_lock = self.backup_dir.lock_shared().with_context(|| {
+            format!(
+                "while trying to verify snapshot '{:?}' after completion",
+                self.backup_dir
+            )
+        })?;
+        let worker_id = format!(
+            "{}:{}/{}/{:08X}",
+            self.datastore.name(),
+            self.backup_dir.backup_type(),
+            self.backup_dir.backup_id(),
+            self.backup_dir.backup_time()
+        );
+
+        let datastore = self.datastore.clone();
+        let backup_dir = self.backup_dir.clone();
+
+        WorkerTask::new_thread(
+            "verify",
+            Some(worker_id),
+            self.auth_id.to_string(),
+            false,
+            move |worker| {
+                worker.log_message("Automatically verifying newly added snapshot");
+
+                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+                if !verify_backup_dir_with_lock(
+                    &verify_worker,
+                    &backup_dir,
+                    worker.upid().clone(),
+                    None,
+                    snap_lock,
+                )? {
+                    bail!("verification failed - please check the log for details");
+                }
+
+                Ok(())
+            },
+        )
+        .map(|_| ())
+    }
+}
+
+impl<T: CanWrite> BackupEnvironment<T> {
     /// Store the writer with an unique ID
     pub fn register_dynamic_writer(
         &self,
-        index: DynamicIndexWriter,
+        index: DynamicIndexWriter<T>,
         name: String,
     ) -> Result<usize, Error> {
         let mut state = self.state.lock().unwrap();
@@ -289,7 +434,7 @@ impl BackupEnvironment {
     /// Store the writer with an unique ID
     pub fn register_fixed_writer(
         &self,
-        index: FixedIndexWriter,
+        index: FixedIndexWriter<T>,
         name: String,
         size: usize,
         chunk_size: u32,
@@ -379,56 +524,6 @@ impl BackupEnvironment {
         Ok(())
     }
 
-    fn log_upload_stat(
-        &self,
-        archive_name: &str,
-        csum: &[u8; 32],
-        uuid: &[u8; 16],
-        size: u64,
-        chunk_count: u64,
-        upload_stat: &UploadStatistic,
-    ) {
-        self.log(format!("Upload statistics for '{}'", archive_name));
-        self.log(format!("UUID: {}", hex::encode(uuid)));
-        self.log(format!("Checksum: {}", hex::encode(csum)));
-        self.log(format!("Size: {}", size));
-        self.log(format!("Chunk count: {}", chunk_count));
-
-        if size == 0 || chunk_count == 0 {
-            return;
-        }
-
-        self.log(format!(
-            "Upload size: {} ({}%)",
-            upload_stat.size,
-            (upload_stat.size * 100) / size
-        ));
-
-        // account for zero chunk, which might be uploaded but never used
-        let client_side_duplicates = if chunk_count < upload_stat.count {
-            0
-        } else {
-            chunk_count - upload_stat.count
-        };
-
-        let server_side_duplicates = upload_stat.duplicates;
-
-        if (client_side_duplicates + server_side_duplicates) > 0 {
-            let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
-            self.log(format!(
-                "Duplicates: {}+{} ({}%)",
-                client_side_duplicates, server_side_duplicates, per
-            ));
-        }
-
-        if upload_stat.size > 0 {
-            self.log(format!(
-                "Compression: {}%",
-                (upload_stat.compressed_size * 100) / upload_stat.size
-            ));
-        }
-    }
-
     /// Close dynamic writer
     pub fn dynamic_writer_close(
         &self,
@@ -633,94 +728,6 @@ impl BackupEnvironment {
         Ok(())
     }
 
-    /// If verify-new is set on the datastore, this will run a new verify task
-    /// for the backup. If not, this will return and also drop the passed lock
-    /// immediately.
-    pub fn verify_after_complete(&self, excl_snap_lock: BackupLockGuard) -> Result<(), Error> {
-        self.ensure_finished()?;
-
-        if !self.datastore.verify_new() {
-            // no verify requested, do nothing
-            return Ok(());
-        }
-
-        // Downgrade to shared lock, the backup itself is finished
-        drop(excl_snap_lock);
-        let snap_lock = self.backup_dir.lock_shared().with_context(|| {
-            format!(
-                "while trying to verify snapshot '{:?}' after completion",
-                self.backup_dir
-            )
-        })?;
-        let worker_id = format!(
-            "{}:{}/{}/{:08X}",
-            self.datastore.name(),
-            self.backup_dir.backup_type(),
-            self.backup_dir.backup_id(),
-            self.backup_dir.backup_time()
-        );
-
-        let datastore = self.datastore.clone();
-        let backup_dir = self.backup_dir.clone();
-
-        WorkerTask::new_thread(
-            "verify",
-            Some(worker_id),
-            self.auth_id.to_string(),
-            false,
-            move |worker| {
-                worker.log_message("Automatically verifying newly added snapshot");
-
-                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
-                if !verify_backup_dir_with_lock(
-                    &verify_worker,
-                    &backup_dir,
-                    worker.upid().clone(),
-                    None,
-                    snap_lock,
-                )? {
-                    bail!("verification failed - please check the log for details");
-                }
-
-                Ok(())
-            },
-        )
-        .map(|_| ())
-    }
-
-    pub fn log<S: AsRef<str>>(&self, msg: S) {
-        info!("{}", msg.as_ref());
-    }
-
-    pub fn debug<S: AsRef<str>>(&self, msg: S) {
-        if self.debug {
-            // This is kinda weird, we would like to use tracing::debug! here and automatically
-            // filter it, but self.debug is set from the client-side and the logs are printed on
-            // client and server side. This means that if the client sets the log level to debug,
-            // both server and client need to have 'debug' logs printed.
-            self.log(msg);
-        }
-    }
-
-    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
-        self.formatter.format_result(result, self)
-    }
-
-    /// Raise error if finished flag is not set
-    pub fn ensure_finished(&self) -> Result<(), Error> {
-        let state = self.state.lock().unwrap();
-        if !state.finished {
-            bail!("backup ended but finished flag is not set.");
-        }
-        Ok(())
-    }
-
-    /// Return true if the finished flag is set
-    pub fn finished(&self) -> bool {
-        let state = self.state.lock().unwrap();
-        state.finished
-    }
-
     /// Remove complete backup
     pub fn remove_backup(&self) -> Result<(), Error> {
         let mut state = self.state.lock().unwrap();
@@ -736,7 +743,7 @@ impl BackupEnvironment {
     }
 }
 
-impl RpcEnvironment for BackupEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for BackupEnvironment<T> {
     fn result_attrib_mut(&mut self) -> &mut Value {
         &mut self.result_attributes
     }
@@ -758,14 +765,18 @@ impl RpcEnvironment for BackupEnvironment {
     }
 }
 
-impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
-    fn as_ref(&self) -> &BackupEnvironment {
-        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for dyn RpcEnvironment {
+    fn as_ref(&self) -> &BackupEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<BackupEnvironment<T>>()
+            .unwrap()
     }
 }
 
-impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
-    fn as_ref(&self) -> &BackupEnvironment {
-        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for Box<dyn RpcEnvironment> {
+    fn as_ref(&self) -> &BackupEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<BackupEnvironment<T>>()
+            .unwrap()
     }
 }
-- 
2.39.5