[pbs-devel] [PATCH proxmox-backup v2 1/7] make tasklog downloadable in the backup server backend

Daniel Tschlatscher d.tschlatscher at proxmox.com
Wed Sep 7 10:56:27 CEST 2022


The API call for getting the tasklog now returns a file stream when
the URL parameter 'limit' is set to 0, in accordance with the changes
in the PMG and PVE backends.
To make this work I had to use one of the lower level apimethod types
from the proxmox-router. Therefore I also had to change declarations
for the API routing and the corresponding Object Schemas.
If the parameter 'limit' is not set to 0 the API should behave the
same as before.

Signed-off-by: Daniel Tschlatscher <d.tschlatscher at proxmox.com>
---
 src/api2/node/tasks.rs | 159 ++++++++++++++++++++++++++---------------
 1 file changed, 101 insertions(+), 58 deletions(-)

diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index cbd0894e..99600bf4 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -1,11 +1,17 @@
 use std::fs::File;
 use std::io::{BufRead, BufReader};
+use std::path::PathBuf;
 
 use anyhow::{bail, Error};
+use futures::FutureExt;
+use http::{Response, StatusCode, header};
+use http::request::Parts;
+use hyper::Body;
+use proxmox_async::stream::AsyncReaderStream;
 use serde_json::{json, Value};
 
-use proxmox_router::{list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap};
-use proxmox_schema::api;
+use proxmox_router::{list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap, ApiMethod, ApiHandler, ApiResponseFuture};
+use proxmox_schema::{api, Schema, IntegerSchema, BooleanSchema, ObjectSchema};
 use proxmox_sys::sortable;
 
 use pbs_api_types::{
@@ -19,6 +25,20 @@ use crate::api2::pull::check_pull_privs;
 use pbs_config::CachedUserInfo;
 use proxmox_rest_server::{upid_log_path, upid_read_status, TaskListInfoIterator, TaskState};
 
+pub const START_PARAM_SCHEMA: Schema =
+    IntegerSchema::new("Start at this line when reading the tasklog")
+        .minimum(0)
+        .schema();
+
+pub const LIMIT_PARAM_SCHEMA: Schema =
+    IntegerSchema::new("The amount of lines to read from the tasklog")
+        .minimum(0)
+        .schema();
+
+pub const TEST_STATUS_PARAM_SCHEMA: Schema =
+    BooleanSchema::new("Test task status, and set result attribute \"active\" accordingly.")
+        .schema();
+
 // matches respective job execution privileges
 fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) -> Result<(), Error> {
     match (upid.worker_type.as_str(), &upid.worker_id) {
@@ -268,58 +288,88 @@ fn extract_upid(param: &Value) -> Result<UPID, Error> {
     upid_str.parse::<UPID>()
 }
 
-#[api(
-    input: {
-        properties: {
-            node: {
-                schema: NODE_SCHEMA,
-            },
-            upid: {
-                schema: UPID_SCHEMA,
-            },
-            "test-status": {
-                type: bool,
-                optional: true,
-                description: "Test task status, and set result attribute \"active\" accordingly.",
-            },
-            start: {
-                type: u64,
-                optional: true,
-                description: "Start at this line.",
-                default: 0,
-            },
-            limit: {
-                type: u64,
-                optional: true,
-                description: "Only list this amount of lines.",
-                default: 50,
-            },
-        },
-    },
-    access: {
-        description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
-        permission: &Permission::Anybody,
-    },
-)]
-/// Read task log.
-async fn read_task_log(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
-    let upid = extract_upid(&param)?;
-
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-
-    check_task_access(&auth_id, &upid)?;
-
-    let test_status = param["test-status"].as_bool().unwrap_or(false);
-
-    let start = param["start"].as_u64().unwrap_or(0);
-    let mut limit = param["limit"].as_u64().unwrap_or(50);
-
-    let mut count: u64 = 0;
+#[sortable]
+pub const API_METHOD_READ_TASK_LOG: ApiMethod = ApiMethod::new(
+    &ApiHandler::AsyncHttp(&download_task_log),
+    &ObjectSchema::new(
+        "Read the task log",
+        &sorted!([
+            ("node", false, &NODE_SCHEMA),
+            ("upid", false, &UPID_SCHEMA),
+            ("start", true, &START_PARAM_SCHEMA),
+            ("limit", true, &LIMIT_PARAM_SCHEMA),
+            ("test-status", true, &TEST_STATUS_PARAM_SCHEMA)
+        ]),
+    ),
+)
+.access(
+    Some("Users can access their own tasks, or need Sys.Audit on /system/tasks."),
+    &Permission::Anybody,
+);
+fn download_task_log(
+    _parts: Parts,
+    _req_body: Body,
+    param: Value,
+    _info: &ApiMethod,
+    rpcenv: Box<dyn RpcEnvironment>,
+) -> ApiResponseFuture {
+    async move {
+        let upid: UPID = extract_upid(&param)?;
+        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+        check_task_access(&auth_id, &upid)?;
+
+        let test_status = param["test-status"].as_bool().unwrap_or(false);
+        let start = param["start"].as_u64().unwrap_or(0);
+        let limit = param["limit"].as_u64().unwrap_or(50);
+
+        let path = upid_log_path(&upid)?;
+
+        if limit == 0 {
+            let file = tokio::fs::File::open(path).await?;
+
+            let header_disp = format!("attachment; filename={}", &upid.to_string());
+
+            let stream = AsyncReaderStream::new(file);
+
+            Ok(Response::builder()
+                .status(StatusCode::OK)
+                .header(header::CONTENT_TYPE, "text/plain")
+                .header(header::CONTENT_DISPOSITION, &header_disp)
+                .body(Body::wrap_stream(stream))
+                .unwrap())
+        } else {
+            let (count, lines) = read_tasklog_lines(path, start, limit).unwrap();
+
+            let mut json = json!({
+                "data": lines,
+                "total": count,
+                "success": 1,
+            });
+
+            if test_status {
+                let active = proxmox_rest_server::worker_is_active(&upid).await?;
+
+                json["test-status"] = Value::from(active);
+            }
 
-    let path = upid_log_path(&upid)?;
+            Ok(Response::builder()
+                .status(StatusCode::OK)
+                .header(header::CONTENT_TYPE, "application/json")
+                .body(Body::from(json.to_string()))
+                .unwrap())
+        }
+    }
+    .boxed()
+}
 
+fn read_tasklog_lines(
+    path: PathBuf,
+    start: u64,
+    mut limit: u64,
+) -> Result<(u64, Vec<Value>), Error> {
     let file = File::open(path)?;
 
+    let mut count: u64 = 0;
     let mut lines: Vec<Value> = vec![];
 
     for line in BufReader::new(file).lines() {
@@ -344,14 +394,7 @@ async fn read_task_log(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<
         }
     }
 
-    rpcenv["total"] = Value::from(count);
-
-    if test_status {
-        let active = proxmox_rest_server::worker_is_active(&upid).await?;
-        rpcenv["active"] = Value::from(active);
-    }
-
-    Ok(json!(lines))
+    Ok((count, lines))
 }
 
 #[api(
-- 
2.30.2






More information about the pbs-devel mailing list