[pbs-devel] [PATCH proxmox-backup v3 6/6] pull: add support for pulling from local datastore

Hannes Laimer h.laimer at proxmox.com
Tue Aug 8 14:13:44 CEST 2023


Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 src/server/pull.rs | 143 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 138 insertions(+), 5 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index e1a27a8c..a685bccb 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,8 @@
 //! Sync datastore from remote server
 
 use std::collections::{HashMap, HashSet};
-use std::io::Seek;
-use std::path::Path;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
@@ -29,10 +29,12 @@ use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
 use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{
+    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
+};
 use pbs_tools::sha::sha256;
 
-use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
 use crate::tools::parallel_handler::ParallelHandler;
 
 struct RemoteReader {
@@ -40,6 +42,12 @@ struct RemoteReader {
     dir: BackupDir,
 }
 
+struct LocalReader {
+    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+    path: PathBuf,
+    datastore: Arc<DataStore>,
+}
+
 pub(crate) struct PullTarget {
     store: Arc<DataStore>,
     ns: BackupNamespace,
@@ -51,6 +59,11 @@ pub(crate) struct RemoteSource {
     client: HttpClient,
 }
 
+pub(crate) struct LocalSource {
+    store: Arc<DataStore>,
+    ns: BackupNamespace,
+}
+
 #[async_trait::async_trait]
 /// `PullSource` is a trait that provides an interface for pulling data/information from a source.
 /// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -234,6 +247,81 @@ impl PullSource for RemoteSource {
     }
 }
 
+#[async_trait::async_trait]
+impl PullSource for LocalSource {
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        _worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        ListNamespacesRecursive::new_max_depth(
+            self.store.clone(),
+            self.ns.clone(),
+            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+        )?
+        .collect()
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error> {
+        Ok(ListAccessibleBackupGroups::new_with_privs(
+            &self.store,
+            namespace.clone(),
+            MAX_NAMESPACE_DEPTH,
+            None,
+            None,
+            Some(owner),
+        )?
+        .filter_map(Result::ok)
+        .map(|backup_group| backup_group.group().clone())
+        .collect::<Vec<pbs_api_types::BackupGroup>>())
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &BackupGroup,
+        _worker: &WorkerTask,
+    ) -> Result<Vec<BackupDir>, Error> {
+        Ok(self
+            .store
+            .backup_group(namespace.clone(), group.clone())
+            .iter_snapshots()?
+            .filter_map(Result::ok)
+            .map(|snapshot| snapshot.dir().to_owned())
+            .collect::<Vec<BackupDir>>())
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        self.ns.clone()
+    }
+
+    fn print_store_and_ns(&self) -> String {
+        print_store_and_ns(self.store.name(), &self.ns)
+    }
+
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn PullReader>, Error> {
+        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
+        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
+            &dir.full_path(),
+            "snapshot",
+            "locked by another operation",
+        )?;
+        Ok(Arc::new(LocalReader {
+            _dir_lock: Arc::new(Mutex::new(dir_lock)),
+            path: dir.full_path(),
+            datastore: dir.datastore().clone(),
+        }))
+    }
+}
+
 #[async_trait::async_trait]
 /// `PullReader` is a trait that provides an interface for reading data from a source.
 /// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
@@ -343,6 +431,48 @@ impl PullReader for RemoteReader {
     }
 }
 
+#[async_trait::async_trait]
+impl PullReader for LocalReader {
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+        Arc::new(LocalChunkReader::new(
+            self.datastore.clone(),
+            None,
+            crypt_mode,
+        ))
+    }
+
+    async fn load_file_into(
+        &self,
+        filename: &str,
+        into: &Path,
+        _worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        let mut from_path = self.path.clone();
+        from_path.push(filename);
+        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+        tmp_file.rewind()?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    async fn try_download_client_log(
+        &self,
+        _to_path: &Path,
+        _worker: &WorkerTask,
+    ) -> Result<(), Error> {
+        Ok(())
+    }
+
+    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+        self.datastore.name() == target_store_name
+    }
+}
+
 /// Parameters for a pull operation.
 pub(crate) struct PullParameters {
     /// Where data is pulled from
@@ -399,7 +529,10 @@ impl PullParameters {
                 client,
             })
         } else {
-            bail!("local sync not implemented yet")
+            Arc::new(LocalSource {
+                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
+                ns: remote_ns,
+            })
         };
         let target = PullTarget {
             store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
-- 
2.39.2






More information about the pbs-devel mailing list