notifications: allow sending notifications via proxmox_notify
- Set the context in proxmox_notify - Add helper function which queues notifications to a spool directory - Set up a worker task, running in the privileged process, which periodically checks the spool directory for queued notifications The queuing is needed because on PBS we send most if not all notifications from the proxy-process running as the `backup` user. However, to have access to the protected passwords/tokens for various notification endpoints, we need to read the notification config as root. Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> Tested-by: Maximiliano Sandoval <m.sandoval@proxmox.com> Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
parent
2764be3db5
commit
57570bda1d
@ -56,6 +56,7 @@ async fn run() -> Result<(), Error> {
|
||||
proxmox_backup::server::create_state_dir()?;
|
||||
proxmox_backup::server::create_active_operations_dir()?;
|
||||
proxmox_backup::server::jobstate::create_jobstate_dir()?;
|
||||
proxmox_backup::server::notifications::create_spool_dir()?;
|
||||
proxmox_backup::tape::create_tape_status_dir()?;
|
||||
proxmox_backup::tape::create_drive_state_dir()?;
|
||||
proxmox_backup::tape::create_changer_state_dir()?;
|
||||
@ -72,6 +73,7 @@ async fn run() -> Result<(), Error> {
|
||||
let _ = csrf_secret(); // load with lazy_static
|
||||
|
||||
proxmox_backup::auth_helpers::setup_auth_context(true);
|
||||
proxmox_backup::server::notifications::init()?;
|
||||
|
||||
let backup_user = pbs_config::backup_user()?;
|
||||
let mut command_sock = proxmox_rest_server::CommandSocket::new(
|
||||
@ -153,6 +155,8 @@ async fn run() -> Result<(), Error> {
|
||||
std::thread::sleep(std::time::Duration::from_secs(3));
|
||||
});
|
||||
|
||||
start_notification_worker();
|
||||
|
||||
server.await?;
|
||||
log::info!("server shutting down, waiting for active workers to complete");
|
||||
proxmox_rest_server::last_worker_future().await?;
|
||||
@ -161,3 +165,10 @@ async fn run() -> Result<(), Error> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start_notification_worker() {
|
||||
let abort_future = proxmox_rest_server::shutdown_future();
|
||||
let future = Box::pin(proxmox_backup::server::notifications::notification_worker());
|
||||
let task = futures::future::select(future, abort_future);
|
||||
tokio::spawn(task);
|
||||
}
|
||||
|
@ -198,6 +198,7 @@ async fn run() -> Result<(), Error> {
|
||||
}
|
||||
|
||||
proxmox_backup::auth_helpers::setup_auth_context(false);
|
||||
proxmox_backup::server::notifications::init()?;
|
||||
|
||||
let rrd_cache = initialize_rrd_cache()?;
|
||||
rrd_cache.apply_journal()?;
|
||||
|
@ -1,20 +1,29 @@
|
||||
use anyhow::Error;
|
||||
use serde_json::json;
|
||||
use const_format::concatcp;
|
||||
use serde_json::{json, Value};
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use handlebars::{
|
||||
Context, Handlebars, Helper, HelperResult, Output, RenderContext, RenderError, TemplateError,
|
||||
};
|
||||
use nix::unistd::Uid;
|
||||
|
||||
use proxmox_human_byte::HumanByte;
|
||||
use proxmox_lang::try_block;
|
||||
use proxmox_notify::context::pbs::PBS_CONTEXT;
|
||||
use proxmox_schema::ApiType;
|
||||
use proxmox_sys::email::sendmail;
|
||||
use proxmox_sys::fs::{create_path, CreateOptions};
|
||||
|
||||
use pbs_api_types::{
|
||||
APTUpdateInfo, DataStoreConfig, DatastoreNotify, GarbageCollectionStatus, Notify,
|
||||
SyncJobConfig, TapeBackupJobSetup, User, Userid, VerificationJobConfig,
|
||||
};
|
||||
use proxmox_notify::{Notification, Severity};
|
||||
|
||||
const SPOOL_DIR: &str = concatcp!(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR, "/notifications");
|
||||
const GC_OK_TEMPLATE: &str = r###"
|
||||
|
||||
Datastore: {{datastore}}
|
||||
@ -283,6 +292,102 @@ lazy_static::lazy_static! {
|
||||
};
|
||||
}
|
||||
|
||||
/// Initialize the notification system by setting context in proxmox_notify
|
||||
pub fn init() -> Result<(), Error> {
|
||||
proxmox_notify::context::set_context(&PBS_CONTEXT);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create the directory which will be used to temporarily store notifications
|
||||
/// which were sent from an unprivileged process.
|
||||
pub fn create_spool_dir() -> Result<(), Error> {
|
||||
let backup_user = pbs_config::backup_user()?;
|
||||
let opts = CreateOptions::new()
|
||||
.owner(backup_user.uid)
|
||||
.group(backup_user.gid);
|
||||
|
||||
create_path(SPOOL_DIR, None, Some(opts))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_queued_notifications() -> Result<(), Error> {
|
||||
let mut read_dir = tokio::fs::read_dir(SPOOL_DIR).await?;
|
||||
|
||||
let mut notifications = Vec::new();
|
||||
|
||||
while let Some(entry) = read_dir.next_entry().await? {
|
||||
let path = entry.path();
|
||||
|
||||
if let Some(ext) = path.extension() {
|
||||
if ext == "json" {
|
||||
let p = path.clone();
|
||||
|
||||
let bytes = tokio::fs::read(p).await?;
|
||||
let notification: Notification = serde_json::from_slice(&bytes)?;
|
||||
notifications.push(notification);
|
||||
|
||||
// Currently, there is no retry-mechanism in case of failure...
|
||||
// For retries, we'd have to keep track of which targets succeeded/failed
|
||||
// to send, so we do not retry notifying a target which succeeded before.
|
||||
tokio::fs::remove_file(path).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure that we send the oldest notification first
|
||||
notifications.sort_unstable_by_key(|n| n.timestamp());
|
||||
|
||||
let res = tokio::task::spawn_blocking(move || {
|
||||
let config = pbs_config::notifications::config()?;
|
||||
for notification in notifications {
|
||||
if let Err(err) = proxmox_notify::api::common::send(&config, ¬ification) {
|
||||
log::error!("failed to send notification: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<(), Error>(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
if let Err(e) = res {
|
||||
log::error!("could not read notification config: {e}");
|
||||
}
|
||||
|
||||
Ok::<(), Error>(())
|
||||
}
|
||||
|
||||
/// Worker task to periodically send any queued notifications.
|
||||
pub async fn notification_worker() {
|
||||
loop {
|
||||
let delay_target = Instant::now() + Duration::from_secs(5);
|
||||
|
||||
if let Err(err) = send_queued_notifications().await {
|
||||
log::error!("notification worker task error: {err}");
|
||||
}
|
||||
|
||||
tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn send_notification(notification: Notification) -> Result<(), Error> {
|
||||
if nix::unistd::ROOT == Uid::current() {
|
||||
let config = pbs_config::notifications::config()?;
|
||||
proxmox_notify::api::common::send(&config, ¬ification)?;
|
||||
} else {
|
||||
let ser = serde_json::to_vec(¬ification)?;
|
||||
let path = Path::new(SPOOL_DIR).join(format!("{id}.json", id = notification.id()));
|
||||
|
||||
let backup_user = pbs_config::backup_user()?;
|
||||
let opts = CreateOptions::new()
|
||||
.owner(backup_user.uid)
|
||||
.group(backup_user.gid);
|
||||
proxmox_sys::fs::replace_file(path, &ser, opts, true)?;
|
||||
log::info!("queued notification (id={id})", id = notification.id())
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Summary of a successful Tape Job
|
||||
#[derive(Default)]
|
||||
pub struct TapeBackupJobSummary {
|
||||
|
Loading…
x
Reference in New Issue
Block a user