[pve-devel] [PATCH v4 proxmox 11/69] notify: add notification groups

Lukas Wagner l.wagner at proxmox.com
Thu Jul 20 16:31:38 CEST 2023


When notifying via a group, all endpoints contained in that group
will send out the notification.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 proxmox-notify/src/config.rs |   9 ++
 proxmox-notify/src/group.rs  |  41 +++++++++
 proxmox-notify/src/lib.rs    | 170 ++++++++++++++++++++++++++++-------
 3 files changed, 190 insertions(+), 30 deletions(-)
 create mode 100644 proxmox-notify/src/group.rs

diff --git a/proxmox-notify/src/config.rs b/proxmox-notify/src/config.rs
index 5508c916..53817254 100644
--- a/proxmox-notify/src/config.rs
+++ b/proxmox-notify/src/config.rs
@@ -2,6 +2,7 @@ use lazy_static::lazy_static;
 use proxmox_schema::{ApiType, ObjectSchema};
 use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
 
+use crate::group::{GroupConfig, GROUP_TYPENAME};
 use crate::schema::BACKEND_NAME_SCHEMA;
 use crate::Error;
 
@@ -36,6 +37,14 @@ fn config_init() -> SectionConfig {
         ));
     }
 
+    const GROUP_SCHEMA: &ObjectSchema = GroupConfig::API_SCHEMA.unwrap_object_schema();
+
+    config.register_plugin(SectionConfigPlugin::new(
+        GROUP_TYPENAME.to_string(),
+        Some(String::from("name")),
+        GROUP_SCHEMA,
+    ));
+
     config
 }
 
diff --git a/proxmox-notify/src/group.rs b/proxmox-notify/src/group.rs
new file mode 100644
index 00000000..bf0b42e5
--- /dev/null
+++ b/proxmox-notify/src/group.rs
@@ -0,0 +1,41 @@
+use crate::schema::ENTITY_NAME_SCHEMA;
+use proxmox_schema::api_types::COMMENT_SCHEMA;
+use proxmox_schema::{api, Updater};
+use serde::{Deserialize, Serialize};
+
+pub(crate) const GROUP_TYPENAME: &str = "group";
+
+#[api(
+    properties: {
+        "endpoint": {
+            type: Array,
+            items: {
+                description: "Name of the included endpoint(s)",
+                type: String,
+            },
+        },
+        comment: {
+            optional: true,
+            schema: COMMENT_SCHEMA,
+        },
+    },
+)]
+#[derive(Debug, Serialize, Deserialize, Updater, Default)]
+#[serde(rename_all = "kebab-case")]
+/// Config for notification groups
+pub struct GroupConfig {
+    /// Name of the group
+    #[updater(skip)]
+    pub name: String,
+    /// Endpoints for this group
+    pub endpoint: Vec<String>,
+    /// Comment
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub comment: Option<String>,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub enum DeleteableGroupProperty {
+    Comment,
+}
diff --git a/proxmox-notify/src/lib.rs b/proxmox-notify/src/lib.rs
index da794c07..bb35199f 100644
--- a/proxmox-notify/src/lib.rs
+++ b/proxmox-notify/src/lib.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::fmt::Display;
 
+use group::{GroupConfig, GROUP_TYPENAME};
 use proxmox_schema::api;
 use proxmox_section_config::SectionConfigData;
 use serde::{Deserialize, Serialize};
@@ -12,6 +13,7 @@ use std::error::Error as StdError;
 pub mod api;
 mod config;
 pub mod endpoints;
+pub mod group;
 pub mod schema;
 
 #[derive(Debug)]
@@ -20,6 +22,7 @@ pub enum Error {
     ConfigDeserialization(Box<dyn StdError + Send + Sync>),
     NotifyFailed(String, Box<dyn StdError + Send + Sync>),
     TargetDoesNotExist(String),
+    TargetTestFailed(Vec<Box<dyn StdError + Send + Sync + 'static>>),
 }
 
 impl Display for Error {
@@ -37,6 +40,13 @@ impl Display for Error {
             Error::TargetDoesNotExist(target) => {
                 write!(f, "notification target '{target}' does not exist")
             }
+            Error::TargetTestFailed(errs) => {
+                for err in errs {
+                    writeln!(f, "{err}")?;
+                }
+
+                Ok(())
+            }
         }
     }
 }
@@ -48,6 +58,7 @@ impl StdError for Error {
             Error::ConfigDeserialization(err) => Some(&**err),
             Error::NotifyFailed(_, err) => Some(&**err),
             Error::TargetDoesNotExist(_) => None,
+            Error::TargetTestFailed(errs) => Some(&*errs[0]),
         }
     }
 }
@@ -131,6 +142,7 @@ impl Config {
 #[derive(Default)]
 pub struct Bus {
     endpoints: HashMap<String, Box<dyn Endpoint>>,
+    groups: HashMap<String, GroupConfig>,
 }
 
 #[allow(unused_macros)]
@@ -234,7 +246,15 @@ impl Bus {
             );
         }
 
-        Ok(Bus { endpoints })
+        let groups: HashMap<String, GroupConfig> = config
+            .config
+            .convert_to_typed_array(GROUP_TYPENAME)
+            .map_err(|err| Error::ConfigDeserialization(err.into()))?
+            .into_iter()
+            .map(|group: GroupConfig| (group.name.clone(), group))
+            .collect();
+
+        Ok(Bus { endpoints, groups })
     }
 
     #[cfg(test)]
@@ -242,39 +262,76 @@ impl Bus {
         self.endpoints.insert(endpoint.name().to_string(), endpoint);
     }
 
-    pub fn send(&self, target: &str, notification: &Notification) -> Result<(), Error> {
-        log::info!(
-            "sending notification with title '{title}'",
-            title = notification.title
-        );
-
-        let endpoint = self
-            .endpoints
-            .get(target)
-            .ok_or(Error::TargetDoesNotExist(target.into()))?;
+    #[cfg(test)]
+    pub fn add_group(&mut self, group: GroupConfig) {
+        self.groups.insert(group.name.clone(), group);
+    }
 
-        endpoint.send(notification).unwrap_or_else(|e| {
-            log::error!(
-                "could not notfiy via endpoint `{name}`: {e}",
-                name = endpoint.name()
-            )
-        });
+    /// Send a notification to a given target (endpoint or group).
+    ///
+    /// Errors are not returned to the caller; they are only logged.
+    pub fn send(&self, endpoint_or_group: &str, notification: &Notification) {
+        if let Some(group) = self.groups.get(endpoint_or_group) {
+            for endpoint in &group.endpoint {
+                self.send_via_single_endpoint(endpoint, notification);
+            }
+        } else {
+            self.send_via_single_endpoint(endpoint_or_group, notification);
+        }
+    }
 
-        Ok(())
+    fn send_via_single_endpoint(&self, endpoint: &str, notification: &Notification) {
+        if let Some(endpoint) = self.endpoints.get(endpoint) {
+            if let Err(e) = endpoint.send(notification) {
+                // Only log errors; do not propagate the failure to the caller.
+                log::error!(
+                    "could not notify via target `{name}`: {e}",
+                    name = endpoint.name()
+                );
+            } else {
+                log::info!("notified via endpoint `{name}`", name = endpoint.name());
+            }
+        } else {
+            log::error!("could not notify via endpoint '{endpoint}', it does not exist");
+        }
     }
 
+    /// Send a test notification to a target (endpoint or group).
+    ///
+    /// In contrast to the `send` function, this function will return
+    /// any errors to the caller.
     pub fn test_target(&self, target: &str) -> Result<(), Error> {
-        let endpoint = self
-            .endpoints
-            .get(target)
-            .ok_or(Error::TargetDoesNotExist(target.into()))?;
-
-        endpoint.send(&Notification {
+        let notification = Notification {
             severity: Severity::Info,
             title: "Test notification".into(),
             body: "This is a test of the notification target '{{ target }}'".into(),
             properties: Some(json!({ "target": target })),
-        })?;
+        };
+
+        let mut errors: Vec<Box<dyn StdError + Send + Sync>> = Vec::new();
+
+        let mut my_send = |target: &str| -> Result<(), Error> {
+            if let Some(endpoint) = self.endpoints.get(target) {
+                if let Err(e) = endpoint.send(&notification) {
+                    errors.push(Box::new(e));
+                }
+            } else {
+                return Err(Error::TargetDoesNotExist(target.to_string()));
+            }
+            Ok(())
+        };
+
+        if let Some(group) = self.groups.get(target) {
+            for endpoint_name in &group.endpoint {
+                my_send(endpoint_name)?;
+            }
+        } else {
+            my_send(target)?;
+        }
+
+        if !errors.is_empty() {
+            return Err(Error::TargetTestFailed(errors));
+        }
 
         Ok(())
     }
@@ -288,6 +345,7 @@ mod tests {
 
     #[derive(Default, Clone)]
     struct MockEndpoint {
+        name: &'static str,
         // Needs to be an Rc so that we can clone MockEndpoint before
         // passing it to Bus, while still retaining a handle to the Vec
         messages: Rc<RefCell<Vec<Notification>>>,
@@ -301,11 +359,18 @@ mod tests {
         }
 
         fn name(&self) -> &str {
-            "mock-endpoint"
+            self.name
         }
     }
 
     impl MockEndpoint {
+        fn new(name: &'static str, filter: Option<String>) -> Self {
+            Self {
+                name,
+                ..Default::default()
+            }
+        }
+
         fn messages(&self) -> Vec<Notification> {
             self.messages.borrow().clone()
         }
@@ -313,24 +378,69 @@ mod tests {
 
     #[test]
     fn test_add_mock_endpoint() -> Result<(), Error> {
-        let mock = MockEndpoint::default();
+        let mock = MockEndpoint::new("endpoint", None);
 
         let mut bus = Bus::default();
-
         bus.add_endpoint(Box::new(mock.clone()));
 
+        // Send directly to endpoint
         bus.send(
-            "mock-endpoint",
+            "endpoint",
             &Notification {
                 title: "Title".into(),
                 body: "Body".into(),
                 severity: Severity::Info,
                 properties: Default::default(),
             },
-        )?;
+        );
         let messages = mock.messages();
         assert_eq!(messages.len(), 1);
 
         Ok(())
     }
+
+    #[test]
+    fn test_groups() -> Result<(), Error> {
+        let endpoint1 = MockEndpoint::new("mock1", None);
+        let endpoint2 = MockEndpoint::new("mock2", None);
+
+        let mut bus = Bus::default();
+
+        bus.add_group(GroupConfig {
+            name: "group1".to_string(),
+            endpoint: vec!["mock1".into()],
+            comment: None,
+        });
+
+        bus.add_group(GroupConfig {
+            name: "group2".to_string(),
+            endpoint: vec!["mock2".into()],
+            comment: None,
+        });
+
+        bus.add_endpoint(Box::new(endpoint1.clone()));
+        bus.add_endpoint(Box::new(endpoint2.clone()));
+
+        let send_to_group = |channel| {
+            bus.send(
+                channel,
+                &Notification {
+                    title: "Title".into(),
+                    body: "Body".into(),
+                    severity: Severity::Info,
+                    properties: Default::default(),
+                },
+            )
+        };
+
+        send_to_group("group1");
+        assert_eq!(endpoint1.messages().len(), 1);
+        assert_eq!(endpoint2.messages().len(), 0);
+
+        send_to_group("group2");
+        assert_eq!(endpoint1.messages().len(), 1);
+        assert_eq!(endpoint2.messages().len(), 1);
+
+        Ok(())
+    }
 }
-- 
2.39.2






More information about the pve-devel mailing list