[pve-devel] [PATCH v2 proxmox 08/42] notify: add notification channels
Lukas Wagner
l.wagner at proxmox.com
Wed May 24 15:56:15 CEST 2023
A notification channel is basically a 'group' of endpoints.
When notifying, a notification is now sent to a 'channel',
and forwared to all included endpoints.
To illustrate why the channel concept is useful, consider a backup job.
The plan is to provide a new option there where the user can choose a
notification channel that should be used for any notifications.
The channel decouples the job configuration from any
endpoints present in the system.
I expected this to be nicer than:
- notifying via *all* endpoints. If this is not desired, the user
would be forced to configure notification filtering (to be
introduced in a later patch). The filtering approach is a bit
cumbersome, since it requires the filter to be adapted for each and
every new backup job.
- adding the endpoints directly to the job configuration. This would
mean that new/removed endpoints have to be added/removed from *all*
affected backup job configurations.
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
proxmox-notify/src/channel.rs | 53 ++++++++++++++
proxmox-notify/src/config.rs | 9 +++
proxmox-notify/src/lib.rs | 133 ++++++++++++++++++++++++++++++----
3 files changed, 179 insertions(+), 16 deletions(-)
create mode 100644 proxmox-notify/src/channel.rs
diff --git a/proxmox-notify/src/channel.rs b/proxmox-notify/src/channel.rs
new file mode 100644
index 00000000..dc9edf98
--- /dev/null
+++ b/proxmox-notify/src/channel.rs
@@ -0,0 +1,53 @@
+use crate::schema::COMMENT_SCHEMA;
+use proxmox_schema::{api, Updater};
+use serde::{Deserialize, Serialize};
+
+pub(crate) const CHANNEL_TYPENAME: &str = "channel";
+
+#[api(
+ properties: {
+ "endpoint": {
+ optional: true,
+ type: Array,
+ items: {
+ description: "Name of the included endpoint(s)",
+ type: String,
+ },
+ },
+ comment: {
+ optional: true,
+ schema: COMMENT_SCHEMA,
+ },
+ },
+)]
+#[derive(Debug, Serialize, Deserialize, Updater)]
+#[serde(rename_all = "kebab-case")]
+/// Config for notification channels
+pub struct ChannelConfig {
+ /// Name of the channel
+ #[updater(skip)]
+ pub name: String,
+ /// Endpoints for this channel
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub endpoint: Option<Vec<String>>,
+ /// Comment
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub comment: Option<String>,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub enum DeleteableChannelProperty {
+ Endpoint,
+ Comment,
+}
+
+impl ChannelConfig {
+ pub fn should_notify_via_endpoint(&self, endpoint: &str) -> bool {
+ if let Some(endpoints) = &self.endpoint {
+ endpoints.iter().any(|e| *e == endpoint)
+ } else {
+ false
+ }
+ }
+}
diff --git a/proxmox-notify/src/config.rs b/proxmox-notify/src/config.rs
index 362ca0fc..3064065b 100644
--- a/proxmox-notify/src/config.rs
+++ b/proxmox-notify/src/config.rs
@@ -2,6 +2,7 @@ use lazy_static::lazy_static;
use proxmox_schema::{ApiType, ObjectSchema};
use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
+use crate::channel::{ChannelConfig, CHANNEL_TYPENAME};
use crate::schema::BACKEND_NAME_SCHEMA;
use crate::Error;
@@ -13,6 +14,14 @@ lazy_static! {
fn config_init() -> SectionConfig {
let mut config = SectionConfig::new(&BACKEND_NAME_SCHEMA);
+ const CHANNEL_SCHEMA: &ObjectSchema = ChannelConfig::API_SCHEMA.unwrap_object_schema();
+
+ config.register_plugin(SectionConfigPlugin::new(
+ CHANNEL_TYPENAME.to_string(),
+ Some(String::from("name")),
+ CHANNEL_SCHEMA,
+ ));
+
config
}
diff --git a/proxmox-notify/src/lib.rs b/proxmox-notify/src/lib.rs
index f2d0e16c..3b881e26 100644
--- a/proxmox-notify/src/lib.rs
+++ b/proxmox-notify/src/lib.rs
@@ -1,5 +1,6 @@
use std::fmt::Display;
+use channel::{ChannelConfig, CHANNEL_TYPENAME};
use proxmox_schema::api;
use proxmox_section_config::SectionConfigData;
use serde::{Deserialize, Serialize};
@@ -9,6 +10,7 @@ use serde_json::Value;
use std::error::Error as StdError;
pub mod api;
+pub mod channel;
mod config;
pub mod endpoints;
pub mod schema;
@@ -19,6 +21,7 @@ pub enum Error {
ConfigDeserialization(Box<dyn StdError + Send + Sync + 'static>),
NotifyFailed(String, Box<dyn StdError + Send + Sync + 'static>),
EndpointDoesNotExist(String),
+ ChannelDoesNotExist(String),
}
impl Display for Error {
@@ -36,6 +39,9 @@ impl Display for Error {
Error::EndpointDoesNotExist(endpoint) => {
write!(f, "endpoint '{endpoint}' does not exist")
}
+ Error::ChannelDoesNotExist(channel) => {
+ write!(f, "channel '{channel}' does not exist")
+ }
}
}
}
@@ -47,6 +53,7 @@ impl StdError for Error {
Error::ConfigDeserialization(err) => Some(&**err),
Error::NotifyFailed(_, err) => Some(&**err),
Error::EndpointDoesNotExist(_) => None,
+ Error::ChannelDoesNotExist(_) => None,
}
}
}
@@ -142,6 +149,7 @@ impl Config {
#[derive(Default)]
pub struct Bus {
endpoints: Vec<Box<dyn Endpoint>>,
+ channels: Vec<ChannelConfig>,
}
#[allow(unused_macros)]
@@ -204,7 +212,15 @@ impl Bus {
pub fn from_config(config: &Config) -> Result<Self, Error> {
let mut endpoints = Vec::new();
- Ok(Bus { endpoints })
+ let channels = config
+ .config
+ .convert_to_typed_array(CHANNEL_TYPENAME)
+ .map_err(|err| Error::ConfigDeserialization(err.into()))?;
+
+ Ok(Bus {
+ endpoints,
+ channels,
+ })
}
#[cfg(test)]
@@ -212,20 +228,43 @@ impl Bus {
self.endpoints.push(endpoint);
}
- /// Send a notification to all registered endpoints
- pub fn send(&self, notification: &Notification) -> Result<(), Error> {
- log::info!(
- "sending notification with title '{title}'",
+ #[cfg(test)]
+ pub fn add_channel(&mut self, channel: ChannelConfig) {
+ self.channels.push(channel);
+ }
+
+ pub fn send(&self, channel: &str, notification: &Notification) -> Result<(), Error> {
+ log::debug!(
+ "sending notification with title `{title}`",
title = notification.title
);
+ // TODO: Maybe fallback to some default channel (e.g. send to all endpoints) in case
+ // the channel does not exist? Just to ensure that notifications are *never* swallowed...
+ let channel = self
+ .channels
+ .iter()
+ .find(|c| c.name == channel)
+ .ok_or(Error::ChannelDoesNotExist(channel.into()))?;
+
for endpoint in &self.endpoints {
- endpoint.send(notification).unwrap_or_else(|e| {
+ if !channel.should_notify_via_endpoint(endpoint.name()) {
+ log::debug!(
+ "channel '{channel}' does not notify via endpoint '{endpoint}', skipping",
+ channel = channel.name,
+ endpoint = endpoint.name()
+ );
+ continue;
+ }
+
+ if let Err(e) = endpoint.send(notification) {
log::error!(
"could not notfiy via endpoint `{name}`: {e}",
name = endpoint.name()
- )
- })
+ );
+ } else {
+ log::info!("notified via endpoint `{name}`", name = endpoint.name());
+ }
}
Ok(())
@@ -257,6 +296,7 @@ mod tests {
#[derive(Default, Clone)]
struct MockEndpoint {
+ name: &'static str,
messages: Rc<RefCell<Vec<Notification>>>,
}
@@ -268,11 +308,18 @@ mod tests {
}
fn name(&self) -> &str {
- "mock-endpoint"
+ self.name
}
}
impl MockEndpoint {
+ fn new(name: &'static str, filter: Option<String>) -> Self {
+ Self {
+ name,
+ ..Default::default()
+ }
+ }
+
fn messages(&self) -> Vec<Notification> {
self.messages.borrow().clone()
}
@@ -283,18 +330,72 @@ mod tests {
let mock = MockEndpoint::default();
let mut bus = Bus::default();
-
bus.add_endpoint(Box::new(mock.clone()));
+ bus.add_channel(ChannelConfig {
+ name: "channel".to_string(),
+ endpoint: Some(vec!["".into()]),
+ comment: None,
+ });
+
+ bus.send(
+ "channel",
+ &Notification {
+ title: "Title".into(),
+ body: "Body".into(),
+ severity: Severity::Info,
+ properties: Default::default(),
+ },
+ )?;
- bus.send(&Notification {
- title: "Title".into(),
- body: "Body".into(),
- severity: Severity::Info,
- properties: Default::default(),
- })?;
let messages = mock.messages();
assert_eq!(messages.len(), 1);
Ok(())
}
+
+ #[test]
+ fn test_channels() -> Result<(), Error> {
+ let endpoint1 = MockEndpoint::new("mock1", None);
+ let endpoint2 = MockEndpoint::new("mock2", None);
+
+ let mut bus = Bus::default();
+
+ bus.add_channel(ChannelConfig {
+ name: "channel1".to_string(),
+ endpoint: Some(vec!["mock1".into()]),
+ comment: None,
+ });
+
+ bus.add_channel(ChannelConfig {
+ name: "channel2".to_string(),
+ endpoint: Some(vec!["mock2".into()]),
+ comment: None,
+ });
+
+ bus.add_endpoint(Box::new(endpoint1.clone()));
+ bus.add_endpoint(Box::new(endpoint2.clone()));
+
+ let send_to_channel = |channel| {
+ bus.send(
+ channel,
+ &Notification {
+ title: "Title".into(),
+ body: "Body".into(),
+ severity: Severity::Info,
+ properties: Default::default(),
+ },
+ )
+ .unwrap();
+ };
+
+ send_to_channel("channel1");
+ assert_eq!(endpoint1.messages().len(), 1);
+ assert_eq!(endpoint2.messages().len(), 0);
+
+ send_to_channel("channel2");
+ assert_eq!(endpoint1.messages().len(), 1);
+ assert_eq!(endpoint2.messages().len(), 1);
+
+ Ok(())
+ }
}
--
2.30.2
More information about the pve-devel
mailing list