[pve-devel] [PATCH proxmox 03/18] notification: add notification filter mechanism

Lukas Wagner l.wagner at proxmox.com
Mon Mar 27 17:18:42 CEST 2023


This commit adds a way to filter notifications based on a.) severity and
b.) arbitrary metadata property fields. For better demonstration, an example
configuration file follows:

  sendmail: mail
      recipient root at example.org
      filter only-certain-vms-and-errors

  filter: only-certain-vms-or-errors
      mode or
      min-severity error
      sub-filter only-certain-vms
      sub-filter all-but-one-ct

  filter: only-certain-vms
      mode and
      match-property object_type=vm
      sub-filter vm-ids

  filter: vm-ids
      mode or
      match-property object_id=103
      match-property object_id=104

  filter: all-but-one-ct
      mode and
      invert-match true
      match-property object_type=ct
      match-property object_id=110

In plain English, this translates to: "Send mails for all errors, as
well as all events related to VM with the IDs 103 and 104, and also
all events for any container except the one with ID 110".
The example demonstrates how sub-filters and and/or/not operators can be
used to construct filters with high granularity.

Filters are lazily evaluated, and at most once, in case multiple
endpoints/filters use the same (sub-)filter. Furthermore, there
are checks in place so that recursive sub-filter definitions are
detected.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 proxmox-notification/src/config.rs            |   9 +
 .../src/endpoints/sendmail.rs                 |   7 +
 proxmox-notification/src/filter.rs            | 426 ++++++++++++++++++
 proxmox-notification/src/lib.rs               | 131 +++++-
 4 files changed, 566 insertions(+), 7 deletions(-)
 create mode 100644 proxmox-notification/src/filter.rs

diff --git a/proxmox-notification/src/config.rs b/proxmox-notification/src/config.rs
index 58c79d4..939bdb6 100644
--- a/proxmox-notification/src/config.rs
+++ b/proxmox-notification/src/config.rs
@@ -5,6 +5,7 @@ use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlug
 
 use crate::endpoints::sendmail::SendmailConfig;
 use crate::endpoints::sendmail::SENDMAIL_TYPENAME;
+use crate::filter::{FilterConfig, FILTER_TYPENAME};
 
 // Copied from PBS
 #[rustfmt::skip]
@@ -33,12 +34,20 @@ fn init() -> SectionConfig {
 
     const SENDMAIL_SCHEMA: &ObjectSchema = SendmailConfig::API_SCHEMA.unwrap_object_schema();
 
+    const FILTER_SCHEMA: &ObjectSchema = FilterConfig::API_SCHEMA.unwrap_object_schema();
+
     config.register_plugin(SectionConfigPlugin::new(
         SENDMAIL_TYPENAME.to_string(),
         Some(String::from("name")),
         SENDMAIL_SCHEMA,
     ));
 
+    config.register_plugin(SectionConfigPlugin::new(
+        FILTER_TYPENAME.to_string(),
+        Some(String::from("name")),
+        FILTER_SCHEMA,
+    ));
+
     config
 }
 
diff --git a/proxmox-notification/src/endpoints/sendmail.rs b/proxmox-notification/src/endpoints/sendmail.rs
index 2c43ab1..7f29b48 100644
--- a/proxmox-notification/src/endpoints/sendmail.rs
+++ b/proxmox-notification/src/endpoints/sendmail.rs
@@ -42,6 +42,9 @@ pub struct SendmailConfig {
     /// Author of the mail
     #[serde(skip_serializing_if = "Option::is_none")]
     pub author: Option<String>,
+    /// Filter to apply
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub filter: Option<String>,
 }
 
 impl Endpoint for SendmailConfig {
@@ -68,4 +71,8 @@ impl Endpoint for SendmailConfig {
     fn name(&self) -> &str {
         &self.name
     }
+
+    fn filter(&self) -> Option<&str> {
+        self.filter.as_deref()
+    }
 }
diff --git a/proxmox-notification/src/filter.rs b/proxmox-notification/src/filter.rs
new file mode 100644
index 0000000..9846e93
--- /dev/null
+++ b/proxmox-notification/src/filter.rs
@@ -0,0 +1,426 @@
+use std::collections::{HashMap, HashSet};
+
+use anyhow::{bail, Context, Error};
+use proxmox_schema::{api, property_string::PropertyIterator, Updater};
+use serde::{Deserialize, Serialize};
+
+use crate::{Notification, Severity};
+
+pub const FILTER_TYPENAME: &str = "filter";
+
+#[api]
+#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)]
+#[serde(rename_all = "kebab-case")]
+pub enum FilterModeOperator {
+    /// All filter properties have to match (AND)
+    #[default]
+    And,
+    /// At least one filter property has to match (OR)
+    Or,
+}
+
+impl FilterModeOperator {
+    /// Apply the mode operator to two bools, lhs and rhs
+    fn apply(&self, lhs: bool, rhs: bool) -> bool {
+        match self {
+            FilterModeOperator::And => lhs && rhs,
+            FilterModeOperator::Or => lhs || rhs,
+        }
+    }
+
+    fn neutral_element(&self) -> bool {
+        match self {
+            FilterModeOperator::And => true,
+            FilterModeOperator::Or => false,
+        }
+    }
+
+    /// Check if we need to evaluate any other properties, or if we can return early, since
+    /// false AND (...) = false
+    /// true OR (...) = true
+    fn short_circuit_return_possible(&self, value: bool) -> bool {
+        matches!(
+            (self, value),
+            (FilterModeOperator::And, false) | (FilterModeOperator::Or, true)
+        )
+    }
+}
+
+#[api(
+    properties: {
+        "sub-filter": {
+            optional: true,
+            type: Array,
+            items: {
+                description: "Name of the subfilter",
+                type: String,
+            },
+        },
+        "match-property": {
+            optional: true,
+            type: Array,
+            items: {
+                description: "Notification properties to match",
+                type: String,
+            },
+        },
+    },
+)]
+#[derive(Debug, Serialize, Deserialize, Updater)]
+#[serde(rename_all = "kebab-case")]
+/// Config for Sendmail notification endpoints
+pub struct FilterConfig {
+    /// Name of the filter
+    pub name: String,
+
+    /// Minimum severity to match
+    pub min_severity: Option<Severity>,
+
+    /// Subfilter, allows arbitrary nesting (no recursion allowed)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub sub_filter: Option<Vec<String>>,
+
+    /// Choose between 'and' and 'or' for when multiple properties are specified
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub mode: Option<FilterModeOperator>,
+
+    /// Notification properties to match.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub match_property: Option<Vec<String>>,
+
+    /// Invert match of the whole filter
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub invert_match: Option<bool>,
+}
+
+/// A caching, lazily-evaluating notification filter. Parameterized with the notification itself,
+/// since there are usually multiple filters to check for a single notification that is to be sent.
+pub(crate) struct FilterMatcher<'a> {
+    filters: HashMap<&'a str, &'a FilterConfig>,
+    cached_results: HashMap<&'a str, bool>,
+    notification: &'a Notification,
+}
+
+impl<'a> FilterMatcher<'a> {
+    pub(crate) fn new(filters: &'a [FilterConfig], notification: &'a Notification) -> Self {
+        let filters = filters.iter().map(|f| (f.name.as_str(), f)).collect();
+
+        Self {
+            filters,
+            cached_results: Default::default(),
+            notification,
+        }
+    }
+
+    /// Check if the notification that was used to instatiate Self matches a given filter
+    pub(crate) fn check_filter_match(&mut self, filter_name: &str) -> Result<bool, Error> {
+        let mut visited = HashSet::new();
+
+        self.do_check_filter(filter_name, &mut visited)
+    }
+
+    fn do_check_filter(
+        &mut self,
+        filter_name: &str,
+        visited: &mut HashSet<String>,
+    ) -> Result<bool, Error> {
+        if visited.contains(filter_name) {
+            bail!("recursive filter definition: {filter_name}");
+        }
+
+        if let Some(is_match) = self.cached_results.get(filter_name) {
+            return Ok(*is_match);
+        }
+
+        visited.insert(filter_name.into());
+
+        let filter_config = self
+            .filters
+            .get(filter_name)
+            .copied()
+            .context("no filter with name {filter_name} defined")?;
+
+        let mode_operator = filter_config.mode.unwrap_or_default();
+
+        let mut notification_matches = mode_operator.neutral_element();
+
+        notification_matches = mode_operator.apply(
+            notification_matches,
+            self.check_severity_match(filter_config, mode_operator),
+        );
+
+        notification_matches = mode_operator.apply(
+            notification_matches,
+            self.check_property_match(filter_config, mode_operator)?,
+        );
+
+        // ...then check the sub-filters
+        if let Some(sub_filters) = &filter_config.sub_filter {
+            for filter in sub_filters {
+                let is_match = self.do_check_filter(filter, visited)?;
+
+                self.cached_results.insert(filter.as_str(), is_match);
+
+                notification_matches = mode_operator.apply(notification_matches, is_match);
+
+                if mode_operator.short_circuit_return_possible(notification_matches) {
+                    return Ok(notification_matches);
+                }
+            }
+        }
+
+        if filter_config.invert_match.unwrap_or_default() {
+            notification_matches = !notification_matches;
+        }
+
+        Ok(notification_matches)
+    }
+
+    fn check_property_match(
+        &self,
+        filter_config: &FilterConfig,
+        mode_operator: FilterModeOperator,
+    ) -> Result<bool, Error> {
+        let mut notification_matches = mode_operator.neutral_element();
+
+        if let Some(match_property_operators) = filter_config.match_property.as_ref() {
+            for op in match_property_operators {
+                for prop in PropertyIterator::new(op) {
+                    let prop = prop?;
+
+                    if let (Some(key), expected_value) = prop {
+                        let is_match = if let Some(value) = self.notification.properties.get(key) {
+                            value.as_str() == expected_value
+                        } else {
+                            // If the metadata field is not present, we don't match
+                            false
+                        };
+
+                        notification_matches = mode_operator.apply(notification_matches, is_match);
+
+                        if mode_operator.short_circuit_return_possible(notification_matches) {
+                            return Ok(notification_matches);
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(notification_matches)
+    }
+
+    fn check_severity_match(
+        &self,
+        filter_config: &FilterConfig,
+        mode_operator: FilterModeOperator,
+    ) -> bool {
+        if let Some(min_severity) = filter_config.min_severity {
+            self.notification.severity >= min_severity
+        } else {
+            mode_operator.neutral_element()
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::config;
+
+    #[test]
+    fn test_filter_config_parses_correctly() -> Result<(), Error> {
+        let (c, _) = config::config(
+            r"
+filter: foo
+    min-severity info
+    match-property object_type=vm
+    match-property object_id=103
+    invert-match true
+    mode and
+
+filter: bar
+    min-severity warning
+    match-property object_type=ct,object_id=104
+    sub-filter foo
+    mode or
+",
+        )?;
+
+        let filters: Vec<FilterConfig> = c.convert_to_typed_array("filter")?;
+
+        assert_eq!(filters.len(), 2);
+
+        Ok(())
+    }
+
+    fn parse_filters(config: &str) -> Result<Vec<FilterConfig>, Error> {
+        let (config, _) = config::config(config)?;
+        Ok(config.convert_to_typed_array("filter")?)
+    }
+
+    fn empty_notification_with_severity(severity: Severity) -> Notification {
+        Notification {
+            title: String::new(),
+            body: String::new(),
+            severity,
+            properties: HashMap::new(),
+        }
+    }
+
+    fn empty_notification_with_metadata(metadata: &[(&str, &str)]) -> Notification {
+        let metadata = HashMap::from_iter(
+            metadata
+                .into_iter()
+                .map(|e| (e.0.to_string(), e.1.to_string())),
+        );
+
+        Notification {
+            title: String::new(),
+            body: String::new(),
+            severity: Severity::Error,
+            properties: metadata,
+        }
+    }
+
+    #[test]
+    fn test_trivial_severity_filters() -> Result<(), Error> {
+        let config = "
+filter: test
+    min-severity warning
+";
+
+        let filters = parse_filters(config)?;
+
+        let is_match = |severity| {
+            let notifiction = empty_notification_with_severity(severity);
+            let mut results = FilterMatcher::new(&filters, &notifiction);
+            results.check_filter_match("test")
+        };
+
+        assert!(is_match(Severity::Warning)?);
+        assert!(!is_match(Severity::Notice)?);
+        assert!(is_match(Severity::Error)?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_recursive_filter_loop() -> Result<(), Error> {
+        let config = "
+filter: direct-a
+    sub-filter direct-b
+
+filter: direct-b
+    sub-filter direct-a
+
+filter: indirect-c
+    sub-filter indirect-d
+
+filter: indirect-d
+    sub-filter indirect-e
+
+filter: indirect-e
+    sub-filter indirect-c
+";
+
+        let filters = parse_filters(config)?;
+
+        let notifiction = empty_notification_with_severity(Severity::Info);
+        let mut results = FilterMatcher::new(&filters, &notifiction);
+        assert!(results.check_filter_match("direct-a").is_err());
+        assert!(results.check_filter_match("indirect-c").is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_property_matches() -> Result<(), Error> {
+        let config = "
+filter: test
+    match-property object_type=vm
+
+filter: multiple-and
+    mode and
+    match-property a=foo,b=bar
+    match-property c=lorem,d=ipsum
+
+filter: multiple-or
+    mode or
+    match-property a=foo,b=bar
+    match-property c=lorem,d=ipsum
+";
+        let filters = parse_filters(config)?;
+
+        let is_match = |filter, metadata| -> Result<bool, Error> {
+            let notifiction = empty_notification_with_metadata(metadata);
+            let mut results = FilterMatcher::new(&filters, &notifiction);
+            results.check_filter_match(filter)
+        };
+
+        assert!(is_match("test", &[("object_type", "vm")])?);
+        assert!(!is_match("test", &[("object_type", "ct")])?);
+        assert!(is_match(
+            "multiple-and",
+            &[("a", "foo"), ("b", "bar"), ("c", "lorem"), ("d", "ipsum")],
+        )?);
+        assert!(!is_match(
+            "multiple-and",
+            &[
+                ("a", "invalid"),
+                ("b", "bar"),
+                ("c", "lorem"),
+                ("d", "ipsum")
+            ],
+        )?);
+        assert!(!is_match("multiple-and", &[("a", "foo"), ("b", "bar")],)?);
+        assert!(is_match("multiple-or", &[("a", "foo"),])?);
+        assert!(is_match("multiple-or", &[("b", "bar"),])?);
+        assert!(is_match("multiple-or", &[("d", "ipsum"),])?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_invert_match() -> Result<(), Error> {
+        let config = "
+filter: test
+    match-property object_type=vm
+    invert-match true
+";
+        let filters = parse_filters(config)?;
+
+        let notifiction = empty_notification_with_metadata(&[("object_type", "vm")]);
+        let mut results = FilterMatcher::new(&filters, &notifiction);
+        assert!(!results.check_filter_match("test")?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_subfilter_matches() -> Result<(), Error> {
+        let config = "
+filter: test
+    match-property object_type=vm
+    sub-filter vm-ids
+
+filter: vm-ids
+    mode or
+    match-property object_id=100
+    match-property object_id=101
+";
+        let filters = parse_filters(config)?;
+
+        let is_match = |metadata| -> Result<bool, Error> {
+            let notifiction = empty_notification_with_metadata(metadata);
+            let mut results = FilterMatcher::new(&filters, &notifiction);
+            results.check_filter_match("test")
+        };
+
+        assert!(is_match(&[("object_type", "vm"), ("object_id", "100")])?);
+        assert!(is_match(&[("object_type", "vm"), ("object_id", "101")])?);
+        assert!(!is_match(&[("object_type", "ct"), ("object_id", "101")])?);
+        assert!(!is_match(&[("object_type", "vm"), ("object_id", "111")])?);
+
+        Ok(())
+    }
+}
diff --git a/proxmox-notification/src/lib.rs b/proxmox-notification/src/lib.rs
index f076c88..c688a10 100644
--- a/proxmox-notification/src/lib.rs
+++ b/proxmox-notification/src/lib.rs
@@ -4,12 +4,14 @@ use anyhow::Error;
 
 use endpoints::sendmail::SendmailConfig;
 use endpoints::sendmail::SENDMAIL_TYPENAME;
+use filter::{FilterConfig, FilterMatcher, FILTER_TYPENAME};
 use proxmox_schema::api;
 use proxmox_section_config::SectionConfigData;
 use serde::{Deserialize, Serialize};
 
 mod config;
 mod endpoints;
+mod filter;
 
 #[api()]
 #[derive(Clone, Debug, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd)]
@@ -33,6 +35,9 @@ pub trait Endpoint {
 
     /// The name/identifier for this endpoint
     fn name(&self) -> &str;
+
+    /// The name of the filter to use
+    fn filter(&self) -> Option<&str>;
 }
 
 #[derive(Debug, Clone)]
@@ -88,7 +93,9 @@ impl Config {
                 .map(|e| e as Box<dyn Endpoint>),
         );
 
-        Ok(Bus { endpoints })
+        let filters = self.0.convert_to_typed_array(FILTER_TYPENAME)?;
+
+        Ok(Bus { endpoints, filters })
     }
 }
 
@@ -98,6 +105,7 @@ impl Config {
 #[derive(Default)]
 pub struct Bus {
     endpoints: Vec<Box<dyn Endpoint>>,
+    filters: Vec<FilterConfig>,
 }
 
 impl Bus {
@@ -105,20 +113,52 @@ impl Bus {
         self.endpoints.push(endpoint);
     }
 
+    pub fn add_filter(&mut self, filter: FilterConfig) {
+        self.filters.push(filter)
+    }
+
     /// Send a notification to all registered endpoints
     pub fn send(&self, notification: &Notification) -> Result<(), Error> {
         log::info!(
-            "sending notification with title '{title}'",
+            "sending notification with title `{title}`",
             title = notification.title
         );
 
+        let mut notification_filter = FilterMatcher::new(&self.filters, notification);
+
         for endpoint in &self.endpoints {
-            endpoint.send(notification).unwrap_or_else(|e| {
-                log::error!(
-                    "could not notfiy via endpoint `{name}`: {e}",
-                    name = endpoint.name()
+            let should_notify = if let Some(filter) = endpoint.filter() {
+                notification_filter
+                    .check_filter_match(filter)
+                    .unwrap_or_else(|e| {
+                        log::error!(
+                            "could not apply filter `{filter}` for endpoint `{name}: {e}`",
+                            name = endpoint.name()
+                        );
+                        // If the filter is somehow erroneous, we send a notification by default,
+                        // so no events are missed
+                        true
+                    })
+            } else {
+                true
+            };
+
+            if should_notify {
+                if let Err(e) = endpoint.send(notification) {
+                    log::error!(
+                        "could not notfiy via endpoint `{name}`: {e}",
+                        name = endpoint.name()
+                    )
+                } else {
+                    log::info!("notified via endpoint `{name}`", name = endpoint.name())
+                }
+            } else {
+                log::info!(
+                    "skipped endpoint `{name}`, filter `{filter}` did not match",
+                    name = endpoint.name(),
+                    filter = endpoint.filter().unwrap_or_default()
                 )
-            })
+            }
         }
 
         Ok(())
@@ -136,6 +176,7 @@ mod tests {
     #[derive(Default, Clone)]
     struct MockEndpoint {
         messages: Rc<RefCell<Vec<Notification>>>,
+        filter: Option<String>,
     }
 
     impl Endpoint for MockEndpoint {
@@ -148,9 +189,20 @@ mod tests {
         fn name(&self) -> &str {
             "mock-endpoint"
         }
+
+        fn filter(&self) -> Option<&str> {
+            self.filter.as_deref()
+        }
     }
 
     impl MockEndpoint {
+        fn new(filter: Option<String>) -> Self {
+            Self {
+                filter,
+                ..Default::default()
+            }
+        }
+
         fn messages(&self) -> Vec<Notification> {
             self.messages.borrow().clone()
         }
@@ -175,4 +227,69 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_severity_ordering() {
+        // Not intended to be exhaustive, just a quick
+        // sanity check ;)
+
+        assert!(Severity::Info < Severity::Notice);
+        assert!(Severity::Info < Severity::Warning);
+        assert!(Severity::Info < Severity::Error);
+        assert!(Severity::Error > Severity::Warning);
+        assert!(Severity::Warning > Severity::Notice);
+    }
+
+    #[test]
+    fn test_multiple_endpoints_with_different_filters() -> Result<(), Error> {
+        let endpoint1 = MockEndpoint::new(Some("filter1".into()));
+        let endpoint2 = MockEndpoint::new(Some("filter2".into()));
+
+        let mut bus = Bus::default();
+
+        bus.add_endpoint(Box::new(endpoint1.clone()));
+        bus.add_endpoint(Box::new(endpoint2.clone()));
+
+        bus.add_filter(FilterConfig {
+            name: "filter1".into(),
+            min_severity: Some(Severity::Warning),
+            sub_filter: None,
+            mode: None,
+            match_property: None,
+            invert_match: None,
+        });
+
+        bus.add_filter(FilterConfig {
+            name: "filter2".into(),
+            min_severity: Some(Severity::Error),
+            sub_filter: None,
+            mode: None,
+            match_property: None,
+            invert_match: None,
+        });
+
+        let send_with_severity = |severity| {
+            bus.send(&Notification {
+                title: "Title".into(),
+                body: "Body".into(),
+                severity,
+                properties: Default::default(),
+            })
+            .unwrap();
+        };
+
+        send_with_severity(Severity::Info);
+        assert_eq!(endpoint1.messages().len(), 0);
+        assert_eq!(endpoint2.messages().len(), 0);
+
+        send_with_severity(Severity::Warning);
+        assert_eq!(endpoint1.messages().len(), 1);
+        assert_eq!(endpoint2.messages().len(), 0);
+
+        send_with_severity(Severity::Error);
+        assert_eq!(endpoint1.messages().len(), 2);
+        assert_eq!(endpoint2.messages().len(), 1);
+
+        Ok(())
+    }
 }
-- 
2.30.2






More information about the pve-devel mailing list