Skip to content

Reticulate support #506

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Sep 27, 2024
Merged
1 change: 1 addition & 0 deletions crates/ark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod modules_utils;
pub mod plots;
pub mod r_task;
pub mod request;
pub mod reticulate;
pub mod shell;
pub mod signals;
pub mod srcref;
Expand Down
11 changes: 11 additions & 0 deletions crates/ark/src/modules/positron/package.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
#' @export
.ps.rpc.is_installed <- .ps.is_installed

#' @export
.ps.rpc.install_packages <- function(packages) {
for (pkg in packages) {
if (.ps.rpc.isPackageAttached(pkg)) {
stop("Should not install a package if it's already attached.")
}
}
utils::install.packages(packages)
TRUE
}

#' @export
.ps.rpc.isPackageAttached <- function(pkg) {
if (!is_string(pkg)) {
Expand Down
92 changes: 92 additions & 0 deletions crates/ark/src/modules/positron/reticulate.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#' @export
.ps.reticulate_open <- function(input="") {
.ps.Call("ps_reticulate_open", input)
}

#' Called by the front-end right before starting the reticulate session.
#'
#' At this point it should be fine to load Python if it's not loaded, and
#' check if it can be started and if necessary packages are installed.
#' @export
.ps.rpc.reticulate_check_prerequisites <- function() {

# This should return a list with the following fields:
# python: NULL or string
# venv: NULL or string
# ipykernel: NULL or string
# error: NULL or string

config <- tryCatch({
reticulate::py_discover_config()
}, error = function(err) {
err
})

if (inherits(config, "error")) {
# py_discover_config() can fail if the user forced a Python session
# via RETICULATE_PYTHON, but this version doesn't exist.
return(list(error = conditionMessage(config)))
}

if (is.null(config) || is.null(config$python)) {
# The front-end will offer to install Python.
return(list(python = NULL, error = NULL))
}

python <- config$python
venv <- config$virtualenv

# Check that python can be loaded, if it can't we will throw
# an error, which is unrecoverable.
config <- tryCatch({
reticulate::py_config()
}, error = function(err) {
err
})

if (inherits(config, "error")) {
return(list(python = python, venv = venv, error = conditionMessage(config)))
}

# Now check ipykernel
ipykernel <- tryCatch({
reticulate::py_module_available("ipykernel")
}, error = function(err) {
err
})

if (inherits(ipykernel, "error")) {
return(list(python = python, venv = venv, error = conditionMessage(ipykernel)))
}

list(
python = config$python,
venv = venv,
ipykernel = ipykernel,
error = NULL
)
}

#' @export
.ps.rpc.reticulate_start_kernel <- function(kernelPath, connectionFile, logFile, logLevel) {
# Starts an IPykernel in a separate thread with information provided by
# the caller.
# It it's essentially executing the kernel startup script:
# https://github.com/posit-dev/positron/blob/main/extensions/positron-python/python_files/positron/positron_language_server.py
# and passing the communication files that Positron Jupyter's Adapter sets up.
tryCatch({
reticulate:::py_run_file_on_thread(
file = kernelPath,
args = c(
"-f", connectionFile,
"--logfile", logFile,
"--loglevel", logLevel,
"--session-mode", "console"
)
)
# Empty string means that no error happened.
""
}, error = function(err) {
conditionMessage(err)
})
}
117 changes: 117 additions & 0 deletions crates/ark/src/reticulate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use std::ops::Deref;
use std::sync::LazyLock;
use std::sync::Mutex;

use amalthea::comm::comm_channel::CommMsg;
use amalthea::comm::event::CommManagerEvent;
use amalthea::socket::comm::CommInitiator;
use amalthea::socket::comm::CommSocket;
use crossbeam::channel::Sender;
use harp::RObject;
use libr::R_NilValue;
use libr::SEXP;
use serde_json::json;
use stdext::result::ResultOrLog;
use stdext::spawn;
use stdext::unwrap;
use uuid::Uuid;

use crate::interface::RMain;

static RETICULATE_COMM_ID: LazyLock<Mutex<Option<String>>> = LazyLock::new(|| Mutex::new(None));

pub struct ReticulateService {
comm: CommSocket,
comm_manager_tx: Sender<CommManagerEvent>,
}

impl ReticulateService {
fn start(comm_id: String, comm_manager_tx: Sender<CommManagerEvent>) -> anyhow::Result<String> {
let comm = CommSocket::new(
CommInitiator::BackEnd,
comm_id.clone(),
String::from("positron.reticulate"),
);

let service = Self {
comm,
comm_manager_tx,
};

let event = CommManagerEvent::Opened(service.comm.clone(), serde_json::Value::Null);
service
.comm_manager_tx
.send(event)
.or_log_error("Reticulate: Could not open comm.");

spawn!(format!("ark-reticulate-{}", comm_id), move || {
service
.handle_messages()
.or_log_error("Reticulate: Error handling messages");
});

Ok(comm_id)
}

fn handle_messages(&self) -> Result<(), anyhow::Error> {
loop {
let msg = unwrap!(self.comm.incoming_rx.recv(), Err(err) => {
log::error!("Reticulate: Error while receiving message from frontend: {err:?}");
break;
});

if let CommMsg::Close = msg {
break;
}
}

// before finalizing the thread we make sure to send a close message to the front end
self.comm
.outgoing_tx
.send(CommMsg::Close)
.or_log_error("Reticulate: Could not send close message to the front-end");

// Reset the global comm_id before closing
let mut comm_id_guard = RETICULATE_COMM_ID.lock().unwrap();
log::info!("Reticulate Thread closing {:?}", (*comm_id_guard).clone());
*comm_id_guard = None;

Ok(())
}
}

// Creates a client instance reticulate can use to communicate with the front-end.
// We should aim at having at most **1** client per R session.
// Further actions that reticulate can ask the front-end can be requested through
// the comm_id that is returned by this function.
#[harp::register]
pub unsafe extern "C" fn ps_reticulate_open(input: SEXP) -> Result<SEXP, anyhow::Error> {
let main = RMain::get();

let input: RObject = input.try_into()?;
let input_code: Option<String> = input.try_into()?;

let mut comm_id_guard = RETICULATE_COMM_ID.lock().unwrap();

// If there's an id already registered, we just need to send the focus event
if let Some(id) = comm_id_guard.deref() {
// There's a comm_id registered, we just send the focus event
main.get_comm_manager_tx().send(CommManagerEvent::Message(
id.clone(),
CommMsg::Data(json!({
"method": "focus",
"params": {
"input": input_code
}
})),
))?;
return Ok(R_NilValue);
}

let id = Uuid::new_v4().to_string();
*comm_id_guard = Some(id.clone());

ReticulateService::start(id, main.get_comm_manager_tx().clone())?;

Ok(R_NilValue)
}
Loading