Skip to content

Commit

Permalink
Make Actor::started(), stopped() fallible
Browse files Browse the repository at this point in the history
  • Loading branch information
strohel committed Aug 13, 2024
1 parent 0e10b57 commit 4eee983
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 76 deletions.
20 changes: 7 additions & 13 deletions benches/pub_sub.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Error;
use anyhow::{Error, Result};
use criterion::{criterion_group, criterion_main, Criterion};
use std::{
hint::black_box,
Expand Down Expand Up @@ -36,11 +36,7 @@ impl Actor for PublisherActor {
"PublisherActor"
}

fn handle(
&mut self,
context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
fn handle(&mut self, context: &mut Self::Context, message: Self::Message) -> Result<()> {
match message {
PublisherMessage::SubscriberStarted => {
self.subscriber_count = self.subscriber_count.checked_sub(1).unwrap();
Expand Down Expand Up @@ -83,18 +79,16 @@ impl Actor for SubscriberActor {
"SubscriberActor"
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.subscribe::<StringEvent>();
for publisher_addr in self.publisher_addrs.iter() {
publisher_addr.send(PublisherMessage::SubscriberStarted).unwrap();
publisher_addr.send(PublisherMessage::SubscriberStarted)?;
}

Ok(())
}

fn handle(
&mut self,
_context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> {
// This black_box has a nice side effect that it silences the 'field is never read' warning.
black_box(message.0);
Ok(())
Expand Down
21 changes: 9 additions & 12 deletions examples/actor_wrapping.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Error;
use anyhow::{Error, Result};
use env_logger::Env;
use log::debug;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -34,12 +34,12 @@ impl<A: Actor> Actor for LoggingAdapter<A> {
A::priority(message)
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
debug!("LoggingAdapter: started()");
self.inner.started(context)
}

fn stopped(&mut self, context: &mut Self::Context) {
fn stopped(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
debug!("LoggingAdapter: stopped()");
self.inner.stopped(context)
}
Expand All @@ -61,7 +61,7 @@ impl Actor for TestActor {
type Error = Error;
type Message = String;

fn handle(&mut self, context: &mut Self::Context, message: String) -> Result<(), Error> {
fn handle(&mut self, context: &mut Self::Context, message: String) -> Result<()> {
println!("Got a message: {}. Shuting down.", message);
context.system_handle.shutdown().map_err(Error::from)
}
Expand All @@ -70,20 +70,17 @@ impl Actor for TestActor {
"TestActor"
}

fn started(&mut self, context: &mut Self::Context) {
context.set_timeout(Some(Duration::from_millis(100)))
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.set_timeout(Some(Duration::from_millis(100)));
Ok(())
}

fn deadline_passed(
&mut self,
context: &mut Self::Context,
deadline: Instant,
) -> Result<(), Error> {
fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<()> {
context.myself.send(format!("deadline was {:?}", deadline)).map_err(Error::from)
}
}

fn main() -> Result<(), Error> {
fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("debug")).init();

let mut system = System::new("Actor Wrapping Example");
Expand Down
29 changes: 10 additions & 19 deletions examples/pub_sub_example.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Error;
use anyhow::{Error, Result};
use env_logger::Env;
use std::time::{Duration, Instant};
use tonari_actor::{Actor, Context, Event, System};
Expand Down Expand Up @@ -40,9 +40,10 @@ impl Actor for PublisherActor {
"PublisherActor"
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.set_deadline(Some(self.started_at + Duration::from_millis(1500)));
context.subscribe::<StringEvent>();
Ok(())
}

fn handle(
Expand Down Expand Up @@ -71,11 +72,7 @@ impl Actor for PublisherActor {
Ok(())
}

fn deadline_passed(
&mut self,
context: &mut Self::Context,
deadline: Instant,
) -> Result<(), Error> {
fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<()> {
context.myself.send(PublisherMessage::Periodic)?;
context.set_deadline(Some(deadline + Duration::from_secs(1)));
Ok(())
Expand Down Expand Up @@ -104,15 +101,12 @@ impl Actor for SubscriberActor1 {
"SubscriberActor1"
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.subscribe::<StringEvent>();
Ok(())
}

fn handle(
&mut self,
_context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> {
match message {
SubscriberMessage::Text(text) => {
println!("SubscriberActor1 got a text message: {:?}", text);
Expand All @@ -132,15 +126,12 @@ impl Actor for SubscriberActor2 {
"SubscriberActor1"
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.subscribe::<StringEvent>();
Ok(())
}

fn handle(
&mut self,
_context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> {
match message {
SubscriberMessage::Text(text) => {
println!("SubscriberActor2 got a text message: {:?}", text);
Expand Down
17 changes: 5 additions & 12 deletions examples/simple_timer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Error;
use anyhow::{Error, Result};
use env_logger::Env;
use std::time::{Duration, Instant};
use tonari_actor::{Actor, Context, System};
Expand Down Expand Up @@ -27,24 +27,17 @@ impl Actor for TimerExampleActor {
"TimerExampleActor"
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.set_deadline(Some(self.started_at + Duration::from_millis(1500)));
Ok(())
}

fn handle(
&mut self,
_context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> {
println!("Got a message: {:?} at {:?}", message, self.started_at.elapsed());
Ok(())
}

fn deadline_passed(
&mut self,
context: &mut Self::Context,
deadline: Instant,
) -> Result<(), Error> {
fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<()> {
context.myself.send(TimerMessage::Periodic)?;
context.set_deadline(Some(deadline + Duration::from_secs(1)));
Ok(())
Expand Down
45 changes: 29 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,10 @@ impl System {
.spawn(move || {
let mut actor = factory();

actor.started(&mut context);
if let Err(error) = actor.started(&mut context) {
Self::report_error_shutdown(&system_handle, A::name(), "started", error);
return;
}
debug!("[{}] started actor: {}", system_handle.name, A::name());

Self::run_actor_select_loop(actor, addr, &mut context, &system_handle);
Expand Down Expand Up @@ -535,9 +538,13 @@ impl System {

self.handle.registry.lock().push(RegistryEntry::CurrentThread(addr.control_tx.clone()));

actor.started(&mut context);
debug!("[{}] started actor: {}", system_handle.name, A::name());
Self::run_actor_select_loop(actor, addr, &mut context, system_handle);
match actor.started(&mut context) {
Ok(()) => {
debug!("[{}] started actor: {}", system_handle.name, A::name());
Self::run_actor_select_loop(actor, addr, &mut context, system_handle);
},
Err(error) => Self::report_error_shutdown(system_handle, A::name(), "started", error),
}

// Wait for the system to shutdown before we exit, otherwise the process
// would exit before the system is completely shutdown
Expand Down Expand Up @@ -602,7 +609,10 @@ impl System {
// Process the event. Returning ends actor loop, the normal operation is to fall through.
match received {
Received::Control(Control::Stop) => {
actor.stopped(context);
if let Err(error) = actor.stopped(context) {
// FWIW this should always hit the "while shutting down" variant.
Self::report_error_shutdown(system_handle, A::name(), "stopped", error);
}
debug!("[{}] stopped actor: {}", system_handle.name, A::name());
return;
},
Expand Down Expand Up @@ -885,10 +895,14 @@ pub trait Actor {
}

/// An optional callback when the Actor has been started.
fn started(&mut self, _context: &mut Self::Context) {}
fn started(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
Ok(())
}

/// An optional callback when the Actor has been stopped.
fn stopped(&mut self, _context: &mut Self::Context) {}
fn stopped(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
Ok(())
}

/// An optional callback when a deadline has passed.
///
Expand Down Expand Up @@ -1129,16 +1143,17 @@ mod tests {

fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), String> {
println!("message: {}", message);

Ok(())
}

fn started(&mut self, _: &mut Self::Context) {
fn started(&mut self, _: &mut Self::Context) -> Result<(), String> {
println!("started");
Ok(())
}

fn stopped(&mut self, _: &mut Self::Context) {
fn stopped(&mut self, _: &mut Self::Context) -> Result<(), String> {
println!("stopped");
Ok(())
}
}

Expand Down Expand Up @@ -1193,8 +1208,8 @@ mod tests {
}

/// We just need this test to compile, not run.
fn started(&mut self, ctx: &mut Self::Context) {
ctx.system_handle.shutdown().unwrap();
fn started(&mut self, ctx: &mut Self::Context) -> Result<(), String> {
ctx.system_handle.shutdown().map_err(|e| e.to_string())
}
}

Expand Down Expand Up @@ -1377,10 +1392,8 @@ mod tests {
type Error = String;
type Message = ();

fn started(&mut self, context: &mut Self::Context) {
context
.subscribe_and_receive_latest::<Self::Message>()
.expect("can receive last cached value");
fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
context.subscribe_and_receive_latest::<Self::Message>().map_err(|e| e.to_string())
}

fn handle(
Expand Down
8 changes: 4 additions & 4 deletions src/timed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor
}
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
self.inner.started(&mut TimedContext::from_context(context))
}

fn stopped(&mut self, context: &mut Self::Context) {
fn stopped(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
self.inner.stopped(&mut TimedContext::from_context(context))
}

Expand Down Expand Up @@ -314,15 +314,15 @@ mod tests {
Ok(())
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
context
.myself
.send_recurring(
|| 2,
Instant::now() + Duration::from_millis(50),
Duration::from_millis(100),
)
.unwrap()
.map_err(|e| e.to_string())
}
}

Expand Down

0 comments on commit 4eee983

Please sign in to comment.