[pbs-devel] [RFC proxmox-backup 5/5] HttpsConnector: add proxy support
Dietmar Maurer
dietmar at proxmox.com
Wed Apr 21 13:17:02 CEST 2021
---
src/api2/node/apt.rs | 2 +-
src/tools/http.rs | 176 +++++++++++++++++++++++++++++++++-----
src/tools/subscription.rs | 2 +-
3 files changed, 156 insertions(+), 24 deletions(-)
diff --git a/src/api2/node/apt.rs b/src/api2/node/apt.rs
index dbdb2019..44b13edd 100644
--- a/src/api2/node/apt.rs
+++ b/src/api2/node/apt.rs
@@ -194,7 +194,7 @@ fn apt_get_changelog(
bail!("Package '{}' not found", name);
}
- let mut client = SimpleHttp::new();
+ let mut client = SimpleHttp::new(None); // TODO: pass proxy_config
let changelog_url = &pkg_info[0].change_log_url;
// FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it
diff --git a/src/tools/http.rs b/src/tools/http.rs
index a0dbfd01..6f00d6e0 100644
--- a/src/tools/http.rs
+++ b/src/tools/http.rs
@@ -10,7 +10,14 @@ use hyper::client::{Client, HttpConnector};
use http::{Request, Response};
use openssl::ssl::{SslConnector, SslMethod};
use futures::*;
-use tokio::net::TcpStream;
+use tokio::{
+ io::{
+ AsyncBufReadExt,
+ AsyncWriteExt,
+ BufStream,
+ },
+ net::TcpStream,
+};
use tokio_openssl::SslStream;
use crate::tools::{
@@ -21,6 +28,14 @@ use crate::tools::{
},
};
+/// HTTP Proxy Configuration
+#[derive(Clone)]
+pub struct ProxyConfig {
+ pub host: String,
+ pub port: u16,
+ pub force_connect: bool,
+}
+
/// Asyncrounous HTTP client implementation
pub struct SimpleHttp {
client: Client<HttpsConnector, Body>,
@@ -28,18 +43,27 @@ pub struct SimpleHttp {
impl SimpleHttp {
- pub fn new() -> Self {
+ pub fn new(proxy_config: Option<ProxyConfig>) -> Self {
let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build();
- Self::with_ssl_connector(ssl_connector)
+ Self::with_ssl_connector(ssl_connector, proxy_config)
}
- pub fn with_ssl_connector(ssl_connector: SslConnector) -> Self {
+ pub fn with_ssl_connector(ssl_connector: SslConnector, proxy_config: Option<ProxyConfig>) -> Self {
let connector = HttpConnector::new();
- let https = HttpsConnector::with_connector(connector, ssl_connector);
+ let mut https = HttpsConnector::with_connector(connector, ssl_connector);
+ if let Some(proxy_config) = proxy_config {
+ https.set_proxy(proxy_config);
+ }
let client = Client::builder().build(https);
Self { client }
}
+ pub async fn request(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
+ self.client.request(request)
+ .map_err(Error::from)
+ .await
+ }
+
pub async fn post(
&mut self,
uri: &str,
@@ -106,6 +130,7 @@ impl SimpleHttp {
pub struct HttpsConnector {
connector: HttpConnector,
ssl_connector: Arc<SslConnector>,
+ proxy: Option<ProxyConfig>,
}
impl HttpsConnector {
@@ -114,8 +139,56 @@ impl HttpsConnector {
Self {
connector,
ssl_connector: Arc::new(ssl_connector),
+ proxy: None,
}
}
+
+ pub fn set_proxy(&mut self, proxy: ProxyConfig) {
+ self.proxy = Some(proxy);
+ }
+
+ async fn secure_stream(
+ tcp_stream: TcpStream,
+ ssl_connector: &SslConnector,
+ host: &str,
+ ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+ let config = ssl_connector.configure()?;
+ let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+ Pin::new(&mut conn).connect().await?;
+ Ok(MaybeTlsStream::Secured(conn))
+ }
+
+ async fn parse_connect_status(
+ stream: &mut BufStream<TcpStream>,
+ ) -> Result<(), Error> {
+
+ let mut status_str = String::new();
+
+ // TODO: limit read-length
+
+ if stream.read_line(&mut status_str).await? == 0 {
+ bail!("proxy connect failed - unexpected EOF")
+ }
+
+ if !(status_str.starts_with("HTTP/1.1 200") || status_str.starts_with("HTTP/1.0 200")) {
+ bail!("proxy connect failed - invalid status: {}", status_str)
+ }
+
+ loop {
+ // skip rest until \r\n
+ let mut response = String::new();
+ if stream.read_line(&mut response).await? == 0 {
+ bail!("proxy connect failed - unexpected EOF")
+ }
+ if response.len() > 8192 {
+ bail!("proxy connect failed - long lines in connect rtesponse")
+ }
+ if response == "\r\n" {
+ break;
+ }
+ }
+ Ok(())
+ }
}
impl hyper::service::Service<Uri> for HttpsConnector {
@@ -124,9 +197,10 @@ impl hyper::service::Service<Uri> for HttpsConnector {
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
- fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- // This connector is always ready, but others might not be.
- Poll::Ready(Ok(()))
+ fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.connector
+ .poll_ready(ctx)
+ .map_err(|err| err.into())
}
fn call(&mut self, dst: Uri) -> Self::Future {
@@ -139,24 +213,82 @@ impl hyper::service::Service<Uri> for HttpsConnector {
return futures::future::err(format_err!("missing URL scheme")).boxed();
}
};
+ let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
+
+ if let Some(ref proxy) = self.proxy {
+
+ let use_connect = is_https || proxy.force_connect;
+
+ let proxy_url = format!("{}:{}", proxy.host, proxy.port);
+ let proxy_uri = match Uri::builder()
+ .scheme("http")
+ .authority(proxy_url.as_str())
+ .path_and_query("/")
+ .build()
+ {
+ Ok(uri) => uri,
+ Err(err) => return futures::future::err(err.into()).boxed(),
+ };
+
+ if use_connect {
+ async move {
+
+ let proxy_stream = connector
+ .call(proxy_uri)
+ .await
+ .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
- async move {
- let config = ssl_connector.configure()?;
- let dst_str = dst.to_string(); // for error messages
- let conn = connector
- .call(dst)
- .await
- .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
+ let _ = set_tcp_keepalive(proxy_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
- let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+ let mut stream = BufStream::new(proxy_stream);
- if is_https {
- let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(&host)?, conn)?;
- Pin::new(&mut conn).connect().await?;
- Ok(MaybeTlsStream::Secured(conn))
+ let connect_request = format!(
+ "CONNECT {0}:{1} HTTP/1.1\r\n\
+ Host: {0}:{1}\r\n\r\n",
+ host, port,
+ );
+
+ stream.write_all(connect_request.as_bytes()).await?;
+ stream.flush().await?;
+
+ Self::parse_connect_status(&mut stream).await?;
+
+ let tcp_stream = stream.into_inner();
+
+ if is_https {
+ Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+ } else {
+ Ok(MaybeTlsStream::Normal(tcp_stream))
+ }
+ }.boxed()
} else {
- Ok(MaybeTlsStream::Normal(conn))
+ async move {
+ let tcp_stream = connector
+ .call(proxy_uri)
+ .await
+ .map_err(|err| format_err!("error connecting to {} - {}", proxy_url, err))?;
+
+ let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
+ Ok(MaybeTlsStream::Proxied(tcp_stream))
+ }.boxed()
}
- }.boxed()
+ } else {
+ async move {
+ let dst_str = dst.to_string(); // for error messages
+ let tcp_stream = connector
+ .call(dst)
+ .await
+ .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?;
+
+ let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
+ if is_https {
+ Self::secure_stream(tcp_stream, &ssl_connector, &host).await
+ } else {
+ Ok(MaybeTlsStream::Normal(tcp_stream))
+ }
+ }.boxed()
+ }
}
}
diff --git a/src/tools/subscription.rs b/src/tools/subscription.rs
index 9a920aee..eaaf0389 100644
--- a/src/tools/subscription.rs
+++ b/src/tools/subscription.rs
@@ -102,7 +102,7 @@ async fn register_subscription(
"check_token": challenge,
});
- let mut client = SimpleHttp::new();
+ let mut client = SimpleHttp::new(None); // TODO: pass proxy_config
let uri = "https://shop.maurer-it.com/modules/servers/licensing/verify.php";
let query = tools::json_object_to_query(params)?;
--
2.20.1
More information about the pbs-devel
mailing list