[pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group

Fabian Grünbichler f.gruenbichler at proxmox.com
Thu Sep 23 13:36:30 CEST 2021


On September 23, 2021 12:13 pm, Dietmar Maurer wrote:
> An application now needs to call init_worker_tasks() before using
> worker tasks.
> 
> Notable changes:
> - need to call init_worker_tasks() before using worker tasks.
> - create_task_log_dirs() is called inside init_worker_tasks()
> - removed UpidExt trait
> - use atomic_open_or_create_file()
> - remove pbs_config and pbs_buildcfg dependency
> ---
>  src/api2/node/tasks.rs          |   6 +-
>  src/bin/proxmox-backup-api.rs   |   7 +-
>  src/bin/proxmox-backup-proxy.rs |   5 +-
>  src/server/mod.rs               |   3 -
>  src/server/upid.rs              |  18 --
>  src/server/worker_task.rs       | 475 +++++++++++++++++++-------------
>  6 files changed, 290 insertions(+), 224 deletions(-)
>  delete mode 100644 src/server/upid.rs
> 
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index 169a3d4d..df4673a1 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -16,7 +16,7 @@ use pbs_api_types::{
>  };
>  
>  use crate::api2::pull::check_pull_privs;
> -use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
> +use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
>  use pbs_config::CachedUserInfo;
>  
>  // matches respective job execution privileges
> @@ -220,7 +220,7 @@ async fn get_task_status(
>      if crate::server::worker_is_active(&upid).await? {
>          result["status"] = Value::from("running");
>      } else {
> -        let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
> +        let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
>          result["status"] = Value::from("stopped");
>          result["exitstatus"] = Value::from(exitstatus.to_string());
>      };
> @@ -287,7 +287,7 @@ async fn read_task_log(
>  
>      let mut count: u64 = 0;
>  
> -    let path = upid.log_path();
> +    let path = upid_log_path(&upid)?;
>  
>      let file = File::open(path)?;
>  
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index 9ad10260..9901b85d 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> {
>          bail!("unable to inititialize syslog - {}", err);
>      }
>  
> -    server::create_task_log_dirs()?;
> -
>      config::create_configdir()?;
>  
>      config::update_self_signed_cert(false)?;
> @@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> {
>  
>      config.enable_auth_log(
>          pbs_buildcfg::API_AUTH_LOG_FN,
> -        Some(dir_opts),
> -        Some(file_opts),
> +        Some(dir_opts.clone()),
> +        Some(file_opts.clone()),
>          &mut commando_sock,
>      )?;
>  
>  
>      let rest_server = RestServer::new(config);
> +    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
>  
>      // http server future:
>      let server = daemon::create_daemon(
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 518054bf..5d8ed189 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> {
>  
>      config.enable_auth_log(
>          pbs_buildcfg::API_AUTH_LOG_FN,
> -        Some(dir_opts),
> -        Some(file_opts),
> +        Some(dir_opts.clone()),
> +        Some(file_opts.clone()),
>          &mut commando_sock,
>      )?;
>  
>      let rest_server = RestServer::new(config);
> +    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
>  
>      //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
>  
> diff --git a/src/server/mod.rs b/src/server/mod.rs
> index a7dcee67..77320da6 100644
> --- a/src/server/mod.rs
> +++ b/src/server/mod.rs
> @@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String {
>      ctrl_sock_from_pid(*PID)
>  }
>  
> -mod upid;
> -pub use upid::*;
> -
>  mod worker_task;
>  pub use worker_task::*;
>  
> diff --git a/src/server/upid.rs b/src/server/upid.rs
> deleted file mode 100644
> index 70a3e3fb..00000000
> --- a/src/server/upid.rs
> +++ /dev/null
> @@ -1,18 +0,0 @@
> -pub trait UPIDExt: private::Sealed {
> -    /// Returns the absolute path to the task log file
> -    fn log_path(&self) -> std::path::PathBuf;
> -}
> -
> -mod private {
> -    pub trait Sealed {}
> -    impl Sealed for  pbs_api_types::UPID {}
> -}
> -
> -impl UPIDExt for  pbs_api_types::UPID {
> -    fn log_path(&self) -> std::path::PathBuf {
> -        let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR);
> -        path.push(format!("{:02X}", self.pstart % 256));
> -        path.push(self.to_string());
> -        path
> -    }
> -}
> diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
> index 92ab50d7..191d8a44 100644
> --- a/src/server/worker_task.rs
> +++ b/src/server/worker_task.rs
> @@ -1,5 +1,6 @@
>  use std::collections::{HashMap, VecDeque};
>  use std::fs::File;
> +use std::path::PathBuf;
>  use std::io::{Read, Write, BufRead, BufReader};
>  use std::panic::UnwindSafe;
>  use std::sync::atomic::{AtomicBool, Ordering};
> @@ -11,27 +12,267 @@ use lazy_static::lazy_static;
>  use serde_json::{json, Value};
>  use serde::{Serialize, Deserialize};
>  use tokio::sync::oneshot;
> +use nix::fcntl::OFlag;
> +use once_cell::sync::OnceCell;
>  
>  use proxmox::sys::linux::procfs;
>  use proxmox::try_block;
> -use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
> +use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
>  
> -use pbs_buildcfg;
>  use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
>  use pbs_api_types::UPID;
> -use pbs_config::{open_backup_lockfile, BackupLockGuard};
>  use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
>  
> -use super::UPIDExt;
> +struct TaskListLockGuard(File);
>  
> -macro_rules! taskdir {
> -    ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
> +struct WorkerTaskSetup {
> +    file_opts: CreateOptions,
> +    taskdir: PathBuf,
> +    task_lock_fn: PathBuf,
> +    active_tasks_fn: PathBuf,
> +    task_index_fn: PathBuf,
> +    task_archive_fn: PathBuf,
> +}
> +
> +static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
> +
> +fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
> +    WORKER_TASK_SETUP.get()
> +        .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
> +}
> +
> +impl WorkerTaskSetup {
> +
> +    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
> +
> +        let mut taskdir = basedir.clone();
> +        taskdir.push("tasks");
> +
> +        let mut task_lock_fn = taskdir.clone();
> +        task_lock_fn.push(".active.lock");
> +
> +        let mut active_tasks_fn = taskdir.clone();
> +        active_tasks_fn.push("active");
> +
> +        let mut task_index_fn = taskdir.clone();
> +        task_index_fn.push("index");
> +
> +        let mut task_archive_fn = taskdir.clone();
> +        task_archive_fn.push("archive");
> +
> +        Self {
> +            file_opts,
> +            taskdir,
> +            task_lock_fn,
> +            active_tasks_fn,
> +            task_index_fn,
> +            task_archive_fn,
> +        }
> +    }
> +
> +    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {

since we touch/move this (and thus have to touch all call sites), we 
could take this opportunity to move the locked operations, including 
access to the active_tasks_fn, task_index_fn and task_archive_fn struct 
members, to the lock guard (and maybe split it into 
exclusive/non-exclusive guards?) to make misuse impossible. AFAICT all 
the current access is done while holding the lock.

this can of course also be done as a follow-up in some generic fashion, 
since this is a recurring problem - it just struck me while reading 
through the rest of the changes.

> +        let options =  self.file_opts.clone()
> +            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +        let timeout = std::time::Duration::new(10, 0);
> +
> +        let file = proxmox::tools::fs::open_file_locked(
> +            &self.task_lock_fn,
> +            timeout,
> +            exclusive,
> +            options,
> +        )?;
> +
> +        Ok(TaskListLockGuard(file))
> +    }
> +
> +    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
> +        let mut path = self.taskdir.clone();
> +        path.push(format!("{:02X}", upid.pstart % 256));
> +        path.push(upid.to_string());
> +        path
> +    }
> +
> +    // atomically read/update the task list, update status of finished tasks
> +    // new_upid is added to the list when specified.
> +    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
> +
> +        let lock = self.lock_task_list_files(true)?;
> +
> +        // TODO remove with 1.x
> +        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
> +        let had_index_file = !finish_list.is_empty();
> +
> +        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
> +        // clippy doesn't quite catch this!
> +        #[allow(clippy::unnecessary_filter_map)]
> +        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
> +            .into_iter()
> +            .filter_map(|info| {
> +                if info.state.is_some() {
> +                    // this can happen when the active file still includes finished tasks
> +                    finish_list.push(info);
> +                    return None;
> +                }
> +
> +                if !worker_is_active_local(&info.upid) {
> +                    // println!("Detected stopped task '{}'", &info.upid_str);
> +                    let now = proxmox::tools::time::epoch_i64();
> +                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
> +                    finish_list.push(TaskListInfo {
> +                        upid: info.upid,
> +                        upid_str: info.upid_str,
> +                        state: Some(status)
> +                    });
> +                    return None;
> +                }
> +
> +                Some(info)
> +            }).collect();
> +
> +        if let Some(upid) = new_upid {
> +            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
> +        }
> +
> +        let active_raw = render_task_list(&active_list);
> +
> +        let options =  self.file_opts.clone()
> +            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +        replace_file(
> +            &self.active_tasks_fn,
> +            active_raw.as_bytes(),
> +            options,
> +        )?;
> +
> +        finish_list.sort_unstable_by(|a, b| {
> +            match (&a.state, &b.state) {
> +                (Some(s1), Some(s2)) => s1.cmp(&s2),
> +                (Some(_), None) => std::cmp::Ordering::Less,
> +                (None, Some(_)) => std::cmp::Ordering::Greater,
> +                _ => a.upid.starttime.cmp(&b.upid.starttime),
> +            }
> +        });
> +
> +        if !finish_list.is_empty() {
> +            let options =  self.file_opts.clone()
> +                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +            let mut writer = atomic_open_or_create_file(
> +                &self.task_archive_fn,
> +                OFlag::O_APPEND | OFlag::O_RDWR,
> +                &[],
> +                options,
> +            )?;
> +            for info in &finish_list {
> +                writer.write_all(render_task_line(&info).as_bytes())?;
> +            }
> +        }
> +
> +        // TODO Remove with 1.x
> +        // for compatibility, if we had an INDEX file, we do not need it anymore
> +        if had_index_file {
> +            let _ = nix::unistd::unlink(&self.task_index_fn);
> +        }
> +
> +        drop(lock);
> +
> +        Ok(())
> +    }
> +
> +    // Create task log directory with correct permissions
> +    fn create_task_log_dirs(&self) -> Result<(), Error> {
> +
> +        try_block!({
> +            let dir_opts = self.file_opts.clone()
> +                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
> +
> +            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
> +            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
> +            Ok(())
> +        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
> +    }
> +}
> +
> +/// Initialize the WorkerTask library
> +pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
> +    let setup = WorkerTaskSetup::new(basedir, file_opts);
> +    setup.create_task_log_dirs()?;
> +    WORKER_TASK_SETUP.set(setup)
> +        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
> +}
> +
> +/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
> +/// rotates it if it is
> +pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
> +
> +    let setup = worker_task_setup()?;
> +
> +    let _lock = setup.lock_task_list_files(true)?;
> +
> +    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
> +            .ok_or_else(|| format_err!("could not get archive file names"))?;
> +
> +    logrotate.rotate(size_threshold, None, max_files)
> +}
> +
> +
> +/// Path to the worker log file
> +pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
> +    let setup = worker_task_setup()?;
> +    Ok(setup.log_path(upid))
> +}
> +
> +/// Read endtime (time of last log line) and exitstatus from task log file
> +/// If there is not a single line with at valid datetime, we assume the
> +/// starttime to be the endtime
> +pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
> +
> +    let setup = worker_task_setup()?;
> +
> +    let mut status = TaskState::Unknown { endtime: upid.starttime };
> +
> +    let path = setup.log_path(upid);
> +
> +    let mut file = File::open(path)?;
> +
> +    /// speedup - only read tail
> +    use std::io::Seek;
> +    use std::io::SeekFrom;
> +    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
> +
> +    let mut data = Vec::with_capacity(8192);
> +    file.read_to_end(&mut data)?;
> +
> +    // strip newlines at the end of the task logs
> +    while data.last() == Some(&b'\n') {
> +        data.pop();
> +    }
> +
> +    let last_line = match data.iter().rposition(|c| *c == b'\n') {
> +        Some(start) if data.len() > (start+1) => &data[start+1..],
> +        Some(_) => &data, // should not happen, since we removed all trailing newlines
> +        None => &data,
> +    };
> +
> +    let last_line = std::str::from_utf8(last_line)
> +        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
> +
> +    let mut iter = last_line.splitn(2, ": ");
> +    if let Some(time_str) = iter.next() {
> +        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
> +            // set the endtime even if we cannot parse the state
> +            status = TaskState::Unknown { endtime };
> +            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
> +                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
> +                    status = state;
> +                }
> +            }
> +        }
> +    }
> +
> +    Ok(status)
>  }
> -pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
> -pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
> -pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
> -pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
> -pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
>  
>  lazy_static! {
>      static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
> @@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
>      }
>  }
>  
> -/// Create task log directory with correct permissions
> -pub fn create_task_log_dirs() -> Result<(), Error> {
> -
> -    try_block!({
> -        let backup_user = pbs_config::backup_user()?;
> -        let opts = CreateOptions::new()
> -            .owner(backup_user.uid)
> -            .group(backup_user.gid);
> -
> -        create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
> -        create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
> -        create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
> -        Ok(())
> -    }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
> -
> -    Ok(())
> -}
> -
> -/// Read endtime (time of last log line) and exitstatus from task log file
> -/// If there is not a single line with at valid datetime, we assume the
> -/// starttime to be the endtime
> -pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
> -
> -    let mut status = TaskState::Unknown { endtime: upid.starttime };
> -
> -    let path = upid.log_path();
> -
> -    let mut file = File::open(path)?;
> -
> -    /// speedup - only read tail
> -    use std::io::Seek;
> -    use std::io::SeekFrom;
> -    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
> -
> -    let mut data = Vec::with_capacity(8192);
> -    file.read_to_end(&mut data)?;
> -
> -    // strip newlines at the end of the task logs
> -    while data.last() == Some(&b'\n') {
> -        data.pop();
> -    }
> -
> -    let last_line = match data.iter().rposition(|c| *c == b'\n') {
> -        Some(start) if data.len() > (start+1) => &data[start+1..],
> -        Some(_) => &data, // should not happen, since we removed all trailing newlines
> -        None => &data,
> -    };
> -
> -    let last_line = std::str::from_utf8(last_line)
> -        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
> -
> -    let mut iter = last_line.splitn(2, ": ");
> -    if let Some(time_str) = iter.next() {
> -        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
> -            // set the endtime even if we cannot parse the state
> -            status = TaskState::Unknown { endtime };
> -            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
> -                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
> -                    status = state;
> -                }
> -            }
> -        }
> -    }
> -
> -    Ok(status)
> -}
> -
>  /// Task State
>  #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
>  pub enum TaskState {
> @@ -323,107 +497,6 @@ impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
>      }
>  }
>  
> -fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
> -    open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
> -}
> -
> -/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
> -/// rotates it if it is
> -pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
> -    let _lock = lock_task_list_files(true)?;
> -
> -    let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
> -        .ok_or_else(|| format_err!("could not get archive file names"))?;
> -
> -    logrotate.rotate(size_threshold, None, max_files)
> -}
> -
> -// atomically read/update the task list, update status of finished tasks
> -// new_upid is added to the list when specified.
> -fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
> -
> -    let backup_user = pbs_config::backup_user()?;
> -
> -    let lock = lock_task_list_files(true)?;
> -
> -    // TODO remove with 1.x
> -    let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
> -    let had_index_file = !finish_list.is_empty();
> -
> -    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
> -    // clippy doesn't quite catch this!
> -    #[allow(clippy::unnecessary_filter_map)]
> -    let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
> -        .into_iter()
> -        .filter_map(|info| {
> -            if info.state.is_some() {
> -                // this can happen when the active file still includes finished tasks
> -                finish_list.push(info);
> -                return None;
> -            }
> -
> -            if !worker_is_active_local(&info.upid) {
> -                // println!("Detected stopped task '{}'", &info.upid_str);
> -                let now = proxmox::tools::time::epoch_i64();
> -                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
> -                finish_list.push(TaskListInfo {
> -                    upid: info.upid,
> -                    upid_str: info.upid_str,
> -                    state: Some(status)
> -                });
> -                return None;
> -            }
> -
> -            Some(info)
> -        }).collect();
> -
> -    if let Some(upid) = new_upid {
> -        active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
> -    }
> -
> -    let active_raw = render_task_list(&active_list);
> -
> -    replace_file(
> -        PROXMOX_BACKUP_ACTIVE_TASK_FN,
> -        active_raw.as_bytes(),
> -        CreateOptions::new()
> -            .owner(backup_user.uid)
> -            .group(backup_user.gid),
> -    )?;
> -
> -    finish_list.sort_unstable_by(|a, b| {
> -        match (&a.state, &b.state) {
> -            (Some(s1), Some(s2)) => s1.cmp(&s2),
> -            (Some(_), None) => std::cmp::Ordering::Less,
> -            (None, Some(_)) => std::cmp::Ordering::Greater,
> -            _ => a.upid.starttime.cmp(&b.upid.starttime),
> -        }
> -    });
> -
> -    if !finish_list.is_empty() {
> -        match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
> -            Ok(mut writer) => {
> -                for info in &finish_list {
> -                    writer.write_all(render_task_line(&info).as_bytes())?;
> -                }
> -            },
> -            Err(err) => bail!("could not write task archive - {}", err),
> -        }
> -
> -        nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
> -    }
> -
> -    // TODO Remove with 1.x
> -    // for compatibility, if we had an INDEX file, we do not need it anymore
> -    if had_index_file {
> -        let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
> -    }
> -
> -    drop(lock);
> -
> -    Ok(())
> -}
> -
>  fn render_task_line(info: &TaskListInfo) -> String {
>      let mut raw = String::new();
>      if let Some(status) = &info.state {
> @@ -486,27 +559,30 @@ pub struct TaskListInfoIterator {
>      list: VecDeque<TaskListInfo>,
>      end: bool,
>      archive: Option<LogRotateFiles>,
> -    lock: Option<BackupLockGuard>,
> +    lock: Option<TaskListLockGuard>,
>  }
>  
>  impl TaskListInfoIterator {
>      pub fn new(active_only: bool) -> Result<Self, Error> {
> +
> +        let setup = worker_task_setup()?;
> +
>          let (read_lock, active_list) = {
> -            let lock = lock_task_list_files(false)?;
> -            let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
> +            let lock = setup.lock_task_list_files(false)?;
> +            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
>  
>              let needs_update = active_list
>                  .iter()
>                  .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
>  
>              // TODO remove with 1.x
> -            let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
> +            let index_exists = setup.task_index_fn.is_file();
>  
>              if needs_update || index_exists {
>                  drop(lock);
> -                update_active_workers(None)?;
> -                let lock = lock_task_list_files(false)?;
> -                let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
> +                setup.update_active_workers(None)?;
> +                let lock = setup.lock_task_list_files(false)?;
> +                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
>                  (lock, active_list)
>              } else {
>                  (lock, active_list)
> @@ -516,7 +592,7 @@ impl TaskListInfoIterator {
>          let archive = if active_only {
>              None
>          } else {
> -            let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
> +            let logrotate = LogRotate::new(&setup.task_archive_fn, true)
>                  .ok_or_else(|| format_err!("could not get archive file names"))?;
>              Some(logrotate.files())
>          };
> @@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator {
>  /// persistently to files. Task should poll the `abort_requested`
>  /// flag, and stop execution when requested.
>  pub struct WorkerTask {
> +    setup: &'static WorkerTaskSetup,
>      upid: UPID,
>      data: Mutex<WorkerTaskData>,
>      abort_requested: AtomicBool,
> @@ -589,17 +666,26 @@ struct WorkerTaskData {
>  
>  impl WorkerTask {
>  
> -    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
> +    pub fn new(
> +        worker_type: &str,
> +        worker_id: Option<String>,
> +        auth_id: String,
> +        to_stdout: bool,
> +    ) -> Result<Arc<Self>, Error> {
> +
> +        let setup = worker_task_setup()?;
> +
>          let upid = UPID::new(worker_type, worker_id, auth_id)?;
>          let task_id = upid.task_id;
>  
> -        let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
> +        let mut path = setup.taskdir.clone();
>  
>          path.push(format!("{:02X}", upid.pstart & 255));
>  
> -        let backup_user = pbs_config::backup_user()?;
> +        let dir_opts = setup.file_opts.clone()
> +            .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
>  
> -        create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
> +        create_path(&path, None, Some(dir_opts))?;
>  
>          path.push(upid.to_string());
>  
> @@ -608,12 +694,13 @@ impl WorkerTask {
>              exclusive: true,
>              prefix_time: true,
>              read: true,
> +            file_opts: setup.file_opts.clone(),
>              ..Default::default()
>          };
>          let logger = FileLogger::new(&path, logger_options)?;
> -        nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
>  
>          let worker = Arc::new(Self {
> +            setup,
>              upid: upid.clone(),
>              abort_requested: AtomicBool::new(false),
>              data: Mutex::new(WorkerTaskData {
> @@ -631,7 +718,7 @@ impl WorkerTask {
>              proxmox_rest_server::set_worker_count(hash.len());
>          }
>  
> -        update_active_workers(Some(&upid))?;
> +        setup.update_active_workers(Some(&upid))?;
>  
>          Ok(worker)
>      }
> @@ -714,7 +801,7 @@ impl WorkerTask {
>          self.log(state.result_text());
>  
>          WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
> -        let _ = update_active_workers(None);
> +        let _ = self.setup.update_active_workers(None);
>          proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
>      }
>  
> -- 
> 2.30.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 





More information about the pbs-devel mailing list