diff --git a/coerce/src/actor/context.rs b/coerce/src/actor/context.rs index da0e1f23..a7b07b4d 100644 --- a/coerce/src/actor/context.rs +++ b/coerce/src/actor/context.rs @@ -7,11 +7,11 @@ use crate::actor::{ }; use futures::{Stream, StreamExt}; use std::any::Any; -use std::time::Instant; + use tokio::sync::oneshot::Sender; use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit}; -use crate::actor::supervised::Supervised; +use crate::actor::supervised::{ChildRef, Supervised}; #[cfg(feature = "persistence")] use crate::persistent::context::ActorPersistence; @@ -194,6 +194,20 @@ impl ActorContext { self.supervised.as_ref() } + pub fn add_child_ref(&mut self, actor_ref: BoxedActorRef) -> Option { + let supervised = { + if let Some(supervised) = self.supervised.as_mut() { + supervised + } else { + self.supervised = + Some(Supervised::new(self.id().clone(), self.full_path().clone())); + self.supervised.as_mut().unwrap() + } + }; + + supervised.add_child_ref(actor_ref) + } + pub async fn spawn( &mut self, id: ActorId, diff --git a/coerce/src/actor/describe/mod.rs b/coerce/src/actor/describe/mod.rs index 9a92386f..d79e9f0b 100644 --- a/coerce/src/actor/describe/mod.rs +++ b/coerce/src/actor/describe/mod.rs @@ -1,10 +1,9 @@ use crate::actor::context::ActorContext; use crate::actor::message::{Handler, Message}; use crate::actor::scheduler::ActorScheduler; -use crate::actor::supervised::{ChildRef, Supervised}; +use crate::actor::supervised::ChildRef; use crate::actor::{ - Actor, ActorId, ActorPath, ActorRefErr, ActorTags, BoxedActorRef, CoreActorRef, IntoActorPath, - ToActorId, + Actor, ActorId, ActorPath, ActorRefErr, ActorTags, BoxedActorRef, CoreActorRef, }; use std::sync::Arc; use std::time::Duration; diff --git a/coerce/src/actor/lifecycle.rs b/coerce/src/actor/lifecycle.rs index 22a10d01..e528db67 100644 --- a/coerce/src/actor/lifecycle.rs +++ b/coerce/src/actor/lifecycle.rs @@ -4,8 +4,8 @@ use crate::actor::message::{Handler, Message, MessageHandler}; use crate::actor::metrics::ActorMetrics; use crate::actor::scheduler::{ActorType, DeregisterActor}; use crate::actor::system::ActorSystem; -use crate::actor::{Actor, ActorId, ActorTags, BoxedActorRef, LocalActorRef}; -use std::sync::Arc; +use crate::actor::{Actor, ActorId, BoxedActorRef, LocalActorRef}; + use tokio::sync::mpsc::UnboundedReceiver; use tracing::Instrument; use valuable::Valuable; diff --git a/coerce/src/actor/mod.rs b/coerce/src/actor/mod.rs index 1a6ea5ad..2c533a3c 100644 --- a/coerce/src/actor/mod.rs +++ b/coerce/src/actor/mod.rs @@ -17,7 +17,7 @@ use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; -use tracing::Instrument; + use uuid::Uuid; #[cfg(feature = "remote")] @@ -811,7 +811,7 @@ pub trait ToActorId { } pub trait IntoActorPath { - fn into_actor_path(&self) -> ActorPath; + fn into_actor_path(self) -> ActorPath; } impl IntoActorId for T { @@ -827,7 +827,7 @@ impl ToActorId for T { } impl IntoActorPath for T { - fn into_actor_path(&self) -> ActorPath { + fn into_actor_path(self) -> ActorPath { self.to_string().into() } } diff --git a/coerce/src/actor/scheduler/mod.rs b/coerce/src/actor/scheduler/mod.rs index 8c257f0a..776c96cc 100644 --- a/coerce/src/actor/scheduler/mod.rs +++ b/coerce/src/actor/scheduler/mod.rs @@ -1,15 +1,14 @@ use crate::actor::context::ActorContext; use crate::actor::message::{Handler, Message}; use crate::actor::{ - Actor, ActorId, ActorPath, BoxedActorRef, CoreActorRef, IntoActorId, IntoActorPath, - LocalActorRef, + Actor, ActorId, ActorPath, BoxedActorRef, CoreActorRef, IntoActorId, LocalActorRef, }; use crate::actor::lifecycle::ActorLoop; -use crate::actor::system::{ActorSystem, DEFAULT_ACTOR_PATH}; +use crate::actor::system::ActorSystem; #[cfg(feature = "remote")] -use crate::remote::{actor::message::SetRemote, heartbeat::Heartbeat, system::RemoteActorSystem}; +use crate::remote::{actor::message::SetRemote, system::RemoteActorSystem}; use std::collections::HashMap; use std::marker::PhantomData; @@ -178,7 +177,7 @@ where #[cfg(feature = "remote")] #[async_trait] impl Handler for ActorScheduler { - async fn handle(&mut self, message: SetRemote, ctx: &mut ActorContext) { + async fn handle(&mut self, message: SetRemote, _ctx: &mut ActorContext) { self.remote = Some(message.0); trace!("actor scheduler is now configured for remoting"); } @@ -273,27 +272,6 @@ pub fn start_actor( where A: 'static + Send + Sync, { - let _actor_id_clone = id.clone(); - // let actor_id = actor_id_clone.as_str(); - let _actor_type_name = A::type_name(); - - // let node_id = if let Some(system) = &system { - // if system.is_remote() { - // system.remote().node_id() - // } else { - // 0 - // } - // } else { - // 0 - // }; - // - // tracing::trace_span!( - // "ActorScheduler::start_actor", - // actor_id = actor_id, - // actor_type_name = actor_type_name, - // node_id = node_id, - // ); - let (tx, rx) = mpsc::unbounded_channel(); let system_id = system.as_ref().map(|s| *s.system_id()); let actor_ref = LocalActorRef::new(id, tx, system_id, path); diff --git a/coerce/src/actor/supervised.rs b/coerce/src/actor/supervised.rs index f8ff5fc6..c64427e8 100644 --- a/coerce/src/actor/supervised.rs +++ b/coerce/src/actor/supervised.rs @@ -5,8 +5,7 @@ use crate::actor::message::{Handler, Message}; use crate::actor::scheduler::{start_actor, ActorType}; use crate::actor::system::ActorSystem; use crate::actor::{ - Actor, ActorId, ActorPath, ActorRefErr, BoxedActorRef, CoreActorRef, IntoActorPath, - LocalActorRef, ToActorId, + Actor, ActorId, ActorPath, ActorRefErr, BoxedActorRef, CoreActorRef, LocalActorRef, ToActorId, }; #[derive(Debug)] @@ -107,6 +106,11 @@ impl Supervised { .insert(boxed_ref.actor_id().clone(), ChildRef::attached(boxed_ref)); } + pub fn add_child_ref(&mut self, boxed_ref: BoxedActorRef) -> Option { + self.children + .insert(boxed_ref.actor_id().clone(), ChildRef::spawned(boxed_ref)) + } + pub async fn stop_all(&mut self) { let n = self.children.len(); let stop_results = futures::future::join_all( diff --git a/coerce/src/actor/system.rs b/coerce/src/actor/system.rs index 5f38e8be..a32c820d 100644 --- a/coerce/src/actor/system.rs +++ b/coerce/src/actor/system.rs @@ -1,10 +1,10 @@ use crate::actor::scheduler::{start_actor, ActorScheduler, ActorType, GetActor, RegisterActor}; use crate::actor::{ - new_actor_id, Actor, ActorId, ActorPath, ActorRefErr, IntoActorId, LocalActorRef, + new_actor_id, Actor, ActorId, ActorPath, ActorRefErr, BoxedActorRef, IntoActorId, LocalActorRef, }; -use rand::RngCore; + use std::sync::atomic::Ordering::Relaxed; -use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64}; use std::sync::Arc; use uuid::Uuid; @@ -71,7 +71,7 @@ impl ActorSystemBuilder { let system_name: Arc = self.system_name.map_or_else( || { std::env::var("COERCE_ACTOR_SYSTEM").map_or_else( - |e| format!("{}", system_id).into(), + |_e| format!("{}", system_id).into(), |v| v.to_string().into(), ) }, @@ -154,19 +154,6 @@ impl ActorSystem { actor: A, actor_type: ActorType, ) -> Result, ActorRefErr> { - let _actor_type_name = A::type_name(); - // let span = tracing::trace_span!( - // "ActorSystem::new_actor", - // actor_type = match actor_type { - // ActorType::Anonymous => "Anonymous", - // _ => "Tracked", - // }, - // actor_type_name = actor_type_name, - // actor_id = id.as_str(), - // ); - // - // let _enter = span.enter(); - let id = id.into_actor_id(); let (tx, rx) = tokio::sync::oneshot::channel(); let actor_ref = start_actor( @@ -203,6 +190,37 @@ impl ActorSystem { } } + pub async fn new_supervised_actor( + &self, + id: I, + actor: A, + parent_ref: BoxedActorRef, + ) -> Result, ActorRefErr> { + let id = id.into_actor_id(); + let (tx, rx) = tokio::sync::oneshot::channel(); + let actor_ref = start_actor( + actor, + id.clone(), + ActorType::Anonymous, + Some(tx), + Some(self.clone()), + Some(parent_ref), + self.core.system_name.clone(), + ); + + match rx.await { + Ok(_) => Ok(actor_ref), + Err(_e) => { + error!( + "actor not started, actor_id={}, type={}", + &id, + A::type_name() + ); + Err(ActorRefErr::ActorStartFailed) + } + } + } + pub fn is_terminated(&self) -> bool { self.core.is_terminated.load(Relaxed) } diff --git a/coerce/src/persistent/actor.rs b/coerce/src/persistent/actor.rs index c39b925c..c2f4b773 100644 --- a/coerce/src/persistent/actor.rs +++ b/coerce/src/persistent/actor.rs @@ -1,21 +1,15 @@ use crate::actor::context::{ActorContext, ActorStatus}; use crate::actor::message::Message; use crate::actor::system::ActorSystem; -use crate::actor::{Actor, BoxedActorRef}; +use crate::actor::{Actor, ActorId, BoxedActorRef}; use crate::persistent::failure::{should_retry, PersistFailurePolicy, RecoveryFailurePolicy}; use crate::persistent::journal::snapshot::Snapshot; use crate::persistent::journal::types::JournalTypes; -use crate::persistent::journal::{PersistErr, RecoveredPayload, RecoveryErr}; +use crate::persistent::journal::{PersistErr, RecoveryErr}; use crate::persistent::recovery::{ActorRecovery, RecoveryResult}; -use crate::persistent::RecoveredJournal; -use std::error::Error; -use std::fmt; -use std::fmt::{Debug, Display, Formatter}; -use std::ops::Add; use std::sync::Arc; -use std::time::Duration; #[async_trait] pub trait Recover { @@ -120,6 +114,8 @@ pub trait PersistentActor: 'static + Sized + Send + Sync { async fn on_recovery_err(&mut self, _err: RecoveryErr, _ctx: &mut ActorContext) {} async fn on_recovery_failed(&mut self, _ctx: &mut ActorContext) {} + + async fn on_child_stopped(&mut self, id: &ActorId, ctx: &mut ActorContext) {} } async fn check( @@ -242,7 +238,10 @@ where async fn stopped(&mut self, ctx: &mut ActorContext) { trace!("persistent actor stopped"); - self.stopped(ctx).await } + + async fn on_child_stopped(&mut self, id: &ActorId, ctx: &mut ActorContext) { + self.on_child_stopped(id, ctx).await + } } diff --git a/coerce/src/persistent/failure.rs b/coerce/src/persistent/failure.rs index d5542a44..5bd9a89f 100644 --- a/coerce/src/persistent/failure.rs +++ b/coerce/src/persistent/failure.rs @@ -1,5 +1,5 @@ use crate::actor::context::ActorContext; -use std::error::Error; + use std::fmt; use std::fmt::{Display, Formatter}; use std::time::Duration; diff --git a/coerce/src/persistent/journal/mod.rs b/coerce/src/persistent/journal/mod.rs index 31eb143d..1a1eaefe 100644 --- a/coerce/src/persistent/journal/mod.rs +++ b/coerce/src/persistent/journal/mod.rs @@ -4,7 +4,7 @@ pub mod storage; pub mod types; use crate::actor::context::ActorContext; -use crate::actor::message::{Handler, Message, MessageUnwrapErr, MessageWrapErr}; +use crate::actor::message::{Message, MessageUnwrapErr, MessageWrapErr}; use crate::persistent::journal::snapshot::Snapshot; use crate::persistent::journal::storage::{JournalEntry, JournalStorageRef}; use crate::persistent::journal::types::{init_journal_types, JournalTypes}; diff --git a/coerce/src/persistent/recovery.rs b/coerce/src/persistent/recovery.rs index 75a6a816..3b11e05a 100644 --- a/coerce/src/persistent/recovery.rs +++ b/coerce/src/persistent/recovery.rs @@ -1,10 +1,7 @@ use crate::actor::context::ActorContext; -use crate::persistent::failure::{should_retry, RecoveryFailurePolicy, Retry}; +use crate::persistent::failure::{should_retry, RecoveryFailurePolicy}; use crate::persistent::journal::{RecoveredPayload, RecoveryErr}; use crate::persistent::PersistentActor; -use std::error::Error; -use std::fmt; -use std::fmt::{Display, Formatter}; #[async_trait] pub trait ActorRecovery: PersistentActor { diff --git a/coerce/src/remote/actor/mod.rs b/coerce/src/remote/actor/mod.rs index 0f1417ed..d79355b3 100644 --- a/coerce/src/remote/actor/mod.rs +++ b/coerce/src/remote/actor/mod.rs @@ -1,14 +1,12 @@ use crate::actor::message::Message; -use crate::actor::system::ActorSystem; -use crate::actor::{scheduler::ActorType, Actor, ActorId, ActorRefErr, LocalActorRef}; -use crate::remote::cluster::node::{NodeAttributes, NodeAttributesRef, RemoteNodeStore}; + +use crate::actor::{Actor, ActorRefErr}; +use crate::remote::cluster::node::NodeAttributesRef; use crate::remote::handler::{ ActorHandler, ActorMessageHandler, RemoteActorMarker, RemoteActorMessageMarker, }; use crate::remote::heartbeat::HeartbeatConfig; -use crate::remote::net::client::RemoteClient; -use crate::remote::stream::pubsub::Subscription; -use crate::remote::system::{NodeId, RemoteActorSystem}; + use std::any::TypeId; use std::collections::HashMap; use uuid::Uuid; diff --git a/coerce/src/remote/api/builder.rs b/coerce/src/remote/api/builder.rs index 27da073c..268af076 100644 --- a/coerce/src/remote/api/builder.rs +++ b/coerce/src/remote/api/builder.rs @@ -1,7 +1,7 @@ use crate::actor::system::ActorSystem; use crate::actor::{IntoActor, LocalActorRef}; use crate::remote::api::{RemoteHttpApi, Routes}; -use axum::Router; + use std::net::SocketAddr; pub struct HttpApiBuilder { diff --git a/coerce/src/remote/api/cluster/mod.rs b/coerce/src/remote/api/cluster/mod.rs index 1710d891..54c5049d 100644 --- a/coerce/src/remote/api/cluster/mod.rs +++ b/coerce/src/remote/api/cluster/mod.rs @@ -5,6 +5,7 @@ use axum::response::IntoResponse; use axum::routing::get; use axum::{Json, Router}; use std::collections::HashMap; +use std::time::Duration; pub struct ClusterApi { system: RemoteActorSystem, @@ -25,24 +26,24 @@ impl Routes for ClusterApi { } } -#[derive(Serialize, Deserialize, ToSchema)] +#[derive(Serialize, Deserialize, Debug, ToSchema)] pub struct ClusterNode { pub id: u64, pub addr: String, pub tag: String, - pub ping_latency: Option, + pub ping_latency: Option, pub last_heartbeat: Option, pub node_started_at: Option, pub status: NodeStatus, pub attributes: HashMap, } -#[derive(Serialize, Deserialize, ToSchema)] +#[derive(Serialize, Deserialize, Debug, ToSchema)] pub struct ClusterNodes { - node_id: u64, - leader_node: Option, - leader_node_tag: Option, - nodes: Vec, + pub node_id: u64, + pub leader_node: Option, + pub leader_node_tag: Option, + pub nodes: Vec, } #[derive(Debug, Serialize, Deserialize, ToSchema)] @@ -108,7 +109,7 @@ impl From for ClusterNode { id: node.id, addr: node.addr, tag: node.tag, - ping_latency: node.ping_latency.map(|p| format!("{:?}", p)), + ping_latency: node.ping_latency, last_heartbeat: node.last_heartbeat.map(|h| format!("{:?}", h)), node_started_at: node.node_started_at.map(|p| format!("{:?}", p)), status: node.status.into(), diff --git a/coerce/src/remote/api/metrics/mod.rs b/coerce/src/remote/api/metrics/mod.rs index fab5e76d..b018ff57 100644 --- a/coerce/src/remote/api/metrics/mod.rs +++ b/coerce/src/remote/api/metrics/mod.rs @@ -1,16 +1,12 @@ -use crate::actor::{Actor, LocalActorRef}; -use crate::remote::api::sharding::ShardingApi; use crate::remote::api::Routes; use crate::remote::system::RemoteActorSystem; -use axum::extract::Path; + use axum::response::IntoResponse; use axum::routing::get; -use axum::{Json, Router}; -use metrics_exporter_prometheus::{ - BuildError, PrometheusBuilder, PrometheusHandle, PrometheusRecorder, -}; +use axum::Router; +use metrics_exporter_prometheus::{BuildError, PrometheusBuilder, PrometheusHandle}; use metrics_util::MetricKindMask; -use std::net::SocketAddr; + use std::time::Duration; pub struct MetricsApi { diff --git a/coerce/src/remote/api/mod.rs b/coerce/src/remote/api/mod.rs index 7d718539..9362945f 100644 --- a/coerce/src/remote/api/mod.rs +++ b/coerce/src/remote/api/mod.rs @@ -10,15 +10,13 @@ pub mod openapi; pub mod sharding; use crate::actor::context::ActorContext; -use crate::actor::system::ActorSystem; -use crate::actor::{Actor, IntoActor, LocalActorRef}; -use crate::remote::system::RemoteActorSystem; -use axum::routing::get; -use axum::{Json, Router}; + +use crate::actor::Actor; + +use axum::Router; use std::net::SocketAddr; use tokio::sync::oneshot; use tokio::sync::oneshot::Sender; -use tokio::task::JoinHandle; pub struct RemoteHttpApi { listen_addr: SocketAddr, diff --git a/coerce/src/remote/api/sharding/cluster.rs b/coerce/src/remote/api/sharding/cluster.rs index a8b376e0..7ad0cb4f 100644 --- a/coerce/src/remote/api/sharding/cluster.rs +++ b/coerce/src/remote/api/sharding/cluster.rs @@ -1,8 +1,8 @@ use crate::actor::context::ActorContext; use crate::actor::message::{Handler, Message}; -use crate::actor::{ActorRef, ActorRefErr, LocalActorRef}; +use crate::actor::{ActorRef, LocalActorRef}; use crate::remote::api::sharding::ShardingApi; -use crate::remote::system::{NodeId, RemoteActorSystem}; +use crate::remote::system::RemoteActorSystem; use crate::remote::RemoteActorRef; use crate::sharding::coordinator::stats::{GetShardingStats, NodeStats}; use crate::sharding::host::stats::{GetStats, HostStats, RemoteShard}; diff --git a/coerce/src/remote/api/sharding/node.rs b/coerce/src/remote/api/sharding/node.rs index cad44216..7aeec343 100644 --- a/coerce/src/remote/api/sharding/node.rs +++ b/coerce/src/remote/api/sharding/node.rs @@ -2,13 +2,12 @@ use crate::actor::context::ActorContext; use crate::actor::message::{Handler, Message}; use crate::actor::LocalActorRef; use crate::remote::api::sharding::ShardingApi; -use crate::remote::system::NodeId; -use crate::sharding::coordinator::ShardId; + use crate::sharding::host::stats::GetStats as GetHostStats; use axum::extract::Path; use axum::response::IntoResponse; use axum::Json; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; #[derive(Serialize, Deserialize)] pub struct GetStats(pub String); diff --git a/coerce/src/remote/api/sharding/routes.rs b/coerce/src/remote/api/sharding/routes.rs index f983a638..0ce677da 100644 --- a/coerce/src/remote/api/sharding/routes.rs +++ b/coerce/src/remote/api/sharding/routes.rs @@ -1,10 +1,9 @@ use crate::actor::LocalActorRef; use crate::remote::api::sharding::cluster::{get_shard_host_stats, get_sharding_stats}; -use crate::remote::api::sharding::node::{get_node_stats, GetAllStats, GetStats}; +use crate::remote::api::sharding::node::{get_node_stats, GetAllStats}; use crate::remote::api::sharding::{get_sharding_types, ShardingApi}; use crate::remote::api::Routes; -use axum::extract::Path; -use axum::response::IntoResponse; + use axum::routing::get; use axum::{Json, Router}; diff --git a/coerce/src/remote/api/system/actors.rs b/coerce/src/remote/api/system/actors.rs index 5065172f..67d37764 100644 --- a/coerce/src/remote/api/system/actors.rs +++ b/coerce/src/remote/api/system/actors.rs @@ -23,7 +23,7 @@ fn child_describe_attached_default() -> bool { #[derive(Serialize, Deserialize, ToSchema)] pub struct Actors { - actors: Vec, + pub actors: Vec, } #[derive(Serialize, Deserialize, ToSchema, Debug)] diff --git a/coerce/src/remote/api/system/mod.rs b/coerce/src/remote/api/system/mod.rs index 291b7123..b05b950a 100644 --- a/coerce/src/remote/api/system/mod.rs +++ b/coerce/src/remote/api/system/mod.rs @@ -2,7 +2,7 @@ pub mod actors; use crate::remote::api::Routes; use std::collections::HashMap; -use std::time::{Duration, Instant}; +use std::time::Duration; use crate::actor::scheduler::ActorCount; use crate::remote::system::{NodeId, RemoteActorSystem}; @@ -10,16 +10,12 @@ use axum::response::IntoResponse; use axum::routing::get; use axum::{Json, Router}; use chrono::{DateTime, Utc}; -use futures::future::join_all; -use tokio::sync::oneshot; -use crate::actor::context::ActorStatus; -use crate::actor::lifecycle::Status; -use crate::actor::{ActorPath, BoxedActorRef, CoreActorRef, IntoActorPath}; +use crate::actor::{ActorPath, CoreActorRef}; use crate::remote::api::cluster::ClusterNode; use crate::remote::api::openapi::ApiDoc; use crate::remote::heartbeat::{health, Heartbeat}; -use crate::remote::stream::pubsub::PubSub; + use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; @@ -52,24 +48,24 @@ impl Routes for SystemApi { } } -#[derive(Serialize, ToSchema, Eq, PartialEq, Clone, Copy)] +#[derive(Serialize, Deserialize, Debug, ToSchema, Eq, PartialEq, Clone, Copy)] pub enum HealthStatus { Healthy, Degraded, Unhealthy, } -#[derive(Serialize, ToSchema)] +#[derive(Serialize, Deserialize, Debug, ToSchema)] pub struct SystemHealth { - status: HealthStatus, - node_id: u64, - node_tag: String, - node_version: String, - node_started_at: DateTime, - runtime_version: &'static str, - actor_response_times: HashMap>, - current_leader: Option, - nodes: Vec, + pub status: HealthStatus, + pub node_id: u64, + pub node_tag: String, + pub node_version: String, + pub node_started_at: DateTime, + pub runtime_version: String, + pub actor_response_times: HashMap>, + pub current_leader: Option, + pub nodes: Vec, } #[utoipa::path( @@ -91,7 +87,7 @@ impl From for SystemHealth { node_tag: value.node_tag, node_version: value.node_version, node_started_at: value.node_started_at, - runtime_version: value.runtime_version, + runtime_version: value.runtime_version.to_string(), actor_response_times: value.actor_response_times, current_leader: value.current_leader, nodes: value.nodes.into_iter().map(|n| n.into()).collect(), diff --git a/coerce/src/remote/cluster/builder/worker.rs b/coerce/src/remote/cluster/builder/worker.rs index cc833efa..ff83b630 100644 --- a/coerce/src/remote/cluster/builder/worker.rs +++ b/coerce/src/remote/cluster/builder/worker.rs @@ -1,5 +1,5 @@ use crate::remote::cluster::discovery::{Discover, Seed}; -use crate::remote::cluster::node::{NodeAttributes, NodeAttributesRef, RemoteNode}; +use crate::remote::cluster::node::RemoteNode; use crate::remote::net::server::{RemoteServer, RemoteServerConfig}; use crate::remote::system::RemoteActorSystem; use std::env; @@ -69,8 +69,6 @@ impl ClusterWorkerBuilder { &cluster_node_addr ); - // TODO: Allow all of this to be overridden via environment variables - let listen_addr = self.server_listen_addr.clone(); let override_incoming_node_addr = env::var("COERCE_OVERRIDE_INCOMING_NODE_ADDR") .map_or(false, |s| s == "1" || s.to_lowercase() == "true"); diff --git a/coerce/src/remote/cluster/discovery/mod.rs b/coerce/src/remote/cluster/discovery/mod.rs index b9b418d6..6642fef1 100644 --- a/coerce/src/remote/cluster/discovery/mod.rs +++ b/coerce/src/remote/cluster/discovery/mod.rs @@ -3,7 +3,7 @@ use crate::actor::message::{Handler, Message}; use crate::actor::Actor; use crate::remote::actor::message::SetRemote; use crate::remote::cluster::node::{NodeIdentity, NodeStatus, RemoteNode}; -use crate::remote::heartbeat::Heartbeat; + use crate::remote::net::client::RemoteClientRef; use crate::remote::stream::pubsub::PubSub; use crate::remote::stream::system::{ClusterEvent, SystemEvent, SystemTopic}; @@ -47,7 +47,7 @@ impl Message for Forget { #[async_trait] impl Handler for NodeDiscovery { - async fn handle(&mut self, message: SetRemote, ctx: &mut ActorContext) { + async fn handle(&mut self, message: SetRemote, _ctx: &mut ActorContext) { self.remote_system = Some(message.0); } } diff --git a/coerce/src/remote/handler.rs b/coerce/src/remote/handler.rs index c1846315..12b67ce7 100644 --- a/coerce/src/remote/handler.rs +++ b/coerce/src/remote/handler.rs @@ -23,7 +23,7 @@ pub trait ActorHandler: 'static + Any + Sync + Send { &self, actor_id: Option, raw_recipe: &Vec, - supervisor_ctx: Option<&mut ActorContext>, + parent_ref: Option, system: Option<&ActorSystem>, ) -> Result; @@ -137,7 +137,7 @@ where &self, actor_id: Option, recipe: &Vec, - supervisor_ctx: Option<&mut ActorContext>, + parent_ref: Option, system: Option<&ActorSystem>, ) -> Result { let system = system.clone(); @@ -146,8 +146,11 @@ where let recipe = F::Recipe::read_from_bytes(recipe); if let Some(recipe) = recipe { if let Ok(state) = self.factory.create(recipe).await { - let actor_ref = if let Some(supervisor_ctx) = supervisor_ctx { - supervisor_ctx.spawn(actor_id, state).await + let actor_ref = if let Some(parent_ref) = parent_ref { + system + .expect("ActorSystem ref") + .new_supervised_actor(actor_id, state, parent_ref) + .await } else { system .expect( diff --git a/coerce/src/remote/heartbeat/health.rs b/coerce/src/remote/heartbeat/health.rs index ef55c6ea..af76b424 100644 --- a/coerce/src/remote/heartbeat/health.rs +++ b/coerce/src/remote/heartbeat/health.rs @@ -36,6 +36,7 @@ impl Message for RemoveHealthCheck { pub struct GetHealth(pub oneshot::Sender); +#[derive(Debug, Eq, PartialEq, Clone, Copy)] pub enum HealthStatus { Healthy, Degraded, diff --git a/coerce/src/remote/heartbeat/mod.rs b/coerce/src/remote/heartbeat/mod.rs index c73aba8f..9a34d843 100644 --- a/coerce/src/remote/heartbeat/mod.rs +++ b/coerce/src/remote/heartbeat/mod.rs @@ -63,6 +63,7 @@ impl Heartbeat { let _ = system.heartbeat().notify(RegisterHealthCheck(actor.into())); } + // Removes an actor that was registered to be part of the health check. pub fn remove(actor_id: &ActorId, system: &RemoteActorSystem) { let _ = system .heartbeat() diff --git a/coerce/src/remote/net/client/connect.rs b/coerce/src/remote/net/client/connect.rs index 964fb983..3d88e9b4 100644 --- a/coerce/src/remote/net/client/connect.rs +++ b/coerce/src/remote/net/client/connect.rs @@ -13,7 +13,7 @@ use crate::remote::net::client::{ RemoteClient, }; use crate::remote::net::codec::NetworkCodec; -use crate::remote::net::message::{datetime_to_timestamp, SessionEvent}; +use crate::remote::net::message::SessionEvent; use crate::remote::net::proto::network::{self as proto, IdentifyEvent}; use crate::remote::net::{receive_loop, StreamData}; @@ -50,7 +50,7 @@ impl RemoteClient { let (identity_tx, identity_rx) = oneshot::channel(); let remote = ctx.system().remote_owned(); - let identify = SessionEvent::Identify(IdentifyEvent { + let _identify = SessionEvent::Identify(IdentifyEvent { ..Default::default() }); diff --git a/coerce/src/remote/net/client/receive.rs b/coerce/src/remote/net/client/receive.rs index 363d4d14..5dade115 100644 --- a/coerce/src/remote/net/client/receive.rs +++ b/coerce/src/remote/net/client/receive.rs @@ -3,12 +3,12 @@ use std::io::Error; use crate::actor::LocalActorRef; use crate::remote::actor::{RemoteResponse, SystemCapabilities}; -use crate::remote::cluster::node::{NodeAttributes, NodeAttributesRef, NodeIdentity, RemoteNode}; +use crate::remote::cluster::node::{NodeIdentity, RemoteNode}; use crate::remote::net::client::connect::Disconnected; use crate::remote::net::client::RemoteClient; use crate::remote::net::message::{timestamp_to_datetime, ClientEvent}; use crate::remote::net::proto::network::PongEvent; -use crate::remote::net::{proto, StreamReceiver}; +use crate::remote::net::StreamReceiver; use crate::remote::system::{NodeId, RemoteActorSystem}; use chrono::{DateTime, Utc}; use protobuf::Message as ProtoMessage; diff --git a/coerce/src/remote/net/server/mod.rs b/coerce/src/remote/net/server/mod.rs index ba6e6f58..3e8bf1cf 100644 --- a/coerce/src/remote/net/server/mod.rs +++ b/coerce/src/remote/net/server/mod.rs @@ -1,4 +1,3 @@ -use crate::actor::scheduler::ActorType::{Anonymous, Tracked}; use crate::actor::{IntoActor, LocalActorRef}; use crate::remote::net::server::session::store::{NewSession, RemoteSessionStore}; use crate::remote::net::server::session::RemoteSession; diff --git a/coerce/src/remote/net/server/session/mod.rs b/coerce/src/remote/net/server/session/mod.rs index 6fe09ff8..c5fce074 100644 --- a/coerce/src/remote/net/server/session/mod.rs +++ b/coerce/src/remote/net/server/session/mod.rs @@ -394,7 +394,7 @@ async fn session_handshake( ); let nodes = ctx.get_nodes().await; - let mut response = ClientHandshake { + let response = ClientHandshake { node_id: ctx.node_id(), node_tag: ctx.node_tag().to_string(), node_started_at: Some(datetime_to_timestamp(ctx.started_at())).into(), @@ -546,7 +546,7 @@ async fn session_create_actor( actor_id.as_ref().map_or_else(|| "N/A", |s| s) ); match ctx - .handle_create_actor(actor_id, msg.actor_type, msg.recipe, None) + .handle_create_actor(actor_id, msg.actor_type, msg.recipe) .await { Ok(buf) => send_result(msg_id.parse().unwrap(), buf.to_vec(), session_id, session).await, diff --git a/coerce/src/remote/stream/mediator/mod.rs b/coerce/src/remote/stream/mediator/mod.rs index 5a8a916b..912016e4 100644 --- a/coerce/src/remote/stream/mediator/mod.rs +++ b/coerce/src/remote/stream/mediator/mod.rs @@ -117,8 +117,10 @@ impl StreamMediator { T::topic_name().to_string(), &topic_key ); + let receiver = topic.receiver(topic_key); let subscription = Subscription::new(receiver, receiver_ref); + Ok(subscription) } else { error!( @@ -127,6 +129,7 @@ impl StreamMediator { T::topic_name().to_string(), &topic_key ); + Err(SubscribeErr::Err) } } @@ -152,11 +155,7 @@ impl Handler> for StreamMediator { self.nodes.insert(new_node.id); } - info!( - "[node={}] node added (node_id={})", - self.remote().node_id(), - new_node.id - ); + info!("node added (node_id={})", new_node.id); } ClusterEvent::NodeRemoved(removed_node) => { @@ -167,11 +166,8 @@ impl Handler> for StreamMediator { // it will receive any messages it may have missed. let _ = self.nodes.remove(&removed_node.id); - info!( - "[node={}] node removed (node_id={})", - self.remote().node_id(), - removed_node.id - ); + + info!("node removed (node_id={})", removed_node.id); } _ => {} }, diff --git a/coerce/src/remote/stream/system.rs b/coerce/src/remote/stream/system.rs index 229e592a..3c779526 100644 --- a/coerce/src/remote/stream/system.rs +++ b/coerce/src/remote/stream/system.rs @@ -7,8 +7,6 @@ use std::sync::Arc; use crate::remote::cluster::node::RemoteNode; -use crate::remote::net::message::{datetime_to_timestamp, timestamp_to_datetime}; -use crate::remote::net::proto::network as proto; use crate::remote::system::NodeId; use protobuf::{Enum, Error, Message}; diff --git a/coerce/src/remote/system/actor.rs b/coerce/src/remote/system/actor.rs index 3860f96e..5bf8219f 100644 --- a/coerce/src/remote/system/actor.rs +++ b/coerce/src/remote/system/actor.rs @@ -135,8 +135,9 @@ impl RemoteActorSystem { let mut actor_addr = None; if node == self_id { let local_create = self - .handle_create_actor(Some(id.clone()), actor_type, recipe, None) + .handle_create_actor(Some(id.clone()), actor_type, recipe) .await; + if local_create.is_ok() { actor_addr = Some(ActorAddress::parse_from_bytes(&local_create.unwrap()).unwrap()); } else { @@ -183,7 +184,6 @@ impl RemoteActorSystem { actor_id: Option, actor_type: String, raw_recipe: Vec, - supervisor_ctx: Option<&mut ActorContext>, ) -> Result, RemoteActorErr> { let (tx, rx) = oneshot::channel(); @@ -204,7 +204,7 @@ impl RemoteActorSystem { .create( Some(actor_id.clone()), &raw_recipe, - supervisor_ctx, + None, Some(self.actor_system()), ) .await; diff --git a/coerce/src/remote/system/builder.rs b/coerce/src/remote/system/builder.rs index 13bf1fcc..0c491029 100644 --- a/coerce/src/remote/system/builder.rs +++ b/coerce/src/remote/system/builder.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use crate::actor::scheduler::ActorType; use crate::remote::cluster::discovery::NodeDiscovery; -use crate::remote::cluster::node::{NodeAttributes, NodeAttributesRef}; +use crate::remote::cluster::node::NodeAttributes; use chrono::Utc; use uuid::Uuid; diff --git a/coerce/src/sharding/coordinator/allocation.rs b/coerce/src/sharding/coordinator/allocation.rs index 5233b0a7..7cbbfae2 100644 --- a/coerce/src/sharding/coordinator/allocation.rs +++ b/coerce/src/sharding/coordinator/allocation.rs @@ -9,6 +9,7 @@ use crate::sharding::proto::sharding as proto; use futures::future::join_all; use protobuf::Message as ProtoMessage; use std::collections::hash_map::{DefaultHasher, Entry, VacantEntry}; +use std::fmt::{Display, Formatter}; use crate::sharding::proto::sharding::allocate_shard_result; use std::hash::{Hash, Hasher}; @@ -146,7 +147,10 @@ pub async fn broadcast_allocation( for host in hosts.into_iter() { if host.node_id() == Some(node_id) { if let Err(_e) = host.send(ShardAllocated(shard_id, node_id)).await { - warn!("") + warn!( + "failed to notify node of shard allocation (shard_id={}, node_id={})", + shard_id, node_id + ); } } else { futures.push(async move { @@ -195,6 +199,19 @@ pub async fn broadcast_reallocation(shard_id: ShardId, hosts: Vec) -> std::fmt::Result { + match &self { + AllocateShardErr::Unknown => { + write!(f, "AllocateShardErr::Unknown - Unknown error occurred") + } + AllocateShardErr::Persistence => { + write!(f, "AllocateShardErr::Persistence - failed to persist event") + } + } + } +} + impl Message for AllocateShard { type Result = AllocateShardResult; diff --git a/coerce/src/sharding/coordinator/balancing.rs b/coerce/src/sharding/coordinator/balancing.rs index b366f84b..b3a473f8 100644 --- a/coerce/src/sharding/coordinator/balancing.rs +++ b/coerce/src/sharding/coordinator/balancing.rs @@ -46,7 +46,10 @@ impl Handler for ShardCoordinator { i += 1; } - debug!("rebalancing {} shards from node={} - total={} - fair_shard_count_per_node={}", i, node_id, shard_host.shards.len(), fair_shard_count_per_node); + debug!( + "rebalancing {} shards from node={} - total={} - fair_shard_count_per_node={}", + i, node_id, shard_host.shards.len(), fair_shard_count_per_node + ); } } diff --git a/coerce/src/sharding/coordinator/discovery.rs b/coerce/src/sharding/coordinator/discovery.rs index 7aa26814..7d6e0d2f 100644 --- a/coerce/src/sharding/coordinator/discovery.rs +++ b/coerce/src/sharding/coordinator/discovery.rs @@ -46,12 +46,7 @@ impl Handler for ShardCoordinator { node_id: new_node.id, node_tag: new_node.tag.clone(), shards: Default::default(), - actor: RemoteActorRef::::new( - format!("shard-host-{}-{}", &self.shard_entity, new_node.id).into_actor_id(), - new_node.id, - remote, - ) - .into(), + actor: ShardHost::remote_ref(&self.shard_entity, new_node.id, &remote), status: ShardHostStatus::Ready/*TODO: shard hosts may not be immediately ready*/, }); diff --git a/coerce/src/sharding/coordinator/mod.rs b/coerce/src/sharding/coordinator/mod.rs index 0176bc1a..336b1333 100644 --- a/coerce/src/sharding/coordinator/mod.rs +++ b/coerce/src/sharding/coordinator/mod.rs @@ -115,12 +115,7 @@ impl PersistentActor for ShardCoordinator { node_id: host.id, node_tag: String::default(), shards: HashSet::new(), - actor: RemoteActorRef::::new( - format!("shard-host-{}-{}", &self.shard_entity, host.id).into_actor_id(), - host.id, - remote.clone(), - ) - .into(), + actor: ShardHost::remote_ref(&self.shard_entity, host.id, remote), status: if host.status == Healthy || host.status == Joining { ShardHostStatus::Ready } else { diff --git a/coerce/src/sharding/coordinator/stream.rs b/coerce/src/sharding/coordinator/stream.rs index de676143..955da993 100644 --- a/coerce/src/sharding/coordinator/stream.rs +++ b/coerce/src/sharding/coordinator/stream.rs @@ -1 +1,33 @@ -pub struct ShardingTopic; +use crate::remote::net::StreamData; +use crate::remote::stream::pubsub::Topic; +use std::sync::Arc; + +pub struct ShardingTopic { + sharded_entity: Arc, +} + +pub enum ShardingEvent { + Rebalance, +} + +impl Topic for ShardingTopic { + type Message = ShardingEvent; + + fn topic_name() -> &'static str { + "sharding" + } + + fn key(&self) -> String { + format!("sharding-events-{}", &self.sharded_entity) + } +} + +impl StreamData for ShardingEvent { + fn read_from_bytes(data: Vec) -> Option { + todo!() + } + + fn write_to_bytes(&self) -> Option> { + todo!() + } +} diff --git a/coerce/src/sharding/host/mod.rs b/coerce/src/sharding/host/mod.rs index 19324a5d..778378d7 100644 --- a/coerce/src/sharding/host/mod.rs +++ b/coerce/src/sharding/host/mod.rs @@ -1,13 +1,12 @@ use crate::actor::context::ActorContext; use crate::actor::message::{EnvelopeType, Handler, Message, MessageUnwrapErr, MessageWrapErr}; use crate::actor::{Actor, ActorId, ActorRef, IntoActor, IntoActorId, LocalActorRef, ToActorId}; +use crate::remote::system::{NodeId, RemoteActorSystem}; +use crate::remote::RemoteActorRef; use crate::sharding::coordinator::allocation::DefaultAllocator; use crate::sharding::coordinator::{ShardCoordinator, ShardId}; use crate::sharding::host::request::{handle_request, EntityRequest}; use crate::sharding::proto::sharding as proto; - -use crate::remote::system::NodeId; -use crate::remote::RemoteActorRef; use crate::sharding::shard::Shard; use protobuf::Message as ProtoMessage; @@ -45,6 +44,25 @@ pub struct ShardHost { allocator: Box, } +impl ShardHost { + pub fn actor_id(entity_type: &str, node_id: NodeId) -> ActorId { + format!("shard-host-{}-{}", &entity_type, node_id).into_actor_id() + } + + pub fn remote_ref( + entity_type: &str, + node_id: NodeId, + remote: &RemoteActorSystem, + ) -> ActorRef { + RemoteActorRef::::new( + Self::actor_id(entity_type, node_id), + node_id, + remote.clone(), + ) + .into() + } +} + pub trait ShardAllocator: 'static + Send + Sync { fn allocate(&mut self, actor_id: &ActorId) -> ShardId; } @@ -126,19 +144,6 @@ pub struct StopShard { pub request_id: Uuid, } -pub struct StartEntity { - pub actor_id: ActorId, - pub recipe: Arc>, -} - -pub struct RemoveEntity { - pub actor_id: ActorId, -} - -pub struct PassivateEntity { - pub actor_id: ActorId, -} - pub struct GetShards; pub struct HostedShards { @@ -303,6 +308,7 @@ impl Handler for ShardHost { origin_node_id: message.origin_node_id, request_id: message.request_id, }; + match shard_entry { Entry::Occupied(mut shard_entry) => { // TODO: is this correct? if the status is starting, should we not update that rather than re-inserting `Stopping`? @@ -474,95 +480,6 @@ impl Message for StopShard { } } -impl Message for StartEntity { - type Result = (); - - fn as_bytes(&self) -> Result, MessageWrapErr> { - proto::StartEntity { - actor_id: self.actor_id.to_string(), - recipe: self.recipe.as_ref().clone(), - ..Default::default() - } - .write_to_bytes() - .map_err(|_| MessageWrapErr::SerializationErr) - } - - fn from_bytes(b: Vec) -> Result { - proto::StartEntity::parse_from_bytes(&b) - .map(|r| Self { - actor_id: r.actor_id.to_actor_id(), - recipe: Arc::new(r.recipe), - }) - .map_err(|_e| MessageUnwrapErr::DeserializationErr) - } - - fn read_remote_result(_: Vec) -> Result { - Ok(()) - } - - fn write_remote_result(_res: Self::Result) -> Result, MessageWrapErr> { - Ok(vec![]) - } -} - -impl Message for RemoveEntity { - type Result = (); - - fn as_bytes(&self) -> Result, MessageWrapErr> { - proto::RemoveEntity { - actor_id: self.actor_id.to_string(), - ..Default::default() - } - .write_to_bytes() - .map_err(|_| MessageWrapErr::SerializationErr) - } - - fn from_bytes(b: Vec) -> Result { - proto::RemoveEntity::parse_from_bytes(&b) - .map(|r| Self { - actor_id: r.actor_id.into_actor_id(), - }) - .map_err(|_e| MessageUnwrapErr::DeserializationErr) - } - - fn read_remote_result(_: Vec) -> Result { - Ok(()) - } - - fn write_remote_result(_res: Self::Result) -> Result, MessageWrapErr> { - Ok(vec![]) - } -} - -impl Message for PassivateEntity { - type Result = (); - - fn as_bytes(&self) -> Result, MessageWrapErr> { - proto::PassivateEntity { - actor_id: self.actor_id.to_string(), - ..Default::default() - } - .write_to_bytes() - .map_err(|_| MessageWrapErr::SerializationErr) - } - - fn from_bytes(b: Vec) -> Result { - proto::PassivateEntity::parse_from_bytes(&b) - .map(|r| Self { - actor_id: r.actor_id.into_actor_id(), - }) - .map_err(|_e| MessageUnwrapErr::DeserializationErr) - } - - fn read_remote_result(_: Vec) -> Result { - Ok(()) - } - - fn write_remote_result(_res: Self::Result) -> Result, MessageWrapErr> { - Ok(vec![]) - } -} - impl Message for ShardStopped { type Result = (); diff --git a/coerce/src/sharding/host/request.rs b/coerce/src/sharding/host/request.rs index e4378aec..05acf1e3 100644 --- a/coerce/src/sharding/host/request.rs +++ b/coerce/src/sharding/host/request.rs @@ -91,6 +91,7 @@ impl Handler for ShardHost { shard_id, buffered_requests.len()); let host_ref = self.actor_ref(ctx); + tokio::spawn(async move { let allocation = leader .send(AllocateShard { @@ -98,8 +99,21 @@ impl Handler for ShardHost { rebalancing: false, }) .await; - if let Ok(AllocateShardResult::Allocated(shard_id, node_id)) = allocation { - let _ = host_ref.notify(ShardAllocated(shard_id, node_id)); + match allocation { + Ok(result) => match result { + AllocateShardResult::Allocated(shard_id, node_id) + | AllocateShardResult::AlreadyAllocated(shard_id, node_id) => { + let _ = host_ref.notify(ShardAllocated(shard_id, node_id)); + } + AllocateShardResult::NotAllocated => { + // No hosts available? + error!("shard(#{}) not allocated, no hosts available?", shard_id); + } + AllocateShardResult::Err(e) => { + error!("shard(#{}) failed to allocate: {}", shard_id, e); + } + }, + Err(e) => {} } }); } else { diff --git a/coerce/src/sharding/mod.rs b/coerce/src/sharding/mod.rs index 66a77dd6..154756fe 100644 --- a/coerce/src/sharding/mod.rs +++ b/coerce/src/sharding/mod.rs @@ -51,12 +51,6 @@ impl Sharding { system: RemoteActorSystem, allocator: Option>, ) -> Self { - let coordinator_spawner_actor_id = - Some(format!("shard-coordinator-spawner-{}", &shard_entity,).into_actor_id()); - - let host_actor_id = - Some(format!("shard-host-{}-{}", &shard_entity, system.node_id()).into_actor_id()); - let actor_handler = match system .config() .actor_handler(A::Actor::type_name()) { @@ -65,13 +59,19 @@ impl Sharding { }; let host = ShardHost::new(shard_entity.clone(), actor_handler, allocator) - .into_actor(host_actor_id, system.actor_system()) + .into_actor( + Some(ShardHost::actor_id(&shard_entity, system.node_id())), + system.actor_system(), + ) .await .expect("create ShardHost actor"); let coordinator_spawner = CoordinatorSpawner::new(system.node_id(), shard_entity.clone(), host.clone()) - .into_actor(coordinator_spawner_actor_id, system.actor_system()) + .into_actor( + Some(format!("shard-coordinator-spawner-{}", &shard_entity).into_actor_id()), + system.actor_system(), + ) .await .expect("create ShardCoordinator spawner"); diff --git a/coerce/src/sharding/shard/message.rs b/coerce/src/sharding/shard/message.rs new file mode 100644 index 00000000..02078a7a --- /dev/null +++ b/coerce/src/sharding/shard/message.rs @@ -0,0 +1,118 @@ +use crate::actor::message::{EnvelopeType, Message, MessageUnwrapErr, MessageWrapErr}; +use crate::actor::{ActorId, ActorRefErr, BoxedActorRef, IntoActorId}; +use crate::sharding::proto::sharding as proto; +use crate::sharding::shard::RecipeRef; +use protobuf::Message as ProtoMessage; +use std::sync::Arc; + +pub struct StartEntity { + pub actor_id: ActorId, + pub recipe: RecipeRef, +} + +pub struct RemoveEntity { + pub actor_id: ActorId, +} + +pub struct PassivateEntity { + pub actor_id: ActorId, +} + +pub struct EntityStartResult { + pub actor_id: ActorId, + pub result: Result, + pub is_shard_recovery: bool, +} + +impl Message for EntityStartResult { + type Result = (); +} + +impl Message for StartEntity { + type Result = (); + + fn as_bytes(&self) -> Result, MessageWrapErr> { + proto::StartEntity { + actor_id: self.actor_id.to_string(), + recipe: self.recipe.as_ref().clone(), + ..Default::default() + } + .write_to_bytes() + .map_err(|_| MessageWrapErr::SerializationErr) + } + + fn from_bytes(b: Vec) -> Result { + proto::StartEntity::parse_from_bytes(&b) + .map(|r| Self { + actor_id: r.actor_id.into_actor_id(), + recipe: Arc::new(r.recipe), + }) + .map_err(|_e| MessageUnwrapErr::DeserializationErr) + } + + fn read_remote_result(_: Vec) -> Result { + Ok(()) + } + + fn write_remote_result(_res: Self::Result) -> Result, MessageWrapErr> { + Ok(vec![]) + } +} + +impl Message for RemoveEntity { + type Result = (); + + fn as_bytes(&self) -> Result, MessageWrapErr> { + proto::RemoveEntity { + actor_id: self.actor_id.to_string(), + ..Default::default() + } + .write_to_bytes() + .map_err(|_| MessageWrapErr::SerializationErr) + } + + fn from_bytes(b: Vec) -> Result { + proto::RemoveEntity::parse_from_bytes(&b) + .map(|r| Self { + actor_id: r.actor_id.into_actor_id(), + }) + .map_err(|_e| MessageUnwrapErr::DeserializationErr) + } + + fn read_remote_result(_: Vec) -> Result { + Ok(()) + } + + fn write_remote_result(_res: Self::Result) -> Result, MessageWrapErr> { + Ok(vec![]) + } +} + +impl Message for PassivateEntity { + type Result = (); + + fn as_bytes(&self) -> Result, MessageWrapErr> { + proto::PassivateEntity { + actor_id: self.actor_id.to_string(), + ..Default::default() + } + .write_to_bytes() + .map_err(|_| MessageWrapErr::SerializationErr) + } + + fn from_bytes(b: Vec) -> Result { + proto::PassivateEntity::parse_from_bytes(&b) + .map(|r| Self { + actor_id: r.actor_id.into_actor_id(), + }) + .map_err(|_e| MessageUnwrapErr::DeserializationErr) + } + + fn read_remote_result(_: Vec) -> Result { + Ok(()) + } + + fn write_remote_result(_res: Self::Result) -> Result, MessageWrapErr> { + Ok(vec![]) + } +} diff --git a/coerce/src/sharding/shard/mod.rs b/coerce/src/sharding/shard/mod.rs index 7e0132d8..e5ef84fc 100644 --- a/coerce/src/sharding/shard/mod.rs +++ b/coerce/src/sharding/shard/mod.rs @@ -1,28 +1,37 @@ use crate::actor::context::ActorContext; -use crate::actor::message::{Envelope, Handler, MessageUnwrapErr, MessageWrapErr}; -use crate::actor::{ActorId, ActorRefErr, BoxedActorRef, CoreActorRef, IntoActorId}; +use crate::actor::message::{Envelope, Handler, Message, MessageUnwrapErr, MessageWrapErr}; +use crate::actor::{Actor, ActorId, ActorRefErr, BoxedActorRef, CoreActorRef, IntoActorId}; use crate::persistent::journal::snapshot::Snapshot; use crate::persistent::journal::types::JournalTypes; use crate::persistent::journal::PersistErr; use crate::persistent::{PersistentActor, Recover, RecoverSnapshot}; -use crate::remote::actor::BoxedActorHandler; +use crate::remote::actor::{BoxedActorHandler, BoxedMessageHandler}; +use crate::remote::system::NodeId; use crate::sharding::coordinator::ShardId; use crate::sharding::host::request::{EntityRequest, RemoteEntityRequest}; -use crate::sharding::host::{PassivateEntity, RemoveEntity, StartEntity}; use crate::sharding::proto::sharding as proto; - -use crate::remote::system::NodeId; +use crate::sharding::shard::message::{ + EntityStartResult, PassivateEntity, RemoveEntity, StartEntity, +}; +use crate::sharding::shard::recovery::ShardStateSnapshot; use chrono::{DateTime, Utc}; use futures::SinkExt; use protobuf::Message as ProtoMessage; +use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; +use std::mem; use std::sync::Arc; - use tokio::sync::oneshot; +use tokio::sync::oneshot::Sender; +pub mod message; pub(crate) mod passivation; +pub(crate) mod recovery; pub(crate) mod stats; +pub type RecipeRef = Arc>; + pub struct Shard { shard_id: ShardId, handler: BoxedActorHandler, @@ -51,17 +60,51 @@ impl Drop for Shard { } } -#[derive(Clone, Debug)] +struct BufferedReq { + handler: BoxedMessageHandler, + message: Vec, + result_channel: Option, ActorRefErr>>>, +} + enum EntityState { Idle, - Starting, + Starting { request_buffer: Vec }, Active(BoxedActorRef), Passivated, } +impl Clone for EntityState { + fn clone(&self) -> Self { + Self::Idle + } +} + +impl Debug for EntityState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match &self { + EntityState::Idle => write!(f, "EntityState::Idle"), + EntityState::Starting { request_buffer } => write!( + f, + "EntityState::Starting(request_buffer: {} messages)", + request_buffer.len() + ), + EntityState::Active(actor_ref) => { + write!(f, "EntityState::Active(actor: {:?})", actor_ref) + } + EntityState::Passivated => write!(f, "EntityState::Passivated"), + } + } +} + impl EntityState { + pub fn starting(request: Option) -> EntityState { + EntityState::Starting { + request_buffer: request.map(|r| vec![r]).unwrap_or_else(|| vec![]), + } + } + pub fn is_starting(&self) -> bool { - matches!(&self, EntityState::Starting) + matches!(&self, EntityState::Starting { .. }) } pub fn is_active(&self) -> bool { @@ -82,17 +125,11 @@ impl EntityState { #[derive(Clone)] struct Entity { actor_id: ActorId, - recipe: Arc>, - status: EntityState, + recipe: RecipeRef, + state: EntityState, last_request: DateTime, } -struct ShardStateSnapshot { - shard_id: ShardId, - node_id: NodeId, - entities: Vec, -} - struct GetEntities {} #[async_trait] @@ -126,102 +163,210 @@ impl PersistentActor for Shard { // TODO: Start entity passivation worker } } + + async fn on_child_stopped(&mut self, id: &ActorId, _ctx: &mut ActorContext) { + info!("child stopped, id={}", id); + } } impl Shard { - async fn start_entity( + fn get_or_create_entity( &mut self, - actor_id: ActorId, - recipe: Arc>, - ctx: &mut ActorContext, - is_recovering: bool, - ) -> Result { - let start_entity = StartEntity { - actor_id: actor_id.clone(), - recipe: recipe.clone(), - }; + actor_id: &ActorId, + recipe: Option, + ) -> Option<&mut Entity> { + let entry = self.entities.entry(actor_id.clone()); + match entry { + Entry::Occupied(entry) => Some(entry.into_mut()), + Entry::Vacant(vacant_entry) => { + if let Some(recipe) = recipe { + let entity = Entity { + actor_id: actor_id.clone(), + recipe: recipe.clone(), + state: EntityState::Idle, + last_request: Utc::now(), + }; - if !is_recovering && self.persistent_entities { - let _persist_res = self.persist(&start_entity, ctx).await; + Some(vacant_entry.insert(entity)) + } else { + None + } + } } + } - let entity = self - .handler - .create( - Some(actor_id.clone()), - &start_entity.recipe, - Some(ctx), - None, - ) - .await; + async fn recover_entities(&mut self, ctx: &mut ActorContext) { + let entities = self.get_active_entities(); + for entity in entities { + let _ = self + .start_entity(entity.actor_id.clone(), entity.recipe, ctx, true) + .await; + } + } + + fn get_active_entities(&self) -> Vec { + self.entities + .values() + .filter(|e| e.state.is_starting()) + .cloned() + .collect() + } +} - match &entity { +#[async_trait] +impl Handler for Shard { + async fn handle(&mut self, message: EntityStartResult, ctx: &mut ActorContext) { + let EntityStartResult { + actor_id, + result, + is_shard_recovery, + } = message; + + match result { Ok(actor_ref) => { - self.entities.insert( - actor_id.clone(), - Entity { - actor_id: actor_id.clone(), - recipe, - status: EntityState::Active(actor_ref.clone()), - last_request: Utc::now(), - }, - ); + if let Some(previous) = ctx.add_child_ref(actor_ref.clone()) { + warn!("spawned entity (id={}) was not removed correctly, ChildRef still exists, {:?}", &actor_id, &previous); + } + + if let Some(entity) = self.entities.get_mut(&actor_id) { + let previous = mem::replace(&mut entity.state, EntityState::Active(actor_ref)); + match previous { + EntityState::Starting { request_buffer } => { + debug!( + "entity({}) request buffer: {} message(s)", + &entity.actor_id, + request_buffer.len() + ); + + for msg in request_buffer { + entity.request(msg.handler, msg.message, msg.result_channel); + } + } + state => { + warn!("spawned entity (id={}) state(previous={:?}) was unexpected (expected: `Starting`)", &actor_id, &state); + // TODO: anything to cleanup? + } + } + } debug!( - "spawned entity as a child of Shard#{}, actor_id={}, total_hosted_entities={}", + "spawned entity as a child of Shard#{}, actor_id={}, total_hosted_entities={}, is_shard_recovery={}", &self.shard_id, &actor_id, - &ctx.supervised_count() + &ctx.supervised_count(), + is_shard_recovery, ); } Err(e) => { + // TODO: we should respond to all buffered requests with a failure + error!( - "failed to spawn entity as a child of Shard#{}, actor_id={}, error={}", - &self.shard_id, &actor_id, e + "failed to spawn entity as a child of Shard#{}, actor_id={}, error={}, is_shard_recovery={}", + &self.shard_id, &actor_id, e, is_shard_recovery ) } } - - entity } +} - async fn recover_entities(&mut self, ctx: &mut ActorContext) { - let entities = self.get_active_entities(); - for entity in entities { - if let Err(e) = self - .start_entity(entity.actor_id.clone(), entity.recipe, ctx, true) - .await - { - warn!( - "unable to start recovered entity (actor_id={}, shard_id={}, error={})", - &entity.actor_id, &self.shard_id, e - ) - } +impl Shard { + async fn start_entity( + &mut self, + actor_id: ActorId, + recipe: RecipeRef, + ctx: &mut ActorContext, + is_shard_recovery: bool, + ) { + let start_entity = StartEntity { + actor_id: actor_id.clone(), + recipe: recipe.clone(), + }; + + if !is_shard_recovery && self.persistent_entities { + let _persist_res = self.persist(&start_entity, ctx).await; } - } - fn get_active_entities(&self) -> Vec { - self.entities - .values() - .filter(|e| e.status.is_starting()) - .cloned() - .collect() + let handler = self.handler.new_boxed(); + let system = ctx.system().clone(); + let self_ref = self.actor_ref(ctx); + + tokio::spawn(async move { + let result = handler + .create( + Some(actor_id.clone()), + &start_entity.recipe, + Some(self_ref.clone().into()), + Some(&system), + ) + .await; + + let _ = self_ref.notify(EntityStartResult { + actor_id, + result, + is_shard_recovery, + }); + }); } } #[async_trait] impl Handler for Shard { async fn handle(&mut self, message: StartEntity, ctx: &mut ActorContext) { - /* TODO: `start_entity` should be done asynchronously, any messages sent while the actor is - starting should be buffered and emitted once the Shard receives confirmation that the actor was created - */ + self.entities.insert( + message.actor_id.clone(), + Entity { + actor_id: message.actor_id.clone(), + recipe: message.recipe.clone(), + state: EntityState::starting(None)/*TODO: if the previous actor was passivated, we should reflect that in the status so the actor is not restarted during recovery*/, + last_request: Utc::now(), + }, + ); - let _res = self - .start_entity(message.actor_id, message.recipe, ctx, false) + self.start_entity(message.actor_id, message.recipe, ctx, false) .await; } } +impl Entity { + pub fn request( + &mut self, + handler: BoxedMessageHandler, + message: Vec, + result_channel: Option, ActorRefErr>>>, + ) { + match &mut self.state { + EntityState::Passivated | EntityState::Idle => { + error!("request attempt for an actor that has not been marked as `Starting`"); + result_channel.map(|c| c.send(Err(ActorRefErr::ActorUnavailable))); + } + + EntityState::Starting { request_buffer } => { + request_buffer.push(BufferedReq { + handler, + message, + result_channel, + }); + + debug!( + "entity(id={}) starting, request buffered (total_buffered={})", + &self.actor_id, + request_buffer.len() + ); + } + + EntityState::Active(actor_ref) => { + let message = message; + let actor_ref = actor_ref.clone(); + tokio::spawn(async move { + handler + .handle_direct(&actor_ref, &message, result_channel) + .await; + }); + } + } + } +} + #[async_trait] impl Handler for Shard { async fn handle(&mut self, message: EntityRequest, ctx: &mut ActorContext) { @@ -230,16 +375,18 @@ impl Handler for Shard { let actor_id = message.actor_id; let result_channel = message.result_channel; + let message_bytes = message.message; debug!( - "entity request, node={}, actor_id={}, shard_id={}", + "entity request, node={}, actor_id={}, shard_id={}, msg_len={}, type={}", system.node_id(), &actor_id, - self.shard_id + self.shard_id, + message_bytes.len(), + &message.message_type, ); if handler.is_none() { - // TODO: send unsupported msg err let message_type = message.message_type; let actor_type = self.handler.actor_type_name().to_string(); @@ -250,64 +397,48 @@ impl Handler for Shard { result_channel.map(|m| { let actor_id = actor_id; - m.send(Err(ActorRefErr::NotSupported { + let _ = m.send(Err(ActorRefErr::NotSupported { actor_id, message_type, actor_type, - })) + })); }); return; } let handler = handler.unwrap(); - let actor = self.get_or_create(actor_id, message.recipe, ctx).await; - match actor { - Ok(actor_ref) => { - let message = message.message; - tokio::spawn(async move { - handler - .handle_direct(&actor_ref, &message, result_channel) - .await; - }); - } - Err(e) => { - result_channel.map(|c| c.send(Err(e))); + let entity = self.entities.entry(actor_id.clone()); + let recipe = message.recipe; + let message = message_bytes; + match entity { + Entry::Occupied(entity) => { + let entity = entity.into_mut(); + entity.request(handler, message, result_channel); } - } - } -} -impl Shard { - async fn get_or_create( - &mut self, - actor_id: ActorId, - recipe: Option>>, - ctx: &mut ActorContext, - ) -> Result { - let entity = self.entities.get_mut(&actor_id); - - let mut recipe = recipe; - if let Some(entity) = entity { - if let EntityState::Active(boxed_actor_ref) = &entity.status { - if boxed_actor_ref.is_valid() { - return Ok(boxed_actor_ref.clone()); + Entry::Vacant(entry) => { + if let Some(recipe) = recipe { + let _ = entry.insert(Entity { + actor_id: actor_id.clone(), + recipe: recipe.clone(), + state: EntityState::starting(Some(BufferedReq { + handler, + message, + result_channel, + })), + last_request: Utc::now(), + }); + + self.start_entity(actor_id.clone(), recipe, ctx, false) + .await; + } else { + result_channel.map(|m| { + let actor_id = actor_id; + let _ = m.send(Err(ActorRefErr::NotFound(actor_id))); + }); } } - - // we've seen this actor before, let's create it based on the original recipe. - recipe = Some(entity.recipe.clone()); - } - - match recipe { - /* TODO: `start_entity` should be done asynchronously, any messages sent while the actor is - starting should be buffered and emitted once the Shard receives confirmation that the actor was created - */ - Some(recipe) => match self.start_entity(actor_id, recipe, ctx, false).await { - Ok(actor) => Ok(actor), - Err(e) => Err(e), - }, - None => Err(ActorRefErr::NotFound(actor_id)), } } } @@ -360,125 +491,3 @@ impl Handler for Shard { }); } } - -impl Shard { - async fn save_snapshot(&self, ctx: &mut ActorContext) -> Result<(), PersistErr> { - let node_id = ctx.system().remote().node_id(); - let shard_id = self.shard_id; - let snapshot = ShardStateSnapshot { - node_id, - shard_id, - entities: self.entities.values().cloned().collect(), - }; - - info!("saving snapshot"); - self.snapshot(snapshot, ctx).await - } -} - -#[async_trait] -impl Recover for Shard { - async fn recover(&mut self, message: StartEntity, _ctx: &mut ActorContext) { - self.entities.insert( - message.actor_id.clone(), - Entity { - actor_id: message.actor_id, - recipe: message.recipe, - status: EntityState::Starting/*TODO: if the previous actor was passivated, we should reflect that in the status so the actor is not restarted during recovery*/, - last_request: Utc::now(), - }, - ); - } -} - -#[async_trait] -impl Recover for Shard { - async fn recover(&mut self, message: PassivateEntity, _ctx: &mut ActorContext) { - if let Some(entity) = self.entities.get_mut(&message.actor_id) { - entity.status = EntityState::Passivated; - } - } -} -#[async_trait] -impl Recover for Shard { - async fn recover(&mut self, message: RemoveEntity, _ctx: &mut ActorContext) { - self.entities.remove(&message.actor_id); - } -} - -#[async_trait] -impl RecoverSnapshot for Shard { - async fn recover(&mut self, snapshot: ShardStateSnapshot, _ctx: &mut ActorContext) { - trace!( - "shard#{} recovered {} entities", - &snapshot.shard_id, - snapshot.entities.len() - ); - - self.entities = snapshot - .entities - .into_iter() - .map(|e| (e.actor_id.clone(), e)) - .collect(); - } -} - -impl Snapshot for ShardStateSnapshot { - fn into_remote_envelope(self) -> Result, MessageWrapErr> { - let proto = proto::ShardStateSnapshot { - shard_id: self.shard_id, - node_id: self.node_id, - entities: self - .entities - .into_iter() - .map(|e| proto::shard_state_snapshot::Entity { - actor_id: e.actor_id.to_string(), - recipe: e.recipe.to_vec(), - state: match e.status { - EntityState::Active(_) | EntityState::Idle | EntityState::Starting => { - proto::EntityState::IDLE - } - EntityState::Passivated => proto::EntityState::PASSIVATED, - } - .into(), - ..Default::default() - }) - .collect(), - ..Default::default() - }; - - proto.write_to_bytes().map_or_else( - |_e| Err(MessageWrapErr::SerializationErr), - |s| Ok(Envelope::Remote(s)), - ) - } - - fn from_remote_envelope(b: Vec) -> Result { - let proto = proto::ShardStateSnapshot::parse_from_bytes(&b); - - proto.map_or_else( - |_e| Err(MessageUnwrapErr::DeserializationErr), - |s| { - Ok(Self { - entities: s - .entities - .into_iter() - .map(|e| Entity { - actor_id: e.actor_id.into_actor_id(), - recipe: Arc::new(e.recipe), - status: match e.state.unwrap() { - proto::EntityState::IDLE | proto::EntityState::ACTIVE => { - EntityState::Idle - } - proto::EntityState::PASSIVATED => EntityState::Passivated, - }, - last_request: Utc::now(), - }) - .collect(), - node_id: s.node_id, - shard_id: s.shard_id, - }) - }, - ) - } -} diff --git a/coerce/src/sharding/shard/passivation/mod.rs b/coerce/src/sharding/shard/passivation/mod.rs index b707887b..d129c586 100644 --- a/coerce/src/sharding/shard/passivation/mod.rs +++ b/coerce/src/sharding/shard/passivation/mod.rs @@ -1,7 +1,6 @@ use crate::actor::context::ActorContext; use crate::actor::message::{Handler, Message}; use crate::actor::scheduler::timer::{Timer, TimerTick}; - use crate::actor::{Actor, LocalActorRef}; use crate::sharding::shard::Shard; @@ -57,5 +56,8 @@ impl Actor for PassivationWorker { } #[async_trait] impl Handler for PassivationWorker { - async fn handle(&mut self, _message: PassivationTimerTick, _ctx: &mut ActorContext) {} + async fn handle(&mut self, _message: PassivationTimerTick, _ctx: &mut ActorContext) { + // Still not sure if we should let entities themselves handle their own passivation + // or if we have some sort of shard-level passivation timer here.. + } } diff --git a/coerce/src/sharding/shard/recovery.rs b/coerce/src/sharding/shard/recovery.rs new file mode 100644 index 00000000..335a28ee --- /dev/null +++ b/coerce/src/sharding/shard/recovery.rs @@ -0,0 +1,155 @@ +use crate::actor::context::ActorContext; +use crate::actor::message::{Envelope, MessageUnwrapErr, MessageWrapErr}; +use crate::actor::IntoActorId; +use crate::persistent::journal::snapshot::Snapshot; +use crate::persistent::journal::PersistErr; +use crate::persistent::{PersistentActor, Recover, RecoverSnapshot}; +use crate::remote::system::NodeId; +use crate::sharding::coordinator::ShardId; +use crate::sharding::proto::sharding as proto; +use crate::sharding::shard::message::{PassivateEntity, RemoveEntity, StartEntity}; +use crate::sharding::shard::{Entity, EntityState, Shard}; +use chrono::Utc; +use protobuf::Message; +use std::fmt::{Display, Formatter}; +use std::sync::Arc; + +pub struct ShardStateSnapshot { + shard_id: ShardId, + node_id: NodeId, + entities: Vec, +} + +impl Display for ShardStateSnapshot { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ShardStateSnapshot(shard_id={}, node_id={}, entity_count={})", + self.shard_id, + self.node_id, + self.entities.len() + ) + } +} + +impl Shard { + async fn save_snapshot(&self, ctx: &mut ActorContext) -> Result<(), PersistErr> { + let node_id = ctx.system().remote().node_id(); + let shard_id = self.shard_id; + let snapshot = ShardStateSnapshot { + node_id, + shard_id, + entities: self.entities.values().cloned().collect(), + }; + + info!("saving snapshot - {}", &snapshot); + self.snapshot(snapshot, ctx).await + } +} + +#[async_trait] +impl Recover for Shard { + async fn recover(&mut self, message: StartEntity, _ctx: &mut ActorContext) { + self.entities.insert( + message.actor_id.clone(), + Entity { + actor_id: message.actor_id, + recipe: message.recipe, + state: EntityState::starting(None)/*TODO: if the previous actor was passivated, we should reflect that in the status so the actor is not restarted during recovery*/, + last_request: Utc::now(), + }, + ); + } +} + +#[async_trait] +impl Recover for Shard { + async fn recover(&mut self, message: PassivateEntity, _ctx: &mut ActorContext) { + if let Some(entity) = self.entities.get_mut(&message.actor_id) { + entity.state = EntityState::Passivated; + } + } +} +#[async_trait] +impl Recover for Shard { + async fn recover(&mut self, message: RemoveEntity, _ctx: &mut ActorContext) { + self.entities.remove(&message.actor_id); + } +} + +#[async_trait] +impl RecoverSnapshot for Shard { + async fn recover(&mut self, snapshot: ShardStateSnapshot, _ctx: &mut ActorContext) { + trace!( + "shard#{} recovered {} entities", + &snapshot.shard_id, + snapshot.entities.len() + ); + + self.entities = snapshot + .entities + .into_iter() + .map(|e| (e.actor_id.clone(), e)) + .collect(); + } +} + +impl Snapshot for ShardStateSnapshot { + fn into_remote_envelope(self) -> Result, MessageWrapErr> { + let proto = proto::ShardStateSnapshot { + shard_id: self.shard_id, + node_id: self.node_id, + entities: self + .entities + .into_iter() + .map(|e| proto::shard_state_snapshot::Entity { + actor_id: e.actor_id.to_string(), + recipe: e.recipe.to_vec(), + state: match e.state { + EntityState::Active(_) + | EntityState::Idle + | EntityState::Starting { .. } => proto::EntityState::IDLE, + EntityState::Passivated => proto::EntityState::PASSIVATED, + } + .into(), + ..Default::default() + }) + .collect(), + ..Default::default() + }; + + proto.write_to_bytes().map_or_else( + |_e| Err(MessageWrapErr::SerializationErr), + |s| Ok(Envelope::Remote(s)), + ) + } + + fn from_remote_envelope(b: Vec) -> Result { + let proto = proto::ShardStateSnapshot::parse_from_bytes(&b); + + proto.map_or_else( + |_e| Err(MessageUnwrapErr::DeserializationErr), + |s| { + Ok(Self { + entities: s + .entities + .into_iter() + .map(|e| Entity { + actor_id: e.actor_id.into_actor_id(), + recipe: Arc::new(e.recipe), + state: match e.state.unwrap() { + proto::EntityState::IDLE | proto::EntityState::ACTIVE => { + EntityState::Idle + } + proto::EntityState::PASSIVATED => EntityState::Passivated, + }, + last_request: /*TODO: use persisted date*/Utc::now(), + }) + .collect(), + node_id: s.node_id, + shard_id: s.shard_id, + }) + }, + ) + } +} diff --git a/coerce/tests/test_remote_cluster_heartbeat.rs b/coerce/tests/test_remote_cluster_heartbeat.rs index 5047ec9f..1af908ae 100644 --- a/coerce/tests/test_remote_cluster_heartbeat.rs +++ b/coerce/tests/test_remote_cluster_heartbeat.rs @@ -62,7 +62,7 @@ pub async fn test_remote_cluster_heartbeat() { // ensure both nodes have run a heartbeat atleast once. // TODO: We need the ability to hook into an on-cluster-joined event/future - tokio::time::sleep(Duration::from_millis(500)).await; + tokio::time::sleep(Duration::from_millis(750)).await; let nodes_a = remote.get_nodes().await; let nodes_b = remote_2.get_nodes().await; @@ -72,7 +72,7 @@ pub async fn test_remote_cluster_heartbeat() { let node_b_2 = nodes_b.iter().find(|n| n.id == 2).cloned().unwrap(); let leader_1_id = remote.current_leader(); - let leader_2_id = remote.current_leader(); + let leader_2_id = remote_2.current_leader(); assert_eq!(leader_1_id, Some(remote.node_id())); assert_eq!(leader_2_id, Some(remote.node_id())); diff --git a/coerce/tests/test_remote_sharding.rs b/coerce/tests/test_remote_sharding.rs index bb21eea5..3e829e6b 100644 --- a/coerce/tests/test_remote_sharding.rs +++ b/coerce/tests/test_remote_sharding.rs @@ -129,8 +129,8 @@ pub async fn test_shard_coordinator_shard_allocation() { .await; let initial_allocation = allocation.expect("shard allocation"); - - shard_coordinator.stop().await; + let res = shard_coordinator.stop().await; + assert!(res.is_ok()); let host_stats = shard_host .send(GetStats) diff --git a/coerce/tests/test_remote_system_health.rs b/coerce/tests/test_remote_system_health.rs index 8b137891..af6e1d94 100644 --- a/coerce/tests/test_remote_system_health.rs +++ b/coerce/tests/test_remote_system_health.rs @@ -1 +1,70 @@ +use async_trait::async_trait; +use coerce::actor::context::ActorContext; +use coerce::actor::message::{Handler, Message}; +use coerce::actor::system::ActorSystem; +use coerce::actor::{Actor, IntoActor}; +use coerce::remote::heartbeat::health::HealthStatus; +use coerce::remote::heartbeat::Heartbeat; +use coerce::remote::system::RemoteActorSystem; +use protobuf::VERSION; +use std::time::Duration; +#[tokio::test] +pub async fn test_heartbeat_actor_monitoring() { + const VERSION: &str = "1.0.0"; + let actor_system = ActorSystem::builder().system_name("heartbeat-test").build(); + + let remote = RemoteActorSystem::builder() + .with_tag("remote-1") + .with_version(VERSION) + .with_id(1) + .with_actor_system(actor_system) + .configure(|c| c) + .build() + .await; + + let health_check = Heartbeat::get_system_health(&remote).await; + assert_eq!(health_check.status, HealthStatus::Healthy); + assert_eq!(&health_check.node_version, VERSION); + + let actor = SlowActor + .into_actor(Some("slow-actor"), remote.actor_system()) + .await + .unwrap(); + let _ = actor.notify(Delay(Duration::from_secs(2))); + + let health_check = Heartbeat::get_system_health(&remote).await; + assert_eq!(health_check.status, HealthStatus::Degraded); + + let _ = actor.notify_stop(); + let health_check = Heartbeat::get_system_health(&remote).await; + assert_eq!(health_check.status, HealthStatus::Unhealthy); + + // De-register the actor from the health check + Heartbeat::remove(actor.actor_id(), &remote); + + let health_check = Heartbeat::get_system_health(&remote).await; + assert_eq!(health_check.status, HealthStatus::Healthy); +} + +pub struct SlowActor; + +#[async_trait] +impl Actor for SlowActor { + async fn started(&mut self, ctx: &mut ActorContext) { + Heartbeat::register(self.actor_ref(ctx), ctx.system().remote()); + } +} + +struct Delay(Duration); + +impl Message for Delay { + type Result = (); +} + +#[async_trait] +impl Handler for SlowActor { + async fn handle(&mut self, message: Delay, _ctx: &mut ActorContext) { + tokio::time::sleep(message.0).await; + } +}