[pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group
Dietmar Maurer
dietmar at proxmox.com
Thu Sep 23 12:13:25 CEST 2021
An application now needs to call init_worker_tasks() before using
worker tasks.
Notable changes:
- need to call init_worker_tasks() before using worker tasks.
- create_task_log_dirs() is called inside init_worker_tasks()
- removed UPIDExt trait
- use atomic_open_or_create_file()
- remove pbs_config and pbs_buildcfg dependency
---
src/api2/node/tasks.rs | 6 +-
src/bin/proxmox-backup-api.rs | 7 +-
src/bin/proxmox-backup-proxy.rs | 5 +-
src/server/mod.rs | 3 -
src/server/upid.rs | 18 --
src/server/worker_task.rs | 475 +++++++++++++++++++-------------
6 files changed, 290 insertions(+), 224 deletions(-)
delete mode 100644 src/server/upid.rs
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 169a3d4d..df4673a1 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -16,7 +16,7 @@ use pbs_api_types::{
};
use crate::api2::pull::check_pull_privs;
-use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
+use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
use pbs_config::CachedUserInfo;
// matches respective job execution privileges
@@ -220,7 +220,7 @@ async fn get_task_status(
if crate::server::worker_is_active(&upid).await? {
result["status"] = Value::from("running");
} else {
- let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
+ let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
result["status"] = Value::from("stopped");
result["exitstatus"] = Value::from(exitstatus.to_string());
};
@@ -287,7 +287,7 @@ async fn read_task_log(
let mut count: u64 = 0;
- let path = upid.log_path();
+ let path = upid_log_path(&upid)?;
let file = File::open(path)?;
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 9ad10260..9901b85d 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> {
bail!("unable to inititialize syslog - {}", err);
}
- server::create_task_log_dirs()?;
-
config::create_configdir()?;
config::update_self_signed_cert(false)?;
@@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> {
config.enable_auth_log(
pbs_buildcfg::API_AUTH_LOG_FN,
- Some(dir_opts),
- Some(file_opts),
+ Some(dir_opts.clone()),
+ Some(file_opts.clone()),
&mut commando_sock,
)?;
let rest_server = RestServer::new(config);
+ proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
// http server future:
let server = daemon::create_daemon(
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 518054bf..5d8ed189 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> {
config.enable_auth_log(
pbs_buildcfg::API_AUTH_LOG_FN,
- Some(dir_opts),
- Some(file_opts),
+ Some(dir_opts.clone()),
+ Some(file_opts.clone()),
&mut commando_sock,
)?;
let rest_server = RestServer::new(config);
+ proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
//openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
diff --git a/src/server/mod.rs b/src/server/mod.rs
index a7dcee67..77320da6 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String {
ctrl_sock_from_pid(*PID)
}
-mod upid;
-pub use upid::*;
-
mod worker_task;
pub use worker_task::*;
diff --git a/src/server/upid.rs b/src/server/upid.rs
deleted file mode 100644
index 70a3e3fb..00000000
--- a/src/server/upid.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-pub trait UPIDExt: private::Sealed {
- /// Returns the absolute path to the task log file
- fn log_path(&self) -> std::path::PathBuf;
-}
-
-mod private {
- pub trait Sealed {}
- impl Sealed for pbs_api_types::UPID {}
-}
-
-impl UPIDExt for pbs_api_types::UPID {
- fn log_path(&self) -> std::path::PathBuf {
- let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR);
- path.push(format!("{:02X}", self.pstart % 256));
- path.push(self.to_string());
- path
- }
-}
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 92ab50d7..191d8a44 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -1,5 +1,6 @@
use std::collections::{HashMap, VecDeque};
use std::fs::File;
+use std::path::PathBuf;
use std::io::{Read, Write, BufRead, BufReader};
use std::panic::UnwindSafe;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -11,27 +12,267 @@ use lazy_static::lazy_static;
use serde_json::{json, Value};
use serde::{Serialize, Deserialize};
use tokio::sync::oneshot;
+use nix::fcntl::OFlag;
+use once_cell::sync::OnceCell;
use proxmox::sys::linux::procfs;
use proxmox::try_block;
-use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
+use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
-use pbs_buildcfg;
use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
use pbs_api_types::UPID;
-use pbs_config::{open_backup_lockfile, BackupLockGuard};
use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
-use super::UPIDExt;
+struct TaskListLockGuard(File);
-macro_rules! taskdir {
- ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
+struct WorkerTaskSetup {
+ file_opts: CreateOptions,
+ taskdir: PathBuf,
+ task_lock_fn: PathBuf,
+ active_tasks_fn: PathBuf,
+ task_index_fn: PathBuf,
+ task_archive_fn: PathBuf,
+}
+
+static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
+
+fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
+ WORKER_TASK_SETUP.get()
+ .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
+}
+
+impl WorkerTaskSetup {
+
+ fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
+
+ let mut taskdir = basedir.clone();
+ taskdir.push("tasks");
+
+ let mut task_lock_fn = taskdir.clone();
+ task_lock_fn.push(".active.lock");
+
+ let mut active_tasks_fn = taskdir.clone();
+ active_tasks_fn.push("active");
+
+ let mut task_index_fn = taskdir.clone();
+ task_index_fn.push("index");
+
+ let mut task_archive_fn = taskdir.clone();
+ task_archive_fn.push("archive");
+
+ Self {
+ file_opts,
+ taskdir,
+ task_lock_fn,
+ active_tasks_fn,
+ task_index_fn,
+ task_archive_fn,
+ }
+ }
+
+ fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
+ let options = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+ let timeout = std::time::Duration::new(10, 0);
+
+ let file = proxmox::tools::fs::open_file_locked(
+ &self.task_lock_fn,
+ timeout,
+ exclusive,
+ options,
+ )?;
+
+ Ok(TaskListLockGuard(file))
+ }
+
+ fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
+ let mut path = self.taskdir.clone();
+ path.push(format!("{:02X}", upid.pstart % 256));
+ path.push(upid.to_string());
+ path
+ }
+
+ // atomically read/update the task list, update status of finished tasks
+ // new_upid is added to the list when specified.
+ fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
+
+ let lock = self.lock_task_list_files(true)?;
+
+ // TODO remove with 1.x
+ let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
+ let had_index_file = !finish_list.is_empty();
+
+ // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+ // clippy doesn't quite catch this!
+ #[allow(clippy::unnecessary_filter_map)]
+ let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
+ .into_iter()
+ .filter_map(|info| {
+ if info.state.is_some() {
+ // this can happen when the active file still includes finished tasks
+ finish_list.push(info);
+ return None;
+ }
+
+ if !worker_is_active_local(&info.upid) {
+ // println!("Detected stopped task '{}'", &info.upid_str);
+ let now = proxmox::tools::time::epoch_i64();
+ let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
+ finish_list.push(TaskListInfo {
+ upid: info.upid,
+ upid_str: info.upid_str,
+ state: Some(status)
+ });
+ return None;
+ }
+
+ Some(info)
+ }).collect();
+
+ if let Some(upid) = new_upid {
+ active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
+ }
+
+ let active_raw = render_task_list(&active_list);
+
+ let options = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+ replace_file(
+ &self.active_tasks_fn,
+ active_raw.as_bytes(),
+ options,
+ )?;
+
+ finish_list.sort_unstable_by(|a, b| {
+ match (&a.state, &b.state) {
+ (Some(s1), Some(s2)) => s1.cmp(&s2),
+ (Some(_), None) => std::cmp::Ordering::Less,
+ (None, Some(_)) => std::cmp::Ordering::Greater,
+ _ => a.upid.starttime.cmp(&b.upid.starttime),
+ }
+ });
+
+ if !finish_list.is_empty() {
+ let options = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+ let mut writer = atomic_open_or_create_file(
+ &self.task_archive_fn,
+ OFlag::O_APPEND | OFlag::O_RDWR,
+ &[],
+ options,
+ )?;
+ for info in &finish_list {
+ writer.write_all(render_task_line(&info).as_bytes())?;
+ }
+ }
+
+ // TODO Remove with 1.x
+ // for compatibility, if we had an INDEX file, we do not need it anymore
+ if had_index_file {
+ let _ = nix::unistd::unlink(&self.task_index_fn);
+ }
+
+ drop(lock);
+
+ Ok(())
+ }
+
+ // Create task log directory with correct permissions
+ fn create_task_log_dirs(&self) -> Result<(), Error> {
+
+ try_block!({
+ let dir_opts = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
+
+ create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
+ // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
+ Ok(())
+ }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
+ }
+}
+
+/// Initialize the WorkerTask library
+pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
+ let setup = WorkerTaskSetup::new(basedir, file_opts);
+ setup.create_task_log_dirs()?;
+ WORKER_TASK_SETUP.set(setup)
+ .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
+}
+
+/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
+/// rotates it if it is
+pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
+
+ let setup = worker_task_setup()?;
+
+ let _lock = setup.lock_task_list_files(true)?;
+
+ let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
+ .ok_or_else(|| format_err!("could not get archive file names"))?;
+
+ logrotate.rotate(size_threshold, None, max_files)
+}
+
+
+/// Path to the worker log file
+pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
+ let setup = worker_task_setup()?;
+ Ok(setup.log_path(upid))
+}
+
+/// Read endtime (time of last log line) and exitstatus from task log file
+/// If there is not a single line with at valid datetime, we assume the
+/// starttime to be the endtime
+pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
+
+ let setup = worker_task_setup()?;
+
+ let mut status = TaskState::Unknown { endtime: upid.starttime };
+
+ let path = setup.log_path(upid);
+
+ let mut file = File::open(path)?;
+
+ /// speedup - only read tail
+ use std::io::Seek;
+ use std::io::SeekFrom;
+ let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
+
+ let mut data = Vec::with_capacity(8192);
+ file.read_to_end(&mut data)?;
+
+ // strip newlines at the end of the task logs
+ while data.last() == Some(&b'\n') {
+ data.pop();
+ }
+
+ let last_line = match data.iter().rposition(|c| *c == b'\n') {
+ Some(start) if data.len() > (start+1) => &data[start+1..],
+ Some(_) => &data, // should not happen, since we removed all trailing newlines
+ None => &data,
+ };
+
+ let last_line = std::str::from_utf8(last_line)
+ .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
+
+ let mut iter = last_line.splitn(2, ": ");
+ if let Some(time_str) = iter.next() {
+ if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
+ // set the endtime even if we cannot parse the state
+ status = TaskState::Unknown { endtime };
+ if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
+ if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
+ status = state;
+ }
+ }
+ }
+ }
+
+ Ok(status)
}
-pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
-pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
-pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
-pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
-pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
lazy_static! {
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
@@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
}
}
-/// Create task log directory with correct permissions
-pub fn create_task_log_dirs() -> Result<(), Error> {
-
- try_block!({
- let backup_user = pbs_config::backup_user()?;
- let opts = CreateOptions::new()
- .owner(backup_user.uid)
- .group(backup_user.gid);
-
- create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
- create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
- create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
- Ok(())
- }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
-
- Ok(())
-}
-
-/// Read endtime (time of last log line) and exitstatus from task log file
-/// If there is not a single line with at valid datetime, we assume the
-/// starttime to be the endtime
-pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
-
- let mut status = TaskState::Unknown { endtime: upid.starttime };
-
- let path = upid.log_path();
-
- let mut file = File::open(path)?;
-
- /// speedup - only read tail
- use std::io::Seek;
- use std::io::SeekFrom;
- let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
-
- let mut data = Vec::with_capacity(8192);
- file.read_to_end(&mut data)?;
-
- // strip newlines at the end of the task logs
- while data.last() == Some(&b'\n') {
- data.pop();
- }
-
- let last_line = match data.iter().rposition(|c| *c == b'\n') {
- Some(start) if data.len() > (start+1) => &data[start+1..],
- Some(_) => &data, // should not happen, since we removed all trailing newlines
- None => &data,
- };
-
- let last_line = std::str::from_utf8(last_line)
- .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
-
- let mut iter = last_line.splitn(2, ": ");
- if let Some(time_str) = iter.next() {
- if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
- // set the endtime even if we cannot parse the state
- status = TaskState::Unknown { endtime };
- if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
- if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
- status = state;
- }
- }
- }
- }
-
- Ok(status)
-}
-
/// Task State
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
@@ -323,107 +497,6 @@ impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
}
}
-fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
- open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
-}
-
-/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
-/// rotates it if it is
-pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
- let _lock = lock_task_list_files(true)?;
-
- let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
- .ok_or_else(|| format_err!("could not get archive file names"))?;
-
- logrotate.rotate(size_threshold, None, max_files)
-}
-
-// atomically read/update the task list, update status of finished tasks
-// new_upid is added to the list when specified.
-fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
-
- let backup_user = pbs_config::backup_user()?;
-
- let lock = lock_task_list_files(true)?;
-
- // TODO remove with 1.x
- let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
- let had_index_file = !finish_list.is_empty();
-
- // We use filter_map because one negative case wants to *move* the data into `finish_list`,
- // clippy doesn't quite catch this!
- #[allow(clippy::unnecessary_filter_map)]
- let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
- .into_iter()
- .filter_map(|info| {
- if info.state.is_some() {
- // this can happen when the active file still includes finished tasks
- finish_list.push(info);
- return None;
- }
-
- if !worker_is_active_local(&info.upid) {
- // println!("Detected stopped task '{}'", &info.upid_str);
- let now = proxmox::tools::time::epoch_i64();
- let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
- finish_list.push(TaskListInfo {
- upid: info.upid,
- upid_str: info.upid_str,
- state: Some(status)
- });
- return None;
- }
-
- Some(info)
- }).collect();
-
- if let Some(upid) = new_upid {
- active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
- }
-
- let active_raw = render_task_list(&active_list);
-
- replace_file(
- PROXMOX_BACKUP_ACTIVE_TASK_FN,
- active_raw.as_bytes(),
- CreateOptions::new()
- .owner(backup_user.uid)
- .group(backup_user.gid),
- )?;
-
- finish_list.sort_unstable_by(|a, b| {
- match (&a.state, &b.state) {
- (Some(s1), Some(s2)) => s1.cmp(&s2),
- (Some(_), None) => std::cmp::Ordering::Less,
- (None, Some(_)) => std::cmp::Ordering::Greater,
- _ => a.upid.starttime.cmp(&b.upid.starttime),
- }
- });
-
- if !finish_list.is_empty() {
- match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
- Ok(mut writer) => {
- for info in &finish_list {
- writer.write_all(render_task_line(&info).as_bytes())?;
- }
- },
- Err(err) => bail!("could not write task archive - {}", err),
- }
-
- nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
- }
-
- // TODO Remove with 1.x
- // for compatibility, if we had an INDEX file, we do not need it anymore
- if had_index_file {
- let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
- }
-
- drop(lock);
-
- Ok(())
-}
-
fn render_task_line(info: &TaskListInfo) -> String {
let mut raw = String::new();
if let Some(status) = &info.state {
@@ -486,27 +559,30 @@ pub struct TaskListInfoIterator {
list: VecDeque<TaskListInfo>,
end: bool,
archive: Option<LogRotateFiles>,
- lock: Option<BackupLockGuard>,
+ lock: Option<TaskListLockGuard>,
}
impl TaskListInfoIterator {
pub fn new(active_only: bool) -> Result<Self, Error> {
+
+ let setup = worker_task_setup()?;
+
let (read_lock, active_list) = {
- let lock = lock_task_list_files(false)?;
- let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+ let lock = setup.lock_task_list_files(false)?;
+ let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
let needs_update = active_list
.iter()
.any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
// TODO remove with 1.x
- let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
+ let index_exists = setup.task_index_fn.is_file();
if needs_update || index_exists {
drop(lock);
- update_active_workers(None)?;
- let lock = lock_task_list_files(false)?;
- let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+ setup.update_active_workers(None)?;
+ let lock = setup.lock_task_list_files(false)?;
+ let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
(lock, active_list)
} else {
(lock, active_list)
@@ -516,7 +592,7 @@ impl TaskListInfoIterator {
let archive = if active_only {
None
} else {
- let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
+ let logrotate = LogRotate::new(&setup.task_archive_fn, true)
.ok_or_else(|| format_err!("could not get archive file names"))?;
Some(logrotate.files())
};
@@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator {
/// persistently to files. Task should poll the `abort_requested`
/// flag, and stop execution when requested.
pub struct WorkerTask {
+ setup: &'static WorkerTaskSetup,
upid: UPID,
data: Mutex<WorkerTaskData>,
abort_requested: AtomicBool,
@@ -589,17 +666,26 @@ struct WorkerTaskData {
impl WorkerTask {
- pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
+ pub fn new(
+ worker_type: &str,
+ worker_id: Option<String>,
+ auth_id: String,
+ to_stdout: bool,
+ ) -> Result<Arc<Self>, Error> {
+
+ let setup = worker_task_setup()?;
+
let upid = UPID::new(worker_type, worker_id, auth_id)?;
let task_id = upid.task_id;
- let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
+ let mut path = setup.taskdir.clone();
path.push(format!("{:02X}", upid.pstart & 255));
- let backup_user = pbs_config::backup_user()?;
+ let dir_opts = setup.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
- create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
+ create_path(&path, None, Some(dir_opts))?;
path.push(upid.to_string());
@@ -608,12 +694,13 @@ impl WorkerTask {
exclusive: true,
prefix_time: true,
read: true,
+ file_opts: setup.file_opts.clone(),
..Default::default()
};
let logger = FileLogger::new(&path, logger_options)?;
- nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
let worker = Arc::new(Self {
+ setup,
upid: upid.clone(),
abort_requested: AtomicBool::new(false),
data: Mutex::new(WorkerTaskData {
@@ -631,7 +718,7 @@ impl WorkerTask {
proxmox_rest_server::set_worker_count(hash.len());
}
- update_active_workers(Some(&upid))?;
+ setup.update_active_workers(Some(&upid))?;
Ok(worker)
}
@@ -714,7 +801,7 @@ impl WorkerTask {
self.log(state.result_text());
WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
- let _ = update_active_workers(None);
+ let _ = self.setup.update_active_workers(None);
proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
}
--
2.30.2
More information about the pbs-devel
mailing list