[pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group
Fabian Grünbichler
f.gruenbichler at proxmox.com
Thu Sep 23 13:36:30 CEST 2021
On September 23, 2021 12:13 pm, Dietmar Maurer wrote:
> An application now needs to call init_worker_tasks() before using
> worker tasks.
>
> Notable changes:
> - need to call init_worker_tasks() before using worker tasks.
> - create_task_log_dirs() is called inside init_worker_tasks()
> - removed UpidExt trait
> - use atomic_open_or_create_file()
> - remove pbs_config and pbs_buildcfg dependency
> ---
> src/api2/node/tasks.rs | 6 +-
> src/bin/proxmox-backup-api.rs | 7 +-
> src/bin/proxmox-backup-proxy.rs | 5 +-
> src/server/mod.rs | 3 -
> src/server/upid.rs | 18 --
> src/server/worker_task.rs | 475 +++++++++++++++++++-------------
> 6 files changed, 290 insertions(+), 224 deletions(-)
> delete mode 100644 src/server/upid.rs
>
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index 169a3d4d..df4673a1 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -16,7 +16,7 @@ use pbs_api_types::{
> };
>
> use crate::api2::pull::check_pull_privs;
> -use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
> +use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
> use pbs_config::CachedUserInfo;
>
> // matches respective job execution privileges
> @@ -220,7 +220,7 @@ async fn get_task_status(
> if crate::server::worker_is_active(&upid).await? {
> result["status"] = Value::from("running");
> } else {
> - let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
> + let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
> result["status"] = Value::from("stopped");
> result["exitstatus"] = Value::from(exitstatus.to_string());
> };
> @@ -287,7 +287,7 @@ async fn read_task_log(
>
> let mut count: u64 = 0;
>
> - let path = upid.log_path();
> + let path = upid_log_path(&upid)?;
>
> let file = File::open(path)?;
>
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index 9ad10260..9901b85d 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> {
> bail!("unable to inititialize syslog - {}", err);
> }
>
> - server::create_task_log_dirs()?;
> -
> config::create_configdir()?;
>
> config::update_self_signed_cert(false)?;
> @@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> {
>
> config.enable_auth_log(
> pbs_buildcfg::API_AUTH_LOG_FN,
> - Some(dir_opts),
> - Some(file_opts),
> + Some(dir_opts.clone()),
> + Some(file_opts.clone()),
> &mut commando_sock,
> )?;
>
>
> let rest_server = RestServer::new(config);
> + proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
>
> // http server future:
> let server = daemon::create_daemon(
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 518054bf..5d8ed189 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> {
>
> config.enable_auth_log(
> pbs_buildcfg::API_AUTH_LOG_FN,
> - Some(dir_opts),
> - Some(file_opts),
> + Some(dir_opts.clone()),
> + Some(file_opts.clone()),
> &mut commando_sock,
> )?;
>
> let rest_server = RestServer::new(config);
> + proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
>
> //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
>
> diff --git a/src/server/mod.rs b/src/server/mod.rs
> index a7dcee67..77320da6 100644
> --- a/src/server/mod.rs
> +++ b/src/server/mod.rs
> @@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String {
> ctrl_sock_from_pid(*PID)
> }
>
> -mod upid;
> -pub use upid::*;
> -
> mod worker_task;
> pub use worker_task::*;
>
> diff --git a/src/server/upid.rs b/src/server/upid.rs
> deleted file mode 100644
> index 70a3e3fb..00000000
> --- a/src/server/upid.rs
> +++ /dev/null
> @@ -1,18 +0,0 @@
> -pub trait UPIDExt: private::Sealed {
> - /// Returns the absolute path to the task log file
> - fn log_path(&self) -> std::path::PathBuf;
> -}
> -
> -mod private {
> - pub trait Sealed {}
> - impl Sealed for pbs_api_types::UPID {}
> -}
> -
> -impl UPIDExt for pbs_api_types::UPID {
> - fn log_path(&self) -> std::path::PathBuf {
> - let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR);
> - path.push(format!("{:02X}", self.pstart % 256));
> - path.push(self.to_string());
> - path
> - }
> -}
> diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
> index 92ab50d7..191d8a44 100644
> --- a/src/server/worker_task.rs
> +++ b/src/server/worker_task.rs
> @@ -1,5 +1,6 @@
> use std::collections::{HashMap, VecDeque};
> use std::fs::File;
> +use std::path::PathBuf;
> use std::io::{Read, Write, BufRead, BufReader};
> use std::panic::UnwindSafe;
> use std::sync::atomic::{AtomicBool, Ordering};
> @@ -11,27 +12,267 @@ use lazy_static::lazy_static;
> use serde_json::{json, Value};
> use serde::{Serialize, Deserialize};
> use tokio::sync::oneshot;
> +use nix::fcntl::OFlag;
> +use once_cell::sync::OnceCell;
>
> use proxmox::sys::linux::procfs;
> use proxmox::try_block;
> -use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
> +use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
>
> -use pbs_buildcfg;
> use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
> use pbs_api_types::UPID;
> -use pbs_config::{open_backup_lockfile, BackupLockGuard};
> use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
>
> -use super::UPIDExt;
> +struct TaskListLockGuard(File);
>
> -macro_rules! taskdir {
> - ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
> +struct WorkerTaskSetup {
> + file_opts: CreateOptions,
> + taskdir: PathBuf,
> + task_lock_fn: PathBuf,
> + active_tasks_fn: PathBuf,
> + task_index_fn: PathBuf,
> + task_archive_fn: PathBuf,
> +}
> +
> +static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
> +
> +fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
> + WORKER_TASK_SETUP.get()
> + .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
> +}
> +
> +impl WorkerTaskSetup {
> +
> + fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
> +
> + let mut taskdir = basedir.clone();
> + taskdir.push("tasks");
> +
> + let mut task_lock_fn = taskdir.clone();
> + task_lock_fn.push(".active.lock");
> +
> + let mut active_tasks_fn = taskdir.clone();
> + active_tasks_fn.push("active");
> +
> + let mut task_index_fn = taskdir.clone();
> + task_index_fn.push("index");
> +
> + let mut task_archive_fn = taskdir.clone();
> + task_archive_fn.push("archive");
> +
> + Self {
> + file_opts,
> + taskdir,
> + task_lock_fn,
> + active_tasks_fn,
> + task_index_fn,
> + task_archive_fn,
> + }
> + }
> +
> + fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
Since we touch/move this (and thus have to touch all call sites), we
could take this opportunity to move the locked operations, including
access to the active_tasks_fn, task_index_fn and task_archive_fn struct
members, to the lock guard (and maybe split it into
exclusive/non-exclusive guards?) to make misuse impossible. AFAICT all
the current access is done while holding the lock.
This can of course also be done as a follow-up in some generic fashion,
since this is a recurring problem; it just struck me while reading
through the rest of the changes.
> + let options = self.file_opts.clone()
> + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> + let timeout = std::time::Duration::new(10, 0);
> +
> + let file = proxmox::tools::fs::open_file_locked(
> + &self.task_lock_fn,
> + timeout,
> + exclusive,
> + options,
> + )?;
> +
> + Ok(TaskListLockGuard(file))
> + }
> +
> + fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
> + let mut path = self.taskdir.clone();
> + path.push(format!("{:02X}", upid.pstart % 256));
> + path.push(upid.to_string());
> + path
> + }
> +
> + // atomically read/update the task list, update status of finished tasks
> + // new_upid is added to the list when specified.
> + fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
> +
> + let lock = self.lock_task_list_files(true)?;
> +
> + // TODO remove with 1.x
> + let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
> + let had_index_file = !finish_list.is_empty();
> +
> + // We use filter_map because one negative case wants to *move* the data into `finish_list`,
> + // clippy doesn't quite catch this!
> + #[allow(clippy::unnecessary_filter_map)]
> + let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
> + .into_iter()
> + .filter_map(|info| {
> + if info.state.is_some() {
> + // this can happen when the active file still includes finished tasks
> + finish_list.push(info);
> + return None;
> + }
> +
> + if !worker_is_active_local(&info.upid) {
> + // println!("Detected stopped task '{}'", &info.upid_str);
> + let now = proxmox::tools::time::epoch_i64();
> + let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
> + finish_list.push(TaskListInfo {
> + upid: info.upid,
> + upid_str: info.upid_str,
> + state: Some(status)
> + });
> + return None;
> + }
> +
> + Some(info)
> + }).collect();
> +
> + if let Some(upid) = new_upid {
> + active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
> + }
> +
> + let active_raw = render_task_list(&active_list);
> +
> + let options = self.file_opts.clone()
> + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> + replace_file(
> + &self.active_tasks_fn,
> + active_raw.as_bytes(),
> + options,
> + )?;
> +
> + finish_list.sort_unstable_by(|a, b| {
> + match (&a.state, &b.state) {
> + (Some(s1), Some(s2)) => s1.cmp(&s2),
> + (Some(_), None) => std::cmp::Ordering::Less,
> + (None, Some(_)) => std::cmp::Ordering::Greater,
> + _ => a.upid.starttime.cmp(&b.upid.starttime),
> + }
> + });
> +
> + if !finish_list.is_empty() {
> + let options = self.file_opts.clone()
> + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> + let mut writer = atomic_open_or_create_file(
> + &self.task_archive_fn,
> + OFlag::O_APPEND | OFlag::O_RDWR,
> + &[],
> + options,
> + )?;
> + for info in &finish_list {
> + writer.write_all(render_task_line(&info).as_bytes())?;
> + }
> + }
> +
> + // TODO Remove with 1.x
> + // for compatibility, if we had an INDEX file, we do not need it anymore
> + if had_index_file {
> + let _ = nix::unistd::unlink(&self.task_index_fn);
> + }
> +
> + drop(lock);
> +
> + Ok(())
> + }
> +
> + // Create task log directory with correct permissions
> + fn create_task_log_dirs(&self) -> Result<(), Error> {
> +
> + try_block!({
> + let dir_opts = self.file_opts.clone()
> + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
> +
> + create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
> + // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
> + Ok(())
> + }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
> + }
> +}
> +
> +/// Initialize the WorkerTask library
> +pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
> + let setup = WorkerTaskSetup::new(basedir, file_opts);
> + setup.create_task_log_dirs()?;
> + WORKER_TASK_SETUP.set(setup)
> + .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
> +}
> +
> +/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
> +/// rotates it if it is
> +pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
> +
> + let setup = worker_task_setup()?;
> +
> + let _lock = setup.lock_task_list_files(true)?;
> +
> + let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
> + .ok_or_else(|| format_err!("could not get archive file names"))?;
> +
> + logrotate.rotate(size_threshold, None, max_files)
> +}
> +
> +
> +/// Path to the worker log file
> +pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
> + let setup = worker_task_setup()?;
> + Ok(setup.log_path(upid))
> +}
> +
> +/// Read endtime (time of last log line) and exitstatus from task log file
> +/// If there is not a single line with at valid datetime, we assume the
> +/// starttime to be the endtime
> +pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
> +
> + let setup = worker_task_setup()?;
> +
> + let mut status = TaskState::Unknown { endtime: upid.starttime };
> +
> + let path = setup.log_path(upid);
> +
> + let mut file = File::open(path)?;
> +
> + /// speedup - only read tail
> + use std::io::Seek;
> + use std::io::SeekFrom;
> + let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
> +
> + let mut data = Vec::with_capacity(8192);
> + file.read_to_end(&mut data)?;
> +
> + // strip newlines at the end of the task logs
> + while data.last() == Some(&b'\n') {
> + data.pop();
> + }
> +
> + let last_line = match data.iter().rposition(|c| *c == b'\n') {
> + Some(start) if data.len() > (start+1) => &data[start+1..],
> + Some(_) => &data, // should not happen, since we removed all trailing newlines
> + None => &data,
> + };
> +
> + let last_line = std::str::from_utf8(last_line)
> + .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
> +
> + let mut iter = last_line.splitn(2, ": ");
> + if let Some(time_str) = iter.next() {
> + if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
> + // set the endtime even if we cannot parse the state
> + status = TaskState::Unknown { endtime };
> + if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
> + if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
> + status = state;
> + }
> + }
> + }
> + }
> +
> + Ok(status)
> }
> -pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
> -pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
> -pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
> -pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
> -pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
>
> lazy_static! {
> static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
> @@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
> }
> }
>
> -/// Create task log directory with correct permissions
> -pub fn create_task_log_dirs() -> Result<(), Error> {
> -
> - try_block!({
> - let backup_user = pbs_config::backup_user()?;
> - let opts = CreateOptions::new()
> - .owner(backup_user.uid)
> - .group(backup_user.gid);
> -
> - create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
> - create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
> - create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
> - Ok(())
> - }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
> -
> - Ok(())
> -}
> -
> -/// Read endtime (time of last log line) and exitstatus from task log file
> -/// If there is not a single line with at valid datetime, we assume the
> -/// starttime to be the endtime
> -pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
> -
> - let mut status = TaskState::Unknown { endtime: upid.starttime };
> -
> - let path = upid.log_path();
> -
> - let mut file = File::open(path)?;
> -
> - /// speedup - only read tail
> - use std::io::Seek;
> - use std::io::SeekFrom;
> - let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
> -
> - let mut data = Vec::with_capacity(8192);
> - file.read_to_end(&mut data)?;
> -
> - // strip newlines at the end of the task logs
> - while data.last() == Some(&b'\n') {
> - data.pop();
> - }
> -
> - let last_line = match data.iter().rposition(|c| *c == b'\n') {
> - Some(start) if data.len() > (start+1) => &data[start+1..],
> - Some(_) => &data, // should not happen, since we removed all trailing newlines
> - None => &data,
> - };
> -
> - let last_line = std::str::from_utf8(last_line)
> - .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
> -
> - let mut iter = last_line.splitn(2, ": ");
> - if let Some(time_str) = iter.next() {
> - if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
> - // set the endtime even if we cannot parse the state
> - status = TaskState::Unknown { endtime };
> - if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
> - if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
> - status = state;
> - }
> - }
> - }
> - }
> -
> - Ok(status)
> -}
> -
> /// Task State
> #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
> pub enum TaskState {
> @@ -323,107 +497,6 @@ impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
> }
> }
>
> -fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
> - open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
> -}
> -
> -/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
> -/// rotates it if it is
> -pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
> - let _lock = lock_task_list_files(true)?;
> -
> - let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
> - .ok_or_else(|| format_err!("could not get archive file names"))?;
> -
> - logrotate.rotate(size_threshold, None, max_files)
> -}
> -
> -// atomically read/update the task list, update status of finished tasks
> -// new_upid is added to the list when specified.
> -fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
> -
> - let backup_user = pbs_config::backup_user()?;
> -
> - let lock = lock_task_list_files(true)?;
> -
> - // TODO remove with 1.x
> - let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
> - let had_index_file = !finish_list.is_empty();
> -
> - // We use filter_map because one negative case wants to *move* the data into `finish_list`,
> - // clippy doesn't quite catch this!
> - #[allow(clippy::unnecessary_filter_map)]
> - let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
> - .into_iter()
> - .filter_map(|info| {
> - if info.state.is_some() {
> - // this can happen when the active file still includes finished tasks
> - finish_list.push(info);
> - return None;
> - }
> -
> - if !worker_is_active_local(&info.upid) {
> - // println!("Detected stopped task '{}'", &info.upid_str);
> - let now = proxmox::tools::time::epoch_i64();
> - let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
> - finish_list.push(TaskListInfo {
> - upid: info.upid,
> - upid_str: info.upid_str,
> - state: Some(status)
> - });
> - return None;
> - }
> -
> - Some(info)
> - }).collect();
> -
> - if let Some(upid) = new_upid {
> - active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
> - }
> -
> - let active_raw = render_task_list(&active_list);
> -
> - replace_file(
> - PROXMOX_BACKUP_ACTIVE_TASK_FN,
> - active_raw.as_bytes(),
> - CreateOptions::new()
> - .owner(backup_user.uid)
> - .group(backup_user.gid),
> - )?;
> -
> - finish_list.sort_unstable_by(|a, b| {
> - match (&a.state, &b.state) {
> - (Some(s1), Some(s2)) => s1.cmp(&s2),
> - (Some(_), None) => std::cmp::Ordering::Less,
> - (None, Some(_)) => std::cmp::Ordering::Greater,
> - _ => a.upid.starttime.cmp(&b.upid.starttime),
> - }
> - });
> -
> - if !finish_list.is_empty() {
> - match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
> - Ok(mut writer) => {
> - for info in &finish_list {
> - writer.write_all(render_task_line(&info).as_bytes())?;
> - }
> - },
> - Err(err) => bail!("could not write task archive - {}", err),
> - }
> -
> - nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
> - }
> -
> - // TODO Remove with 1.x
> - // for compatibility, if we had an INDEX file, we do not need it anymore
> - if had_index_file {
> - let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
> - }
> -
> - drop(lock);
> -
> - Ok(())
> -}
> -
> fn render_task_line(info: &TaskListInfo) -> String {
> let mut raw = String::new();
> if let Some(status) = &info.state {
> @@ -486,27 +559,30 @@ pub struct TaskListInfoIterator {
> list: VecDeque<TaskListInfo>,
> end: bool,
> archive: Option<LogRotateFiles>,
> - lock: Option<BackupLockGuard>,
> + lock: Option<TaskListLockGuard>,
> }
>
> impl TaskListInfoIterator {
> pub fn new(active_only: bool) -> Result<Self, Error> {
> +
> + let setup = worker_task_setup()?;
> +
> let (read_lock, active_list) = {
> - let lock = lock_task_list_files(false)?;
> - let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
> + let lock = setup.lock_task_list_files(false)?;
> + let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
>
> let needs_update = active_list
> .iter()
> .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
>
> // TODO remove with 1.x
> - let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
> + let index_exists = setup.task_index_fn.is_file();
>
> if needs_update || index_exists {
> drop(lock);
> - update_active_workers(None)?;
> - let lock = lock_task_list_files(false)?;
> - let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
> + setup.update_active_workers(None)?;
> + let lock = setup.lock_task_list_files(false)?;
> + let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
> (lock, active_list)
> } else {
> (lock, active_list)
> @@ -516,7 +592,7 @@ impl TaskListInfoIterator {
> let archive = if active_only {
> None
> } else {
> - let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
> + let logrotate = LogRotate::new(&setup.task_archive_fn, true)
> .ok_or_else(|| format_err!("could not get archive file names"))?;
> Some(logrotate.files())
> };
> @@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator {
> /// persistently to files. Task should poll the `abort_requested`
> /// flag, and stop execution when requested.
> pub struct WorkerTask {
> + setup: &'static WorkerTaskSetup,
> upid: UPID,
> data: Mutex<WorkerTaskData>,
> abort_requested: AtomicBool,
> @@ -589,17 +666,26 @@ struct WorkerTaskData {
>
> impl WorkerTask {
>
> - pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
> + pub fn new(
> + worker_type: &str,
> + worker_id: Option<String>,
> + auth_id: String,
> + to_stdout: bool,
> + ) -> Result<Arc<Self>, Error> {
> +
> + let setup = worker_task_setup()?;
> +
> let upid = UPID::new(worker_type, worker_id, auth_id)?;
> let task_id = upid.task_id;
>
> - let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
> + let mut path = setup.taskdir.clone();
>
> path.push(format!("{:02X}", upid.pstart & 255));
>
> - let backup_user = pbs_config::backup_user()?;
> + let dir_opts = setup.file_opts.clone()
> + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
>
> - create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
> + create_path(&path, None, Some(dir_opts))?;
>
> path.push(upid.to_string());
>
> @@ -608,12 +694,13 @@ impl WorkerTask {
> exclusive: true,
> prefix_time: true,
> read: true,
> + file_opts: setup.file_opts.clone(),
> ..Default::default()
> };
> let logger = FileLogger::new(&path, logger_options)?;
> - nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
>
> let worker = Arc::new(Self {
> + setup,
> upid: upid.clone(),
> abort_requested: AtomicBool::new(false),
> data: Mutex::new(WorkerTaskData {
> @@ -631,7 +718,7 @@ impl WorkerTask {
> proxmox_rest_server::set_worker_count(hash.len());
> }
>
> - update_active_workers(Some(&upid))?;
> + setup.update_active_workers(Some(&upid))?;
>
> Ok(worker)
> }
> @@ -714,7 +801,7 @@ impl WorkerTask {
> self.log(state.result_text());
>
> WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
> - let _ = update_active_workers(None);
> + let _ = self.setup.update_active_workers(None);
> proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
> }
>
> --
> 2.30.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
More information about the pbs-devel
mailing list