[pbs-devel] [RFC proxmox-backup 5/5] HttpsConnector: add proxy support

Dietmar Maurer dietmar at proxmox.com
Wed Apr 21 13:17:02 CEST 2021


---
 src/api2/node/apt.rs      |   2 +-
 src/tools/http.rs         | 176 +++++++++++++++++++++++++++++++++-----
 src/tools/subscription.rs |   2 +-
 3 files changed, 156 insertions(+), 24 deletions(-)

diff --git a/src/api2/node/apt.rs b/src/api2/node/apt.rs
index dbdb2019..44b13edd 100644
--- a/src/api2/node/apt.rs
+++ b/src/api2/node/apt.rs
@@ -194,7 +194,7 @@ fn apt_get_changelog(
         bail!("Package '{}' not found", name);
     }
 
-    let mut client = SimpleHttp::new();
+    let mut client = SimpleHttp::new(None); // TODO: pass proxy_config
 
     let changelog_url = &pkg_info[0].change_log_url;
     // FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it
diff --git a/src/tools/http.rs b/src/tools/http.rs
index a0dbfd01..6f00d6e0 100644
--- a/src/tools/http.rs
+++ b/src/tools/http.rs
@@ -10,7 +10,14 @@ use hyper::client::{Client, HttpConnector};
 use http::{Request, Response};
 use openssl::ssl::{SslConnector, SslMethod};
 use futures::*;
-use tokio::net::TcpStream;
+use tokio::{
+    io::{
+        AsyncBufReadExt,
+        AsyncWriteExt,
+        BufStream,
+    },
+    net::TcpStream,
+};
 use tokio_openssl::SslStream;
 
 use crate::tools::{
@@ -21,6 +28,14 @@ use crate::tools::{
     },
 };
 
+/// HTTP proxy configuration: where the proxy listens and whether CONNECT tunneling is forced.
+#[derive(Clone)]
+pub struct ProxyConfig {
+    pub host: String, // proxy host; joined with `port` as "host:port" to form the proxy address
+    pub port: u16, // proxy port
+    pub force_connect: bool, // when set, CONNECT is used even if the destination is not https
+}
+
 /// Asyncrounous HTTP client implementation
 pub struct SimpleHttp {
     client: Client<HttpsConnector, Body>,
@@ -28,18 +43,27 @@ pub struct SimpleHttp {
 
 impl SimpleHttp {
 
-    pub fn new() -> Self {
+    pub fn new(proxy_config: Option<ProxyConfig>) -> Self {
         let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build();
-        Self::with_ssl_connector(ssl_connector)
+        Self::with_ssl_connector(ssl_connector, proxy_config)
     }
 
-    pub fn with_ssl_connector(ssl_connector: SslConnector) -> Self {
+    pub fn with_ssl_connector(ssl_connector: SslConnector, proxy_config: Option<ProxyConfig>) -> Self {
         let connector = HttpConnector::new();
-        let https = HttpsConnector::with_connector(connector, ssl_connector);
+        let mut https = HttpsConnector::with_connector(connector, ssl_connector);
+        if let Some(proxy_config) = proxy_config {
+            https.set_proxy(proxy_config);
+        }
         let client = Client::builder().build(https);
         Self { client }
     }
 
+    pub async fn request(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
+        self.client.request(request)
+            .map_err(Error::from)
+            .await
+    }
+
     pub async fn post(
         &mut self,
         uri: &str,
@@ -106,6 +130,7 @@ impl SimpleHttp {
 pub struct HttpsConnector {
     connector: HttpConnector,
     ssl_connector: Arc<SslConnector>,
+    proxy: Option<ProxyConfig>,
 }
 
 impl HttpsConnector {
@@ -114,8 +139,56 @@ impl HttpsConnector {
         Self {
             connector,
             ssl_connector: Arc::new(ssl_connector),
+            proxy: None,
         }
     }
+
+    pub fn set_proxy(&mut self, proxy: ProxyConfig) {
+        self.proxy = Some(proxy);
+    }
+
+    async fn secure_stream(
+        tcp_stream: TcpStream,
+        ssl_connector: &SslConnector,
+        host: &str,
+    ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+        let config = ssl_connector.configure()?;
+        let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+        Pin::new(&mut conn).connect().await?;
+        Ok(MaybeTlsStream::Secured(conn))
+    }
+
+    async fn parse_connect_status(
+        stream: &mut BufStream<TcpStream>,
+    ) -> Result<(), Error> {
+        // Consume the proxy's reply to our CONNECT request: the status line must report
+        // 200, then the remaining header lines are skipped up to the empty line.
+        let mut status_str = String::new();
+        // TODO: limit read-length (read_line buffers a complete line before any length check)
+        if stream.read_line(&mut status_str).await? == 0 {
+            bail!("proxy connect failed - unexpected EOF")
+        }
+
+        if !(status_str.starts_with("HTTP/1.1 200") || status_str.starts_with("HTTP/1.0 200")) {
+            // strip the line terminator so it does not end up inside the error message
+            bail!("proxy connect failed - invalid status: {}", status_str.trim_end())
+        }
+
+        loop {
+            // skip the remaining header lines; a bare "\r\n" line ends the proxy's header
+            let mut response = String::new();
+            if stream.read_line(&mut response).await? == 0 {
+                bail!("proxy connect failed - unexpected EOF")
+            }
+            if response.len() > 8192 {
+                bail!("proxy connect failed - long lines in connect response")
+            }
+            if response == "\r\n" {
+                break;
+            }
+        }
+        Ok(())
+    }
 }
 
 impl hyper::service::Service<Uri> for HttpsConnector {
@@ -124,9 +197,10 @@ impl hyper::service::Service<Uri> for HttpsConnector {
     #[allow(clippy::type_complexity)]
     type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
 
-    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        // This connector is always ready, but others might not be.
-        Poll::Ready(Ok(()))
+    fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.connector
+            .poll_ready(ctx)
+            .map_err(|err| err.into())
     }
 
     fn call(&mut self, dst: Uri) -> Self::Future {
@@ -139,24 +213,82 @@ impl hyper::service::Service<Uri> for HttpsConnector {
                 return futures::future::err(format_err!("missing URL scheme")).boxed();
             }
         };
+        let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
+
+        if let Some(ref proxy) = self.proxy {
+
+            let use_connect = is_https || proxy.force_connect;
+
+            let proxy_url = format!("{}:{}", proxy.host, proxy.port);
+            let proxy_uri = match Uri::builder()
+                .scheme("http")
+                .authority(proxy_url.as_str())
+                .path_and_query("/")
+                .build()
+            {
+                Ok(uri) => uri,
+                Err(err) => return futures::future::err(err.into()).boxed(),
+            };
+
+            if use_connect {
+                async move {
+
+                    let proxy_stream = connector
+                        .call(proxy_uri)
+                        .await
+                        .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
 
-        async move {
-            let config = ssl_connector.configure()?;
-            let dst_str = dst.to_string(); // for error messages
-            let conn = connector
-                .call(dst)
-                .await
-                .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
+                    let _ = set_tcp_keepalive(proxy_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
 
-            let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+                    let mut stream = BufStream::new(proxy_stream);
 
-            if is_https {
-                let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(&host)?, conn)?;
-                Pin::new(&mut conn).connect().await?;
-                Ok(MaybeTlsStream::Secured(conn))
+                    let connect_request = format!(
+                        "CONNECT {0}:{1} HTTP/1.1\r\n\
+                         Host: {0}:{1}\r\n\r\n",
+                        host, port,
+                    );
+
+                    stream.write_all(connect_request.as_bytes()).await?;
+                    stream.flush().await?;
+
+                    Self::parse_connect_status(&mut stream).await?;
+
+                    let tcp_stream = stream.into_inner();
+
+                    if is_https {
+                        Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+                    } else {
+                        Ok(MaybeTlsStream::Normal(tcp_stream))
+                    }
+                }.boxed()
             } else {
-                Ok(MaybeTlsStream::Normal(conn))
+               async move {
+                   let tcp_stream = connector
+                       .call(proxy_uri)
+                       .await
+                       .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
+
+                   let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
+                   Ok(MaybeTlsStream::Proxied(tcp_stream))
+               }.boxed()
             }
-        }.boxed()
+        } else {
+            async move {
+                let dst_str = dst.to_string(); // for error messages
+                let tcp_stream = connector
+                    .call(dst)
+                    .await
+                    .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
+
+                let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
+                if is_https {
+                    Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+                } else {
+                    Ok(MaybeTlsStream::Normal(tcp_stream))
+                }
+            }.boxed()
+        }
     }
 }
diff --git a/src/tools/subscription.rs b/src/tools/subscription.rs
index 9a920aee..eaaf0389 100644
--- a/src/tools/subscription.rs
+++ b/src/tools/subscription.rs
@@ -102,7 +102,7 @@ async fn register_subscription(
         "check_token": challenge,
     });
 
-    let mut client = SimpleHttp::new();
+    let mut client = SimpleHttp::new(None); // TODO: pass proxy_config
 
     let uri = "https://shop.maurer-it.com/modules/servers/licensing/verify.php";
     let query = tools::json_object_to_query(params)?;
-- 
2.20.1





More information about the pbs-devel mailing list