diff --git a/Cargo.lock b/Cargo.lock index d4e993aa..237089ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -162,9 +162,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.2.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" [[package]] name = "cast" @@ -277,21 +277,16 @@ dependencies = [ [[package]] name = "coerce" version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6aea3ee6f9764c32319074b7fbb454d8b5193a2b1818875089bb019576cfe7" dependencies = [ - "anyhow", "async-trait", - "axum", "byteorder", "bytes", "chrono", - "coerce-macros 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "criterion", "futures", "hashring", "lazy_static", - "metrics", - "metrics-exporter-prometheus", - "metrics-util", "parking_lot", "protobuf", "rand 0.8.5", @@ -301,26 +296,28 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", - "tracing-subscriber 0.3.16", - "utoipa", - "utoipa-swagger-ui", "uuid", "valuable", ] [[package]] name = "coerce" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc6aea3ee6f9764c32319074b7fbb454d8b5193a2b1818875089bb019576cfe7" +version = "0.8.3" dependencies = [ + "anyhow", "async-trait", + "axum", "byteorder", "bytes", "chrono", + "coerce-macros 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "criterion", "futures", "hashring", "lazy_static", + "metrics", + "metrics-exporter-prometheus", + "metrics-util", "parking_lot", "protobuf", "rand 0.8.5", @@ -330,6 +327,9 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", + "tracing-subscriber 0.3.16", + "utoipa", + "utoipa-swagger-ui", "uuid", "valuable", ] @@ -339,7 +339,7 @@ name = "coerce-cluster-example" version = "0.1.0" dependencies = [ "async-trait", - "coerce 0.8.2", + "coerce 0.8.3", "coerce-macros 0.2.0", "opentelemetry", "opentelemetry-jaeger", @@ -355,7 +355,7 @@ dependencies = [ name = "coerce-k8s" version = "0.1.2" dependencies = [ - "coerce 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", + "coerce 0.8.2", "k8s-openapi", "kube", "tracing", @@ -396,7 +396,7 @@ dependencies = [ "anyhow", "async-trait", "bytes", - "coerce 0.8.2", + "coerce 0.8.3", "redis", "tokio", ] @@ -408,7 +408,7 @@ dependencies = [ "async-trait", "chrono", "clap 4.0.26", - "coerce 0.8.2", + "coerce 0.8.3", "coerce-k8s", "coerce-macros 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "coerce-redis", @@ -2320,9 +2320,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.22.0" +version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3" +checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" dependencies = [ "autocfg", "bytes", @@ -2335,7 +2335,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "winapi", + "windows-sys 0.42.0", ] [[package]] @@ -2910,6 +2910,27 @@ dependencies = [ "windows_x86_64_msvc 0.36.1", ] +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc 0.42.0", + "windows_i686_gnu 0.42.0", + "windows_i686_msvc 0.42.0", + "windows_x86_64_gnu 0.42.0", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc 0.42.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" + [[package]] name = "windows_aarch64_msvc" version = "0.32.0" @@ -2922,6 +2943,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" + [[package]] name = "windows_i686_gnu" version = "0.32.0" @@ -2934,6 +2961,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +[[package]] +name = "windows_i686_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" + [[package]] name = "windows_i686_msvc" version = "0.32.0" @@ -2946,6 +2979,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +[[package]] +name = "windows_i686_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" + [[package]] name = "windows_x86_64_gnu" version = "0.32.0" @@ -2958,6 +2997,18 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" + [[package]] name = "windows_x86_64_msvc" version = "0.32.0" @@ -2970,6 +3021,12 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/coerce/Cargo.toml b/coerce/Cargo.toml index a9c71da6..f3b2bad3 100644 --- a/coerce/Cargo.toml +++ b/coerce/Cargo.toml @@ -2,7 +2,7 @@ name = "coerce" description = "Async actor runtime and distributed systems framework" license = "Apache-2.0" -version = "0.8.2" +version = "0.8.3" authors = ["Leon Hartley "] edition = "2021" readme = "README.md" @@ -64,7 +64,7 @@ api = ["remote", "dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.22.0", features = ["full"] } +tokio = { version = "1.23.0", features = ["full"] } tokio-util = { version = "0.7.4", features = ["full"] } tokio-stream = { version = "0.1", optional = true } tracing = { version = "0.1", features = ["valuable"] } @@ -75,9 +75,9 @@ serde_json = "1.0" futures = "0.3" async-trait = { version = "0.1" } hashring = { version = "0.3.0", optional = true } -bytes = { version = "1.2.1", optional = true } +bytes = { version = "1.3.0", optional = true } byteorder = { version = "1.4", optional = true } -chrono = { version = "0.4", optional = true } +chrono = { version = "0.4", features = ["serde"], optional = true } protobuf = { version = "3.2.0", optional = true } anyhow = { version = "1.0", optional = true } rand = "0.8.5" @@ -85,10 +85,10 @@ parking_lot = { version = "0.12.1", optional = true } metrics = { version = "0.20.1", optional = true } valuable = { version = "0.1", features = ["derive"] } metrics-exporter-prometheus = { version = "0.11", optional = true } -metrics-util = { version = "0.14", optional = true } +metrics-util = { version = "0.14.0", optional = true } # API dependencies -axum = { version = "0.6.0", features = ["query"], optional = true } +axum = { version = "0.6.1", features = ["query"], optional = true } utoipa = { version = "2.4.2", features = ["axum_extras"], optional = true } utoipa-swagger-ui = { version = "3.0.1", features = ["axum"], optional = true } diff --git a/coerce/src/actor/context.rs b/coerce/src/actor/context.rs index 842a88f0..da0e1f23 100644 --- a/coerce/src/actor/context.rs +++ b/coerce/src/actor/context.rs @@ -7,6 +7,7 @@ use crate::actor::{ }; use futures::{Stream, StreamExt}; use std::any::Any; +use std::time::Instant; use tokio::sync::oneshot::Sender; use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit}; @@ -22,6 +23,10 @@ pub enum ActorStatus { Stopping, Stopped, } +// +// struct LastMessage { +// received_at: DateTime, +// } pub struct ActorContext { context_id: u64, @@ -38,29 +43,9 @@ pub struct ActorContext { persistence: Option, } -static ACTOR_CONTEXT_FIELDS: &[NamedField<'static>] = - &[NamedField::new("actor_path"), NamedField::new("actor_type")]; - -impl Structable for ActorContext { - fn definition(&self) -> StructDef<'_> { - StructDef::new_static("ActorContext", Fields::Named(ACTOR_CONTEXT_FIELDS)) - } -} - -impl Valuable for ActorContext { - fn as_value(&self) -> Value<'_> { - Value::Structable(self) - } - - fn visit(&self, v: &mut dyn Visit) { - v.visit_named_fields(&NamedValues::new( - ACTOR_CONTEXT_FIELDS, - &[ - Value::String(self.full_path.as_ref()), - Value::String(self.boxed_ref.actor_type()), - ], - )); - } +pub struct LogContext { + pub actor_path: ActorPath, + pub actor_type: &'static str, } impl ActorContext { @@ -83,6 +68,7 @@ impl ActorContext { boxed_parent_ref: None, on_actor_stopped: None, tags: None, + // last_message_timestamp: None, #[cfg(feature = "persistence")] persistence: None, } @@ -259,6 +245,38 @@ impl ActorContext { pub fn take_on_stopped_handlers(&mut self) -> Option>> { self.on_actor_stopped.take() } + + pub fn log(&self) -> LogContext { + LogContext { + actor_path: self.full_path.clone(), + actor_type: self.boxed_ref.actor_type(), + } + } +} + +static LOG_CONTEXT_FIELDS: &[NamedField<'static>] = + &[NamedField::new("actor_path"), NamedField::new("actor_type")]; + +impl Structable for LogContext { + fn definition(&self) -> StructDef<'_> { + StructDef::new_static("Context", Fields::Named(LOG_CONTEXT_FIELDS)) + } +} + +impl Valuable for LogContext { + fn as_value(&self) -> Value<'_> { + Value::Structable(self) + } + + fn visit(&self, v: &mut dyn Visit) { + v.visit_named_fields(&NamedValues::new( + LOG_CONTEXT_FIELDS, + &[ + Value::String(self.actor_path.as_ref()), + Value::String(self.actor_type), + ], + )); + } } impl Drop for ActorContext { @@ -277,12 +295,7 @@ impl Drop for ActorContext { on_context_dropped(&boxed_ref, &parent_ref, &status, &system); }); } else { - on_context_dropped( - &self.boxed_ref, - &self.boxed_parent_ref, - &self.status, - &self.system, - ); + on_context_dropped(&self.boxed_ref, &parent_ref, &self.status, &self.system); } } } diff --git a/coerce/src/actor/describe/mod.rs b/coerce/src/actor/describe/mod.rs index 157f1283..9a92386f 100644 --- a/coerce/src/actor/describe/mod.rs +++ b/coerce/src/actor/describe/mod.rs @@ -6,7 +6,6 @@ use crate::actor::{ Actor, ActorId, ActorPath, ActorRefErr, ActorTags, BoxedActorRef, CoreActorRef, IntoActorPath, ToActorId, }; - use std::sync::Arc; use std::time::Duration; use tokio::sync::oneshot; @@ -68,7 +67,9 @@ pub struct ActorDescription { pub actor_type_name: String, pub actor_context_id: u64, pub tags: ActorTags, + // pub last_message_timestamp: Option, pub supervised: Option, + pub time_taken: Option, } impl Message for Describe { @@ -89,7 +90,9 @@ impl Handler for A { actor_type_name: A::type_name().to_string(), actor_context_id: ctx.ctx_id(), tags: ctx.tags(), + // last_message_timestamp: ctx.last_message_timestamp, supervised: None, + time_taken: None, }; let mut message = message; @@ -168,13 +171,18 @@ pub async fn describe_all( }; async move { + let start = Instant::now(); match actor.describe(describe) { Ok(_) => { // TODO: Apply a timeout to `rx.await` let description = tokio::time::timeout(timeout, rx).await; if let Ok(description) = description { match description { - Ok(description) => DescribeResult::Ok(description), + Ok(description) => DescribeResult::Ok({ + let mut description = description; + description.time_taken = Some(start.elapsed()); + description + }), Err(_) => { DescribeResult::from_err(ActorRefErr::ResultChannelClosed, &actor) } @@ -215,9 +223,13 @@ impl Message for DescribeAll { #[async_trait] impl Handler for ActorScheduler { - async fn handle(&mut self, message: DescribeAll, _ctx: &mut ActorContext) { + async fn handle(&mut self, message: DescribeAll, ctx: &mut ActorContext) { let start = Instant::now(); - let actors: Vec = self.actors.values().cloned().collect(); + let actors = { + let mut actors: Vec = self.actors.values().cloned().collect(); + actors.push(ctx.boxed_actor_ref()); + actors + }; trace!("describing actors (count={})", actors.len()); tokio::spawn(async move { diff --git a/coerce/src/actor/lifecycle.rs b/coerce/src/actor/lifecycle.rs index ede1f872..22a10d01 100644 --- a/coerce/src/actor/lifecycle.rs +++ b/coerce/src/actor/lifecycle.rs @@ -78,26 +78,27 @@ impl ActorLoop { let _ = on_start.send(()); } + let log = ctx.log(); while let Some(mut msg) = receiver.recv().await { { #[cfg(feature = "actor-tracing-info")] let span = tracing::info_span!( "actor.recv", - ctx = ctx.as_value(), + ctx = log.as_value(), message_type = msg.name(), ); #[cfg(feature = "actor-tracing-debug")] let span = tracing::debug_span!( "actor.recv", - ctx = ctx.as_value(), + ctx = log.as_value(), message_type = msg.name(), ); #[cfg(feature = "actor-tracing-trace")] let span = tracing::trace_span!( "actor.recv", - ctx = ctx.as_value(), + ctx = log.as_value(), message_type = msg.name(), ); diff --git a/coerce/src/actor/message/mod.rs b/coerce/src/actor/message/mod.rs index 06b6b752..f3476d57 100644 --- a/coerce/src/actor/message/mod.rs +++ b/coerce/src/actor/message/mod.rs @@ -114,6 +114,8 @@ where let msg = self.msg.take(); let result = actor.handle(msg.unwrap(), ctx).await; + + // ctx.last_message_timestamp = Some(start); let message_processing_took = start.elapsed(); ActorMetrics::incr_messages_processed( diff --git a/coerce/src/actor/scheduler/mod.rs b/coerce/src/actor/scheduler/mod.rs index 79032899..8c257f0a 100644 --- a/coerce/src/actor/scheduler/mod.rs +++ b/coerce/src/actor/scheduler/mod.rs @@ -9,7 +9,7 @@ use crate::actor::lifecycle::ActorLoop; use crate::actor::system::{ActorSystem, DEFAULT_ACTOR_PATH}; #[cfg(feature = "remote")] -use crate::remote::{actor::message::SetRemote, system::RemoteActorSystem}; +use crate::remote::{actor::message::SetRemote, heartbeat::Heartbeat, system::RemoteActorSystem}; use std::collections::HashMap; use std::marker::PhantomData; @@ -39,7 +39,7 @@ impl ActorScheduler { #[cfg(feature = "remote")] remote: None, }, - "ActorScheduler-0".into_actor_id(), + "actor-scheduler".into_actor_id(), ActorType::Anonymous, None, None, @@ -178,7 +178,7 @@ where #[cfg(feature = "remote")] #[async_trait] impl Handler for ActorScheduler { - async fn handle(&mut self, message: SetRemote, _ctx: &mut ActorContext) { + async fn handle(&mut self, message: SetRemote, ctx: &mut ActorContext) { self.remote = Some(message.0); trace!("actor scheduler is now configured for remoting"); } diff --git a/coerce/src/remote/actor/mod.rs b/coerce/src/remote/actor/mod.rs index 5ff342df..0f1417ed 100644 --- a/coerce/src/remote/actor/mod.rs +++ b/coerce/src/remote/actor/mod.rs @@ -29,6 +29,7 @@ pub(crate) type BoxedMessageHandler = Box pub struct RemoteSystemConfig { node_tag: String, + node_version: String, actor_types: HashMap, handler_types: HashMap, message_handlers: HashMap, @@ -41,6 +42,7 @@ pub struct RemoteSystemConfig { impl RemoteSystemConfig { pub fn new( node_tag: String, + node_version: String, actor_types: HashMap, handler_types: HashMap, message_handlers: HashMap, @@ -51,6 +53,7 @@ impl RemoteSystemConfig { ) -> RemoteSystemConfig { RemoteSystemConfig { node_tag, + node_version, actor_types, handler_types, message_handlers, @@ -65,6 +68,10 @@ impl RemoteSystemConfig { &self.node_tag } + pub fn node_version(&self) -> &str { + &self.node_version + } + pub fn handler_name(&self) -> Option { let marker = RemoteActorMessageMarker::::new(); self.handler_types.get(&marker.id()).cloned() diff --git a/coerce/src/remote/api/cluster/mod.rs b/coerce/src/remote/api/cluster/mod.rs index d6163a60..1710d891 100644 --- a/coerce/src/remote/api/cluster/mod.rs +++ b/coerce/src/remote/api/cluster/mod.rs @@ -1,4 +1,5 @@ use crate::remote::api::Routes; +use crate::remote::cluster::node::RemoteNodeState; use crate::remote::system::RemoteActorSystem; use axum::response::IntoResponse; use axum::routing::get; @@ -78,20 +79,7 @@ async fn get_nodes(system: RemoteActorSystem) -> impl IntoResponse { .get_nodes() .await .into_iter() - .map(|node| ClusterNode { - id: node.id, - addr: node.addr, - tag: node.tag, - ping_latency: node.ping_latency.map(|p| format!("{:?}", p)), - last_heartbeat: node.last_heartbeat.map(|h| format!("{:?}", h)), - node_started_at: node.node_started_at.map(|p| format!("{:?}", p)), - status: node.status.into(), - attributes: node - .attributes - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect(), - }) + .map(|node| node.into()) .collect(); nodes.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap()); @@ -113,3 +101,22 @@ async fn get_nodes(system: RemoteActorSystem) -> impl IntoResponse { nodes, }) } + +impl From for ClusterNode { + fn from(node: RemoteNodeState) -> Self { + ClusterNode { + id: node.id, + addr: node.addr, + tag: node.tag, + ping_latency: node.ping_latency.map(|p| format!("{:?}", p)), + last_heartbeat: node.last_heartbeat.map(|h| format!("{:?}", h)), + node_started_at: node.node_started_at.map(|p| format!("{:?}", p)), + status: node.status.into(), + attributes: node + .attributes + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + } + } +} diff --git a/coerce/src/remote/api/openapi.rs b/coerce/src/remote/api/openapi.rs index 97e707ff..2d915a0e 100644 --- a/coerce/src/remote/api/openapi.rs +++ b/coerce/src/remote/api/openapi.rs @@ -5,6 +5,7 @@ use crate::remote::api::system::actors; #[derive(OpenApi)] #[openapi( paths( + system::health, system::get_stats, actors::get_all, cluster::get_nodes, @@ -14,6 +15,9 @@ use crate::remote::api::system::actors; ), components( schemas( + system::SystemHealth, + system::HealthStatus, + system::SystemStats, system::SystemStats, system::actors::GetAll, system::actors::Actors, diff --git a/coerce/src/remote/api/system/actors.rs b/coerce/src/remote/api/system/actors.rs index 1f14bcc7..5065172f 100644 --- a/coerce/src/remote/api/system/actors.rs +++ b/coerce/src/remote/api/system/actors.rs @@ -49,6 +49,7 @@ pub struct ActorDescription { pub actor_context_id: Option, pub tags: Option, pub supervised: Option, + pub time_taken: Option, } #[derive(Serialize, Deserialize, ToSchema, Debug)] @@ -66,6 +67,7 @@ impl Default for ActorDescription { actor_context_id: None, tags: None, supervised: None, + time_taken: None, } } } @@ -125,6 +127,7 @@ impl From for ActorDescription { actor_context_id: Some(value.actor_context_id), tags: Some(value.tags.into()), supervised: value.supervised.map(|s| s.into()), + time_taken: value.time_taken, }, describe::DescribeResult::Err { diff --git a/coerce/src/remote/api/system/mod.rs b/coerce/src/remote/api/system/mod.rs index 6d6814dd..291b7123 100644 --- a/coerce/src/remote/api/system/mod.rs +++ b/coerce/src/remote/api/system/mod.rs @@ -1,14 +1,25 @@ pub mod actors; use crate::remote::api::Routes; +use std::collections::HashMap; +use std::time::{Duration, Instant}; use crate::actor::scheduler::ActorCount; -use crate::remote::system::RemoteActorSystem; +use crate::remote::system::{NodeId, RemoteActorSystem}; use axum::response::IntoResponse; use axum::routing::get; use axum::{Json, Router}; +use chrono::{DateTime, Utc}; +use futures::future::join_all; +use tokio::sync::oneshot; +use crate::actor::context::ActorStatus; +use crate::actor::lifecycle::Status; +use crate::actor::{ActorPath, BoxedActorRef, CoreActorRef, IntoActorPath}; +use crate::remote::api::cluster::ClusterNode; use crate::remote::api::openapi::ApiDoc; +use crate::remote::heartbeat::{health, Heartbeat}; +use crate::remote::stream::pubsub::PubSub; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; @@ -26,6 +37,10 @@ impl Routes for SystemApi { fn routes(&self, router: Router) -> Router { router .merge(SwaggerUi::new("/swagger").url("/api-doc/openapi.json", ApiDoc::openapi())) + .route("/health", { + let system = self.system.clone(); + get(move || health(system)) + }) .route("/system/stats", { let system = self.system.clone(); get(move || get_stats(system)) @@ -37,20 +52,80 @@ impl Routes for SystemApi { } } +#[derive(Serialize, ToSchema, Eq, PartialEq, Clone, Copy)] +pub enum HealthStatus { + Healthy, + Degraded, + Unhealthy, +} + +#[derive(Serialize, ToSchema)] +pub struct SystemHealth { + status: HealthStatus, + node_id: u64, + node_tag: String, + node_version: String, + node_started_at: DateTime, + runtime_version: &'static str, + actor_response_times: HashMap>, + current_leader: Option, + nodes: Vec, +} + +#[utoipa::path( + get, + path = "/health", + responses( + (status = 200, description = "System Health", body = SystemHealth), + ) +)] +async fn health(system: RemoteActorSystem) -> Json { + Json(Heartbeat::get_system_health(&system).await.into()) +} + +impl From for SystemHealth { + fn from(value: health::SystemHealth) -> Self { + Self { + status: value.status.into(), + node_id: value.node_id, + node_tag: value.node_tag, + node_version: value.node_version, + node_started_at: value.node_started_at, + runtime_version: value.runtime_version, + actor_response_times: value.actor_response_times, + current_leader: value.current_leader, + nodes: value.nodes.into_iter().map(|n| n.into()).collect(), + } + } +} + +impl From for HealthStatus { + fn from(value: health::HealthStatus) -> Self { + match value { + health::HealthStatus::Healthy => Self::Healthy, + health::HealthStatus::Degraded => Self::Degraded, + health::HealthStatus::Unhealthy => Self::Unhealthy, + } + } +} + #[derive(Serialize, Deserialize, ToSchema)] pub struct SystemStats { inflight_remote_requests: usize, total_tracked_actors: usize, + remote_actor_ref_cache_len: usize, } #[utoipa::path( get, path = "/system/stats", responses( - (status = 200, description = "System statistics", body = SystemStats), + (status = 200, description = "System Statistics", body = SystemStats), ) )] async fn get_stats(system: RemoteActorSystem) -> impl IntoResponse { + use crate::remote::handler::actor_ref_cache_size; + Json(SystemStats { inflight_remote_requests: system.inflight_remote_request_count(), total_tracked_actors: system @@ -59,5 +134,6 @@ async fn get_stats(system: RemoteActorSystem) -> impl IntoResponse { .send(ActorCount) .await .unwrap(), + remote_actor_ref_cache_len: actor_ref_cache_size(), }) } diff --git a/coerce/src/remote/cluster/discovery/mod.rs b/coerce/src/remote/cluster/discovery/mod.rs index bd44aed2..b9b418d6 100644 --- a/coerce/src/remote/cluster/discovery/mod.rs +++ b/coerce/src/remote/cluster/discovery/mod.rs @@ -3,6 +3,7 @@ use crate::actor::message::{Handler, Message}; use crate::actor::Actor; use crate::remote::actor::message::SetRemote; use crate::remote::cluster::node::{NodeIdentity, NodeStatus, RemoteNode}; +use crate::remote::heartbeat::Heartbeat; use crate::remote::net::client::RemoteClientRef; use crate::remote::stream::pubsub::PubSub; use crate::remote::stream::system::{ClusterEvent, SystemEvent, SystemTopic}; @@ -46,7 +47,7 @@ impl Message for Forget { #[async_trait] impl Handler for NodeDiscovery { - async fn handle(&mut self, message: SetRemote, _ctx: &mut ActorContext) { + async fn handle(&mut self, message: SetRemote, ctx: &mut ActorContext) { self.remote_system = Some(message.0); } } diff --git a/coerce/src/remote/handler.rs b/coerce/src/remote/handler.rs index 835a44c2..c1846315 100644 --- a/coerce/src/remote/handler.rs +++ b/coerce/src/remote/handler.rs @@ -212,6 +212,10 @@ lazy_static! { static ref ACTOR_REF_CACHE: ActorRefCache = ActorRefCache::new(); } +pub fn actor_ref_cache_size() -> usize { + ACTOR_REF_CACHE.len() +} + async fn get_actor_ref( system: &ActorSystem, actor_id: &ActorId, diff --git a/coerce/src/remote/heartbeat/health.rs b/coerce/src/remote/heartbeat/health.rs new file mode 100644 index 00000000..ef55c6ea --- /dev/null +++ b/coerce/src/remote/heartbeat/health.rs @@ -0,0 +1,133 @@ +use crate::actor::context::{ActorContext, ActorStatus}; +use crate::actor::message::{Handler, Message}; +use crate::actor::{ActorId, ActorPath, BoxedActorRef, CoreActorRef, IntoActorPath}; +use crate::remote::cluster::node::RemoteNodeState; +use crate::remote::heartbeat::Heartbeat; +use crate::remote::system::NodeId; +use chrono::{DateTime, Utc}; +use futures::future::join_all; +use std::collections::HashMap; +use std::time::{Duration, Instant}; +use tokio::sync::oneshot; + +pub struct SystemHealth { + pub status: HealthStatus, + pub node_id: u64, + pub node_tag: String, + pub node_version: String, + pub node_started_at: DateTime, + pub current_leader: Option, + pub runtime_version: &'static str, + pub actor_response_times: HashMap>, + pub nodes: Vec, +} + +pub struct RegisterHealthCheck(pub BoxedActorRef); + +impl Message for RegisterHealthCheck { + type Result = (); +} + +pub struct RemoveHealthCheck(pub ActorId); + +impl Message for RemoveHealthCheck { + type Result = (); +} + +pub struct GetHealth(pub oneshot::Sender); + +pub enum HealthStatus { + Healthy, + Degraded, + Unhealthy, +} + +impl Message for GetHealth { + type Result = (); +} + +const SLOW_ACTOR_DURATION: Duration = Duration::from_secs(1); + +#[async_trait] +impl Handler for Heartbeat { + async fn handle(&mut self, m: GetHealth, _ctx: &mut ActorContext) { + let sender = m.0; + let system = self.system.as_ref().unwrap().clone(); + let actors = self.health_check_actors.clone(); + + tokio::spawn(async move { + let checks: Vec<(ActorPath, Option)> = + join_all(actors.into_iter().map(|actor| async move { + let start = Instant::now(); + let status = actor.status().await; + let time_taken = start.elapsed(); + let actor_path = + format!("{}/{}", actor.actor_path(), actor.actor_id()).into_actor_path(); + if let Ok(ActorStatus::Started) = status { + (actor_path, Some(time_taken)) + } else { + (actor_path, None) + } + })) + .await; + + let mut errors_exist = false; + let mut slow_actors_exist = false; + for check in &checks { + if check.1.is_none() { + errors_exist = true; + break; + } else if let Some(check) = &check.1 { + if check > &SLOW_ACTOR_DURATION { + /*todo: this should be configurable*/ + slow_actors_exist = true; + } + } + } + + let _ = sender.send(SystemHealth { + node_id: system.node_id(), + node_tag: system.node_tag().to_string(), + node_version: system.node_version().to_string(), + runtime_version: env!("CARGO_PKG_VERSION"), + node_started_at: *system.started_at(), + status: if errors_exist { + HealthStatus::Unhealthy + } else if slow_actors_exist { + HealthStatus::Degraded + } else { + HealthStatus::Healthy + }, + actor_response_times: checks.into_iter().map(|n| (n.0, n.1)).collect(), + nodes: system.get_nodes().await, + current_leader: system.current_leader(), + }); + }); + } +} + +#[async_trait] +impl Handler for Heartbeat { + async fn handle(&mut self, message: RegisterHealthCheck, _ctx: &mut ActorContext) { + let actor = message.0; + + debug!( + "actor({}/{}) registered for health check", + actor.actor_path(), + actor.actor_id() + ); + + self.health_check_actors.push(actor); + } +} +#[async_trait] +impl Handler for Heartbeat { + async fn handle(&mut self, message: RemoveHealthCheck, _ctx: &mut ActorContext) { + let actor_id = message.0; + + debug!("actor({}) removed from health check", actor_id); + + self.health_check_actors + .retain(|a| a.actor_id() != &actor_id); + } +} diff --git a/coerce/src/remote/heartbeat/mod.rs b/coerce/src/remote/heartbeat/mod.rs index 8e3ba306..c73aba8f 100644 --- a/coerce/src/remote/heartbeat/mod.rs +++ b/coerce/src/remote/heartbeat/mod.rs @@ -1,8 +1,11 @@ +pub mod health; + use crate::actor::context::ActorContext; use crate::actor::message::{Handler, Message}; use crate::actor::scheduler::timer::{Timer, TimerTick}; use crate::actor::system::ActorSystem; -use crate::actor::{Actor, IntoActor, LocalActorRef}; +use crate::actor::{Actor, BoxedActorRef, IntoActor, LocalActorRef}; +use crate::actor::{ActorId, CoreActorRef}; use crate::remote::actor::message::{NodeTerminated, SetRemote}; use crate::remote::cluster::node::{NodeStatus, RemoteNodeState}; use crate::remote::net::proto::network::PongEvent; @@ -17,7 +20,11 @@ use std::collections::{HashMap, VecDeque}; use std::ops::Add; +use crate::remote::heartbeat::health::{ + GetHealth, RegisterHealthCheck, RemoveHealthCheck, SystemHealth, +}; use std::time::{Duration, Instant}; +use tokio::sync::oneshot; use tokio::sync::oneshot::Sender; pub struct Heartbeat { @@ -26,6 +33,7 @@ pub struct Heartbeat { last_heartbeat: Option>, node_pings: HashMap, on_next_leader_changed: VecDeque>, + health_check_actors: Vec, } pub struct HeartbeatConfig { @@ -43,11 +51,29 @@ impl Heartbeat { last_heartbeat: None, node_pings: HashMap::new(), on_next_leader_changed: VecDeque::new(), + health_check_actors: Vec::new(), } .into_actor(Some("heartbeat"), sys) .await .expect("heartbeat actor") } + + /// Registers an actor to be part of the health check. + pub fn register>(actor: T, system: &RemoteActorSystem) { + let _ = system.heartbeat().notify(RegisterHealthCheck(actor.into())); + } + + pub fn remove(actor_id: &ActorId, system: &RemoteActorSystem) { + let _ = system + .heartbeat() + .notify(RemoveHealthCheck(actor_id.clone())); + } + + pub async fn get_system_health(system: &RemoteActorSystem) -> SystemHealth { + let (tx, rx) = oneshot::channel(); + let _ = system.heartbeat().notify(GetHealth(tx)); + rx.await.unwrap() + } } #[async_trait] @@ -67,7 +93,22 @@ impl Handler for Heartbeat { HeartbeatTick, )); + // Default system actors that will form the initial health check. + let mut actors: Vec = vec![ + system.actor_system().scheduler().clone().into(), + system.heartbeat().clone().into(), + system.registry().clone().into(), + system.client_registry().clone().into(), + system.node_discovery().clone().into(), + ]; + + if let Some(stream_mediator) = system.stream_mediator() { + actors.push(stream_mediator.clone().into()); + } + self.system = Some(system); + self.health_check_actors = actors; + let _ = self.actor_ref(ctx).notify(HeartbeatTick); } } diff --git a/coerce/src/remote/net/server/mod.rs b/coerce/src/remote/net/server/mod.rs index a13bc81c..ba6e6f58 100644 --- a/coerce/src/remote/net/server/mod.rs +++ b/coerce/src/remote/net/server/mod.rs @@ -1,5 +1,5 @@ -use crate::actor::scheduler::ActorType::Anonymous; -use crate::actor::LocalActorRef; +use crate::actor::scheduler::ActorType::{Anonymous, Tracked}; +use crate::actor::{IntoActor, LocalActorRef}; use crate::remote::net::server::session::store::{NewSession, RemoteSessionStore}; use crate::remote::net::server::session::RemoteSession; use crate::remote::system::RemoteActorSystem; @@ -88,13 +88,8 @@ impl RemoteServer { let listener = tokio::net::TcpListener::bind(&config.listen_addr).await?; - let session_store = system - .actor_system() - .new_actor( - format!("RemoteSessionStore-{}", system.node_tag()), - RemoteSessionStore::new(), - Anonymous, - ) + let session_store = RemoteSessionStore::new() + .into_actor(Some("remote-session-store"), &system.actor_system()) .await .unwrap(); diff --git a/coerce/src/remote/net/server/session/store.rs b/coerce/src/remote/net/server/session/store.rs index 8a968a52..368a67cc 100644 --- a/coerce/src/remote/net/server/session/store.rs +++ b/coerce/src/remote/net/server/session/store.rs @@ -50,7 +50,7 @@ impl Handler for RemoteSessionStore { let session_actor = ctx .spawn( - format!("RemoteSession-{}", session_id.to_string()).into_actor_id(), + format!("session-{}", session_id.to_string()).into_actor_id(), session, ) .await diff --git a/coerce/src/remote/stream/mediator/mod.rs b/coerce/src/remote/stream/mediator/mod.rs index 09e1dd81..5a8a916b 100644 --- a/coerce/src/remote/stream/mediator/mod.rs +++ b/coerce/src/remote/stream/mediator/mod.rs @@ -2,6 +2,7 @@ use crate::actor::context::ActorContext; use crate::actor::message::{Handler, Message}; use crate::actor::{Actor, LocalActorRef}; use crate::remote::actor::message::SetRemote; +use crate::remote::heartbeat::Heartbeat; use crate::remote::net::message::SessionEvent; use crate::remote::net::proto::network::StreamPublishEvent; use crate::remote::net::StreamData; @@ -134,6 +135,8 @@ impl StreamMediator { #[async_trait] impl Handler for StreamMediator { async fn handle(&mut self, message: SetRemote, ctx: &mut ActorContext) { + Heartbeat::register(ctx.boxed_actor_ref(), &message.0); + self.remote = Some(message.0); self.system_subscription = Some(self.subscribe(SystemTopic, self.actor_ref(ctx)).unwrap()) } diff --git a/coerce/src/remote/system/builder.rs b/coerce/src/remote/system/builder.rs index 3bd51da3..13bf1fcc 100644 --- a/coerce/src/remote/system/builder.rs +++ b/coerce/src/remote/system/builder.rs @@ -28,6 +28,7 @@ use uuid::Uuid; pub struct RemoteActorSystemBuilder { node_id: Option, node_tag: Option, + node_version: Option, inner: Option, config_builders: Vec, mediator: Option, @@ -43,6 +44,7 @@ impl RemoteActorSystemBuilder { RemoteActorSystemBuilder { node_id: None, node_tag: None, + node_version: None, inner: None, config_builders: vec![ #[cfg(feature = "sharding")] @@ -67,6 +69,13 @@ impl RemoteActorSystemBuilder { self } + pub fn with_version(mut self, version: impl ToString) -> Self { + info!("setting version={}", version.to_string()); + self.node_version = Some(version.to_string()); + + self + } + pub fn configure(mut self, f: F) -> Self where F: 'static + (FnOnce(&mut RemoteSystemConfigBuilder) -> &mut RemoteSystemConfigBuilder), @@ -138,6 +147,7 @@ impl RemoteActorSystemBuilder { .unwrap_or_else(|| inner.system_name().to_string()); let config = config_builder.build( Some(system_tag.clone()), + self.node_version, self.server_auth_token, self.node_attributes, ); @@ -278,6 +288,7 @@ impl RemoteSystemConfigBuilder { pub fn build( self, tag: Option, + version: Option, server_auth_token: Option, attributes: HashMap, ) -> Arc { @@ -293,19 +304,23 @@ impl RemoteSystemConfigBuilder { } let node_tag = tag.map_or_else(|| format!("cluster-node-{}", Uuid::new_v4()), |t| t); + let node_version = version.unwrap_or_else(|| "0.0.0".to_string()); + let attributes = attributes + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect::() + .into(); + Arc::new(RemoteSystemConfig::new( node_tag, + node_version, actor_types, handler_types, self.handlers, self.actors, self.heartbeat.unwrap_or_default(), server_auth_token, - attributes - .into_iter() - .map(|(k, v)| (k.into(), v.into())) - .collect::() - .into(), + attributes, )) } } diff --git a/coerce/src/remote/system/mod.rs b/coerce/src/remote/system/mod.rs index 5532274c..039be3c3 100644 --- a/coerce/src/remote/system/mod.rs +++ b/coerce/src/remote/system/mod.rs @@ -69,12 +69,6 @@ impl RemoteSystemCore { } } -impl Drop for RemoteSystemCore { - fn drop(&mut self) { - info!("dropped remotesystem(id={})", self.node_id); - } -} - impl RemoteActorSystem { pub fn builder() -> RemoteActorSystemBuilder { RemoteActorSystemBuilder::new() @@ -96,6 +90,10 @@ impl RemoteActorSystem { self.inner.config.node_tag() } + pub fn node_version(&self) -> &str { + self.inner.config.node_version() + } + pub fn node_id(&self) -> NodeId { self.inner.node_id } diff --git a/coerce/src/sharding/coordinator/mod.rs b/coerce/src/sharding/coordinator/mod.rs index cd0c8fc3..0176bc1a 100644 --- a/coerce/src/sharding/coordinator/mod.rs +++ b/coerce/src/sharding/coordinator/mod.rs @@ -10,6 +10,7 @@ use crate::remote::system::NodeId; use crate::actor::message::Handler; use crate::remote::cluster::node::NodeStatus::{Healthy, Joining}; +use crate::remote::heartbeat::Heartbeat; use crate::remote::stream::pubsub::{PubSub, Receive, Subscription}; use crate::remote::stream::system::SystemTopic; use crate::remote::RemoteActorRef; @@ -79,6 +80,8 @@ impl PersistentActor for ShardCoordinator { async fn pre_recovery(&mut self, ctx: &mut ActorContext) { let remote = ctx.system().remote(); + Heartbeat::register(ctx.boxed_actor_ref(), remote); + let node_id = remote.node_id(); let node_tag = remote.node_tag().to_string(); @@ -127,6 +130,10 @@ impl PersistentActor for ShardCoordinator { } } } + + async fn stopped(&mut self, ctx: &mut ActorContext) { + Heartbeat::remove(ctx.id(), ctx.system().remote()); + } } impl ShardCoordinator { diff --git a/coerce/src/sharding/host/mod.rs b/coerce/src/sharding/host/mod.rs index 98c49544..19324a5d 100644 --- a/coerce/src/sharding/host/mod.rs +++ b/coerce/src/sharding/host/mod.rs @@ -12,6 +12,7 @@ use crate::sharding::shard::Shard; use protobuf::Message as ProtoMessage; use crate::remote::actor::BoxedActorHandler; +use crate::remote::heartbeat::Heartbeat; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; @@ -54,7 +55,12 @@ impl Message for LeaderAllocated { type Result = (); } -impl Actor for ShardHost {} +#[async_trait] +impl Actor for ShardHost { + async fn started(&mut self, ctx: &mut ActorContext) { + Heartbeat::register(ctx.boxed_actor_ref(), ctx.system().remote()); + } +} impl ShardHost { pub fn new( diff --git a/coerce/tests/test_actor_supervision.rs b/coerce/tests/test_actor_supervision.rs index 3edfacbc..638df592 100644 --- a/coerce/tests/test_actor_supervision.rs +++ b/coerce/tests/test_actor_supervision.rs @@ -149,5 +149,12 @@ pub async fn test_actor_child_spawn_and_stop() { let x = rx.await; info!("{:#?}", x); + let actor_count = x + .unwrap() + .supervised + .map_or(0, |s| s.actors.iter().filter(|n| n.is_ok()).count()); + + assert_eq!(0, actor_count); + system.shutdown().await; } diff --git a/coerce/tests/test_remote_system_health.rs b/coerce/tests/test_remote_system_health.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/coerce/tests/test_remote_system_health.rs @@ -0,0 +1 @@ + diff --git a/examples/coerce-sharded-chat-example/scripts/run-server-3.ps1 b/examples/coerce-sharded-chat-example/scripts/run-server-3.ps1 index 8ac3a308..1cf242f6 100755 --- a/examples/coerce-sharded-chat-example/scripts/run-server-3.ps1 +++ b/examples/coerce-sharded-chat-example/scripts/run-server-3.ps1 @@ -1 +1 @@ -cargo run --release --bin sharded-chat-server -- --node_id 3 --remote_listen_addr localhost:33101 --websocket_listen_addr localhost:33102 --cluster_api_listen_addr 127.0.0.1:33103 --remote_seed_addr localhost:32101 --log_level INFO +cargo run --release --bin sharded-chat-server -- --node_id 3 --remote_listen_addr 0.0.0.0:33101 --websocket_listen_addr localhost:33102 --cluster_api_listen_addr 127.0.0.1:33103 --remote_seed_addr 127.0.0.1:32101 --log_level INFO diff --git a/examples/coerce-sharded-chat-example/src/app.rs b/examples/coerce-sharded-chat-example/src/app.rs index e7ae9194..773a72d6 100644 --- a/examples/coerce-sharded-chat-example/src/app.rs +++ b/examples/coerce-sharded-chat-example/src/app.rs @@ -8,20 +8,18 @@ use coerce::remote::api::cluster::ClusterApi; use coerce::remote::api::sharding::ShardingApi; use coerce::remote::api::system::SystemApi; use coerce::remote::api::RemoteHttpApi; +use coerce::remote::net::server::RemoteServer; use coerce::remote::system::builder::RemoteActorSystemBuilder; use coerce::remote::system::{NodeId, RemoteActorSystem}; use coerce::sharding::Sharding; - -use coerce::remote::net::server::RemoteServer; use coerce_k8s::config::KubernetesDiscoveryConfig; use coerce_k8s::KubernetesDiscovery; -use std::net::SocketAddr; - use coerce::actor::LocalActorRef; use coerce::remote::api::builder::HttpApiBuilder; use coerce::remote::api::metrics::MetricsApi; use coerce_redis::journal::{RedisStorageConfig, RedisStorageProvider}; +use std::net::SocketAddr; use std::str::FromStr; use tokio::task::JoinHandle; @@ -108,7 +106,10 @@ impl ShardedChat { } async fn create_actor_system(config: &ShardedChatConfig) -> (RemoteActorSystem, RemoteServer) { - let actor_system = ActorSystem::new(); + let (seed_addr, node_id, node_tag, external_addr) = get_node_info(&config).await; + + let actor_system = ActorSystem::builder().system_name(&node_tag).build(); + let system = actor_system.to_persistent(match &config.persistence { ShardedChatPersistence::Redis { host: Some(host) } => Persistence::from( RedisStorageProvider::connect( @@ -125,45 +126,6 @@ async fn create_actor_system(config: &ShardedChatConfig) -> (RemoteActorSystem, _ => Persistence::from(InMemoryStorageProvider::new()), }); - let mut seed_addr = config.remote_seed_addr.clone(); - let is_running_in_k8s = std::env::var("KUBERNETES_SERVICE_HOST").is_ok(); - let (node_id, node_tag, external_addr) = if is_running_in_k8s { - let pod_name = std::env::var("POD_NAME") - .expect("POD_NAME environment variable not set, TODO: fallback to hostname?"); - let cluster_ip = std::env::var("CLUSTER_IP") - .expect("CLUSTER_IP environment variable not set, TODO: fallback to hostname?"); - - let discovered_targets = - KubernetesDiscovery::discover(KubernetesDiscoveryConfig::default()).await; - - if let Some(peers) = discovered_targets { - if let Some(first_peer) = peers.into_iter().next() { - seed_addr = Some(first_peer); - } - } - - let pod_ordinal = pod_name.split('-').last(); - if let Some(Ok(pod_ordinal)) = pod_ordinal.map(|i| i.parse()) { - let listen_addr_base = SocketAddr::from_str(&config.remote_listen_addr).unwrap(); - let port = listen_addr_base.port(); - - let external_addr = format!("{}:{}", &cluster_ip, port); - (pod_ordinal, pod_name, Some(external_addr)) - } else { - ( - config.node_id, - format!("chat-server-{}", config.node_id), - None, - ) - } - } else { - ( - config.node_id, - format!("chat-server-{}", config.node_id), - None, - ) - }; - info!( "starting cluster node (node_id={}, node_tag={}, listen_addr={}, external_addr={:?})", node_id, &node_tag, &config.remote_listen_addr, &external_addr @@ -172,6 +134,7 @@ async fn create_actor_system(config: &ShardedChatConfig) -> (RemoteActorSystem, let remote_system = RemoteActorSystemBuilder::new() .with_id(node_id) .with_tag(node_tag) + .with_version(env!("CARGO_PKG_VERSION")) .with_actor_system(system) .with_handlers(|handlers| { handlers @@ -209,3 +172,48 @@ async fn create_actor_system(config: &ShardedChatConfig) -> (RemoteActorSystem, (remote_system, server) } + +async fn get_node_info( + config: &&ShardedChatConfig, +) -> (Option, NodeId, String, Option) { + let mut seed_addr = config.remote_seed_addr.clone(); + let is_running_in_k8s = std::env::var("KUBERNETES_SERVICE_HOST").is_ok(); + let (node_id, node_tag, external_addr) = if is_running_in_k8s { + let pod_name = std::env::var("POD_NAME") + .expect("POD_NAME environment variable not set, TODO: fallback to hostname?"); + let cluster_ip = std::env::var("CLUSTER_IP") + .expect("CLUSTER_IP environment variable not set, TODO: fallback to hostname?"); + + let discovered_targets = + KubernetesDiscovery::discover(KubernetesDiscoveryConfig::default()).await; + + if let Some(peers) = discovered_targets { + if let Some(first_peer) = peers.into_iter().next() { + seed_addr = Some(first_peer); + } + } + + let pod_ordinal = pod_name.split('-').last(); + if let Some(Ok(pod_ordinal)) = pod_ordinal.map(|i| i.parse()) { + let listen_addr_base = SocketAddr::from_str(&config.remote_listen_addr).unwrap(); + let port = listen_addr_base.port(); + + let external_addr = format!("{}:{}", &cluster_ip, port); + (pod_ordinal, pod_name, Some(external_addr)) + } else { + ( + config.node_id, + format!("chat-server-{}", config.node_id), + None, + ) + } + } else { + ( + config.node_id, + format!("chat-server-{}", config.node_id), + None, + ) + }; + + (seed_addr, node_id, node_tag, external_addr) +} diff --git a/providers/persistence/coerce-redis/src/journal/mod.rs b/providers/persistence/coerce-redis/src/journal/mod.rs index 89ead600..fe8145f5 100644 --- a/providers/persistence/coerce-redis/src/journal/mod.rs +++ b/providers/persistence/coerce-redis/src/journal/mod.rs @@ -7,6 +7,7 @@ use coerce::persistent::journal::storage::{JournalEntry, JournalStorage, Journal use redis::aio::ConnectionLike; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use tokio::sync::oneshot; @@ -87,10 +88,17 @@ async fn create_provider( where C: Clone, { + const REDIS_JOURNAL_COUNTER: AtomicU32 = AtomicU32::new(1); let config = Arc::new(config); let redis_journal = RedisJournal(redis) - .into_anon_actor(Option::::None, system) + .into_actor( + Some(format!( + "redis-journal-{}", + REDIS_JOURNAL_COUNTER.fetch_add(1, Ordering::Relaxed) + )), + system, + ) .await .expect("start journal actor");