[pbs-devel] [PATCH proxmox v5 2/4] enable tracing logger, remove task_log macros

Lukas Wagner l.wagner at proxmox.com
Mon Jun 24 13:09:50 CEST 2024


2 nits inline

On  2024-06-13 15:56, Gabriel Goller wrote:
> Enable the tracing-system by setting the LOGGER task local variable
> to a instance of a FileLogger and initializing the WARN_COUNTER.
> Removed the task_log! macros and some occurences.
> 
> Signed-off-by: Gabriel Goller <g.goller at proxmox.com>
> ---
>  proxmox-rest-server/src/api_config.rs  |   3 +-
>  proxmox-rest-server/src/file_logger.rs | 147 -------------------------
>  proxmox-rest-server/src/lib.rs         |   3 -
>  proxmox-rest-server/src/rest.rs        |   4 +-
>  proxmox-rest-server/src/worker_task.rs | 104 ++++++++---------
>  proxmox-sys/src/worker_task_context.rs |  47 --------
>  6 files changed, 56 insertions(+), 252 deletions(-)
>  delete mode 100644 proxmox-rest-server/src/file_logger.rs
> 
> diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
> index eaece05a..5b7a2a4f 100644
> --- a/proxmox-rest-server/src/api_config.rs
> +++ b/proxmox-rest-server/src/api_config.rs
> @@ -12,11 +12,12 @@ use hyper::http::request::Parts;
>  use hyper::{Body, Response};
>  use tower_service::Service;
>  
> +use proxmox_log::{FileLogOptions, FileLogger};
>  use proxmox_router::{Router, RpcEnvironmentType, UserInformation};
>  use proxmox_sys::fs::{create_path, CreateOptions};
>  
>  use crate::rest::Handler;
> -use crate::{CommandSocket, FileLogOptions, FileLogger, RestEnvironment};
> +use crate::{CommandSocket, RestEnvironment};
>  
>  /// REST server configuration
>  pub struct ApiConfig {
> diff --git a/proxmox-rest-server/src/file_logger.rs b/proxmox-rest-server/src/file_logger.rs
> deleted file mode 100644
> index 2bb1fac6..00000000
> --- a/proxmox-rest-server/src/file_logger.rs
> +++ /dev/null
> @@ -1,147 +0,0 @@
> -use std::io::Write;
> -
> -use anyhow::Error;
> -use nix::fcntl::OFlag;
> -
> -use proxmox_sys::fs::{atomic_open_or_create_file, CreateOptions};
> -
> -/// Options to control the behavior of a [FileLogger] instance
> -#[derive(Default)]
> -pub struct FileLogOptions {
> -    /// Open underlying log file in append mode, useful when multiple concurrent processes
> -    /// want to log to the same file (e.g., HTTP access log). Note that it is only atomic
> -    /// for writes smaller than the PIPE_BUF (4k on Linux).
> -    /// Inside the same process you may need to still use an mutex, for shared access.
> -    pub append: bool,
> -    /// Open underlying log file as readable
> -    pub read: bool,
> -    /// If set, ensure that the file is newly created or error out if already existing.
> -    pub exclusive: bool,
> -    /// Duplicate logged messages to STDOUT, like tee
> -    pub to_stdout: bool,
> -    /// Prefix messages logged to the file with the current local time as RFC 3339
> -    pub prefix_time: bool,
> -    /// File owner/group and mode
> -    pub file_opts: CreateOptions,
> -}
> -
> -/// Log messages with optional automatically added timestamps into files
> -///
> -/// #### Example:
> -/// ```
> -/// # use anyhow::{bail, format_err, Error};
> -/// use proxmox_rest_server::{flog, FileLogger, FileLogOptions};
> -///
> -/// # std::fs::remove_file("test.log");
> -/// let options = FileLogOptions {
> -///     to_stdout: true,
> -///     exclusive: true,
> -///     ..Default::default()
> -/// };
> -/// let mut log = FileLogger::new("test.log", options).unwrap();
> -/// flog!(log, "A simple log: {}", "Hello!");
> -/// # std::fs::remove_file("test.log");
> -/// ```
> -pub struct FileLogger {
> -    file: std::fs::File,
> -    file_name: std::path::PathBuf,
> -    options: FileLogOptions,
> -}
> -
> -/// Log messages to [FileLogger] - ``println`` like macro
> -#[macro_export]
> -macro_rules! flog {
> -    ($log:expr, $($arg:tt)*) => ({
> -        $log.log(format!($($arg)*));
> -    })
> -}
> -
> -impl FileLogger {
> -    pub fn new<P: AsRef<std::path::Path>>(
> -        file_name: P,
> -        options: FileLogOptions,
> -    ) -> Result<Self, Error> {
> -        let file = Self::open(&file_name, &options)?;
> -
> -        let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
> -
> -        Ok(Self {
> -            file,
> -            file_name,
> -            options,
> -        })
> -    }
> -
> -    pub fn reopen(&mut self) -> Result<&Self, Error> {
> -        let file = Self::open(&self.file_name, &self.options)?;
> -        self.file = file;
> -        Ok(self)
> -    }
> -
> -    fn open<P: AsRef<std::path::Path>>(
> -        file_name: P,
> -        options: &FileLogOptions,
> -    ) -> Result<std::fs::File, Error> {
> -        let mut flags = OFlag::O_CLOEXEC;
> -
> -        if options.read {
> -            flags |= OFlag::O_RDWR;
> -        } else {
> -            flags |= OFlag::O_WRONLY;
> -        }
> -
> -        if options.append {
> -            flags |= OFlag::O_APPEND;
> -        }
> -        if options.exclusive {
> -            flags |= OFlag::O_EXCL;
> -        }
> -
> -        let file =
> -            atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?;
> -
> -        Ok(file)
> -    }
> -
> -    pub fn log<S: AsRef<str>>(&mut self, msg: S) {
> -        let msg = msg.as_ref();
> -
> -        if self.options.to_stdout {
> -            let mut stdout = std::io::stdout();
> -            stdout.write_all(msg.as_bytes()).unwrap();
> -            stdout.write_all(b"\n").unwrap();
> -        }
> -
> -        let line = if self.options.prefix_time {
> -            let now = proxmox_time::epoch_i64();
> -            let rfc3339 = match proxmox_time::epoch_to_rfc3339(now) {
> -                Ok(rfc3339) => rfc3339,
> -                Err(_) => "1970-01-01T00:00:00Z".into(), // for safety, should really not happen!
> -            };
> -            format!("{}: {}\n", rfc3339, msg)
> -        } else {
> -            format!("{}\n", msg)
> -        };
> -        if let Err(err) = self.file.write_all(line.as_bytes()) {
> -            // avoid panicking, log methods should not do that
> -            // FIXME: or, return result???
> -            log::error!("error writing to log file - {}", err);
> -        }
> -    }
> -}
> -
> -impl std::io::Write for FileLogger {
> -    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
> -        if self.options.to_stdout {
> -            let _ = std::io::stdout().write(buf);
> -        }
> -        self.file.write(buf)
> -    }
> -
> -    fn flush(&mut self) -> Result<(), std::io::Error> {
> -        if self.options.to_stdout {
> -            let _ = std::io::stdout().flush();
> -        }
> -        self.file.flush()
> -    }
> -}
> diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
> index ce9e4f15..e8866366 100644
> --- a/proxmox-rest-server/src/lib.rs
> +++ b/proxmox-rest-server/src/lib.rs
> @@ -41,9 +41,6 @@ pub use state::*;
>  mod command_socket;
>  pub use command_socket::*;
>  
> -mod file_logger;
> -pub use file_logger::{FileLogOptions, FileLogger};
> -
>  mod api_config;
>  pub use api_config::{ApiConfig, AuthError, AuthHandler, IndexHandler, UnixAcceptor};
>  
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index 39ae9a18..3a3c287c 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -31,10 +31,10 @@ use proxmox_schema::{ObjectSchemaType, ParameterSchema};
>  
>  use proxmox_async::stream::AsyncReaderStream;
>  use proxmox_compression::{DeflateEncoder, Level};
> +use proxmox_log::FileLogger;
>  
>  use crate::{
> -    formatter::*, normalize_path, ApiConfig, AuthError, CompressionMethod, FileLogger,
> -    RestEnvironment,
> +    formatter::*, normalize_path, ApiConfig, AuthError, CompressionMethod, RestEnvironment,
>  };
>  
>  extern "C" {
> diff --git a/proxmox-rest-server/src/worker_task.rs b/proxmox-rest-server/src/worker_task.rs
> index 5bab4cdc..58537f54 100644
> --- a/proxmox-rest-server/src/worker_task.rs
> +++ b/proxmox-rest-server/src/worker_task.rs
> @@ -1,3 +1,4 @@
> +use std::cell::{Cell, RefCell};
>  use std::collections::{HashMap, VecDeque};
>  use std::fs::File;
>  use std::io::{BufRead, BufReader, Read, Write};
> @@ -12,21 +13,23 @@ use futures::*;
>  use lazy_static::lazy_static;
>  use nix::fcntl::OFlag;
>  use once_cell::sync::OnceCell;
> +use proxmox_log::{LOGGER, WARN_COUNTER};

Nit: Order of imports, let's put this import to the other
proxmox_* imports below

>  use serde::{Deserialize, Serialize};
>  use serde_json::{json, Value};
>  use tokio::signal::unix::SignalKind;
>  use tokio::sync::oneshot;
> +use tracing::{info, warn};
>  
>  use proxmox_lang::try_block;
> +use proxmox_log::{FileLogOptions, FileLogger};
>  use proxmox_schema::upid::UPID;
>  use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions};
>  use proxmox_sys::linux::procfs;
> -use proxmox_sys::task_warn;
>  
>  use proxmox_sys::logrotate::{LogRotate, LogRotateFiles};
>  use proxmox_sys::WorkerTaskContext;
>  
> -use crate::{CommandSocket, FileLogOptions, FileLogger};
> +use crate::CommandSocket;
>  
>  struct TaskListLockGuard(File);
>  
> @@ -294,7 +297,7 @@ pub fn rotate_task_log_archive(
>  
>  /// removes all task logs that are older than the oldest task entry in the
>  /// task archive
> -pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Result<(), Error> {
> +pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
>      let setup = worker_task_setup()?;
>  
>      let _lock = setup.lock_task_list_files(true)?;
> @@ -332,7 +335,10 @@ pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Re
>                  Ok(files) => files,
>                  Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
>                  Err(err) => {
> -                    task_warn!(worker, "could not check task logs in '{:02X}': {}", i, err);
> +                    warn!(
> +                        tasklog = true,
> +                        "could not check task logs in '{:02X}': {}", i, err
> +                    );
>                      continue;
>                  }
>              };
> @@ -340,11 +346,9 @@ pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Re
>                  let file = match file {
>                      Ok(file) => file,
>                      Err(err) => {
> -                        task_warn!(
> -                            worker,
> -                            "could not check some task log in '{:02X}': {}",
> -                            i,
> -                            err
> +                        warn!(
> +                            tasklog = true,
> +                            "could not check some task log in '{:02X}': {}", i
err

Nit: You could inline these vars if you edit these lines any way.

>                          );
>                          continue;
>                      }
> @@ -354,7 +358,10 @@ pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Re
>                  let modified = match get_modified(file) {
>                      Ok(modified) => modified,
>                      Err(err) => {
> -                        task_warn!(worker, "error getting mtime for '{:?}': {}", path, err);
> +                        warn!(
> +                            tasklog = true,
> +                            "error getting mtime for '{:?}': {}", path, err
> +                        );
>                          continue;
>                      }
>                  };
> @@ -364,7 +371,10 @@ pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Re
>                          Ok(()) => {}
>                          Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
>                          Err(err) => {
> -                            task_warn!(worker, "could not remove file '{:?}': {}", path, err)
> +                            warn!(
> +                                tasklog = true,
> +                                "could not remove file '{:?}': {}", path, err
> +                            )
>                          }
>                      }
>                  }
> @@ -822,9 +832,7 @@ impl std::fmt::Display for WorkerTask {
>  }
>  
>  struct WorkerTaskData {
> -    logger: FileLogger,
>      progress: f64, // 0..1
> -    warn_count: u64,
>      pub abort_listeners: Vec<oneshot::Sender<()>>,
>  }
>  
> @@ -834,7 +842,7 @@ impl WorkerTask {
>          worker_id: Option<String>,
>          auth_id: String,
>          to_stdout: bool,
> -    ) -> Result<Arc<Self>, Error> {
> +    ) -> Result<(Arc<Self>, FileLogger), Error> {
>          let setup = worker_task_setup()?;
>  
>          let upid = UPID::new(worker_type, worker_id, auth_id)?;
> @@ -857,9 +865,7 @@ impl WorkerTask {
>              upid: upid.clone(),
>              abort_requested: AtomicBool::new(false),
>              data: Mutex::new(WorkerTaskData {
> -                logger,
>                  progress: 0.0,
> -                warn_count: 0,
>                  abort_listeners: vec![],
>              }),
>          });
> @@ -873,7 +879,7 @@ impl WorkerTask {
>  
>          setup.update_active_workers(Some(&upid))?;
>  
> -        Ok(worker)
> +        Ok((worker, logger))
>      }
>  
>      /// Spawn a new tokio task/future.
> @@ -888,13 +894,20 @@ impl WorkerTask {
>          F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
>          T: Send + 'static + Future<Output = Result<(), Error>>,
>      {
> -        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
> +        let (worker, logger) = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
>          let upid_str = worker.upid.to_string();
>          let f = f(worker.clone());
> -        tokio::spawn(async move {
> -            let result = f.await;
> -            worker.log_result(&result);
> -        });
> +
> +        let logger = RefCell::new(logger);
> +        let counter = Cell::new(0);
> +        tokio::spawn(LOGGER.scope(logger, async move {
> +            WARN_COUNTER
> +                .scope(counter, async move {
> +                    let result = f.await;
> +                    worker.log_result(&result);
> +                })
> +                .await;
> +        }));
>  
>          Ok(upid_str)
>      }
> @@ -910,22 +923,27 @@ impl WorkerTask {
>      where
>          F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>,
>      {
> -        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
> +        let (worker, logger) = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
>          let upid_str = worker.upid.to_string();
>  
>          let _child = std::thread::Builder::new()
>              .name(upid_str.clone())
>              .spawn(move || {
> -                let worker1 = worker.clone();
> -                let result = match std::panic::catch_unwind(move || f(worker1)) {
> -                    Ok(r) => r,
> -                    Err(panic) => match panic.downcast::<&str>() {
> -                        Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
> -                        Err(_) => Err(format_err!("worker panicked: unknown type.")),
> -                    },
> -                };
> +                LOGGER.sync_scope(RefCell::new(logger), || {
> +                    WARN_COUNTER.sync_scope(Cell::new(0), || {
> +                        let worker1 = worker.clone();
> +
> +                        let result = match std::panic::catch_unwind(move || f(worker1)) {
> +                            Ok(r) => r,
> +                            Err(panic) => match panic.downcast::<&str>() {
> +                                Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
> +                                Err(_) => Err(format_err!("worker panicked: unknown type.")),
> +                            },
> +                        };
>  
> -                worker.log_result(&result);
> +                        worker.log_result(&result);
> +                    });
> +                });
>              });
>  
>          Ok(upid_str)
> @@ -933,7 +951,7 @@ impl WorkerTask {
>  
>      /// create state from self and a result
>      pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
> -        let warn_count = self.data.lock().unwrap().warn_count;
> +        let warn_count = WARN_COUNTER.try_with(Cell::get).unwrap_or(0);
>  
>          let endtime = proxmox_time::epoch_i64();
>  
> @@ -964,15 +982,7 @@ impl WorkerTask {
>  
>      /// Log a message.
>      pub fn log_message<S: AsRef<str>>(&self, msg: S) {
> -        let mut data = self.data.lock().unwrap();
> -        data.logger.log(msg);
> -    }
> -
> -    /// Log a message as warning.
> -    pub fn log_warning<S: AsRef<str>>(&self, msg: S) {
> -        let mut data = self.data.lock().unwrap();
> -        data.logger.log(format!("WARN: {}", msg.as_ref()));
> -        data.warn_count += 1;
> +        info!(tasklog = true, "{}", msg.as_ref());
>      }
>  
>      /// Set progress indicator
> @@ -1035,16 +1045,6 @@ impl WorkerTaskContext for WorkerTask {
>      fn fail_on_shutdown(&self) -> Result<(), Error> {
>          crate::fail_on_shutdown()
>      }
> -
> -    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
> -        match level {
> -            log::Level::Error => self.log_warning(message.to_string()),
> -            log::Level::Warn => self.log_warning(message.to_string()),
> -            log::Level::Info => self.log_message(message.to_string()),
> -            log::Level::Debug => self.log_message(format!("DEBUG: {}", message)),
> -            log::Level::Trace => self.log_message(format!("TRACE: {}", message)),
> -        }
> -    }
>  }
>  
>  /// Wait for a locally spanned worker task
> diff --git a/proxmox-sys/src/worker_task_context.rs b/proxmox-sys/src/worker_task_context.rs
> index 2c86857c..743ae53f 100644
> --- a/proxmox-sys/src/worker_task_context.rs
> +++ b/proxmox-sys/src/worker_task_context.rs
> @@ -26,9 +26,6 @@ pub trait WorkerTaskContext: Send + Sync {
>          }
>          Ok(())
>      }
> -
> -    /// Create a log message for this task.
> -    fn log(&self, level: log::Level, message: &std::fmt::Arguments);
>  }
>  
>  /// Convenience implementation:
> @@ -48,48 +45,4 @@ impl<T: WorkerTaskContext + ?Sized> WorkerTaskContext for std::sync::Arc<T> {
>      fn fail_on_shutdown(&self) -> Result<(), Error> {
>          <T as WorkerTaskContext>::fail_on_shutdown(self)
>      }
> -
> -    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
> -        <T as WorkerTaskContext>::log(self, level, message)
> -    }
> -}
> -
> -/// Log an error to a [WorkerTaskContext]
> -#[macro_export]
> -macro_rules! task_error {
> -    ($task:expr, $($fmt:tt)+) => {{
> -        $crate::WorkerTaskContext::log(&*$task, log::Level::Error, &format_args!($($fmt)+))
> -    }};
> -}
> -
> -/// Log a warning to a [WorkerTaskContext]
> -#[macro_export]
> -macro_rules! task_warn {
> -    ($task:expr, $($fmt:tt)+) => {{
> -        $crate::WorkerTaskContext::log(&*$task, log::Level::Warn, &format_args!($($fmt)+))
> -    }};
> -}
> -
> -/// Log a message to a [WorkerTaskContext]
> -#[macro_export]
> -macro_rules! task_log {
> -    ($task:expr, $($fmt:tt)+) => {{
> -        $crate::WorkerTaskContext::log(&*$task, log::Level::Info, &format_args!($($fmt)+))
> -    }};
> -}
> -
> -/// Log a debug message to a [WorkerTaskContext]
> -#[macro_export]
> -macro_rules! task_debug {
> -    ($task:expr, $($fmt:tt)+) => {{
> -        $crate::WorkerTaskContext::log(&*$task, log::Level::Debug, &format_args!($($fmt)+))
> -    }};
> -}
> -
> -/// Log a trace message to a [WorkerTaskContext]
> -#[macro_export]
> -macro_rules! task_trace {
> -    ($task:expr, $($fmt:tt)+) => {{
> -        $crate::WorkerTaskContext::log(&*$task, log::Level::Trace, &format_args!($($fmt)+))
> -    }};
>  }

-- 
- Lukas




More information about the pbs-devel mailing list