Skip to content

Commit

Permalink
add initial node groups support: sub-clusters, each group can have it…
Browse files Browse the repository at this point in the history
…s own leader
  • Loading branch information
LeonHartley committed Mar 12, 2024
1 parent 9b0c04d commit 823b6ed
Show file tree
Hide file tree
Showing 11 changed files with 443 additions and 158 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion coerce/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ remote = [
"dep:tokio-stream",
"dep:parking_lot",
"dep:bytes",
"dep:byteorder"
"dep:byteorder",
"dep:itertools",
]

persistence = [
Expand Down Expand Up @@ -101,6 +102,7 @@ metrics-util = { version = "0.15.0", optional = true }
jwt = { version = "0.16.0", optional = true }
hmac = { version = "0.12.1", optional = true }
sha2 = { version = "0.10.6", optional = true }
itertools = { version = "0.12.1", optional = true }

# API dependencies
axum = { version = "0.6.18", features = ["query"], optional = true }
Expand Down
16 changes: 12 additions & 4 deletions coerce/src/actor/supervised.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,24 @@ impl Supervised {
self.children.remove(&actor_id);
}
Err(e) => match e {
ActorRefErr::InvalidRef => {},
ActorRefErr::InvalidRef => {}
e => {
warn!(actor_id = actor_id.as_ref(), error = format!("{}", e), "failed to stop child");
warn!(
actor_id = actor_id.as_ref(),
error = format!("{}", e),
"failed to stop child"
);
}
},
}
}

let n = self.children.len();
trace!(actor_id = self.actor_id.as_ref(), total_children = n, "all child actors stopped");
trace!(
actor_id = self.actor_id.as_ref(),
total_children = n,
"all child actors stopped"
);
}

pub async fn on_child_stopped(&mut self, id: &ActorId) {
Expand Down
257 changes: 257 additions & 0 deletions coerce/src/remote/cluster/group/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::system::ActorSystem;
use crate::actor::{
Actor, ActorId, ActorRef, ActorRefErr, IntoActor, IntoActorId, LocalActorRef, Receiver,
ToActorId,
};
use crate::remote::cluster::node::{NodeSelector, RemoteNodeRef};
use crate::remote::stream::pubsub::{PubSub, Receive, Subscription};
use crate::remote::stream::system::{ClusterEvent, ClusterMemberUp, SystemEvent, SystemTopic};
use crate::remote::system::{NodeId, RemoteActorSystem};
use crate::remote::RemoteActorRef;
use crate::singleton::factory::SingletonFactory;
use crate::singleton::manager::lease::{LeaseAck, RequestLease};
use crate::singleton::manager::{Manager, SingletonStarted, State};
use chrono::{DateTime, Utc};
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem;

pub enum NodeGroupEvent<A: Actor> {
MemberUp {
leader_id: NodeId,
nodes: Vec<Node<A>>,
},
NodeAdded(Node<A>),
NodeRemoved(NodeId),
LeaderChanged(NodeId),
}

impl<A: Actor> Message for NodeGroupEvent<A> {
type Result = ();
}

impl<A: Actor> Clone for NodeGroupEvent<A> {
fn clone(&self) -> Self {
match &self {
NodeGroupEvent::MemberUp { leader_id, nodes } => Self::MemberUp {
leader_id: *leader_id,
nodes: nodes.clone(),
},
NodeGroupEvent::NodeAdded(node) => Self::NodeAdded(node.clone()),
NodeGroupEvent::NodeRemoved(node_id) => Self::NodeRemoved(*node_id),
NodeGroupEvent::LeaderChanged(leader_id) => Self::LeaderChanged(*leader_id),
}
}
}

pub struct NodeGroup<A: Actor> {
node_id: NodeId,
group_name: String,
nodes: HashMap<NodeId, Node<A>>,
selector: NodeSelector,
subscription: Option<Subscription>,
receivers: Vec<Receiver<NodeGroupEvent<A>>>,
actor_id_provider: Box<dyn ActorIdProvider>,
current_group_leader: Option<NodeId>,
min_node_count: Option<usize>,
}

pub struct Node<A: Actor> {
pub node_id: NodeId,
pub actor: ActorRef<A>,
pub node: RemoteNodeRef,
}

impl<A: Actor> Clone for Node<A> {
fn clone(&self) -> Self {
Self {
node_id: self.node_id,
actor: self.actor.clone(),
node: self.node.clone(),
}
}
}

#[async_trait]
impl<A: Actor> Actor for NodeGroup<A> {
async fn started(&mut self, ctx: &mut ActorContext) {
self.subscription = Some(
PubSub::subscribe::<Self, _>(SystemTopic, &ctx)
.await
.expect("system subscription"),
);
}
}

impl<A: Actor> NodeGroup<A> {
pub async fn new(
group_name: impl ToString,
actor_id_provider: impl ActorIdProvider,
selector: NodeSelector,
receiver: Receiver<NodeGroupEvent<A>>,
system: &RemoteActorSystem,
) -> Result<LocalActorRef<Self>, ActorRefErr> {
let group_name = group_name.to_string();
Self {
group_name: group_name.clone(),
node_id: system.node_id(),
selector,
nodes: Default::default(),
subscription: None,
receivers: vec![receiver],
actor_id_provider: Box::new(actor_id_provider),
current_group_leader: None,
min_node_count: None,
}
.into_actor(Some(group_name.into_actor_id()), system.actor_system())
.await
}
}

impl<A: Actor> NodeGroup<A> {
/// Finds the oldest node in the group, if there are multiple nodes with the same start time,
/// the node with the lowest ID is returned.
pub fn leader_id(&self) -> Option<NodeId> {
let mut nodes: Vec<&Node<_>> = self.nodes.values().collect();
nodes.sort_by(|a, b| {
match Ord::cmp(
&a.node.node_started_at.unwrap_or(DateTime::<Utc>::MIN_UTC),
&b.node.node_started_at.unwrap_or(DateTime::<Utc>::MIN_UTC),
) {
Ordering::Equal => Ord::cmp(&a.node_id, &b.node_id),
ordering => ordering,
}
});

nodes.iter().map(|n| n.node_id).next()
}

pub fn add(&mut self, node: Node<A>) {
self.nodes.insert(node.node_id, node);
}

fn broadcast(&self, event: NodeGroupEvent<A>) {
let mut events = itertools::repeat_n(event, self.receivers.len());
for receiver in &self.receivers {
let _ = receiver.notify(events.next().unwrap());
}
}
}

#[async_trait]
impl<A: Actor> Handler<Receive<SystemTopic>> for NodeGroup<A> {
async fn handle(&mut self, message: Receive<SystemTopic>, ctx: &mut ActorContext) {
let sys = ctx.system().remote();
match message.0.as_ref() {
SystemEvent::Cluster(e) => match e {
ClusterEvent::MemberUp(ClusterMemberUp { leader_id, nodes }) => {
debug!(
cluster_leader = leader_id,
nodes_len = nodes.len(),
"nodegroup received `MemberUp`"
);

for node in nodes {
if !self.selector.includes(node.as_ref()) {
continue;
}

let actor_id = self
.actor_id_provider
.get_actor_id(self.group_name.as_ref(), node.id);

let actor_ref = RemoteActorRef::new(actor_id, node.id, sys.clone()).into();

let node = Node::new(node.id, actor_ref, node.clone());

self.nodes.insert(node.node_id, node);
}

let group_leader = self.leader_id();

if let Some(group_leader) = group_leader {
self.current_group_leader = Some(group_leader);

self.broadcast(NodeGroupEvent::MemberUp {
leader_id: group_leader,
nodes: self
.nodes
.values()
.filter(|n| n.node_id != self.node_id)
.cloned()
.collect(),
})
}
}

ClusterEvent::NodeAdded(node) => {
debug!(node_id = node.id, "node added");
let mut event = None;

if self.selector.includes(node.as_ref()) {
let mut entry = self.nodes.entry(node.id);
if let Entry::Vacant(mut entry) = entry {
let actor_id = self
.actor_id_provider
.get_actor_id(self.group_name.as_ref(), node.id);

let remote_ref: ActorRef<A> =
RemoteActorRef::new(actor_id, node.id, sys.clone()).into();

let node = Node::new(node.id, remote_ref, node.clone());

if node.node_id != self.node_id {
event = Some(NodeGroupEvent::NodeAdded(node.clone()));
}

entry.insert(node);
}
}

if let Some(event) = event {
self.broadcast(event);
}
}

ClusterEvent::NodeRemoved(node) => {
self.nodes.remove(&node.id);
debug!(node_id = node.id, "node removed");

self.broadcast(NodeGroupEvent::NodeRemoved(node.id));
}

_ => {}
},
};

if let Some(leader_id) = self.leader_id() {
if Some(leader_id) != self.current_group_leader {
self.broadcast(NodeGroupEvent::LeaderChanged(leader_id));
self.current_group_leader = Some(leader_id);
}
}
}
}

impl<A: Actor> Node<A> {
pub fn new(node_id: NodeId, actor: ActorRef<A>, node: RemoteNodeRef) -> Self {
Self {
node_id,
actor,
node,
}
}
}

pub trait ActorIdProvider: 'static + Sync + Send {
fn get_actor_id(&self, node_group: &str, node_id: NodeId) -> ActorId;
}

impl<F: Fn(&str, NodeId) -> String + 'static + Sync + Send> ActorIdProvider for F {
fn get_actor_id(&self, node_group: &str, node_id: NodeId) -> ActorId {
self(node_group, node_id).into_actor_id()
}
}
1 change: 1 addition & 0 deletions coerce/src/remote/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod builder;
pub mod client;
pub mod discovery;
pub mod group;
pub mod node;
3 changes: 2 additions & 1 deletion coerce/src/remote/cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ pub struct RemoteNode {
pub attributes: NodeAttributesRef,
}

#[derive(Clone)]
pub enum NodeSelector {
All,
Attribute(NodeAttribute),
Ids(HashSet<NodeId>)
Ids(HashSet<NodeId>),
}

impl NodeSelector {
Expand Down
1 change: 1 addition & 0 deletions coerce/src/remote/heartbeat/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod health;
mod topic;

use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
Expand Down
16 changes: 16 additions & 0 deletions coerce/src/remote/heartbeat/topic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// use crate::remote::cluster::node::RemoteNodeState;
// use crate::remote::stream::pubsub::Topic;
//
// pub struct HeartbeatTopic;
//
// impl Topic for HeartbeatTopic {
// type Message = HeartbeatState;
//
// fn topic_name() -> &'static str {
// "heartbeat"
// }
// }
//
// pub struct HeartbeatState {
// nodes: Vec<RemoteNodeState>,
// }
4 changes: 3 additions & 1 deletion coerce/src/remote/stream/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use crate::remote::net::proto::network::{
LeaderChangedEvent, MemberUpEvent, NewNodeEvent, NodeRemovedEvent, SystemEvent as SysEvent,
};
use crate::remote::net::StreamData;
use crate::remote::stream::pubsub::Topic;
use crate::remote::stream::pubsub::{PubSub, Subscription, Topic};
use std::sync::Arc;

use crate::actor::context::ActorContext;
use crate::remote::system::NodeId;
use protobuf::{Enum, Error, Message};

use crate::remote::cluster::node::RemoteNodeRef;
use crate::remote::stream::mediator::SubscribeErr;

pub struct SystemTopic;

Expand Down
Loading

0 comments on commit 823b6ed

Please sign in to comment.