[pbs-devel] [PATCH proxmox-backup 08/12] tokio 1.0: update to new tokio-openssl interface
Fabian Grünbichler
f.gruenbichler at proxmox.com
Tue Jan 12 14:58:22 CET 2021
connect/accept are now happening on pinned SslStreams
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
Notes:
there might be further potential to improve error handling now?
src/bin/proxmox-backup-proxy.rs | 27 ++++++++++++++++++++++-----
src/server/rest.rs | 4 ++--
src/tools/async_io.rs | 2 +-
src/tools/http.rs | 11 +++++------
4 files changed, 30 insertions(+), 14 deletions(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 16450244..c8eb237c 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -167,7 +167,7 @@ fn accept_connections(
mut listener: tokio::net::TcpListener,
acceptor: Arc<openssl::ssl::SslAcceptor>,
debug: bool,
-) -> tokio::sync::mpsc::Receiver<Result<tokio_openssl::SslStream<tokio::net::TcpStream>, Error>> {
+) -> tokio::sync::mpsc::Receiver<Result<std::pin::Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>, Error>> {
const MAX_PENDING_ACCEPTS: usize = 1024;
@@ -185,7 +185,24 @@ fn accept_connections(
sock.set_nodelay(true).unwrap();
let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
let acceptor = Arc::clone(&acceptor);
- let mut sender = sender.clone();
+
+ let ssl = match openssl::ssl::Ssl::new(acceptor.context()) {
+ Ok(ssl) => ssl,
+ Err(err) => {
+ eprintln!("failed to create Ssl object from Acceptor context - {}", err);
+ continue;
+ },
+ };
+ let stream = match tokio_openssl::SslStream::new(ssl, sock) {
+ Ok(stream) => stream,
+ Err(err) => {
+ eprintln!("failed to create SslStream using ssl and connection socket - {}", err);
+ continue;
+ },
+ };
+
+ let mut stream = Box::pin(stream);
+ let sender = sender.clone();
if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS {
eprintln!("connection rejected - to many open connections");
@@ -195,13 +212,13 @@ fn accept_connections(
let accept_counter = accept_counter.clone();
tokio::spawn(async move {
let accept_future = tokio::time::timeout(
- Duration::new(10, 0), tokio_openssl::accept(&acceptor, sock));
+ Duration::new(10, 0), stream.as_mut().accept());
let result = accept_future.await;
match result {
- Ok(Ok(connection)) => {
- if let Err(_) = sender.send(Ok(connection)).await {
+ Ok(Ok(())) => {
+ if let Err(_) = sender.send(Ok(stream)).await {
if debug {
eprintln!("detect closed connection channel");
}
diff --git a/src/server/rest.rs b/src/server/rest.rs
index 04bdc5f9..c30d1c92 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -65,7 +65,7 @@ impl RestServer {
}
}
-impl tower_service::Service<&tokio_openssl::SslStream<tokio::net::TcpStream>> for RestServer {
+impl tower_service::Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>> for RestServer {
type Response = ApiService;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
@@ -74,7 +74,7 @@ impl tower_service::Service<&tokio_openssl::SslStream<tokio::net::TcpStream>> fo
Poll::Ready(Ok(()))
}
- fn call(&mut self, ctx: &tokio_openssl::SslStream<tokio::net::TcpStream>) -> Self::Future {
+ fn call(&mut self, ctx: &Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>) -> Self::Future {
match ctx.get_ref().peer_addr() {
Err(err) => {
future::err(format_err!("unable to get peer address - {}", err)).boxed()
diff --git a/src/tools/async_io.rs b/src/tools/async_io.rs
index 3a5a6c9a..997c02fa 100644
--- a/src/tools/async_io.rs
+++ b/src/tools/async_io.rs
@@ -74,7 +74,7 @@ impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L
// we need this for crate::client::http_client:
impl Connection for EitherStream<
tokio::net::TcpStream,
- tokio_openssl::SslStream<tokio::net::TcpStream>,
+ Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
> {
fn connected(&self) -> hyper::client::connect::Connected {
match self {
diff --git a/src/tools/http.rs b/src/tools/http.rs
index 130aa381..47d6e1f6 100644
--- a/src/tools/http.rs
+++ b/src/tools/http.rs
@@ -3,6 +3,7 @@ use lazy_static::lazy_static;
use std::task::{Context, Poll};
use std::os::unix::io::AsRawFd;
use std::collections::HashMap;
+use std::pin::Pin;
use hyper::{Uri, Body};
use hyper::client::{Client, HttpConnector};
@@ -101,7 +102,7 @@ impl HttpsConnector {
type MaybeTlsStream = EitherStream<
tokio::net::TcpStream,
- tokio_openssl::SslStream<tokio::net::TcpStream>,
+ Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
>;
impl hyper::service::Service<Uri> for HttpsConnector {
@@ -123,10 +124,6 @@ impl hyper::service::Service<Uri> for HttpsConnector {
.scheme()
.ok_or_else(|| format_err!("missing URL scheme"))?
== "https";
- let host = dst
- .host()
- .ok_or_else(|| format_err!("missing hostname in destination url?"))?
- .to_string();
let config = this.ssl_connector.configure();
let dst_str = dst.to_string(); // for error messages
@@ -139,7 +136,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
if is_https {
- let conn = tokio_openssl::connect(config?, &host, conn).await?;
+ let conn: tokio_openssl::SslStream<tokio::net::TcpStream> = tokio_openssl::SslStream::new(config?.into_ssl(&dst_str)?, conn)?;
+ let mut conn = Box::pin(conn);
+ conn.as_mut().connect().await?;
Ok(MaybeTlsStream::Right(conn))
} else {
Ok(MaybeTlsStream::Left(conn))
--
2.20.1
More information about the pbs-devel
mailing list