[pbs-devel] [PATCH proxmox-backup 2/3] api: reader: gracefully handle reader client disconnects

Christian Ebner c.ebner at proxmox.com
Tue Dec 3 12:27:55 CET 2024


Currently, if a reader client disconnects after finishing its work,
the connection will be closed by the client without notifying the
server. The future handling the connection on the server side will
then return with a connection error, and in consequence the reader
worker task will finish with an error state. This can cause confusion
[0], as this is not an error but normal behaviour.

Instead of failing, provide an API endpoint which allows the client to
request a graceful close of the connection. Calling it triggers a
signal to the future handling the connection, which then gracefully
shuts the connection down.

Report in the community forum:
[0] https://forum.proxmox.com/threads/158306/

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 Cargo.toml                     |  1 +
 src/api2/reader/environment.rs | 12 +++++-
 src/api2/reader/mod.rs         | 74 ++++++++++++++++++++++++++++++++--
 3 files changed, 83 insertions(+), 4 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index d14f320a6..e3ddb1942 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -185,6 +185,7 @@ num-traits.workspace = true
 once_cell.workspace = true
 openssl.workspace = true
 percent-encoding.workspace = true
+pin-project-lite.workspace = true
 regex.workspace = true
 rustyline.workspace = true
 serde.workspace = true
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f43..ac8970865 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -1,7 +1,8 @@
 use std::collections::HashSet;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
 
 use serde_json::{json, Value};
+use tokio::sync::oneshot;
 
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
 
@@ -23,6 +24,7 @@ pub struct ReaderEnvironment {
     pub worker: Arc<WorkerTask>,
     pub datastore: Arc<DataStore>,
     pub backup_dir: BackupDir,
+    connection_shutdown_trigger: Arc<Mutex<Option<oneshot::Sender<()>>>>,
     allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
 }
 
@@ -33,6 +35,7 @@ impl ReaderEnvironment {
         worker: Arc<WorkerTask>,
         datastore: Arc<DataStore>,
         backup_dir: BackupDir,
+        connection_shutdown_trigger: oneshot::Sender<()>,
     ) -> Self {
         Self {
             result_attributes: json!({}),
@@ -43,6 +46,7 @@ impl ReaderEnvironment {
             debug: tracing::enabled!(tracing::Level::DEBUG),
             formatter: JSON_FORMATTER,
             backup_dir,
+            connection_shutdown_trigger: Arc::new(Mutex::new(Some(connection_shutdown_trigger))),
             allowed_chunks: Arc::new(RwLock::new(HashSet::new())),
         }
     }
@@ -69,6 +73,12 @@ impl ReaderEnvironment {
     pub fn check_chunk_access(&self, digest: [u8; 32]) -> bool {
         self.allowed_chunks.read().unwrap().contains(&digest)
     }
+    /// Send the connection shutdown signal; only the first call sends, later calls are no-ops.
+    pub fn connection_shutdown(&self) {
+        if let Some(trigger) = self.connection_shutdown_trigger.lock().unwrap().take() {
+            let _ = trigger.send(()); // a dropped receiver is not treated as an error here
+        }
+    }
 }
 
 impl RpcEnvironment for ReaderEnvironment {
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 50f80de43..973b8f257 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -1,13 +1,20 @@
 //! Backup reader/restore protocol (HTTP2 upgrade)
 
+use std::pin::Pin;
+use std::task::Poll;
+
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use hex::FromHex;
 use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
 use hyper::http::request::Parts;
+use hyper::server::conn::Connection;
+use hyper::upgrade::Upgraded;
 use hyper::{Body, Request, Response, StatusCode};
+use pin_project_lite::pin_project;
 use serde::Deserialize;
 use serde_json::Value;
+use tokio::sync::oneshot;
 
 use proxmox_rest_server::{H2Service, WorkerTask};
 use proxmox_router::{
@@ -156,12 +163,15 @@ fn upgrade_to_backup_reader_protocol(
             move |worker| async move {
                 let _guard = _guard;
 
+                let (connection_shutdown_trigger, connection_shutdown_receiver) =
+                    oneshot::channel::<()>();
                 let mut env = ReaderEnvironment::new(
                     env_type,
                     auth_id,
                     worker.clone(),
                     datastore,
                     backup_dir,
+                    connection_shutdown_trigger,
                 );
 
                 env.debug = debug;
@@ -192,9 +202,10 @@ fn upgrade_to_backup_reader_protocol(
                     http.http2_initial_connection_window_size(window_size);
                     http.http2_max_frame_size(4 * 1024 * 1024);
 
-                    http.serve_connection(conn, service)
-                        .map_err(Error::from)
-                        .await
+                    let connection = http.serve_connection(conn, service);
+                    let graceful_shutdown_connection =
+                        GracefulShutdownConnection::new(connection, connection_shutdown_receiver);
+                    graceful_shutdown_connection.await.map_err(Error::from)
                 };
 
                 futures::select! {
@@ -222,12 +233,53 @@ fn upgrade_to_backup_reader_protocol(
     .boxed()
 }
 
+pin_project! {
+    /// Hyper connection that is shut down gracefully once a shutdown signal is received.
+    struct GracefulShutdownConnection {
+        #[pin]
+        connection: Connection<Upgraded, H2Service<ReaderEnvironment>, ExecInheritLogContext>,
+        // Reset to `None` once resolved, a completed oneshot receiver must not be polled again.
+        shutdown_receiver: Option<oneshot::Receiver<()>>,
+    }
+}
+
+impl GracefulShutdownConnection {
+    fn new(
+        connection: Connection<Upgraded, H2Service<ReaderEnvironment>, ExecInheritLogContext>,
+        shutdown_receiver: oneshot::Receiver<()>,
+    ) -> Self {
+        Self {
+            connection,
+            shutdown_receiver: Some(shutdown_receiver),
+        }
+    }
+}
+
+impl Future for GracefulShutdownConnection {
+    type Output = Result<(), hyper::Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+        let mut this = self.project();
+        if let Some(receiver) = this.shutdown_receiver.as_mut() {
+            if let Poll::Ready(received) = Pin::new(receiver).poll(cx) {
+                *this.shutdown_receiver = None;
+                // Only an explicit request shuts down, a dropped sender keeps serving.
+                if received.is_ok() {
+                    this.connection.as_mut().graceful_shutdown();
+                }
+            }
+        }
+        this.connection.poll(cx)
+    }
+}
+
 const READER_API_SUBDIRS: SubdirMap = &[
     ("chunk", &Router::new().download(&API_METHOD_DOWNLOAD_CHUNK)),
     (
         "download",
         &Router::new().download(&API_METHOD_DOWNLOAD_FILE),
     ),
+    ("finish", &Router::new().post(&API_METHOD_FINISH)), // client requests a graceful close
     ("speedtest", &Router::new().download(&API_METHOD_SPEEDTEST)),
 ];
 
@@ -347,6 +399,22 @@ fn download_chunk(
     .boxed()
 }
 
+/// Schema of the parameterless `finish` call, handled by the `finish` function.
+pub const API_METHOD_FINISH: ApiMethod = ApiMethod::new(
+    &ApiHandler::Sync(&finish),
+    &ObjectSchema::new("Signal the reader instance is finished", &[]),
+);
+
+fn finish(
+    _param: Value,
+    _info: &ApiMethod,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Value, Error> {
+    // Repeated calls are harmless, only the first one carries the shutdown signal.
+    AsRef::<ReaderEnvironment>::as_ref(&*rpcenv).connection_shutdown();
+    Ok(Value::Null)
+}
+
 /* this is too slow
 fn download_chunk_old(
     _parts: Parts,
-- 
2.39.5





More information about the pbs-devel mailing list