[pbs-devel] [PATCH proxmox-backup 1/3] cleanup: merge endtime into TaskState

Dominik Csapak d.csapak at proxmox.com
Thu Aug 13 14:30:17 CEST 2020


Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
 src/api2/admin/sync.rs    |   4 +-
 src/api2/node/tasks.rs    |   7 +--
 src/api2/types/mod.rs     |   2 +-
 src/config/jobstate.rs    |  12 ++---
 src/server/worker_task.rs | 110 ++++++++++++++++++++++----------------
 5 files changed, 76 insertions(+), 59 deletions(-)

diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index c09bea4f..aafd808f 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -42,9 +42,9 @@ pub fn list_sync_jobs(
                 let parsed_upid: UPID = upid.parse()?;
                 (Some(upid), None, None, parsed_upid.starttime)
             },
-            JobState::Finished { upid, endtime, state } => {
+            JobState::Finished { upid, state } => {
                 let parsed_upid: UPID = upid.parse()?;
-                (Some(upid), Some(endtime), Some(state.to_string()), parsed_upid.starttime)
+                (Some(upid), Some(state.endtime()), Some(state.to_string()), parsed_upid.starttime)
             },
         };
 
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 1e9643ec..c8add6b4 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -105,7 +105,7 @@ async fn get_task_status(
     if crate::server::worker_is_active(&upid).await? {
         result["status"] = Value::from("running");
     } else {
-        let (_, exitstatus) = crate::server::upid_read_status(&upid).unwrap_or((0, TaskState::Unknown));
+        let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
         result["status"] = Value::from("stopped");
         result["exitstatus"] = Value::from(exitstatus.to_string());
     };
@@ -352,8 +352,9 @@ pub fn list_tasks(
 
         if let Some(ref state) = info.state {
             if running { continue; }
-            if errors && state.1 == crate::server::TaskState::OK {
-                continue;
+            match state {
+                crate::server::TaskState::OK { .. } if errors => continue,
+                _ => {},
             }
         }
 
diff --git a/src/api2/types/mod.rs b/src/api2/types/mod.rs
index a619810d..de7a5ca0 100644
--- a/src/api2/types/mod.rs
+++ b/src/api2/types/mod.rs
@@ -595,7 +595,7 @@ impl From<crate::server::TaskListInfo> for TaskListItem {
     fn from(info: crate::server::TaskListInfo) -> Self {
         let (endtime, status) = info
             .state
-            .map_or_else(|| (None, None), |(a,b)| (Some(a), Some(b.to_string())));
+            .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
 
         TaskListItem {
             upid: info.upid_str,
diff --git a/src/config/jobstate.rs b/src/config/jobstate.rs
index 45672cea..94566bb7 100644
--- a/src/config/jobstate.rs
+++ b/src/config/jobstate.rs
@@ -16,7 +16,7 @@
 //! # use anyhow::{bail, Error};
 //! # use proxmox_backup::server::TaskState;
 //! # use proxmox_backup::config::jobstate::*;
-//! # fn some_code() -> TaskState { TaskState::OK }
+//! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
 //! # fn code() -> Result<(), Error> {
 //! // locks the correct file under /var/lib
 //! // or fails if someone else holds the lock
@@ -62,8 +62,8 @@ pub enum JobState {
     Created { time: i64 },
     /// The Job was last started in 'upid',
     Started { upid: String },
-    /// The Job was last started in 'upid', which finished with 'state' at 'endtime'
-    Finished { upid: String, endtime: i64, state: TaskState }
+    /// The Job was last started in 'upid', which finished with 'state'
+    Finished { upid: String, state: TaskState }
 }
 
 /// Represents a Job and holds the correct lock
@@ -143,12 +143,11 @@ impl JobState {
                         .map_err(|err| format_err!("error parsing upid: {}", err))?;
 
                     if !worker_is_active_local(&parsed) {
-                        let (endtime, state) = upid_read_status(&parsed)
+                        let state = upid_read_status(&parsed)
                             .map_err(|err| format_err!("error reading upid log status: {}", err))?;
 
                         Ok(JobState::Finished {
                             upid,
-                            endtime,
                             state
                         })
                     } else {
@@ -225,11 +224,8 @@ impl Job {
             JobState::Finished { upid, .. } => upid,
         }.to_string();
 
-        let endtime: i64 = epoch_now_u64()? as i64;
-
         self.state = JobState::Finished {
             upid,
-            endtime,
             state,
         };
 
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index da1a877e..a9e4a36a 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -156,7 +156,7 @@ pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
     super::send_command(socketname, cmd).map_ok(|_| ()).await
 }
 
-fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> {
+fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
 
     let data = line.splitn(3, ' ').collect::<Vec<&str>>();
 
@@ -166,7 +166,8 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, St
         1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
         3 => {
             let endtime = i64::from_str_radix(data[1], 16)?;
-            Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some((endtime, data[2].to_owned()))))
+            let state = TaskState::from_endtime_and_message(endtime, data[2])?;
+            Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
         }
         _ => bail!("wrong number of components"),
     }
@@ -193,9 +194,9 @@ pub fn create_task_log_dirs() -> Result<(), Error> {
 /// Read endtime (time of last log line) and exitstatus from task log file
 /// If there is not a single line with at valid datetime, we assume the
 /// starttime to be the endtime
-pub fn upid_read_status(upid: &UPID) -> Result<(i64, TaskState), Error> {
-    let mut status = TaskState::Unknown;
-    let mut time = upid.starttime;
+pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
+    let mut endtime = upid.starttime;
+    let mut status = TaskState::Unknown { endtime };
 
     let path = upid.log_path();
 
@@ -213,7 +214,7 @@ pub fn upid_read_status(upid: &UPID) -> Result<(i64, TaskState), Error> {
 
         let mut iter = line.splitn(2, ": ");
         if let Some(time_str) = iter.next() {
-            time = chrono::DateTime::parse_from_rfc3339(time_str)
+            endtime = chrono::DateTime::parse_from_rfc3339(time_str)
                 .map_err(|err| format_err!("cannot parse '{}': {}", time_str, err))?
                 .timestamp();
         } else {
@@ -222,69 +223,86 @@ pub fn upid_read_status(upid: &UPID) -> Result<(i64, TaskState), Error> {
         match iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
             None => continue,
             Some(rest) => {
-                if let Ok(state) = rest.parse() {
+                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
                     status = state;
                 }
             }
         }
     }
 
-    Ok((time, status))
+    Ok(status)
 }
 
 /// Task State
-#[derive(Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub enum TaskState {
     /// The Task ended with an undefined state
-    Unknown,
+    Unknown { endtime: i64 },
     /// The Task ended and there were no errors or warnings
-    OK,
+    OK { endtime: i64 },
     /// The Task had 'count' amount of warnings and no errors
-    Warning { count: u64 },
+    Warning { count: u64, endtime: i64 },
     /// The Task ended with the error described in 'message'
-    Error { message: String },
+    Error { message: String, endtime: i64 },
 }
 
 impl TaskState {
-    fn result_text(&self) -> String {
-        match self {
-            TaskState::Error { message } => format!("TASK ERROR: {}", message),
-            other => format!("TASK {}", other),
+    pub fn endtime(&self) -> i64 {
+        match *self {
+            TaskState::Unknown { endtime } => endtime,
+            TaskState::OK { endtime } => endtime,
+            TaskState::Warning { endtime, .. } => endtime,
+            TaskState::Error { endtime, .. } => endtime,
         }
     }
-}
 
-impl std::fmt::Display for TaskState {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    fn result_text(&self) -> String {
         match self {
-            TaskState::Unknown => write!(f, "unknown"),
-            TaskState::OK => write!(f, "OK"),
-            TaskState::Warning { count } => write!(f, "WARNINGS: {}", count),
-            TaskState::Error { message } => write!(f, "{}", message),
+            TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
+            other => format!("TASK {}", other),
         }
     }
-}
 
-impl std::str::FromStr for TaskState {
-    type Err = Error;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
+    fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
         if s == "unknown" {
-            Ok(TaskState::Unknown)
+            Ok(TaskState::Unknown { endtime })
         } else if s == "OK" {
-            Ok(TaskState::OK)
+            Ok(TaskState::OK { endtime })
         } else if s.starts_with("WARNINGS: ") {
             let count: u64 = s[10..].parse()?;
-            Ok(TaskState::Warning{ count })
+            Ok(TaskState::Warning{ count, endtime })
         } else if s.len() > 0 {
             let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string();
-            Ok(TaskState::Error{ message })
+            Ok(TaskState::Error{ message, endtime })
         } else {
             bail!("unable to parse Task Status '{}'", s);
         }
     }
 }
 
+impl std::cmp::PartialOrd for TaskState {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.endtime().cmp(&other.endtime()))
+    }
+}
+
+impl std::cmp::Ord for TaskState {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.endtime().cmp(&other.endtime())
+    }
+}
+
+impl std::fmt::Display for TaskState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            TaskState::Unknown { .. } => write!(f, "unknown"),
+            TaskState::OK { .. }=> write!(f, "OK"),
+            TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
+            TaskState::Error { message, .. } => write!(f, "{}", message),
+        }
+    }
+}
+
 /// Task details including parsed UPID
 ///
 /// If there is no `state`, the task is still running.
@@ -295,7 +313,7 @@ pub struct TaskListInfo {
     /// UPID string representation
     pub upid_str: String,
     /// Task `(endtime, status)` if already finished
-    pub state: Option<(i64, TaskState)>, // endtime, status
+    pub state: Option<TaskState>, // endtime, status
 }
 
 // atomically read/update the task list, update status of finished tasks
@@ -334,15 +352,15 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
                     },
                     None => {
                         println!("Detected stopped UPID {}", upid_str);
-                        let (time, status) = upid_read_status(&upid)
-                            .unwrap_or_else(|_| (Local::now().timestamp(), TaskState::Unknown));
+                        let status = upid_read_status(&upid)
+                            .unwrap_or_else(|_| TaskState::Unknown { endtime: Local::now().timestamp() });
                         finish_list.push(TaskListInfo {
-                            upid, upid_str, state: Some((time, status))
+                            upid, upid_str, state: Some(status)
                         });
                     },
-                    Some((endtime, status)) => {
+                    Some(status) => {
                         finish_list.push(TaskListInfo {
-                            upid, upid_str, state: Some((endtime, status.parse()?))
+                            upid, upid_str, state: Some(status)
                         })
                     }
                 }
@@ -378,7 +396,7 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
 
     task_list.sort_unstable_by(|b, a| { // lastest on top
         match (&a.state, &b.state) {
-            (Some(s1), Some(s2)) => s1.0.cmp(&s2.0),
+            (Some(s1), Some(s2)) => s1.cmp(&s2),
             (Some(_), None) => std::cmp::Ordering::Less,
             (None, Some(_)) => std::cmp::Ordering::Greater,
             _ => a.upid.starttime.cmp(&b.upid.starttime),
@@ -387,8 +405,8 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
 
     let mut raw = String::new();
     for info in &task_list {
-        if let Some((endtime, status)) = &info.state {
-            raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, endtime, status));
+        if let Some(status) = &info.state {
+            raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
         } else {
             raw.push_str(&info.upid_str);
             raw.push('\n');
@@ -559,12 +577,14 @@ impl WorkerTask {
     pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
         let warn_count = self.data.lock().unwrap().warn_count;
 
+        let endtime = Local::now().timestamp();
+
         if let Err(err) = result {
-            TaskState::Error { message: err.to_string() }
+            TaskState::Error { message: err.to_string(), endtime }
         } else if warn_count > 0 {
-            TaskState::Warning { count: warn_count }
+            TaskState::Warning { count: warn_count, endtime }
         } else {
-            TaskState::OK
+            TaskState::OK { endtime }
         }
     }
 
-- 
2.20.1






More information about the pbs-devel mailing list