[pbs-devel] [REBASED backup 12/14] acme: pipe plugin output to task log

Wolfgang Bumiller w.bumiller at proxmox.com
Thu Apr 29 15:13:20 CEST 2021


Signed-off-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
---
 src/api2/node/certificates.rs |  9 ++++--
 src/config/acme/plugin.rs     | 57 +++++++++++++++++++++++++++++------
 2 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/src/api2/node/certificates.rs b/src/api2/node/certificates.rs
index 6ec9f52a..26cf4414 100644
--- a/src/api2/node/certificates.rs
+++ b/src/api2/node/certificates.rs
@@ -348,11 +348,16 @@ async fn order_certificate(
         })?;
 
         worker.log("Setting up validation plugin");
-        let validation_url = plugin_cfg.setup(&mut acme, &auth, domain_config).await?;
+        let validation_url = plugin_cfg
+            .setup(&mut acme, &auth, domain_config, Arc::clone(&worker))
+            .await?;
 
         let result = request_validation(&worker, &mut acme, auth_url, validation_url).await;
 
-        if let Err(err) = plugin_cfg.teardown(&mut acme, &auth, domain_config).await {
+        if let Err(err) = plugin_cfg
+            .teardown(&mut acme, &auth, domain_config, Arc::clone(&worker))
+            .await
+        {
             worker.warn(format!(
                 "Failed to teardown plugin '{}' for domain '{}' - {}",
                 plugin_id, domain, err
diff --git a/src/config/acme/plugin.rs b/src/config/acme/plugin.rs
index 7c5a9b72..5a4851ee 100644
--- a/src/config/acme/plugin.rs
+++ b/src/config/acme/plugin.rs
@@ -8,7 +8,7 @@ use hyper::{Body, Request, Response};
 use lazy_static::lazy_static;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
-use tokio::io::AsyncWriteExt;
+use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
 use tokio::process::Command;
 
 use proxmox::api::{
@@ -24,6 +24,7 @@ use proxmox_acme_rs::{Authorization, Challenge};
 use crate::acme::AcmeClient;
 use crate::api2::types::PROXMOX_SAFE_ID_FORMAT;
 use crate::config::acme::AcmeDomain;
+use crate::server::WorkerTask;
 
 const ACME_PATH: &str = "/usr/share/proxmox-acme/proxmox-acme";
 
@@ -280,6 +281,7 @@ pub trait AcmePlugin {
         client: &'b mut AcmeClient,
         authorization: &'c Authorization,
         domain: &'d AcmeDomain,
+        task: Arc<WorkerTask>,
     ) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>>;
 
     fn teardown<'fut, 'a: 'fut, 'b: 'fut, 'c: 'fut, 'd: 'fut>(
@@ -287,6 +289,7 @@ pub trait AcmePlugin {
         client: &'b mut AcmeClient,
         authorization: &'c Authorization,
         domain: &'d AcmeDomain,
+        task: Arc<WorkerTask>,
     ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'fut>>;
 }
 
@@ -301,12 +304,29 @@ fn extract_challenge<'a>(
         .ok_or_else(|| format_err!("no supported challenge type (dns-01) found"))
 }
 
+async fn pipe_to_tasklog<T: AsyncRead + Unpin>(
+    pipe: T,
+    task: Arc<WorkerTask>,
+) -> Result<(), std::io::Error> {
+    let mut pipe = BufReader::new(pipe);
+    let mut line = String::new();
+    loop {
+        line.clear();
+        match pipe.read_line(&mut line).await {
+            Ok(0) => return Ok(()),
+            Ok(_) => task.log(line.as_str()),
+            Err(err) => return Err(err),
+        }
+    }
+}
+
 impl DnsPlugin {
     async fn action<'a>(
         &self,
         client: &mut AcmeClient,
         authorization: &'a Authorization,
         domain: &AcmeDomain,
+        task: Arc<WorkerTask>,
         action: &str,
     ) -> Result<&'a str, Error> {
         let challenge = extract_challenge(authorization, "dns-01")?;
@@ -339,20 +359,33 @@ impl DnsPlugin {
                 domain.alias.as_deref().unwrap_or(&domain.domain),
         ]);
 
-        let mut child = command.stdin(Stdio::piped()).spawn()?;
+        // We could use 1 socketpair for all 3 stdio handles, but tokio wraps each handle in its own
+        // `File` internally, so `close` is called on each of them separately; we need 3 pipes :-(
+
+        let mut child = command
+            .stdin(Stdio::piped())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped())
+            .spawn()?;
 
         let mut stdin = child.stdin.take().expect("Stdio::piped()");
-        match async move {
+        let stdout = child.stdout.take().expect("Stdio::piped() failed?");
+        let stdout = pipe_to_tasklog(stdout, Arc::clone(&task));
+        let stderr = child.stderr.take().expect("Stdio::piped() failed?");
+        let stderr = pipe_to_tasklog(stderr, Arc::clone(&task));
+        let stdin = async move {
             stdin.write_all(&stdin_data).await?;
             stdin.flush().await?;
             Ok::<_, std::io::Error>(())
-        }
-        .await
-        {
-            Ok(()) => (),
+        };
+        match futures::try_join!(stdin, stdout, stderr) {
+            Ok(((), (), ())) => (),
             Err(err) => {
                 if let Err(err) = child.kill().await {
-                    eprintln!("failed to kill '{} {}' command: {}", ACME_PATH, action, err);
+                    task.log(format!(
+                        "failed to kill '{} {}' command: {}",
+                        ACME_PATH, action, err
+                    ));
                 }
                 bail!("'{}' failed: {}", ACME_PATH, err);
             }
@@ -378,8 +411,9 @@ impl AcmePlugin for DnsPlugin {
         client: &'b mut AcmeClient,
         authorization: &'c Authorization,
         domain: &'d AcmeDomain,
+        task: Arc<WorkerTask>,
     ) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>> {
-        Box::pin(self.action(client, authorization, domain, "setup"))
+        Box::pin(self.action(client, authorization, domain, task, "setup"))
     }
 
     fn teardown<'fut, 'a: 'fut, 'b: 'fut, 'c: 'fut, 'd: 'fut>(
@@ -387,9 +421,10 @@ impl AcmePlugin for DnsPlugin {
         client: &'b mut AcmeClient,
         authorization: &'c Authorization,
         domain: &'d AcmeDomain,
+        task: Arc<WorkerTask>,
     ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'fut>> {
         Box::pin(async move {
-            self.action(client, authorization, domain, "teardown")
+            self.action(client, authorization, domain, task, "teardown")
                 .await
                 .map(drop)
         })
@@ -441,6 +476,7 @@ impl AcmePlugin for StandaloneServer {
         client: &'b mut AcmeClient,
         authorization: &'c Authorization,
         _domain: &'d AcmeDomain,
+        _task: Arc<WorkerTask>,
     ) -> Pin<Box<dyn Future<Output = Result<&'c str, Error>> + Send + 'fut>> {
         use hyper::server::conn::AddrIncoming;
         use hyper::service::{make_service_fn, service_fn};
@@ -484,6 +520,7 @@ impl AcmePlugin for StandaloneServer {
         _client: &'b mut AcmeClient,
         _authorization: &'c Authorization,
         _domain: &'d AcmeDomain,
+        _task: Arc<WorkerTask>,
     ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'fut>> {
         Box::pin(async move {
             if let Some(abort) = self.abort_handle.take() {
-- 
2.20.1






More information about the pbs-devel mailing list