notify: replace filters and groups with matcher-based system

This shifts notification routing into the matcher-system. Every
notification has associated metadata (key-value fields, severity -
to be extended) that can be match with match directives in
notification matchers. Right now, there are 2 matching directives,
match-field and match-severity. The first one allows one to do a
regex match/exact match on a metadata field, the other one allows one
to match one or more severites.
Every matcher also allows 'target' directives, these decide which
target(s) will be notified if a matcher matches a notification.

Since routing now happens in matchers, the API for sending is
simplified, since we do not need to specify a target any more.

The API routes for filters and groups have been removed completely.
The parser for the configuration file will still accept filter/group
entries, but will delete them once the config is saved again. This is
needed to allow a smooth transition from the old system to the new
system, since the old system was already available on pvetest.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
This commit is contained in:
Lukas Wagner 2023-11-14 13:59:13 +01:00 committed by Thomas Lamprecht
parent df4858e989
commit b421a7ca24
16 changed files with 848 additions and 1075 deletions

View File

@ -8,6 +8,7 @@ repository.workspace = true
exclude.workspace = true
[dependencies]
anyhow.workspace = true
handlebars = { workspace = true }
lazy_static.workspace = true
log.workspace = true
@ -16,6 +17,7 @@ openssl.workspace = true
proxmox-http = { workspace = true, features = ["client-sync"], optional = true }
proxmox-http-error.workspace = true
proxmox-human-byte.workspace = true
proxmox-serde.workspace = true
proxmox-schema = { workspace = true, features = ["api-macro", "api-types"]}
proxmox-section-config = { workspace = true }
proxmox-sys = { workspace = true, optional = true }

View File

@ -7,7 +7,7 @@ use crate::{Bus, Config, Notification};
///
/// The caller is responsible for any needed permission checks.
/// Returns an `anyhow::Error` in case of an error.
pub fn send(config: &Config, channel: &str, notification: &Notification) -> Result<(), HttpError> {
pub fn send(config: &Config, notification: &Notification) -> Result<(), HttpError> {
let bus = Bus::from_config(config).map_err(|err| {
http_err!(
INTERNAL_SERVER_ERROR,
@ -15,7 +15,7 @@ pub fn send(config: &Config, channel: &str, notification: &Notification) -> Resu
)
})?;
bus.send(channel, notification);
bus.send(notification);
Ok(())
}
@ -50,5 +50,5 @@ pub fn test_target(config: &Config, endpoint: &str) -> Result<(), HttpError> {
/// If the entity does not exist, the result will only contain the entity.
pub fn get_referenced_entities(config: &Config, entity: &str) -> Result<Vec<String>, HttpError> {
let entities = super::get_referenced_entities(config, entity);
Ok(Vec::from_iter(entities.into_iter()))
Ok(Vec::from_iter(entities))
}

View File

@ -1,231 +0,0 @@
use proxmox_http_error::HttpError;
use crate::api::http_err;
use crate::filter::{DeleteableFilterProperty, FilterConfig, FilterConfigUpdater, FILTER_TYPENAME};
use crate::Config;
/// Get a list of all filters
///
/// The caller is responsible for any needed permission checks.
/// Returns a list of all filters or a `HttpError` if the config is
/// (`500 Internal server error`).
pub fn get_filters(config: &Config) -> Result<Vec<FilterConfig>, HttpError> {
config
.config
.convert_to_typed_array(FILTER_TYPENAME)
.map_err(|e| http_err!(INTERNAL_SERVER_ERROR, "Could not fetch filters: {e}"))
}
/// Get filter with given `name`
///
/// The caller is responsible for any needed permission checks.
/// Returns the endpoint or a `HttpError` if the filter was not found (`404 Not found`).
pub fn get_filter(config: &Config, name: &str) -> Result<FilterConfig, HttpError> {
config
.config
.lookup(FILTER_TYPENAME, name)
.map_err(|_| http_err!(NOT_FOUND, "filter '{name}' not found"))
}
/// Add new notification filter.
///
/// The caller is responsible for any needed permission checks.
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - an entity with the same name already exists (`400 Bad request`)
/// - the configuration could not be saved (`500 Internal server error`)
pub fn add_filter(config: &mut Config, filter_config: &FilterConfig) -> Result<(), HttpError> {
super::ensure_unique(config, &filter_config.name)?;
config
.config
.set_data(&filter_config.name, FILTER_TYPENAME, filter_config)
.map_err(|e| {
http_err!(
INTERNAL_SERVER_ERROR,
"could not save filter '{}': {e}",
filter_config.name
)
})?;
Ok(())
}
/// Update existing notification filter
///
/// The caller is responsible for any needed permission checks.
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - the configuration could not be saved (`500 Internal server error`)
/// - an invalid digest was passed (`400 Bad request`)
pub fn update_filter(
config: &mut Config,
name: &str,
filter_updater: &FilterConfigUpdater,
delete: Option<&[DeleteableFilterProperty]>,
digest: Option<&[u8]>,
) -> Result<(), HttpError> {
super::verify_digest(config, digest)?;
let mut filter = get_filter(config, name)?;
if let Some(delete) = delete {
for deleteable_property in delete {
match deleteable_property {
DeleteableFilterProperty::MinSeverity => filter.min_severity = None,
DeleteableFilterProperty::Mode => filter.mode = None,
DeleteableFilterProperty::InvertMatch => filter.invert_match = None,
DeleteableFilterProperty::Comment => filter.comment = None,
}
}
}
if let Some(min_severity) = filter_updater.min_severity {
filter.min_severity = Some(min_severity);
}
if let Some(mode) = filter_updater.mode {
filter.mode = Some(mode);
}
if let Some(invert_match) = filter_updater.invert_match {
filter.invert_match = Some(invert_match);
}
if let Some(comment) = &filter_updater.comment {
filter.comment = Some(comment.into());
}
config
.config
.set_data(name, FILTER_TYPENAME, &filter)
.map_err(|e| http_err!(INTERNAL_SERVER_ERROR, "could not save filter '{name}': {e}"))?;
Ok(())
}
/// Delete existing filter
///
/// The caller is responsible for any needed permission checks.
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - the entity does not exist (`404 Not found`)
/// - the filter is still referenced by another entity (`400 Bad request`)
pub fn delete_filter(config: &mut Config, name: &str) -> Result<(), HttpError> {
// Check if the filter exists
let _ = get_filter(config, name)?;
super::ensure_unused(config, name)?;
config.config.sections.remove(name);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::filter::FilterModeOperator;
use crate::Severity;
fn empty_config() -> Config {
Config::new("", "").unwrap()
}
fn config_with_two_filters() -> Config {
Config::new(
"
filter: filter1
min-severity info
filter: filter2
min-severity warning
",
"",
)
.unwrap()
}
#[test]
fn test_update_not_existing_returns_error() -> Result<(), HttpError> {
let mut config = empty_config();
assert!(update_filter(&mut config, "test", &Default::default(), None, None).is_err());
Ok(())
}
#[test]
fn test_update_invalid_digest_returns_error() -> Result<(), HttpError> {
let mut config = config_with_two_filters();
assert!(update_filter(
&mut config,
"filter1",
&Default::default(),
None,
Some(&[0u8; 32])
)
.is_err());
Ok(())
}
#[test]
fn test_filter_update() -> Result<(), HttpError> {
let mut config = config_with_two_filters();
let digest = config.digest;
update_filter(
&mut config,
"filter1",
&FilterConfigUpdater {
min_severity: Some(Severity::Error),
mode: Some(FilterModeOperator::Or),
invert_match: Some(true),
comment: Some("new comment".into()),
},
None,
Some(&digest),
)?;
let filter = get_filter(&config, "filter1")?;
assert!(matches!(filter.mode, Some(FilterModeOperator::Or)));
assert!(matches!(filter.min_severity, Some(Severity::Error)));
assert_eq!(filter.invert_match, Some(true));
assert_eq!(filter.comment, Some("new comment".into()));
// Test property deletion
update_filter(
&mut config,
"filter1",
&Default::default(),
Some(&[
DeleteableFilterProperty::InvertMatch,
DeleteableFilterProperty::Mode,
DeleteableFilterProperty::InvertMatch,
DeleteableFilterProperty::MinSeverity,
DeleteableFilterProperty::Comment,
]),
Some(&digest),
)?;
let filter = get_filter(&config, "filter1")?;
assert_eq!(filter.invert_match, None);
assert_eq!(filter.min_severity, None);
assert!(matches!(filter.mode, None));
assert_eq!(filter.comment, None);
Ok(())
}
#[test]
fn test_filter_delete() -> Result<(), HttpError> {
let mut config = config_with_two_filters();
delete_filter(&mut config, "filter1")?;
assert!(delete_filter(&mut config, "filter1").is_err());
assert_eq!(get_filters(&config)?.len(), 1);
Ok(())
}
}

View File

@ -36,7 +36,6 @@ pub fn get_endpoint(config: &Config, name: &str) -> Result<GotifyConfig, HttpErr
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - an entity with the same name already exists (`400 Bad request`)
/// - a referenced filter does not exist (`400 Bad request`)
/// - the configuration could not be saved (`500 Internal server error`)
///
/// Panics if the names of the private config and the public config do not match.
@ -52,11 +51,6 @@ pub fn add_endpoint(
super::ensure_unique(config, &endpoint_config.name)?;
if let Some(filter) = &endpoint_config.filter {
// Check if filter exists
super::filter::get_filter(config, filter)?;
}
set_private_config_entry(config, private_endpoint_config)?;
config
@ -77,7 +71,6 @@ pub fn add_endpoint(
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - an entity with the same name already exists (`400 Bad request`)
/// - a referenced filter does not exist (`400 Bad request`)
/// - the configuration could not be saved (`500 Internal server error`)
pub fn update_endpoint(
config: &mut Config,
@ -95,7 +88,6 @@ pub fn update_endpoint(
for deleteable_property in delete {
match deleteable_property {
DeleteableGotifyProperty::Comment => endpoint.comment = None,
DeleteableGotifyProperty::Filter => endpoint.filter = None,
}
}
}
@ -118,13 +110,6 @@ pub fn update_endpoint(
endpoint.comment = Some(comment.into());
}
if let Some(filter) = &endpoint_config_updater.filter {
// Check if filter exists
let _ = super::filter::get_filter(config, filter)?;
endpoint.filter = Some(filter.into());
}
config
.config
.set_data(name, GOTIFY_TYPENAME, &endpoint)
@ -247,7 +232,6 @@ mod tests {
&GotifyConfigUpdater {
server: Some("newhost".into()),
comment: Some("newcomment".into()),
filter: None,
},
&GotifyPrivateConfigUpdater {
token: Some("changedtoken".into()),

View File

@ -1,259 +0,0 @@
use proxmox_http_error::HttpError;
use crate::api::{http_bail, http_err};
use crate::group::{DeleteableGroupProperty, GroupConfig, GroupConfigUpdater, GROUP_TYPENAME};
use crate::Config;
/// Get all notification groups
///
/// The caller is responsible for any needed permission checks.
/// Returns a list of all groups or a `HttpError` if the config is
/// erroneous (`500 Internal server error`).
pub fn get_groups(config: &Config) -> Result<Vec<GroupConfig>, HttpError> {
config
.config
.convert_to_typed_array(GROUP_TYPENAME)
.map_err(|e| http_err!(INTERNAL_SERVER_ERROR, "Could not fetch groups: {e}"))
}
/// Get group with given `name`
///
/// The caller is responsible for any needed permission checks.
/// Returns the endpoint or an `HttpError` if the group was not found (`404 Not found`).
pub fn get_group(config: &Config, name: &str) -> Result<GroupConfig, HttpError> {
config
.config
.lookup(GROUP_TYPENAME, name)
.map_err(|_| http_err!(NOT_FOUND, "group '{name}' not found"))
}
/// Add a new group.
///
/// The caller is responsible for any needed permission checks.
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - an entity with the same name already exists (`400 Bad request`)
/// - a referenced filter does not exist (`400 Bad request`)
/// - no endpoints were passed (`400 Bad request`)
/// - referenced endpoints do not exist (`404 Not found`)
/// - the configuration could not be saved (`500 Internal server error`)
pub fn add_group(config: &mut Config, group_config: &GroupConfig) -> Result<(), HttpError> {
super::ensure_unique(config, &group_config.name)?;
if group_config.endpoint.is_empty() {
http_bail!(BAD_REQUEST, "group must contain at least one endpoint",);
}
if let Some(filter) = &group_config.filter {
// Check if filter exists
super::filter::get_filter(config, filter)?;
}
super::ensure_endpoints_exist(config, &group_config.endpoint)?;
config
.config
.set_data(&group_config.name, GROUP_TYPENAME, group_config)
.map_err(|e| {
http_err!(
INTERNAL_SERVER_ERROR,
"could not save group '{}': {e}",
group_config.name
)
})
}
/// Update existing group
///
/// The caller is responsible for any needed permission checks.
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - a referenced filter does not exist (`400 Bad request`)
/// - an invalid digest was passed (`400 Bad request`)
/// - no endpoints were passed (`400 Bad request`)
/// - referenced endpoints do not exist (`404 Not found`)
/// - the configuration could not be saved (`500 Internal server error`)
pub fn update_group(
config: &mut Config,
name: &str,
updater: &GroupConfigUpdater,
delete: Option<&[DeleteableGroupProperty]>,
digest: Option<&[u8]>,
) -> Result<(), HttpError> {
super::verify_digest(config, digest)?;
let mut group = get_group(config, name)?;
if let Some(delete) = delete {
for deleteable_property in delete {
match deleteable_property {
DeleteableGroupProperty::Comment => group.comment = None,
DeleteableGroupProperty::Filter => group.filter = None,
}
}
}
if let Some(endpoints) = &updater.endpoint {
super::ensure_endpoints_exist(config, endpoints)?;
if endpoints.is_empty() {
http_bail!(BAD_REQUEST, "group must contain at least one endpoint",);
}
group.endpoint = endpoints.iter().map(Into::into).collect()
}
if let Some(comment) = &updater.comment {
group.comment = Some(comment.into());
}
if let Some(filter) = &updater.filter {
// Check if filter exists
let _ = super::filter::get_filter(config, filter)?;
group.filter = Some(filter.into());
}
config
.config
.set_data(name, GROUP_TYPENAME, &group)
.map_err(|e| http_err!(INTERNAL_SERVER_ERROR, "could not save group '{name}': {e}"))
}
/// Delete existing group
///
/// The caller is responsible for any needed permission checks.
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if the group does not exist (`404 Not found`).
pub fn delete_group(config: &mut Config, name: &str) -> Result<(), HttpError> {
// Check if the group exists
let _ = get_group(config, name)?;
config.config.sections.remove(name);
Ok(())
}
// groups cannot be empty, so only build the tests if we have the
// sendmail endpoint available
#[cfg(all(test, feature = "sendmail"))]
mod tests {
use super::*;
use crate::api::sendmail::tests::add_sendmail_endpoint_for_test;
use crate::api::test_helpers::*;
fn add_default_group(config: &mut Config) -> Result<(), HttpError> {
add_sendmail_endpoint_for_test(config, "test")?;
add_group(
config,
&GroupConfig {
name: "group1".into(),
endpoint: vec!["test".to_string()],
comment: None,
filter: None,
},
)?;
Ok(())
}
#[test]
fn test_add_group_fails_if_endpoint_does_not_exist() {
let mut config = empty_config();
assert!(add_group(
&mut config,
&GroupConfig {
name: "group1".into(),
endpoint: vec!["foo".into()],
comment: None,
filter: None,
},
)
.is_err());
}
#[test]
fn test_add_group() -> Result<(), HttpError> {
let mut config = empty_config();
assert!(add_default_group(&mut config).is_ok());
Ok(())
}
#[test]
fn test_update_group_fails_if_endpoint_does_not_exist() -> Result<(), HttpError> {
let mut config = empty_config();
add_default_group(&mut config)?;
assert!(update_group(
&mut config,
"group1",
&GroupConfigUpdater {
endpoint: Some(vec!["foo".into()]),
..Default::default()
},
None,
None
)
.is_err());
Ok(())
}
#[test]
fn test_update_group_fails_if_digest_invalid() -> Result<(), HttpError> {
let mut config = empty_config();
add_default_group(&mut config)?;
assert!(update_group(
&mut config,
"group1",
&Default::default(),
None,
Some(&[0u8; 32])
)
.is_err());
Ok(())
}
#[test]
fn test_update_group() -> Result<(), HttpError> {
let mut config = empty_config();
add_default_group(&mut config)?;
assert!(update_group(
&mut config,
"group1",
&GroupConfigUpdater {
endpoint: None,
comment: Some("newcomment".into()),
filter: None
},
None,
None,
)
.is_ok());
let group = get_group(&config, "group1")?;
assert_eq!(group.comment, Some("newcomment".into()));
assert!(update_group(
&mut config,
"group1",
&Default::default(),
Some(&[DeleteableGroupProperty::Comment]),
None
)
.is_ok());
let group = get_group(&config, "group1")?;
assert_eq!(group.comment, None);
Ok(())
}
#[test]
fn test_group_delete() -> Result<(), HttpError> {
let mut config = empty_config();
add_default_group(&mut config)?;
assert!(delete_group(&mut config, "group1").is_ok());
assert!(delete_group(&mut config, "group1").is_err());
Ok(())
}
}

View File

@ -0,0 +1,254 @@
use proxmox_http_error::HttpError;
use crate::api::http_err;
use crate::matcher::{
DeleteableMatcherProperty, MatcherConfig, MatcherConfigUpdater, MATCHER_TYPENAME,
};
use crate::Config;
/// Get a list of all matchers
///
/// The caller is responsible for any needed permission checks.
/// Returns a list of all matchers or a `HttpError` if the config is
/// (`500 Internal server error`).
pub fn get_matchers(config: &Config) -> Result<Vec<MatcherConfig>, HttpError> {
config
.config
.convert_to_typed_array(MATCHER_TYPENAME)
.map_err(|e| http_err!(INTERNAL_SERVER_ERROR, "Could not fetch matchers: {e}"))
}
/// Get matcher with given `name`
///
/// The caller is responsible for any needed permission checks.
/// Returns the endpoint or a `HttpError` if the matcher was not found (`404 Not found`).
pub fn get_matcher(config: &Config, name: &str) -> Result<MatcherConfig, HttpError> {
config
.config
.lookup(MATCHER_TYPENAME, name)
.map_err(|_| http_err!(NOT_FOUND, "matcher '{name}' not found"))
}
/// Add new notification matcher.
///
/// The caller is responsible for any needed permission checks.
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - an entity with the same name already exists (`400 Bad request`)
/// - the configuration could not be saved (`500 Internal server error`)
pub fn add_matcher(config: &mut Config, matcher_config: &MatcherConfig) -> Result<(), HttpError> {
super::ensure_unique(config, &matcher_config.name)?;
if let Some(targets) = matcher_config.target.as_deref() {
super::ensure_endpoints_exist(config, targets)?;
}
config
.config
.set_data(&matcher_config.name, MATCHER_TYPENAME, matcher_config)
.map_err(|e| {
http_err!(
INTERNAL_SERVER_ERROR,
"could not save matcher '{}': {e}",
matcher_config.name
)
})?;
Ok(())
}
/// Update existing notification matcher
///
/// The caller is responsible for any needed permission checks.
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - the configuration could not be saved (`500 Internal server error`)
/// - an invalid digest was passed (`400 Bad request`)
pub fn update_matcher(
config: &mut Config,
name: &str,
matcher_updater: &MatcherConfigUpdater,
delete: Option<&[DeleteableMatcherProperty]>,
digest: Option<&[u8]>,
) -> Result<(), HttpError> {
super::verify_digest(config, digest)?;
let mut matcher = get_matcher(config, name)?;
if let Some(delete) = delete {
for deleteable_property in delete {
match deleteable_property {
DeleteableMatcherProperty::MatchSeverity => matcher.match_severity = None,
DeleteableMatcherProperty::MatchField => matcher.match_field = None,
DeleteableMatcherProperty::Target => matcher.target = None,
DeleteableMatcherProperty::Mode => matcher.mode = None,
DeleteableMatcherProperty::InvertMatch => matcher.invert_match = None,
DeleteableMatcherProperty::Comment => matcher.comment = None,
}
}
}
if let Some(match_severity) = &matcher_updater.match_severity {
matcher.match_severity = Some(match_severity.clone());
}
if let Some(match_field) = &matcher_updater.match_field {
matcher.match_field = Some(match_field.clone());
}
if let Some(mode) = matcher_updater.mode {
matcher.mode = Some(mode);
}
if let Some(invert_match) = matcher_updater.invert_match {
matcher.invert_match = Some(invert_match);
}
if let Some(comment) = &matcher_updater.comment {
matcher.comment = Some(comment.into());
}
if let Some(target) = &matcher_updater.target {
super::ensure_endpoints_exist(config, target.as_slice())?;
matcher.target = Some(target.clone());
}
config
.config
.set_data(name, MATCHER_TYPENAME, &matcher)
.map_err(|e| {
http_err!(
INTERNAL_SERVER_ERROR,
"could not save matcher '{name}': {e}"
)
})?;
Ok(())
}
/// Delete existing matcher
///
/// The caller is responsible for any needed permission checks.
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - the entity does not exist (`404 Not found`)
pub fn delete_matcher(config: &mut Config, name: &str) -> Result<(), HttpError> {
// Check if the matcher exists
let _ = get_matcher(config, name)?;
config.config.sections.remove(name);
Ok(())
}
#[cfg(all(test, feature = "sendmail"))]
mod tests {
use super::*;
use crate::matcher::MatchModeOperator;
fn empty_config() -> Config {
Config::new("", "").unwrap()
}
fn config_with_two_matchers() -> Config {
Config::new(
"
sendmail: foo
mailto test@example.com
matcher: matcher1
matcher: matcher2
",
"",
)
.unwrap()
}
#[test]
fn test_update_not_existing_returns_error() -> Result<(), HttpError> {
let mut config = empty_config();
assert!(update_matcher(&mut config, "test", &Default::default(), None, None).is_err());
Ok(())
}
#[test]
fn test_update_invalid_digest_returns_error() -> Result<(), HttpError> {
let mut config = config_with_two_matchers();
assert!(update_matcher(
&mut config,
"matcher1",
&Default::default(),
None,
Some(&[0u8; 32])
)
.is_err());
Ok(())
}
#[test]
fn test_matcher_update() -> Result<(), HttpError> {
let mut config = config_with_two_matchers();
let digest = config.digest;
update_matcher(
&mut config,
"matcher1",
&MatcherConfigUpdater {
mode: Some(MatchModeOperator::Any),
match_field: None,
match_severity: None,
invert_match: Some(true),
target: Some(vec!["foo".into()]),
comment: Some("new comment".into()),
},
None,
Some(&digest),
)?;
let matcher = get_matcher(&config, "matcher1")?;
assert!(matches!(matcher.mode, Some(MatchModeOperator::Any)));
assert_eq!(matcher.invert_match, Some(true));
assert_eq!(matcher.comment, Some("new comment".into()));
// Test property deletion
update_matcher(
&mut config,
"matcher1",
&Default::default(),
Some(&[
DeleteableMatcherProperty::InvertMatch,
DeleteableMatcherProperty::Mode,
DeleteableMatcherProperty::MatchField,
DeleteableMatcherProperty::Target,
DeleteableMatcherProperty::Comment,
]),
Some(&digest),
)?;
let matcher = get_matcher(&config, "matcher1")?;
assert_eq!(matcher.invert_match, None);
assert!(matcher.match_severity.is_none());
assert!(matches!(matcher.match_field, None));
assert_eq!(matcher.target, None);
assert!(matcher.mode.is_none());
assert_eq!(matcher.comment, None);
Ok(())
}
#[test]
fn test_matcher_delete() -> Result<(), HttpError> {
let mut config = config_with_two_matchers();
delete_matcher(&mut config, "matcher1")?;
assert!(delete_matcher(&mut config, "matcher1").is_err());
assert_eq!(get_matchers(&config)?.len(), 1);
Ok(())
}
}

View File

@ -5,10 +5,9 @@ use proxmox_http_error::HttpError;
use crate::Config;
pub mod common;
pub mod filter;
#[cfg(feature = "gotify")]
pub mod gotify;
pub mod group;
pub mod matcher;
#[cfg(feature = "sendmail")]
pub mod sendmail;
@ -94,36 +93,13 @@ fn ensure_unique(config: &Config, entity: &str) -> Result<(), HttpError> {
fn get_referrers(config: &Config, entity: &str) -> Result<HashSet<String>, HttpError> {
let mut referrers = HashSet::new();
for group in group::get_groups(config)? {
if group.endpoint.iter().any(|endpoint| endpoint == entity) {
referrers.insert(group.name.clone());
}
if let Some(filter) = group.filter {
if filter == entity {
referrers.insert(group.name);
for matcher in matcher::get_matchers(config)? {
if let Some(targets) = matcher.target {
if targets.iter().any(|target| target == entity) {
referrers.insert(matcher.name.clone());
}
}
}
#[cfg(feature = "sendmail")]
for endpoint in sendmail::get_endpoints(config)? {
if let Some(filter) = endpoint.filter {
if filter == entity {
referrers.insert(endpoint.name);
}
}
}
#[cfg(feature = "gotify")]
for endpoint in gotify::get_endpoints(config)? {
if let Some(filter) = endpoint.filter {
if filter == entity {
referrers.insert(endpoint.name);
}
}
}
Ok(referrers)
}
@ -151,24 +127,12 @@ fn get_referenced_entities(config: &Config, entity: &str) -> HashSet<String> {
let mut new = HashSet::new();
for entity in entities {
if let Ok(group) = group::get_group(config, entity) {
for target in group.endpoint {
if let Ok(group) = matcher::get_matcher(config, entity) {
if let Some(targets) = group.target {
for target in targets {
new.insert(target.clone());
}
}
#[cfg(feature = "sendmail")]
if let Ok(target) = sendmail::get_endpoint(config, entity) {
if let Some(filter) = target.filter {
new.insert(filter.clone());
}
}
#[cfg(feature = "gotify")]
if let Ok(target) = gotify::get_endpoint(config, entity) {
if let Some(filter) = target.filter {
new.insert(filter.clone());
}
}
}
@ -205,11 +169,12 @@ mod tests {
fn prepare_config() -> Result<Config, HttpError> {
let mut config = super::test_helpers::empty_config();
filter::add_filter(
matcher::add_matcher(
&mut config,
&FilterConfig {
name: "filter".to_string(),
..Default::default()
&MatcherConfig {
name: "matcher".to_string(),
target: Some(vec!["sendmail".to_string(), "gotify".to_string()])
..Default::default(),
},
)?;
@ -218,7 +183,6 @@ mod tests {
&SendmailConfig {
name: "sendmail".to_string(),
mailto: Some(vec!["foo@example.com".to_string()]),
filter: Some("filter".to_string()),
..Default::default()
},
)?;
@ -228,7 +192,6 @@ mod tests {
&GotifyConfig {
name: "gotify".to_string(),
server: "localhost".to_string(),
filter: Some("filter".to_string()),
..Default::default()
},
&GotifyPrivateConfig {
@ -237,16 +200,6 @@ mod tests {
},
)?;
group::add_group(
&mut config,
&GroupConfig {
name: "group".to_string(),
endpoint: vec!["gotify".to_string(), "sendmail".to_string()],
filter: Some("filter".to_string()),
..Default::default()
},
)?;
Ok(config)
}
@ -255,24 +208,11 @@ mod tests {
let config = prepare_config().unwrap();
assert_eq!(
get_referenced_entities(&config, "filter"),
HashSet::from(["filter".to_string()])
);
assert_eq!(
get_referenced_entities(&config, "sendmail"),
HashSet::from(["filter".to_string(), "sendmail".to_string()])
);
assert_eq!(
get_referenced_entities(&config, "gotify"),
HashSet::from(["filter".to_string(), "gotify".to_string()])
);
assert_eq!(
get_referenced_entities(&config, "group"),
get_referenced_entities(&config, "matcher"),
HashSet::from([
"filter".to_string(),
"gotify".to_string(),
"matcher".to_string(),
"sendmail".to_string(),
"group".to_string()
"gotify".to_string()
])
);
}
@ -281,27 +221,16 @@ mod tests {
fn test_get_referrers_for_entity() -> Result<(), HttpError> {
let config = prepare_config().unwrap();
assert_eq!(
get_referrers(&config, "filter")?,
HashSet::from([
"gotify".to_string(),
"sendmail".to_string(),
"group".to_string()
])
);
assert_eq!(
get_referrers(&config, "sendmail")?,
HashSet::from(["group".to_string()])
HashSet::from(["matcher".to_string()])
);
assert_eq!(
get_referrers(&config, "gotify")?,
HashSet::from(["group".to_string()])
HashSet::from(["matcher".to_string()])
);
assert!(get_referrers(&config, "group")?.is_empty(),);
Ok(())
}
@ -309,10 +238,9 @@ mod tests {
fn test_ensure_unused() {
let config = prepare_config().unwrap();
assert!(ensure_unused(&config, "filter").is_err());
assert!(ensure_unused(&config, "gotify").is_err());
assert!(ensure_unused(&config, "sendmail").is_err());
assert!(ensure_unused(&config, "group").is_ok());
assert!(ensure_unused(&config, "matcher").is_ok());
}
#[test]
@ -329,6 +257,5 @@ mod tests {
let config = prepare_config().unwrap();
assert!(ensure_endpoints_exist(&config, &vec!["sendmail", "gotify"]).is_ok());
assert!(ensure_endpoints_exist(&config, &vec!["group", "filter"]).is_err());
}
}

View File

@ -35,17 +35,11 @@ pub fn get_endpoint(config: &Config, name: &str) -> Result<SendmailConfig, HttpE
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - an entity with the same name already exists (`400 Bad request`)
/// - a referenced filter does not exist (`400 Bad request`)
/// - the configuration could not be saved (`500 Internal server error`)
/// - mailto *and* mailto_user are both set to `None`
pub fn add_endpoint(config: &mut Config, endpoint: &SendmailConfig) -> Result<(), HttpError> {
super::ensure_unique(config, &endpoint.name)?;
if let Some(filter) = &endpoint.filter {
// Check if filter exists
super::filter::get_filter(config, filter)?;
}
if endpoint.mailto.is_none() && endpoint.mailto_user.is_none() {
http_bail!(
BAD_REQUEST,
@ -70,7 +64,6 @@ pub fn add_endpoint(config: &mut Config, endpoint: &SendmailConfig) -> Result<()
/// The caller is responsible for any needed permission checks.
/// The caller also responsible for locking the configuration files.
/// Returns a `HttpError` if:
/// - a referenced filter does not exist (`400 Bad request`)
/// - the configuration could not be saved (`500 Internal server error`)
/// - mailto *and* mailto_user are both set to `None`
pub fn update_endpoint(
@ -90,7 +83,6 @@ pub fn update_endpoint(
DeleteableSendmailProperty::FromAddress => endpoint.from_address = None,
DeleteableSendmailProperty::Author => endpoint.author = None,
DeleteableSendmailProperty::Comment => endpoint.comment = None,
DeleteableSendmailProperty::Filter => endpoint.filter = None,
DeleteableSendmailProperty::Mailto => endpoint.mailto = None,
DeleteableSendmailProperty::MailtoUser => endpoint.mailto_user = None,
}
@ -117,11 +109,6 @@ pub fn update_endpoint(
endpoint.comment = Some(comment.into());
}
if let Some(filter) = &updater.filter {
let _ = super::filter::get_filter(config, filter)?;
endpoint.filter = Some(filter.into());
}
if endpoint.mailto.is_none() && endpoint.mailto_user.is_none() {
http_bail!(
BAD_REQUEST,
@ -221,7 +208,6 @@ pub mod tests {
from_address: Some("root@example.com".into()),
author: Some("newauthor".into()),
comment: Some("new comment".into()),
filter: None,
},
None,
Some(&[0; 32]),
@ -247,7 +233,6 @@ pub mod tests {
from_address: Some("root@example.com".into()),
author: Some("newauthor".into()),
comment: Some("new comment".into()),
filter: None,
},
None,
Some(&digest),

View File

@ -5,6 +5,7 @@ use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlug
use crate::filter::{FilterConfig, FILTER_TYPENAME};
use crate::group::{GroupConfig, GROUP_TYPENAME};
use crate::matcher::{MatcherConfig, MATCHER_TYPENAME};
use crate::schema::BACKEND_NAME_SCHEMA;
use crate::Error;
@ -39,8 +40,14 @@ fn config_init() -> SectionConfig {
));
}
const GROUP_SCHEMA: &ObjectSchema = GroupConfig::API_SCHEMA.unwrap_object_schema();
const MATCHER_SCHEMA: &ObjectSchema = MatcherConfig::API_SCHEMA.unwrap_object_schema();
config.register_plugin(SectionConfigPlugin::new(
MATCHER_TYPENAME.to_string(),
Some(String::from("name")),
MATCHER_SCHEMA,
));
const GROUP_SCHEMA: &ObjectSchema = GroupConfig::API_SCHEMA.unwrap_object_schema();
config.register_plugin(SectionConfigPlugin::new(
GROUP_TYPENAME.to_string(),
Some(String::from("name")),
@ -78,9 +85,32 @@ fn private_config_init() -> SectionConfig {
pub fn config(raw_config: &str) -> Result<(SectionConfigData, [u8; 32]), Error> {
let digest = openssl::sha::sha256(raw_config.as_bytes());
let data = CONFIG
let mut data = CONFIG
.parse("notifications.cfg", raw_config)
.map_err(|err| Error::ConfigDeserialization(err.into()))?;
// TODO: Remove this once this has been in production for a while.
// 'group' and 'filter' sections are remnants of the 'old'
// notification routing approach that already hit pvetest...
// This mechanism cleans out left-over entries.
let entries: Vec<GroupConfig> = data.convert_to_typed_array("group").unwrap_or_default();
if !entries.is_empty() {
log::warn!("clearing left-over 'group' entries from notifications.cfg");
}
for entry in entries {
data.sections.remove(&entry.name);
}
let entries: Vec<FilterConfig> = data.convert_to_typed_array("filter").unwrap_or_default();
if !entries.is_empty() {
log::warn!("clearing left-over 'filter' entries from notifications.cfg");
}
for entry in entries {
data.sections.remove(&entry.name);
}
Ok((data, digest))
}

View File

@ -33,10 +33,6 @@ pub(crate) const GOTIFY_TYPENAME: &str = "gotify";
optional: true,
schema: COMMENT_SCHEMA,
},
filter: {
optional: true,
schema: ENTITY_NAME_SCHEMA,
},
}
)]
#[derive(Serialize, Deserialize, Updater, Default)]
@ -51,8 +47,9 @@ pub struct GotifyConfig {
/// Comment
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
/// Filter to apply
#[serde(skip_serializing_if = "Option::is_none")]
/// Deprecated.
#[serde(skip_serializing)]
#[updater(skip)]
pub filter: Option<String>,
}
@ -80,17 +77,15 @@ pub struct GotifyEndpoint {
#[serde(rename_all = "kebab-case")]
pub enum DeleteableGotifyProperty {
Comment,
Filter,
}
impl Endpoint for GotifyEndpoint {
fn send(&self, notification: &Notification) -> Result<(), Error> {
let (title, message) = match &notification.content {
Content::Template {
title_template,
body_template,
data
data,
} => {
let rendered_title =
renderer::render_template(TemplateRenderer::Plaintext, title_template, data)?;
@ -108,7 +103,7 @@ impl Endpoint for GotifyEndpoint {
let body = json!({
"title": &title,
"message": &message,
"priority": severity_to_priority(notification.severity),
"priority": severity_to_priority(notification.metadata.severity),
"extras": {
"client::display": {
"contentType": "text/markdown"
@ -152,8 +147,4 @@ impl Endpoint for GotifyEndpoint {
fn name(&self) -> &str {
&self.config.name
}
fn filter(&self) -> Option<&str> {
self.config.filter.as_deref()
}
}

View File

@ -35,10 +35,6 @@ pub(crate) const SENDMAIL_TYPENAME: &str = "sendmail";
optional: true,
schema: COMMENT_SCHEMA,
},
filter: {
optional: true,
schema: ENTITY_NAME_SCHEMA,
},
},
)]
#[derive(Debug, Serialize, Deserialize, Updater, Default)]
@ -63,8 +59,9 @@ pub struct SendmailConfig {
/// Comment
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
/// Filter to apply
#[serde(skip_serializing_if = "Option::is_none")]
/// Deprecated.
#[serde(skip_serializing)]
#[updater(skip)]
pub filter: Option<String>,
}
@ -74,7 +71,6 @@ pub enum DeleteableSendmailProperty {
FromAddress,
Author,
Comment,
Filter,
Mailto,
MailtoUser,
}
@ -144,8 +140,4 @@ impl Endpoint for SendmailEndpoint {
fn name(&self) -> &str {
&self.config.name
}
fn filter(&self) -> Option<&str> {
self.config.filter.as_deref()
}
}

View File

@ -1,202 +1,23 @@
use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
use proxmox_schema::api_types::COMMENT_SCHEMA;
use proxmox_schema::{api, Updater};
use proxmox_schema::api;
use crate::schema::ENTITY_NAME_SCHEMA;
use crate::{Error, Notification, Severity};
pub const FILTER_TYPENAME: &str = "filter";
#[api]
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)]
#[serde(rename_all = "kebab-case")]
pub enum FilterModeOperator {
/// All filter properties have to match (AND)
#[default]
And,
/// At least one filter property has to match (OR)
Or,
}
impl FilterModeOperator {
/// Apply the mode operator to two bools, lhs and rhs
fn apply(&self, lhs: bool, rhs: bool) -> bool {
match self {
FilterModeOperator::And => lhs && rhs,
FilterModeOperator::Or => lhs || rhs,
}
}
fn neutral_element(&self) -> bool {
match self {
FilterModeOperator::And => true,
FilterModeOperator::Or => false,
}
}
}
pub(crate) const FILTER_TYPENAME: &str = "filter";
#[api(
properties: {
name: {
schema: ENTITY_NAME_SCHEMA,
},
comment: {
optional: true,
schema: COMMENT_SCHEMA,
},
})]
#[derive(Debug, Serialize, Deserialize, Updater, Default)]
additional_properties: true,
)]
#[derive(Debug, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
/// Config for Sendmail notification endpoints
/// Config for the old filter system - can be removed at some point.
pub struct FilterConfig {
/// Name of the filter
#[updater(skip)]
/// Name of the group
pub name: String,
/// Minimum severity to match
#[serde(skip_serializing_if = "Option::is_none")]
pub min_severity: Option<Severity>,
/// Choose between 'and' and 'or' for when multiple properties are specified
#[serde(skip_serializing_if = "Option::is_none")]
pub mode: Option<FilterModeOperator>,
/// Invert match of the whole filter
#[serde(skip_serializing_if = "Option::is_none")]
pub invert_match: Option<bool>,
/// Comment
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum DeleteableFilterProperty {
MinSeverity,
Mode,
InvertMatch,
Comment,
}
/// A caching, lazily-evaluating notification filter. Parameterized with the notification itself,
/// since there are usually multiple filters to check for a single notification that is to be sent.
pub(crate) struct FilterMatcher<'a> {
filters: HashMap<&'a str, &'a FilterConfig>,
cached_results: HashMap<&'a str, bool>,
notification: &'a Notification,
}
impl<'a> FilterMatcher<'a> {
pub(crate) fn new(filters: &'a [FilterConfig], notification: &'a Notification) -> Self {
let filters = filters.iter().map(|f| (f.name.as_str(), f)).collect();
Self {
filters,
cached_results: Default::default(),
notification,
}
}
/// Check if the notification that was used to instantiate Self matches a given filter
pub(crate) fn check_filter_match(&mut self, filter_name: &str) -> Result<bool, Error> {
let mut visited = HashSet::new();
self.do_check_filter(filter_name, &mut visited)
}
fn do_check_filter(
&mut self,
filter_name: &str,
visited: &mut HashSet<String>,
) -> Result<bool, Error> {
if visited.contains(filter_name) {
return Err(Error::FilterFailed(format!(
"recursive filter definition: {filter_name}"
)));
}
if let Some(is_match) = self.cached_results.get(filter_name) {
return Ok(*is_match);
}
visited.insert(filter_name.into());
let filter_config =
self.filters.get(filter_name).copied().ok_or_else(|| {
Error::FilterFailed(format!("filter '{filter_name}' does not exist"))
})?;
let invert_match = filter_config.invert_match.unwrap_or_default();
let mode_operator = filter_config.mode.unwrap_or_default();
let mut notification_matches = mode_operator.neutral_element();
notification_matches = mode_operator.apply(
notification_matches,
self.check_severity_match(filter_config, mode_operator),
);
Ok(notification_matches != invert_match)
}
fn check_severity_match(
&self,
filter_config: &FilterConfig,
mode_operator: FilterModeOperator,
) -> bool {
if let Some(min_severity) = filter_config.min_severity {
self.notification.severity >= min_severity
} else {
mode_operator.neutral_element()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{config, Content};
fn parse_filters(config: &str) -> Result<Vec<FilterConfig>, Error> {
let (config, _) = config::config(config)?;
Ok(config.convert_to_typed_array("filter").unwrap())
}
fn empty_notification_with_severity(severity: Severity) -> Notification {
Notification {
content: Content::Template {
title_template: String::new(),
body_template: String::new(),
data: Default::default(),
},
severity,
}
}
#[test]
fn test_trivial_severity_filters() -> Result<(), Error> {
let config = "
filter: test
min-severity warning
";
let filters = parse_filters(config)?;
let is_match = |severity| {
let notification = empty_notification_with_severity(severity);
let mut results = FilterMatcher::new(&filters, &notification);
results.check_filter_match("test")
};
assert!(is_match(Severity::Warning)?);
assert!(!is_match(Severity::Notice)?);
assert!(is_match(Severity::Error)?);
Ok(())
}
}

View File

@ -1,7 +1,6 @@
use serde::{Deserialize, Serialize};
use proxmox_schema::api_types::COMMENT_SCHEMA;
use proxmox_schema::{api, Updater};
use proxmox_schema::api;
use crate::schema::ENTITY_NAME_SCHEMA;
@ -9,43 +8,16 @@ pub(crate) const GROUP_TYPENAME: &str = "group";
#[api(
properties: {
"endpoint": {
type: Array,
items: {
description: "Name of the included endpoint(s)",
type: String,
},
},
comment: {
optional: true,
schema: COMMENT_SCHEMA,
},
filter: {
optional: true,
name: {
schema: ENTITY_NAME_SCHEMA,
},
},
additional_properties: true,
)]
#[derive(Debug, Serialize, Deserialize, Updater, Default)]
#[derive(Debug, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
/// Config for notification channels
/// Config for the old target groups - can be removed at some point.
pub struct GroupConfig {
/// Name of the channel
#[updater(skip)]
/// Name of the group
pub name: String,
/// Endpoints for this channel
pub endpoint: Vec<String>,
/// Comment
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
/// Filter to apply
#[serde(skip_serializing_if = "Option::is_none")]
pub filter: Option<String>,
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum DeleteableGroupProperty {
Comment,
Filter,
}

View File

@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::error::Error as StdError;
use std::fmt::Display;
use std::str::FromStr;
use serde::{Deserialize, Serialize};
use serde_json::json;
@ -9,15 +10,14 @@ use serde_json::Value;
use proxmox_schema::api;
use proxmox_section_config::SectionConfigData;
pub mod filter;
use filter::{FilterConfig, FilterMatcher, FILTER_TYPENAME};
pub mod group;
use group::{GroupConfig, GROUP_TYPENAME};
pub mod matcher;
use matcher::{MatcherConfig, MATCHER_TYPENAME};
pub mod api;
pub mod context;
pub mod endpoints;
pub mod filter;
pub mod group;
pub mod renderer;
pub mod schema;
@ -104,6 +104,30 @@ pub enum Severity {
Error,
}
impl Display for Severity {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
match self {
Severity::Info => f.write_str("info"),
Severity::Notice => f.write_str("notice"),
Severity::Warning => f.write_str("warning"),
Severity::Error => f.write_str("error"),
}
}
}
impl FromStr for Severity {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Error> {
match s {
"info" => Ok(Self::Info),
"notice" => Ok(Self::Notice),
"warning" => Ok(Self::Warning),
"error" => Ok(Self::Error),
_ => Err(Error::Generic(format!("invalid severity {s}"))),
}
}
}
/// Notification endpoint trait, implemented by all endpoint plugins
pub trait Endpoint {
/// Send a documentation
@ -111,9 +135,6 @@ pub trait Endpoint {
/// The name/identifier for this endpoint
fn name(&self) -> &str;
/// The name of the filter to use
fn filter(&self) -> Option<&str>;
}
#[derive(Debug, Clone)]
@ -130,12 +151,20 @@ pub enum Content {
}
#[derive(Debug, Clone)]
/// Notification which can be sent
pub struct Notification {
pub struct Metadata {
/// Notification severity
severity: Severity,
/// Additional fields for additional key-value metadata
additional_fields: HashMap<String, String>,
}
#[derive(Debug, Clone)]
/// Notification which can be sent
pub struct Notification {
/// Notification content
content: Content,
/// Metadata
metadata: Metadata,
}
impl Notification {
@ -143,14 +172,18 @@ impl Notification {
severity: Severity,
title: S,
body: S,
properties: Value,
template_data: Value,
fields: HashMap<String, String>,
) -> Self {
Self {
metadata: Metadata {
severity,
additional_fields: fields,
},
content: Content::Template {
title_template: title.as_ref().to_string(),
body_template: body.as_ref().to_string(),
data: properties,
data: template_data,
},
}
}
@ -198,8 +231,7 @@ impl Config {
#[derive(Default)]
pub struct Bus {
endpoints: HashMap<String, Box<dyn Endpoint>>,
groups: HashMap<String, GroupConfig>,
filters: Vec<FilterConfig>,
matchers: Vec<MatcherConfig>,
}
#[allow(unused_macros)]
@ -304,23 +336,14 @@ impl Bus {
);
}
let groups: HashMap<String, GroupConfig> = config
let matchers = config
.config
.convert_to_typed_array(GROUP_TYPENAME)
.map_err(|err| Error::ConfigDeserialization(err.into()))?
.into_iter()
.map(|group: GroupConfig| (group.name.clone(), group))
.collect();
let filters = config
.config
.convert_to_typed_array(FILTER_TYPENAME)
.convert_to_typed_array(MATCHER_TYPENAME)
.map_err(|err| Error::ConfigDeserialization(err.into()))?;
Ok(Bus {
endpoints,
groups,
filters,
matchers,
})
}
@ -330,65 +353,20 @@ impl Bus {
}
#[cfg(test)]
pub fn add_group(&mut self, group: GroupConfig) {
self.groups.insert(group.name.clone(), group);
pub fn add_matcher(&mut self, filter: MatcherConfig) {
self.matchers.push(filter)
}
#[cfg(test)]
pub fn add_filter(&mut self, filter: FilterConfig) {
self.filters.push(filter)
}
/// Send a notification to a given target (endpoint or group).
/// Send a notification. Notification matchers will determine which targets will receive
/// the notification.
///
/// Any errors will not be returned but only logged.
pub fn send(&self, endpoint_or_group: &str, notification: &Notification) {
let mut filter_matcher = FilterMatcher::new(&self.filters, notification);
pub fn send(&self, notification: &Notification) {
let targets = matcher::check_matches(self.matchers.as_slice(), notification);
if let Some(group) = self.groups.get(endpoint_or_group) {
if !Bus::check_filter(&mut filter_matcher, group.filter.as_deref()) {
log::info!("skipped target '{endpoint_or_group}', filter did not match");
return;
}
log::info!("target '{endpoint_or_group}' is a group, notifying all members...");
for endpoint in &group.endpoint {
self.send_via_single_endpoint(endpoint, notification, &mut filter_matcher);
}
} else {
self.send_via_single_endpoint(endpoint_or_group, notification, &mut filter_matcher);
}
}
fn check_filter(filter_matcher: &mut FilterMatcher, filter: Option<&str>) -> bool {
if let Some(filter) = filter {
match filter_matcher.check_filter_match(filter) {
// If the filter does not match, do nothing
Ok(r) => r,
Err(err) => {
// If there is an error, only log it and still send
log::error!("could not apply filter '{filter}': {err}");
true
}
}
} else {
true
}
}
fn send_via_single_endpoint(
&self,
endpoint: &str,
notification: &Notification,
filter_matcher: &mut FilterMatcher,
) {
if let Some(endpoint) = self.endpoints.get(endpoint) {
for target in targets {
if let Some(endpoint) = self.endpoints.get(target) {
let name = endpoint.name();
if !Bus::check_filter(filter_matcher, endpoint.filter()) {
log::info!("skipped target '{name}', filter did not match");
return;
}
match endpoint.send(notification) {
Ok(_) => {
@ -400,7 +378,8 @@ impl Bus {
}
}
} else {
log::error!("could not notify via target '{endpoint}', it does not exist");
log::error!("could not notify via target '{target}', it does not exist");
}
}
}
@ -410,7 +389,11 @@ impl Bus {
/// any errors to the caller.
pub fn test_target(&self, target: &str) -> Result<(), Error> {
let notification = Notification {
metadata: Metadata {
severity: Severity::Info,
// TODO: what fields would make sense for test notifications?
additional_fields: Default::default(),
},
content: Content::Template {
title_template: "Test notification".into(),
body_template: "This is a test of the notification target '{{ target }}'".into(),
@ -418,30 +401,11 @@ impl Bus {
},
};
let mut errors: Vec<Box<dyn StdError + Send + Sync>> = Vec::new();
let mut my_send = |target: &str| -> Result<(), Error> {
if let Some(endpoint) = self.endpoints.get(target) {
if let Err(e) = endpoint.send(&notification) {
errors.push(Box::new(e));
}
endpoint.send(&notification)?;
} else {
return Err(Error::TargetDoesNotExist(target.to_string()));
}
Ok(())
};
if let Some(group) = self.groups.get(target) {
for endpoint_name in &group.endpoint {
my_send(endpoint_name)?;
}
} else {
my_send(target)?;
}
if !errors.is_empty() {
return Err(Error::TargetTestFailed(errors));
}
Ok(())
}
@ -459,7 +423,6 @@ mod tests {
// Needs to be an Rc so that we can clone MockEndpoint before
// passing it to Bus, while still retaining a handle to the Vec
messages: Rc<RefCell<Vec<Notification>>>,
filter: Option<String>,
}
impl Endpoint for MockEndpoint {
@ -472,17 +435,12 @@ mod tests {
fn name(&self) -> &str {
self.name
}
fn filter(&self) -> Option<&str> {
self.filter.as_deref()
}
}
impl MockEndpoint {
fn new(name: &'static str, filter: Option<String>) -> Self {
fn new(name: &'static str) -> Self {
Self {
name,
filter,
..Default::default()
}
}
@ -494,16 +452,26 @@ mod tests {
#[test]
fn test_add_mock_endpoint() -> Result<(), Error> {
let mock = MockEndpoint::new("endpoint", None);
let mock = MockEndpoint::new("endpoint");
let mut bus = Bus::default();
bus.add_endpoint(Box::new(mock.clone()));
let matcher = MatcherConfig {
target: Some(vec!["endpoint".into()]),
..Default::default()
};
bus.add_matcher(matcher);
// Send directly to endpoint
bus.send(
"endpoint",
&Notification::new_templated(Severity::Info, "Title", "Body", Default::default()),
);
bus.send(&Notification::new_templated(
Severity::Info,
"Title",
"Body",
Default::default(),
Default::default(),
));
let messages = mock.messages();
assert_eq!(messages.len(), 1);
@ -511,96 +479,39 @@ mod tests {
}
#[test]
fn test_groups() -> Result<(), Error> {
let endpoint1 = MockEndpoint::new("mock1", None);
let endpoint2 = MockEndpoint::new("mock2", None);
let mut bus = Bus::default();
bus.add_group(GroupConfig {
name: "group1".to_string(),
endpoint: vec!["mock1".into()],
comment: None,
filter: None,
});
bus.add_group(GroupConfig {
name: "group2".to_string(),
endpoint: vec!["mock2".into()],
comment: None,
filter: None,
});
bus.add_endpoint(Box::new(endpoint1.clone()));
bus.add_endpoint(Box::new(endpoint2.clone()));
let send_to_group = |channel| {
let notification =
Notification::new_templated(Severity::Info, "Title", "Body", Default::default());
bus.send(channel, &notification)
};
send_to_group("group1");
assert_eq!(endpoint1.messages().len(), 1);
assert_eq!(endpoint2.messages().len(), 0);
send_to_group("group2");
assert_eq!(endpoint1.messages().len(), 1);
assert_eq!(endpoint2.messages().len(), 1);
Ok(())
}
#[test]
fn test_severity_ordering() {
// Not intended to be exhaustive, just a quick
// sanity check ;)
assert!(Severity::Info < Severity::Notice);
assert!(Severity::Info < Severity::Warning);
assert!(Severity::Info < Severity::Error);
assert!(Severity::Error > Severity::Warning);
assert!(Severity::Warning > Severity::Notice);
}
#[test]
fn test_multiple_endpoints_with_different_filters() -> Result<(), Error> {
let endpoint1 = MockEndpoint::new("mock1", Some("filter1".into()));
let endpoint2 = MockEndpoint::new("mock2", Some("filter2".into()));
fn test_multiple_endpoints_with_different_matchers() -> Result<(), Error> {
let endpoint1 = MockEndpoint::new("mock1");
let endpoint2 = MockEndpoint::new("mock2");
let mut bus = Bus::default();
bus.add_endpoint(Box::new(endpoint1.clone()));
bus.add_endpoint(Box::new(endpoint2.clone()));
bus.add_group(GroupConfig {
name: "channel1".to_string(),
endpoint: vec!["mock1".into(), "mock2".into()],
comment: None,
filter: None,
bus.add_matcher(MatcherConfig {
name: "matcher1".into(),
match_severity: Some(vec!["warning,error".parse()?]),
target: Some(vec!["mock1".into()]),
..Default::default()
});
bus.add_filter(FilterConfig {
name: "filter1".into(),
min_severity: Some(Severity::Warning),
mode: None,
invert_match: None,
comment: None,
});
bus.add_filter(FilterConfig {
name: "filter2".into(),
min_severity: Some(Severity::Error),
mode: None,
invert_match: None,
comment: None,
bus.add_matcher(MatcherConfig {
name: "matcher2".into(),
match_severity: Some(vec!["error".parse()?]),
target: Some(vec!["mock2".into()]),
..Default::default()
});
let send_with_severity = |severity| {
let notification =
Notification::new_templated(severity, "Title", "Body", Default::default());
let notification = Notification::new_templated(
severity,
"Title",
"Body",
Default::default(),
Default::default(),
);
bus.send("channel1", &notification);
bus.send(&notification);
};
send_with_severity(Severity::Info);

View File

@ -0,0 +1,395 @@
use regex::Regex;
use std::collections::HashSet;
use std::fmt;
use std::fmt::Debug;
use std::str::FromStr;
use serde::{Deserialize, Serialize};
use proxmox_schema::api_types::COMMENT_SCHEMA;
use proxmox_schema::{
api, const_regex, ApiStringFormat, Schema, StringSchema, Updater, SAFE_ID_REGEX_STR,
};
use crate::schema::ENTITY_NAME_SCHEMA;
use crate::{Error, Notification, Severity};
pub const MATCHER_TYPENAME: &str = "matcher";
#[api]
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)]
#[serde(rename_all = "kebab-case")]
pub enum MatchModeOperator {
/// All match statements have to match (AND)
#[default]
All,
/// At least one filter property has to match (OR)
Any,
}
impl MatchModeOperator {
/// Apply the mode operator to two bools, lhs and rhs
fn apply(&self, lhs: bool, rhs: bool) -> bool {
match self {
MatchModeOperator::All => lhs && rhs,
MatchModeOperator::Any => lhs || rhs,
}
}
// https://en.wikipedia.org/wiki/Identity_element
fn neutral_element(&self) -> bool {
match self {
MatchModeOperator::All => true,
MatchModeOperator::Any => false,
}
}
}
const_regex! {
pub MATCH_FIELD_ENTRY_REGEX = concat!(r"^(?:(exact|regex):)?(", SAFE_ID_REGEX_STR!(), r")=(.*)$");
}
pub const MATCH_FIELD_ENTRY_FORMAT: ApiStringFormat =
ApiStringFormat::VerifyFn(verify_field_matcher);
fn verify_field_matcher(s: &str) -> Result<(), anyhow::Error> {
let _: FieldMatcher = s.parse()?;
Ok(())
}
pub const MATCH_FIELD_ENTRY_SCHEMA: Schema = StringSchema::new("Match metadata field.")
.format(&MATCH_FIELD_ENTRY_FORMAT)
.min_length(1)
.max_length(1024)
.schema();
#[api(
properties: {
name: {
schema: ENTITY_NAME_SCHEMA,
},
comment: {
optional: true,
schema: COMMENT_SCHEMA,
},
"match-field": {
type: Array,
items: {
description: "Fields to match",
type: String
},
optional: true,
},
"match-severity": {
type: Array,
items: {
description: "Severity level to match.",
type: String
},
optional: true,
},
"target": {
type: Array,
items: {
schema: ENTITY_NAME_SCHEMA,
},
optional: true,
},
})]
#[derive(Debug, Serialize, Deserialize, Updater, Default)]
#[serde(rename_all = "kebab-case")]
/// Config for Sendmail notification endpoints
pub struct MatcherConfig {
/// Name of the matcher
#[updater(skip)]
pub name: String,
/// List of matched metadata fields
#[serde(skip_serializing_if = "Option::is_none")]
pub match_field: Option<Vec<FieldMatcher>>,
/// List of matched severity levels
#[serde(skip_serializing_if = "Option::is_none")]
pub match_severity: Option<Vec<SeverityMatcher>>,
/// Decide if 'all' or 'any' match statements must match
#[serde(skip_serializing_if = "Option::is_none")]
pub mode: Option<MatchModeOperator>,
/// Invert match of the whole filter
#[serde(skip_serializing_if = "Option::is_none")]
pub invert_match: Option<bool>,
/// Targets to notify
#[serde(skip_serializing_if = "Option::is_none")]
pub target: Option<Vec<String>>,
/// Comment
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
}
#[derive(Clone, Debug)]
pub enum FieldMatcher {
Exact {
field: String,
matched_value: String,
},
Regex {
field: String,
matched_regex: Regex,
},
}
proxmox_serde::forward_deserialize_to_from_str!(FieldMatcher);
proxmox_serde::forward_serialize_to_display!(FieldMatcher);
impl FieldMatcher {
fn matches(&self, notification: &Notification) -> bool {
match self {
FieldMatcher::Exact {
field,
matched_value,
} => {
let value = notification.metadata.additional_fields.get(field);
if let Some(value) = value {
matched_value == value
} else {
// Metadata field does not exist, so we do not match
false
}
}
FieldMatcher::Regex {
field,
matched_regex,
} => {
let value = notification.metadata.additional_fields.get(field);
if let Some(value) = value {
matched_regex.is_match(value)
} else {
// Metadata field does not exist, so we do not match
false
}
}
}
}
}
impl fmt::Display for FieldMatcher {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// Attention, Display is used to implement Serialize, do not
// change the format.
match self {
FieldMatcher::Exact {
field,
matched_value,
} => {
write!(f, "exact:{field}={matched_value}")
}
FieldMatcher::Regex {
field,
matched_regex,
} => {
let re = matched_regex.as_str();
write!(f, "regex:{field}={re}")
}
}
}
}
impl FromStr for FieldMatcher {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Error> {
if !MATCH_FIELD_ENTRY_REGEX.is_match(s) {
return Err(Error::FilterFailed(format!(
"invalid match-field statement: {s}"
)));
}
if let Some(remaining) = s.strip_prefix("regex:") {
match remaining.split_once('=') {
None => Err(Error::FilterFailed(format!(
"invalid match-field statement: {s}"
))),
Some((field, expected_value_regex)) => {
let regex = Regex::new(expected_value_regex)
.map_err(|err| Error::FilterFailed(format!("invalid regex: {err}")))?;
Ok(Self::Regex {
field: field.into(),
matched_regex: regex,
})
}
}
} else if let Some(remaining) = s.strip_prefix("exact:") {
match remaining.split_once('=') {
None => Err(Error::FilterFailed(format!(
"invalid match-field statement: {s}"
))),
Some((field, expected_value)) => Ok(Self::Exact {
field: field.into(),
matched_value: expected_value.into(),
}),
}
} else {
Err(Error::FilterFailed(format!(
"invalid match-field statement: {s}"
)))
}
}
}
impl MatcherConfig {
pub fn matches(&self, notification: &Notification) -> Result<Option<&[String]>, Error> {
let mode = self.mode.unwrap_or_default();
let mut is_match = mode.neutral_element();
is_match = mode.apply(is_match, self.check_severity_match(notification));
is_match = mode.apply(is_match, self.check_field_match(notification)?);
let invert_match = self.invert_match.unwrap_or_default();
Ok(if is_match != invert_match {
Some(self.target.as_deref().unwrap_or_default())
} else {
None
})
}
fn check_field_match(&self, notification: &Notification) -> Result<bool, Error> {
let mode = self.mode.unwrap_or_default();
let mut is_match = mode.neutral_element();
if let Some(match_field) = self.match_field.as_deref() {
for field_matcher in match_field {
// let field_matcher: FieldMatcher = match_stmt.parse()?;
is_match = mode.apply(is_match, field_matcher.matches(notification));
}
}
Ok(is_match)
}
fn check_severity_match(&self, notification: &Notification) -> bool {
let mode = self.mode.unwrap_or_default();
let mut is_match = mode.neutral_element();
if let Some(matchers) = self.match_severity.as_ref() {
for severity_matcher in matchers {
is_match = mode.apply(is_match, severity_matcher.matches(notification));
}
}
is_match
}
}
#[derive(Clone, Debug)]
pub struct SeverityMatcher {
severities: Vec<Severity>,
}
proxmox_serde::forward_deserialize_to_from_str!(SeverityMatcher);
proxmox_serde::forward_serialize_to_display!(SeverityMatcher);
impl SeverityMatcher {
fn matches(&self, notification: &Notification) -> bool {
self.severities.contains(&notification.metadata.severity)
}
}
impl fmt::Display for SeverityMatcher {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let severities: Vec<String> = self.severities.iter().map(|s| format!("{s}")).collect();
f.write_str(&severities.join(","))
}
}
impl FromStr for SeverityMatcher {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Error> {
let mut severities = Vec::new();
for element in s.split(',') {
let element = element.trim();
let severity: Severity = element.parse()?;
severities.push(severity)
}
Ok(Self { severities })
}
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum DeleteableMatcherProperty {
MatchSeverity,
MatchField,
Target,
Mode,
InvertMatch,
Comment,
}
pub fn check_matches<'a>(
matchers: &'a [MatcherConfig],
notification: &Notification,
) -> HashSet<&'a str> {
let mut targets = HashSet::new();
for matcher in matchers {
match matcher.matches(notification) {
Ok(t) => {
let t = t.unwrap_or_default();
targets.extend(t.iter().map(|s| s.as_str()));
}
Err(err) => log::error!("matcher '{matcher}' failed: {err}", matcher = matcher.name),
}
}
targets
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::Value;
use std::collections::HashMap;
#[test]
fn test_matching() {
let mut fields = HashMap::new();
fields.insert("foo".into(), "bar".into());
let notification =
Notification::new_templated(Severity::Notice, "test", "test", Value::Null, fields);
let matcher: FieldMatcher = "exact:foo=bar".parse().unwrap();
assert!(matcher.matches(&notification));
let matcher: FieldMatcher = "regex:foo=b.*".parse().unwrap();
assert!(matcher.matches(&notification));
let matcher: FieldMatcher = "regex:notthere=b.*".parse().unwrap();
assert!(!matcher.matches(&notification));
assert!("regex:'3=b.*".parse::<FieldMatcher>().is_err());
assert!("invalid:'bar=b.*".parse::<FieldMatcher>().is_err());
}
#[test]
fn test_severities() {
let notification = Notification::new_templated(
Severity::Notice,
"test",
"test",
Value::Null,
Default::default(),
);
let matcher: SeverityMatcher = "info,notice,warning,error".parse().unwrap();
assert!(matcher.matches(&notification));
}
}

View File

@ -19,8 +19,7 @@ pub const BACKEND_NAME_SCHEMA: Schema = StringSchema::new("Notification backend
.max_length(32)
.schema();
pub const ENTITY_NAME_SCHEMA: Schema =
StringSchema::new("Name schema for endpoints, filters and groups")
pub const ENTITY_NAME_SCHEMA: Schema = StringSchema::new("Name schema for targets and matchers")
.format(&SAFE_ID_FORMAT)
.min_length(2)
.max_length(32)