diff --git a/Cargo.lock b/Cargo.lock index 5d8241d8..846882e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,13 +54,13 @@ checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" [[package]] name = "async-trait" -version = "0.1.61" +version = "0.1.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "705339e0e4a9690e2908d2b3d049d85682cf19fbd5782494498fbf7003a6a282" +checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -234,7 +234,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -284,7 +284,7 @@ dependencies = [ [[package]] name = "coerce" -version = "0.8.5" +version = "0.8.6" dependencies = [ "anyhow", "async-trait", @@ -324,7 +324,7 @@ name = "coerce-cluster-example" version = "0.1.0" dependencies = [ "async-trait", - "coerce 0.8.5", + "coerce 0.8.6", "coerce-macros 0.2.0", "opentelemetry", "opentelemetry-jaeger", @@ -352,7 +352,7 @@ version = "0.2.0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -363,7 +363,7 @@ checksum = "3c89d9c1baca2b55d5e73638dc63da0c6093f6ec66f6324cad332225319e3beb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -381,7 +381,7 @@ dependencies = [ "anyhow", "async-trait", "bytes", - "coerce 0.8.5", + "coerce 0.8.6", "redis", "tokio", ] @@ -393,7 +393,7 @@ dependencies = [ "async-trait", "chrono", "clap", - "coerce 0.8.5", + "coerce 0.8.6", "coerce-k8s", "coerce-macros 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "coerce-redis", @@ -524,7 +524,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn", + "syn 1.0.107", ] [[package]] @@ -541,7 +541,7 @@ checksum = "39e61fda7e62115119469c7b3591fd913ecca96fb766cfd3f2e2502ab7bc87a5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -732,7 +732,7 @@ checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1282,7 +1282,7 @@ checksum = "731f8ecebd9f3a4aa847dfe75455e4757a45da40a7793d2f0b1f9b6ed18b23f3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1527,7 +1527,7 @@ checksum = "851c8d0ce9bebe43790dedfc86614c23494ac9f423dd618d3a61fc693eafe61e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1538,7 +1538,7 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1574,7 +1574,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "version_check", ] @@ -1591,9 +1591,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.49" +version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" +checksum = "e472a104799c74b514a57226160104aa483546de37e839ec50e3c2e41dd87534" dependencies = [ "unicode-ident", ] @@ -1667,9 +1667,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.23" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" dependencies = [ 
"proc-macro2", ] @@ -1766,9 +1766,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.22.2" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c1aada340fba5deba625c84d109d0a83cc3565452d38083417992a702c2428d" +checksum = "aa8455fa3621f6b41c514946de66ea0531f57ca017b2e6c7cc368035ea5b46df" dependencies = [ "async-trait", "bytes", @@ -1877,7 +1877,7 @@ dependencies = [ "quote", "rust-embed-utils", "shellexpand", - "syn", + "syn 1.0.107", "walkdir", ] @@ -2051,7 +2051,7 @@ checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2220,6 +2220,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21e3787bb71465627110e7d87ed4faaa36c1f61042ee67badb9e2ef173accc40" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "sync_wrapper" version = "0.1.1" @@ -2266,7 +2277,7 @@ checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2363,7 +2374,7 @@ checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2489,7 +2500,7 @@ checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2667,9 +2678,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "utoipa" -version = "3.0.1" +version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3920fa753064b1be7842bea26175ffa0dfc4a8f30bcb52b8ff03fddf8889914c" +checksum = "6f6ec551be59b8d61831b9bde585b85fced81eb3771afa60ccd920d1c1cc8b6d" dependencies = [ "indexmap", "serde", @@ -2679,14 +2690,14 @@ dependencies = [ [[package]] name = "utoipa-gen" -version = "3.0.1" +version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "720298fac6efca20df9e457e67a1eab41a20d1c3101380b5c4dca1ca60ae0062" +checksum = "81d51d7de9f57fc943abe9a3ceeea62835341186b3c12a05617c155df1824e2b" dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 2.0.11", ] [[package]] @@ -2732,7 +2743,7 @@ checksum = "9d44690c645190cfce32f91a1582281654b2338c6073fa250b0949fd25c55b32" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2801,7 +2812,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-shared", ] @@ -2823,7 +2834,7 @@ checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/README.md b/README.md index f4f7b3f1..b1ce8998 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,22 @@ scalable, fault-tolerant modern actor-driven application. | [coerce-macros](https://crates.io/crates/coerce-macros) | Useful macros allowing for quick implementations of snapshots, JSON-serialisable remote messages and more. 
| ![crates.io](https://img.shields.io/crates/v/coerce-macros.svg) | | [coerce-k8s](https://crates.io/crates/coerce-k8s) | Kubernetes discovery provider, automatically discover cluster peers hosted in Kubernetes, based on a configurable pod-selection label | ![crates.io](https://img.shields.io/crates/v/coerce-k8s.svg) | +# Using Coerce in your own project +The first step to using Coerce in your project is to add the coerce crate dependency, which can be done by adding the following +to your Cargo.toml: + +```toml +[dependencies] +coerce = { version = "0.8", features = ["full"] } +``` + +Coerce currently relies on an unstable feature from the `tracing` crate, `valuable`, used for enriching logs with +information on the actor context, which can be enabled by adding the following to your `.cargo/config.toml` file: +``` +[build] +rustflags = ["--cfg", "tracing_unstable"] +``` +
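+Once the dependency is in place, defining an actor and sending it a message looks roughly like the
+sketch below (`EchoActor` and `Echo` are illustrative names, and the example assumes `tokio` and
+`async-trait` are also added to your dependencies):
+
+```rust
+use async_trait::async_trait;
+use coerce::actor::context::ActorContext;
+use coerce::actor::message::{Handler, Message};
+use coerce::actor::system::ActorSystem;
+use coerce::actor::{Actor, IntoActor};
+
+struct EchoActor;
+
+impl Actor for EchoActor {}
+
+struct Echo(String);
+
+impl Message for Echo {
+    type Result = String;
+}
+
+#[async_trait]
+impl Handler<Echo> for EchoActor {
+    // Reply with the received string; the return type must match `Echo::Result`.
+    async fn handle(&mut self, message: Echo, _ctx: &mut ActorContext) -> String {
+        message.0
+    }
+}
+
+#[tokio::main]
+async fn main() {
+    let system = ActorSystem::new();
+
+    // Start the actor with an explicit ID and get back a LocalActorRef.
+    let actor = EchoActor
+        .into_actor(Some("echo-actor"), &system)
+        .await
+        .expect("actor should start");
+
+    // `send` waits for the handler's result; `notify` would be fire-and-forget.
+    let reply = actor.send(Echo("hello".to_string())).await.unwrap();
+    assert_eq!(reply, "hello");
+}
+```
+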
## Features ### Actors @@ -47,8 +63,10 @@ scalable, fault-tolerant modern actor-driven application. - Easily accessible metrics and information useful for diagnosis -# How to build + +# Building and testing the Coerce libraries Building Coerce is easy. All you need is the latest Rust stable or nightly installed, along with Cargo. + ```shell # Clone the repository git clone https://github.com/leonhartley/coerce-rs && cd coerce-rs cargo build ## Alternatively, if you'd like to build the library, dependencies and run the tests -cargo test +cargo test --all-features ``` # How to run the examples diff --git a/coerce/Cargo.toml b/coerce/Cargo.toml index 83eb2e61..a41234b9 100644 --- a/coerce/Cargo.toml +++ b/coerce/Cargo.toml @@ -2,7 +2,7 @@ name = "coerce" description = "Async actor runtime and distributed systems framework" license = "Apache-2.0" -version = "0.8.5" +version = "0.8.6" authors = ["Leon Hartley "] edition = "2021" readme = "README.md" @@ -80,7 +80,7 @@ async-trait = { version = "0.1" } hashring = { version = "0.3.0", optional = true } bytes = { version = "1.4.0", optional = true } byteorder = { version = "1.4.3", optional = true } -chrono = { version = "0.4.23", features = ["serde"], optional = true } +chrono = { version = "0.4", features = ["serde"], optional = true } protobuf = { version = "3.2.0", optional = true } anyhow = { version = "1.0.68", optional = true } rand = "0.8.5" @@ -95,8 +95,8 @@ sha2 = { version = "0.10.6", optional = true } # API dependencies axum = { version = "0.6.4", features = ["query"], optional = true } -utoipa = { version = "3.0.1", features = ["axum_extras"], optional = true } -utoipa-swagger-ui = { version = "3.0.2", features = ["axum"], optional = true } +utoipa = { version = "3", features = ["axum_extras", "chrono"], optional = true } +utoipa-swagger-ui = { version = "3", features = ["axum"], optional = true } [dev-dependencies] coerce-macros = { version = "0.2.0" } diff --git a/coerce/benches/actor_creation.rs b/coerce/benches/actor_creation.rs index 6da3cc66..e71d7dc5 100644 --- a/coerce/benches/actor_creation.rs +++ b/coerce/benches/actor_creation.rs @@ -1,16 +1,13 @@ -use bencher::{Bencher, benchmark_group, benchmark_main}; -use tokio::runtime::Runtime; -use coerce::actor::{Actor, IntoActorId, LocalActorRef}; +use bencher::{benchmark_group, benchmark_main, Bencher}; use coerce::actor::scheduler::ActorType::Anonymous; use coerce::actor::system::ActorSystem; +use coerce::actor::{Actor, IntoActorId, LocalActorRef}; +use tokio::runtime::Runtime; fn rt() -> Runtime { - tokio::runtime::Builder::new_multi_thread() - .build() - .unwrap() + tokio::runtime::Builder::new_multi_thread().build().unwrap() } - fn create_1000_actors(bench: &mut Bencher) { let runtime = rt(); @@ -35,6 +32,5 @@ async fn actor() -> LocalActorRef { .expect("unable to create actor") } - benchmark_group!(actor_creation, create_1000_actors); -benchmark_main!(actor_creation); \ No newline at end of file +benchmark_main!(actor_creation); diff --git a/coerce/benches/actor_messaging.rs b/coerce/benches/actor_messaging.rs index 609e1d94..05ae0386 100644 --- a/coerce/benches/actor_messaging.rs +++ b/coerce/benches/actor_messaging.rs @@ -5,12 +5,12 @@ extern crate bencher; use bencher::Bencher; -use tokio::runtime::Runtime; use coerce::actor::context::ActorContext; use coerce::actor::message::{Handler, Message}; use coerce::actor::scheduler::ActorType::Anonymous; use coerce::actor::system::ActorSystem; use coerce::actor::{Actor, IntoActorId, LocalActorRef, ToActorId}; +use tokio::runtime::Runtime; struct BenchmarkActor; @@ -45,24 +45,18 @@ fn actor_send_1000_benchmark(bench: &mut Bencher) { let runtime = rt(); let actor = runtime.block_on(async { actor().await }); - bench.iter(|| { - runtime.block_on(actor_1000_send_and_wait(&actor)) - }); + bench.iter(|| runtime.block_on(actor_1000_send_and_wait(&actor))); } fn actor_notify_1000_benchmark(bench: &mut Bencher) { let runtime = rt(); let actor = runtime.block_on(async { actor().await }); - bench.iter(|| { - runtime.block_on(actor_999_notify_1_send_and_wait(&actor)) - }) + bench.iter(|| runtime.block_on(actor_999_notify_1_send_and_wait(&actor))) } fn rt() -> Runtime { - tokio::runtime::Builder::new_multi_thread() - .build() - .unwrap() + tokio::runtime::Builder::new_multi_thread().build().unwrap() } async fn actor() -> LocalActorRef { @@ -73,5 +67,9 @@ async fn actor() -> LocalActorRef { .expect("unable to create actor") } -benchmark_group!(actor_messaging, actor_send_1000_benchmark, actor_notify_1000_benchmark); +benchmark_group!( + actor_messaging, + actor_send_1000_benchmark, + actor_notify_1000_benchmark +); benchmark_main!(actor_messaging); diff --git a/coerce/src/actor/context.rs b/coerce/src/actor/context.rs index 9d43eaaf..cb19a232 100644 --- a/coerce/src/actor/context.rs +++ b/coerce/src/actor/context.rs @@ -25,10 +25,6 @@ pub enum ActorStatus { Stopping, Stopped, } -// -// struct LastMessage { -// received_at: DateTime, -// } pub struct ActorContext { context_id: u64, @@ -210,6 +206,11 @@ impl ActorContext { supervised.add_child_ref(actor_ref) } + /// Spawns the supervised actor and waits for the actor to be started before returning + /// the LocalActorRef. + /// + /// Note: this waits for the actor to be spawned and the `Actor::started` hook to complete; if + /// the actor is persistent, this includes all time spent recovering. pub async fn spawn( &mut self, id: ActorId, @@ -229,6 +230,31 @@ impl ActorContext { supervised.spawn(id, actor, system, parent_ref).await } + /// Spawns the supervised actor but doesn't wait for the actor to be fully started before + /// returning the LocalActorRef. + /// + /// This can be helpful when you don't want the supervisor to be blocked until the child + /// has fully recovered.
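+    ///
+    /// # Example
+    ///
+    /// A rough usage sketch (`ChildActor` and `Ping` are illustrative types, not part of Coerce):
+    ///
+    /// ```rust,ignore
+    /// // The child is registered and its LocalActorRef returned immediately; `Actor::started`
+    /// // (including any persistence recovery) completes in the background.
+    /// let child = ctx.spawn_deferred("child-1".into_actor_id(), ChildActor::default())?;
+    ///
+    /// // Messages can be enqueued right away; they are handled once the child has started.
+    /// child.notify(Ping)?;
+    /// ```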
+ pub fn spawn_deferred( + &mut self, + id: ActorId, + actor: A, + ) -> Result, ActorRefErr> { + let supervised = { + if self.supervised.is_none() { + self.supervised = + Some(Supervised::new(self.id().clone(), self.full_path().clone())); + } + + self.supervised.as_mut().unwrap() + }; + + let system = self.system.as_ref().unwrap().clone(); + let parent_ref = self.boxed_ref.clone(); + + supervised.spawn_deferred(id, actor, system, parent_ref) + } + pub fn supervised_count(&self) -> usize { self.supervised.as_ref().map_or(0, |s| s.count()) } diff --git a/coerce/src/actor/lifecycle.rs b/coerce/src/actor/lifecycle.rs index d1760e89..032cc4fd 100644 --- a/coerce/src/actor/lifecycle.rs +++ b/coerce/src/actor/lifecycle.rs @@ -60,11 +60,7 @@ impl ActorLoop { .new_context(system.clone(), Starting, actor_ref.clone().into()) .with_parent(parent_ref); - let system_id = actor_ref - .system_id() - .map_or("system-creation".to_string(), |s| s.to_string()); - - trace!("[{}] starting on system: {}", ctx.full_path(), system_id); + trace!("[{}] starting", ctx.full_path()); actor.started(&mut ctx).await; ActorMetrics::incr_actor_created(A::type_name()); diff --git a/coerce/src/actor/mod.rs b/coerce/src/actor/mod.rs index 0d839c29..0516d148 100644 --- a/coerce/src/actor/mod.rs +++ b/coerce/src/actor/mod.rs @@ -149,6 +149,7 @@ use crate::actor::supervised::Terminated; use crate::actor::system::ActorSystem; use std::any::Any; use std::fmt::{Debug, Display, Formatter}; +use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use std::ops::Deref; use std::sync::Arc; @@ -603,16 +604,29 @@ pub struct LocalActorRef { pub struct LocalActorRefInner { pub id: ActorId, path: ActorPath, - system_id: Option, sender: UnboundedSender>, } +impl Hash for LocalActorRef { + fn hash(&self, state: &mut H) { + self.id.hash(state); + self.path.hash(state); + } +} + +impl PartialEq for LocalActorRef { + fn eq(&self, other: &Self) -> bool { + self.path == other.path && self.id == other.id + } +} + +impl Eq for LocalActorRef {} + impl Debug for LocalActorRef { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct(&format!("LocalActorRef<{}>", A::type_name())) .field("path", &self.inner.path) .field("actor_id", &self.inner.id) - .field("system_id", &self.inner.system_id) .finish() } } @@ -706,12 +720,7 @@ impl LocalActorRef { path: ActorPath, ) -> Self { Self { - inner: Arc::new(LocalActorRefInner { - id, - path, - system_id, - sender, - }), + inner: Arc::new(LocalActorRefInner { id, path, sender }), } } @@ -866,11 +875,6 @@ impl LocalActorRef { pub fn notify_stop(&self) -> Result<(), ActorRefErr> { self.notify(Stop(None)) } - - /// Returns a reference to the ID of the `ActorSystem` was created in - pub fn system_id(&self) -> Option<&Uuid> { - self.inner.system_id.as_ref() - } } #[async_trait] diff --git a/coerce/src/actor/supervised.rs b/coerce/src/actor/supervised.rs index 83f836ef..9117fc15 100644 --- a/coerce/src/actor/supervised.rs +++ b/coerce/src/actor/supervised.rs @@ -91,6 +91,33 @@ impl Supervised { } } + pub fn spawn_deferred( + &mut self, + id: ActorId, + actor: A, + system: ActorSystem, + parent_ref: BoxedActorRef, + ) -> Result, ActorRefErr> { + if let Some(_) = self.children.get(&id) { + return Err(ActorRefErr::AlreadyExists(id)); + } + + let actor_ref = start_actor( + actor, + id.clone(), + ActorType::Anonymous, + None, + Some(system), + Some(parent_ref), + self.path.clone(), + ); + + self.children + .insert(id.clone(), ChildRef::spawned(actor_ref.clone().into())); + + Ok(actor_ref) + } + pub fn 
count(&self) -> usize { self.children.len() } diff --git a/coerce/src/persistent/actor.rs b/coerce/src/persistent/actor.rs index b14b96f5..aa46d5eb 100644 --- a/coerce/src/persistent/actor.rs +++ b/coerce/src/persistent/actor.rs @@ -7,8 +7,11 @@ use crate::persistent::failure::{should_retry, PersistFailurePolicy, RecoveryFai use crate::persistent::journal::snapshot::Snapshot; use crate::persistent::journal::types::JournalTypes; use crate::persistent::journal::{PersistErr, RecoveryErr}; -use crate::persistent::recovery::{ActorRecovery, RecoveryResult}; +use crate::persistent::recovery::{ActorRecovery, Recovery}; +use crate::persistent::batch::EventBatch; +use crate::persistent::storage::JournalEntry; +use crate::persistent::ReadMessages; use std::sync::Arc; #[async_trait] @@ -55,6 +58,28 @@ pub trait PersistentActor: 'static + Sized + Send + Sync { } } + async fn persist_batch( + &self, + batch: EventBatch, + ctx: &mut ActorContext, + ) -> Result<(), PersistErr> { + if batch.entries().is_empty() { + return Ok(()); + } + let mut attempts = 1; + loop { + let result = ctx + .persistence_mut() + .journal_mut::() + .persist_batch(&batch) + .await; + + if let Some(res) = check(result, &mut attempts, self, ctx).await { + return res; + } + } + } + async fn snapshot( &self, snapshot: S, @@ -81,18 +106,39 @@ pub trait PersistentActor: 'static + Sized + Send + Sync { } } } + Err(e) => return Err(PersistErr::Serialisation(e)), } } - async fn recover_journal( - &mut self, - persistence_key: String, - ctx: &mut ActorContext, - ) -> RecoveryResult { + async fn recover(&mut self, persistence_key: String, ctx: &mut ActorContext) -> Recovery { ActorRecovery::recover_journal(self, Some(persistence_key), ctx).await } + fn last_sequence_id(&self, ctx: &mut ActorContext) -> i64 { + ctx.persistence_mut() + .journal_mut::() + .last_sequence_id() + } + + async fn read_messages<'a>( + &self, + args: ReadMessages<'a>, + ctx: &mut ActorContext, + ) -> anyhow::Result>> { + ctx.persistence_mut() + .journal_mut::() + .read_messages(args) + .await + } + + async fn clear_old_messages(&mut self, ctx: &mut ActorContext) -> bool { + ctx.persistence_mut() + .journal_mut::() + .clear_old_messages() + .await + } + fn recovery_failure_policy(&self) -> RecoveryFailurePolicy { RecoveryFailurePolicy::default() } @@ -101,6 +147,10 @@ pub trait PersistentActor: 'static + Sized + Send + Sync { PersistFailurePolicy::default() } + fn event_batch(&self, ctx: &ActorContext) -> EventBatch { + EventBatch::create(ctx) + } + async fn on_recovery_err(&mut self, _err: RecoveryErr, _ctx: &mut ActorContext) {} async fn on_recovery_failed(&mut self, _ctx: &mut ActorContext) {} @@ -179,14 +229,32 @@ where async fn started(&mut self, ctx: &mut ActorContext) { trace!("persistent actor starting, loading journal"); + self.pre_recovery(ctx).await; let persistence_key = self.persistence_key(ctx); let (snapshot, messages) = { - let journal = self.recover_journal(persistence_key.clone(), ctx).await; + let journal = self.recover(persistence_key.clone(), ctx).await; match journal { - RecoveryResult::Recovered(journal) => (journal.snapshot, journal.messages), - RecoveryResult::Failed => { + Recovery::Recovered(journal) => { + trace!( + "persistent actor ({}) recovered {} snapshot(s) and {} message(s)", + &persistence_key, + if journal.snapshot.is_some() { 1 } else { 0 }, + journal.messages.as_ref().map_or(0, |m| m.len()), + ); + + (journal.snapshot, journal.messages) + } + Recovery::Disabled => { + trace!( + "persistent actor ({}) disabled message recovery", + 
&persistence_key, + ); + + (None, None) + } + Recovery::Failed => { trace!("recovery failed, ctx_status={:?}", ctx.get_status()); self.on_recovery_failed(ctx).await; return; @@ -194,13 +262,6 @@ where } }; - trace!( - "persistent actor ({}) recovered {} snapshot(s) and {} message(s)", - &persistence_key, - if snapshot.is_some() { 1 } else { 0 }, - messages.as_ref().map_or(0, |m| m.len()), - ); - if let Some(snapshot) = snapshot { if let Err(e) = snapshot.recover(self, ctx).await { error!("Error while attempting to recover from a snapshot, error={error}, actor_id={actor_id}, persistence_key={persistence_key}", diff --git a/coerce/src/persistent/batch.rs b/coerce/src/persistent/batch.rs new file mode 100644 index 00000000..44fe0bff --- /dev/null +++ b/coerce/src/persistent/batch.rs @@ -0,0 +1,49 @@ +use crate::actor::context::ActorContext; +use crate::actor::message::Message; +use crate::actor::Actor; +use crate::persistent::storage::JournalEntry; +use crate::persistent::types::JournalTypes; +use crate::persistent::{PersistentActor, Recover}; +use std::sync::Arc; + +#[derive(Clone)] +pub struct EventBatch { + journal_types: Arc>, + entries: Vec, +} + +#[derive(Clone)] +pub struct BatchedEntry { + pub payload_type: Arc, + pub bytes: Arc>, +} + +impl EventBatch { + pub fn create(ctx: &ActorContext) -> Self { + let journal = ctx.persistence().journal::(); + let journal_types = journal.get_types(); + + Self { + journal_types, + entries: vec![], + } + } + + pub fn message(&mut self, message: M) -> &mut Self + where + A: Recover, + { + let payload_type = self.journal_types.message_type_mapping::().unwrap(); + let entry = BatchedEntry { + payload_type, + bytes: message.as_bytes().unwrap().into(), + }; + + self.entries.push(entry); + self + } + + pub fn entries(&self) -> &Vec { + &self.entries + } +} diff --git a/coerce/src/persistent/journal/mod.rs b/coerce/src/persistent/journal/mod.rs index 1a1eaefe..9269d58a 100644 --- a/coerce/src/persistent/journal/mod.rs +++ b/coerce/src/persistent/journal/mod.rs @@ -12,9 +12,11 @@ use crate::persistent::{PersistentActor, Recover, RecoverSnapshot}; use crate::actor::metrics::ActorMetrics; use crate::actor::Actor; +use crate::persistent::batch::EventBatch; use std::error::Error; use std::fmt::{Display, Formatter}; use std::marker::PhantomData; +use std::ops::Range; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -23,6 +25,7 @@ pub mod proto; pub struct Journal { persistence_id: String, last_sequence_id: i64, + last_snapshot_sequence_id: Option, storage: JournalStorageRef, types: Arc>, } @@ -30,14 +33,24 @@ pub struct Journal { impl Journal { pub fn new(persistence_id: String, storage: JournalStorageRef) -> Self { let last_sequence_id = 0; + let last_snapshot_sequence_id = None; let types = init_journal_types::(); Self { persistence_id, last_sequence_id, + last_snapshot_sequence_id, storage, types, } } + + pub fn get_types(&self) -> Arc> { + self.types.clone() + } + + pub fn last_sequence_id(&self) -> i64 { + self.last_sequence_id + } } type RecoveryHandlerRef = Arc>; @@ -162,7 +175,82 @@ impl Error for PersistErr {} type BytesRef = Arc>; +#[derive(Clone, Debug)] +pub struct ReadMessages<'a> { + pub persistence_id: Option<&'a str>, + pub read: Read, +} + +impl<'a> ReadMessages<'a> { + pub fn message(persistence_id: Option<&'a str>, sequence_id: i64) -> Self { + Self { + persistence_id, + read: Read::Message { sequence_id }, + } + } + + pub fn range(persistence_id: Option<&'a str>, range: Range) -> Self { + Self { + persistence_id, + read: 
Read::Range(range), + } + } +} + +#[derive(Clone, Debug)] +pub enum Read { + Message { sequence_id: i64 }, + Range(Range), +} + impl Journal { + pub async fn persist_batch(&mut self, batch: &EventBatch) -> Result<(), PersistErr> { + let mut sequence_id = self.last_sequence_id; + let batch = batch + .entries() + .iter() + .map(|e| { + sequence_id += 1; + JournalEntry { + sequence: sequence_id, + payload_type: e.payload_type.clone(), + bytes: e.bytes.clone(), + } + }) + .collect(); + + let res = self + .storage + .write_message_batch(&self.persistence_id, batch) + .await + .map_err(|e| PersistErr::Storage(e)); + + if res.is_ok() { + self.last_sequence_id = sequence_id; + } + + res + } + + pub async fn read_messages( + &mut self, + args: ReadMessages<'_>, + ) -> anyhow::Result>> { + let persistence_id = args.persistence_id.unwrap_or(self.persistence_id.as_ref()); + match args.read { + Read::Message { sequence_id } => Ok(self + .storage + .read_message(persistence_id, sequence_id) + .await? + .map(|m| vec![m])), + + Read::Range(range) => Ok(self + .storage + .read_messages(persistence_id, range.start, range.end) + .await?), + } + } + pub async fn persist_message(&mut self, bytes: BytesRef) -> Result<(), PersistErr> where A: Recover, @@ -178,13 +266,11 @@ impl Journal { .message_type_mapping::() .expect("message type not configured"); - let sequence = self.last_sequence_id + 1; - self.storage .write_message( &self.persistence_id, JournalEntry { - sequence, + sequence: self.last_sequence_id, payload_type: payload_type.clone(), bytes, }, @@ -197,7 +283,7 @@ impl Journal { M::type_name() ); - self.last_sequence_id = sequence; + self.last_sequence_id += 1; Ok(()) } @@ -229,6 +315,7 @@ impl Journal { .await?; self.last_sequence_id = sequence; + self.last_snapshot_sequence_id = Some(sequence); Ok(()) } @@ -241,13 +328,14 @@ impl Journal { let handler = self .types .recoverable_snapshots() - .get(&raw_snapshot.payload_type); + .get(raw_snapshot.payload_type.as_ref()); let sequence = raw_snapshot.sequence; let bytes = Arc::try_unwrap(raw_snapshot.bytes).map_or_else(|e| e.as_ref().clone(), |s| s); self.last_sequence_id = sequence; + self.last_snapshot_sequence_id = Some(sequence); debug!( "snapshot recovered (persistence_id={}), last sequence={}, type={}", @@ -280,7 +368,11 @@ impl Journal { for entry in messages { self.last_sequence_id = entry.sequence; - if let Some(handler) = self.types.recoverable_messages().get(&entry.payload_type) { + if let Some(handler) = self + .types + .recoverable_messages() + .get(entry.payload_type.as_ref()) + { trace!( "message recovered (persistence_id={}), sequence={}, starting_sequence={} type={}", &self.persistence_id, @@ -318,10 +410,20 @@ impl Journal { } } - pub async fn delete_messages(&mut self) -> bool { - let delete_result = self.storage.delete_all(&self.persistence_id).await; + pub async fn clear(&mut self) -> bool { + self.storage.delete_all(&self.persistence_id).await.is_ok() + } - delete_result.is_ok() + pub async fn clear_old_messages(&mut self) -> bool { + if let Some(snapshot_sequence_id) = self.last_snapshot_sequence_id { + self.storage + .delete_messages_to(&self.persistence_id, snapshot_sequence_id) + .await + .is_ok() + } else { + // no messages to delete + false + } } } @@ -347,7 +449,7 @@ where let start = Instant::now(); - actor.recover(message, ctx).await; + Recover::recover(actor, message, ctx).await; let message_processing_took = start.elapsed(); ActorMetrics::incr_messages_processed( @@ -373,18 +475,19 @@ where bytes: Vec, ctx: &mut 
ActorContext, ) -> Result<(), RecoveryErr> { - actor - .recover( - S::from_remote_envelope(bytes).map_err(|error| { - RecoveryErr::SnapshotDeserialisation { - error, - snapshot_type: S::type_name(), - actor_type: A::type_name(), - } - })?, - ctx, - ) - .await; + RecoverSnapshot::recover( + actor, + S::from_remote_envelope(bytes).map_err(|error| { + RecoveryErr::SnapshotDeserialisation { + error, + snapshot_type: S::type_name(), + actor_type: A::type_name(), + } + })?, + ctx, + ) + .await; + Ok(()) } } diff --git a/coerce/src/persistent/journal/provider.rs b/coerce/src/persistent/journal/provider.rs index ded53460..8d2de5f8 100644 --- a/coerce/src/persistent/journal/provider.rs +++ b/coerce/src/persistent/journal/provider.rs @@ -11,7 +11,9 @@ pub mod inmemory { use crate::persistent::journal::provider::StorageProvider; use crate::persistent::journal::storage::{JournalEntry, JournalStorage, JournalStorageRef}; use parking_lot::RwLock; + use std::collections::hash_map::Entry; use std::collections::HashMap; + use std::mem; use std::sync::Arc; #[derive(Debug)] @@ -34,6 +36,13 @@ pub mod inmemory { messages: vec![entry], } } + + pub fn from_messages(messages: Vec) -> ActorJournal { + ActorJournal { + snapshots: vec![], + messages, + } + } } #[derive(Default)] @@ -96,12 +105,30 @@ pub mod inmemory { Ok(()) } + async fn write_message_batch( + &self, + persistence_id: &str, + entries: Vec, + ) -> anyhow::Result<()> { + let mut store = self.store.write(); + if let Some(journal) = store.get_mut(persistence_id) { + let mut entries = entries; + journal.messages.append(&mut entries); + } else { + store.insert( + persistence_id.to_string(), + ActorJournal::from_messages(entries), + ); + } + + Ok(()) + } + async fn read_latest_snapshot( &self, persistence_id: &str, ) -> anyhow::Result> { let store = self.store.read(); - Ok(store .get(persistence_id) .and_then(|j| j.snapshots.last().cloned())) @@ -143,6 +170,128 @@ pub mod inmemory { })) } + async fn read_message( + &self, + persistence_id: &str, + sequence_id: i64, + ) -> anyhow::Result> { + let mut store = self.store.read(); + let journal = store.get(persistence_id); + match journal { + None => Ok(None), + Some(journal) => Ok(journal + .messages + .iter() + .find(|n| n.sequence == sequence_id) + .cloned()), + } + } + + async fn read_messages( + &self, + persistence_id: &str, + from_sequence: i64, + to_sequence: i64, + ) -> anyhow::Result>> { + let mut store = self.store.read(); + let journal = store.get(persistence_id); + match journal { + None => Ok(None), + Some(journal) => { + if journal.messages.is_empty() { + Ok(None) + } else { + let first_seq = journal.messages.first().map(|m| m.sequence).unwrap(); + let final_seq = journal.messages.last().map(|m| m.sequence).unwrap(); + + if to_sequence >= final_seq { + if first_seq >= from_sequence { + Ok(Some(journal.messages.clone())) + } else { + let starting_message = journal + .messages + .iter() + .enumerate() + .find(|(_index, j)| j.sequence > from_sequence) + .map(|(index, _j)| index); + + if let Some(starting_index) = starting_message { + Ok(Some( + journal.messages[starting_index..] 
+ .iter() + .cloned() + .collect(), + )) + } else { + Ok(Some(vec![])) + } + } + } else if first_seq >= from_sequence { + let end_message = journal + .messages + .iter() + .enumerate() + .find(|(_index, j)| j.sequence > from_sequence) + .map(|(index, _j)| index); + + if let Some(end_index) = end_message { + Ok(Some( + journal.messages[..end_index].iter().cloned().collect(), + )) + } else { + Ok(Some(vec![])) + } + } else { + panic!("todo: this") + } + } + } + } + } + + async fn delete_messages_to( + &self, + persistence_id: &str, + to_sequence: i64, + ) -> anyhow::Result<()> { + let mut store = self.store.write(); + let mut journal = store.entry(persistence_id.to_string()); + if let Entry::Occupied(mut journal) = journal { + let journal = journal.get_mut(); + + fn get_messages_to( + to_sequence: i64, + journal: &mut ActorJournal, + ) -> Vec { + let starting_message = journal + .messages + .iter() + .enumerate() + .find(|(_index, j)| j.sequence >= to_sequence) + .map(|(index, _j)| index); + + starting_message.map_or_else(|| vec![], |m| journal.messages.split_off(m)) + } + + let messages = if let Some(newest_msg) = journal.messages.last() { + if newest_msg.sequence < to_sequence { + vec![] + } else { + get_messages_to(to_sequence, journal) + } + } else { + get_messages_to(to_sequence, journal) + }; + + *journal = ActorJournal { + snapshots: mem::take(&mut journal.snapshots), + messages, + }; + } + + Ok(()) + } + async fn delete_all(&self, persistence_id: &str) -> anyhow::Result<()> { let mut store = self.store.write(); store.remove(persistence_id); diff --git a/coerce/src/persistent/journal/storage.rs b/coerce/src/persistent/journal/storage.rs index 34935039..98fdcec6 100644 --- a/coerce/src/persistent/journal/storage.rs +++ b/coerce/src/persistent/journal/storage.rs @@ -6,24 +6,50 @@ use std::sync::Arc; #[derive(Clone, Debug)] pub struct JournalEntry { pub sequence: i64, - pub payload_type: String, + pub payload_type: Arc, pub bytes: Arc>, } #[async_trait] pub trait JournalStorage: Send + Sync { + // TODO: add the ability to limit the maximum size of snapshots, + // loading these back could cause an unexpected OOM async fn write_snapshot(&self, persistence_id: &str, entry: JournalEntry) -> Result<()>; async fn write_message(&self, persistence_id: &str, entry: JournalEntry) -> Result<()>; + async fn write_message_batch( + &self, + persistence_id: &str, + entries: Vec, + ) -> Result<()>; + async fn read_latest_snapshot(&self, persistence_id: &str) -> Result>; + // TODO: add the ability to stream the messages, rather than load all up front, + // if the actor has a very large journal, this could cause an unexpected OOM. + // payload size limits should also be applied. 
async fn read_latest_messages( &self, persistence_id: &str, from_sequence: i64, ) -> Result>>; + async fn read_message( + &self, + persistence_id: &str, + sequence_id: i64, + ) -> Result>; + + async fn read_messages( + &self, + persistence_id: &str, + from_sequence: i64, + to_sequence: i64, + ) -> Result>>; + + async fn delete_messages_to(&self, persistence_id: &str, to_sequence: i64) -> Result<()>; + async fn delete_all(&self, persistence_id: &str) -> Result<()>; } @@ -35,7 +61,7 @@ impl JournalEntry { if let Ok(journal_entry) = journal_entry { Some(JournalEntry { sequence: journal_entry.sequence, - payload_type: journal_entry.payload_type, + payload_type: journal_entry.payload_type.into(), bytes: Arc::new(journal_entry.bytes), }) } else { @@ -47,7 +73,7 @@ impl JournalEntry { let journal_entry = self; let proto = ProtoJournalEntry { sequence: journal_entry.sequence, - payload_type: journal_entry.payload_type.clone(), + payload_type: journal_entry.payload_type.to_string(), bytes: journal_entry.bytes.as_ref().clone(), ..Default::default() }; diff --git a/coerce/src/persistent/journal/types.rs b/coerce/src/persistent/journal/types.rs index 5af043e9..1bd45f98 100644 --- a/coerce/src/persistent/journal/types.rs +++ b/coerce/src/persistent/journal/types.rs @@ -15,8 +15,8 @@ lazy_static! { } pub struct JournalTypes { - message_type_map: HashMap, - snapshot_type_map: HashMap, + message_type_map: HashMap>, + snapshot_type_map: HashMap>, recoverable_messages: HashMap>, recoverable_snapshots: HashMap>, } @@ -47,7 +47,7 @@ impl JournalTypes { ); self.message_type_map - .insert(TypeId::of::(), identifier.to_string()); + .insert(TypeId::of::(), identifier.into()); self } @@ -62,16 +62,16 @@ impl JournalTypes { ); self.snapshot_type_map - .insert(TypeId::of::(), identifier.to_string()); + .insert(TypeId::of::(), identifier.into()); self } - pub fn snapshot_type_mapping(&self) -> Option { + pub fn snapshot_type_mapping(&self) -> Option> { self.snapshot_type_map.get(&TypeId::of::()).cloned() } - pub fn message_type_mapping(&self) -> Option { + pub fn message_type_mapping(&self) -> Option> { self.message_type_map.get(&TypeId::of::()).cloned() } diff --git a/coerce/src/persistent/mod.rs b/coerce/src/persistent/mod.rs index 65e10c77..dba57c7d 100644 --- a/coerce/src/persistent/mod.rs +++ b/coerce/src/persistent/mod.rs @@ -1,6 +1,7 @@ //! 
Coerce Actor Persistence pub mod actor; +pub mod batch; pub mod context; pub mod failure; pub mod inspect; @@ -9,6 +10,7 @@ pub mod recovery; pub use actor::*; pub use failure::*; +pub use journal::*; pub use recovery::*; use std::any::TypeId; @@ -23,6 +25,12 @@ pub struct Persistence { actor_type_specific_providers: HashMap, } +impl From for Persistence { + fn from(value: S) -> Self { + Persistence::from(value) + } +} + impl Persistence { pub fn from(provider: S) -> Persistence { let default_provider = Arc::new(provider); diff --git a/coerce/src/persistent/recovery.rs b/coerce/src/persistent/recovery.rs index 3b11e05a..e8fb9261 100644 --- a/coerce/src/persistent/recovery.rs +++ b/coerce/src/persistent/recovery.rs @@ -9,11 +9,12 @@ pub trait ActorRecovery: PersistentActor { &mut self, persistence_key: Option, ctx: &mut ActorContext, - ) -> RecoveryResult; + ) -> Recovery; } -pub enum RecoveryResult { +pub enum Recovery { Recovered(RecoveredJournal), + Disabled, Failed, } @@ -23,7 +24,7 @@ impl ActorRecovery for A { &mut self, persistence_key: Option, ctx: &mut ActorContext, - ) -> RecoveryResult { + ) -> Recovery { let mut journal = None; let mut attempts = 1; @@ -53,12 +54,12 @@ impl ActorRecovery for A { match policy { RecoveryFailurePolicy::StopActor => { ctx.stop(None); - return RecoveryResult::Failed; + return Recovery::Failed; } RecoveryFailurePolicy::Retry(retry_policy) => { if !should_retry(ctx, &attempts, retry_policy).await { - return RecoveryResult::Failed; + return Recovery::Failed; } } @@ -71,7 +72,7 @@ impl ActorRecovery for A { } let journal = journal.expect("no journal loaded"); - RecoveryResult::Recovered(journal) + Recovery::Recovered(journal) } } diff --git a/coerce/src/remote/net/client/connect.rs b/coerce/src/remote/net/client/connect.rs index f446bb0f..d6fe93ae 100644 --- a/coerce/src/remote/net/client/connect.rs +++ b/coerce/src/remote/net/client/connect.rs @@ -12,17 +12,17 @@ use crate::remote::net::client::{ BeginHandshake, ClientState, ConnectionState, HandshakeAckCallback, HandshakeStatus, RemoteClient, }; -use crate::remote::net::codec::NetworkCodec; use crate::remote::net::message::SessionEvent; use crate::remote::net::proto::network::{self as proto, IdentifyEvent}; use crate::remote::net::{receive_loop, StreamData}; +use bytes::Bytes; use protobuf::EnumOrUnknown; use std::time::Duration; use tokio::net::TcpStream; use tokio::sync::oneshot; use tokio::sync::oneshot::Sender; -use tokio_util::codec::{FramedRead, FramedWrite}; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use valuable::Valuable; pub struct Connect; @@ -49,8 +49,9 @@ impl RemoteClient { let stream = stream.unwrap(); let (read, writer) = tokio::io::split(stream); - let reader = FramedRead::new(read, NetworkCodec); - let mut write = FramedWrite::new(writer, NetworkCodec); + let codec = LengthDelimitedCodec::new(); + let reader = FramedRead::new(read, codec.clone()); + let mut write = FramedWrite::new(writer, codec.clone()); let (identity_tx, identity_rx) = oneshot::channel(); @@ -67,7 +68,7 @@ impl RemoteClient { ..Default::default() }); - match write_bytes(&identify.write_to_bytes().unwrap(), &mut write).await { + match write_bytes(Bytes::from(identify.write_to_bytes().unwrap()), &mut write).await { Ok(_) => {} Err(e) => { error!( @@ -205,22 +206,23 @@ impl Handler for RemoteClient { ); write_bytes( - SessionEvent::Handshake(proto::SessionHandshake { - node_id, - node_tag, - token: vec![], - client_type: EnumOrUnknown::new(self.client_type.into()), - trace_id: 
message.request_id.to_string(), - nodes: message - .seed_nodes - .into_iter() - .map(|node| node.into()) - .collect(), - ..proto::SessionHandshake::default() - }) - .write_to_bytes() - .unwrap() - .as_ref(), + Bytes::from( + SessionEvent::Handshake(proto::SessionHandshake { + node_id, + node_tag, + token: vec![], + client_type: EnumOrUnknown::new(self.client_type.into()), + trace_id: message.request_id.to_string(), + nodes: message + .seed_nodes + .into_iter() + .map(|node| node.into()) + .collect(), + ..proto::SessionHandshake::default() + }) + .write_to_bytes() + .unwrap(), + ), &mut connection.write, ) .await diff --git a/coerce/src/remote/net/client/mod.rs b/coerce/src/remote/net/client/mod.rs index e3947f0d..926b4ddf 100644 --- a/coerce/src/remote/net/client/mod.rs +++ b/coerce/src/remote/net/client/mod.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::SinkExt; use std::collections::VecDeque; @@ -8,7 +9,7 @@ use tokio::net::TcpStream; use tokio::sync::oneshot; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::task::JoinHandle; -use tokio_util::codec::FramedWrite; +use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; use uuid::Uuid; use crate::actor::context::ActorContext; @@ -20,7 +21,6 @@ use crate::remote::cluster::node::{NodeIdentity, RemoteNode}; use crate::remote::net::client::connect::Connect; use crate::remote::net::client::receive::HandshakeAcknowledge; use crate::remote::net::client::send::write_bytes; -use crate::remote::net::codec::NetworkCodec; use crate::remote::net::message::SessionEvent; use crate::remote::net::proto::network as proto; use crate::remote::net::proto::network::PingEvent; @@ -221,7 +221,7 @@ impl Actor for RemoteClient { }); let _ = write_bytes( - &ping_event.write_to_bytes().unwrap(), + Bytes::from(ping_event.write_to_bytes().unwrap()), &mut connection.write, ) .await; @@ -256,7 +256,7 @@ pub enum ClientState { pub struct ConnectionState { identity: NodeIdentity, handshake: HandshakeStatus, - write: FramedWrite, NetworkCodec>, + write: FramedWrite, LengthDelimitedCodec>, receive_task: JoinHandle<()>, } diff --git a/coerce/src/remote/net/client/send.rs b/coerce/src/remote/net/client/send.rs index 2a8d2388..92770582 100644 --- a/coerce/src/remote/net/client/send.rs +++ b/coerce/src/remote/net/client/send.rs @@ -2,12 +2,12 @@ use crate::actor::context::ActorContext; use crate::actor::message::{Handler, Message}; use crate::remote::net::client::connect::Disconnected; use crate::remote::net::client::{ClientState, ConnectionState, RemoteClient, RemoteClientErr}; -use crate::remote::net::codec::NetworkCodec; use crate::remote::net::StreamData; +use bytes::{Bytes, BytesMut}; use futures::SinkExt; use tokio::io::WriteHalf; use tokio::net::TcpStream; -use tokio_util::codec::FramedWrite; +use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; pub struct Write(pub M); @@ -28,9 +28,9 @@ impl Handler> for RemoteClient { impl ConnectionState { pub async fn write(&mut self, bytes: Vec) -> Result<(), Option>> { - if let Err(e) = write_bytes(&bytes, &mut self.write).await { + if let Err(e) = write_bytes(Bytes::from(bytes), &mut self.write).await { match e { - RemoteClientErr::StreamErr(_e) => Err(Some(bytes)), + RemoteClientErr::StreamErr(_e) => Err(Some(vec![])), _ => Err(None), } } else { @@ -54,10 +54,11 @@ impl RemoteClient { while let Some(buffered_message) = self.write_buffer.pop_front() { let len = buffered_message.len(); - if let Ok(()) = write_bytes(&buffered_message, &mut connection_state.write).await { + let bytes 
= Bytes::from(buffered_message); + if let Ok(()) = write_bytes(bytes.clone(), &mut connection_state.write).await { self.write_buffer_bytes_total -= len; } else { - self.write_buffer.push_front(buffered_message); + self.write_buffer.push_front(bytes.to_vec()); // write failed, no point trying again - break and reconnect/retry later break; @@ -94,7 +95,8 @@ impl RemoteClient { } ClientState::Connected(state) => { - if let Err(e) = write_bytes(&bytes, &mut state.write).await { + let bytes = Bytes::from(bytes); + if let Err(e) = write_bytes(bytes.clone(), &mut state.write).await { match e { RemoteClientErr::StreamErr(_e) => { warn!("node {} (addr={}) is unreachable but marked as connected, buffering message (total_buffered={})", @@ -102,7 +104,7 @@ impl RemoteClient { &self.addr, self.write_buffer.len()); - buffer_message = Some(bytes); + buffer_message = Some(bytes.to_vec()); true } @@ -131,8 +133,8 @@ impl RemoteClient { } pub(crate) async fn write_bytes( - bytes: &Vec, - writer: &mut FramedWrite, NetworkCodec>, + bytes: Bytes, + writer: &mut FramedWrite, LengthDelimitedCodec>, ) -> Result<(), RemoteClientErr> { match writer.send(bytes).await { Ok(()) => Ok(()), diff --git a/coerce/src/remote/net/codec.rs b/coerce/src/remote/net/codec.rs deleted file mode 100644 index cbbb3006..00000000 --- a/coerce/src/remote/net/codec.rs +++ /dev/null @@ -1,51 +0,0 @@ -use bytes::{Buf, BufMut, BytesMut}; - -use crate::remote::net::metrics::NetworkMetrics; -use byteorder::{ByteOrder, LittleEndian}; -use std::io::Error; -use tokio_util::codec::{Decoder, Encoder}; - -pub struct NetworkCodec; - -// TODO: change the codec to use the `bytes` structs - -impl Encoder<&Vec> for NetworkCodec { - type Error = Error; - - fn encode(&mut self, item: &Vec, dst: &mut BytesMut) -> Result<(), Error> { - trace!("encoding msg"); - - let len = 4 + item.len(); - dst.reserve(len); - dst.put_i32_le(item.len() as i32); - dst.put_slice(item); - - NetworkMetrics::incr_bytes_sent(dst.len() as u64); - - Ok(()) - } -} - -impl Decoder for NetworkCodec { - type Item = Vec; - type Error = Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result>, Error> { - if src.is_empty() || src.remaining() <= 4 { - return Ok(None); - } - - trace!("decoding message"); - - NetworkMetrics::incr_bytes_received(src.len() as u64); - - let len = LittleEndian::read_i32(src.as_ref()) as usize; - if (src.remaining() - 4) < len { - return Ok(None); - } - - src.advance(4); - let buf = src.split_to(len); - Ok(Some(buf.to_vec())) - } -} diff --git a/coerce/src/remote/net/mod.rs b/coerce/src/remote/net/mod.rs index 803dae01..cf3459ea 100644 --- a/coerce/src/remote/net/mod.rs +++ b/coerce/src/remote/net/mod.rs @@ -3,15 +3,14 @@ use crate::remote::system::RemoteActorSystem; use std::future::Future; use std::io::Error; +use bytes::BytesMut; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::remote::net::codec::NetworkCodec; use futures::StreamExt; -use tokio_util::codec::FramedRead; +use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; pub mod client; -pub mod codec; pub mod message; pub mod metrics; pub mod proto; @@ -42,13 +41,13 @@ pub trait StreamReceiver { } pub struct StreamReceiverFuture { - stream: FramedRead, + stream: FramedRead, stop_rx: tokio::sync::oneshot::Receiver, } impl StreamReceiverFuture { pub fn new( - stream: FramedRead, + stream: FramedRead, stop_rx: tokio::sync::oneshot::Receiver, ) -> StreamReceiverFuture { StreamReceiverFuture { stream, stop_rx } @@ -59,14 +58,14 @@ impl tokio_stream::Stream for StreamReceiverFuture 
where S: Unpin, { - type Item = Option>; + type Item = Option; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { if let Poll::Ready(Ok(true)) = Pin::new(&mut self.stop_rx).poll(cx) { return Poll::Ready(None); } - let result: Option, Error>> = + let result: Option> = futures::ready!(Pin::new(&mut self.stream).poll_next(cx)); Poll::Ready(match result { @@ -82,7 +81,7 @@ where pub async fn receive_loop( mut system: RemoteActorSystem, - read: FramedRead, + read: FramedRead, mut receiver: R, ) where R: Send, @@ -90,7 +89,7 @@ pub async fn receive_loop( let mut reader = read; while let Some(res) = reader.next().await { match res { - Ok(res) => match R::Message::read_from_bytes(res) { + Ok(res) => match R::Message::read_from_bytes(res.to_vec()) { Some(msg) => { receiver.on_receive(msg, &system).await; if receiver.should_close() { diff --git a/coerce/src/remote/net/server/session/mod.rs b/coerce/src/remote/net/server/session/mod.rs index 9a32fdf6..a724a617 100644 --- a/coerce/src/remote/net/server/session/mod.rs +++ b/coerce/src/remote/net/server/session/mod.rs @@ -5,7 +5,6 @@ use crate::remote::actor::message::NodeTerminated; use crate::remote::actor::RemoteResponse; use crate::remote::cluster::discovery::{Discover, Seed}; use crate::remote::cluster::node::{NodeAttributes, RemoteNode}; -use crate::remote::net::codec::NetworkCodec; use crate::remote::net::message::{ datetime_to_timestamp, timestamp_to_datetime, ClientEvent, SessionEvent, }; @@ -24,6 +23,7 @@ use futures::{SinkExt, StreamExt}; use protobuf::well_known_types::wrappers::UInt64Value; use protobuf::{Message as ProtoMessage, MessageField}; +use bytes::Bytes; use std::io::Error; use std::net::SocketAddr; use std::str::FromStr; @@ -31,7 +31,7 @@ use std::sync::Arc; use tokio::io::{ReadHalf, WriteHalf}; use tokio::net::TcpStream; use tokio::sync::oneshot; -use tokio_util::codec::{FramedRead, FramedWrite}; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tokio_util::sync::CancellationToken; use uuid::Uuid; use valuable::Valuable; @@ -41,8 +41,8 @@ pub mod store; pub struct RemoteSession { id: Uuid, addr: SocketAddr, - write: FramedWrite, NetworkCodec>, - read: Option, NetworkCodec>>, + write: FramedWrite, LengthDelimitedCodec>, + read: Option, LengthDelimitedCodec>>, read_cancellation_token: Option, remote_server_config: RemoteServerConfigRef, } @@ -55,8 +55,8 @@ impl RemoteSession { remote_server_config: RemoteServerConfigRef, ) -> RemoteSession { let (read, write) = tokio::io::split(stream); - let read = Some(FramedRead::new(read, NetworkCodec)); - let write = FramedWrite::new(write, NetworkCodec); + let read = Some(FramedRead::new(read, LengthDelimitedCodec::new())); + let write = FramedWrite::new(write, LengthDelimitedCodec::new()); RemoteSession { id, addr, @@ -148,11 +148,11 @@ async fn validate_session_token( ctx: &mut ActorContext, log: LogContext, system: &RemoteActorSystem, - read: &mut FramedRead, NetworkCodec>, + read: &mut FramedRead, LengthDelimitedCodec>, ) -> bool { let bytes = read.next().await; if let Some(Ok(bytes)) = bytes { - match SessionEvent::read_from_bytes(bytes) { + match SessionEvent::read_from_bytes(bytes.to_vec()) { Some(SessionEvent::Identify(identify)) => { let token = identify.token; let token_valid = system @@ -208,7 +208,7 @@ impl RemoteSession { match message.write_to_bytes() { Some(msg) => { trace!("message encoded"); - if self.write.send(&msg).await.is_ok() { + if 
self.write.send(Bytes::from(msg)).await.is_ok() { trace!("message sent"); } else { error!("failed to send message"); diff --git a/coerce/src/remote/system/builder.rs b/coerce/src/remote/system/builder.rs index 5d32079c..c2070ad9 100644 --- a/coerce/src/remote/system/builder.rs +++ b/coerce/src/remote/system/builder.rs @@ -8,9 +8,7 @@ use crate::remote::actor::{ }; use crate::remote::handler::{RemoteActorHandler, RemoteActorMessageHandler}; use crate::remote::heartbeat::{Heartbeat, HeartbeatConfig}; - use crate::remote::stream::mediator::StreamMediator; - use crate::remote::system::{AtomicNodeId, NodeId, RemoteActorSystem, RemoteSystemCore}; use rand::RngCore; @@ -24,7 +22,7 @@ use crate::remote::cluster::discovery::NodeDiscovery; use crate::remote::cluster::node::NodeAttributes; use crate::remote::config::{RemoteSystemConfig, RemoteSystemSecurity}; -use crate::remote::net::security::jwt::Jwt; + use crate::remote::net::security::ClientAuth; use chrono::Utc; use uuid::Uuid; @@ -120,6 +118,8 @@ impl RemoteActorSystemBuilder { secret: S, token_ttl: Option, ) -> Self { + use crate::remote::net::security::jwt::Jwt; + self.client_auth = Some(ClientAuth::Jwt(Jwt::from_secret(secret, token_ttl))); self } diff --git a/coerce/tests/test_persistent_failures.rs b/coerce/tests/test_persistent_failures.rs index 525f0dbb..7864daf4 100644 --- a/coerce/tests/test_persistent_failures.rs +++ b/coerce/tests/test_persistent_failures.rs @@ -235,6 +235,14 @@ impl JournalStorage for MockPersistence { } } + async fn write_message_batch( + &self, + persistent_id: &str, + entries: Vec, + ) -> anyhow::Result<()> { + todo!() + } + async fn read_latest_snapshot( &self, _persistence_id: &str, @@ -260,6 +268,31 @@ impl JournalStorage for MockPersistence { } } + async fn read_message( + &self, + persistence_id: &str, + sequence_id: i64, + ) -> anyhow::Result> { + todo!() + } + + async fn read_messages( + &self, + persistence_id: &str, + from_sequence: i64, + to_sequence: i64, + ) -> anyhow::Result>> { + todo!() + } + + async fn delete_messages_to( + &self, + persistence_id: &str, + to_sequence: i64, + ) -> anyhow::Result<()> { + todo!() + } + async fn delete_all(&self, _persistence_id: &str) -> anyhow::Result<()> { Ok(()) } diff --git a/coerce/tests/test_persistent_recovery.rs b/coerce/tests/test_persistent_recovery.rs index ad970574..3bc82221 100644 --- a/coerce/tests/test_persistent_recovery.rs +++ b/coerce/tests/test_persistent_recovery.rs @@ -1,11 +1,13 @@ use coerce::actor::context::ActorContext; +use std::sync::Arc; use coerce::actor::message::Handler; use coerce::actor::system::ActorSystem; use coerce::actor::IntoActor; use coerce::persistent::journal::provider::inmemory::InMemoryStorageProvider; - +use coerce::persistent::journal::provider::StorageProvider; +use coerce::persistent::journal::storage::JournalEntry; use coerce::persistent::journal::types::JournalTypes; use coerce::persistent::{Persistence, PersistentActor, Recover, RecoverSnapshot}; use coerce_macros::{JsonMessage, JsonSnapshot}; @@ -73,6 +75,59 @@ impl Recover for TestActor { } } +#[tokio::test] +pub async fn test_in_memory_delete_messages() { + util::create_trace_logger(); + let storage = InMemoryStorageProvider::new(); + let journal = storage.journal_storage().unwrap(); + journal + .write_message( + "1", + JournalEntry { + sequence: 1, + payload_type: "hello".into(), + bytes: Arc::new(vec![]), + }, + ) + .await + .unwrap(); + + journal + .write_message( + "1", + JournalEntry { + sequence: 2, + payload_type: "hello".into(), + bytes: 
Arc::new(vec![]), + }, + ) + .await + .unwrap(); + + journal + .write_message( + "1", + JournalEntry { + sequence: 3, + payload_type: "hello".into(), + bytes: Arc::new(vec![]), + }, + ) + .await + .unwrap(); + + let messages = journal.read_latest_messages("1", 0).await.unwrap().unwrap(); + assert_eq!(messages.len(), 3); + + journal.delete_messages_to("1", 3).await.unwrap(); + let messages = journal.read_latest_messages("1", 0).await.unwrap().unwrap(); + assert_eq!(messages.len(), 1); + + journal.delete_messages_to("1", 4).await.unwrap(); + let messages = journal.read_latest_messages("1", 0).await.unwrap().unwrap(); + assert_eq!(messages.len(), 0); +} + #[tokio::test] pub async fn test_persistent_actor_message_recovery() { util::create_trace_logger(); diff --git a/providers/persistence/coerce-redis/Cargo.toml b/providers/persistence/coerce-redis/Cargo.toml index 20e94955..3277cbcb 100644 --- a/providers/persistence/coerce-redis/Cargo.toml +++ b/providers/persistence/coerce-redis/Cargo.toml @@ -17,8 +17,8 @@ cluster = [ [dependencies] coerce = { path = "../../../coerce", features = ["persistence"] } -async-trait = { version = "0.1" } -redis = { version = "0.22.1", features = ["tokio-comp"] } -tokio = { version = "1.21.1", features = ["full"] } +async-trait = { version = "0.1.64" } +redis = { version = "0.22.3", features = ["tokio-comp"] } +tokio = { version = "1.25.0", features = ["full"] } anyhow = "1" bytes = { version = "1.2.1" } \ No newline at end of file diff --git a/providers/persistence/coerce-redis/src/journal/actor.rs b/providers/persistence/coerce-redis/src/journal/actor.rs index d3815883..08351b8d 100644 --- a/providers/persistence/coerce-redis/src/journal/actor.rs +++ b/providers/persistence/coerce-redis/src/journal/actor.rs @@ -18,6 +18,16 @@ impl Message for Write { type Result = (); } +pub(crate) struct WriteBatch { + pub key: String, + pub entries: Vec, + pub result_channel: Sender>, +} + +impl Message for WriteBatch { + type Result = (); +} + pub(crate) struct ReadSnapshot(pub String, pub Sender>>); impl Message for ReadSnapshot { @@ -26,7 +36,8 @@ impl Message for ReadSnapshot { pub(crate) struct ReadMessages { pub key: String, - pub from_sequence: i64, + pub start_sequence: Option, + pub end_sequence: Option, pub result_channel: Sender>>>, } @@ -34,12 +45,33 @@ impl Message for ReadMessages { type Result = (); } +pub(crate) struct ReadMessage { + pub key: String, + pub sequence_id: i64, + pub result_channel: Sender>>, +} + +impl Message for ReadMessage { + type Result = (); +} + pub(crate) struct Delete(pub Vec); impl Message for Delete { type Result = anyhow::Result<()>; } +pub(crate) struct DeleteRange { + pub key: String, + pub start_sequence: i64, + pub end_sequence: i64, + pub result_channel: Sender>, +} + +impl Message for DeleteRange { + type Result = (); +} + impl Actor for RedisJournal where C: Clone {} #[async_trait] @@ -67,6 +99,59 @@ where } } +#[async_trait] +impl Handler for RedisJournal +where + C: Clone, +{ + async fn handle(&mut self, message: WriteBatch, _ctx: &mut ActorContext) { + let connection = self.0.clone(); + let _ = tokio::spawn(async move { + let mut connection = connection; + + let mut cmd = redis::cmd("ZADD"); + + cmd.arg(message.key); + + for entry in message.entries { + cmd.arg(entry.sequence) + .arg(entry.write_to_bytes().expect("serialized journal")); + } + + if let Err(e) = cmd.query_async::(&mut connection).await { + let _ = message.result_channel.send(Err(anyhow::Error::new(e))); + } else { + let _ = 
message.result_channel.send(Ok(())); + } + }); + } +} + +#[async_trait] +impl<C> Handler<DeleteRange> for RedisJournal<C> +where + C: Clone, +{ + async fn handle(&mut self, message: DeleteRange, _ctx: &mut ActorContext) { + let connection = self.0.clone(); + let _ = tokio::spawn(async move { + let mut connection = connection; + if let Err(e) = redis::cmd("ZREMRANGEBYSCORE") + .arg(message.key) + .arg(message.start_sequence) + .arg(message.end_sequence) + .query_async::<C, ()>(&mut connection) + .await + { + let err = anyhow::Error::new(e); + let _ = message.result_channel.send(Err(err)); + } else { + let _ = message.result_channel.send(Ok(())); + } + }); + } +} + #[async_trait] impl<C> Handler<ReadSnapshot> for RedisJournal<C> where @@ -102,6 +187,39 @@ where } } +#[async_trait] +impl<C> Handler<ReadMessage> for RedisJournal<C> +where + C: Clone, +{ + async fn handle(&mut self, message: ReadMessage, _ctx: &mut ActorContext) { + let connection = self.0.clone(); + let _ = tokio::spawn(async move { + let mut connection = connection; + + let data = redis::cmd("ZRANGE") + .arg(message.key) + .arg(message.sequence_id) + .arg(message.sequence_id) + .arg("BYSCORE") + .query_async::<C, Option<Vec<Vec<u8>>>>(&mut connection) + .await; + + match data { + Ok(data) => { + let _ = message.result_channel.send(Ok( + data.and_then(|b| b.into_iter().next().and_then(read_journal_entry)) + )); + } + Err(err) => { + let err = anyhow::Error::new(err); + let _ = message.result_channel.send(Err(err)); + } + } + }); + } +} + #[async_trait] impl<C> Handler<ReadMessages> for RedisJournal<C> where @@ -112,10 +230,15 @@ where let _ = tokio::spawn(async move { let mut connection = connection; + let from_sequence = message.start_sequence.unwrap_or(0); + let end_sequence = message + .end_sequence + .map_or("+inf".to_string(), |s| format!("{}", s)); + let data = redis::cmd("ZRANGE") .arg(message.key) - .arg(message.from_sequence) - .arg("+inf") + .arg(from_sequence) + .arg(end_sequence) .arg("BYSCORE") .query_async::<C, Option<Vec<Vec<u8>>>>(&mut connection) .await; diff --git a/providers/persistence/coerce-redis/src/journal/mod.rs b/providers/persistence/coerce-redis/src/journal/mod.rs index fe8145f5..3bd8cc7d 100644 --- a/providers/persistence/coerce-redis/src/journal/mod.rs +++ b/providers/persistence/coerce-redis/src/journal/mod.rs @@ -1,4 +1,6 @@ -use crate::journal::actor::{Delete, ReadMessages, ReadSnapshot, RedisJournal, Write}; +use crate::journal::actor::{ + Delete, DeleteRange, ReadMessage, ReadMessages, ReadSnapshot, RedisJournal, Write, WriteBatch, +}; use coerce::actor::system::ActorSystem; use coerce::actor::{IntoActor, LocalActorRef}; @@ -14,6 +16,7 @@ use tokio::sync::oneshot; pub(crate) mod actor; +#[derive(Clone)] pub struct RedisStorageProvider { redis: JournalStorageRef, } @@ -156,6 +159,23 @@ where rx.await? } + async fn write_message_batch( + &self, + persistence_id: &str, + entries: Vec<JournalEntry>, + ) -> anyhow::Result<()> { + let (result_channel, rx) = oneshot::channel(); + let key = (self.key_provider_fn)(persistence_id, "journal", self.config.as_ref()); + + self.redis_journal.notify(WriteBatch { + key, + entries, + result_channel, + })?; + + rx.await? + } + async fn read_latest_snapshot( &self, persistence_id: &str, @@ -178,15 +198,69 @@ where let key = (self.key_provider_fn)(persistence_id, "journal", self.config.as_ref()); self.redis_journal.notify(ReadMessages { key, - from_sequence, + start_sequence: Some(from_sequence), + end_sequence: None, result_channel, })?; + + rx.await?
+ } + + async fn read_message( + &self, + persistence_id: &str, + sequence_id: i64, + ) -> anyhow::Result<Option<JournalEntry>> { + let (result_channel, rx) = oneshot::channel(); + let key = (self.key_provider_fn)(persistence_id, "journal", self.config.as_ref()); + + self.redis_journal.notify(ReadMessage { + key, + sequence_id, + result_channel, + })?; + + rx.await? + } + + async fn read_messages( + &self, + persistence_id: &str, + from_sequence: i64, + to_sequence: i64, + ) -> anyhow::Result<Option<Vec<JournalEntry>>> { + let (result_channel, rx) = oneshot::channel(); + let key = (self.key_provider_fn)(persistence_id, "journal", self.config.as_ref()); + self.redis_journal.notify(ReadMessages { + key, + start_sequence: Some(from_sequence), + end_sequence: Some(to_sequence), + result_channel, + })?; + + rx.await? + } + + async fn delete_messages_to( + &self, + persistence_id: &str, + to_sequence: i64, + ) -> anyhow::Result<()> { + let (result_channel, rx) = oneshot::channel(); + let key = (self.key_provider_fn)(persistence_id, "journal", self.config.as_ref()); + self.redis_journal.notify(DeleteRange { + key, + start_sequence: 0, + end_sequence: to_sequence, + result_channel, + })?; + rx.await? } async fn delete_all(&self, persistence_id: &str) -> anyhow::Result<()> { - let journal_key = get_redis_key(persistence_id, "journal", self.config.as_ref()); - let snapshot_key = get_redis_key(persistence_id, "snapshot", self.config.as_ref()); + let journal_key = (self.key_provider_fn)(persistence_id, "journal", self.config.as_ref()); + let snapshot_key = (self.key_provider_fn)(persistence_id, "snapshot", self.config.as_ref()); self.redis_journal .send(Delete(vec![journal_key, snapshot_key])) diff --git a/providers/persistence/coerce-redis/tests/test_redis_journal_storage.rs b/providers/persistence/coerce-redis/tests/test_redis_journal_storage.rs index a509f90e..4b05a506 100644 --- a/providers/persistence/coerce-redis/tests/test_redis_journal_storage.rs +++ b/providers/persistence/coerce-redis/tests/test_redis_journal_storage.rs @@ -1,111 +1,125 @@ use coerce::actor::system::ActorSystem; use coerce::persistent::journal::provider::StorageProvider; use coerce::persistent::journal::storage::JournalEntry; +use coerce::persistent::storage::JournalStorageRef; +use coerce::persistent::Persistence; use coerce_redis::journal::{RedisStorageConfig, RedisStorageProvider}; +const TEST_REDIS_HOST: &str = "redis://127.0.0.1:6379/"; + +struct RedisTestCtx { + system: ActorSystem, + storage: JournalStorageRef, +} + #[tokio::test] pub async fn test_redis_journal_read_write_snapshot() { let persistence_id = "hi"; - let system = ActorSystem::new(); - let provider = RedisStorageProvider::connect( - RedisStorageConfig { - nodes: vec!["redis://127.0.0.1:6379/".to_string()], - key_prefix: "test_redis_journal_read_write_snapshot:".to_string(), - cluster: false, - use_key_hashtags: false, - }, - &system, - ) - .await; + let ctx = new_test_context("test_redis_journal_read_write_snapshot:").await; + let redis = ctx.storage; + let entries = generate_entries(3); - let redis = provider.journal_storage().expect("redis journal storage"); + for entry in entries { + redis + .write_snapshot(persistence_id, entry) + .await + .expect("write snapshot"); + } - redis - .write_snapshot( - persistence_id, - JournalEntry { - sequence: 1, - payload_type: "test".to_string(), - bytes: vec![1, 3, 3, 7].into(), - }, - ) - .await - .expect("write snapshot"); + let latest_snapshot = redis.read_latest_snapshot(persistence_id).await; + + redis.delete_all(persistence_id).await.expect("delete all");
+ + let latest_snapshot = latest_snapshot.expect("load latest snapshot").unwrap(); + assert_eq!(latest_snapshot.sequence, 3); +} +#[tokio::test] +pub async fn test_redis_journal_read_write_messages() { + let persistence_id = "hi"; + let ctx = new_test_context("test_redis_journal_read_write_messages:").await; + let redis = ctx.storage; + + let entries = generate_entries(2); + for entry in entries { + redis + .write_message(persistence_id, entry) + .await + .expect("write message"); + } + + let latest_messages = redis.read_latest_messages(persistence_id, 0).await; + + redis.delete_all(persistence_id).await.expect("delete all"); + + let latest_messages = latest_messages.unwrap().unwrap(); + assert_eq!(latest_messages.len(), 2); + assert_eq!(latest_messages[0].sequence, 1); + assert_eq!(latest_messages[1].sequence, 2); +} + +#[tokio::test] +pub async fn test_redis_journal_read_write_message_batch() { + let persistence_id = "hi"; + let ctx = new_test_context("test_redis_journal_read_write_message_batch:").await; + let redis = ctx.storage; + + let entries = generate_entries(5); redis - .write_snapshot( - persistence_id, - JournalEntry { - sequence: 2, - payload_type: "test".to_string(), - bytes: vec![1, 3, 3, 7].into(), - }, - ) + .write_message_batch(persistence_id, entries) .await - .expect("write snapshot"); + .expect("write message batch"); + + let latest_messages = redis.read_latest_messages(persistence_id, 0).await; + redis.delete_all(persistence_id).await.expect("delete all"); + + let latest_messages = latest_messages.unwrap().unwrap(); + assert_eq!(latest_messages.len(), 5); + assert_eq!(latest_messages[0].sequence, 1); + assert_eq!(latest_messages[1].sequence, 2); + assert_eq!(latest_messages[2].sequence, 3); + assert_eq!(latest_messages[3].sequence, 4); + assert_eq!(latest_messages[4].sequence, 5); +} + +#[tokio::test] +pub async fn test_redis_journal_read_single_message() { + let persistence_id = "hi"; + let ctx = new_test_context("test_redis_journal_read_single_message:").await; + let redis = ctx.storage; + + let entries = generate_entries(5); redis - .write_snapshot( - persistence_id, - JournalEntry { - sequence: 3, - payload_type: "test".to_string(), - bytes: vec![1, 3, 3, 7].into(), - }, - ) + .write_message_batch(persistence_id, entries) .await - .expect("write snapshot"); + .expect("write message batch"); - let latest_snapshot = redis.read_latest_snapshot(persistence_id).await; + let third_message = redis.read_message(persistence_id, 3).await; redis.delete_all(persistence_id).await.expect("delete all"); - let latest_snapshot = latest_snapshot.expect("load latest snapshot").unwrap(); - - assert_eq!(latest_snapshot.sequence, 3); + let third_message = third_message.unwrap().unwrap(); + assert_eq!(third_message.sequence, 3); } #[tokio::test] -pub async fn test_redis_journal_read_write_messages() { +pub async fn test_redis_journal_delete_messages_range() { let persistence_id = "hi"; - let system = ActorSystem::new(); - let provider = RedisStorageProvider::connect( - RedisStorageConfig { - nodes: vec!["redis://127.0.0.1:6379/".to_string()], - key_prefix: "test_redis_journal_read_write_messages:".to_string(), - cluster: false, - use_key_hashtags: false, - }, - &system, - ) - .await; - - let redis = provider.journal_storage().expect("redis journal storage"); + let ctx = new_test_context("test_redis_journal_delete_messages_range:").await; + let redis = ctx.storage; + let entries = generate_entries(5); redis - .write_message( - persistence_id, - JournalEntry { - sequence: 1, - 
payload_type: "test".to_string(), - bytes: vec![1, 3, 3, 7].into(), - }, - ) + .write_message_batch(persistence_id, entries) .await - .expect("write message 1"); + .expect("write message batch"); redis - .write_message( - persistence_id, - JournalEntry { - sequence: 2, - payload_type: "test".to_string(), - bytes: vec![1, 3, 3, 7].into(), - }, - ) + .delete_messages_to(persistence_id, 3) .await - .expect("write message 2"); + .expect("delete message batch"); let latest_messages = redis.read_latest_messages(persistence_id, 0).await; @@ -113,9 +127,33 @@ pub async fn test_redis_journal_read_write_messages() { let latest_messages = latest_messages.unwrap().unwrap(); assert_eq!(latest_messages.len(), 2); - assert_eq!(latest_messages[0].sequence, 1); - assert_eq!(latest_messages[1].sequence, 2); } -#[tokio::test] -pub async fn test_redis_journal_actor_integration() {} +async fn new_test_context(key_prefix: &str) -> RedisTestCtx { + let system = ActorSystem::new(); + let provider = RedisStorageProvider::connect( + RedisStorageConfig { + nodes: vec![TEST_REDIS_HOST.to_string()], + key_prefix: key_prefix.to_string(), + cluster: false, + use_key_hashtags: false, + }, + &system, + ) + .await; + + let storage = provider.journal_storage().expect("journal storage"); + system.to_persistent(provider.into()); + RedisTestCtx { system, storage } +} + +fn generate_entries(n: i32) -> Vec { + (1..n + 1) + .into_iter() + .map(|n| JournalEntry { + sequence: n as i64, + payload_type: "test".into(), + bytes: vec![1, 3, 3, 7].into(), + }) + .collect() +}