diff --git a/coerce/src/actor/watch/mod.rs b/coerce/src/actor/watch/mod.rs index 10da6e0..6764efd 100644 --- a/coerce/src/actor/watch/mod.rs +++ b/coerce/src/actor/watch/mod.rs @@ -10,21 +10,21 @@ pub mod watchers; #[async_trait] pub trait ActorWatch { - fn watch(&self, actor: LocalActorRef, ctx: &ActorContext); + fn watch(&self, actor: &LocalActorRef, ctx: &ActorContext); - fn unwatch(&self, actor: LocalActorRef, ctx: &ActorContext); + fn unwatch(&self, actor: &LocalActorRef, ctx: &ActorContext); } impl ActorWatch for A where A: Handler, { - fn watch(&self, actor: LocalActorRef, ctx: &ActorContext) { + fn watch(&self, actor: &LocalActorRef, ctx: &ActorContext) { let terminated_receiver = Receiver::::from(self.actor_ref(ctx)); let _ = actor.notify(Watch::from(terminated_receiver)); } - fn unwatch(&self, actor: LocalActorRef, ctx: &ActorContext) { + fn unwatch(&self, actor: &LocalActorRef, ctx: &ActorContext) { let _ = actor.notify(Unwatch::from(ctx.id().clone())); } } @@ -76,6 +76,12 @@ pub struct ActorTerminated { actor_ref: BoxedActorRef, } +impl ActorTerminated { + pub fn actor_ref(&self) -> &BoxedActorRef { + &self.actor_ref + } +} + impl From for ActorTerminated { fn from(value: BoxedActorRef) -> Self { Self { actor_ref: value } diff --git a/coerce/tests/test_actor_watching.rs b/coerce/tests/test_actor_watching.rs new file mode 100644 index 0000000..c60f4e0 --- /dev/null +++ b/coerce/tests/test_actor_watching.rs @@ -0,0 +1,55 @@ +use crate::util::TestActor; +use async_trait::async_trait; +use coerce::actor::context::ActorContext; +use coerce::actor::message::Handler; +use coerce::actor::system::ActorSystem; +use coerce::actor::watch::{ActorTerminated, ActorWatch}; +use coerce::actor::{Actor, ActorId, CoreActorRef, IntoActor, LocalActorRef}; +use tokio::sync::oneshot; +use tokio::sync::oneshot::Sender; + +pub mod util; + + + +pub struct Watchdog { + target: LocalActorRef, + on_actor_terminated: Option>, +} + +#[async_trait] +impl Actor for Watchdog { + async fn started(&mut self, ctx: &mut ActorContext) { + self.watch(&self.target, ctx); + } +} + +#[async_trait] +impl Handler for Watchdog { + async fn handle(&mut self, message: ActorTerminated, _ctx: &mut ActorContext) { + if let Some(on_actor_terminated) = self.on_actor_terminated.take() { + let _ = on_actor_terminated.send(message); + } + } +} + +#[tokio::test] +pub async fn test_actor_watch_notifications() { + let system = ActorSystem::new(); + let actor = TestActor::new().into_actor(Option::::None, &system).await.unwrap(); + let (tx, rx) = oneshot::channel(); + let _watchdog = Watchdog { + target: actor.clone(), + on_actor_terminated: Some(tx), + } + .into_actor(Option::::None, &system) + .await + .unwrap(); + + let _ = actor.notify_stop(); + let actor_terminated = rx.await.unwrap(); + let terminated_actor = actor_terminated.actor_ref(); + + assert_eq!(terminated_actor.actor_id(), actor.actor_id()); + assert_eq!(terminated_actor.is_valid(), false); +}