[pbs-devel] [RFC PATCH proxmox-backup 1/3] proxmox-rest-server: OutputFormatter: add new format_data_streaming method
Dominik Csapak
d.csapak at proxmox.com
Thu Feb 17 10:40:39 CET 2022
that takes the data in form of a `Box<dyn SerializableReturn + Send>`
instead of a Value.
Implement it in json and extjs formatter, by starting a thread and
stream the serialized data via a `BufWriter<SenderWriter>` and use
the Receiver side as a stream for the response body.
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
proxmox-rest-server/Cargo.toml | 1 +
proxmox-rest-server/src/formatter.rs | 52 +++++++++++++++++++++++++++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 8fbbe8c0..a5855c88 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -27,6 +27,7 @@ serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0"
tokio = { version = "1.6", features = ["signal", "process"] }
tokio-openssl = "0.6.1"
+tokio-stream = "0.1.0"
tower-service = "0.3.0"
url = "2.1"
diff --git a/proxmox-rest-server/src/formatter.rs b/proxmox-rest-server/src/formatter.rs
index e3958826..7b8d16bc 100644
--- a/proxmox-rest-server/src/formatter.rs
+++ b/proxmox-rest-server/src/formatter.rs
@@ -7,7 +7,7 @@ use serde_json::{json, Value};
use hyper::{Body, Response, StatusCode};
use hyper::header;
-use proxmox_router::{HttpError, RpcEnvironment};
+use proxmox_router::{HttpError, RpcEnvironment, SerializableReturn};
use proxmox_schema::ParameterError;
/// Extension to set error message for server side logging
@@ -18,6 +18,9 @@ pub trait OutputFormatter: Send + Sync {
/// Transform json data into a http response
fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body>;
+ /// Transform serializable data into a streaming http response
+ fn format_data_streaming(&self, data: Box<dyn SerializableReturn + Send>, rpcenv: &dyn RpcEnvironment) -> Result<Response<Body>, Error>;
+
/// Transform errors into a http response
fn format_error(&self, err: Error) -> Response<Body>;
@@ -46,6 +49,16 @@ fn json_data_response(data: Value) -> Response<Body> {
response
}
+fn json_data_response_streaming(body: Body) -> Result<Response<Body>, Error> {
+ let response = Response::builder()
+ .header(
+ header::CONTENT_TYPE,
+ header::HeaderValue::from_static(JSON_CONTENT_TYPE)
+ )
+ .body(body)?;
+ Ok(response)
+}
+
fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment)
{
let attributes = match rpcenv.result_attrib().as_object() {
@@ -58,6 +71,19 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment)
}
}
+fn start_data_streaming(value: Value, data: Box<dyn SerializableReturn + Send>) -> tokio::sync::mpsc::Receiver<Result<Vec<u8>, Error>> {
+ let (writer, reader) = tokio::sync::mpsc::channel(1);
+
+ std::thread::spawn(move || {
+ let output = proxmox_async::blocking::SenderWriter::from_sender(writer);
+ let mut output = std::io::BufWriter::new(output);
+ let mut serializer = serde_json::Serializer::new(&mut output);
+ let _ = data.sender_serialize(&mut serializer, value);
+ });
+
+ reader
+}
+
struct JsonFormatter();
@@ -86,6 +112,17 @@ impl OutputFormatter for JsonFormatter {
json_data_response(result)
}
+ fn format_data_streaming(&self, data: Box<dyn SerializableReturn + Send>, rpcenv: &dyn RpcEnvironment) -> Result<Response<Body>, Error> {
+ let mut value = json!({});
+
+ add_result_attributes(&mut value, rpcenv);
+
+ let reader = start_data_streaming(value, data);
+ let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+ json_data_response_streaming(Body::wrap_stream(stream))
+ }
+
fn format_error(&self, err: Error) -> Response<Body> {
let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
@@ -142,6 +179,19 @@ impl OutputFormatter for ExtJsFormatter {
json_data_response(result)
}
+ fn format_data_streaming(&self, data: Box<dyn SerializableReturn + Send>, rpcenv: &dyn RpcEnvironment) -> Result<Response<Body>, Error> {
+ let mut value = json!({
+ "success": true,
+ });
+
+ add_result_attributes(&mut value, rpcenv);
+
+ let reader = start_data_streaming(value, data);
+ let stream = tokio_stream::wrappers::ReceiverStream::new(reader);
+
+ json_data_response_streaming(Body::wrap_stream(stream))
+ }
+
fn format_error(&self, err: Error) -> Response<Body> {
let message: String;
--
2.30.2
More information about the pbs-devel
mailing list