[pbs-devel] [PATCH v5 proxmox-backup 10/31] api: push: implement endpoint for sync in push direction

Christian Ebner c.ebner at proxmox.com
Fri Oct 18 10:42:21 CEST 2024

Expose the sync job in push direction via a dedicated API endpoint,
analogous to the pull direction.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 4:
- no changes

changes since version 3:
- include namespace in remote acl path

 src/api2/mod.rs  |   2 +
 src/api2/push.rs | 222 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 224 insertions(+)
 create mode 100644 src/api2/push.rs

diff --git a/src/api2/mod.rs b/src/api2/mod.rs
index a83e4c205..03596326b 100644
--- a/src/api2/mod.rs
+++ b/src/api2/mod.rs
@@ -12,6 +12,7 @@ pub mod helpers;
 pub mod node;
 pub mod ping;
 pub mod pull;
+pub mod push;
 pub mod reader;
 pub mod status;
 pub mod tape;
@@ -29,6 +30,7 @@ const SUBDIRS: SubdirMap = &sorted!([
     ("nodes", &node::ROUTER),
     ("ping", &ping::ROUTER),
     ("pull", &pull::ROUTER),
+    ("push", &push::ROUTER),
     ("reader", &reader::ROUTER),
     ("status", &status::ROUTER),
     ("tape", &tape::ROUTER),
diff --git a/src/api2/push.rs b/src/api2/push.rs
new file mode 100644
index 000000000..ead2bf2e0
--- /dev/null
+++ b/src/api2/push.rs
@@ -0,0 +1,222 @@
+use anyhow::{format_err, Context, Error};
+use futures::{future::FutureExt, select};
+use tracing::info;
+
+use pbs_api_types::{
+    Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
+    GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_READ,
+    PRIV_REMOTE_DATASTORE_BACKUP, PRIV_REMOTE_DATASTORE_PRUNE, REMOTE_ID_SCHEMA,
+    REMOVE_VANISHED_BACKUPS_SCHEMA, TRANSFER_LAST_SCHEMA,
+};
+use proxmox_rest_server::WorkerTask;
+use proxmox_router::{Permission, Router, RpcEnvironment};
+use proxmox_schema::api;
+
+use pbs_config::CachedUserInfo;
+
+use crate::server::push::{push_store, PushParameters};
+/// Check if the provided user is allowed to read from the local source and act on the remote
+/// target for pushing content
+pub fn check_push_privs(
+    auth_id: &Authid,
+    store: &str,
+    namespace: Option<&str>,
+    remote: &str,
+    remote_store: &str,
+    remote_ns: Option<&str>,
+    delete: bool,
+) -> Result<(), Error> {
+    let user_info = CachedUserInfo::new()?;
+    let target_acl_path = match remote_ns {
+        Some(ns) => vec!["remote", remote, remote_store, ns],
+        None => vec!["remote", remote, remote_store],
+    };
+    // Check user is allowed to backup to remote/<remote>/<datastore>/<namespace>
+    user_info.check_privs(
+        auth_id,
+        &target_acl_path,
+        false,
+    )?;
+    if delete {
+        // Check user is allowed to prune remote datastore
+        user_info.check_privs(
+            auth_id,
+            &target_acl_path,
+            false,
+        )?;
+    }
+    let local_acl_path = match namespace {
+        Some(ns) => vec!["datastore", store, ns],
+        None => vec!["datastore", store],
+    };
+    // Check user is allowed to read source datastore
+    user_info.check_privs(auth_id, &local_acl_path, PRIV_DATASTORE_READ, false)?;
+    Ok(())
+impl TryFrom<&SyncJobConfig> for PushParameters {
+    type Error = Error;
+    fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
+        PushParameters::new(
+            &sync_job.store,
+            sync_job.ns.clone().unwrap_or_default(),
+            sync_job
+                .remote
+                .as_deref()
+                .context("missing required remote")?,
+            &sync_job.remote_store,
+            sync_job.remote_ns.clone().unwrap_or_default(),
+            sync_job
+                .owner
+                .as_ref()
+                .unwrap_or_else(|| Authid::root_auth_id())
+                .clone(),
+            sync_job.remove_vanished,
+            sync_job.max_depth,
+            sync_job.group_filter.clone(),
+            sync_job.limit.clone(),
+            sync_job.transfer_last,
+        )
+    }
+    input: {
+        properties: {
+            store: {
+                schema: DATASTORE_SCHEMA,
+            },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
+            },
+            remote: {
+                schema: REMOTE_ID_SCHEMA,
+            },
+            "remote-store": {
+                schema: DATASTORE_SCHEMA,
+            },
+            "remote-ns": {
+                type: BackupNamespace,
+                optional: true,
+            },
+            "remove-vanished": {
+                schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
+                optional: true,
+            },
+            "max-depth": {
+                schema: NS_MAX_DEPTH_REDUCED_SCHEMA,
+                optional: true,
+            },
+            "group-filter": {
+                schema: GROUP_FILTER_LIST_SCHEMA,
+                optional: true,
+            },
+            limit: {
+                type: RateLimitConfig,
+                flatten: true,
+            },
+            "transfer-last": {
+                schema: TRANSFER_LAST_SCHEMA,
+                optional: true,
+            },
+        },
+    },
+    access: {
+        description: r###"The user needs Remote.Backup privilege on '/remote/{remote}/{remote-store}'
+and needs to own the backup group. Datastore.Read is required on '/datastore/{store}'.
+The delete flag additionally requires the Remote.Prune privilege on '/remote/{remote}/{remote-store}'.
+        permission: &Permission::Anybody,
+    },
+/// Push store to other repository
+async fn push(
+    store: String,
+    ns: Option<BackupNamespace>,
+    remote: String,
+    remote_store: String,
+    remote_ns: Option<BackupNamespace>,
+    remove_vanished: Option<bool>,
+    max_depth: Option<usize>,
+    group_filter: Option<Vec<GroupFilter>>,
+    limit: RateLimitConfig,
+    transfer_last: Option<usize>,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<String, Error> {
+    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let delete = remove_vanished.unwrap_or(false);
+    let ns = ns.unwrap_or_default();
+    let source_namespace = if !ns.is_root() {
+        Some(ns.to_string())
+    } else {
+        None
+    };
+    let remote_ns = remote_ns.unwrap_or_default();
+    let remote_namespace = if !remote_ns.is_root() {
+        Some(ns.to_string())
+    } else {
+        None
+    };
+    check_push_privs(
+        &auth_id,
+        &store,
+        source_namespace.as_deref(),
+        &remote,
+        &remote_store,
+        remote_namespace.as_deref(),
+        delete,
+    )?;
+    let push_params = PushParameters::new(
+        &store,
+        ns,
+        &remote,
+        &remote_store,
+        remote_ns,
+        auth_id.clone(),
+        remove_vanished,
+        max_depth,
+        group_filter,
+        limit,
+        transfer_last,
+    )?;
+    let upid_str = WorkerTask::spawn(
+        "sync",
+        Some(store.clone()),
+        auth_id.to_string(),
+        true,
+        move |worker| async move {
+            info!("push datastore '{store}' to '{remote}/{remote_store}'");
+            let push_future = push_store(push_params);
+            (select! {
+                success = push_future.fuse() => success,
+                abort = worker.abort_future().map(|_| Err(format_err!("push aborted"))) => abort,
+            })?;
+            info!("push datastore '{store}' end");
+            Ok(())
+        },
+    )?;
+    Ok(upid_str)
+/// Router of this module: POST requests are handled by `API_METHOD_PUSH`
+/// (presumably generated by the `api` macro for `push` - confirm).
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PUSH);

