[pbs-devel] [RFC proxmox 2/3] new proxmox-jobstate crate

Dominik Csapak d.csapak at proxmox.com
Wed Oct 18 12:39:09 CEST 2023


split out from proxmox-backup/pbs-api-types

includes the JobScheduleStatus api type

depends on the new ServerConfig for the global user/directory config

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
 Cargo.toml                            |   2 +
 proxmox-jobstate/Cargo.toml           |  26 +++
 proxmox-jobstate/debian/changelog     |   5 +
 proxmox-jobstate/debian/control       |  55 +++++
 proxmox-jobstate/debian/copyright     |  18 ++
 proxmox-jobstate/debian/debcargo.toml |   7 +
 proxmox-jobstate/src/api_types.rs     |  40 ++++
 proxmox-jobstate/src/jobstate.rs      | 315 ++++++++++++++++++++++++++
 proxmox-jobstate/src/lib.rs           |  46 ++++
 9 files changed, 514 insertions(+)
 create mode 100644 proxmox-jobstate/Cargo.toml
 create mode 100644 proxmox-jobstate/debian/changelog
 create mode 100644 proxmox-jobstate/debian/control
 create mode 100644 proxmox-jobstate/debian/copyright
 create mode 100644 proxmox-jobstate/debian/debcargo.toml
 create mode 100644 proxmox-jobstate/src/api_types.rs
 create mode 100644 proxmox-jobstate/src/jobstate.rs
 create mode 100644 proxmox-jobstate/src/lib.rs

diff --git a/Cargo.toml b/Cargo.toml
index 6b22c58..4b4b787 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,7 @@ members = [
     "proxmox-http-error",
     "proxmox-human-byte",
     "proxmox-io",
+    "proxmox-jobstate",
     "proxmox-lang",
     "proxmox-ldap",
     "proxmox-login",
@@ -22,6 +23,7 @@ members = [
     "proxmox-schema",
     "proxmox-section-config",
     "proxmox-serde",
+    "proxmox-server-config",
     "proxmox-shared-memory",
     "proxmox-sortable-macro",
     "proxmox-subscription",
diff --git a/proxmox-jobstate/Cargo.toml b/proxmox-jobstate/Cargo.toml
new file mode 100644
index 0000000..d2ba93b
--- /dev/null
+++ b/proxmox-jobstate/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "proxmox-jobstate"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+description = "Jobstate handling"
+
+exclude.workspace = true
+
+[dependencies]
+serde = { workspace = true, features = ["derive"] }
+
+proxmox-schema = { workspace = true, features = [ "api-macro" ] }
+
+[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
+anyhow.workspace = true
+nix.workspace = true
+once_cell.workspace = true
+serde_json.workspace = true
+
+proxmox-rest-server = { workspace = true, features = [] }
+proxmox-server-config.workspace = true
+proxmox-sys.workspace = true
+proxmox-time.workspace = true
diff --git a/proxmox-jobstate/debian/changelog b/proxmox-jobstate/debian/changelog
new file mode 100644
index 0000000..484a98a
--- /dev/null
+++ b/proxmox-jobstate/debian/changelog
@@ -0,0 +1,5 @@
+rust-proxmox-jobstate (0.1.0-1) stable; urgency=medium
+
+  * initial split out of proxmox-backup
+
+ -- Proxmox Support Team <support at proxmox.com>  Tue, 26 Sep 2023 13:51:49 +0200
diff --git a/proxmox-jobstate/debian/control b/proxmox-jobstate/debian/control
new file mode 100644
index 0000000..fe3c0bc
--- /dev/null
+++ b/proxmox-jobstate/debian/control
@@ -0,0 +1,55 @@
+Source: rust-proxmox-jobstate
+Section: rust
+Priority: optional
+Build-Depends: debhelper (>= 12),
+ dh-cargo (>= 25),
+ cargo:native <!nocheck>,
+ rustc:native <!nocheck>,
+ libstd-rust-dev <!nocheck>,
+ librust-anyhow-1+default-dev <!nocheck>,
+ librust-nix-0.26+default-dev (>= 0.26.1-~~) <!nocheck>,
+ librust-once-cell-1+default-dev (>= 1.3.1-~~) <!nocheck>,
+ librust-proxmox-rest-server-0.4+default-dev <!nocheck>,
+ librust-proxmox-schema-2+api-macro-dev <!nocheck>,
+ librust-proxmox-schema-2+default-dev <!nocheck>,
+ librust-proxmox-server-config-0.1+default-dev <!nocheck>,
+ librust-proxmox-sys-0.5+default-dev <!nocheck>,
+ librust-proxmox-time-1+default-dev (>= 1.1.4-~~) <!nocheck>,
+ librust-serde-1+default-dev <!nocheck>,
+ librust-serde-1+derive-dev <!nocheck>,
+ librust-serde-json-1+default-dev <!nocheck>
+Maintainer: Proxmox Support Team <support at proxmox.com>
+Standards-Version: 4.6.1
+Vcs-Git: git://git.proxmox.com/git/proxmox.git
+Vcs-Browser: https://git.proxmox.com/?p=proxmox.git
+X-Cargo-Crate: proxmox-jobstate
+Rules-Requires-Root: no
+
+Package: librust-proxmox-jobstate-dev
+Architecture: any
+Multi-Arch: same
+Depends:
+ ${misc:Depends},
+ librust-anyhow-1+default-dev,
+ librust-nix-0.26+default-dev (>= 0.26.1-~~),
+ librust-once-cell-1+default-dev (>= 1.3.1-~~),
+ librust-proxmox-rest-server-0.4+default-dev,
+ librust-proxmox-schema-2+api-macro-dev,
+ librust-proxmox-schema-2+default-dev,
+ librust-proxmox-server-config-0.1+default-dev,
+ librust-proxmox-sys-0.5+default-dev,
+ librust-proxmox-time-1+default-dev (>= 1.1.4-~~),
+ librust-serde-1+default-dev,
+ librust-serde-1+derive-dev,
+ librust-serde-json-1+default-dev
+Provides:
+ librust-proxmox-jobstate+default-dev (= ${binary:Version}),
+ librust-proxmox-jobstate-0-dev (= ${binary:Version}),
+ librust-proxmox-jobstate-0+default-dev (= ${binary:Version}),
+ librust-proxmox-jobstate-0.1-dev (= ${binary:Version}),
+ librust-proxmox-jobstate-0.1+default-dev (= ${binary:Version}),
+ librust-proxmox-jobstate-0.1.0-dev (= ${binary:Version}),
+ librust-proxmox-jobstate-0.1.0+default-dev (= ${binary:Version})
+Description: Jobstate handling - Rust source code
+ This package contains the source for the Rust proxmox-jobstate crate, packaged
+ by debcargo for use with cargo and dh-cargo.
diff --git a/proxmox-jobstate/debian/copyright b/proxmox-jobstate/debian/copyright
new file mode 100644
index 0000000..0d9eab3
--- /dev/null
+++ b/proxmox-jobstate/debian/copyright
@@ -0,0 +1,18 @@
+Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
+
+Files:
+ *
+Copyright: 2019 - 2023 Proxmox Server Solutions GmbH <support at proxmox.com>
+License: AGPL-3.0-or-later
+ This program is free software: you can redistribute it and/or modify it under
+ the terms of the GNU Affero General Public License as published by the Free
+ Software Foundation, either version 3 of the License, or (at your option) any
+ later version.
+ .
+ This program is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+ details.
+ .
+ You should have received a copy of the GNU Affero General Public License along
+ with this program. If not, see <https://www.gnu.org/licenses/>.
diff --git a/proxmox-jobstate/debian/debcargo.toml b/proxmox-jobstate/debian/debcargo.toml
new file mode 100644
index 0000000..b7864cd
--- /dev/null
+++ b/proxmox-jobstate/debian/debcargo.toml
@@ -0,0 +1,7 @@
+overlay = "."
+crate_src_path = ".."
+maintainer = "Proxmox Support Team <support at proxmox.com>"
+
+[source]
+vcs_git = "git://git.proxmox.com/git/proxmox.git"
+vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
diff --git a/proxmox-jobstate/src/api_types.rs b/proxmox-jobstate/src/api_types.rs
new file mode 100644
index 0000000..c04d72a
--- /dev/null
+++ b/proxmox-jobstate/src/api_types.rs
@@ -0,0 +1,40 @@
+use proxmox_schema::api;
+use serde::{Deserialize, Serialize};
+
+#[api(
+    properties: {
+        "next-run": {
+            description: "Estimated time of the next run (UNIX epoch).",
+            optional: true,
+            type: Integer,
+        },
+        "last-run-state": {
+            description: "Result of the last run.",
+            optional: true,
+            type: String,
+        },
+        "last-run-upid": {
+            description: "Task UPID of the last run.",
+            optional: true,
+            type: String,
+        },
+        "last-run-endtime": {
+            description: "Endtime of the last run.",
+            optional: true,
+            type: Integer,
+        },
+    }
+)]
+#[derive(Serialize, Deserialize, Default, Clone, PartialEq)]
+#[serde(rename_all = "kebab-case")]
+/// Job Scheduling Status
+pub struct JobScheduleStatus {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub next_run: Option<i64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_run_state: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_run_upid: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_run_endtime: Option<i64>,
+}
diff --git a/proxmox-jobstate/src/jobstate.rs b/proxmox-jobstate/src/jobstate.rs
new file mode 100644
index 0000000..9522c5b
--- /dev/null
+++ b/proxmox-jobstate/src/jobstate.rs
@@ -0,0 +1,315 @@
+use std::path::{Path, PathBuf};
+
+use anyhow::{bail, format_err, Error};
+use nix::unistd::User;
+use serde::{Deserialize, Serialize};
+
+use proxmox_rest_server::{upid_read_status, worker_is_active_local, TaskState};
+use proxmox_schema::upid::UPID;
+use proxmox_sys::fs::{create_path, file_read_optional_string, replace_file, CreateOptions};
+use proxmox_time::CalendarEvent;
+
+use crate::JobScheduleStatus;
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+/// Represents the State of a specific Job
+pub enum JobState {
+    /// A job was created at 'time', but never started/finished
+    Created { time: i64 },
+    /// The Job was last started in 'upid'
+    Started { upid: String },
+    /// The Job was last started in 'upid', which finished with 'state', and was last updated at 'updated'
+    Finished {
+        upid: String,
+        state: TaskState,
+        updated: Option<i64>,
+    },
+}
+
+/// Represents a Job and holds the correct lock
+pub struct Job {
+    jobtype: String,
+    jobname: String,
+    /// The State of the job
+    pub state: JobState,
+    _lock: std::fs::File,
+}
+
+/// Create the jobstate state dir with the correct owner and group
+///
+/// It is necessary to initialize a global [`ServerConfig`](proxmox_server_config::ServerConfig)
+/// first.
+pub fn create_jobstate_dir() -> Result<(), Error> {
+    let server_config = proxmox_server_config::get_server_config()?;
+    let path = server_config.state_dir().join("jobstates");
+
+    let user = server_config.user();
+    let opts = CreateOptions::new().owner(user.uid).group(user.gid);
+
+    create_path(path, Some(opts.clone()), Some(opts))
+        .map_err(|err: Error| format_err!("unable to create job state dir - {err}"))?;
+
+    Ok(())
+}
+
+fn get_path(jobtype: &str, jobname: &str) -> Result<PathBuf, Error> {
+    let server_config = proxmox_server_config::get_server_config()?;
+    let mut path = server_config.state_dir().join("jobstates");
+    path.push(format!("{jobtype}-{jobname}.json"));
+    Ok(path)
+}
+
+fn get_user() -> Result<&'static User, Error> {
+    Ok(proxmox_server_config::get_server_config()?.user())
+}
+
+fn get_lock<P>(path: P) -> Result<std::fs::File, Error>
+where
+    P: AsRef<Path>,
+{
+    let mut path = path.as_ref().to_path_buf();
+    path.set_extension("lck");
+    let user = get_user()?;
+    let options = proxmox_sys::fs::CreateOptions::new()
+        .perm(nix::sys::stat::Mode::from_bits_truncate(0o660))
+        .owner(user.uid)
+        .group(user.gid);
+    let timeout = std::time::Duration::new(10, 0);
+
+    let file = proxmox_sys::fs::open_file_locked(&path, timeout, true, options)?;
+    Ok(file)
+}
+
+/// Removes the statefile of a job, this is useful if we delete a job
+pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
+    let mut path = get_path(jobtype, jobname)?;
+    let _lock = get_lock(&path)?;
+    if let Err(err) = std::fs::remove_file(&path) {
+        if err.kind() != std::io::ErrorKind::NotFound {
+            bail!("cannot remove statefile for {jobtype} - {jobname}: {err}");
+        }
+    }
+    path.set_extension("lck");
+    if let Err(err) = std::fs::remove_file(&path) {
+        if err.kind() != std::io::ErrorKind::NotFound {
+            bail!("cannot remove lockfile for {jobtype} - {jobname}: {err}");
+        }
+    }
+    Ok(())
+}
+
+/// Creates the statefile with the state 'Created'
+/// overwrites if it exists already
+pub fn create_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
+    let mut job = Job::new(jobtype, jobname)?;
+    job.write_state()
+}
+
+/// Tries to update the state file with the current time.
+/// If the job is currently running, this does nothing.
+/// Intended for use when the schedule changes.
+pub fn update_job_last_run_time(jobtype: &str, jobname: &str) -> Result<(), Error> {
+    let mut job = match Job::new(jobtype, jobname) {
+        Ok(job) => job,
+        Err(_) => return Ok(()), // was locked (running), so do not update
+    };
+    let time = proxmox_time::epoch_i64();
+
+    job.state = match JobState::load(jobtype, jobname)? {
+        JobState::Created { .. } => JobState::Created { time },
+        JobState::Started { .. } => return Ok(()), // currently running (without lock?)
+        JobState::Finished {
+            upid,
+            state,
+            updated: _,
+        } => JobState::Finished {
+            upid,
+            state,
+            updated: Some(time),
+        },
+    };
+    job.write_state()
+}
+
+/// Returns the last run time of a job by reading the statefile
+/// Note that this is not locked
+pub fn last_run_time(jobtype: &str, jobname: &str) -> Result<i64, Error> {
+    match JobState::load(jobtype, jobname)? {
+        JobState::Created { time } => Ok(time),
+        JobState::Finished {
+            updated: Some(time),
+            ..
+        } => Ok(time),
+        JobState::Started { upid }
+        | JobState::Finished {
+            upid,
+            state: _,
+            updated: None,
+        } => {
+            let upid: UPID = upid
+                .parse()
+                .map_err(|err| format_err!("could not parse upid from state: {err}"))?;
+            Ok(upid.starttime)
+        }
+    }
+}
+
+impl JobState {
+    /// Loads and deserializes the jobstate from type and name.
+    /// When the loaded state indicates a started UPID,
+    /// we go and check if it has already stopped, and
+    /// return the correct state.
+    ///
+    /// This does not update the state in the file.
+    pub fn load(jobtype: &str, jobname: &str) -> Result<Self, Error> {
+        if let Some(state) = file_read_optional_string(get_path(jobtype, jobname)?)? {
+            match serde_json::from_str(&state)? {
+                JobState::Started { upid } => {
+                    let parsed: UPID = upid
+                        .parse()
+                        .map_err(|err| format_err!("error parsing upid: {err}"))?;
+
+                    if !worker_is_active_local(&parsed) {
+                        let state = upid_read_status(&parsed)
+                            .map_err(|err| format_err!("error reading upid log status: {err}"))?;
+
+                        Ok(JobState::Finished {
+                            upid,
+                            state,
+                            updated: None,
+                        })
+                    } else {
+                        Ok(JobState::Started { upid })
+                    }
+                }
+                other => Ok(other),
+            }
+        } else {
+            Ok(JobState::Created {
+                time: proxmox_time::epoch_i64() - 30,
+            })
+        }
+    }
+}
+
+impl Job {
+    /// Creates a new instance of a job with the correct lock held
+    /// (will be held until the job is dropped again).
+    ///
+    /// This does not load the state from the file, to do that,
+    /// 'load' must be called
+    pub fn new(jobtype: &str, jobname: &str) -> Result<Self, Error> {
+        let path = get_path(jobtype, jobname)?;
+
+        let _lock = get_lock(path)?;
+
+        Ok(Self {
+            jobtype: jobtype.to_string(),
+            jobname: jobname.to_string(),
+            state: JobState::Created {
+                time: proxmox_time::epoch_i64(),
+            },
+            _lock,
+        })
+    }
+
+    /// Start the job and update the statefile accordingly
+    /// Fails if the job was already started
+    pub fn start(&mut self, upid: &str) -> Result<(), Error> {
+        if let JobState::Started { .. } = self.state {
+            bail!("cannot start job that is started!");
+        }
+
+        self.state = JobState::Started {
+            upid: upid.to_string(),
+        };
+
+        self.write_state()
+    }
+
+    /// Finish the job and update the statefile accordingly with the given taskstate
+    /// Fails if the job was not yet started
+    pub fn finish(&mut self, state: TaskState) -> Result<(), Error> {
+        let upid = match &self.state {
+            JobState::Created { .. } => bail!("cannot finish when not started"),
+            JobState::Started { upid } => upid,
+            JobState::Finished { upid, .. } => upid,
+        }
+        .to_string();
+
+        self.state = JobState::Finished {
+            upid,
+            state,
+            updated: None,
+        };
+
+        self.write_state()
+    }
+
+    pub fn jobtype(&self) -> &str {
+        &self.jobtype
+    }
+
+    pub fn jobname(&self) -> &str {
+        &self.jobname
+    }
+
+    fn write_state(&mut self) -> Result<(), Error> {
+        let serialized = serde_json::to_string(&self.state)?;
+        let path = get_path(&self.jobtype, &self.jobname)?;
+
+        let backup_user = get_user()?;
+        let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
+        // set the correct owner/group/permissions while saving file
+        // owner(rw) = backup, group(r)= backup
+        let options = CreateOptions::new()
+            .perm(mode)
+            .owner(backup_user.uid)
+            .group(backup_user.gid);
+
+        replace_file(path, serialized.as_bytes(), options, false)
+    }
+}
+
+pub fn compute_schedule_status(
+    job_state: &JobState,
+    schedule: Option<&str>,
+) -> Result<JobScheduleStatus, Error> {
+    let (upid, endtime, state, last) = match job_state {
+        JobState::Created { time } => (None, None, None, *time),
+        JobState::Started { upid } => {
+            let parsed_upid: UPID = upid.parse()?;
+            (Some(upid), None, None, parsed_upid.starttime)
+        }
+        JobState::Finished {
+            upid,
+            state,
+            updated,
+        } => {
+            let last = updated.unwrap_or_else(|| state.endtime());
+            (
+                Some(upid),
+                Some(state.endtime()),
+                Some(state.to_string()),
+                last,
+            )
+        }
+    };
+
+    let mut status = JobScheduleStatus {
+        last_run_upid: upid.map(String::from),
+        last_run_state: state,
+        last_run_endtime: endtime,
+        ..Default::default()
+    };
+
+    if let Some(schedule) = schedule {
+        if let Ok(event) = schedule.parse::<CalendarEvent>() {
+            // ignore errors
+            status.next_run = event.compute_next_event(last).unwrap_or(None);
+        }
+    }
+
+    Ok(status)
+}
diff --git a/proxmox-jobstate/src/lib.rs b/proxmox-jobstate/src/lib.rs
new file mode 100644
index 0000000..0d1ea70
--- /dev/null
+++ b/proxmox-jobstate/src/lib.rs
@@ -0,0 +1,46 @@
+/// Generic JobState handling
+///
+/// A 'Job' can have 3 states
+///  - Created, when a schedule was created but never executed
+///  - Started, when a job is running right now
+///  - Finished, when a job was running in the past
+///
+/// and is identified by 2 values: jobtype and jobname (e.g. 'syncjob' and 'myfirstsyncjob')
+///
+/// This module provides 2 helper types to handle those conditions
+/// 'Job' which handles locking and writing to a file
+/// 'JobState' which is the actual state
+///
+/// an example usage would be
+/// ```no_run
+/// # use anyhow::{bail, Error};
+/// # use proxmox_rest_server::TaskState;
+/// # use proxmox_jobstate::*;
+/// # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
+/// # fn code() -> Result<(), Error> {
+/// // locks the correct file under /var/lib
+/// // or fails if someone else holds the lock
+/// let mut job = match Job::new("jobtype", "jobname") {
+///     Ok(job) => job,
+///     Err(err) => bail!("could not lock jobstate"),
+/// };
+///
+/// // job holds the lock, we can start it
+/// job.start("someupid")?;
+/// // do something
+/// let task_state = some_code();
+/// job.finish(task_state)?;
+///
+/// // release the lock
+/// drop(job);
+/// # Ok(())
+/// # }
+///
+/// ```
+mod api_types;
+pub use api_types::*;
+
+#[cfg(not(target_arch = "wasm32"))]
+mod jobstate;
+#[cfg(not(target_arch = "wasm32"))]
+pub use jobstate::*;
-- 
2.30.2






More information about the pbs-devel mailing list