[pbs-devel] [PATCH proxmox 8/9] proxmox/tools/websocket: replace CallBack with a channel

Dominik Csapak d.csapak at proxmox.com
Tue Jul 14 13:09:56 CEST 2020


instead of having a callback that we call on a control frame,
use a channel to send the data to a receiver

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
 proxmox/Cargo.toml             |  2 +-
 proxmox/src/tools/websocket.rs | 23 +++++++++++------------
 2 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/proxmox/Cargo.toml b/proxmox/Cargo.toml
index e72f358..5b7a4ff 100644
--- a/proxmox/Cargo.toml
+++ b/proxmox/Cargo.toml
@@ -57,7 +57,7 @@ api-macro = ["proxmox-api-macro"]
 test-harness = []
 cli = [ "router", "hyper", "tokio" ]
 router = [ "hyper", "tokio" ]
-websocket = [ "tokio", "futures" ]
+websocket = [ "tokio", "futures", "tokio/sync" ]
 
 # tools:
 #valgrind = ["proxmox-tools/valgrind"]
diff --git a/proxmox/src/tools/websocket.rs b/proxmox/src/tools/websocket.rs
index 2a2bfa4..1ff0927 100644
--- a/proxmox/src/tools/websocket.rs
+++ b/proxmox/src/tools/websocket.rs
@@ -12,6 +12,7 @@ use std::future::Future;
 
 use tokio::io::{AsyncWrite, AsyncRead, AsyncReadExt};
 use anyhow::{bail, format_err, Error};
+use tokio::sync::mpsc;
 
 use futures::future::FutureExt;
 use futures::ready;
@@ -400,9 +401,6 @@ impl FrameHeader {
     }
 }
 
-/// Callback for control frames
-pub type CallBack = fn(frametype: OpCode, payload: &[u8]);
-
 /// Wraps a reader that implements AsyncRead and implements it itself.
 ///
 /// On read, reads the underlying reader and tries to decode the frames and
@@ -412,7 +410,7 @@ pub type CallBack = fn(frametype: OpCode, payload: &[u8]);
 /// Has an internal Buffer for storing incomplete headers.
 pub struct WebSocketReader<R: AsyncRead> {
     reader: Option<R>,
-    callback: CallBack,
+    sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>,
     read_buffer: Option<ByteBuffer>,
     header: Option<FrameHeader>,
     state: ReaderState<R>,
@@ -421,14 +419,14 @@ pub struct WebSocketReader<R: AsyncRead> {
 impl<R: AsyncReadExt> WebSocketReader<R> {
     /// Creates a new WebSocketReader with the given CallBack for control frames
     /// and a default buffer size of 4096.
-    pub fn new(reader: R, callback: CallBack) -> WebSocketReader<R> {
-        Self::with_capacity(reader, callback, 4096)
+    pub fn new(reader: R, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader<R> {
+        Self::with_capacity(reader, 4096, sender)
     }
 
-    pub fn with_capacity(reader: R, callback: CallBack, capacity: usize) -> WebSocketReader<R> {
+    pub fn with_capacity(reader: R, capacity: usize, sender: mpsc::UnboundedSender<(OpCode, Box<[u8]>)>) -> WebSocketReader<R> {
         WebSocketReader {
             reader: Some(reader),
-            callback,
+            sender,
             read_buffer: Some(ByteBuffer::with_capacity(capacity)),
             header: None,
             state: ReaderState::NoData,
@@ -518,12 +516,13 @@ impl<R: AsyncReadExt + Unpin + Send + 'static> AsyncRead for WebSocketReader<R>
 
                     if header.is_control_frame() {
                         if read_buffer.len() >= header.payload_len {
+
                             let mut data = read_buffer.remove_data(header.payload_len);
                             mask_bytes(header.mask, &mut data);
-                            (this.callback)(
-                                header.frametype,
-                                &data,
-                            );
+                            if let Err(err) = this.sender.send((header.frametype, data)) {
+                                eprintln!("error sending control frame: {}", err);
+                            }
+
                             this.state =  if read_buffer.is_empty() {
                                 ReaderState::NoData
                             } else {
-- 
2.20.1






More information about the pbs-devel mailing list