[pbs-devel] [PATCH proxmox 06/13] http: takeover MaybeTlsStream from proxmox_backup
Fabian Grünbichler
f.gruenbichler at proxmox.com
Fri May 14 15:44:42 CEST 2021
this is just a (rather HTTP specific) wrapper, so put it into a
'wrapper' module for now.
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
proxmox-http/Cargo.toml | 4 +-
proxmox-http/src/http/mod.rs | 3 +
proxmox-http/src/http/wrapper.rs | 122 +++++++++++++++++++++++++++++++
proxmox-http/src/lib.rs | 3 +
4 files changed, 131 insertions(+), 1 deletion(-)
create mode 100644 proxmox-http/src/http/mod.rs
create mode 100644 proxmox-http/src/http/wrapper.rs
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 641ace2..f1f53da 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -18,11 +18,13 @@ futures = { version = "0.3", optional = true }
hyper = { version = "0.14", features = [ "full" ], optional = true }
openssl = { version = "0.10", optional = true }
tokio = { version = "1.0", features = [], optional = true }
+tokio-openssl = { version = "0.6.1", optional = true }
proxmox = { path = "../proxmox", optional = true, version = "0.11.3", default-features = false }
[features]
default = []
-client = []
+client = [ "http-helpers" ]
+http-helpers = [ "hyper", "tokio/io-util", "tokio-openssl" ]
websocket = [ "base64", "futures", "hyper", "openssl", "proxmox/tokio", "tokio/io-util", "tokio/sync" ]
diff --git a/proxmox-http/src/http/mod.rs b/proxmox-http/src/http/mod.rs
new file mode 100644
index 0000000..09fa42f
--- /dev/null
+++ b/proxmox-http/src/http/mod.rs
@@ -0,0 +1,3 @@
+mod wrapper;
+
+pub use wrapper::MaybeTlsStream;
diff --git a/proxmox-http/src/http/wrapper.rs b/proxmox-http/src/http/wrapper.rs
new file mode 100644
index 0000000..3399b28
--- /dev/null
+++ b/proxmox-http/src/http/wrapper.rs
@@ -0,0 +1,122 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use hyper::client::connect::{Connection, Connected};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio_openssl::SslStream;
+
+/// Asynchronous stream, possibly encrypted and proxied
+///
+/// Usefule for HTTP client implementations using hyper.
+pub enum MaybeTlsStream<S> {
+ Normal(S),
+ Proxied(S),
+ Secured(SslStream<S>),
+}
+
+impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for MaybeTlsStream<S> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut ReadBuf,
+ ) -> Poll<Result<(), io::Error>> {
+ match self.get_mut() {
+ MaybeTlsStream::Normal(ref mut s) => {
+ Pin::new(s).poll_read(cx, buf)
+ }
+ MaybeTlsStream::Proxied(ref mut s) => {
+ Pin::new(s).poll_read(cx, buf)
+ }
+ MaybeTlsStream::Secured(ref mut s) => {
+ Pin::new(s).poll_read(cx, buf)
+ }
+ }
+ }
+}
+
+impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for MaybeTlsStream<S> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ match self.get_mut() {
+ MaybeTlsStream::Normal(ref mut s) => {
+ Pin::new(s).poll_write(cx, buf)
+ }
+ MaybeTlsStream::Proxied(ref mut s) => {
+ Pin::new(s).poll_write(cx, buf)
+ }
+ MaybeTlsStream::Secured(ref mut s) => {
+ Pin::new(s).poll_write(cx, buf)
+ }
+ }
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<Result<usize, io::Error>> {
+ match self.get_mut() {
+ MaybeTlsStream::Normal(ref mut s) => {
+ Pin::new(s).poll_write_vectored(cx, bufs)
+ }
+ MaybeTlsStream::Proxied(ref mut s) => {
+ Pin::new(s).poll_write_vectored(cx, bufs)
+ }
+ MaybeTlsStream::Secured(ref mut s) => {
+ Pin::new(s).poll_write_vectored(cx, bufs)
+ }
+ }
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ match self {
+ MaybeTlsStream::Normal(s) => s.is_write_vectored(),
+ MaybeTlsStream::Proxied(s) => s.is_write_vectored(),
+ MaybeTlsStream::Secured(s) => s.is_write_vectored(),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
+ match self.get_mut() {
+ MaybeTlsStream::Normal(ref mut s) => {
+ Pin::new(s).poll_flush(cx)
+ }
+ MaybeTlsStream::Proxied(ref mut s) => {
+ Pin::new(s).poll_flush(cx)
+ }
+ MaybeTlsStream::Secured(ref mut s) => {
+ Pin::new(s).poll_flush(cx)
+ }
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
+ match self.get_mut() {
+ MaybeTlsStream::Normal(ref mut s) => {
+ Pin::new(s).poll_shutdown(cx)
+ }
+ MaybeTlsStream::Proxied(ref mut s) => {
+ Pin::new(s).poll_shutdown(cx)
+ }
+ MaybeTlsStream::Secured(ref mut s) => {
+ Pin::new(s).poll_shutdown(cx)
+ }
+ }
+ }
+}
+
+// we need this for the hyper http client
+impl <S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for MaybeTlsStream<S>
+{
+ fn connected(&self) -> Connected {
+ match self {
+ MaybeTlsStream::Normal(s) => s.connected(),
+ MaybeTlsStream::Proxied(s) => s.connected().proxy(true),
+ MaybeTlsStream::Secured(s) => s.get_ref().connected(),
+ }
+ }
+}
diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 31e44af..5e3f9ec 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -1,2 +1,5 @@
#[cfg(feature = "websocket")]
pub mod websocket;
+
+#[cfg(feature = "http-helpers")]
+pub mod http;
--
2.20.1
More information about the pbs-devel
mailing list