[pbs-devel] [PATCH proxmox 8/9] proxmox/tools/websocket: replace CallBack with a channel
Dominik Csapak
d.csapak at proxmox.com
Tue Jul 14 13:09:56 CEST 2020
Instead of having a callback that we call on a control frame,
use a channel to send the data to a receiver.
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
proxmox/Cargo.toml | 2 +-
proxmox/src/tools/websocket.rs | 23 +++++++++++------------
2 files changed, 12 insertions(+), 13 deletions(-)
diff --git a/proxmox/Cargo.toml b/proxmox/Cargo.toml
index e72f358..5b7a4ff 100644
--- a/proxmox/Cargo.toml
+++ b/proxmox/Cargo.toml
@@ -57,7 +57,7 @@ api-macro = ["proxmox-api-macro"]
test-harness = []
cli = [ "router", "hyper", "tokio" ]
router = [ "hyper", "tokio" ]
-websocket = [ "tokio", "futures" ]
+websocket = [ "tokio", "futures", "tokio/sync" ]
# tools:
#valgrind = ["proxmox-tools/valgrind"]
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index 2a2bfa4..1ff0927 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -12,6 +12,7 @@ use std::future::Future;
use tokio::io::{AsyncWrite, AsyncRead, AsyncReadExt};
use anyhow::{bail, format_err, Error};
+use tokio::sync::mpsc;
use futures::future::FutureExt;
use futures::ready;
@@ -400,9 +401,6 @@ impl FrameHeader {
}
}
-/// Callback for control frames
-pub type CallBack = fn(frametype: OpCode, payload: &[u8]);
-
/// Wraps a reader that implements AsyncRead and implements it itself.
///
/// On read, reads the underlying reader and tries to decode the frames and
@@ -412,7 +410,7 @@ pub type CallBack = fn(frametype: OpCode, payload: &[u8]);
/// Has an internal Buffer for storing incomplete headers.
pub struct WebSocketReader<R: AsyncRead> {
reader: Option<R>,
- callback: CallBack,
+ sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>,
read_buffer: Option<ByteBuffer>,
header: Option<FrameHeader>,
state: ReaderState<R>,
@@ -421,14 +419,14 @@ pub struct WebSocketReader<R: AsyncRead> {
impl<R: AsyncReadExt> WebSocketReader<R> {
/// Creates a new WebSocketReader with the given CallBack for control frames
/// and a default buffer size of 4096.
- pub fn new(reader: R, callback: CallBack) -> WebSocketReader<R> {
- Self::with_capacity(reader, callback, 4096)
+ pub fn new(reader: R, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader<R> {
+ Self::with_capacity(reader, 4096, sender)
}
- pub fn with_capacity(reader: R, callback: CallBack, capacity: usize) -> WebSocketReader<R> {
+ pub fn with_capacity(reader: R, capacity: usize, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader<R> {
WebSocketReader {
reader: Some(reader),
- callback,
+ sender,
read_buffer: Some(ByteBuffer::with_capacity(capacity)),
header: None,
state: ReaderState::NoData,
@@ -518,12 +516,13 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
if header.is_control_frame() {
if read_buffer.len() >= header.payload_len {
+
let mut data = read_buffer.remove_data(header.payload_len);
mask_bytes(header.mask, &mut data);
- (this.callback)(
- header.frametype,
- &data,
- );
+ if let Err(err) = this.sender.send((header.frametype, data)) {
+ eprintln!("error sending control frame: {}", err);
+ }
+
this.state = if read_buffer.is_empty() {
ReaderState::NoData
} else {
--
2.20.1
More information about the pbs-devel
mailing list