Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hanging issues in integration tests #558

Merged
merged 14 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/amalthea/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ anyhow = "1.0.80"
serde_with = "3.0.0"
serde_repr = "0.1.17"
tracing = "0.1.40"
assert_matches = "1.5.0"

[dev-dependencies]
env_logger = "0.10.0"
56 changes: 35 additions & 21 deletions crates/amalthea/src/fixtures/dummy_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
*
*/

use assert_matches::assert_matches;
use serde_json::Value;
use stdext::assert_match;

use crate::connection_file::ConnectionFile;
use crate::session::Session;
Expand Down Expand Up @@ -155,17 +155,41 @@ impl DummyFrontend {
message.send(&self.stdin_socket).unwrap();
}

pub fn recv(&self, socket: &Socket) -> Message {
// It's important to wait with a timeout because the kernel thread might
// have panicked, preventing it from sending the expected message. The
// tests would then hang indefinitely.
//
// Note that the panic hook will still have run to record the panic, so
// we'll get expected panic information in the test output.
if socket.poll_incoming(1000).unwrap() {
return Message::read_from_socket(socket).unwrap();
}

panic!("Timeout while expecting message on socket {}", socket.name);
}

/// Receives a Jupyter message from the Shell socket
pub fn recv_shell(&self) -> Message {
Message::read_from_socket(&self.shell_socket).unwrap()
self.recv(&self.shell_socket)
}

/// Receives a Jupyter message from the IOPub socket
pub fn recv_iopub(&self) -> Message {
self.recv(&self.iopub_socket)
}

/// Receives a Jupyter message from the Stdin socket
pub fn recv_stdin(&self) -> Message {
self.recv(&self.stdin_socket)
}

/// Receive from Shell and assert `ExecuteReply` message.
/// Returns `execution_count`.
pub fn recv_shell_execute_reply(&self) -> u32 {
let msg = self.recv_shell();

assert_match!(msg, Message::ExecuteReply(data) => {
assert_matches!(msg, Message::ExecuteReply(data) => {
assert_eq!(data.content.status, Status::Ok);
data.content.execution_count
})
Expand All @@ -176,22 +200,17 @@ impl DummyFrontend {
pub fn recv_shell_execute_reply_exception(&self) -> u32 {
let msg = self.recv_shell();

assert_match!(msg, Message::ExecuteReplyException(data) => {
assert_matches!(msg, Message::ExecuteReplyException(data) => {
assert_eq!(data.content.status, Status::Error);
data.content.execution_count
})
}

/// Receives a Jupyter message from the IOPub socket
pub fn recv_iopub(&self) -> Message {
Message::read_from_socket(&self.iopub_socket).unwrap()
}

/// Receive from IOPub and assert Busy message
pub fn recv_iopub_busy(&self) -> () {
let msg = self.recv_iopub();

assert_match!(msg, Message::Status(data) => {
assert_matches!(msg, Message::Status(data) => {
assert_eq!(data.content.execution_state, ExecutionState::Busy);
});
}
Expand All @@ -200,7 +219,7 @@ impl DummyFrontend {
pub fn recv_iopub_idle(&self) -> () {
let msg = self.recv_iopub();

assert_match!(msg, Message::Status(data) => {
assert_matches!(msg, Message::Status(data) => {
assert_eq!(data.content.execution_state, ExecutionState::Idle);
});
}
Expand All @@ -209,7 +228,7 @@ impl DummyFrontend {
pub fn recv_iopub_execute_input(&self) -> ExecuteInput {
let msg = self.recv_iopub();

assert_match!(msg, Message::ExecuteInput(data) => {
assert_matches!(msg, Message::ExecuteInput(data) => {
data.content
})
}
Expand All @@ -219,9 +238,9 @@ impl DummyFrontend {
pub fn recv_iopub_execute_result(&self) -> String {
let msg = self.recv_iopub();

assert_match!(msg, Message::ExecuteResult(data) => {
assert_match!(data.content.data, Value::Object(map) => {
assert_match!(map["text/plain"], Value::String(ref string) => {
assert_matches!(msg, Message::ExecuteResult(data) => {
assert_matches!(data.content.data, Value::Object(map) => {
assert_matches!(map["text/plain"], Value::String(ref string) => {
string.clone()
})
})
Expand All @@ -233,16 +252,11 @@ impl DummyFrontend {
pub fn recv_iopub_execute_error(&self) -> String {
let msg = self.recv_iopub();

assert_match!(msg, Message::ExecuteError(data) => {
assert_matches!(msg, Message::ExecuteError(data) => {
data.content.exception.evalue
})
}

/// Receives a Jupyter message from the Stdin socket
pub fn recv_stdin(&self) -> Message {
Message::read_from_socket(&self.stdin_socket).unwrap()
}

/// Receives a (raw) message from the heartbeat socket
pub fn recv_heartbeat(&self) -> zmq::Message {
let mut msg = zmq::Message::new();
Expand Down
19 changes: 11 additions & 8 deletions crates/amalthea/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crossbeam::channel::unbounded;
use crossbeam::channel::Receiver;
use crossbeam::channel::Select;
use crossbeam::channel::Sender;
use log::error;
use stdext::spawn;
use stdext::unwrap;

Expand All @@ -38,6 +37,10 @@ use crate::wire::input_reply::InputReply;
use crate::wire::jupyter_message::Message;
use crate::wire::jupyter_message::OutboundMessage;

macro_rules! report_error {
($($arg:tt)+) => (if cfg!(debug_assertions) { log::error!($($arg)+) } else { panic!($($arg)+) })
}

/// A Kernel represents a unique Jupyter kernel session and is the host for all
/// execution and messaging threads.
pub struct Kernel {
Expand Down Expand Up @@ -371,7 +374,7 @@ impl Kernel {
}
// Consume notification
let _ = unwrap!(outbound_notif_socket.socket.recv_bytes(0), Err(err) => {
log::error!("Could not consume outbound notification socket: {}", err);
report_error!("Could not consume outbound notification socket: {}", err);
return false;
});

Expand Down Expand Up @@ -424,7 +427,7 @@ impl Kernel {
let n = unwrap!(
zmq::poll(&mut poll_items, -1),
Err(err) => {
error!("While polling 0MQ items: {}", err);
report_error!("While polling 0MQ items: {}", err);
0
}
);
Expand All @@ -433,20 +436,20 @@ impl Kernel {
if has_outbound() {
unwrap!(
forward_outbound(),
Err(err) => error!("While forwarding outbound message: {}", err)
Err(err) => report_error!("While forwarding outbound message: {}", err)
);
continue;
}

if has_inbound() {
unwrap!(
forward_inbound(),
Err(err) => error!("While forwarding inbound message: {}", err)
Err(err) => report_error!("While forwarding inbound message: {}", err)
);
continue;
}

log::error!("Could not find readable message");
report_error!("Could not find readable message");
}
}
}
Expand All @@ -463,7 +466,7 @@ impl Kernel {
unwrap!(
notif_socket.send(zmq::Message::new()),
Err(err) => {
error!("Couldn't notify 0MQ thread: {}", err);
report_error!("Couldn't notify 0MQ thread: {}", err);
continue;
}
);
Expand All @@ -476,7 +479,7 @@ impl Kernel {
notif_socket.recv(&mut msg)
},
Err(err) => {
error!("Couldn't received acknowledgement from 0MQ thread: {}", err);
report_error!("Couldn't received acknowledgement from 0MQ thread: {}", err);
continue;
}
);
Expand Down
6 changes: 5 additions & 1 deletion crates/amalthea/src/socket/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,12 @@ impl Socket {
}
}

pub fn poll_incoming(&self, timeout_ms: i64) -> zmq::Result<bool> {
Ok(self.socket.poll(zmq::PollEvents::POLLIN, timeout_ms)? != 0)
}

pub fn has_incoming_data(&self) -> zmq::Result<bool> {
Ok(self.socket.poll(zmq::PollEvents::POLLIN, 0)? != 0)
self.poll_incoming(0)
}

/// Subscribes a SUB socket to all the published messages from a PUB socket.
Expand Down
26 changes: 16 additions & 10 deletions crates/ark/src/fixtures/dummy_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,25 @@ impl DummyArkFrontend {
let frontend = DummyFrontend::new();
let connection_file = frontend.get_connection_file();

// Start the kernel in this thread so that panics are propagated
crate::start::start_kernel(
connection_file,
vec![
String::from("--interactive"),
String::from("--vanilla"),
lionel- marked this conversation as resolved.
Show resolved Hide resolved
String::from("--no-save"),
String::from("--no-restore"),
],
None,
SessionMode::Console,
false,
);

// Start the REPL in a background thread, does not return and is never joined
stdext::spawn!("dummy_kernel", || {
crate::start::start_kernel(
connection_file,
vec![String::from("--no-save"), String::from("--no-restore")],
None,
SessionMode::Console,
false,
);
RMain::start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am mildly concerned about the fact that R_INIT sets "set" in setup() at the very end.

But we typically think of R_INIT as "R is ready to receive input", but that really isn't true because we haven't started the REPL yet (not until we call start() here).

I feel like its probably ok? Maybe R_INIT should be R_STARTING or something, I'm not sure. And the remaining helper would be is_r_starting(). If you look at the 1 place is_r_initialized() is currently being used, it is to capture output that is part of the startup banner. Switching the name to is_r_starting() seems like it would still make sense there.

Copy link
Contributor Author

@lionel- lionel- Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering about that too, but I couldn't move it to the start() method.

But I think that's ok because R is ready to receive input or be accessed with well protected (top-level exec) API calls even if the REPL has not been started yet.

});
lionel- marked this conversation as resolved.
Show resolved Hide resolved

// Wait for startup to complete
RMain::wait_r_initialized();
Comment on lines -55 to -56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove wait_r_initialized() now?


frontend.complete_initialization();
frontend
}
Expand Down
Loading
Loading