[pbs-devel] [PATCH proxmox-backup 1/3] cleanup: merge endtime into TaskState
Dominik Csapak
d.csapak at proxmox.com
Thu Aug 13 14:30:17 CEST 2020
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
src/api2/admin/sync.rs | 4 +-
src/api2/node/tasks.rs | 7 +--
src/api2/types/mod.rs | 2 +-
src/config/jobstate.rs | 12 ++---
src/server/worker_task.rs | 110 ++++++++++++++++++++++----------------
5 files changed, 76 insertions(+), 59 deletions(-)
diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index c09bea4f..aafd808f 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -42,9 +42,9 @@ pub fn list_sync_jobs(
let parsed_upid: UPID = upid.parse()?;
(Some(upid), None, None, parsed_upid.starttime)
},
- JobState::Finished { upid, endtime, state } => {
+ JobState::Finished { upid, state } => {
let parsed_upid: UPID = upid.parse()?;
- (Some(upid), Some(endtime), Some(state.to_string()), parsed_upid.starttime)
+ (Some(upid), Some(state.endtime()), Some(state.to_string()), parsed_upid.starttime)
},
};
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 1e9643ec..c8add6b4 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -105,7 +105,7 @@ async fn get_task_status(
if crate::server::worker_is_active(&upid).await? {
result["status"] = Value::from("running");
} else {
- let (_, exitstatus) = crate::server::upid_read_status(&upid).unwrap_or((0, TaskState::Unknown));
+ let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
result["status"] = Value::from("stopped");
result["exitstatus"] = Value::from(exitstatus.to_string());
};
@@ -352,8 +352,9 @@ pub fn list_tasks(
if let Some(ref state) = info.state {
if running { continue; }
- if errors && state.1 == crate::server::TaskState::OK {
- continue;
+ match state {
+ crate::server::TaskState::OK { .. } if errors => continue,
+ _ => {},
}
}
diff --git a/src/api2/types/mod.rs b/src/api2/types/mod.rs
index a619810d..de7a5ca0 100644
--- a/src/api2/types/mod.rs
+++ b/src/api2/types/mod.rs
@@ -595,7 +595,7 @@ impl From<crate::server::TaskListInfo> for TaskListItem {
fn from(info: crate::server::TaskListInfo) -> Self {
let (endtime, status) = info
.state
- .map_or_else(|| (None, None), |(a,b)| (Some(a), Some(b.to_string())));
+ .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
TaskListItem {
upid: info.upid_str,
diff --git a/src/config/jobstate.rs b/src/config/jobstate.rs
index 45672cea..94566bb7 100644
--- a/src/config/jobstate.rs
+++ b/src/config/jobstate.rs
@@ -16,7 +16,7 @@
//! # use anyhow::{bail, Error};
//! # use proxmox_backup::server::TaskState;
//! # use proxmox_backup::config::jobstate::*;
-//! # fn some_code() -> TaskState { TaskState::OK }
+//! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
//! # fn code() -> Result<(), Error> {
//! // locks the correct file under /var/lib
//! // or fails if someone else holds the lock
@@ -62,8 +62,8 @@ pub enum JobState {
Created { time: i64 },
/// The Job was last started in 'upid',
Started { upid: String },
- /// The Job was last started in 'upid', which finished with 'state' at 'endtime'
- Finished { upid: String, endtime: i64, state: TaskState }
+ /// The Job was last started in 'upid', which finished with 'state'
+ Finished { upid: String, state: TaskState }
}
/// Represents a Job and holds the correct lock
@@ -143,12 +143,11 @@ impl JobState {
.map_err(|err| format_err!("error parsing upid: {}", err))?;
if !worker_is_active_local(&parsed) {
- let (endtime, state) = upid_read_status(&parsed)
+ let state = upid_read_status(&parsed)
.map_err(|err| format_err!("error reading upid log status: {}", err))?;
Ok(JobState::Finished {
upid,
- endtime,
state
})
} else {
@@ -225,11 +224,8 @@ impl Job {
JobState::Finished { upid, .. } => upid,
}.to_string();
- let endtime: i64 = epoch_now_u64()? as i64;
-
self.state = JobState::Finished {
upid,
- endtime,
state,
};
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index da1a877e..a9e4a36a 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -156,7 +156,7 @@ pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
super::send_command(socketname, cmd).map_ok(|_| ()).await
}
-fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> {
+fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
let data = line.splitn(3, ' ').collect::<Vec<&str>>();
@@ -166,7 +166,8 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, St
1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
3 => {
let endtime = i64::from_str_radix(data[1], 16)?;
- Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some((endtime, data[2].to_owned()))))
+ let state = TaskState::from_endtime_and_message(endtime, data[2])?;
+ Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
}
_ => bail!("wrong number of components"),
}
@@ -193,9 +194,9 @@ pub fn create_task_log_dirs() -> Result<(), Error> {
/// Read endtime (time of last log line) and exitstatus from task log file
/// If there is not a single line with a valid datetime, we assume the
/// starttime to be the endtime
-pub fn upid_read_status(upid: &UPID) -> Result<(i64, TaskState), Error> {
- let mut status = TaskState::Unknown;
- let mut time = upid.starttime;
+pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
+ let mut endtime = upid.starttime;
+ let mut status = TaskState::Unknown { endtime };
let path = upid.log_path();
@@ -213,7 +214,7 @@ pub fn upid_read_status(upid: &UPID) -> Result<(i64, TaskState), Error> {
let mut iter = line.splitn(2, ": ");
if let Some(time_str) = iter.next() {
- time = chrono::DateTime::parse_from_rfc3339(time_str)
+ endtime = chrono::DateTime::parse_from_rfc3339(time_str)
.map_err(|err| format_err!("cannot parse '{}': {}", time_str, err))?
.timestamp();
} else {
@@ -222,69 +223,86 @@ pub fn upid_read_status(upid: &UPID) -> Result<(i64, TaskState), Error> {
match iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
None => continue,
Some(rest) => {
- if let Ok(state) = rest.parse() {
+ if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
status = state;
}
}
}
}
- Ok((time, status))
+ Ok(status)
}
/// Task State
-#[derive(Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
/// The Task ended with an undefined state
- Unknown,
+ Unknown { endtime: i64 },
/// The Task ended and there were no errors or warnings
- OK,
+ OK { endtime: i64 },
/// The Task had 'count' amount of warnings and no errors
- Warning { count: u64 },
+ Warning { count: u64, endtime: i64 },
/// The Task ended with the error described in 'message'
- Error { message: String },
+ Error { message: String, endtime: i64 },
}
impl TaskState {
- fn result_text(&self) -> String {
- match self {
- TaskState::Error { message } => format!("TASK ERROR: {}", message),
- other => format!("TASK {}", other),
+ pub fn endtime(&self) -> i64 {
+ match *self {
+ TaskState::Unknown { endtime } => endtime,
+ TaskState::OK { endtime } => endtime,
+ TaskState::Warning { endtime, .. } => endtime,
+ TaskState::Error { endtime, .. } => endtime,
}
}
-}
-impl std::fmt::Display for TaskState {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ fn result_text(&self) -> String {
match self {
- TaskState::Unknown => write!(f, "unknown"),
- TaskState::OK => write!(f, "OK"),
- TaskState::Warning { count } => write!(f, "WARNINGS: {}", count),
- TaskState::Error { message } => write!(f, "{}", message),
+ TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
+ other => format!("TASK {}", other),
}
}
-}
-impl std::str::FromStr for TaskState {
- type Err = Error;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
+ fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
if s == "unknown" {
- Ok(TaskState::Unknown)
+ Ok(TaskState::Unknown { endtime })
} else if s == "OK" {
- Ok(TaskState::OK)
+ Ok(TaskState::OK { endtime })
} else if s.starts_with("WARNINGS: ") {
let count: u64 = s[10..].parse()?;
- Ok(TaskState::Warning{ count })
+ Ok(TaskState::Warning{ count, endtime })
} else if s.len() > 0 {
let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string();
- Ok(TaskState::Error{ message })
+ Ok(TaskState::Error{ message, endtime })
} else {
bail!("unable to parse Task Status '{}'", s);
}
}
}
+impl std::cmp::PartialOrd for TaskState {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.endtime().cmp(&other.endtime()))
+ }
+}
+
+impl std::cmp::Ord for TaskState {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.endtime().cmp(&other.endtime())
+ }
+}
+
+impl std::fmt::Display for TaskState {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ TaskState::Unknown { .. } => write!(f, "unknown"),
+ TaskState::OK { .. }=> write!(f, "OK"),
+ TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
+ TaskState::Error { message, .. } => write!(f, "{}", message),
+ }
+ }
+}
+
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
@@ -295,7 +313,7 @@ pub struct TaskListInfo {
/// UPID string representation
pub upid_str: String,
/// Task `(endtime, status)` if already finished
- pub state: Option<(i64, TaskState)>, // endtime, status
+ pub state: Option<TaskState>, // final state, which carries the endtime
}
// atomically read/update the task list, update status of finished tasks
@@ -334,15 +352,15 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
},
None => {
println!("Detected stopped UPID {}", upid_str);
- let (time, status) = upid_read_status(&upid)
- .unwrap_or_else(|_| (Local::now().timestamp(), TaskState::Unknown));
+ let status = upid_read_status(&upid)
+ .unwrap_or_else(|_| TaskState::Unknown { endtime: Local::now().timestamp() });
finish_list.push(TaskListInfo {
- upid, upid_str, state: Some((time, status))
+ upid, upid_str, state: Some(status)
});
},
- Some((endtime, status)) => {
+ Some(status) => {
finish_list.push(TaskListInfo {
- upid, upid_str, state: Some((endtime, status.parse()?))
+ upid, upid_str, state: Some(status)
})
}
}
@@ -378,7 +396,7 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
task_list.sort_unstable_by(|b, a| { // latest on top
match (&a.state, &b.state) {
- (Some(s1), Some(s2)) => s1.0.cmp(&s2.0),
+ (Some(s1), Some(s2)) => s1.cmp(&s2),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
_ => a.upid.starttime.cmp(&b.upid.starttime),
@@ -387,8 +405,8 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
let mut raw = String::new();
for info in &task_list {
- if let Some((endtime, status)) = &info.state {
- raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, endtime, status));
+ if let Some(status) = &info.state {
+ raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
} else {
raw.push_str(&info.upid_str);
raw.push('\n');
@@ -559,12 +577,14 @@ impl WorkerTask {
pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
let warn_count = self.data.lock().unwrap().warn_count;
+ let endtime = Local::now().timestamp();
+
if let Err(err) = result {
- TaskState::Error { message: err.to_string() }
+ TaskState::Error { message: err.to_string(), endtime }
} else if warn_count > 0 {
- TaskState::Warning { count: warn_count }
+ TaskState::Warning { count: warn_count, endtime }
} else {
- TaskState::OK
+ TaskState::OK { endtime }
}
}
--
2.20.1
More information about the pbs-devel
mailing list