[pbs-devel] [PATCH backup 6/7] refactor send_command
Wolfgang Bumiller
w.bumiller at proxmox.com
Tue May 11 15:53:59 CEST 2021
- refactor the combinators,
- make it take a `&T: Serialize` instead of a Value, and
allow sending the raw string via `send_raw_command`.
Signed-off-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
---
src/bin/proxmox-backup-proxy.rs | 8 +---
src/server/command_socket.rs | 71 ++++++++++++++++++---------------
src/server/worker_task.rs | 4 +-
3 files changed, 42 insertions(+), 41 deletions(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 793ba67d..fc773459 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -750,15 +750,11 @@ async fn command_reopen_logfiles() -> Result<(), Error> {
// only care about the most recent daemon instance for each, proxy & api, as other older ones
// should not respond to new requests anyway, but only finish their current one and then exit.
let sock = server::our_ctrl_sock();
- let f1 = server::send_command(sock, serde_json::json!({
- "command": "api-access-log-reopen",
- }));
+ let f1 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
let pid = server::read_pid(buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
let sock = server::ctrl_sock_from_pid(pid);
- let f2 = server::send_command(sock, serde_json::json!({
- "command": "api-access-log-reopen",
- }));
+ let f2 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
match futures::join!(f1, f2) {
(Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs
index 89c77585..af41dd16 100644
--- a/src/server/command_socket.rs
+++ b/src/server/command_socket.rs
@@ -2,11 +2,12 @@ use anyhow::{bail, format_err, Error};
use std::collections::HashMap;
use std::os::unix::io::AsRawFd;
-use std::path::PathBuf;
+use std::path::{PathBuf, Path};
use std::sync::Arc;
use futures::*;
use tokio::net::UnixListener;
+use serde::Serialize;
use serde_json::Value;
use nix::sys::socket;
@@ -102,43 +103,47 @@ where
}
-pub async fn send_command<P>(
- path: P,
- params: Value
-) -> Result<Value, Error>
- where P: Into<PathBuf>,
+pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
+where
+ P: AsRef<Path>,
+ T: ?Sized + Serialize,
{
- let path: PathBuf = path.into();
+ let mut command_string = serde_json::to_string(params)?;
+ command_string.push('\n');
+ send_raw_command(path.as_ref(), &command_string).await
+}
- tokio::net::UnixStream::connect(path)
- .map_err(move |err| format_err!("control socket connect failed - {}", err))
- .and_then(move |mut conn| {
+pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
+where
+ P: AsRef<Path>,
+{
+ use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
- let mut command_string = params.to_string();
- command_string.push('\n');
+ let mut conn = tokio::net::UnixStream::connect(path)
+ .map_err(move |err| format_err!("control socket connect failed - {}", err))
+ .await?;
- async move {
- use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+ conn.write_all(command_string.as_bytes()).await?;
+ if !command_string.as_bytes().ends_with(b"\n") {
+ conn.write_all(b"\n").await?;
+ }
- conn.write_all(command_string.as_bytes()).await?;
- AsyncWriteExt::shutdown(&mut conn).await?;
- let mut rx = tokio::io::BufReader::new(conn);
- let mut data = String::new();
- if rx.read_line(&mut data).await? == 0 {
- bail!("no response");
- }
- if let Some(res) = data.strip_prefix("OK: ") {
- match res.parse::<Value>() {
- Ok(v) => Ok(v),
- Err(err) => bail!("unable to parse json response - {}", err),
- }
- } else if let Some(err) = data.strip_prefix("ERROR: ") {
- bail!("{}", err);
- } else {
- bail!("unable to parse response: {}", data);
- }
- }
- }).await
+ AsyncWriteExt::shutdown(&mut conn).await?;
+ let mut rx = tokio::io::BufReader::new(conn);
+ let mut data = String::new();
+ if rx.read_line(&mut data).await? == 0 {
+ bail!("no response");
+ }
+ if let Some(res) = data.strip_prefix("OK: ") {
+ match res.parse::<Value>() {
+ Ok(v) => Ok(v),
+ Err(err) => bail!("unable to parse json response - {}", err),
+ }
+ } else if let Some(err) = data.strip_prefix("ERROR: ") {
+ bail!("{}", err);
+ } else {
+ bail!("unable to parse response: {}", data);
+ }
}
/// A callback for a specific commando socket.
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 6c5456c9..84019fef 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -59,7 +59,7 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
"upid": upid.to_string(),
},
});
- let status = super::send_command(sock, cmd).await?;
+ let status = super::send_command(sock, &cmd).await?;
if let Some(active) = status.as_bool() {
Ok(active)
@@ -133,7 +133,7 @@ pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
"upid": upid.to_string(),
},
});
- super::send_command(sock, cmd).map_ok(|_| ()).await
+ super::send_command(sock, &cmd).map_ok(|_| ()).await
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
--
2.20.1
More information about the pbs-devel
mailing list