[pbs-devel] [PATCH proxmox 09/13] http: takeover tools::http from proxmox_backup

Fabian Grünbichler f.gruenbichler at proxmox.com
Fri May 14 15:44:45 CEST 2021


the parts that were not already moved.

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
 proxmox-http/Cargo.toml                   |   2 +-
 proxmox-http/src/http/client.rs           |   3 +
 proxmox-http/src/http/client/connector.rs | 203 ++++++++++++++++++++++
 proxmox-http/src/http/mod.rs              |   3 +
 proxmox-http/src/lib.rs                   |   2 +-
 5 files changed, 211 insertions(+), 2 deletions(-)
 create mode 100644 proxmox-http/src/http/client.rs
 create mode 100644 proxmox-http/src/http/client/connector.rs

diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 9c6fc35..cc3eef6 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -26,6 +26,6 @@ proxmox = { path = "../proxmox", optional = true, version = "0.11.3", default-fe
 [features]
 default = []
 
-client = [ "http-helpers" ]
+client = [ "futures", "http-helpers", "openssl" ]
 http-helpers = [ "base64", "http", "hyper", "tokio/io-util", "tokio-openssl", "proxmox" ]
 websocket = [ "base64", "futures", "hyper", "openssl", "proxmox/tokio", "tokio/io-util", "tokio/sync" ]
diff --git a/proxmox-http/src/http/client.rs b/proxmox-http/src/http/client.rs
new file mode 100644
index 0000000..21a65e3
--- /dev/null
+++ b/proxmox-http/src/http/client.rs
@@ -0,0 +1,3 @@
+mod connector;
+
+pub use connector::HttpsConnector;
diff --git a/proxmox-http/src/http/client/connector.rs b/proxmox-http/src/http/client/connector.rs
new file mode 100644
index 0000000..a302dd1
--- /dev/null
+++ b/proxmox-http/src/http/client/connector.rs
@@ -0,0 +1,203 @@
+use anyhow::{Error, format_err, bail};
+use std::os::unix::io::AsRawFd;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use futures::*;
+use http::Uri;
+use hyper::client::HttpConnector;
+use openssl::ssl::SslConnector;
+use tokio::io::{
+    AsyncRead,
+    AsyncReadExt,
+    AsyncWriteExt,
+};
+use tokio::net::TcpStream;
+use tokio_openssl::SslStream;
+
+use proxmox::sys::linux::socket::set_tcp_keepalive;
+
+use crate::http::{helpers, MaybeTlsStream, ProxyConfig};
+
+#[derive(Clone)]
+pub struct HttpsConnector {
+    connector: HttpConnector,
+    ssl_connector: Arc<SslConnector>,
+    proxy: Option<ProxyConfig>,
+    tcp_keepalive: u32,
+}
+
+impl HttpsConnector {
+    pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector, tcp_keepalive: u32) -> Self {
+        connector.enforce_http(false);
+        Self {
+            connector,
+            ssl_connector: Arc::new(ssl_connector),
+            proxy: None,
+            tcp_keepalive,
+        }
+    }
+
+    pub fn set_proxy(&mut self, proxy: ProxyConfig) {
+        self.proxy = Some(proxy);
+    }
+
+    async fn secure_stream(
+        tcp_stream: TcpStream,
+        ssl_connector: &SslConnector,
+        host: &str,
+    ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+        let config = ssl_connector.configure()?;
+        let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+        Pin::new(&mut conn).connect().await?;
+        Ok(MaybeTlsStream::Secured(conn))
+    }
+
+    fn parse_status_line(status_line: &str) -> Result<(), Error> {
+        if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) {
+            bail!("proxy connect failed - invalid status: {}", status_line)
+        }
+        Ok(())
+    }
+
+    async fn parse_connect_response<R: AsyncRead +  Unpin>(
+        stream: &mut R,
+    ) -> Result<(), Error> {
+
+        let mut data: Vec<u8> = Vec::new();
+        let mut buffer = [0u8; 256];
+        const END_MARK: &[u8; 4] = b"\r\n\r\n";
+
+        'outer: loop {
+            let n = stream.read(&mut buffer[..]).await?;
+            if n == 0 { break; }
+            let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 };
+            data.extend(&buffer[..n]);
+            if data.len() >= END_MARK.len() {
+                if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) {
+                    let response = String::from_utf8_lossy(&data);
+                    let status_line = match response.split("\r\n").next() {
+                        Some(status) => status,
+                        None => bail!("missing newline"),
+                    };
+                    Self::parse_status_line(status_line)?;
+
+                    if pos != data.len() - END_MARK.len() {
+                        bail!("unexpected data after connect response");
+                    }
+                    break 'outer;
+                }
+            }
+            if data.len() > 1024*32 { // max 32K (random chosen limit)
+                bail!("too many bytes");
+            }
+        }
+        Ok(())
+    }
+}
+
+impl hyper::service::Service<Uri> for HttpsConnector {
+    type Response = MaybeTlsStream<TcpStream>;
+    type Error = Error;
+    #[allow(clippy::type_complexity)]
+    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
+
+    fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.connector
+            .poll_ready(ctx)
+            .map_err(|err| err.into())
+    }
+
+    fn call(&mut self, dst: Uri) -> Self::Future {
+        let mut connector = self.connector.clone();
+        let ssl_connector = Arc::clone(&self.ssl_connector);
+        let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS);
+        let host = match dst.host() {
+            Some(host) => host.to_owned(),
+            None => {
+                return futures::future::err(format_err!("missing URL scheme")).boxed();
+            }
+        };
+        let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
+        let keepalive = self.tcp_keepalive;
+
+        if let Some(ref proxy) = self.proxy {
+
+            let use_connect = is_https || proxy.force_connect;
+
+            let proxy_authority = match helpers::build_authority(&proxy.host, proxy.port) {
+                Ok(authority) => authority,
+                Err(err) => return futures::future::err(err).boxed(),
+            };
+
+            let proxy_uri = match Uri::builder()
+                .scheme("http")
+                .authority(proxy_authority.as_str())
+                .path_and_query("/")
+                .build()
+            {
+                Ok(uri) => uri,
+                Err(err) => return futures::future::err(err.into()).boxed(),
+            };
+
+            let authorization = proxy.authorization.clone();
+
+            if use_connect {
+                async move {
+
+                    let mut tcp_stream = connector
+                        .call(proxy_uri)
+                        .await
+                        .map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?;
+
+                    let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+
+                    let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
+                    if let Some(authorization) = authorization {
+                        connect_request.push_str(&format!("Proxy-Authorization: {}\r\n", authorization));
+                    }
+                    connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port));
+
+                    tcp_stream.write_all(connect_request.as_bytes()).await?;
+                    tcp_stream.flush().await?;
+
+                    Self::parse_connect_response(&mut tcp_stream).await?;
+
+                    if is_https {
+                        Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+                    } else {
+                        Ok(MaybeTlsStream::Normal(tcp_stream))
+                    }
+                }.boxed()
+            } else {
+               async move {
+                   let tcp_stream = connector
+                       .call(proxy_uri)
+                       .await
+                       .map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?;
+
+                   let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+
+                   Ok(MaybeTlsStream::Proxied(tcp_stream))
+               }.boxed()
+            }
+        } else {
+            async move {
+                let dst_str = dst.to_string(); // for error messages
+                let tcp_stream = connector
+                    .call(dst)
+                    .await
+                    .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
+
+                let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
+
+                if is_https {
+                    Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+                } else {
+                    Ok(MaybeTlsStream::Normal(tcp_stream))
+                }
+            }.boxed()
+        }
+    }
+}
diff --git a/proxmox-http/src/http/mod.rs b/proxmox-http/src/http/mod.rs
index 055648e..3803713 100644
--- a/proxmox-http/src/http/mod.rs
+++ b/proxmox-http/src/http/mod.rs
@@ -5,3 +5,6 @@ pub mod helpers;
 
 mod proxy_config;
 pub use proxy_config::ProxyConfig;
+
+#[cfg(feature = "client")]
+pub mod client;
diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 5e3f9ec..aa44c0d 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -1,5 +1,5 @@
 #[cfg(feature = "websocket")]
 pub mod websocket;
 
-#[cfg(feature = "http-helpers")]
+#[cfg(any(feature = "http-helpers", feature = "client"))]
 pub mod http;
-- 
2.20.1






More information about the pbs-devel mailing list