[pbs-devel] [PATCH backup v3] make tasklog downloadable in the backup server backend

Daniel Tschlatscher d.tschlatscher at proxmox.com
Tue Oct 11 15:59:00 CEST 2022


The read tasklog API call now streams the log file if the limit
parameter is set to 0. If not, it should behave the same as before.
This is done in preparation for the task log download button to be
added in the TaskViewer.

To make this work I had to use one of the lower level apimethod types
from the proxmox-router. Therefore I also had to change declarations
for the API routing and the corresponding Object Schemas.

Signed-off-by: Daniel Tschlatscher <d.tschlatscher at proxmox.com>
---

changes from v2
* improved the documentation for the limit parameter
* changed the api method name to "read_task_log"

 src/api2/node/tasks.rs | 161 ++++++++++++++++++++++++++---------------
 1 file changed, 103 insertions(+), 58 deletions(-)

diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index cbd0894e..bc8e3ab5 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -1,11 +1,17 @@
 use std::fs::File;
 use std::io::{BufRead, BufReader};
+use std::path::PathBuf;
 
 use anyhow::{bail, Error};
+use futures::FutureExt;
+use http::{Response, StatusCode, header};
+use http::request::Parts;
+use hyper::Body;
+use proxmox_async::stream::AsyncReaderStream;
 use serde_json::{json, Value};
 
-use proxmox_router::{list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap};
-use proxmox_schema::api;
+use proxmox_router::{list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap, ApiMethod, ApiHandler, ApiResponseFuture};
+use proxmox_schema::{api, Schema, IntegerSchema, BooleanSchema, ObjectSchema};
 use proxmox_sys::sortable;
 
 use pbs_api_types::{
@@ -19,6 +25,22 @@ use crate::api2::pull::check_pull_privs;
 use pbs_config::CachedUserInfo;
 use proxmox_rest_server::{upid_log_path, upid_read_status, TaskListInfoIterator, TaskState};
 
+pub const START_PARAM_SCHEMA: Schema =
+    IntegerSchema::new("Start at this line when reading the tasklog")
+        .minimum(0)
+        .default(0)
+        .schema();
+
+pub const LIMIT_PARAM_SCHEMA: Schema =
+    IntegerSchema::new("The amount of lines to read from the tasklog. Setting this parameter to 0 will stream the file directly")
+        .minimum(0)
+        .default(50)
+        .schema();
+
+pub const TEST_STATUS_PARAM_SCHEMA: Schema =
+    BooleanSchema::new("Test task status, and set result attribute \"active\" accordingly.")
+        .schema();
+
 // matches respective job execution privileges
 fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) -> Result<(), Error> {
     match (upid.worker_type.as_str(), &upid.worker_id) {
@@ -268,58 +290,88 @@ fn extract_upid(param: &Value) -> Result<UPID, Error> {
     upid_str.parse::<UPID>()
 }
 
-#[api(
-    input: {
-        properties: {
-            node: {
-                schema: NODE_SCHEMA,
-            },
-            upid: {
-                schema: UPID_SCHEMA,
-            },
-            "test-status": {
-                type: bool,
-                optional: true,
-                description: "Test task status, and set result attribute \"active\" accordingly.",
-            },
-            start: {
-                type: u64,
-                optional: true,
-                description: "Start at this line.",
-                default: 0,
-            },
-            limit: {
-                type: u64,
-                optional: true,
-                description: "Only list this amount of lines.",
-                default: 50,
-            },
-        },
-    },
-    access: {
-        description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
-        permission: &Permission::Anybody,
-    },
-)]
-/// Read task log.
-async fn read_task_log(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
-    let upid = extract_upid(&param)?;
-
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-
-    check_task_access(&auth_id, &upid)?;
-
-    let test_status = param["test-status"].as_bool().unwrap_or(false);
-
-    let start = param["start"].as_u64().unwrap_or(0);
-    let mut limit = param["limit"].as_u64().unwrap_or(50);
-
-    let mut count: u64 = 0;
+#[sortable]
+pub const API_METHOD_READ_TASK_LOG: ApiMethod = ApiMethod::new(
+    &ApiHandler::AsyncHttp(&read_task_log),
+    &ObjectSchema::new(
+        "Read the task log",
+        &sorted!([
+            ("node", false, &NODE_SCHEMA),
+            ("upid", false, &UPID_SCHEMA),
+            ("start", true, &START_PARAM_SCHEMA),
+            ("limit", true, &LIMIT_PARAM_SCHEMA),
+            ("test-status", true, &TEST_STATUS_PARAM_SCHEMA)
+        ]),
+    ),
+)
+.access(
+    Some("Users can access their own tasks, or need Sys.Audit on /system/tasks."),
+    &Permission::Anybody,
+);
+fn read_task_log(
+    _parts: Parts,
+    _req_body: Body,
+    param: Value,
+    _info: &ApiMethod,
+    rpcenv: Box<dyn RpcEnvironment>,
+) -> ApiResponseFuture {
+    async move {
+        let upid: UPID = extract_upid(&param)?;
+        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+        check_task_access(&auth_id, &upid)?;
+
+        let test_status = param["test-status"].as_bool().unwrap_or(false);
+        let start = param["start"].as_u64().unwrap_or(0);
+        let limit = param["limit"].as_u64().unwrap_or(50);
+
+        let path = upid_log_path(&upid)?;
+
+        if limit == 0 {
+            let file = tokio::fs::File::open(path).await?;
+
+            let header_disp = format!("attachment; filename={}", &upid.to_string());
+
+            let stream = AsyncReaderStream::new(file);
+
+            Ok(Response::builder()
+                .status(StatusCode::OK)
+                .header(header::CONTENT_TYPE, "text/plain")
+                .header(header::CONTENT_DISPOSITION, &header_disp)
+                .body(Body::wrap_stream(stream))
+                .unwrap())
+        } else {
+            let (count, lines) = read_task_log_lines(path, start, limit).unwrap();
+
+            let mut json = json!({
+                "data": lines,
+                "total": count,
+                "success": 1,
+            });
+
+            if test_status {
+                let active = proxmox_rest_server::worker_is_active(&upid).await?;
+
+                json["test-status"] = Value::from(active);
+            }
 
-    let path = upid_log_path(&upid)?;
+            Ok(Response::builder()
+                .status(StatusCode::OK)
+                .header(header::CONTENT_TYPE, "application/json")
+                .body(Body::from(json.to_string()))
+                .unwrap())
+        }
+    }
+    .boxed()
+}
 
+fn read_task_log_lines(
+    path: PathBuf,
+    start: u64,
+    mut limit: u64,
+) -> Result<(u64, Vec<Value>), Error> {
     let file = File::open(path)?;
 
+    let mut count: u64 = 0;
     let mut lines: Vec<Value> = vec![];
 
     for line in BufReader::new(file).lines() {
@@ -344,14 +396,7 @@ async fn read_task_log(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<
         }
     }
 
-    rpcenv["total"] = Value::from(count);
-
-    if test_status {
-        let active = proxmox_rest_server::worker_is_active(&upid).await?;
-        rpcenv["active"] = Value::from(active);
-    }
-
-    Ok(json!(lines))
+    Ok((count, lines))
 }
 
 #[api(
-- 
2.30.2






More information about the pbs-devel mailing list