log: introduce a shareable LogContext struct

Since hyper can spawn() more tasks, when we stop passing `WorkerTask`
references down the stack, we still need to be able to *inherit* the
current logging context. Hyper provides a way to replace its used
`spawn()` method, so we need to provide a way to reuse the logging
context.

Instead of having the `FileLogger` and warn counter separately
available with local-only access, put them behind an Arc<Mutex<>>.
Previously they already *were* behind an Arc<Mutex<>> as part of the
WorkerTaskState.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2024-07-11 13:54:12 +02:00
parent 847a57740b
commit 4b9c907b68
3 changed files with 98 additions and 45 deletions

View File

@ -1,11 +1,11 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![deny(unsafe_op_in_unsafe_fn)]
use std::{
cell::{Cell, RefCell},
env,
};
use std::env;
use std::future::Future;
use std::sync::{Arc, Mutex};
use tokio::task::futures::TaskLocalFuture;
use tracing::Level;
use tracing_log::{AsLog, LogTracer};
use tracing_subscriber::filter::{filter_fn, LevelFilter};
@ -34,8 +34,7 @@ pub use tracing::warn;
pub use tracing::warn_span;
tokio::task_local! {
pub static LOGGER: RefCell<FileLogger>;
pub static WARN_COUNTER: Cell<u64>;
static LOG_CONTEXT: LogContext;
}
pub fn init_logger(
@ -55,7 +54,7 @@ pub fn init_logger(
.expect("Unable to open syslog")
.with_filter(log_level)
.with_filter(filter_fn(|metadata| {
LOGGER.try_with(|_| {}).is_err() || *metadata.level() == Level::ERROR
LogContext::exists() || *metadata.level() == Level::ERROR
})),
)
.with(TasklogLayer {}.with_filter(log_level));
@ -64,3 +63,66 @@ pub fn init_logger(
LogTracer::init_with_filter(log_level.as_log())?;
Ok(())
}
/// A file logger and warnings counter which can be used across a scope for separate logging.
/// Mainly used for worker-task logging.
pub struct FileLogState {
pub warn_count: u64,
pub logger: FileLogger,
}
impl FileLogState {
fn new(logger: FileLogger) -> Self {
Self {
warn_count: 0,
logger,
}
}
}
/// A log context can be set for a sync or asynchronous scope to cause log messages to be added to
/// a [`FileLogger`].
#[derive(Clone)]
pub struct LogContext {
logger: Arc<Mutex<FileLogState>>,
}
impl LogContext {
/// Create a logging context for a [`FileLogger`].
pub fn new(logger: FileLogger) -> Self {
Self {
logger: Arc::new(Mutex::new(FileLogState::new(logger))),
}
}
/// Check to see if a log context exists without getting a strong reference to it.
pub fn exists() -> bool {
LOG_CONTEXT.try_with(|_| ()).is_ok()
}
/// Get the current logging context if set.
pub fn current() -> Option<Self> {
LOG_CONTEXT.try_with(|ctx| ctx.clone()).ok()
}
/// Run a task with a new logger and an initial warn counter of zero.
pub fn sync_scope<F, R>(self, func: F) -> R
where
F: FnOnce() -> R,
{
LOG_CONTEXT.sync_scope(self, func)
}
/// Run a task with a new logger and an initial warn counter of zero.
pub fn scope<F>(self, f: F) -> TaskLocalFuture<Self, F>
where
F: Future,
{
LOG_CONTEXT.scope(self, f)
}
/// Access the internal state.
pub fn state(&self) -> &Arc<Mutex<FileLogState>> {
&self.logger
}
}

View File

@ -8,34 +8,31 @@ use tracing::Subscriber;
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;
use crate::FileLogger;
use crate::LOGGER;
use crate::WARN_COUNTER;
use crate::{FileLogState, LogContext};
pub struct TasklogLayer;
impl<S: Subscriber> Layer<S> for TasklogLayer {
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
let _result = LOGGER.try_with(|logger| {
if let Some(ctx) = LogContext::current() {
let mut logger = ctx.logger.lock().unwrap();
let mut buf = String::new();
event.record(&mut EventVisitor::new(&mut buf));
let level = event.metadata().level();
log_to_file(&mut logger.borrow_mut(), level, &buf);
});
log_to_file(&mut logger, level, &buf);
}
}
}
fn log_to_file(logger: &mut FileLogger, level: &Level, buf: &String) {
fn log_to_file(logger: &mut FileLogState, level: &Level, buf: &String) {
match *level {
Level::ERROR | Level::WARN => {
WARN_COUNTER.with(|counter| {
counter.set(counter.get() + 1);
});
logger.log(buf);
logger.warn_count += 1;
logger.logger.log(buf);
}
Level::INFO => logger.log(buf),
Level::DEBUG => logger.log(format!("DEBUG: {buf}")),
Level::TRACE => logger.log(format!("TRACE: {buf}")),
Level::INFO => logger.logger.log(buf),
Level::DEBUG => logger.logger.log(format!("DEBUG: {buf}")),
Level::TRACE => logger.logger.log(format!("TRACE: {buf}")),
};
}

View File

@ -1,4 +1,3 @@
use std::cell::{Cell, RefCell};
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::{BufRead, BufReader, Read, Write};
@ -20,7 +19,7 @@ use tokio::sync::oneshot;
use tracing::{info, warn};
use proxmox_lang::try_block;
use proxmox_log::{FileLogOptions, FileLogger, LOGGER, WARN_COUNTER};
use proxmox_log::{FileLogOptions, FileLogger, LogContext};
use proxmox_schema::upid::UPID;
use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions};
use proxmox_sys::linux::procfs;
@ -885,15 +884,9 @@ impl WorkerTask {
let upid_str = worker.upid.to_string();
let f = f(worker.clone());
let logger = RefCell::new(logger);
let counter = Cell::new(0);
tokio::spawn(LOGGER.scope(logger, async move {
WARN_COUNTER
.scope(counter, async move {
let result = f.await;
worker.log_result(&result);
})
.await;
tokio::spawn(LogContext::new(logger).scope(async move {
let result = f.await;
worker.log_result(&result);
}));
Ok(upid_str)
@ -916,20 +909,18 @@ impl WorkerTask {
let _child = std::thread::Builder::new()
.name(upid_str.clone())
.spawn(move || {
LOGGER.sync_scope(RefCell::new(logger), || {
WARN_COUNTER.sync_scope(Cell::new(0), || {
let worker1 = worker.clone();
LogContext::new(logger).sync_scope(|| {
let worker1 = worker.clone();
let result = match std::panic::catch_unwind(move || f(worker1)) {
Ok(r) => r,
Err(panic) => match panic.downcast::<&str>() {
Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
Err(_) => Err(format_err!("worker panicked: unknown type.")),
},
};
let result = match std::panic::catch_unwind(move || f(worker1)) {
Ok(r) => r,
Err(panic) => match panic.downcast::<&str>() {
Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
Err(_) => Err(format_err!("worker panicked: unknown type.")),
},
};
worker.log_result(&result);
});
worker.log_result(&result);
});
});
@ -938,7 +929,10 @@ impl WorkerTask {
/// create state from self and a result
pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
let warn_count = WARN_COUNTER.try_with(Cell::get).unwrap_or(0);
let warn_count = match LogContext::current() {
Some(ctx) => ctx.state().lock().unwrap().warn_count,
None => 0,
};
let endtime = proxmox_time::epoch_i64();