diff --git a/coerce/src/actor/mod.rs b/coerce/src/actor/mod.rs
index b8fca51..bcd45f4 100644
--- a/coerce/src/actor/mod.rs
+++ b/coerce/src/actor/mod.rs
@@ -142,6 +142,7 @@
 use crate::actor::message::{Handler, Message};
 use crate::actor::scheduler::ActorType::{Anonymous, Tracked};
 use crate::actor::system::ActorSystem;
+pub use refs::*;
 use std::fmt::Debug;
 use std::hash::Hasher;
 use std::marker::PhantomData;
@@ -149,7 +150,6 @@
 use std::sync::Arc;
 use std::time::Duration;
 use tokio_util::sync::CancellationToken;
 use uuid::Uuid;
-pub use refs::*;
 #[cfg(feature = "remote")]
 use crate::actor::message::Envelope;
diff --git a/modules/replication/coerce-replication/src/simple/consensus.rs b/modules/replication/coerce-replication/src/simple/consensus.rs
index de9949f..28e96da 100644
--- a/modules/replication/coerce-replication/src/simple/consensus.rs
+++ b/modules/replication/coerce-replication/src/simple/consensus.rs
@@ -1 +1 @@
-pub struct Replicate {}
+pub struct Replicate {}
diff --git a/modules/replication/coerce-replication/src/simple/error.rs b/modules/replication/coerce-replication/src/simple/error.rs
index 2a11285..708c93f 100644
--- a/modules/replication/coerce-replication/src/simple/error.rs
+++ b/modules/replication/coerce-replication/src/simple/error.rs
@@ -1,6 +1,7 @@
 use crate::storage::StorageErr;
 use coerce::actor::message::{MessageUnwrapErr, MessageWrapErr};
 use coerce::actor::ActorRefErr;
+use coerce::remote::system::NodeId;
 
 #[derive(Debug)]
 pub enum Error {
@@ -9,6 +10,7 @@
     ActorRef(ActorRefErr),
     Deserialisation(MessageUnwrapErr),
     Serialisation(MessageWrapErr),
+    LeaderChanged { new_leader_id: NodeId },
 }
 
 impl From for Error {
diff --git a/modules/replication/coerce-replication/src/simple/mod.rs b/modules/replication/coerce-replication/src/simple/mod.rs
index 2d78f06..f7a4c03 100644
--- a/modules/replication/coerce-replication/src/simple/mod.rs
+++ b/modules/replication/coerce-replication/src/simple/mod.rs
@@ -15,7 +15,7 @@ pub mod write;
 
 use crate::simple::heartbeat::HeartbeatTick;
 use crate::simple::read::Read;
-use crate::simple::write::Write;
+use crate::simple::write::{Mutation, UncommittedMutation, Write};
 use crate::storage::{Key, Storage, Value};
 use coerce::actor::context::ActorContext;
 use coerce::actor::message::{Handler, Message};
@@ -25,9 +25,11 @@ use coerce::remote::cluster::group::{Node, NodeGroup, NodeGroupEvent, Subscribe}
 use coerce::remote::cluster::node::NodeSelector;
 use coerce::remote::system::{NodeId, RemoteActorSystem};
-use std::collections::{HashMap, VecDeque};
+use crate::simple::error::Error;
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::mem;
 use std::time::Duration;
+use tokio::sync::oneshot;
 
 pub enum Request {
     Read(Read),
     Write(Write),
 }
@@ -35,6 +37,8 @@
 enum State {
+    None,
+
     Joining {
         request_buffer: VecDeque>,
     },
@@ -45,16 +49,37 @@
     Available {
         cluster: Cluster,
-        heartbeat_timer: Option,
+        pending_mutations: HashMap>,
+    },
+
+    Leader {
+        cluster: Cluster,
+        uncommitted_mutations: HashMap>,
+        heartbeat_timer: Timer,
     },
 }
 
+impl Default for State {
+    fn default() -> Self {
+        Self::None
+    }
+}
+
 struct Cluster {
     current_leader: NodeId,
     leader_actor: ActorRef>,
     nodes: HashMap>>,
 }
 
+impl Cluster {
+    pub fn update_leader(&mut self, leader_id: NodeId) {
+        let leader_actor = self.nodes.get(&leader_id).unwrap().actor.clone();
+
+        self.current_leader = leader_id;
+        self.leader_actor = leader_actor;
+    }
+}
+
 pub struct Replicator {
     storage: S,
     group: LocalActorRef>,
@@ -134,14 +159,8 @@ impl Handler>> for Replicator {
             NodeGroupEvent::MemberUp { leader_id, nodes } => {
                 debug!(leader_id = leader_id, node_count = nodes.len(), "member up");
 
-                let (mut leader_actor, heartbeat_timer) = if leader_id == self.system.node_id() {
-                    let timer = start_heartbeat_timer::(ctx);
-                    let actor_ref = ActorRef::from(self.actor_ref(ctx));
-                    (Some(actor_ref), Some(timer))
-                } else {
-                    (None, None)
-                };
+                let is_leader = leader_id == self.system.node_id();
+                let (mut leader_actor) = is_leader.then(|| ActorRef::from(self.actor_ref(ctx)));
 
                 let mut node_map = HashMap::new();
                 for node in nodes {
@@ -152,19 +171,26 @@ impl Handler>> for Replicator {
                     node_map.insert(node.node_id, node);
                 }
 
-                let old_state = mem::replace(
-                    &mut self.state,
+                let cluster = Cluster {
+                    current_leader: leader_id,
+                    leader_actor: leader_actor.unwrap(),
+                    nodes: node_map,
+                };
+
+                let new_state = if is_leader {
+                    State::Leader {
+                        cluster,
+                        uncommitted_mutations: Default::default(),
+                        heartbeat_timer: start_heartbeat_timer::(ctx),
+                    }
+                } else {
                     State::Available {
-                        cluster: Cluster {
-                            current_leader: leader_id,
-                            leader_actor: leader_actor.unwrap(),
-                            nodes: node_map,
-                        },
-                        heartbeat_timer,
-                    },
-                );
-
-                match old_state {
+                        cluster,
+                        pending_mutations: Default::default(),
+                    }
+                };
+
+                match mem::replace(&mut self.state, new_state) {
                     State::Joining { request_buffer } => {
                         debug!(
                             pending_requests = request_buffer.len(),
@@ -189,7 +215,7 @@ impl Handler>> for Replicator {
                 debug!(node_id = node.node_id, "node added");
 
                 match &mut self.state {
-                    State::Available { cluster, .. } => {
+                    State::Available { cluster, .. } | State::Leader { cluster, .. } => {
                         cluster.nodes.insert(node.node_id, node);
                     }
                     _ => {}
@@ -200,7 +226,7 @@ impl Handler>> for Replicator {
                 debug!(node_id = node_id, "node removed");
 
                 match &mut self.state {
-                    State::Available { cluster, .. } => {
+                    State::Available { cluster, .. } | State::Leader { cluster, .. } => {
                         cluster.nodes.remove(&node_id);
                     }
                     _ => {}
@@ -210,22 +236,44 @@ impl Handler>> for Replicator {
             NodeGroupEvent::LeaderChanged(leader_id) => {
                 info!(leader_id = leader_id, "leader changed");
 
-                match &mut self.state {
-                    State::Available {
-                        cluster,
-                        heartbeat_timer,
+                match mem::take(&mut self.state) {
+                    State::Leader {
+                        mut cluster,
+                        uncommitted_mutations,
+                        ..
                     } => {
-                        let leader_actor = cluster.nodes.get(&leader_id).unwrap().actor.clone();
-                        cluster.current_leader = leader_id;
-                        cluster.leader_actor = leader_actor;
+                        cluster.update_leader(leader_id);
+
+                        for (_, mutation) in uncommitted_mutations {
+                            mutation.on_completion.notify_err(
+                                Error::LeaderChanged {
+                                    new_leader_id: leader_id,
+                                },
+                                &self.system,
+                            );
+                        }
+
+                        self.state = State::Available {
+                            cluster,
+                            pending_mutations: Default::default(),
+                        }
+                    }
+
+                    State::Available { mut cluster, .. } => {
+                        cluster.update_leader(leader_id);
 
                         let is_leader = leader_id == self.system.node_id();
                         if is_leader {
-                            if heartbeat_timer.is_none() {
-                                *heartbeat_timer = Some(start_heartbeat_timer::(ctx));
-                            }
+                            self.state = State::Leader {
+                                cluster,
+                                uncommitted_mutations: Default::default(),
+                                heartbeat_timer: start_heartbeat_timer(ctx),
+                            };
                         } else {
-                            *heartbeat_timer = None;
+                            self.state = State::Available {
+                                cluster,
+                                pending_mutations: Default::default(),
+                            }
                         }
                     }
diff --git a/modules/replication/coerce-replication/src/simple/read.rs b/modules/replication/coerce-replication/src/simple/read.rs
index 04de4ec..3494883 100644
--- a/modules/replication/coerce-replication/src/simple/read.rs
+++ b/modules/replication/coerce-replication/src/simple/read.rs
@@ -37,34 +37,28 @@ impl Handler> for Replicator {
                 );
             }
 
-            State::Recovering { cluster } => {
-                debug!("node is currently recovering, forwarding to leader actor");
+            State::Available { cluster, .. } | State::Recovering { cluster, .. } => {
+                let on_completion = message.on_completion.take().unwrap();
+
+                debug!("forwarding request to leader node");
 
                 tokio::spawn(remote_read(
                     cluster.leader_actor.clone(),
                     message.key,
-                    message.on_completion.take().unwrap(),
+                    on_completion,
                     self.system.clone(),
                 ));
             }
 
-            State::Available { cluster, .. } => {
+            State::Leader { .. } => {
                 let on_completion = message.on_completion.take().unwrap();
+                let data = self.storage.read(message.key).await;
 
-                if cluster.current_leader == self.system.node_id() {
-                    let data = self.storage.read(message.key).await;
-
-                    debug!("local read, node is leader, emitting result");
-                    let _ = on_completion.send(data.map_err(|e| e.into()));
-                } else {
-                    tokio::spawn(remote_read(
-                        cluster.leader_actor.clone(),
-                        message.key,
-                        on_completion,
-                        self.system.clone(),
-                    ));
-                }
+                debug!("local read, node is leader, emitting result");
+                let _ = on_completion.send(data.map_err(|e| e.into()));
             }
+
+            _ => {}
         }
     }
 }
diff --git a/modules/replication/coerce-replication/src/simple/write.rs b/modules/replication/coerce-replication/src/simple/write.rs
index 78fe9c4..243f333 100644
--- a/modules/replication/coerce-replication/src/simple/write.rs
+++ b/modules/replication/coerce-replication/src/simple/write.rs
@@ -1,8 +1,11 @@
-use crate::protocol::simple::Error;
-use crate::simple::Replicator;
+use crate::simple::error::Error;
+use crate::simple::{Replicator, Request, State};
 use crate::storage::{Key, Storage, Value};
 use coerce::actor::context::ActorContext;
 use coerce::actor::message::{Handler, Message};
+use coerce::actor::ActorRef;
+use coerce::remote::system::{NodeId, RemoteActorSystem};
+use std::collections::HashSet;
 use tokio::sync::oneshot;
 
 pub struct Write {
@@ -11,11 +14,122 @@ pub struct Write {
     pub on_completion: Option>>,
 }
 
+pub struct UncommittedMutation {
+    pub mutation: Mutation,
+    pub votes: HashSet,
+    pub on_completion: MutationCompletion,
+}
+
+pub struct Mutation {
+    pub log_index: u64,
+    pub key: K,
+    pub value: V,
+}
+
+pub enum MutationCompletion {
+    Local(Option>>),
+    Remote {
+        request_id: u64,
+        source_node_id: NodeId,
+    },
+}
+
+impl MutationCompletion {
+    pub fn notify_err(self, error: Error, system: &RemoteActorSystem) {
+        self.notify(Err(error), system)
+    }
+
+    pub fn notify(self, result: Result<(), Error>, system: &RemoteActorSystem) {
+        match self {
+            MutationCompletion::Local(mut channel) => {
+                let channel = channel.take().unwrap();
+                let _ = channel.send(result);
+            }
+
+            MutationCompletion::Remote {
+                request_id,
+                source_node_id,
+            } => {
+                let result = match result {
+                    Ok(_) => RemoteWriteResult::Ok(()),
+                    Err(e) => RemoteWriteResult::Err(e),
+                };
+
+                let bytes = result.as_bytes().unwrap();
+                let sys = system.clone();
+
+                tokio::spawn(async move {
+                    sys.notify_raw_rpc_result(request_id, bytes, source_node_id)
+                        .await
+                });
+            }
+        }
+    }
+}
+
 impl Message for Write {
     type Result = ();
 }
 
 #[async_trait]
 impl Handler> for Replicator {
-    async fn handle(&mut self, _message: Write, _ctx: &mut ActorContext) {}
+    async fn handle(&mut self, mut write: Write, _ctx: &mut ActorContext) {
+        match &mut self.state {
+            State::Joining { request_buffer } => {
+                request_buffer.push_back(Request::Write(write));
+
+                debug!(
+                    pending_requests = request_buffer.len(),
+                    "replicator still joining cluster, buffered write request"
+                );
+            }
+
+            State::Recovering { cluster } | State::Available { cluster, .. } => {
+                tokio::spawn(remote_write(
+                    cluster.leader_actor.clone(),
+                    write.key,
+                    write.value,
+                    write.on_completion.take().unwrap(),
+                    self.system.clone(),
+                ));
+            }
+
+            State::Leader { .. } => {
+                // create a log entry
+                // notify all nodes in cluster
+                // upon ack, commit log entry
+                // tell all nodes the latest commit index
+            }
+            _ => {}
+        }
+    }
+}
+
+pub struct RemoteWrite {
+    pub request_id: u64,
+    pub source_node_id: u64,
+    pub key: K,
+    pub value: V,
+}
+
+impl Message for RemoteWrite {
+    type Result = ();
+}
+
+pub enum RemoteWriteResult {
+    Ok(()),
+    Err(Error),
+}
+
+impl Message for RemoteWriteResult {
+    type Result = ();
+}
+
+async fn remote_write(
+    leader_ref: ActorRef>,
+    key: S::Key,
+    value: S::Value,
+    on_completion: oneshot::Sender>,
+    system: RemoteActorSystem,
+) {
 }
diff --git a/modules/replication/coerce-replication/src/storage/mod.rs b/modules/replication/coerce-replication/src/storage/mod.rs
index c71901f..c49accb 100644
--- a/modules/replication/coerce-replication/src/storage/mod.rs
+++ b/modules/replication/coerce-replication/src/storage/mod.rs
@@ -17,6 +17,8 @@ pub trait Storage: 'static + Sync + Send {
     type Snapshot: Snapshot;
 
+    fn set_last_commit_index(&mut self, commit_index: u64) -> Result<(), StorageErr>;
+    fn last_commit_index(&self) -> Option;
 
     async fn read(&mut self, key: Self::Key) -> Result;
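Editor's note: the leader-side write path is still a stub in this diff. The State::Leader arm of the Write handler carries only the four TODO comments (create a log entry, notify the group, commit on acknowledgement, broadcast the commit index), and remote_write has an empty body. The sketch below illustrates just the quorum-tracking step those comments describe. It is a self-contained illustration under assumptions, not code from the crate: on_write_ack, the flattened UncommittedMutation stand-in, and the strict-majority rule are all hypothetical names and choices made for the example.

use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for this sketch only; the crate's UncommittedMutation
// wraps a Mutation plus a MutationCompletion rather than raw fields.
type NodeId = u64;

struct UncommittedMutation<K, V> {
    log_index: u64,
    key: K,
    value: V,
    votes: HashSet<NodeId>,
}

/// Records an acknowledgement from `voter` for the uncommitted mutation at
/// `log_index`. Once strictly more than half of the `cluster_size` nodes have
/// acknowledged it, the mutation is removed from the pending map and returned,
/// so the caller can apply it to storage and broadcast the new commit index.
fn on_write_ack<K, V>(
    uncommitted: &mut HashMap<u64, UncommittedMutation<K, V>>,
    log_index: u64,
    voter: NodeId,
    cluster_size: usize,
) -> Option<UncommittedMutation<K, V>> {
    let entry = uncommitted.get_mut(&log_index)?;
    entry.votes.insert(voter);

    // Simple majority: more than half of the group, leader's own vote included.
    if entry.votes.len() * 2 > cluster_size {
        uncommitted.remove(&log_index)
    } else {
        None
    }
}

A full implementation would resolve the stored MutationCompletion once the entry commits and reject stale acknowledgements after a leader change, which is the purpose of the Error::LeaderChanged variant added in this diff.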