From 8b05a760b5e49b1aff8465a2affd16d3eb9859a1 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Mon, 22 Jan 2024 16:03:38 -0300 Subject: [PATCH 1/7] Handle disconnections --- crates/ark/src/connections/r_connection.rs | 15 +++++++++++++++ crates/ark/src/modules/positron/connection.R | 9 +++++++++ 2 files changed, 24 insertions(+) diff --git a/crates/ark/src/connections/r_connection.rs b/crates/ark/src/connections/r_connection.rs index 2323906bb..441522c18 100644 --- a/crates/ark/src/connections/r_connection.rs +++ b/crates/ark/src/connections/r_connection.rs @@ -48,6 +48,7 @@ pub enum ConnectionResponse { fields: Vec, }, PreviewResponse, + DisconnectResponse, } #[derive(Debug, Serialize, Deserialize)] @@ -59,6 +60,8 @@ pub enum ConnectionRequest { FieldsRequest { path: Vec }, // The UI asks for a DataViewer preview of the table. PreviewTable { path: Vec }, + // The UI asks to close the connection + Disconnect, } #[derive(Deserialize, Serialize)] @@ -209,9 +212,20 @@ impl RConnection { })?; Ok(ConnectionResponse::PreviewResponse) }, + ConnectionRequest::Disconnect => Ok(ConnectionResponse::DisconnectResponse), } } + fn disconnect(&self) -> std::result::Result<(), anyhow::Error> { + // Execute database side disconnect method. + r_task(|| -> Result<(), anyhow::Error> { + let mut call = RFunction::from(".ps.connection_close"); + call.add(RObject::from(self.comm.comm_id.clone())); + call.call()?; + Ok(()) + }) + } + fn handle_messages(&self) -> Result<(), anyhow::Error> { loop { let msg = unwrap!(self.comm.incoming_rx.recv(), Err(err) => { @@ -233,6 +247,7 @@ impl RConnection { if let Err(err) = self.comm.outgoing_tx.send(CommMsg::Close) { log::error!("Connection Pane: Error while sending comm_close to front end: {err:?}"); } + self.disconnect()?; Ok(()) } diff --git a/crates/ark/src/modules/positron/connection.R b/crates/ark/src/modules/positron/connection.R index da1d195a8..58e0d806e 100644 --- a/crates/ark/src/modules/positron/connection.R +++ b/crates/ark/src/modules/positron/connection.R @@ -94,3 +94,12 @@ options("connectionObserver" = .ps.connection_observer()) } View(con$previewObject(table = table, ..., limit = 100), title = table) } + +.ps.connection_close <- function(id, ...) { + con <- get(id, getOption("connectionObserver")$.connections) + if (is.null(con)) { + return(NULL) + } + con$disconnect(...) + rm(list = id, envir = getOption("connectionObserver")$.connections) +} From 49976a86bbf292882fd9ef52116716992cbadadd Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Mon, 22 Jan 2024 16:08:33 -0300 Subject: [PATCH 2/7] add comment regarding the disconnection code not getting called. --- crates/ark/src/connections/r_connection.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/ark/src/connections/r_connection.rs b/crates/ark/src/connections/r_connection.rs index 441522c18..7bca97dd7 100644 --- a/crates/ark/src/connections/r_connection.rs +++ b/crates/ark/src/connections/r_connection.rs @@ -235,6 +235,9 @@ impl RConnection { log::trace!("Connection Pane: Received message from front end: {msg:?}"); + // The CommMsg::Close is not really always received when the front-ent disposes the + // client, thus we should make sure the front-end fires the `Disconnect` method before + // disposing the client. if let CommMsg::Close = msg { log::trace!("Connection Pane: Received a close message."); break; From 6f548a7d73384960518b6d1bbedf79bcabd0ad83 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Mon, 22 Jan 2024 16:11:37 -0300 Subject: [PATCH 3/7] don't call disconnect here --- crates/ark/src/connections/r_connection.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/ark/src/connections/r_connection.rs b/crates/ark/src/connections/r_connection.rs index 7bca97dd7..97ad2f0d7 100644 --- a/crates/ark/src/connections/r_connection.rs +++ b/crates/ark/src/connections/r_connection.rs @@ -250,7 +250,6 @@ impl RConnection { if let Err(err) = self.comm.outgoing_tx.send(CommMsg::Close) { log::error!("Connection Pane: Error while sending comm_close to front end: {err:?}"); } - self.disconnect()?; Ok(()) } From 93496aa4a6b039b8f50d12b5e0b8786cb22955fb Mon Sep 17 00:00:00 2001 From: Jonathan McPherson Date: Wed, 24 Jan 2024 10:19:52 -0800 Subject: [PATCH 4/7] notify comm of impending closure --- crates/amalthea/src/comm/comm_manager.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/amalthea/src/comm/comm_manager.rs b/crates/amalthea/src/comm/comm_manager.rs index e4f84eaee..b3eb01477 100644 --- a/crates/amalthea/src/comm/comm_manager.rs +++ b/crates/amalthea/src/comm/comm_manager.rs @@ -175,6 +175,10 @@ impl CommManager { // If we found it, remove it. if let Some(index) = index { + // Notify the comm that it's been closed + let comm = self.open_comms.get(index).unwrap(); + comm.incoming_tx.send(CommMsg::Close).unwrap(); + self.open_comms.remove(index); self.comm_shell_tx .send(CommShellEvent::Removed(comm_id)) From fadb68ed39ef4cf3c2ec649f9b416e352bff4e3c Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 25 Jan 2024 10:59:39 -0300 Subject: [PATCH 5/7] No need for a disconnect RPC method --- crates/ark/src/connections/r_connection.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/crates/ark/src/connections/r_connection.rs b/crates/ark/src/connections/r_connection.rs index 97ad2f0d7..1878cc956 100644 --- a/crates/ark/src/connections/r_connection.rs +++ b/crates/ark/src/connections/r_connection.rs @@ -48,7 +48,6 @@ pub enum ConnectionResponse { fields: Vec, }, PreviewResponse, - DisconnectResponse, } #[derive(Debug, Serialize, Deserialize)] @@ -60,8 +59,6 @@ pub enum ConnectionRequest { FieldsRequest { path: Vec }, // The UI asks for a DataViewer preview of the table. PreviewTable { path: Vec }, - // The UI asks to close the connection - Disconnect, } #[derive(Deserialize, Serialize)] @@ -212,7 +209,6 @@ impl RConnection { })?; Ok(ConnectionResponse::PreviewResponse) }, - ConnectionRequest::Disconnect => Ok(ConnectionResponse::DisconnectResponse), } } @@ -235,11 +231,9 @@ impl RConnection { log::trace!("Connection Pane: Received message from front end: {msg:?}"); - // The CommMsg::Close is not really always received when the front-ent disposes the - // client, thus we should make sure the front-end fires the `Disconnect` method before - // disposing the client. if let CommMsg::Close = msg { log::trace!("Connection Pane: Received a close message."); + self.disconnect()?; break; } From cb0b8c7b8568f5622d36b2e1f6bb826324a6d4a9 Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Thu, 25 Jan 2024 12:35:25 -0300 Subject: [PATCH 6/7] We don't need to remove from the connection env. --- crates/ark/src/modules/positron/connection.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/ark/src/modules/positron/connection.R b/crates/ark/src/modules/positron/connection.R index 58e0d806e..e3e8cb420 100644 --- a/crates/ark/src/modules/positron/connection.R +++ b/crates/ark/src/modules/positron/connection.R @@ -100,6 +100,7 @@ options("connectionObserver" = .ps.connection_observer()) if (is.null(con)) { return(NULL) } + # disconnect is resposible for calling connectionClosed that + # will remove the connection from the list of connections con$disconnect(...) - rm(list = id, envir = getOption("connectionObserver")$.connections) } From 1bc65a83a8680be90db9a9e2298db01090f36c7c Mon Sep 17 00:00:00 2001 From: Daniel Falbel Date: Mon, 29 Jan 2024 09:46:55 -0300 Subject: [PATCH 7/7] Log error instead --- crates/amalthea/src/comm/comm_manager.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/amalthea/src/comm/comm_manager.rs b/crates/amalthea/src/comm/comm_manager.rs index b3eb01477..20e11c15a 100644 --- a/crates/amalthea/src/comm/comm_manager.rs +++ b/crates/amalthea/src/comm/comm_manager.rs @@ -12,6 +12,7 @@ use crossbeam::channel::Select; use crossbeam::channel::Sender; use log::info; use log::warn; +use stdext::result::ResultOrLog; use stdext::spawn; use crate::comm::comm_channel::CommMsg; @@ -177,7 +178,9 @@ impl CommManager { if let Some(index) = index { // Notify the comm that it's been closed let comm = self.open_comms.get(index).unwrap(); - comm.incoming_tx.send(CommMsg::Close).unwrap(); + comm.incoming_tx + .send(CommMsg::Close) + .or_log_error("Failed to send comm_close to comm."); self.open_comms.remove(index); self.comm_shell_tx