[pve-devel] [PATCH v2 proxmox 08/42] notify: add notification channels

Lukas Wagner l.wagner at proxmox.com
Wed May 24 15:56:15 CEST 2023


A notification channel is basically a 'group' of endpoints.
When notifying, a notification is now sent to a 'channel',
and forwared to all included endpoints.

To illustrate why the channel concept is useful, consider a backup job.
The plan is to provide a new option there where the user can choose a
notification channel that should be used for any notifications.
The channel decouples the job configuration from any
endpoints present in the system.
I expected this to be nicer than:
  - notifying via *all* endpoints. If this is not desired, the user
    would be forced to configure notification filtering (to be
    introduced in a later patch). The filtering approach is a bit
    cumbersome, since it requires the filter to be adapted for each and
    every new backup job.
  - adding the endpoints directly to the job configuration. This would
    mean that new/removed endpoints have to be added/removed from *all*
    affected backup job configurations.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 proxmox-notify/src/channel.rs |  53 ++++++++++++++
 proxmox-notify/src/config.rs  |   9 +++
 proxmox-notify/src/lib.rs     | 133 ++++++++++++++++++++++++++++++----
 3 files changed, 179 insertions(+), 16 deletions(-)
 create mode 100644 proxmox-notify/src/channel.rs

diff --git a/proxmox-notify/src/channel.rs b/proxmox-notify/src/channel.rs
new file mode 100644
index 00000000..dc9edf98
--- /dev/null
+++ b/proxmox-notify/src/channel.rs
@@ -0,0 +1,53 @@
+use crate::schema::COMMENT_SCHEMA;
+use proxmox_schema::{api, Updater};
+use serde::{Deserialize, Serialize};
+
+pub(crate) const CHANNEL_TYPENAME: &str = "channel";
+
+#[api(
+    properties: {
+        "endpoint": {
+            optional: true,
+            type: Array,
+            items: {
+                description: "Name of the included endpoint(s)",
+                type: String,
+            },
+        },
+        comment: {
+            optional: true,
+            schema: COMMENT_SCHEMA,
+        },
+    },
+)]
+#[derive(Debug, Serialize, Deserialize, Updater)]
+#[serde(rename_all = "kebab-case")]
+/// Config for notification channels
+pub struct ChannelConfig {
+    /// Name of the channel
+    #[updater(skip)]
+    pub name: String,
+    /// Endpoints for this channel
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub endpoint: Option<Vec<String>>,
+    /// Comment
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub comment: Option<String>,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub enum DeleteableChannelProperty {
+    Endpoint,
+    Comment,
+}
+
+impl ChannelConfig {
+    pub fn should_notify_via_endpoint(&self, endpoint: &str) -> bool {
+        if let Some(endpoints) = &self.endpoint {
+            endpoints.iter().any(|e| *e == endpoint)
+        } else {
+            false
+        }
+    }
+}
diff --git a/proxmox-notify/src/config.rs b/proxmox-notify/src/config.rs
index 362ca0fc..3064065b 100644
--- a/proxmox-notify/src/config.rs
+++ b/proxmox-notify/src/config.rs
@@ -2,6 +2,7 @@ use lazy_static::lazy_static;
 use proxmox_schema::{ApiType, ObjectSchema};
 use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
 
+use crate::channel::{ChannelConfig, CHANNEL_TYPENAME};
 use crate::schema::BACKEND_NAME_SCHEMA;
 use crate::Error;
 
@@ -13,6 +14,14 @@ lazy_static! {
 fn config_init() -> SectionConfig {
     let mut config = SectionConfig::new(&BACKEND_NAME_SCHEMA);
 
+    const CHANNEL_SCHEMA: &ObjectSchema = ChannelConfig::API_SCHEMA.unwrap_object_schema();
+
+    config.register_plugin(SectionConfigPlugin::new(
+        CHANNEL_TYPENAME.to_string(),
+        Some(String::from("name")),
+        CHANNEL_SCHEMA,
+    ));
+
     config
 }
 
diff --git a/proxmox-notify/src/lib.rs b/proxmox-notify/src/lib.rs
index f2d0e16c..3b881e26 100644
--- a/proxmox-notify/src/lib.rs
+++ b/proxmox-notify/src/lib.rs
@@ -1,5 +1,6 @@
 use std::fmt::Display;
 
+use channel::{ChannelConfig, CHANNEL_TYPENAME};
 use proxmox_schema::api;
 use proxmox_section_config::SectionConfigData;
 use serde::{Deserialize, Serialize};
@@ -9,6 +10,7 @@ use serde_json::Value;
 use std::error::Error as StdError;
 
 pub mod api;
+pub mod channel;
 mod config;
 pub mod endpoints;
 pub mod schema;
@@ -19,6 +21,7 @@ pub enum Error {
     ConfigDeserialization(Box<dyn StdError + Send + Sync + 'static>),
     NotifyFailed(String, Box<dyn StdError + Send + Sync + 'static>),
     EndpointDoesNotExist(String),
+    ChannelDoesNotExist(String),
 }
 
 impl Display for Error {
@@ -36,6 +39,9 @@ impl Display for Error {
             Error::EndpointDoesNotExist(endpoint) => {
                 write!(f, "endpoint '{endpoint}' does not exist")
             }
+            Error::ChannelDoesNotExist(channel) => {
+                write!(f, "channel '{channel}' does not exist")
+            }
         }
     }
 }
@@ -47,6 +53,7 @@ impl StdError for Error {
             Error::ConfigDeserialization(err) => Some(&**err),
             Error::NotifyFailed(_, err) => Some(&**err),
             Error::EndpointDoesNotExist(_) => None,
+            Error::ChannelDoesNotExist(_) => None,
         }
     }
 }
@@ -142,6 +149,7 @@ impl Config {
 #[derive(Default)]
 pub struct Bus {
     endpoints: Vec<Box<dyn Endpoint>>,
+    channels: Vec<ChannelConfig>,
 }
 
 #[allow(unused_macros)]
@@ -204,7 +212,15 @@ impl Bus {
     pub fn from_config(config: &Config) -> Result<Self, Error> {
         let mut endpoints = Vec::new();
 
-        Ok(Bus { endpoints })
+        let channels = config
+            .config
+            .convert_to_typed_array(CHANNEL_TYPENAME)
+            .map_err(|err| Error::ConfigDeserialization(err.into()))?;
+
+        Ok(Bus {
+            endpoints,
+            channels,
+        })
     }
 
     #[cfg(test)]
@@ -212,20 +228,43 @@ impl Bus {
         self.endpoints.push(endpoint);
     }
 
-    /// Send a notification to all registered endpoints
-    pub fn send(&self, notification: &Notification) -> Result<(), Error> {
-        log::info!(
-            "sending notification with title '{title}'",
+    #[cfg(test)]
+    pub fn add_channel(&mut self, channel: ChannelConfig) {
+        self.channels.push(channel);
+    }
+
+    pub fn send(&self, channel: &str, notification: &Notification) -> Result<(), Error> {
+        log::debug!(
+            "sending notification with title `{title}`",
             title = notification.title
         );
 
+        // TODO: Maybe fallback to some default channel (e.g. send to all endpoints) in case
+        // the channel does not exist? Just to ensure that notifications are *never* swallowed...
+        let channel = self
+            .channels
+            .iter()
+            .find(|c| c.name == channel)
+            .ok_or(Error::ChannelDoesNotExist(channel.into()))?;
+
         for endpoint in &self.endpoints {
-            endpoint.send(notification).unwrap_or_else(|e| {
+            if !channel.should_notify_via_endpoint(endpoint.name()) {
+                log::debug!(
+                    "channel '{channel}' does not notify via endpoint '{endpoint}', skipping",
+                    channel = channel.name,
+                    endpoint = endpoint.name()
+                );
+                continue;
+            }
+
+            if let Err(e) = endpoint.send(notification) {
                 log::error!(
                     "could not notfiy via endpoint `{name}`: {e}",
                     name = endpoint.name()
-                )
-            })
+                );
+            } else {
+                log::info!("notified via endpoint `{name}`", name = endpoint.name());
+            }
         }
 
         Ok(())
@@ -257,6 +296,7 @@ mod tests {
 
     #[derive(Default, Clone)]
     struct MockEndpoint {
+        name: &'static str,
         messages: Rc<RefCell<Vec<Notification>>>,
     }
 
@@ -268,11 +308,18 @@ mod tests {
         }
 
         fn name(&self) -> &str {
-            "mock-endpoint"
+            self.name
         }
     }
 
     impl MockEndpoint {
+        fn new(name: &'static str, filter: Option<String>) -> Self {
+            Self {
+                name,
+                ..Default::default()
+            }
+        }
+
         fn messages(&self) -> Vec<Notification> {
             self.messages.borrow().clone()
         }
@@ -283,18 +330,72 @@ mod tests {
         let mock = MockEndpoint::default();
 
         let mut bus = Bus::default();
-
         bus.add_endpoint(Box::new(mock.clone()));
+        bus.add_channel(ChannelConfig {
+            name: "channel".to_string(),
+            endpoint: Some(vec!["".into()]),
+            comment: None,
+        });
+
+        bus.send(
+            "channel",
+            &Notification {
+                title: "Title".into(),
+                body: "Body".into(),
+                severity: Severity::Info,
+                properties: Default::default(),
+            },
+        )?;
 
-        bus.send(&Notification {
-            title: "Title".into(),
-            body: "Body".into(),
-            severity: Severity::Info,
-            properties: Default::default(),
-        })?;
         let messages = mock.messages();
         assert_eq!(messages.len(), 1);
 
         Ok(())
     }
+
+    #[test]
+    fn test_channels() -> Result<(), Error> {
+        let endpoint1 = MockEndpoint::new("mock1", None);
+        let endpoint2 = MockEndpoint::new("mock2", None);
+
+        let mut bus = Bus::default();
+
+        bus.add_channel(ChannelConfig {
+            name: "channel1".to_string(),
+            endpoint: Some(vec!["mock1".into()]),
+            comment: None,
+        });
+
+        bus.add_channel(ChannelConfig {
+            name: "channel2".to_string(),
+            endpoint: Some(vec!["mock2".into()]),
+            comment: None,
+        });
+
+        bus.add_endpoint(Box::new(endpoint1.clone()));
+        bus.add_endpoint(Box::new(endpoint2.clone()));
+
+        let send_to_channel = |channel| {
+            bus.send(
+                channel,
+                &Notification {
+                    title: "Title".into(),
+                    body: "Body".into(),
+                    severity: Severity::Info,
+                    properties: Default::default(),
+                },
+            )
+            .unwrap();
+        };
+
+        send_to_channel("channel1");
+        assert_eq!(endpoint1.messages().len(), 1);
+        assert_eq!(endpoint2.messages().len(), 0);
+
+        send_to_channel("channel2");
+        assert_eq!(endpoint1.messages().len(), 1);
+        assert_eq!(endpoint2.messages().len(), 1);
+
+        Ok(())
+    }
 }
-- 
2.30.2






More information about the pve-devel mailing list