[pbs-devel] [PATCH v3 proxmox-backup 09/18] server: add LDAP realm sync job

Lukas Wagner l.wagner at proxmox.com
Thu Feb 9 14:31:19 CET 2023


This commit adds sync jobs for LDAP user sync. As of now, they
can only be started manually.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 pbs-api-types/src/user.rs    |   2 +-
 src/api2/access/domain.rs    |  85 ++++++-
 src/server/mod.rs            |   3 +
 src/server/realm_sync_job.rs | 463 +++++++++++++++++++++++++++++++++++
 www/Utils.js                 |   4 +-
 5 files changed, 551 insertions(+), 6 deletions(-)
 create mode 100644 src/server/realm_sync_job.rs

diff --git a/pbs-api-types/src/user.rs b/pbs-api-types/src/user.rs
index a7481190..21bf0e61 100644
--- a/pbs-api-types/src/user.rs
+++ b/pbs-api-types/src/user.rs
@@ -172,7 +172,7 @@ impl ApiToken {
         },
     }
 )]
-#[derive(Serialize, Deserialize, Updater)]
+#[derive(Serialize, Deserialize, Updater, PartialEq, Eq)]
 /// User properties.
 pub struct User {
     #[updater(skip)]
diff --git a/src/api2/access/domain.rs b/src/api2/access/domain.rs
index 3aaf98ae..31aa62bc 100644
--- a/src/api2/access/domain.rs
+++ b/src/api2/access/domain.rs
@@ -1,12 +1,16 @@
 //! List Authentication domains/realms
 
-use anyhow::Error;
+use anyhow::{format_err, Error};
 use serde_json::{json, Value};
 
-use proxmox_router::{Permission, Router, RpcEnvironment};
+use proxmox_router::{Permission, Router, RpcEnvironment, RpcEnvironmentType, SubdirMap};
 use proxmox_schema::api;
 
-use pbs_api_types::BasicRealmInfo;
+use pbs_api_types::{
+    Authid, BasicRealmInfo, Realm, PRIV_PERMISSIONS_MODIFY, REMOVE_VANISHED_SCHEMA, UPID_SCHEMA,
+};
+
+use crate::server::jobstate::Job;
 
 #[api(
     returns: {
@@ -50,4 +54,77 @@ fn list_domains(rpcenv: &mut dyn RpcEnvironment) -> Result<Vec<BasicRealmInfo>,
     Ok(list)
 }
 
-pub const ROUTER: Router = Router::new().get(&API_METHOD_LIST_DOMAINS);
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            realm: {
+                type: Realm,
+            },
+            "dry-run": {
+                type: bool,
+                description: "If set, do not create/delete anything",
+                default: false,
+                optional: true,
+            },
+            "remove-vanished": {
+                optional: true,
+                schema: REMOVE_VANISHED_SCHEMA,
+            },
+            "enable-new": {
+                description: "Enable newly synced users immediately",
+                optional: true,
+            }
+         },
+    },
+    returns: {
+        schema: UPID_SCHEMA,
+    },
+    access: {
+        permission: &Permission::Privilege(&["access", "users"], PRIV_PERMISSIONS_MODIFY, false),
+    },
+)]
+/// Synchronize users of a given realm
+pub fn sync_realm(
+    realm: Realm,
+    dry_run: bool,
+    remove_vanished: Option<String>,
+    enable_new: Option<bool>,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Value, Error> {
+    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+
+    let job = Job::new("realm-sync", realm.as_str())
+        .map_err(|_| format_err!("realm sync already running"))?;
+
+    let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
+
+    let upid_str = crate::server::do_realm_sync_job(
+        job,
+        realm.clone(),
+        &auth_id,
+        None,
+        to_stdout,
+        dry_run,
+        remove_vanished,
+        enable_new,
+    )
+    .map_err(|err| {
+        format_err!(
+            "unable to start realm sync job on realm {} - {}",
+            realm.as_str(),
+            err
+        )
+    })?;
+
+    Ok(json!(upid_str))
+}
+
+const SYNC_ROUTER: Router = Router::new().post(&API_METHOD_SYNC_REALM);
+const SYNC_SUBDIRS: SubdirMap = &[("sync", &SYNC_ROUTER)];
+
+const REALM_ROUTER: Router = Router::new().subdirs(SYNC_SUBDIRS);
+
+pub const ROUTER: Router = Router::new()
+    .get(&API_METHOD_LIST_DOMAINS)
+    .match_all("realm", &REALM_ROUTER);
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 06dcb867..ad865a6f 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -22,6 +22,9 @@ pub use prune_job::*;
 mod gc_job;
 pub use gc_job::*;
 
+mod realm_sync_job;
+pub use realm_sync_job::*;
+
 mod email_notifications;
 pub use email_notifications::*;
 
diff --git a/src/server/realm_sync_job.rs b/src/server/realm_sync_job.rs
new file mode 100644
index 00000000..0fd8adfe
--- /dev/null
+++ b/src/server/realm_sync_job.rs
@@ -0,0 +1,463 @@
+use anyhow::{bail, Context, Error};
+use pbs_config::{acl::AclTree, token_shadow, BackupLockGuard};
+use proxmox_ldap::{Config, Connection, SearchParameters, SearchResult};
+use proxmox_rest_server::WorkerTask;
+use proxmox_schema::ApiType;
+use proxmox_section_config::SectionConfigData;
+use proxmox_sys::task_log;
+
+use std::{collections::HashSet, sync::Arc};
+
+use pbs_api_types::{
+    ApiToken, Authid, LdapRealmConfig, Realm, RemoveVanished, SyncAttributes as LdapSyncAttributes,
+    SyncDefaultsOptions, User, Userid, REMOVE_VANISHED_ARRAY, USER_CLASSES_ARRAY,
+};
+
+use crate::{auth, server::jobstate::Job};
+
+/// Runs a realm sync job
+#[allow(clippy::too_many_arguments)]
+pub fn do_realm_sync_job(
+    mut job: Job,
+    realm: Realm,
+    auth_id: &Authid,
+    _schedule: Option<String>,
+    to_stdout: bool,
+    dry_run: bool,
+    remove_vanished: Option<String>,
+    enable_new: Option<bool>,
+) -> Result<String, Error> {
+    let worker_type = job.jobtype().to_string();
+    let upid_str = WorkerTask::spawn(
+        &worker_type,
+        Some(realm.as_str().to_owned()),
+        auth_id.to_string(),
+        to_stdout,
+        move |worker| {
+            job.start(&worker.upid().to_string()).unwrap();
+
+            task_log!(worker, "starting realm sync for {}", realm.as_str());
+
+            let override_settings = GeneralSyncSettingsOverride {
+                remove_vanished,
+                enable_new,
+            };
+
+            async move {
+                let sync_job = LdapRealmSyncJob::new(worker, realm, &override_settings, dry_run)?;
+                sync_job.sync().await
+            }
+        },
+    )?;
+
+    Ok(upid_str)
+}
+
+/// Implemenation for syncing LDAP realms
+struct LdapRealmSyncJob {
+    worker: Arc<WorkerTask>,
+    realm: Realm,
+    general_sync_settings: GeneralSyncSettings,
+    ldap_sync_settings: LdapSyncSettings,
+    ldap_config: Config,
+    dry_run: bool,
+}
+
+impl LdapRealmSyncJob {
+    /// Create new LdapRealmSyncJob
+    fn new(
+        worker: Arc<WorkerTask>,
+        realm: Realm,
+        override_settings: &GeneralSyncSettingsOverride,
+        dry_run: bool,
+    ) -> Result<Self, Error> {
+        let (domains, _digest) = pbs_config::domains::config()?;
+        let config = if let Ok(config) = domains.lookup::<LdapRealmConfig>("ldap", realm.as_str()) {
+            config
+        } else {
+            bail!("unknown realm '{}'", realm.as_str());
+        };
+
+        let sync_settings = GeneralSyncSettings::default()
+            .apply_config(&config)?
+            .apply_override(override_settings)?;
+        let sync_attributes = LdapSyncSettings::from_config(&config)?;
+
+        let ldap_config = auth::LdapAuthenticator::api_type_to_config(&config)?;
+
+        Ok(Self {
+            worker,
+            realm,
+            general_sync_settings: sync_settings,
+            ldap_sync_settings: sync_attributes,
+            ldap_config,
+            dry_run,
+        })
+    }
+
+    /// Perform realm synchronization
+    async fn sync(&self) -> Result<(), Error> {
+        if self.dry_run {
+            task_log!(
+                self.worker,
+                "this is a DRY RUN - changes will not be persisted"
+            );
+        }
+
+        let ldap = Connection::new(self.ldap_config.clone());
+
+        let parameters = SearchParameters {
+            attributes: self.ldap_sync_settings.attributes.clone(),
+            user_classes: self.ldap_sync_settings.user_classes.clone(),
+            user_filter: self.ldap_sync_settings.user_filter.clone(),
+        };
+
+        let users = ldap.search_entities(&parameters).await?;
+        self.update_user_config(&users)?;
+
+        Ok(())
+    }
+
+    fn update_user_config(&self, users: &[SearchResult]) -> Result<(), Error> {
+        let user_lock = pbs_config::user::lock_config()?;
+        let acl_lock = pbs_config::acl::lock_config()?;
+
+        let (mut user_config, _digest) = pbs_config::user::config()?;
+        let (mut tree, _) = pbs_config::acl::config()?;
+
+        let retrieved_users = self.create_or_update_users(&mut user_config, &user_lock, users)?;
+
+        if self.general_sync_settings.should_remove_entries() {
+            let vanished_users =
+                self.compute_vanished_users(&user_config, &user_lock, &retrieved_users)?;
+
+            self.delete_users(
+                &mut user_config,
+                &user_lock,
+                &mut tree,
+                &acl_lock,
+                &vanished_users,
+            )?;
+        }
+
+        if !self.dry_run {
+            pbs_config::user::save_config(&user_config).context("could not store user config")?;
+            pbs_config::acl::save_config(&tree).context("could not store acl config")?;
+        }
+
+        Ok(())
+    }
+
+    fn create_or_update_users(
+        &self,
+        user_config: &mut SectionConfigData,
+        _user_lock: &BackupLockGuard,
+        users: &[SearchResult],
+    ) -> Result<HashSet<Userid>, Error> {
+        let mut retrieved_users = HashSet::new();
+
+        for result in users {
+            let mut username = result
+                .attributes
+                .get(&self.ldap_sync_settings.user_attr)
+                .context("userid attribute not in search result")?
+                .get(0)
+                .context("userid attribute array is empty")?
+                .clone();
+
+            username.push_str(&format!("@{}", self.realm.as_str()));
+
+            let userid: Userid = username.parse()?;
+            retrieved_users.insert(userid.clone());
+
+            self.create_or_update_user(user_config, userid, result)?;
+        }
+
+        Ok(retrieved_users)
+    }
+
+    fn create_or_update_user(
+        &self,
+        user_config: &mut SectionConfigData,
+        userid: Userid,
+        result: &SearchResult,
+    ) -> Result<(), Error> {
+        let existing_user = user_config.lookup::<User>("user", userid.as_str()).ok();
+        let new_or_updated_user =
+            self.construct_or_update_user(result, userid, existing_user.as_ref());
+
+        if let Some(existing_user) = existing_user {
+            if existing_user != new_or_updated_user {
+                task_log!(
+                    self.worker,
+                    "updating user {}",
+                    new_or_updated_user.userid.as_str()
+                );
+            }
+        } else {
+            task_log!(
+                self.worker,
+                "creating user {}",
+                new_or_updated_user.userid.as_str()
+            );
+        }
+
+        user_config.set_data(
+            new_or_updated_user.userid.as_str(),
+            "user",
+            &new_or_updated_user,
+        )?;
+        Ok(())
+    }
+
+    fn construct_or_update_user(
+        &self,
+        result: &SearchResult,
+        userid: Userid,
+        existing_user: Option<&User>,
+    ) -> User {
+        let lookup = |a: Option<&String>| {
+            a.and_then(|e| result.attributes.get(e))
+                .and_then(|v| v.get(0))
+                .cloned()
+        };
+
+        User {
+            userid,
+            comment: existing_user.as_ref().and_then(|u| u.comment.clone()),
+            enable: existing_user
+                .and_then(|o| o.enable)
+                .or(Some(self.general_sync_settings.enable_new)),
+            expire: existing_user.and_then(|u| u.expire).or(Some(0)),
+            firstname: lookup(self.ldap_sync_settings.firstname_attr.as_ref()).or_else(|| {
+                if !self.general_sync_settings.should_remove_properties() {
+                    existing_user.and_then(|o| o.firstname.clone())
+                } else {
+                    None
+                }
+            }),
+            lastname: lookup(self.ldap_sync_settings.lastname_attr.as_ref()).or_else(|| {
+                if !self.general_sync_settings.should_remove_properties() {
+                    existing_user.and_then(|o| o.lastname.clone())
+                } else {
+                    None
+                }
+            }),
+            email: lookup(self.ldap_sync_settings.email_attr.as_ref()).or_else(|| {
+                if !self.general_sync_settings.should_remove_properties() {
+                    existing_user.and_then(|o| o.email.clone())
+                } else {
+                    None
+                }
+            }),
+        }
+    }
+
+    fn compute_vanished_users(
+        &self,
+        user_config: &SectionConfigData,
+        _user_lock: &BackupLockGuard,
+        synced_users: &HashSet<Userid>,
+    ) -> Result<Vec<Userid>, Error> {
+        Ok(user_config
+            .convert_to_typed_array::<User>("user")?
+            .into_iter()
+            .filter(|user| {
+                user.userid.realm() == self.realm && !synced_users.contains(&user.userid)
+            })
+            .map(|user| user.userid)
+            .collect())
+    }
+
+    fn delete_users(
+        &self,
+        user_config: &mut SectionConfigData,
+        _user_lock: &BackupLockGuard,
+        acl_config: &mut AclTree,
+        _acl_lock: &BackupLockGuard,
+        to_delete: &[Userid],
+    ) -> Result<(), Error> {
+        for userid in to_delete {
+            task_log!(self.worker, "deleting user {}", userid.as_str());
+
+            // Delete the user
+            user_config.sections.remove(userid.as_str());
+
+            if self.general_sync_settings.should_remove_acls() {
+                let auth_id = userid.clone().into();
+                // Delete the user's ACL entries
+                acl_config.delete_authid(&auth_id);
+            }
+
+            let user_tokens: Vec<ApiToken> = user_config
+                .convert_to_typed_array::<ApiToken>("token")?
+                .into_iter()
+                .filter(|token| token.tokenid.user().eq(userid))
+                .collect();
+
+            // Delete tokens, token secrets and ACLs corresponding to all tokens for a user
+            for token in user_tokens {
+                if let Some(name) = token.tokenid.tokenname() {
+                    let tokenid = Authid::from((userid.clone(), Some(name.to_owned())));
+                    let tokenid_string = tokenid.to_string();
+
+                    user_config.sections.remove(&tokenid_string);
+
+                    if !self.dry_run {
+                        token_shadow::delete_secret(&tokenid)?;
+                    }
+
+                    if self.general_sync_settings.should_remove_acls() {
+                        acl_config.delete_authid(&tokenid);
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// General realm sync settings - Override for manual invokation
+struct GeneralSyncSettingsOverride {
+    remove_vanished: Option<String>,
+    enable_new: Option<bool>,
+}
+
+/// General realm sync settings from the realm configuration
+struct GeneralSyncSettings {
+    remove_vanished: Vec<RemoveVanished>,
+    enable_new: bool,
+}
+
+/// LDAP-specific realm sync settings from the realm configuration
+struct LdapSyncSettings {
+    user_attr: String,
+    firstname_attr: Option<String>,
+    lastname_attr: Option<String>,
+    email_attr: Option<String>,
+    attributes: Vec<String>,
+    user_classes: Vec<String>,
+    user_filter: Option<String>,
+}
+
+impl LdapSyncSettings {
+    fn from_config(config: &LdapRealmConfig) -> Result<Self, Error> {
+        let mut attributes = vec![config.user_attr.clone()];
+
+        let mut email = None;
+        let mut firstname = None;
+        let mut lastname = None;
+
+        if let Some(sync_attributes) = &config.sync_attributes {
+            let value = LdapSyncAttributes::API_SCHEMA.parse_property_string(sync_attributes)?;
+            let sync_attributes: LdapSyncAttributes = serde_json::from_value(value)?;
+
+            email = sync_attributes.email.clone();
+            firstname = sync_attributes.firstname.clone();
+            lastname = sync_attributes.lastname.clone();
+
+            if let Some(email_attr) = sync_attributes.email {
+                attributes.push(email_attr);
+            }
+
+            if let Some(firstname_attr) = sync_attributes.firstname {
+                attributes.push(firstname_attr);
+            }
+
+            if let Some(lastname_attr) = sync_attributes.lastname {
+                attributes.push(lastname_attr);
+            }
+        }
+
+        let user_classes = if let Some(user_classes) = &config.user_classes {
+            let a = USER_CLASSES_ARRAY.parse_property_string(user_classes)?;
+            serde_json::from_value(a)?
+        } else {
+            vec![
+                "posixaccount".into(),
+                "person".into(),
+                "inetorgperson".into(),
+                "user".into(),
+            ]
+        };
+
+        Ok(Self {
+            user_attr: config.user_attr.clone(),
+            firstname_attr: firstname,
+            lastname_attr: lastname,
+            email_attr: email,
+            attributes,
+            user_classes,
+            user_filter: config.filter.clone(),
+        })
+    }
+}
+
+impl Default for GeneralSyncSettings {
+    fn default() -> Self {
+        Self {
+            remove_vanished: Default::default(),
+            enable_new: true,
+        }
+    }
+}
+
+impl GeneralSyncSettings {
+    fn apply_config(self, config: &LdapRealmConfig) -> Result<Self, Error> {
+        let mut enable_new = None;
+        let mut remove_vanished = None;
+
+        if let Some(sync_defaults_options) = &config.sync_defaults_options {
+            let sync_defaults_options = Self::parse_sync_defaults_options(sync_defaults_options)?;
+
+            enable_new = sync_defaults_options.enable_new;
+
+            if let Some(vanished) = sync_defaults_options.remove_vanished.as_deref() {
+                remove_vanished = Some(Self::parse_remove_vanished(vanished)?);
+            }
+        }
+
+        Ok(Self {
+            enable_new: enable_new.unwrap_or(self.enable_new),
+            remove_vanished: remove_vanished.unwrap_or(self.remove_vanished),
+        })
+    }
+
+    fn apply_override(self, override_config: &GeneralSyncSettingsOverride) -> Result<Self, Error> {
+        let enable_new = override_config.enable_new;
+        let remove_vanished = if let Some(s) = override_config.remove_vanished.as_deref() {
+            Some(Self::parse_remove_vanished(s)?)
+        } else {
+            None
+        };
+
+        Ok(Self {
+            enable_new: enable_new.unwrap_or(self.enable_new),
+            remove_vanished: remove_vanished.unwrap_or(self.remove_vanished),
+        })
+    }
+
+    fn parse_sync_defaults_options(s: &str) -> Result<SyncDefaultsOptions, Error> {
+        let value = SyncDefaultsOptions::API_SCHEMA.parse_property_string(s)?;
+        Ok(serde_json::from_value(value)?)
+    }
+
+    fn parse_remove_vanished(s: &str) -> Result<Vec<RemoveVanished>, Error> {
+        Ok(serde_json::from_value(
+            REMOVE_VANISHED_ARRAY.parse_property_string(s)?,
+        )?)
+    }
+
+    fn should_remove_properties(&self) -> bool {
+        self.remove_vanished.contains(&RemoveVanished::Properties)
+    }
+
+    fn should_remove_entries(&self) -> bool {
+        self.remove_vanished.contains(&RemoveVanished::Entry)
+    }
+
+    fn should_remove_acls(&self) -> bool {
+        self.remove_vanished.contains(&RemoveVanished::Acl)
+    }
+}
diff --git a/www/Utils.js b/www/Utils.js
index 3d51d6d2..2eca600e 100644
--- a/www/Utils.js
+++ b/www/Utils.js
@@ -337,7 +337,7 @@ Ext.define('PBS.Utils', {
 	    handler: function() {
 		window.open(docsURI);
 	    },
-        };
+	};
     },
 
     calculate_dedup_factor: function(gcstatus) {
@@ -406,6 +406,7 @@ Ext.define('PBS.Utils', {
 	    "format-media": [gettext('Drive'), gettext('Format media')],
 	    "forget-group": [gettext('Group'), gettext('Remove Group')],
 	    garbage_collection: ['Datastore', gettext('Garbage Collect')],
+	    'realm-sync': ['Realm', gettext('User Sync')],
 	    'inventory-update': [gettext('Drive'), gettext('Inventory Update')],
 	    'label-media': [gettext('Drive'), gettext('Label Media')],
 	    'load-media': (type, id) => PBS.Utils.render_drive_load_media_id(id, gettext('Load Media')),
@@ -433,6 +434,7 @@ Ext.define('PBS.Utils', {
 		add: false,
 		edit: false,
 		pwchange: true,
+		sync: false,
 	    },
 	});
     },
-- 
2.30.2






More information about the pbs-devel mailing list