Skip to content

Commit

Permalink
Error reporting QoL (#88)
Browse files Browse the repository at this point in the history
* Error reporting QoL

- require Display instead of Debug for Actor::Error associated type
- print handle() (and friends) errors using `{:#}` rather than `{:?}`
  - this hides the backtrace print of anyhow errors
- differentiate handle() (and friends) errors while shutting down vs. running and log them with lower severity
- make it more clear and less spammy when trying to shut down a system while it is already shutting down

* Tweak error, log message punctuation, casing, prefixes

Coming from live testing the these changes.
  • Loading branch information
strohel authored Aug 13, 2024
1 parent 4ea3712 commit edb7fdd
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 51 deletions.
2 changes: 1 addition & 1 deletion benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ struct ChainLink {

impl Actor for ChainLink {
type Context = Context<Self::Message>;
type Error = ();
type Error = String;
type Message = u64;

fn name() -> &'static str {
Expand Down
117 changes: 69 additions & 48 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
//! struct TestActor {}
//! impl Actor for TestActor {
//! type Context = Context<Self::Message>;
//! type Error = ();
//! type Error = String;
//! type Message = usize;
//!
//! fn name() -> &'static str {
//! "TestActor"
//! }
//!
//! fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<(), ()> {
//! fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<(), String> {
//! println!("message: {}", message);
//!
//! Ok(())
Expand Down Expand Up @@ -92,13 +92,13 @@ impl fmt::Display for ActorError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ActorError::SystemStopped { actor_name } => {
write!(f, "The system is not running. The actor {} can not be started.", actor_name)
write!(f, "the system is not running; the actor {} can not be started", actor_name)
},
ActorError::SpawnFailed { actor_name } => {
write!(f, "Failed to spawn a thread for the actor {}.", actor_name)
write!(f, "failed to spawn a thread for the actor {}", actor_name)
},
ActorError::ActorPanic => {
write!(f, "A panic inside an actor thread. See above for more verbose logs.")
write!(f, "panic inside an actor thread; see above for more verbose logs")
},
}
}
Expand All @@ -125,7 +125,7 @@ impl fmt::Display for SendError {
SendErrorReason::Full => {
write!(
f,
"The capacity of {}'s {:?}-priority channel is full.",
"the capacity of {}'s {:?}-priority channel is full",
recipient_name, priority
)
},
Expand All @@ -145,7 +145,7 @@ impl fmt::Display for PublishError {
let error_strings: Vec<String> = self.0.iter().map(ToString::to_string).collect();
write!(
f,
"Failed to deliver an event to {} subscribers: {}.",
"failed to deliver an event to {} subscribers: {}",
self.0.len(),
error_strings.join(", ")
)
Expand All @@ -165,7 +165,7 @@ pub struct DisconnectedError {

impl fmt::Display for DisconnectedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "The recipient of the message ({}) no longer exists.", self.recipient_name)
write!(f, "the recipient of the message ({}) no longer exists", self.recipient_name)
}
}

Expand Down Expand Up @@ -608,35 +608,55 @@ impl System {
},
Received::Message(msg) => {
trace!("[{}] message received by {}", system_handle.name, A::name());
if let Err(err) = actor.handle(context, msg) {
error!(
"[{}] {} handle error: {:?}, shutting down.",
system_handle.name,
A::name(),
err
);
let _ = system_handle.shutdown();

if let Err(error) = actor.handle(context, msg) {
Self::report_error_shutdown(system_handle, A::name(), "handle", error);
return;
}
},
Received::Timeout => {
let deadline = context.receive_deadline.take().expect("implied by timeout");
if let Err(err) = actor.deadline_passed(context, deadline) {
error!(
"[{}] {} deadline_passed error: {:?}, shutting down.",
system_handle.name,
if let Err(error) = actor.deadline_passed(context, deadline) {
Self::report_error_shutdown(
system_handle,
A::name(),
err
"deadline_passed",
error,
);
let _ = system_handle.shutdown();

return;
}
},
}
}
}

fn report_error_shutdown(
system_handle: &SystemHandle,
actor_name: &str,
method_name: &str,
error: impl std::fmt::Display,
) {
let is_running = match *system_handle.system_state.read() {
SystemState::Running => true,
SystemState::ShuttingDown | SystemState::Stopped => false,
};

let system_name = &system_handle.name;

// Note that the system may have transitioned from running to stopping (but not the other
// way around) in the mean time. Slightly imprecise log and an extra no-op call is fine.
if is_running {
error!(
"[{system_name}] {actor_name} {method_name}() error: {error:#}. Shutting down the \
actor system."
);
let _ = system_handle.shutdown();
} else {
warn!(
"[{system_name}] {actor_name} {method_name}() error while shutting down: \
{error:#}. Ignoring."
);
}
}
}

impl Drop for System {
Expand All @@ -658,7 +678,6 @@ impl SystemHandle {
pub fn shutdown(&self) -> Result<(), ActorError> {
let current_thread = thread::current();
let current_thread_name = current_thread.name().unwrap_or("Unknown thread id");
info!("Thread [{}] shutting down the actor system", current_thread_name);

// Use an inner scope to prevent holding the lock for the duration of shutdown
{
Expand All @@ -667,24 +686,22 @@ impl SystemHandle {
match *system_state_lock {
SystemState::ShuttingDown | SystemState::Stopped => {
debug!(
"Thread [{}] called system.shutdown() but the system is already shutting \
down or stopped",
current_thread_name
"[{}] thread {} called system.shutdown() but the system is already \
shutting down or stopped.",
self.name, current_thread_name,
);
return Ok(());
},
SystemState::Running => {
debug!(
"Thread [{}] setting the system_state value to ShuttingDown",
current_thread_name
info!(
"[{}] thread {} shutting down the actor system.",
self.name, current_thread_name,
);
*system_state_lock = SystemState::ShuttingDown;
},
}
}

info!("[{}] system shutting down.", self.name);

if let Some(callback) = self.callbacks.preshutdown.as_ref() {
info!("[{}] calling pre-shutdown callback.", self.name);
if let Err(err) = callback() {
Expand Down Expand Up @@ -845,7 +862,7 @@ pub trait Actor {
// 'static required to create trait object in Addr, https://stackoverflow.com/q/29740488/4345715
type Message: Send + 'static;
/// The type to return on error in the handle method.
type Error: std::fmt::Debug;
type Error: std::fmt::Display;
/// What kind of context this actor accepts. Usually [`Context<Self::Message>`].
type Context;

Expand Down Expand Up @@ -888,13 +905,13 @@ pub trait Actor {
/// # struct TickingActor;
/// impl Actor for TickingActor {
/// # type Context = Context<Self::Message>;
/// # type Error = ();
/// # type Error = String;
/// # type Message = ();
/// # fn name() -> &'static str { "TickingActor" }
/// # fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), ()> { Ok(()) }
/// # fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> { Ok(()) }
/// // ...
///
/// fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<(), ()> {
/// fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<(), String> {
/// // do_periodic_housekeeping();
///
/// // A: Schedule one second from now (even if delayed); drifting tick.
Expand Down Expand Up @@ -1106,14 +1123,14 @@ mod tests {
struct TestActor;
impl Actor for TestActor {
type Context = Context<Self::Message>;
type Error = ();
type Error = String;
type Message = usize;

fn name() -> &'static str {
"TestActor"
}

fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), ()> {
fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), String> {
println!("message: {}", message);

Ok(())
Expand Down Expand Up @@ -1167,14 +1184,14 @@ mod tests {
}
impl Actor for LocalActor {
type Context = Context<Self::Message>;
type Error = ();
type Error = String;
type Message = ();

fn name() -> &'static str {
"LocalActor"
}

fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), ()> {
fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> {
Ok(())
}

Expand Down Expand Up @@ -1204,22 +1221,26 @@ mod tests {

impl Actor for TimeoutActor {
type Context = Context<Self::Message>;
type Error = ();
type Error = String;
type Message = Option<Instant>;

fn name() -> &'static str {
"TimeoutActor"
}

fn handle(&mut self, ctx: &mut Self::Context, msg: Self::Message) -> Result<(), ()> {
fn handle(
&mut self,
ctx: &mut Self::Context,
msg: Self::Message,
) -> Result<(), String> {
self.handle_count.fetch_add(1, Ordering::SeqCst);
if msg.is_some() {
ctx.receive_deadline = msg;
}
Ok(())
}

fn deadline_passed(&mut self, _: &mut Self::Context, _: Instant) -> Result<(), ()> {
fn deadline_passed(&mut self, _: &mut Self::Context, _: Instant) -> Result<(), String> {
self.timeout_count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
Expand Down Expand Up @@ -1272,7 +1293,7 @@ mod tests {
let error = low_capacity_actor.send(123).unwrap_err();
assert_eq!(
error.to_string(),
"The capacity of TestActor's Normal-priority channel is full."
"the capacity of TestActor's Normal-priority channel is full"
);
assert_eq!(
format!("{:?}", error),
Expand All @@ -1282,7 +1303,7 @@ mod tests {
system.shutdown().unwrap();

let error = stopped_actor.send(456usize).unwrap_err();
assert_eq!(error.to_string(), "The recipient of the message (TestActor) no longer exists.");
assert_eq!(error.to_string(), "the recipient of the message (TestActor) no longer exists");
assert_eq!(
format!("{:?}", error),
r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Disconnected }"#
Expand All @@ -1299,7 +1320,7 @@ mod tests {

impl Actor for PriorityActor {
type Context = Context<Self::Message>;
type Error = ();
type Error = String;
type Message = usize;

fn handle(
Expand Down Expand Up @@ -1356,7 +1377,7 @@ mod tests {
struct Subscriber;
impl Actor for Subscriber {
type Context = Context<Self::Message>;
type Error = ();
type Error = String;
type Message = ();

fn started(&mut self, context: &mut Self::Context) {
Expand Down
4 changes: 2 additions & 2 deletions src/timed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,14 @@ mod tests {

impl Actor for TimedTestActor {
type Context = TimedContext<Self::Message>;
type Error = ();
type Error = String;
type Message = usize;

fn name() -> &'static str {
"TimedTestActor"
}

fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), ()> {
fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), String> {
{
let mut guard = self.received.lock().unwrap();
guard.push(message);
Expand Down

0 comments on commit edb7fdd

Please sign in to comment.