From 89d58b4eb90ef17ec349dfe01508372cea4a85de Mon Sep 17 00:00:00 2001 From: Leon Hartley Date: Thu, 8 Feb 2024 20:59:49 +0000 Subject: [PATCH] move shard coordinator to use the new cluster singleton system & remove `CoordinatorSpawner` actor --- Cargo.lock | 48 ++---- coerce/Cargo.toml | 3 +- coerce/src/actor/blocking.rs | 6 +- coerce/src/actor/context.rs | 28 +++- coerce/src/actor/lifecycle.rs | 20 ++- .../remote/cluster/singleton/manager/mod.rs | 8 +- coerce/src/remote/cluster/singleton/mod.rs | 9 ++ .../src/remote/cluster/singleton/proxy/mod.rs | 22 ++- .../remote/cluster/singleton/proxy/send.rs | 2 + coerce/src/sharding/coordinator/discovery.rs | 47 +++--- coerce/src/sharding/coordinator/factory.rs | 26 ++++ coerce/src/sharding/coordinator/mod.rs | 10 +- coerce/src/sharding/coordinator/spawner.rs | 144 ------------------ coerce/src/sharding/host/mod.rs | 69 ++++----- coerce/src/sharding/host/request.rs | 11 +- coerce/src/sharding/host/stats.rs | 2 - coerce/src/sharding/mod.rs | 40 ++--- .../tests/test_remote_sharding_rebalancing.rs | 2 +- .../coerce-sharded-chat-example/Cargo.toml | 4 +- providers/persistence/coerce-redis/Cargo.toml | 2 +- 20 files changed, 182 insertions(+), 321 deletions(-) create mode 100644 coerce/src/sharding/coordinator/factory.rs delete mode 100644 coerce/src/sharding/coordinator/spawner.rs diff --git a/Cargo.lock b/Cargo.lock index 24a4b312..e39703f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -403,52 +403,41 @@ checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" [[package]] name = "coerce" version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3311405ed7a9b541faddd4e2de71cfa908e5558658d504f7954d2aee684b96a6" dependencies = [ - "anyhow", "async-trait", - "axum", - "bencher", "byteorder", "bytes", "chrono", - "coerce-macros 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures", "hashring", - "hmac", - "jwt", "lazy_static", - "metrics 0.21.1", - "metrics-exporter-prometheus 0.12.1", - "metrics-util 0.15.1", "parking_lot", "protobuf", "rand 0.8.5", "serde", "serde_json", - "sha2", "tokio", "tokio-stream", "tokio-util", "tracing", - "tracing-subscriber 0.3.17", - "utoipa", - "utoipa-swagger-ui", "uuid", "valuable", ] [[package]] name = "coerce" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3311405ed7a9b541faddd4e2de71cfa908e5558658d504f7954d2aee684b96a6" +version = "0.8.12" dependencies = [ "anyhow", "async-trait", "axum", + "bencher", "byteorder", "bytes", "chrono", + "coerce-macros 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures", "hashring", "hmac", @@ -467,6 +456,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", + "tracing-subscriber 0.3.17", "utoipa", "utoipa-swagger-ui", "uuid", @@ -478,7 +468,7 @@ name = "coerce-cluster-example" version = "0.1.0" dependencies = [ "async-trait", - "coerce 0.8.11", + "coerce 0.8.12", "coerce-macros 0.2.0", "opentelemetry", "opentelemetry-jaeger", @@ -496,7 +486,7 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "794b1185e927d99c1516ea3f5918a14e95e633c2c3659d426135c00c3c0e7c15" dependencies = [ - "coerce 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)", + "coerce 0.8.11", "k8s-openapi", "kube", "tracing", @@ -506,7 +496,7 @@ dependencies = [ name = "coerce-k8s" version = "0.1.8" dependencies = [ - "coerce 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)", + "coerce 0.8.11", 
"k8s-openapi", "kube", "tracing", @@ -540,20 +530,6 @@ dependencies = [ "protobuf-codegen", ] -[[package]] -name = "coerce-redis" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e9c6e01a101da79c184a316334cd15f6618520706325c70d5bd1553801a8c48" -dependencies = [ - "anyhow", - "async-trait", - "bytes", - "coerce 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)", - "redis", - "tokio", -] - [[package]] name = "coerce-redis" version = "0.4.4" @@ -561,7 +537,7 @@ dependencies = [ "anyhow", "async-trait", "bytes", - "coerce 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)", + "coerce 0.8.12", "redis", "tokio", ] @@ -573,10 +549,10 @@ dependencies = [ "async-trait", "chrono", "clap", - "coerce 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)", + "coerce 0.8.12", "coerce-k8s 0.1.7", "coerce-macros 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "coerce-redis 0.4.3", + "coerce-redis", "env_logger", "futures", "futures-io", diff --git a/coerce/Cargo.toml b/coerce/Cargo.toml index 6eea2cdd..ee57a598 100644 --- a/coerce/Cargo.toml +++ b/coerce/Cargo.toml @@ -2,7 +2,7 @@ name = "coerce" description = "Async actor runtime and distributed systems framework" license = "Apache-2.0" -version = "0.8.11" +version = "0.8.12" authors = ["Leon Hartley "] edition = "2021" readme = "README.md" @@ -20,6 +20,7 @@ full = [ "actor-tracing", "actor-tracing-info", "client-auth-jwt", + "singleton", ] remote = [ diff --git a/coerce/src/actor/blocking.rs b/coerce/src/actor/blocking.rs index a222023b..476a75c3 100644 --- a/coerce/src/actor/blocking.rs +++ b/coerce/src/actor/blocking.rs @@ -38,9 +38,9 @@ impl ActorSystem { Ok(_) => Ok(actor_ref), Err(_e) => { error!( - "actor not started, actor_id={}, type={}", - &id, - A::type_name() + actor_id = id.as_ref(), + actor_type = A::type_name(), + "actor not started", ); Err(ActorRefErr::ActorStartFailed) } diff --git a/coerce/src/actor/context.rs b/coerce/src/actor/context.rs index f8b4aade..cf2b3f55 100644 --- a/coerce/src/actor/context.rs +++ b/coerce/src/actor/context.rs @@ -385,27 +385,41 @@ fn on_context_dropped( ActorStatus::Started => { if let Some(true) = system_terminated { debug!( - "actor (id={}, type={}) has stopped due to system shutdown", - actor_id, actor_type + actor_id = actor_id.as_ref(), + actor_type = actor_type, + "actor stopped due to system shutdown", ); } else { - debug!("actor (id={}) has stopped unexpectedly", actor.0.actor_id()); + debug!( + actor_id = actor.0.actor_id().as_ref(), + actor_type = actor_type, + "actor stopped unexpectedly" + ); } } ActorStatus::Stopping => { if let Some(true) = system_terminated { - trace!("actor (id={}) has stopped due to system shutdown", actor_id,); + trace!( + actor_id = actor_id.as_ref(), + actor_type = actor_type, + "actor stopped due to system shutdown" + ); } else { debug!( - "actor (id={}) was stopping but did not complete the stop procedure", - actor_id, + actor_id = actor_id.as_ref(), + actor_type = actor_type, + "actor was stopping but did not complete the stop procedure", ); } } ActorStatus::Stopped => { - debug!("actor (id={}) stopped, context dropped", actor_id); + debug!( + actor_id = actor_id.as_ref(), + actor_type = actor_type, + "actor stopped, context dropped" + ); } } diff --git a/coerce/src/actor/lifecycle.rs b/coerce/src/actor/lifecycle.rs index e8acd132..6c1874cb 100644 --- a/coerce/src/actor/lifecycle.rs +++ b/coerce/src/actor/lifecycle.rs @@ -61,7 +61,7 @@ impl ActorLoop { 
            .new_context(system.clone(), Starting, actor_ref.clone().into())
            .with_parent(parent_ref);
 
-        trace!("[{}] starting", ctx.full_path());
+        trace!(actor = ctx.full_path().as_ref(), "actor starting");
 
         actor.started(&mut ctx).await;
         ActorMetrics::incr_actor_created(A::type_name());
@@ -72,7 +72,7 @@ impl ActorLoop {
 
         ctx.set_status(Started);
 
-        trace!("[{}] ready", ctx.full_path());
+        trace!(actor = ctx.full_path().as_ref(), "actor ready");
 
         if let Some(on_start) = on_start.take() {
             let _ = on_start.send(());
@@ -102,7 +102,11 @@ impl ActorLoop {
                 message_type = msg.name(),
             );
 
-            trace!("[{}] received {}", ctx.full_path(), msg.name(),);
+            trace!(
+                actor = ctx.full_path().as_ref(),
+                msg_type = msg.name(),
+                "actor message received"
+            );
 
             let handle_fut = msg.handle(&mut actor, &mut ctx);
 
@@ -111,7 +115,11 @@ impl ActorLoop {
 
             handle_fut.await;
 
-            trace!("[{}] processed {}", ctx.full_path(), msg.name());
+            trace!(
+                actor = ctx.full_path().as_ref(),
+                msg_type = msg.name(),
+                "actor message processed"
+            );
         }
 
         if ctx.get_status() == &Stopping {
@@ -119,7 +127,7 @@ impl ActorLoop {
             }
         }
 
-        trace!("[{}] stopping", ctx.full_path());
+        trace!(actor = ctx.full_path().as_ref(), "actor stopping");
 
         ctx.set_status(Stopping);
 
@@ -141,7 +149,7 @@ async fn actor_stopped(
     if actor_type.is_tracked() {
         if let Some(system) = system.take() {
             if !system.is_terminated() {
-                trace!("de-registering actor {}", &actor_id);
+                trace!(actor_id = actor_id.as_ref(), "de-registering actor");
 
                 system
                     .scheduler()
diff --git a/coerce/src/remote/cluster/singleton/manager/mod.rs b/coerce/src/remote/cluster/singleton/manager/mod.rs
index 6aecf35e..d18814a9 100644
--- a/coerce/src/remote/cluster/singleton/manager/mod.rs
+++ b/coerce/src/remote/cluster/singleton/manager/mod.rs
@@ -6,9 +6,7 @@ use crate::actor::context::ActorContext;
 use crate::actor::message::{
     FromBytes, Handler, Message, MessageUnwrapErr, MessageWrapErr, ToBytes,
 };
-use crate::actor::{
-    Actor, ActorFactory, ActorId, ActorRef, ActorRefErr, IntoActor, LocalActorRef, ToActorId,
-};
+use crate::actor::{Actor, ActorFactory, ActorId, ActorRef, IntoActor, LocalActorRef, ToActorId};
 use crate::remote::cluster::node::NodeSelector;
 use crate::remote::cluster::singleton::factory::SingletonFactory;
 use crate::remote::cluster::singleton::manager::lease::LeaseAck;
@@ -19,7 +17,7 @@ use crate::remote::stream::system::{ClusterEvent, ClusterMemberUp, SystemEvent,
 use crate::remote::system::{NodeId, RemoteActorSystem};
 use crate::remote::RemoteActorRef;
 use std::collections::hash_map::Entry;
-use std::collections::{HashMap, HashSet, VecDeque};
+use std::collections::{HashMap, HashSet};
 use std::fmt::{Debug, Formatter};
 use std::mem;
 use std::time::Duration;
@@ -316,7 +314,7 @@ impl<F: SingletonFactory> Handler<Receive<SystemTopic>> for Manager<F> {
                 },
                 ctx,
             )
-                .await;
+            .await;
         }
         _ => {}
     }
diff --git a/coerce/src/remote/cluster/singleton/mod.rs b/coerce/src/remote/cluster/singleton/mod.rs
index 4562f250..de0a7988 100644
--- a/coerce/src/remote/cluster/singleton/mod.rs
+++ b/coerce/src/remote/cluster/singleton/mod.rs
@@ -124,3 +124,12 @@ pub fn singleton<F: SingletonFactory>(
         actor_type
     ))
 }
+
+impl<A: Actor, F: SingletonFactory<Actor = A>> Clone for Singleton<A, F> {
+    fn clone(&self) -> Self {
+        Self {
+            manager: self.manager.clone(),
+            proxy: self.proxy.clone(),
+        }
+    }
+}
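Note: the `Clone` impl above is written by hand rather than derived, so cloning a `Singleton<A, F>` never requires `A: Clone` or `F: Clone`; only the two underlying actor refs are cloned. A minimal standalone sketch of that pattern (illustrative types only, not Coerce's):

    use std::sync::Arc;

    struct Handle<A> {
        inner: Arc<A>, // stand-in for the manager/proxy actor refs
    }

    // A derived Clone would demand `A: Clone`; the manual impl only
    // clones the shared handle, mirroring `Singleton<A, F>` above.
    impl<A> Clone for Handle<A> {
        fn clone(&self) -> Self {
            Handle {
                inner: self.inner.clone(),
            }
        }
    }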
diff --git a/coerce/src/remote/cluster/singleton/proxy/mod.rs b/coerce/src/remote/cluster/singleton/proxy/mod.rs
index e822fe30..dc3e312c 100644
--- a/coerce/src/remote/cluster/singleton/proxy/mod.rs
+++ b/coerce/src/remote/cluster/singleton/proxy/mod.rs
@@ -66,19 +66,25 @@ impl<A: Actor> Handler<SingletonStarted<A>> for Proxy<A> {
         match &mut self.state {
             ProxyState::Buffered { request_queue } => {
-                debug!(
-                    buffered_msgs = request_queue.len(),
-                    actor_ref = format!("{}", &actor_ref),
-                    "emitting buffered messages",
-                );
-
-                while let Some(mut buffered) = request_queue.pop_front() {
-                    buffered.send(actor_ref.clone());
+                if request_queue.len() > 0 {
+                    debug!(
+                        buffered_msgs = request_queue.len(),
+                        actor_ref = format!("{}", &actor_ref),
+                        "emitting buffered messages",
+                    );
+
+                    while let Some(mut buffered) = request_queue.pop_front() {
+                        buffered.send(actor_ref.clone());
+                    }
                 }
             }
             _ => {}
         }
 
+        debug!(
+            singleton_actor = format!("{}", actor_ref),
+            "singleton proxy active - singleton started"
+        );
         self.state = ProxyState::Active { actor_ref };
     }
 }
diff --git a/coerce/src/remote/cluster/singleton/proxy/send.rs b/coerce/src/remote/cluster/singleton/proxy/send.rs
index b25fa312..a08a0570 100644
--- a/coerce/src/remote/cluster/singleton/proxy/send.rs
+++ b/coerce/src/remote/cluster/singleton/proxy/send.rs
@@ -27,6 +27,8 @@ impl<M: Message> Deliver<M> {
         let message = self.message.take().unwrap();
         let result_channel = self.result_channel.take().unwrap();
         tokio::spawn(async move {
+            debug!(msg_type = M::type_name(), "delivering message to singleton");
+
             let res = actor.send(message).await;
             result_channel.send(res)
         });
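The proxy is a small state machine: while `Buffered`, incoming `Deliver` messages queue up; once the singleton's actor ref is known, the queue is drained in order and the proxy flips to `Active`. A standalone sketch of that pattern (illustrative types, not Coerce's actual proxy):

    use std::collections::VecDeque;

    enum ProxyState<R> {
        Buffered { request_queue: VecDeque<String> },
        Active { actor_ref: R },
    }

    impl<R> ProxyState<R> {
        // Mirrors the handler above: drain anything buffered, then go Active.
        fn on_singleton_started(&mut self, actor_ref: R, send: impl Fn(&R, String)) {
            if let ProxyState::Buffered { request_queue } = self {
                while let Some(msg) = request_queue.pop_front() {
                    send(&actor_ref, msg);
                }
            }
            *self = ProxyState::Active { actor_ref };
        }
    }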
diff --git a/coerce/src/sharding/coordinator/discovery.rs b/coerce/src/sharding/coordinator/discovery.rs
index 04dfc8b2..f83bd56e 100644
--- a/coerce/src/sharding/coordinator/discovery.rs
+++ b/coerce/src/sharding/coordinator/discovery.rs
@@ -5,25 +5,33 @@ use crate::sharding::coordinator::balancing::Rebalance;
 use crate::sharding::coordinator::{ShardCoordinator, ShardHostState, ShardHostStatus};
 use crate::sharding::host::ShardHost;
 
+use crate::remote::stream::pubsub::Receive;
+use crate::remote::stream::system::{ClusterEvent, SystemEvent, SystemTopic};
+use crate::remote::system::NodeId;
 use std::collections::hash_map::Entry;
 use std::sync::Arc;
 
-pub struct NodeDiscovered(pub Arc<RemoteNode>);
-
-pub struct NodeForgotten(pub Arc<RemoteNode>);
-
-impl Message for NodeDiscovered {
-    type Result = ();
-}
+#[async_trait]
+impl Handler<Receive<SystemTopic>> for ShardCoordinator {
+    async fn handle(&mut self, message: Receive<SystemTopic>, ctx: &mut ActorContext) {
+        match message.0.as_ref() {
+            SystemEvent::Cluster(event) => match event {
+                ClusterEvent::NodeAdded(node) => {
+                    self.on_node_discovered(node.as_ref(), ctx);
+                }
 
-impl Message for NodeForgotten {
-    type Result = ();
+                ClusterEvent::NodeRemoved(node) => {
+                    self.on_node_removed(node.id, ctx).await;
+                }
+                _ => {}
+            },
+            _ => {}
+        }
+    }
 }
 
-#[async_trait]
-impl Handler<NodeDiscovered> for ShardCoordinator {
-    async fn handle(&mut self, message: NodeDiscovered, ctx: &mut ActorContext) {
-        let new_node = message.0;
+impl ShardCoordinator {
+    pub fn on_node_discovered(&mut self, new_node: &RemoteNode, ctx: &ActorContext) {
         match self.hosts.entry(new_node.id) {
             Entry::Occupied(mut node) => {
                 let node = node.get_mut();
@@ -40,6 +48,7 @@ impl Handler<NodeDiscovered> for ShardCoordinator {
                     "new shard host (node_id={}, tag={}, addr={})",
                     new_node.id, &new_node.tag, &new_node.addr
                 );
+
                 vacant_entry.insert(ShardHostState {
                     node_id: new_node.id,
                     node_tag: new_node.tag.clone(),
@@ -52,16 +61,10 @@ impl Handler<NodeDiscovered> for ShardCoordinator {
             }
         }
     }
-}
 
-#[async_trait]
-impl Handler<NodeForgotten> for ShardCoordinator {
-    async fn handle(&mut self, message: NodeForgotten, ctx: &mut ActorContext) {
-        match self.hosts.entry(message.0.id) {
-            Entry::Occupied(_) => {
-                self.handle(Rebalance::NodeUnavailable(message.0.id), ctx)
-                    .await
-            }
+    pub async fn on_node_removed(&mut self, node_id: NodeId, ctx: &mut ActorContext) {
+        match self.hosts.entry(node_id) {
+            Entry::Occupied(_) => self.handle(Rebalance::NodeUnavailable(node_id), ctx).await,
             Entry::Vacant(_) => {}
         }
     }
diff --git a/coerce/src/sharding/coordinator/factory.rs b/coerce/src/sharding/coordinator/factory.rs
new file mode 100644
index 00000000..1612bfde
--- /dev/null
+++ b/coerce/src/sharding/coordinator/factory.rs
@@ -0,0 +1,26 @@
+use crate::actor::LocalActorRef;
+use crate::remote::cluster::singleton::factory::SingletonFactory;
+use crate::sharding::coordinator::ShardCoordinator;
+use crate::sharding::host::ShardHost;
+
+pub struct CoordinatorFactory {
+    shard_entity: String,
+    local_shard_host: LocalActorRef<ShardHost>,
+}
+
+impl CoordinatorFactory {
+    pub fn new(shard_entity: String, local_shard_host: LocalActorRef<ShardHost>) -> Self {
+        CoordinatorFactory {
+            shard_entity,
+            local_shard_host,
+        }
+    }
+}
+
+impl SingletonFactory for CoordinatorFactory {
+    type Actor = ShardCoordinator;
+
+    fn create(&self) -> Self::Actor {
+        ShardCoordinator::new(self.shard_entity.clone(), self.local_shard_host.clone())
+    }
+}
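The factory is the only sharding-specific piece the singleton system needs: whichever node wins the lease calls `create()` there to spawn the coordinator locally, and the closed-over `LocalActorRef<ShardHost>` ties that coordinator back to its node's shard host. For comparison, a minimal factory for some other actor follows the same shape (illustrative `Counter` type, assuming `Actor`'s methods are all defaulted; not part of this patch):

    // Illustrative only -- mirrors the shape of CoordinatorFactory above.
    #[derive(Default)]
    pub struct Counter;

    #[async_trait]
    impl Actor for Counter {}

    pub struct CounterFactory;

    impl SingletonFactory for CounterFactory {
        type Actor = Counter;

        fn create(&self) -> Self::Actor {
            Counter::default()
        }
    }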
diff --git a/coerce/src/sharding/coordinator/mod.rs b/coerce/src/sharding/coordinator/mod.rs
index 39ce8fdf..54bfff05 100644
--- a/coerce/src/sharding/coordinator/mod.rs
+++ b/coerce/src/sharding/coordinator/mod.rs
@@ -11,9 +11,8 @@ use crate::remote::system::NodeId;
 use crate::actor::message::Handler;
 use crate::remote::cluster::node::NodeStatus::{Healthy, Joining};
 use crate::remote::heartbeat::Heartbeat;
-use crate::remote::stream::pubsub::{PubSub, Receive, Subscription};
+use crate::remote::stream::pubsub::{PubSub, Subscription};
 use crate::remote::stream::system::SystemTopic;
-
 use crate::sharding::coordinator::balancing::Rebalance;
 use std::collections::{HashMap, HashSet};
 use std::time::Duration;
@@ -21,7 +20,7 @@ use std::time::Duration;
 
 pub mod allocation;
 pub mod balancing;
 pub mod discovery;
-pub mod spawner;
+pub mod factory;
 pub mod stats;
 pub mod stream;
@@ -63,11 +62,6 @@ pub struct ShardCoordinator {
 
 type ScheduledRebalance = ScheduledNotify<ShardCoordinator, Rebalance>;
 
-#[async_trait]
-impl Handler<Receive<SystemTopic>> for ShardCoordinator {
-    async fn handle(&mut self, _message: Receive<SystemTopic>, _ctx: &mut ActorContext) {}
-}
-
 #[async_trait]
 impl PersistentActor for ShardCoordinator {
     fn persistence_key(&self, _ctx: &ActorContext) -> String {
diff --git a/coerce/src/sharding/coordinator/spawner.rs b/coerce/src/sharding/coordinator/spawner.rs
deleted file mode 100644
index ba0c99f8..00000000
--- a/coerce/src/sharding/coordinator/spawner.rs
+++ /dev/null
@@ -1,144 +0,0 @@
-use crate::actor::context::ActorContext;
-use crate::actor::message::Handler;
-use crate::actor::{Actor, IntoActor, LocalActorRef};
-use crate::remote::stream::pubsub::{PubSub, Receive, Subscription};
-use crate::remote::stream::system::{ClusterEvent, SystemEvent, SystemTopic};
-use crate::remote::system::NodeId;
-use crate::sharding::coordinator::discovery::{NodeDiscovered, NodeForgotten};
-use crate::sharding::coordinator::ShardCoordinator;
-use crate::sharding::host::{LeaderAllocated, ShardHost};
-
-pub struct CoordinatorSpawner {
-    node_id: NodeId,
-    shard_entity: String,
-    current_leader: Option<NodeId>,
-    local_shard_host: LocalActorRef<ShardHost>,
-    system_event_subscription: Option<Subscription>,
-    coordinator: Option<LocalActorRef<ShardCoordinator>>,
-}
-
-impl CoordinatorSpawner {
-    pub fn new(
-        node_id: NodeId,
-        shard_entity: String,
-        local_shard_host: LocalActorRef<ShardHost>,
-    ) -> CoordinatorSpawner {
-        Self {
-            node_id,
-            shard_entity,
-            local_shard_host,
-            current_leader: None,
-            system_event_subscription: None,
-            coordinator: None,
-        }
-    }
-
-    pub async fn start_coordinator(&mut self, ctx: &mut ActorContext) {
-        let coordinator =
-            ShardCoordinator::new(self.shard_entity.clone(), self.local_shard_host.clone())
-                .into_actor(
-                    Some(format!("shard-coordinator-{}", &self.shard_entity)),
-                    &ctx.system(),
-                )
-                .await;
-        match coordinator {
-            Ok(coordinator) => {
-                self.coordinator = Some(coordinator);
-
-                debug!(
-                    node_id = self.node_id,
-                    "started shard coordinator for entity type={}", &self.shard_entity
-                );
-            }
-            Err(e) => {
-                error!(
-                    "[node={}] failed to spawn shard coordinator, e={}",
-                    self.node_id, e
-                );
-            }
-        }
-    }
-
-    pub async fn stop_coordinator(&mut self) -> bool {
-        if let Some(coordinator) = self.coordinator.take() {
-            let result = coordinator.stop().await;
-            match result {
-                Ok(_) => true,
-                Err(_) => true,
-            }
-        } else {
-            false
-        }
-    }
-}
-
-#[async_trait]
-impl Actor for CoordinatorSpawner {
-    async fn started(&mut self, ctx: &mut ActorContext) {
-        let remote = ctx.system().remote();
-        self.current_leader = remote.current_leader();
-        if let Some(current_leader) = self.current_leader {
-            if current_leader == remote.node_id() {
-                self.start_coordinator(ctx).await;
-            }
-        } else {
-            debug!("[node={}] no leader allocated", self.node_id);
-        }
-
-        self.system_event_subscription = Some(
-            PubSub::subscribe::<SystemTopic>(SystemTopic, ctx)
-                .await
-                .unwrap(),
-        );
-    }
-}
-
-#[async_trait]
-impl Handler<Receive<SystemTopic>> for CoordinatorSpawner {
-    async fn handle(&mut self, message: Receive<SystemTopic>, ctx: &mut ActorContext) {
-        debug!(
-            "[node={}] received system event - {:?}",
-            self.node_id, &message.0
-        );
-
-        match message.0.as_ref() {
-            SystemEvent::Cluster(event) => match event {
-                ClusterEvent::NodeAdded(node) => {
-                    if let Some(coordinator) = &self.coordinator {
-                        let _ = coordinator.notify(NodeDiscovered(node.clone()));
-                    }
-                }
-
-                ClusterEvent::NodeRemoved(node) => {
-                    if let Some(coordinator) = &self.coordinator {
-                        let _ = coordinator.notify(NodeForgotten(node.clone()));
-                    }
-                }
-
-                ClusterEvent::LeaderChanged(leader_node_id) => {
-                    let leader_node_id = *leader_node_id;
-                    debug!(
-                        "[node={}] Leader changed, leader_node_id={}",
-                        self.node_id, leader_node_id,
-                    );
-
-                    if leader_node_id == self.node_id && self.coordinator.is_none() {
-                        self.start_coordinator(ctx).await;
-                    } else if self.stop_coordinator().await {
-                        trace!("[node={}] stopped coordinator", self.node_id);
-                    }
-
-                    if let Err(e) = self.local_shard_host.notify(LeaderAllocated) {
-                        error!(
-                            "[node={}] failed to notify `LeaderAllocated` to local shard host (entity={}, err={})",
-                            self.node_id, &self.shard_entity, e
-                        );
-                    }
-                }
-                ClusterEvent::MemberUp(member_up) => {
-                    // TODO: coordinator should not start until memberup has been signalled.
-                }
-            },
-        }
-    }
-}
diff --git a/coerce/src/sharding/host/mod.rs b/coerce/src/sharding/host/mod.rs
index d5ee398f..e107af03 100644
--- a/coerce/src/sharding/host/mod.rs
+++ b/coerce/src/sharding/host/mod.rs
@@ -16,6 +16,9 @@ use std::collections::hash_map::Entry;
 use std::collections::{HashMap, VecDeque};
 
 use crate::actor::scheduler::ActorType;
+use crate::remote::cluster::singleton::factory::SingletonFactory;
+use crate::remote::cluster::singleton::Singleton;
+use crate::sharding::coordinator::factory::CoordinatorFactory;
 use uuid::Uuid;
 
 pub mod request;
@@ -41,9 +44,9 @@ pub struct ShardHost {
     actor_handler: BoxedActorHandler,
     hosted_shards: HashMap<ShardId, ShardState>,
     remote_shards: HashMap<ShardId, ActorRef<Shard>>,
-    requests_pending_leader_allocation: VecDeque<EntityRequest>,
     requests_pending_shard_allocation: HashMap<ShardId, Vec<EntityRequest>>,
     allocator: Box<dyn ShardAllocator>,
+    coordinator: Option<Singleton<ShardCoordinator, CoordinatorFactory>>,
 }
 
 impl ShardHost {
@@ -65,16 +68,16 @@ impl ShardHost {
     }
 }
 
-pub trait ShardAllocator: 'static + Send + Sync {
-    fn allocate(&mut self, actor_id: &ActorId) -> ShardId;
-}
-
-pub struct LeaderAllocated;
+pub struct Init(pub Singleton<ShardCoordinator, CoordinatorFactory>);
 
-impl Message for LeaderAllocated {
+impl Message for Init {
     type Result = ();
 }
 
+pub trait ShardAllocator: 'static + Send + Sync {
+    fn allocate(&mut self, actor_id: &ActorId) -> ShardId;
+}
+
 #[async_trait]
 impl Actor for ShardHost {
     async fn started(&mut self, ctx: &mut ActorContext) {
@@ -94,35 +97,33 @@ impl ShardHost {
             hosted_shards: Default::default(),
             remote_shards: Default::default(),
             requests_pending_shard_allocation: Default::default(),
-            requests_pending_leader_allocation: Default::default(),
             allocator: allocator.map_or_else(
                 || Box::new(DefaultAllocator::default()) as Box<dyn ShardAllocator>,
                 |s| s,
             ),
+            coordinator: None,
         }
     }
 
-    pub async fn get_coordinator(&self, ctx: &ActorContext) -> ActorRef<ShardCoordinator> {
-        let actor_id = format!("shard-coordinator-{}", &self.shard_entity).into_actor_id();
-        let remote = ctx.system().remote();
-        let leader = remote.current_leader();
-        if leader == Some(remote.node_id()) {
-            ctx.system()
-                .get_tracked_actor::<ShardCoordinator>(actor_id)
-                .await
-                .expect("get local coordinator")
-                .into()
-        } else {
-            RemoteActorRef::<ShardCoordinator>::new(actor_id, leader.unwrap(), remote.clone())
-                .into()
-        }
+    pub fn get_coordinator(&self) -> Singleton<ShardCoordinator, CoordinatorFactory> {
+        self.coordinator
+            .as_ref()
+            .expect("coordinator singleton reference")
+            .clone()
+    }
+}
+
+#[async_trait]
+impl Handler<Init> for ShardHost {
+    async fn handle(&mut self, message: Init, _ctx: &mut ActorContext) {
+        self.coordinator = Some(message.0);
+    }
 }
 
 pub struct GetCoordinator;
 
 impl Message for GetCoordinator {
-    type Result = ActorRef<ShardCoordinator>;
+    type Result = Singleton<ShardCoordinator, CoordinatorFactory>;
 }
 
 pub struct ShardAllocated(pub ShardId, pub NodeId);
@@ -167,25 +168,9 @@ impl Handler<GetCoordinator> for ShardHost {
     async fn handle(
         &mut self,
         _message: GetCoordinator,
-        ctx: &mut ActorContext,
-    ) -> ActorRef<ShardCoordinator> {
-        self.get_coordinator(&ctx).await
-    }
-}
-
-#[async_trait]
-impl Handler<LeaderAllocated> for ShardHost {
-    async fn handle(&mut self, _message: LeaderAllocated, ctx: &mut ActorContext) {
-        if self.requests_pending_leader_allocation.len() > 0 {
-            debug!(
-                "processing {} buffered requests_pending_leader_allocation",
-                self.requests_pending_leader_allocation.len(),
-            );
-
-            while let Some(request) = self.requests_pending_leader_allocation.pop_front() {
-                self.handle(request, ctx).await;
-            }
-        }
+        _: &mut ActorContext,
+    ) -> Singleton<ShardCoordinator, CoordinatorFactory> {
+        self.coordinator.as_ref().unwrap().clone()
     }
 }
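With `LeaderAllocated` (and the leader-allocation queue) gone, the host no longer tracks leader elections at all; it only needs to be handed the coordinator singleton once via `Init`. The wiring, as done in `sharding/mod.rs` further down this patch, reduces to:

    // From Sharding::builder (sharding/mod.rs below): build the coordinator
    // singleton, then hand the shard host its handle via Init.
    let coordinator = SingletonBuilder::new(system.clone())
        .factory(CoordinatorFactory::new(shard_entity.clone(), host.clone()))
        .build()
        .await;

    let _ = host.send(Init(coordinator.clone())).await;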
diff --git a/coerce/src/sharding/host/request.rs b/coerce/src/sharding/host/request.rs
index 4c8ca333..4466d32d 100644
--- a/coerce/src/sharding/host/request.rs
+++ b/coerce/src/sharding/host/request.rs
@@ -80,8 +80,8 @@ impl Handler<EntityRequest> for ShardHost {
                 message,
                 ctx.system().remote_owned(),
             ));
-        } else if ctx.system().remote().current_leader().is_some() {
-            let leader = self.get_coordinator(&ctx).await;
+        } else {
+            let leader = self.get_coordinator();
 
             let buffered_requests = self.requests_pending_shard_allocation.entry(shard_id);
             let buffered_requests = buffered_requests.or_insert_with(|| vec![]);
@@ -116,13 +116,6 @@ impl Handler<EntityRequest> for ShardHost {
                 Err(_e) => {}
             }
         });
-        } else {
-            self.requests_pending_leader_allocation.push_back(message);
-
-            debug!(
-                "no leader allocated, buffering message (requests_pending_leader_allocation={})",
-                self.requests_pending_leader_allocation.len()
-            );
-        }
     }
 }
diff --git a/coerce/src/sharding/host/stats.rs b/coerce/src/sharding/host/stats.rs
index 2ec01313..56d7ab6b 100644
--- a/coerce/src/sharding/host/stats.rs
+++ b/coerce/src/sharding/host/stats.rs
@@ -20,7 +20,6 @@ pub struct RemoteShard {
 
 #[derive(Serialize, Deserialize)]
 pub struct HostStats {
-    pub requests_pending_leader_allocation_count: usize,
     pub requests_pending_shard_allocation_count: usize,
     pub hosted_shard_count: usize,
     pub remote_shard_count: usize,
@@ -47,7 +46,6 @@ impl Handler<GetStats> for ShardHost {
             .collect();
 
         let stats = HostStats {
-            requests_pending_leader_allocation_count: self.requests_pending_leader_allocation.len(),
             requests_pending_shard_allocation_count: self.requests_pending_shard_allocation.len(),
             hosted_shard_count: self.hosted_shards.len(),
             remote_shard_count: self.remote_shards.len(),
diff --git a/coerce/src/sharding/mod.rs b/coerce/src/sharding/mod.rs
index 9012e5bd..f9f63553 100644
--- a/coerce/src/sharding/mod.rs
+++ b/coerce/src/sharding/mod.rs
@@ -2,21 +2,21 @@
 use crate::actor::message::{Handler, Message};
 use crate::actor::{
-    Actor, ActorFactory, ActorId, ActorRecipe, ActorRef, ActorRefErr, IntoActor, IntoActorId,
-    LocalActorRef,
+    Actor, ActorFactory, ActorId, ActorRecipe, ActorRefErr, IntoActor, IntoActorId, LocalActorRef,
 };
 use std::error::Error;
 use std::fmt::{Display, Formatter};
 
-use crate::remote::system::builder::{RemoteActorSystemBuilder, RemoteSystemConfigBuilder};
+use crate::remote::cluster::singleton::{singleton, Singleton, SingletonBuilder};
+use crate::remote::system::builder::RemoteSystemConfigBuilder;
 use crate::remote::system::RemoteActorSystem;
 use crate::sharding::coordinator::allocation::AllocateShard;
-use crate::sharding::coordinator::spawner::CoordinatorSpawner;
+use crate::sharding::coordinator::factory::CoordinatorFactory;
 use crate::sharding::coordinator::stats::GetShardingStats;
 use crate::sharding::coordinator::ShardCoordinator;
 use crate::sharding::host::request::{EntityRequest, RemoteEntityRequest};
 use crate::sharding::host::{
-    ShardAllocated, ShardAllocator, ShardHost, ShardReallocating, StopShard,
+    Init, ShardAllocated, ShardAllocator, ShardHost, ShardReallocating, StopShard,
 };
 use crate::sharding::shard::stats::GetShardStats;
 use crate::sharding::shard::Shard;
@@ -38,7 +38,7 @@ pub struct Sharding<A: ActorFactory> {
 
 struct ShardingCore {
     host: LocalActorRef<ShardHost>,
-    coordinator_spawner: LocalActorRef<CoordinatorSpawner>,
+    coordinator: Singleton<ShardCoordinator, CoordinatorFactory>,
     shard_entity: String,
     system: RemoteActorSystem,
 }
@@ -88,20 +88,18 @@ impl<A: ActorFactory> Sharding<A> {
             .await
             .map_err(|e| e.into_host_err(&shard_entity))?;
 
-        let coordinator_spawner =
-            CoordinatorSpawner::new(system.node_id(), shard_entity.clone(), host.clone())
-                .into_actor(
-                    Some(format!("shard-coordinator-spawner-{}", &shard_entity).into_actor_id()),
-                    system.actor_system(),
-                )
-                .await
-                .map_err(|e| e.into_coordinator_err(&shard_entity))?;
+        let coordinator = SingletonBuilder::new(system.clone())
+            .factory(CoordinatorFactory::new(shard_entity.clone(), host.clone()))
+            .build()
+            .await;
+
+        let _ = host.send(Init(coordinator.clone())).await;
 
         Ok(Self {
             core: Arc::new(ShardingCore {
                 host,
                 system,
-                coordinator_spawner,
+                coordinator,
                 shard_entity,
             }),
             _a: PhantomData,
@@ -152,10 +150,6 @@ impl<A: ActorFactory> Sharding<A> {
     pub fn shard_entity(&self) -> &String {
         &self.core.shard_entity
     }
-
-    pub fn coordinator_spawner(&self) -> &LocalActorRef<CoordinatorSpawner> {
-        &self.core.coordinator_spawner
-    }
 }
 
 impl ShardingCore {
@@ -200,10 +194,8 @@ impl<A: Actor> Sharded<A> {
         if let Ok(result) = result {
             let result = result
                 .map(|res| M::read_remote_result(res).map_err(ActorRefErr::Deserialisation));
-            match result {
-                Ok(res) => res,
-                Err(e) => Err(e),
-            }
+
+            result.unwrap_or_else(|e| Err(e))
         } else {
             Err(ActorRefErr::ResultChannelClosed)
         }
@@ -211,7 +203,7 @@ impl<A: Actor> Sharded<A> {
 }
 
 pub fn sharding(builder: &mut RemoteSystemConfigBuilder) -> &mut RemoteSystemConfigBuilder {
-    builder
+    singleton::<CoordinatorFactory>(builder)
         .with_handler::<ShardCoordinator, AllocateShard>("ShardCoordinator.AllocateShard")
         .with_handler::<ShardCoordinator, GetShardingStats>("ShardCoordinator.GetShardingStats")
        .with_handler::<ShardHost, ShardAllocated>("ShardHost.ShardAllocated")
diff --git a/coerce/tests/test_remote_sharding_rebalancing.rs b/coerce/tests/test_remote_sharding_rebalancing.rs
index e574dabd..fb6160d9 100644
--- a/coerce/tests/test_remote_sharding_rebalancing.rs
+++ b/coerce/tests/test_remote_sharding_rebalancing.rs
@@ -136,7 +136,7 @@ async fn create_system(
 
 #[tokio::test]
 pub async fn test_shard_rebalancing_upon_node_termination() {
-    util::create_logger(Some(Level::WARN));
+    util::create_logger(Some(Level::DEBUG));
 
     let persistence = Persistence::from(InMemoryStorageProvider::new());
     let (remote_a, server_a) = create_system(persistence.clone(), "127.0.0.1:31101", 1, None).await;
diff --git a/examples/coerce-sharded-chat-example/Cargo.toml b/examples/coerce-sharded-chat-example/Cargo.toml
index 745b79e6..02eb00f6 100644
--- a/examples/coerce-sharded-chat-example/Cargo.toml
+++ b/examples/coerce-sharded-chat-example/Cargo.toml
@@ -6,9 +6,9 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-coerce = { version = "0.8.11", features = ["full"] }
+coerce = { path = "../../coerce", features = ["full"] }
 coerce-macros = { version = "0.2.0" }
-coerce-redis = { version = "0.4.3" }
+coerce-redis = { path = "../../providers/persistence/coerce-redis" }
 coerce-k8s = { version = "0.1.7" }
 
 tokio = { version = "1.28.1", features = ["full"] }
diff --git a/providers/persistence/coerce-redis/Cargo.toml b/providers/persistence/coerce-redis/Cargo.toml
index 222f854c..49d17feb 100644
--- a/providers/persistence/coerce-redis/Cargo.toml
+++ b/providers/persistence/coerce-redis/Cargo.toml
@@ -16,7 +16,7 @@ cluster = [
 ]
 
 [dependencies]
-coerce = { version = "0.8.11", features = ["persistence"] }
+coerce = { path = "../../../coerce", features = ["persistence"] }
 async-trait = { version = "0.1.64" }
 redis = { version = "0.23.0", features = ["tokio-comp"] }
 tokio = { version = "1.28.1", features = ["full"] }
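Note on wiring: `sharding(builder)` now chains through `singleton::<CoordinatorFactory>(builder)`, so enabling sharding on a remote system also registers the coordinator singleton's manager and its remote message handlers; no per-node spawner actor is needed any more. A hypothetical configuration hook built on the `sharding` signature above (the surrounding builder API is assumed, not shown in this patch):

    // Hypothetical: a config function handed to the remote system's builder.
    // Registering sharding transitively registers the coordinator singleton's
    // handlers via singleton::<CoordinatorFactory>(builder).
    fn configure(builder: &mut RemoteSystemConfigBuilder) -> &mut RemoteSystemConfigBuilder {
        sharding(builder)
    }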