[pbs-devel] [PATCH proxmox-backup v6 3/6] pull: add support for pulling from local datastore
Hannes Laimer
h.laimer at proxmox.com
Tue Nov 21 15:31:52 CET 2023
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
src/server/pull.rs | 143 +++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 138 insertions(+), 5 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index ff3e6d0a..1403c7a7 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,8 @@
//! Sync datastore from remote server
use std::collections::{HashMap, HashSet};
-use std::io::Seek;
-use std::path::Path;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
@@ -29,10 +29,12 @@ use pbs_datastore::manifest::{
archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{
+ check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
+};
use pbs_tools::sha::sha256;
-use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
struct RemoteReader {
@@ -40,6 +42,12 @@ struct RemoteReader {
dir: BackupDir,
}
+struct LocalReader {
+ _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+ path: PathBuf,
+ datastore: Arc<DataStore>,
+}
+
pub(crate) struct PullTarget {
store: Arc<DataStore>,
ns: BackupNamespace,
@@ -51,6 +59,11 @@ pub(crate) struct RemoteSource {
client: HttpClient,
}
+pub(crate) struct LocalSource {
+ store: Arc<DataStore>,
+ ns: BackupNamespace,
+}
+
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -234,6 +247,81 @@ impl PullSource for RemoteSource {
}
}
+#[async_trait::async_trait]
+impl PullSource for LocalSource {
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ _worker: &WorkerTask,
+ ) -> Result<Vec<BackupNamespace>, Error> {
+ ListNamespacesRecursive::new_max_depth(
+ self.store.clone(),
+ self.ns.clone(),
+ max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+ )?
+ .collect()
+ }
+
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error> {
+ Ok(ListAccessibleBackupGroups::new_with_privs(
+ &self.store,
+ namespace.clone(),
+ 0,
+ None,
+ None,
+ Some(owner),
+ )?
+ .filter_map(Result::ok)
+ .map(|backup_group| backup_group.group().clone())
+ .collect::<Vec<pbs_api_types::BackupGroup>>())
+ }
+
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ _worker: &WorkerTask,
+ ) -> Result<Vec<BackupDir>, Error> {
+ Ok(self
+ .store
+ .backup_group(namespace.clone(), group.clone())
+ .iter_snapshots()?
+ .filter_map(Result::ok)
+ .map(|snapshot| snapshot.dir().to_owned())
+ .collect::<Vec<BackupDir>>())
+ }
+
+ fn get_ns(&self) -> BackupNamespace {
+ self.ns.clone()
+ }
+
+ fn print_store_and_ns(&self) -> String {
+ print_store_and_ns(self.store.name(), &self.ns)
+ }
+
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn PullReader>, Error> {
+ let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
+ let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
+ &dir.full_path(),
+ "snapshot",
+ "locked by another operation",
+ )?;
+ Ok(Arc::new(LocalReader {
+ _dir_lock: Arc::new(Mutex::new(dir_lock)),
+ path: dir.full_path(),
+ datastore: dir.datastore().clone(),
+ }))
+ }
+}
+
#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
@@ -343,6 +431,48 @@ impl PullReader for RemoteReader {
}
}
+#[async_trait::async_trait]
+impl PullReader for LocalReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(LocalChunkReader::new(
+ self.datastore.clone(),
+ None,
+ crypt_mode,
+ ))
+ }
+
+ async fn load_file_into(
+ &self,
+ filename: &str,
+ into: &Path,
+ _worker: &WorkerTask,
+ ) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let mut from_path = self.path.clone();
+ from_path.push(filename);
+ tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(
+ &self,
+ _to_path: &Path,
+ _worker: &WorkerTask,
+ ) -> Result<(), Error> {
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+ self.datastore.name() == target_store_name
+ }
+}
+
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
/// Where data is pulled from
@@ -399,7 +529,10 @@ impl PullParameters {
client,
})
} else {
- bail!("local sync not implemented yet")
+ Arc::new(LocalSource {
+ store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
+ ns: remote_ns,
+ })
};
let target = PullTarget {
store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
--
2.39.2
More information about the pbs-devel mailing list