Skip to content

Commit

Permalink
Merge pull request #188 from posit-dev/feature/rpc-handling
Browse files Browse the repository at this point in the history
Move low level handling of RPC messages to `CommSocket`
  • Loading branch information
lionel- authored Dec 19, 2023
2 parents 434e96c + 4036929 commit edcf3fe
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 102 deletions.
64 changes: 64 additions & 0 deletions crates/amalthea/src/socket/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@

use crossbeam::channel::Receiver;
use crossbeam::channel::Sender;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;

use crate::comm::base_comm::json_rpc_error;
use crate::comm::base_comm::JsonRpcErrorCode;
use crate::comm::comm_channel::CommMsg;

/**
Expand Down Expand Up @@ -83,6 +88,7 @@ impl CommSocket {
pub fn new(initiator: CommInitiator, comm_id: String, comm_name: String) -> Self {
let (outgoing_tx, outgoing_rx) = crossbeam::channel::unbounded();
let (incoming_tx, incoming_rx) = crossbeam::channel::unbounded();

Self {
comm_id,
comm_name,
Expand All @@ -93,4 +99,62 @@ impl CommSocket {
incoming_rx,
}
}

/**
* Handle `CommMsg::Rpc`.
*
* - `message`: A message received by the comm.
* - `request_handler`: The comm's handler for requests.
*
* Returns `false` if `message` is not an RPC. Otherwise returns `true`.
* Requests that could not be handled cause an RPC error response.
*/
pub fn handle_request<Reqs, Reps>(
&self,
message: CommMsg,
request_handler: impl FnOnce(Reqs) -> anyhow::Result<Reps>,
) -> bool
where
Reqs: DeserializeOwned,
Reps: Serialize,
{
let (id, data) = match message {
CommMsg::Rpc(id, data) => (id, data),
_ => return false,
};

let json = match serde_json::from_value::<Reqs>(data.clone()) {
Ok(m) => match request_handler(m) {
Ok(reply) => match serde_json::to_value(reply) {
Ok(value) => value,
Err(err) => self.json_rpc_internal_error(err, data),
},
Err(err) => self.json_rpc_internal_error(err, data),
},
Err(err) => json_rpc_error(
JsonRpcErrorCode::InvalidRequest,
format!(
"Invalid {} request: {err:} (request: {data:})",
self.comm_name
),
),
};
let response = CommMsg::Rpc(id, json);

self.outgoing_tx.send(response).unwrap();
true
}

fn json_rpc_internal_error<T>(&self, err: T, data: Value) -> Value
where
T: std::fmt::Display,
{
json_rpc_error(
JsonRpcErrorCode::InternalError,
format!(
"Failed to process {} request: {err} (request: {data:})",
self.comm_name
),
)
}
}
57 changes: 12 additions & 45 deletions crates/ark/src/help/r_help.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
//
//

use amalthea::comm::base_comm::json_rpc_error;
use amalthea::comm::base_comm::JsonRpcErrorCode;
use amalthea::comm::comm_channel::CommMsg;
use amalthea::comm::help_comm::HelpEvent;
use amalthea::comm::help_comm::HelpRpcReply;
Expand Down Expand Up @@ -95,6 +93,7 @@ impl RHelp {
help_request_rx,
help_reply_tx,
};

help.execution_thread();
});

Expand Down Expand Up @@ -163,59 +162,27 @@ impl RHelp {
// thread exit.
return false;
}
if let CommMsg::Rpc(id, data) = message {
let message = match serde_json::from_value::<HelpRpcRequest>(data.clone()) {
Ok(m) => m,
Err(err) => {
self.comm
.outgoing_tx
.send(CommMsg::Rpc(
id,
json_rpc_error(
JsonRpcErrorCode::InvalidRequest,
format!("Invalid help request: {err:} (request: {data:})"),
),
))
.unwrap();
return true;
},
};
if let Err(err) = self.handle_rpc(id.clone(), message) {
self.comm
.outgoing_tx
.send(CommMsg::Rpc(
id,
json_rpc_error(
JsonRpcErrorCode::InternalError,
format!("Failed to process help request: {err:} (request: {data:})"),
),
))
.unwrap();
return true;
}

if self
.comm
.handle_request(message, |req| self.handle_rpc(req))
{
return true;
}

true
}

fn handle_rpc(&self, id: String, message: HelpRpcRequest) -> Result<()> {
fn handle_rpc(&self, message: HelpRpcRequest) -> anyhow::Result<HelpRpcReply> {
// Match on the type of data received.
match message {
HelpRpcRequest::ShowHelpTopic(topic) => {
// Look up the help topic and attempt to show it; this returns a
// boolean indicating whether the topic was found.
let found = match self.show_help_topic(topic.topic.clone()) {
Ok(found) => found,
Err(err) => {
return Err(err);
},
};

// Create and send a reply to the front end.
let reply = HelpRpcReply::ShowHelpTopicReply(found);
let json = serde_json::to_value(reply)?;
self.comm.outgoing_tx.send(CommMsg::Rpc(id, json))?;
Ok(())
match self.show_help_topic(topic.topic.clone()) {
Ok(found) => Ok(HelpRpcReply::ShowHelpTopicReply(found)),
Err(err) => Err(err),
}
},
}
}
Expand Down
81 changes: 24 additions & 57 deletions crates/ark/src/plots/graphics_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ use std::fs::File;
use std::io::BufReader;
use std::io::Read;

use amalthea::comm::base_comm::json_rpc_error;
use amalthea::comm::base_comm::JsonRpcErrorCode;
use amalthea::comm::comm_channel::CommMsg;
use amalthea::comm::event::CommManagerEvent;
use amalthea::comm::plot_comm::PlotEvent;
Expand Down Expand Up @@ -176,61 +174,30 @@ impl DeviceContext {
});

// Get the RPC request.
if let CommMsg::Rpc(rpc_id, value) = message {
let input = match serde_json::from_value::<PlotRpcRequest>(value.clone()) {
Ok(req) => req,
Err(err) => {
socket
.outgoing_tx
.send(CommMsg::Rpc(
rpc_id,
json_rpc_error(
JsonRpcErrorCode::InvalidRequest,
format!("Invalid request sent to plot ${plot_id}: {err:} (request: {value:})"),
),
))
.unwrap();
return;
},
};

match input {
PlotRpcRequest::Render(plot_meta) => {
let data = match self.render_plot(
plot_id,
plot_meta.width,
plot_meta.height,
plot_meta.pixel_ratio,
) {
Ok(data) => data,
Err(err) => {
socket
.outgoing_tx
.send(CommMsg::Rpc(
rpc_id,
json_rpc_error(
JsonRpcErrorCode::InternalError,
format!("Plot ${plot_id} failed to render: {err:} (request: {value:})"),
),
))
.unwrap();
return;
},
};

let response = PlotRpcReply::RenderReply(PlotResult {
data: data.to_string(),
mime_type: "image/png".to_string(),
});

let json = serde_json::to_value(response).unwrap();

socket
.outgoing_tx
.send(CommMsg::Rpc(rpc_id.to_string(), json))
.or_log_error("Failed to send plot due to");
},
}
if socket.handle_request(message, |req| self.handle_rpc(req, plot_id)) {
return;
}
}

fn handle_rpc(
&mut self,
message: PlotRpcRequest,
plot_id: &String,
) -> anyhow::Result<PlotRpcReply> {
match message {
PlotRpcRequest::Render(plot_meta) => {
let data = self.render_plot(
&plot_id,
plot_meta.width,
plot_meta.height,
plot_meta.pixel_ratio,
)?;

Ok(PlotRpcReply::RenderReply(PlotResult {
data: data.to_string(),
mime_type: "image/png".to_string(),
}))
},
}
}

Expand Down

0 comments on commit edcf3fe

Please sign in to comment.