diff --git a/crates/ark/src/lib.rs b/crates/ark/src/lib.rs index 4654244e4..b650b45d3 100644 --- a/crates/ark/src/lib.rs +++ b/crates/ark/src/lib.rs @@ -26,6 +26,7 @@ pub mod modules_utils; pub mod plots; pub mod r_task; pub mod request; +pub mod reticulate; pub mod shell; pub mod signals; pub mod srcref; diff --git a/crates/ark/src/modules/positron/package.R b/crates/ark/src/modules/positron/package.R index 1c5099011..105f6743c 100644 --- a/crates/ark/src/modules/positron/package.R +++ b/crates/ark/src/modules/positron/package.R @@ -21,6 +21,17 @@ #' @export .ps.rpc.is_installed <- .ps.is_installed +#' @export +.ps.rpc.install_packages <- function(packages) { + for (pkg in packages) { + if (.ps.rpc.isPackageAttached(pkg)) { + stop("Should not install a package if it's already attached.") + } + } + utils::install.packages(packages) + TRUE +} + #' @export .ps.rpc.isPackageAttached <- function(pkg) { if (!is_string(pkg)) { diff --git a/crates/ark/src/modules/positron/reticulate.R b/crates/ark/src/modules/positron/reticulate.R new file mode 100644 index 000000000..ae31fa1ea --- /dev/null +++ b/crates/ark/src/modules/positron/reticulate.R @@ -0,0 +1,92 @@ +#' @export +.ps.reticulate_open <- function(input="") { + .ps.Call("ps_reticulate_open", input) +} + +#' Called by the front-end right before starting the reticulate session. +#' +#' At this point it should be fine to load Python if it's not loaded, and +#' check if it can be started and if necessary packages are installed. +#' @export +.ps.rpc.reticulate_check_prerequisites <- function() { + + # This should return a list with the following fields: + # python: NULL or string + # venv: NULL or string + # ipykernel: NULL or string + # error: NULL or string + + config <- tryCatch({ + reticulate::py_discover_config() + }, error = function(err) { + err + }) + + if (inherits(config, "error")) { + # py_discover_config() can fail if the user forced a Python session + # via RETICULATE_PYTHON, but this version doesn't exist. + return(list(error = conditionMessage(config))) + } + + if (is.null(config) || is.null(config$python)) { + # The front-end will offer to install Python. + return(list(python = NULL, error = NULL)) + } + + python <- config$python + venv <- config$virtualenv + + # Check that python can be loaded, if it can't we will throw + # an error, which is unrecoverable. + config <- tryCatch({ + reticulate::py_config() + }, error = function(err) { + err + }) + + if (inherits(config, "error")) { + return(list(python = python, venv = venv, error = conditionMessage(config))) + } + + # Now check ipykernel + ipykernel <- tryCatch({ + reticulate::py_module_available("ipykernel") + }, error = function(err) { + err + }) + + if (inherits(ipykernel, "error")) { + return(list(python = python, venv = venv, error = conditionMessage(ipykernel))) + } + + list( + python = config$python, + venv = venv, + ipykernel = ipykernel, + error = NULL + ) +} + +#' @export +.ps.rpc.reticulate_start_kernel <- function(kernelPath, connectionFile, logFile, logLevel) { + # Starts an IPykernel in a separate thread with information provided by + # the caller. + # It it's essentially executing the kernel startup script: + # https://github.com/posit-dev/positron/blob/main/extensions/positron-python/python_files/positron/positron_language_server.py + # and passing the communication files that Positron Jupyter's Adapter sets up. + tryCatch({ + reticulate:::py_run_file_on_thread( + file = kernelPath, + args = c( + "-f", connectionFile, + "--logfile", logFile, + "--loglevel", logLevel, + "--session-mode", "console" + ) + ) + # Empty string means that no error happened. + "" + }, error = function(err) { + conditionMessage(err) + }) +} diff --git a/crates/ark/src/reticulate.rs b/crates/ark/src/reticulate.rs new file mode 100644 index 000000000..c3fb09b83 --- /dev/null +++ b/crates/ark/src/reticulate.rs @@ -0,0 +1,117 @@ +use std::ops::Deref; +use std::sync::LazyLock; +use std::sync::Mutex; + +use amalthea::comm::comm_channel::CommMsg; +use amalthea::comm::event::CommManagerEvent; +use amalthea::socket::comm::CommInitiator; +use amalthea::socket::comm::CommSocket; +use crossbeam::channel::Sender; +use harp::RObject; +use libr::R_NilValue; +use libr::SEXP; +use serde_json::json; +use stdext::result::ResultOrLog; +use stdext::spawn; +use stdext::unwrap; +use uuid::Uuid; + +use crate::interface::RMain; + +static RETICULATE_COMM_ID: LazyLock>> = LazyLock::new(|| Mutex::new(None)); + +pub struct ReticulateService { + comm: CommSocket, + comm_manager_tx: Sender, +} + +impl ReticulateService { + fn start(comm_id: String, comm_manager_tx: Sender) -> anyhow::Result { + let comm = CommSocket::new( + CommInitiator::BackEnd, + comm_id.clone(), + String::from("positron.reticulate"), + ); + + let service = Self { + comm, + comm_manager_tx, + }; + + let event = CommManagerEvent::Opened(service.comm.clone(), serde_json::Value::Null); + service + .comm_manager_tx + .send(event) + .or_log_error("Reticulate: Could not open comm."); + + spawn!(format!("ark-reticulate-{}", comm_id), move || { + service + .handle_messages() + .or_log_error("Reticulate: Error handling messages"); + }); + + Ok(comm_id) + } + + fn handle_messages(&self) -> Result<(), anyhow::Error> { + loop { + let msg = unwrap!(self.comm.incoming_rx.recv(), Err(err) => { + log::error!("Reticulate: Error while receiving message from frontend: {err:?}"); + break; + }); + + if let CommMsg::Close = msg { + break; + } + } + + // before finalizing the thread we make sure to send a close message to the front end + self.comm + .outgoing_tx + .send(CommMsg::Close) + .or_log_error("Reticulate: Could not send close message to the front-end"); + + // Reset the global comm_id before closing + let mut comm_id_guard = RETICULATE_COMM_ID.lock().unwrap(); + log::info!("Reticulate Thread closing {:?}", (*comm_id_guard).clone()); + *comm_id_guard = None; + + Ok(()) + } +} + +// Creates a client instance reticulate can use to communicate with the front-end. +// We should aim at having at most **1** client per R session. +// Further actions that reticulate can ask the front-end can be requested through +// the comm_id that is returned by this function. +#[harp::register] +pub unsafe extern "C" fn ps_reticulate_open(input: SEXP) -> Result { + let main = RMain::get(); + + let input: RObject = input.try_into()?; + let input_code: Option = input.try_into()?; + + let mut comm_id_guard = RETICULATE_COMM_ID.lock().unwrap(); + + // If there's an id already registered, we just need to send the focus event + if let Some(id) = comm_id_guard.deref() { + // There's a comm_id registered, we just send the focus event + main.get_comm_manager_tx().send(CommManagerEvent::Message( + id.clone(), + CommMsg::Data(json!({ + "method": "focus", + "params": { + "input": input_code + } + })), + ))?; + return Ok(R_NilValue); + } + + let id = Uuid::new_v4().to_string(); + *comm_id_guard = Some(id.clone()); + + ReticulateService::start(id, main.get_comm_manager_tx().clone())?; + + Ok(R_NilValue) +}