[pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group

Dietmar Maurer dietmar at proxmox.com
Thu Sep 23 12:13:25 CEST 2021


And application now needs to call init_worker_tasks() before using
worker tasks.

Notable changes:
- need to call  init_worker_tasks() before using worker tasks.
- create_task_log_dirs() ís called inside init_worker_tasks()
- removed UpidExt trait
- use atomic_open_or_create_file()
- remove pbs_config and pbs_buildcfg dependency
---
 src/api2/node/tasks.rs          |   6 +-
 src/bin/proxmox-backup-api.rs   |   7 +-
 src/bin/proxmox-backup-proxy.rs |   5 +-
 src/server/mod.rs               |   3 -
 src/server/upid.rs              |  18 --
 src/server/worker_task.rs       | 475 +++++++++++++++++++-------------
 6 files changed, 290 insertions(+), 224 deletions(-)
 delete mode 100644 src/server/upid.rs

diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 169a3d4d..df4673a1 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -16,7 +16,7 @@ use pbs_api_types::{
 };
 
 use crate::api2::pull::check_pull_privs;
-use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
+use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
 use pbs_config::CachedUserInfo;
 
 // matches respective job execution privileges
@@ -220,7 +220,7 @@ async fn get_task_status(
     if crate::server::worker_is_active(&upid).await? {
         result["status"] = Value::from("running");
     } else {
-        let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
+        let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
         result["status"] = Value::from("stopped");
         result["exitstatus"] = Value::from(exitstatus.to_string());
     };
@@ -287,7 +287,7 @@ async fn read_task_log(
 
     let mut count: u64 = 0;
 
-    let path = upid.log_path();
+    let path = upid_log_path(&upid)?;
 
     let file = File::open(path)?;
 
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 9ad10260..9901b85d 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> {
         bail!("unable to inititialize syslog - {}", err);
     }
 
-    server::create_task_log_dirs()?;
-
     config::create_configdir()?;
 
     config::update_self_signed_cert(false)?;
@@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> {
 
     config.enable_auth_log(
         pbs_buildcfg::API_AUTH_LOG_FN,
-        Some(dir_opts),
-        Some(file_opts),
+        Some(dir_opts.clone()),
+        Some(file_opts.clone()),
         &mut commando_sock,
     )?;
 
 
     let rest_server = RestServer::new(config);
+    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     // http server future:
     let server = daemon::create_daemon(
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 518054bf..5d8ed189 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> {
 
     config.enable_auth_log(
         pbs_buildcfg::API_AUTH_LOG_FN,
-        Some(dir_opts),
-        Some(file_opts),
+        Some(dir_opts.clone()),
+        Some(file_opts.clone()),
         &mut commando_sock,
     )?;
 
     let rest_server = RestServer::new(config);
+    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
 
diff --git a/src/server/mod.rs b/src/server/mod.rs
index a7dcee67..77320da6 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String {
     ctrl_sock_from_pid(*PID)
 }
 
-mod upid;
-pub use upid::*;
-
 mod worker_task;
 pub use worker_task::*;
 
diff --git a/src/server/upid.rs b/src/server/upid.rs
deleted file mode 100644
index 70a3e3fb..00000000
--- a/src/server/upid.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-pub trait UPIDExt: private::Sealed {
-    /// Returns the absolute path to the task log file
-    fn log_path(&self) -> std::path::PathBuf;
-}
-
-mod private {
-    pub trait Sealed {}
-    impl Sealed for  pbs_api_types::UPID {}
-}
-
-impl UPIDExt for  pbs_api_types::UPID {
-    fn log_path(&self) -> std::path::PathBuf {
-        let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR);
-        path.push(format!("{:02X}", self.pstart % 256));
-        path.push(self.to_string());
-        path
-    }
-}
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 92ab50d7..191d8a44 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -1,5 +1,6 @@
 use std::collections::{HashMap, VecDeque};
 use std::fs::File;
+use std::path::PathBuf;
 use std::io::{Read, Write, BufRead, BufReader};
 use std::panic::UnwindSafe;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -11,27 +12,267 @@ use lazy_static::lazy_static;
 use serde_json::{json, Value};
 use serde::{Serialize, Deserialize};
 use tokio::sync::oneshot;
+use nix::fcntl::OFlag;
+use once_cell::sync::OnceCell;
 
 use proxmox::sys::linux::procfs;
 use proxmox::try_block;
-use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
+use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
 
-use pbs_buildcfg;
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
 use pbs_api_types::UPID;
-use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
 
-use super::UPIDExt;
+struct TaskListLockGuard(File);
 
-macro_rules! taskdir {
-    ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
+struct WorkerTaskSetup {
+    file_opts: CreateOptions,
+    taskdir: PathBuf,
+    task_lock_fn: PathBuf,
+    active_tasks_fn: PathBuf,
+    task_index_fn: PathBuf,
+    task_archive_fn: PathBuf,
+}
+
+static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
+
+fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
+    WORKER_TASK_SETUP.get()
+        .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
+}
+
+impl WorkerTaskSetup {
+
+    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
+
+        let mut taskdir = basedir.clone();
+        taskdir.push("tasks");
+
+        let mut task_lock_fn = taskdir.clone();
+        task_lock_fn.push(".active.lock");
+
+        let mut active_tasks_fn = taskdir.clone();
+        active_tasks_fn.push("active");
+
+        let mut task_index_fn = taskdir.clone();
+        task_index_fn.push("index");
+
+        let mut task_archive_fn = taskdir.clone();
+        task_archive_fn.push("archive");
+
+        Self {
+            file_opts,
+            taskdir,
+            task_lock_fn,
+            active_tasks_fn,
+            task_index_fn,
+            task_archive_fn,
+        }
+    }
+
+    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
+        let options =  self.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+        let timeout = std::time::Duration::new(10, 0);
+
+        let file = proxmox::tools::fs::open_file_locked(
+            &self.task_lock_fn,
+            timeout,
+            exclusive,
+            options,
+        )?;
+
+        Ok(TaskListLockGuard(file))
+    }
+
+    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
+        let mut path = self.taskdir.clone();
+        path.push(format!("{:02X}", upid.pstart % 256));
+        path.push(upid.to_string());
+        path
+    }
+
+    // atomically read/update the task list, update status of finished tasks
+    // new_upid is added to the list when specified.
+    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
+
+        let lock = self.lock_task_list_files(true)?;
+
+        // TODO remove with 1.x
+        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
+        let had_index_file = !finish_list.is_empty();
+
+        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+        // clippy doesn't quite catch this!
+        #[allow(clippy::unnecessary_filter_map)]
+        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
+            .into_iter()
+            .filter_map(|info| {
+                if info.state.is_some() {
+                    // this can happen when the active file still includes finished tasks
+                    finish_list.push(info);
+                    return None;
+                }
+
+                if !worker_is_active_local(&info.upid) {
+                    // println!("Detected stopped task '{}'", &info.upid_str);
+                    let now = proxmox::tools::time::epoch_i64();
+                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
+                    finish_list.push(TaskListInfo {
+                        upid: info.upid,
+                        upid_str: info.upid_str,
+                        state: Some(status)
+                    });
+                    return None;
+                }
+
+                Some(info)
+            }).collect();
+
+        if let Some(upid) = new_upid {
+            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
+        }
+
+        let active_raw = render_task_list(&active_list);
+
+        let options =  self.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+        replace_file(
+            &self.active_tasks_fn,
+            active_raw.as_bytes(),
+            options,
+        )?;
+
+        finish_list.sort_unstable_by(|a, b| {
+            match (&a.state, &b.state) {
+                (Some(s1), Some(s2)) => s1.cmp(&s2),
+                (Some(_), None) => std::cmp::Ordering::Less,
+                (None, Some(_)) => std::cmp::Ordering::Greater,
+                _ => a.upid.starttime.cmp(&b.upid.starttime),
+            }
+        });
+
+        if !finish_list.is_empty() {
+            let options =  self.file_opts.clone()
+                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+            let mut writer = atomic_open_or_create_file(
+                &self.task_archive_fn,
+                OFlag::O_APPEND | OFlag::O_RDWR,
+                &[],
+                options,
+            )?;
+            for info in &finish_list {
+                writer.write_all(render_task_line(&info).as_bytes())?;
+            }
+        }
+
+        // TODO Remove with 1.x
+        // for compatibility, if we had an INDEX file, we do not need it anymore
+        if had_index_file {
+            let _ = nix::unistd::unlink(&self.task_index_fn);
+        }
+
+        drop(lock);
+
+        Ok(())
+    }
+
+    // Create task log directory with correct permissions
+    fn create_task_log_dirs(&self) -> Result<(), Error> {
+
+        try_block!({
+            let dir_opts = self.file_opts.clone()
+                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
+
+            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
+            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
+            Ok(())
+        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
+    }
+}
+
+/// Initialize the WorkerTask library
+pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
+    let setup = WorkerTaskSetup::new(basedir, file_opts);
+    setup.create_task_log_dirs()?;
+    WORKER_TASK_SETUP.set(setup)
+        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
+}
+
+/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
+/// rotates it if it is
+pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
+
+    let setup = worker_task_setup()?;
+
+    let _lock = setup.lock_task_list_files(true)?;
+
+    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
+            .ok_or_else(|| format_err!("could not get archive file names"))?;
+
+    logrotate.rotate(size_threshold, None, max_files)
+}
+
+
+/// Path to the worker log file
+pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
+    let setup = worker_task_setup()?;
+    Ok(setup.log_path(upid))
+}
+
+/// Read endtime (time of last log line) and exitstatus from task log file
+/// If there is not a single line with at valid datetime, we assume the
+/// starttime to be the endtime
+pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
+
+    let setup = worker_task_setup()?;
+
+    let mut status = TaskState::Unknown { endtime: upid.starttime };
+
+    let path = setup.log_path(upid);
+
+    let mut file = File::open(path)?;
+
+    /// speedup - only read tail
+    use std::io::Seek;
+    use std::io::SeekFrom;
+    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
+
+    let mut data = Vec::with_capacity(8192);
+    file.read_to_end(&mut data)?;
+
+    // strip newlines at the end of the task logs
+    while data.last() == Some(&b'\n') {
+        data.pop();
+    }
+
+    let last_line = match data.iter().rposition(|c| *c == b'\n') {
+        Some(start) if data.len() > (start+1) => &data[start+1..],
+        Some(_) => &data, // should not happen, since we removed all trailing newlines
+        None => &data,
+    };
+
+    let last_line = std::str::from_utf8(last_line)
+        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
+
+    let mut iter = last_line.splitn(2, ": ");
+    if let Some(time_str) = iter.next() {
+        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
+            // set the endtime even if we cannot parse the state
+            status = TaskState::Unknown { endtime };
+            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
+                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
+                    status = state;
+                }
+            }
+        }
+    }
+
+    Ok(status)
 }
-pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
-pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
-pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
-pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
-pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
 
 lazy_static! {
     static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
@@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
     }
 }
 
-/// Create task log directory with correct permissions
-pub fn create_task_log_dirs() -> Result<(), Error> {
-
-    try_block!({
-        let backup_user = pbs_config::backup_user()?;
-        let opts = CreateOptions::new()
-            .owner(backup_user.uid)
-            .group(backup_user.gid);
-
-        create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
-        create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
-        create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
-        Ok(())
-    }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
-
-    Ok(())
-}
-
-/// Read endtime (time of last log line) and exitstatus from task log file
-/// If there is not a single line with at valid datetime, we assume the
-/// starttime to be the endtime
-pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
-
-    let mut status = TaskState::Unknown { endtime: upid.starttime };
-
-    let path = upid.log_path();
-
-    let mut file = File::open(path)?;
-
-    /// speedup - only read tail
-    use std::io::Seek;
-    use std::io::SeekFrom;
-    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
-
-    let mut data = Vec::with_capacity(8192);
-    file.read_to_end(&mut data)?;
-
-    // strip newlines at the end of the task logs
-    while data.last() == Some(&b'\n') {
-        data.pop();
-    }
-
-    let last_line = match data.iter().rposition(|c| *c == b'\n') {
-        Some(start) if data.len() > (start+1) => &data[start+1..],
-        Some(_) => &data, // should not happen, since we removed all trailing newlines
-        None => &data,
-    };
-
-    let last_line = std::str::from_utf8(last_line)
-        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
-
-    let mut iter = last_line.splitn(2, ": ");
-    if let Some(time_str) = iter.next() {
-        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
-            // set the endtime even if we cannot parse the state
-            status = TaskState::Unknown { endtime };
-            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
-                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
-                    status = state;
-                }
-            }
-        }
-    }
-
-    Ok(status)
-}
-
 /// Task State
 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub enum TaskState {
@@ -323,107 +497,6 @@ impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
     }
 }
 
-fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
-    open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
-}
-
-/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
-/// rotates it if it is
-pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
-    let _lock = lock_task_list_files(true)?;
-
-    let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
-        .ok_or_else(|| format_err!("could not get archive file names"))?;
-
-    logrotate.rotate(size_threshold, None, max_files)
-}
-
-// atomically read/update the task list, update status of finished tasks
-// new_upid is added to the list when specified.
-fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
-
-    let backup_user = pbs_config::backup_user()?;
-
-    let lock = lock_task_list_files(true)?;
-
-    // TODO remove with 1.x
-    let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
-    let had_index_file = !finish_list.is_empty();
-
-    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
-    // clippy doesn't quite catch this!
-    #[allow(clippy::unnecessary_filter_map)]
-    let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
-        .into_iter()
-        .filter_map(|info| {
-            if info.state.is_some() {
-                // this can happen when the active file still includes finished tasks
-                finish_list.push(info);
-                return None;
-            }
-
-            if !worker_is_active_local(&info.upid) {
-                // println!("Detected stopped task '{}'", &info.upid_str);
-                let now = proxmox::tools::time::epoch_i64();
-                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
-                finish_list.push(TaskListInfo {
-                    upid: info.upid,
-                    upid_str: info.upid_str,
-                    state: Some(status)
-                });
-                return None;
-            }
-
-            Some(info)
-        }).collect();
-
-    if let Some(upid) = new_upid {
-        active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
-    }
-
-    let active_raw = render_task_list(&active_list);
-
-    replace_file(
-        PROXMOX_BACKUP_ACTIVE_TASK_FN,
-        active_raw.as_bytes(),
-        CreateOptions::new()
-            .owner(backup_user.uid)
-            .group(backup_user.gid),
-    )?;
-
-    finish_list.sort_unstable_by(|a, b| {
-        match (&a.state, &b.state) {
-            (Some(s1), Some(s2)) => s1.cmp(&s2),
-            (Some(_), None) => std::cmp::Ordering::Less,
-            (None, Some(_)) => std::cmp::Ordering::Greater,
-            _ => a.upid.starttime.cmp(&b.upid.starttime),
-        }
-    });
-
-    if !finish_list.is_empty() {
-        match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
-            Ok(mut writer) => {
-                for info in &finish_list {
-                    writer.write_all(render_task_line(&info).as_bytes())?;
-                }
-            },
-            Err(err) => bail!("could not write task archive - {}", err),
-        }
-
-        nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
-    }
-
-    // TODO Remove with 1.x
-    // for compatibility, if we had an INDEX file, we do not need it anymore
-    if had_index_file {
-        let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
-    }
-
-    drop(lock);
-
-    Ok(())
-}
-
 fn render_task_line(info: &TaskListInfo) -> String {
     let mut raw = String::new();
     if let Some(status) = &info.state {
@@ -486,27 +559,30 @@ pub struct TaskListInfoIterator {
     list: VecDeque<TaskListInfo>,
     end: bool,
     archive: Option<LogRotateFiles>,
-    lock: Option<BackupLockGuard>,
+    lock: Option<TaskListLockGuard>,
 }
 
 impl TaskListInfoIterator {
     pub fn new(active_only: bool) -> Result<Self, Error> {
+
+        let setup = worker_task_setup()?;
+
         let (read_lock, active_list) = {
-            let lock = lock_task_list_files(false)?;
-            let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+            let lock = setup.lock_task_list_files(false)?;
+            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
 
             let needs_update = active_list
                 .iter()
                 .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
 
             // TODO remove with 1.x
-            let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
+            let index_exists = setup.task_index_fn.is_file();
 
             if needs_update || index_exists {
                 drop(lock);
-                update_active_workers(None)?;
-                let lock = lock_task_list_files(false)?;
-                let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+                setup.update_active_workers(None)?;
+                let lock = setup.lock_task_list_files(false)?;
+                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
                 (lock, active_list)
             } else {
                 (lock, active_list)
@@ -516,7 +592,7 @@ impl TaskListInfoIterator {
         let archive = if active_only {
             None
         } else {
-            let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
+            let logrotate = LogRotate::new(&setup.task_archive_fn, true)
                 .ok_or_else(|| format_err!("could not get archive file names"))?;
             Some(logrotate.files())
         };
@@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator {
 /// persistently to files. Task should poll the `abort_requested`
 /// flag, and stop execution when requested.
 pub struct WorkerTask {
+    setup: &'static WorkerTaskSetup,
     upid: UPID,
     data: Mutex<WorkerTaskData>,
     abort_requested: AtomicBool,
@@ -589,17 +666,26 @@ struct WorkerTaskData {
 
 impl WorkerTask {
 
-    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
+    pub fn new(
+        worker_type: &str,
+        worker_id: Option<String>,
+        auth_id: String,
+        to_stdout: bool,
+    ) -> Result<Arc<Self>, Error> {
+
+        let setup = worker_task_setup()?;
+
         let upid = UPID::new(worker_type, worker_id, auth_id)?;
         let task_id = upid.task_id;
 
-        let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
+        let mut path = setup.taskdir.clone();
 
         path.push(format!("{:02X}", upid.pstart & 255));
 
-        let backup_user = pbs_config::backup_user()?;
+        let dir_opts = setup.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
 
-        create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
+        create_path(&path, None, Some(dir_opts))?;
 
         path.push(upid.to_string());
 
@@ -608,12 +694,13 @@ impl WorkerTask {
             exclusive: true,
             prefix_time: true,
             read: true,
+            file_opts: setup.file_opts.clone(),
             ..Default::default()
         };
         let logger = FileLogger::new(&path, logger_options)?;
-        nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
 
         let worker = Arc::new(Self {
+            setup,
             upid: upid.clone(),
             abort_requested: AtomicBool::new(false),
             data: Mutex::new(WorkerTaskData {
@@ -631,7 +718,7 @@ impl WorkerTask {
             proxmox_rest_server::set_worker_count(hash.len());
         }
 
-        update_active_workers(Some(&upid))?;
+        setup.update_active_workers(Some(&upid))?;
 
         Ok(worker)
     }
@@ -714,7 +801,7 @@ impl WorkerTask {
         self.log(state.result_text());
 
         WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
-        let _ = update_active_workers(None);
+        let _ = self.setup.update_active_workers(None);
         proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
     }
 
-- 
2.30.2






More information about the pbs-devel mailing list