Skip to content

Commit

Permalink
add ability to fetch SystemHealth, which is a full actor system hea…
Browse files Browse the repository at this point in the history
…lth check + the ability to register custom actor implementations to be part of the health check

If any of the system actors take longer than 1 second to respond to a `GetStatus`, the health check will report the health status as degraded.
 If an error is returned from any of the actors, the health check will report it as unhealthy.

Health check also provides useful information such as currently active cluster peers and version information.
  • Loading branch information
LeonHartley committed Dec 16, 2022
1 parent 86b453d commit f451999
Show file tree
Hide file tree
Showing 28 changed files with 562 additions and 153 deletions.
101 changes: 79 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions coerce/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "coerce"
description = "Async actor runtime and distributed systems framework"
license = "Apache-2.0"
version = "0.8.2"
version = "0.8.3"
authors = ["Leon Hartley <[email protected]>"]
edition = "2021"
readme = "README.md"
Expand Down Expand Up @@ -64,7 +64,7 @@ api = ["remote", "dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.22.0", features = ["full"] }
tokio = { version = "1.23.0", features = ["full"] }
tokio-util = { version = "0.7.4", features = ["full"] }
tokio-stream = { version = "0.1", optional = true }
tracing = { version = "0.1", features = ["valuable"] }
Expand All @@ -75,20 +75,20 @@ serde_json = "1.0"
futures = "0.3"
async-trait = { version = "0.1" }
hashring = { version = "0.3.0", optional = true }
bytes = { version = "1.2.1", optional = true }
bytes = { version = "1.3.0", optional = true }
byteorder = { version = "1.4", optional = true }
chrono = { version = "0.4", optional = true }
chrono = { version = "0.4", features = ["serde"], optional = true }
protobuf = { version = "3.2.0", optional = true }
anyhow = { version = "1.0", optional = true }
rand = "0.8.5"
parking_lot = { version = "0.12.1", optional = true }
metrics = { version = "0.20.1", optional = true }
valuable = { version = "0.1", features = ["derive"] }
metrics-exporter-prometheus = { version = "0.11", optional = true }
metrics-util = { version = "0.14", optional = true }
metrics-util = { version = "0.14.0", optional = true }

# API dependencies
axum = { version = "0.6.0", features = ["query"], optional = true }
axum = { version = "0.6.1", features = ["query"], optional = true }
utoipa = { version = "2.4.2", features = ["axum_extras"], optional = true }
utoipa-swagger-ui = { version = "3.0.1", features = ["axum"], optional = true }

Expand Down
71 changes: 42 additions & 29 deletions coerce/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::actor::{
};
use futures::{Stream, StreamExt};
use std::any::Any;
use std::time::Instant;
use tokio::sync::oneshot::Sender;
use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};

Expand All @@ -22,6 +23,10 @@ pub enum ActorStatus {
Stopping,
Stopped,
}
//
// struct LastMessage {
// received_at: DateTime<Utc>,
// }

pub struct ActorContext {
context_id: u64,
Expand All @@ -38,29 +43,9 @@ pub struct ActorContext {
persistence: Option<ActorPersistence>,
}

static ACTOR_CONTEXT_FIELDS: &[NamedField<'static>] =
&[NamedField::new("actor_path"), NamedField::new("actor_type")];

impl Structable for ActorContext {
fn definition(&self) -> StructDef<'_> {
StructDef::new_static("ActorContext", Fields::Named(ACTOR_CONTEXT_FIELDS))
}
}

impl Valuable for ActorContext {
fn as_value(&self) -> Value<'_> {
Value::Structable(self)
}

fn visit(&self, v: &mut dyn Visit) {
v.visit_named_fields(&NamedValues::new(
ACTOR_CONTEXT_FIELDS,
&[
Value::String(self.full_path.as_ref()),
Value::String(self.boxed_ref.actor_type()),
],
));
}
pub struct LogContext {
pub actor_path: ActorPath,
pub actor_type: &'static str,
}

impl ActorContext {
Expand All @@ -83,6 +68,7 @@ impl ActorContext {
boxed_parent_ref: None,
on_actor_stopped: None,
tags: None,
// last_message_timestamp: None,
#[cfg(feature = "persistence")]
persistence: None,
}
Expand Down Expand Up @@ -259,6 +245,38 @@ impl ActorContext {
pub fn take_on_stopped_handlers(&mut self) -> Option<Vec<Sender<()>>> {
self.on_actor_stopped.take()
}

pub fn log(&self) -> LogContext {
LogContext {
actor_path: self.full_path.clone(),
actor_type: self.boxed_ref.actor_type(),
}
}
}

static LOG_CONTEXT_FIELDS: &[NamedField<'static>] =
&[NamedField::new("actor_path"), NamedField::new("actor_type")];

impl Structable for LogContext {
fn definition(&self) -> StructDef<'_> {
StructDef::new_static("Context", Fields::Named(LOG_CONTEXT_FIELDS))
}
}

impl Valuable for LogContext {
fn as_value(&self) -> Value<'_> {
Value::Structable(self)
}

fn visit(&self, v: &mut dyn Visit) {
v.visit_named_fields(&NamedValues::new(
LOG_CONTEXT_FIELDS,
&[
Value::String(self.actor_path.as_ref()),
Value::String(self.actor_type),
],
));
}
}

impl Drop for ActorContext {
Expand All @@ -277,12 +295,7 @@ impl Drop for ActorContext {
on_context_dropped(&boxed_ref, &parent_ref, &status, &system);
});
} else {
on_context_dropped(
&self.boxed_ref,
&self.boxed_parent_ref,
&self.status,
&self.system,
);
on_context_dropped(&self.boxed_ref, &parent_ref, &self.status, &self.system);
}
}
}
Expand Down
Loading

0 comments on commit f451999

Please sign in to comment.