[pve-devel] [PATCH v3 proxmox 09/66] notify: add notification groups
Lukas Wagner
l.wagner at proxmox.com
Mon Jul 17 16:59:54 CEST 2023
When notifying via a group, all endpoints contained in that group
will send out the notification.
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
proxmox-notify/src/config.rs | 9 ++
proxmox-notify/src/group.rs | 40 +++++++++
proxmox-notify/src/lib.rs | 170 ++++++++++++++++++++++++++++-------
3 files changed, 189 insertions(+), 30 deletions(-)
create mode 100644 proxmox-notify/src/group.rs
diff --git a/proxmox-notify/src/config.rs b/proxmox-notify/src/config.rs
index 5508c916..53817254 100644
--- a/proxmox-notify/src/config.rs
+++ b/proxmox-notify/src/config.rs
@@ -2,6 +2,7 @@ use lazy_static::lazy_static;
use proxmox_schema::{ApiType, ObjectSchema};
use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
+use crate::group::{GroupConfig, GROUP_TYPENAME};
use crate::schema::BACKEND_NAME_SCHEMA;
use crate::Error;
@@ -36,6 +37,14 @@ fn config_init() -> SectionConfig {
));
}
+ const GROUP_SCHEMA: &ObjectSchema = GroupConfig::API_SCHEMA.unwrap_object_schema();
+
+ config.register_plugin(SectionConfigPlugin::new(
+ GROUP_TYPENAME.to_string(),
+ Some(String::from("name")),
+ GROUP_SCHEMA,
+ ));
+
config
}
diff --git a/proxmox-notify/src/group.rs b/proxmox-notify/src/group.rs
new file mode 100644
index 00000000..d9ded2dd
--- /dev/null
+++ b/proxmox-notify/src/group.rs
@@ -0,0 +1,40 @@
+use crate::schema::COMMENT_SCHEMA;
+use proxmox_schema::{api, Updater};
+use serde::{Deserialize, Serialize};
+
+pub(crate) const GROUP_TYPENAME: &str = "group";
+
+#[api(
+ properties: {
+ "endpoint": {
+ type: Array,
+ items: {
+ description: "Name of the included endpoint(s)",
+ type: String,
+ },
+ },
+ comment: {
+ optional: true,
+ schema: COMMENT_SCHEMA,
+ },
+ },
+)]
+#[derive(Debug, Serialize, Deserialize, Updater, Default)]
+#[serde(rename_all = "kebab-case")]
+/// Config for notification groups
+pub struct GroupConfig {
+ /// Name of the group
+ #[updater(skip)]
+ pub name: String,
+ /// Endpoints for this group
+ pub endpoint: Vec<String>,
+ /// Comment
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub comment: Option<String>,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub enum DeleteableGroupProperty {
+ Comment,
+}
diff --git a/proxmox-notify/src/lib.rs b/proxmox-notify/src/lib.rs
index 83991add..35d5208b 100644
--- a/proxmox-notify/src/lib.rs
+++ b/proxmox-notify/src/lib.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::fmt::Display;
+use group::{GroupConfig, GROUP_TYPENAME};
use proxmox_schema::api;
use proxmox_section_config::SectionConfigData;
use serde::{Deserialize, Serialize};
@@ -12,6 +13,7 @@ use std::error::Error as StdError;
pub mod api;
mod config;
pub mod endpoints;
+pub mod group;
pub mod schema;
#[derive(Debug)]
@@ -20,6 +22,7 @@ pub enum Error {
ConfigDeserialization(Box<dyn StdError + Send + Sync + 'static>),
NotifyFailed(String, Box<dyn StdError + Send + Sync + 'static>),
TargetDoesNotExist(String),
+ TargetTestFailed(Vec<Box<dyn StdError + Send + Sync + 'static>>),
}
impl Display for Error {
@@ -37,6 +40,13 @@ impl Display for Error {
Error::TargetDoesNotExist(target) => {
write!(f, "notification target '{target}' does not exist")
}
+ Error::TargetTestFailed(errs) => {
+ for err in errs {
+ writeln!(f, "{err}")?;
+ }
+
+ Ok(())
+ }
}
}
}
@@ -48,6 +58,7 @@ impl StdError for Error {
Error::ConfigDeserialization(err) => Some(&**err),
Error::NotifyFailed(_, err) => Some(&**err),
Error::TargetDoesNotExist(_) => None,
+ Error::TargetTestFailed(errs) => Some(&*errs[0]),
}
}
}
@@ -149,6 +160,7 @@ impl Config {
#[derive(Default)]
pub struct Bus {
endpoints: HashMap<String, Box<dyn Endpoint>>,
+ groups: HashMap<String, GroupConfig>,
}
#[allow(unused_macros)]
@@ -246,7 +258,15 @@ impl Bus {
);
}
- Ok(Bus { endpoints })
+ let groups: HashMap<String, GroupConfig> = config
+ .config
+ .convert_to_typed_array(GROUP_TYPENAME)
+ .map_err(|err| Error::ConfigDeserialization(err.into()))?
+ .into_iter()
+ .map(|group: GroupConfig| (group.name.clone(), group))
+ .collect();
+
+ Ok(Bus { endpoints, groups })
}
#[cfg(test)]
@@ -254,39 +274,76 @@ impl Bus {
self.endpoints.insert(endpoint.name().to_string(), endpoint);
}
- pub fn send(&self, target: &str, notification: &Notification) -> Result<(), Error> {
- log::info!(
- "sending notification with title '{title}'",
- title = notification.title
- );
-
- let endpoint = self
- .endpoints
- .get(target)
- .ok_or(Error::TargetDoesNotExist(target.into()))?;
+ #[cfg(test)]
+ pub fn add_group(&mut self, group: GroupConfig) {
+ self.groups.insert(group.name.clone(), group);
+ }
- endpoint.send(notification).unwrap_or_else(|e| {
- log::error!(
- "could not notfiy via endpoint `{name}`: {e}",
- name = endpoint.name()
- )
- });
+ /// Send a notification to a given target (endpoint or group).
+ ///
+ /// Any errors will not be returned but only logged.
+ pub fn send(&self, endpoint_or_group: &str, notification: &Notification) {
+ if let Some(group) = self.groups.get(endpoint_or_group) {
+ for endpoint in &group.endpoint {
+ self.send_via_single_endpoint(endpoint, notification);
+ }
+ } else {
+ self.send_via_single_endpoint(endpoint_or_group, notification);
+ }
+ }
- Ok(())
+ fn send_via_single_endpoint(&self, endpoint: &str, notification: &Notification) {
+ if let Some(endpoint) = self.endpoints.get(endpoint) {
+ if let Err(e) = endpoint.send(notification) {
+ // Only log on errors, do not propagate fail to the caller.
+ log::error!(
+ "could not notify via target `{name}`: {e}",
+ name = endpoint.name()
+ );
+ } else {
+ log::info!("notified via endpoint `{name}`", name = endpoint.name());
+ }
+ } else {
+ log::error!("could not notify via endpoint '{endpoint}', it does not exist");
+ }
}
+ /// Send a test notification to a target (endpoint or group).
+ ///
+ /// In contrast to the `send` function, this function will return
+ /// any errors to the caller.
pub fn test_target(&self, target: &str) -> Result<(), Error> {
- let endpoint = self
- .endpoints
- .get(target)
- .ok_or(Error::TargetDoesNotExist(target.into()))?;
-
- endpoint.send(&Notification {
+ let notification = Notification {
severity: Severity::Info,
title: "Test notification".into(),
body: "This is a test of the notification target '{{ target }}'".into(),
properties: Some(json!({ "target": target })),
- })?;
+ };
+
+ let mut errors: Vec<Box<dyn StdError + Send + Sync>> = Vec::new();
+
+ let mut my_send = |target: &str| -> Result<(), Error> {
+ if let Some(endpoint) = self.endpoints.get(target) {
+ if let Err(e) = endpoint.send(&notification) {
+ errors.push(Box::new(e));
+ }
+ } else {
+ return Err(Error::TargetDoesNotExist(target.to_string()));
+ }
+ Ok(())
+ };
+
+ if let Some(group) = self.groups.get(target) {
+ for endpoint_name in &group.endpoint {
+ my_send(endpoint_name)?;
+ }
+ } else {
+ my_send(target)?;
+ }
+
+ if !errors.is_empty() {
+ return Err(Error::TargetTestFailed(errors));
+ }
Ok(())
}
@@ -300,6 +357,7 @@ mod tests {
#[derive(Default, Clone)]
struct MockEndpoint {
+ name: &'static str,
messages: Rc<RefCell<Vec<Notification>>>,
}
@@ -311,11 +369,18 @@ mod tests {
}
fn name(&self) -> &str {
- "mock-endpoint"
+ self.name
}
}
impl MockEndpoint {
+ fn new(name: &'static str, filter: Option<String>) -> Self {
+ Self {
+ name,
+ ..Default::default()
+ }
+ }
+
fn messages(&self) -> Vec<Notification> {
self.messages.borrow().clone()
}
@@ -323,24 +388,69 @@ mod tests {
#[test]
fn test_add_mock_endpoint() -> Result<(), Error> {
- let mock = MockEndpoint::default();
+ let mock = MockEndpoint::new("endpoint", None);
let mut bus = Bus::default();
-
bus.add_endpoint(Box::new(mock.clone()));
+ // Send directly to endpoint
bus.send(
- "mock-endpoint",
+ "endpoint",
&Notification {
title: "Title".into(),
body: "Body".into(),
severity: Severity::Info,
properties: Default::default(),
},
- )?;
+ );
let messages = mock.messages();
assert_eq!(messages.len(), 1);
Ok(())
}
+
+ #[test]
+ fn test_groups() -> Result<(), Error> {
+ let endpoint1 = MockEndpoint::new("mock1", None);
+ let endpoint2 = MockEndpoint::new("mock2", None);
+
+ let mut bus = Bus::default();
+
+ bus.add_group(GroupConfig {
+ name: "group1".to_string(),
+ endpoint: vec!["mock1".into()],
+ comment: None,
+ });
+
+ bus.add_group(GroupConfig {
+ name: "group2".to_string(),
+ endpoint: vec!["mock2".into()],
+ comment: None,
+ });
+
+ bus.add_endpoint(Box::new(endpoint1.clone()));
+ bus.add_endpoint(Box::new(endpoint2.clone()));
+
+ let send_to_group = |channel| {
+ bus.send(
+ channel,
+ &Notification {
+ title: "Title".into(),
+ body: "Body".into(),
+ severity: Severity::Info,
+ properties: Default::default(),
+ },
+ )
+ };
+
+ send_to_group("group1");
+ assert_eq!(endpoint1.messages().len(), 1);
+ assert_eq!(endpoint2.messages().len(), 0);
+
+ send_to_group("group2");
+ assert_eq!(endpoint1.messages().len(), 1);
+ assert_eq!(endpoint2.messages().len(), 1);
+
+ Ok(())
+ }
}
--
2.39.2
More information about the pve-devel
mailing list