enable tracing logger, remove task_log macros

Enable the tracing-system by setting the LOGGER task local variable
to a instance of a FileLogger and initializing the WARN_COUNTER.
Removed the task_log! macros and some occurences.

Reviewed-by: Lukas Wagner <l.wagner@proxmox.com>
Tested-by: Lukas Wagner <l.wagner@proxmox.com>
Signed-off-by: Gabriel Goller <g.goller@proxmox.com>
[WB: remove flog! import in doctests]
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Gabriel Goller 2024-07-09 16:20:11 +02:00 committed by Wolfgang Bumiller
parent 0550659cd1
commit ddb91a6594
8 changed files with 48 additions and 263 deletions

View File

@ -30,7 +30,7 @@ pub struct FileLogOptions {
/// #### Example: /// #### Example:
/// ``` /// ```
/// # use anyhow::{bail, format_err, Error}; /// # use anyhow::{bail, format_err, Error};
/// use proxmox_log::{flog, FileLogger, FileLogOptions}; /// use proxmox_log::{FileLogger, FileLogOptions};
/// ///
/// # std::fs::remove_file("test.log"); /// # std::fs::remove_file("test.log");
/// let options = FileLogOptions { /// let options = FileLogOptions {
@ -39,7 +39,7 @@ pub struct FileLogOptions {
/// ..Default::default() /// ..Default::default()
/// }; /// };
/// let mut log = FileLogger::new("test.log", options).unwrap(); /// let mut log = FileLogger::new("test.log", options).unwrap();
/// flog!(log, "A simple log: {}", "Hello!"); /// log.log(format!("A simple log: {}", "Hello!"));
/// # std::fs::remove_file("test.log"); /// # std::fs::remove_file("test.log");
/// ``` /// ```
pub struct FileLogger { pub struct FileLogger {
@ -48,14 +48,6 @@ pub struct FileLogger {
options: FileLogOptions, options: FileLogOptions,
} }
/// Log messages to [FileLogger] - ``println`` like macro
#[macro_export]
macro_rules! flog {
($log:expr, $($arg:tt)*) => ({
$log.log(format!($($arg)*));
})
}
impl FileLogger { impl FileLogger {
/// Create a new FileLogger. This opens the specified file and /// Create a new FileLogger. This opens the specified file and
/// stores a file descriptor. /// stores a file descriptor.

View File

@ -32,6 +32,7 @@ serde_json.workspace = true
tokio = { workspace = true, features = ["signal", "process"] } tokio = { workspace = true, features = ["signal", "process"] }
tokio-openssl.workspace = true tokio-openssl.workspace = true
tokio-stream.workspace = true tokio-stream.workspace = true
tracing.workspace = true
tower-service.workspace = true tower-service.workspace = true
url.workspace = true url.workspace = true
@ -40,6 +41,7 @@ proxmox-compression.workspace = true
proxmox-http = { workspace = true, optional = true } proxmox-http = { workspace = true, optional = true }
proxmox-io.workspace = true proxmox-io.workspace = true
proxmox-lang.workspace = true proxmox-lang.workspace = true
proxmox-log.workspace = true
proxmox-router.workspace = true proxmox-router.workspace = true
proxmox-schema = { workspace = true, features = [ "api-macro", "upid-api-impl" ] } proxmox-schema = { workspace = true, features = [ "api-macro", "upid-api-impl" ] }
proxmox-sys = { workspace = true, features = [ "logrotate", "timer" ] } proxmox-sys = { workspace = true, features = [ "logrotate", "timer" ] }

View File

@ -12,11 +12,12 @@ use hyper::http::request::Parts;
use hyper::{Body, Response}; use hyper::{Body, Response};
use tower_service::Service; use tower_service::Service;
use proxmox_log::{FileLogOptions, FileLogger};
use proxmox_router::{Router, RpcEnvironmentType, UserInformation}; use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
use proxmox_sys::fs::{create_path, CreateOptions}; use proxmox_sys::fs::{create_path, CreateOptions};
use crate::rest::Handler; use crate::rest::Handler;
use crate::{CommandSocket, FileLogOptions, FileLogger, RestEnvironment}; use crate::{CommandSocket, RestEnvironment};
/// REST server configuration /// REST server configuration
pub struct ApiConfig { pub struct ApiConfig {

View File

@ -1,147 +0,0 @@
use std::io::Write;
use anyhow::Error;
use nix::fcntl::OFlag;
use proxmox_sys::fs::{atomic_open_or_create_file, CreateOptions};
/// Options to control the behavior of a [FileLogger] instance
#[derive(Default)]
pub struct FileLogOptions {
/// Open underlying log file in append mode, useful when multiple concurrent processes
/// want to log to the same file (e.g., HTTP access log). Note that it is only atomic
/// for writes smaller than the PIPE_BUF (4k on Linux).
/// Inside the same process you may need to still use an mutex, for shared access.
pub append: bool,
/// Open underlying log file as readable
pub read: bool,
/// If set, ensure that the file is newly created or error out if already existing.
pub exclusive: bool,
/// Duplicate logged messages to STDOUT, like tee
pub to_stdout: bool,
/// Prefix messages logged to the file with the current local time as RFC 3339
pub prefix_time: bool,
/// File owner/group and mode
pub file_opts: CreateOptions,
}
/// Log messages with optional automatically added timestamps into files
///
/// #### Example:
/// ```
/// # use anyhow::{bail, format_err, Error};
/// use proxmox_rest_server::{flog, FileLogger, FileLogOptions};
///
/// # std::fs::remove_file("test.log");
/// let options = FileLogOptions {
/// to_stdout: true,
/// exclusive: true,
/// ..Default::default()
/// };
/// let mut log = FileLogger::new("test.log", options).unwrap();
/// flog!(log, "A simple log: {}", "Hello!");
/// # std::fs::remove_file("test.log");
/// ```
pub struct FileLogger {
file: std::fs::File,
file_name: std::path::PathBuf,
options: FileLogOptions,
}
/// Log messages to [FileLogger] - ``println`` like macro
#[macro_export]
macro_rules! flog {
($log:expr, $($arg:tt)*) => ({
$log.log(format!($($arg)*));
})
}
impl FileLogger {
pub fn new<P: AsRef<std::path::Path>>(
file_name: P,
options: FileLogOptions,
) -> Result<Self, Error> {
let file = Self::open(&file_name, &options)?;
let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
Ok(Self {
file,
file_name,
options,
})
}
pub fn reopen(&mut self) -> Result<&Self, Error> {
let file = Self::open(&self.file_name, &self.options)?;
self.file = file;
Ok(self)
}
fn open<P: AsRef<std::path::Path>>(
file_name: P,
options: &FileLogOptions,
) -> Result<std::fs::File, Error> {
let mut flags = OFlag::O_CLOEXEC;
if options.read {
flags |= OFlag::O_RDWR;
} else {
flags |= OFlag::O_WRONLY;
}
if options.append {
flags |= OFlag::O_APPEND;
}
if options.exclusive {
flags |= OFlag::O_EXCL;
}
let file =
atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?;
Ok(file)
}
pub fn log<S: AsRef<str>>(&mut self, msg: S) {
let msg = msg.as_ref();
if self.options.to_stdout {
let mut stdout = std::io::stdout();
stdout.write_all(msg.as_bytes()).unwrap();
stdout.write_all(b"\n").unwrap();
}
let line = if self.options.prefix_time {
let now = proxmox_time::epoch_i64();
let rfc3339 = match proxmox_time::epoch_to_rfc3339(now) {
Ok(rfc3339) => rfc3339,
Err(_) => "1970-01-01T00:00:00Z".into(), // for safety, should really not happen!
};
format!("{}: {}\n", rfc3339, msg)
} else {
format!("{}\n", msg)
};
if let Err(err) = self.file.write_all(line.as_bytes()) {
// avoid panicking, log methods should not do that
// FIXME: or, return result???
log::error!("error writing to log file - {}", err);
}
}
}
impl std::io::Write for FileLogger {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
if self.options.to_stdout {
let _ = std::io::stdout().write(buf);
}
self.file.write(buf)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
if self.options.to_stdout {
let _ = std::io::stdout().flush();
}
self.file.flush()
}
}

View File

@ -43,9 +43,6 @@ pub use state::*;
mod command_socket; mod command_socket;
pub use command_socket::*; pub use command_socket::*;
mod file_logger;
pub use file_logger::{FileLogOptions, FileLogger};
mod api_config; mod api_config;
pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor}; pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};

View File

@ -31,10 +31,10 @@ use proxmox_schema::{ObjectSchemaType, ParameterSchema};
use proxmox_async::stream::AsyncReaderStream; use proxmox_async::stream::AsyncReaderStream;
use proxmox_compression::{DeflateEncoder, Level}; use proxmox_compression::{DeflateEncoder, Level};
use proxmox_log::FileLogger;
use crate::{ use crate::{
formatter::*, normalize_path, ApiConfig, AuthError, CompressionMethod, FileLogger, formatter::*, normalize_path, ApiConfig, AuthError, CompressionMethod, RestEnvironment,
RestEnvironment,
}; };
extern "C" { extern "C" {

View File

@ -1,3 +1,4 @@
use std::cell::{Cell, RefCell};
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader, Read, Write}; use std::io::{BufRead, BufReader, Read, Write};
@ -16,17 +17,18 @@ use serde::{Deserialize, Serialize};
use serde_json::{json, Value}; use serde_json::{json, Value};
use tokio::signal::unix::SignalKind; use tokio::signal::unix::SignalKind;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tracing::{info, warn};
use proxmox_lang::try_block; use proxmox_lang::try_block;
use proxmox_log::{FileLogOptions, FileLogger, LOGGER, WARN_COUNTER};
use proxmox_schema::upid::UPID; use proxmox_schema::upid::UPID;
use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions}; use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions};
use proxmox_sys::linux::procfs; use proxmox_sys::linux::procfs;
use proxmox_sys::task_warn;
use proxmox_sys::logrotate::{LogRotate, LogRotateFiles}; use proxmox_sys::logrotate::{LogRotate, LogRotateFiles};
use proxmox_sys::WorkerTaskContext; use proxmox_sys::WorkerTaskContext;
use crate::{CommandSocket, FileLogOptions, FileLogger}; use crate::CommandSocket;
#[allow(dead_code)] #[allow(dead_code)]
struct TaskListLockGuard(File); struct TaskListLockGuard(File);
@ -295,7 +297,7 @@ pub fn rotate_task_log_archive(
/// removes all task logs that are older than the oldest task entry in the /// removes all task logs that are older than the oldest task entry in the
/// task archive /// task archive
pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Result<(), Error> { pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
let setup = worker_task_setup()?; let setup = worker_task_setup()?;
let _lock = setup.lock_task_list_files(true)?; let _lock = setup.lock_task_list_files(true)?;
@ -333,7 +335,7 @@ pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Re
Ok(files) => files, Ok(files) => files,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue, Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
Err(err) => { Err(err) => {
task_warn!(worker, "could not check task logs in '{:02X}': {}", i, err); warn!("could not check task logs in '{i:02X}': {err}");
continue; continue;
} }
}; };
@ -341,12 +343,7 @@ pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Re
let file = match file { let file = match file {
Ok(file) => file, Ok(file) => file,
Err(err) => { Err(err) => {
task_warn!( warn!("could not check some task log in '{i:02X}': {err}");
worker,
"could not check some task log in '{:02X}': {}",
i,
err
);
continue; continue;
} }
}; };
@ -355,7 +352,7 @@ pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Re
let modified = match get_modified(file) { let modified = match get_modified(file) {
Ok(modified) => modified, Ok(modified) => modified,
Err(err) => { Err(err) => {
task_warn!(worker, "error getting mtime for '{:?}': {}", path, err); warn!("error getting mtime for '{path:?}': {err}");
continue; continue;
} }
}; };
@ -365,7 +362,7 @@ pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Re
Ok(()) => {} Ok(()) => {}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {} Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
Err(err) => { Err(err) => {
task_warn!(worker, "could not remove file '{:?}': {}", path, err) warn!("could not remove file '{path:?}': {err}")
} }
} }
} }
@ -823,9 +820,7 @@ impl std::fmt::Display for WorkerTask {
} }
struct WorkerTaskData { struct WorkerTaskData {
logger: FileLogger,
progress: f64, // 0..1 progress: f64, // 0..1
warn_count: u64,
pub abort_listeners: Vec<oneshot::Sender<()>>, pub abort_listeners: Vec<oneshot::Sender<()>>,
} }
@ -835,7 +830,7 @@ impl WorkerTask {
worker_id: Option<String>, worker_id: Option<String>,
auth_id: String, auth_id: String,
to_stdout: bool, to_stdout: bool,
) -> Result<Arc<Self>, Error> { ) -> Result<(Arc<Self>, FileLogger), Error> {
let setup = worker_task_setup()?; let setup = worker_task_setup()?;
let upid = UPID::new(worker_type, worker_id, auth_id)?; let upid = UPID::new(worker_type, worker_id, auth_id)?;
@ -858,9 +853,7 @@ impl WorkerTask {
upid: upid.clone(), upid: upid.clone(),
abort_requested: AtomicBool::new(false), abort_requested: AtomicBool::new(false),
data: Mutex::new(WorkerTaskData { data: Mutex::new(WorkerTaskData {
logger,
progress: 0.0, progress: 0.0,
warn_count: 0,
abort_listeners: vec![], abort_listeners: vec![],
}), }),
}); });
@ -874,7 +867,7 @@ impl WorkerTask {
setup.update_active_workers(Some(&upid))?; setup.update_active_workers(Some(&upid))?;
Ok(worker) Ok((worker, logger))
} }
/// Spawn a new tokio task/future. /// Spawn a new tokio task/future.
@ -889,13 +882,20 @@ impl WorkerTask {
F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
T: Send + 'static + Future<Output = Result<(), Error>>, T: Send + 'static + Future<Output = Result<(), Error>>,
{ {
let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; let (worker, logger) = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
let upid_str = worker.upid.to_string(); let upid_str = worker.upid.to_string();
let f = f(worker.clone()); let f = f(worker.clone());
tokio::spawn(async move {
let result = f.await; let logger = RefCell::new(logger);
worker.log_result(&result); let counter = Cell::new(0);
}); tokio::spawn(LOGGER.scope(logger, async move {
WARN_COUNTER
.scope(counter, async move {
let result = f.await;
worker.log_result(&result);
})
.await;
}));
Ok(upid_str) Ok(upid_str)
} }
@ -911,22 +911,27 @@ impl WorkerTask {
where where
F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>, F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>,
{ {
let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; let (worker, logger) = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
let upid_str = worker.upid.to_string(); let upid_str = worker.upid.to_string();
let _child = std::thread::Builder::new() let _child = std::thread::Builder::new()
.name(upid_str.clone()) .name(upid_str.clone())
.spawn(move || { .spawn(move || {
let worker1 = worker.clone(); LOGGER.sync_scope(RefCell::new(logger), || {
let result = match std::panic::catch_unwind(move || f(worker1)) { WARN_COUNTER.sync_scope(Cell::new(0), || {
Ok(r) => r, let worker1 = worker.clone();
Err(panic) => match panic.downcast::<&str>() {
Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
Err(_) => Err(format_err!("worker panicked: unknown type.")),
},
};
worker.log_result(&result); let result = match std::panic::catch_unwind(move || f(worker1)) {
Ok(r) => r,
Err(panic) => match panic.downcast::<&str>() {
Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
Err(_) => Err(format_err!("worker panicked: unknown type.")),
},
};
worker.log_result(&result);
});
});
}); });
Ok(upid_str) Ok(upid_str)
@ -934,7 +939,7 @@ impl WorkerTask {
/// create state from self and a result /// create state from self and a result
pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
let warn_count = self.data.lock().unwrap().warn_count; let warn_count = WARN_COUNTER.try_with(Cell::get).unwrap_or(0);
let endtime = proxmox_time::epoch_i64(); let endtime = proxmox_time::epoch_i64();
@ -965,15 +970,7 @@ impl WorkerTask {
/// Log a message. /// Log a message.
pub fn log_message<S: AsRef<str>>(&self, msg: S) { pub fn log_message<S: AsRef<str>>(&self, msg: S) {
let mut data = self.data.lock().unwrap(); info!("{}", msg.as_ref());
data.logger.log(msg);
}
/// Log a message as warning.
pub fn log_warning<S: AsRef<str>>(&self, msg: S) {
let mut data = self.data.lock().unwrap();
data.logger.log(format!("WARN: {}", msg.as_ref()));
data.warn_count += 1;
} }
/// Set progress indicator /// Set progress indicator
@ -1036,16 +1033,6 @@ impl WorkerTaskContext for WorkerTask {
fn fail_on_shutdown(&self) -> Result<(), Error> { fn fail_on_shutdown(&self) -> Result<(), Error> {
crate::fail_on_shutdown() crate::fail_on_shutdown()
} }
fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
match level {
log::Level::Error => self.log_warning(message.to_string()),
log::Level::Warn => self.log_warning(message.to_string()),
log::Level::Info => self.log_message(message.to_string()),
log::Level::Debug => self.log_message(format!("DEBUG: {}", message)),
log::Level::Trace => self.log_message(format!("TRACE: {}", message)),
}
}
} }
/// Wait for a locally spanned worker task /// Wait for a locally spanned worker task

View File

@ -26,9 +26,6 @@ pub trait WorkerTaskContext: Send + Sync {
} }
Ok(()) Ok(())
} }
/// Create a log message for this task.
fn log(&self, level: log::Level, message: &std::fmt::Arguments);
} }
/// Convenience implementation: /// Convenience implementation:
@ -48,48 +45,4 @@ impl<T: WorkerTaskContext + ?Sized> WorkerTaskContext for std::sync::Arc<T> {
fn fail_on_shutdown(&self) -> Result<(), Error> { fn fail_on_shutdown(&self) -> Result<(), Error> {
<T as WorkerTaskContext>::fail_on_shutdown(self) <T as WorkerTaskContext>::fail_on_shutdown(self)
} }
fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
<T as WorkerTaskContext>::log(self, level, message)
}
}
/// Log an error to a [WorkerTaskContext]
#[macro_export]
macro_rules! task_error {
($task:expr, $($fmt:tt)+) => {{
$crate::WorkerTaskContext::log(&*$task, log::Level::Error, &format_args!($($fmt)+))
}};
}
/// Log a warning to a [WorkerTaskContext]
#[macro_export]
macro_rules! task_warn {
($task:expr, $($fmt:tt)+) => {{
$crate::WorkerTaskContext::log(&*$task, log::Level::Warn, &format_args!($($fmt)+))
}};
}
/// Log a message to a [WorkerTaskContext]
#[macro_export]
macro_rules! task_log {
($task:expr, $($fmt:tt)+) => {{
$crate::WorkerTaskContext::log(&*$task, log::Level::Info, &format_args!($($fmt)+))
}};
}
/// Log a debug message to a [WorkerTaskContext]
#[macro_export]
macro_rules! task_debug {
($task:expr, $($fmt:tt)+) => {{
$crate::WorkerTaskContext::log(&*$task, log::Level::Debug, &format_args!($($fmt)+))
}};
}
/// Log a trace message to a [WorkerTaskContext]
#[macro_export]
macro_rules! task_trace {
($task:expr, $($fmt:tt)+) => {{
$crate::WorkerTaskContext::log(&*$task, log::Level::Trace, &format_args!($($fmt)+))
}};
} }