Skip to content

Commit

Permalink
move shard coordinator to use the new cluster singleton system & remo…
Browse files Browse the repository at this point in the history
…ve `CoordinatorSpawner` actor
  • Loading branch information
LeonHartley committed Feb 8, 2024
1 parent 2ffbaa3 commit 89d58b4
Show file tree
Hide file tree
Showing 20 changed files with 182 additions and 321 deletions.
48 changes: 12 additions & 36 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion coerce/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "coerce"
description = "Async actor runtime and distributed systems framework"
license = "Apache-2.0"
version = "0.8.11"
version = "0.8.12"
authors = ["Leon Hartley <[email protected]>"]
edition = "2021"
readme = "README.md"
Expand All @@ -20,6 +20,7 @@ full = [
"actor-tracing",
"actor-tracing-info",
"client-auth-jwt",
"singleton",
]

remote = [
Expand Down
6 changes: 3 additions & 3 deletions coerce/src/actor/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ impl ActorSystem {
Ok(_) => Ok(actor_ref),
Err(_e) => {
error!(
"actor not started, actor_id={}, type={}",
&id,
A::type_name()
actor_id = id.as_ref(),
actor_type = A::type_name(),
"actor not started",
);
Err(ActorRefErr::ActorStartFailed)
}
Expand Down
28 changes: 21 additions & 7 deletions coerce/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,27 +385,41 @@ fn on_context_dropped(
ActorStatus::Started => {
if let Some(true) = system_terminated {
debug!(
"actor (id={}, type={}) has stopped due to system shutdown",
actor_id, actor_type
actor_id = actor_id.as_ref(),
actor_type = actor_type,
"actor stopped due to system shutdown",
);
} else {
debug!("actor (id={}) has stopped unexpectedly", actor.0.actor_id());
debug!(
actor_id = actor.0.actor_id().as_ref(),
actor_type = actor_type,
"actor stopped unexpectedly"
);
}
}

ActorStatus::Stopping => {
if let Some(true) = system_terminated {
trace!("actor (id={}) has stopped due to system shutdown", actor_id,);
trace!(
actor_id = actor_id.as_ref(),
actor_type = actor_type,
"actor stopped due to system shutdown"
);
} else {
debug!(
"actor (id={}) was stopping but did not complete the stop procedure",
actor_id,
actor_id = actor_id.as_ref(),
actor_type = actor_type,
"actor was stopping but did not complete the stop procedure",
);
}
}

ActorStatus::Stopped => {
debug!("actor (id={}) stopped, context dropped", actor_id);
debug!(
actor_id = actor_id.as_ref(),
actor_type = actor_type,
"actor stopped, context dropped"
);
}
}

Expand Down
20 changes: 14 additions & 6 deletions coerce/src/actor/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl ActorLoop {
.new_context(system.clone(), Starting, actor_ref.clone().into())
.with_parent(parent_ref);

trace!("[{}] starting", ctx.full_path());
trace!(actor = ctx.full_path().as_ref(), "actor starting");

actor.started(&mut ctx).await;
ActorMetrics::incr_actor_created(A::type_name());
Expand All @@ -72,7 +72,7 @@ impl ActorLoop {

ctx.set_status(Started);

trace!("[{}] ready", ctx.full_path());
trace!(actor = ctx.full_path().as_ref(), "actor ready");

if let Some(on_start) = on_start.take() {
let _ = on_start.send(());
Expand Down Expand Up @@ -102,7 +102,11 @@ impl ActorLoop {
message_type = msg.name(),
);

trace!("[{}] received {}", ctx.full_path(), msg.name(),);
trace!(
actor = ctx.full_path().as_ref(),
msg_type = msg.name(),
"actor message received"
);

let handle_fut = msg.handle(&mut actor, &mut ctx);

Expand All @@ -111,15 +115,19 @@ impl ActorLoop {

handle_fut.await;

trace!("[{}] processed {}", ctx.full_path(), msg.name());
trace!(
actor = ctx.full_path().as_ref(),
msg_type = msg.name(),
"actor message processed"
);
}

if ctx.get_status() == &Stopping {
break;
}
}

trace!("[{}] stopping", ctx.full_path());
trace!(actor = ctx.full_path().as_ref(), "actor stopping");

ctx.set_status(Stopping);

Expand All @@ -141,7 +149,7 @@ async fn actor_stopped<A: Actor>(
if actor_type.is_tracked() {
if let Some(system) = system.take() {
if !system.is_terminated() {
trace!("de-registering actor {}", &actor_id);
trace!(actor_id = actor_id.as_ref(), "de-registering actor");

system
.scheduler()
Expand Down
8 changes: 3 additions & 5 deletions coerce/src/remote/cluster/singleton/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use crate::actor::context::ActorContext;
use crate::actor::message::{
FromBytes, Handler, Message, MessageUnwrapErr, MessageWrapErr, ToBytes,
};
use crate::actor::{
Actor, ActorFactory, ActorId, ActorRef, ActorRefErr, IntoActor, LocalActorRef, ToActorId,
};
use crate::actor::{Actor, ActorFactory, ActorId, ActorRef, IntoActor, LocalActorRef, ToActorId};
use crate::remote::cluster::node::NodeSelector;
use crate::remote::cluster::singleton::factory::SingletonFactory;
use crate::remote::cluster::singleton::manager::lease::LeaseAck;
Expand All @@ -19,7 +17,7 @@ use crate::remote::stream::system::{ClusterEvent, ClusterMemberUp, SystemEvent,
use crate::remote::system::{NodeId, RemoteActorSystem};
use crate::remote::RemoteActorRef;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::mem;
use std::time::Duration;
Expand Down Expand Up @@ -316,7 +314,7 @@ impl<F: SingletonFactory> Handler<Receive<SystemTopic>> for Manager<F> {
},
ctx,
)
.await;
.await;
}
_ => {}
}
Expand Down
9 changes: 9 additions & 0 deletions coerce/src/remote/cluster/singleton/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,12 @@ pub fn singleton<F: SingletonFactory>(
actor_type
))
}

impl<A: Actor, F: SingletonFactory<Actor = A>> Clone for Singleton<A, F> {
fn clone(&self) -> Self {
Self {
manager: self.manager.clone(),
proxy: self.proxy.clone(),
}
}
}
22 changes: 14 additions & 8 deletions coerce/src/remote/cluster/singleton/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,25 @@ impl<A: Actor> Handler<SingletonStarted<A>> for Proxy<A> {

match &mut self.state {
ProxyState::Buffered { request_queue } => {
debug!(
buffered_msgs = request_queue.len(),
actor_ref = format!("{}", &actor_ref),
"emitting buffered messages",
);

while let Some(mut buffered) = request_queue.pop_front() {
buffered.send(actor_ref.clone());
if request_queue.len() > 0 {
debug!(
buffered_msgs = request_queue.len(),
actor_ref = format!("{}", &actor_ref),
"emitting buffered messages",
);

while let Some(mut buffered) = request_queue.pop_front() {
buffered.send(actor_ref.clone());
}
}
}
_ => {}
}

debug!(
singleton_actor = format!("{}", actor_ref),
"singleton proxy active - singleton started"
);
self.state = ProxyState::Active { actor_ref };
}
}
Expand Down
2 changes: 2 additions & 0 deletions coerce/src/remote/cluster/singleton/proxy/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ impl<M: Message> Deliver<M> {
let message = self.message.take().unwrap();
let result_channel = self.result_channel.take().unwrap();
tokio::spawn(async move {
debug!(msg_type = M::type_name(), "delivering message to singleton");

let res = actor.send(message).await;
result_channel.send(res)
});
Expand Down
Loading

0 comments on commit 89d58b4

Please sign in to comment.