
feat: implement raft log #5

Merged · 8 commits · Jul 15, 2024
4 changes: 2 additions & 2 deletions proto/raft/raft.proto
@@ -38,8 +38,8 @@ message AppendEntriesArgs {
 message AppendEntriesReply {
   uint64 term = 1;
   bool success = 2;
-  uint64 conflict_index = 3;
-  uint64 conflict_term = 4;
+  uint64 last_log_index = 3;
+  uint64 last_log_term = 4;
 }
 
 message InstallSnapshotArgs {
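
Illustration only, not part of this PR: with this change a follower reports its own last log position instead of a conflict hint when AppendEntries fails. The handler is outside this diff; the sketch below only shows how a leader could use last_log_index to rewind its per-peer next_index (cf. peer_next_index in context.rs). The struct and function names here are hypothetical.

// Hypothetical leader-side handling of a rejected AppendEntries reply.
// The struct mirrors the proto message above.
struct AppendEntriesReply {
    term: u64,
    success: bool,
    last_log_index: u64,
    last_log_term: u64,
}

fn on_append_rejected(reply: &AppendEntriesReply, next_index: &mut u64) {
    if !reply.success {
        // Restart replication just past the follower's last entry, never
        // moving next_index forward on a rejection.
        *next_index = (*next_index).min(reply.last_log_index + 1);
    }
}
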
4 changes: 3 additions & 1 deletion src/bin/radis/main.rs
@@ -2,6 +2,7 @@ use clap::Parser;
 use radis::conf::Config;
 use radis::raft::RaftService;
 use tokio;
+use tokio::sync::mpsc;
 
 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None)]
@@ -18,7 +19,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let args = Args::parse();
     let cfg = Config::from_path(&args.conf)?;
 
-    RaftService::new(cfg).serve().await?;
+    let (commit_tx, _) = mpsc::channel(1);
+    RaftService::new(cfg, commit_tx).serve().await?;
 
     Ok(())
 }
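
Illustration only, not part of this PR: the binary above passes a commit channel but drops the receiver, so nothing consumes committed entries yet. A real embedding would spawn a consumer like the hypothetical one below and hand commit_tx to RaftService::new; the apply step is a placeholder.

// Hypothetical consumer of the commit channel.
use std::sync::Arc;
use tokio::sync::mpsc;

async fn apply_loop(mut commit_rx: mpsc::Receiver<Arc<Vec<u8>>>) {
    while let Some(cmd) = commit_rx.recv().await {
        // Apply `cmd` to the key-value state machine once one exists.
        println!("applying {}-byte command", cmd.len());
    }
}
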
1 change: 1 addition & 0 deletions src/lib.rs
@@ -8,6 +8,7 @@ pub mod raft {
 
     mod config;
     mod context;
+    mod log;
     mod service;
     pub mod state;
 
61 changes: 56 additions & 5 deletions src/raft/context.rs
@@ -1,12 +1,13 @@
 use super::config::REQUEST_TIMEOUT;
+use super::log::LogManager;
 use super::service::PeerClient;
 use crate::conf::Config;
 use crate::timer::{OneshotTimer, PeriodicTimer};
-use log::debug;
+use log::{debug, info};
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::mpsc::Sender;
-use tokio::sync::Mutex;
+use tokio::sync::mpsc::{self, Sender};
+use tokio::sync::{Mutex, RwLock};
 
 pub type PeerID = String;
 pub type Peer = usize;
@@ -16,18 +17,29 @@ pub struct Context {
     id: String,
     peers: Vec<Arc<Mutex<PeerClient>>>,
 
+    log: LogManager,
+    peer_next_index: Vec<Arc<Mutex<LogIndex>>>,
+    peer_sync_index: Vec<Arc<RwLock<LogIndex>>>,
+    commit_ch: mpsc::Sender<Arc<Vec<u8>>>,
+
     timeout: Arc<OneshotTimer>,
     tick: Arc<PeriodicTimer>,
 }
 
 impl Context {
-    pub fn new(cfg: Config, timeout_event: Sender<()>, tick_event: Sender<()>) -> Self {
+    pub fn new(
+        cfg: Config,
+        commit_ch: mpsc::Sender<Arc<Vec<u8>>>,
+        timeout_event: Sender<()>,
+        tick_event: Sender<()>,
+    ) -> Self {
         let timeout = Duration::from_millis(REQUEST_TIMEOUT);
         let Config {
             id,
             listen_addr: _,
             peer_addrs,
         } = cfg;
+        let peers = peer_addrs.len();
 
         Self {
             id,
@@ -44,6 +56,12 @@ impl Context {
                     Arc::new(Mutex::new(PeerClient::new(addr, timeout)))
                 })
                 .collect(),
+
+            log: LogManager::new(),
+            peer_next_index: (0..peers).map(|_| Arc::new(Mutex::new(0))).collect(),
+            peer_sync_index: (0..peers).map(|_| Arc::new(RwLock::new(0))).collect(),
+            commit_ch,
+
             timeout: Arc::new(OneshotTimer::new(timeout_event)),
             tick: Arc::new(PeriodicTimer::new(tick_event)),
         }
@@ -75,7 +93,7 @@ impl Context {
     }
 
     pub fn majority(&self) -> usize {
-        self.peers.len() / 2 + 1
+        self.peers.len() / 2
     }
 
     pub async fn reset_timeout(&self, timeout: Duration) {
@@ -93,4 +111,37 @@
     pub async fn stop_tick(&self) {
         self.tick.stop().await;
     }
+
+    pub fn log(&self) -> &LogManager {
+        &self.log
+    }
+
+    pub fn log_mut(&mut self) -> &mut LogManager {
+        &mut self.log
+    }
+
+    pub async fn commit_log(&mut self, index: LogIndex) {
+        self.log.commit(index, &self.commit_ch).await;
+    }
+
+    pub fn peer_next_index(&self, peer: Peer) -> Arc<Mutex<LogIndex>> {
+        self.peer_next_index[peer].clone()
+    }
+
+    pub async fn update_peer_index(&mut self, peer: Peer, index: LogIndex) {
+        *self.peer_sync_index[peer].write().await = index;
+
+        let mut sync_indexes = vec![0; self.peers()];
+        for (i, index) in self.peer_sync_index.iter().enumerate() {
+            sync_indexes[i] = *index.read().await;
+        }
+        info!("peer_sync_index: {:?}", sync_indexes);
+        self.commit_log(majority_index(sync_indexes)).await;
+    }
 }
+
+fn majority_index(mut indexes: Vec<LogIndex>) -> LogIndex {
+    indexes.sort_unstable();
+    let majority_index = indexes.len() / 2 - 1;
+    indexes[majority_index]
+}
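
For reference, not part of the diff: update_peer_index recomputes the commit point from the peers' sync indexes via majority_index, which sorts ascending and takes the element at position len / 2 - 1, i.e. an index that at least len - (len / 2 - 1) peers have acknowledged. A standalone sanity check of that rule as written, assuming LogIndex is a u64 alias:

// Standalone copy of majority_index for a quick check; assumes LogIndex = u64.
fn majority_index(mut indexes: Vec<u64>) -> u64 {
    indexes.sort_unstable();
    let majority_index = indexes.len() / 2 - 1;
    indexes[majority_index]
}

fn main() {
    assert_eq!(majority_index(vec![5, 1, 3]), 1); // 3 peers: all three are at >= 1
    assert_eq!(majority_index(vec![7, 2, 9, 4]), 4); // 4 peers: three are at >= 4
    // A single-element vector would underflow at `len / 2 - 1`; callers are
    // expected to track at least two peers.
}
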
130 changes: 130 additions & 0 deletions src/raft/log.rs
@@ -0,0 +1,130 @@
use log::info;
use tokio::sync::mpsc;

use super::context::LogIndex;
use super::state::Term;
use super::Log;
use std::ops::Deref;
use std::sync::Arc;

#[derive(Debug)]
pub struct InnerLog {
    term: Term,
    data: Arc<Vec<u8>>,
}

impl InnerLog {
    pub fn new(term: Term, data: Vec<u8>) -> Self {
        Self {
            term,
            data: Arc::new(data),
        }
    }
}

pub struct LogManager {
    commit_index: LogIndex,
    snapshot_index: LogIndex,
    logs: Vec<InnerLog>,
    #[allow(dead_code)]
    snapshot: Option<Vec<u8>>,
}

impl LogManager {
    pub fn new() -> Self {
        Self {
            commit_index: 0,
            snapshot_index: 0,
            logs: vec![InnerLog::new(0, vec![])],
            snapshot: None,
        }
    }

    pub fn append(&mut self, term: Term, log: Vec<u8>) -> LogIndex {
        self.logs.push(InnerLog::new(term, log));
        self.snapshot_index + self.logs.len() as u64
    }

    pub fn since(&self, start: usize) -> Vec<Log> {
        let offset = start + self.snapshot_index as usize;
        self.logs[start - self.snapshot_index as usize..]
            .iter()
            .enumerate()
            .map(|(i, log)| Log {
                index: (i + offset) as u64,
                term: log.term,
                command: log.data.deref().clone(),
            })
            .collect()
    }

    pub fn latest(&self) -> Option<(LogIndex, Term)> {
        self.logs
            .last()
            .map(|log| (self.snapshot_index + self.logs.len() as u64 - 1, log.term))
    }

    pub fn term(&self, index: LogIndex) -> Option<Term> {
        if index < self.snapshot_index {
            return None;
        }
        let index = (index - self.snapshot_index) as usize;
        if index >= self.logs.len() {
            return None;
        }
        Some(self.logs[index].term)
    }

    pub fn first_log_at_term(&self, term: Term) -> Option<LogIndex> {
        self.logs
            .iter()
            .position(|log| log.term == term)
            .map(|index| index as u64 + self.snapshot_index)
    }

    pub fn delete_since(&mut self, index: LogIndex) -> usize {
        if index < self.snapshot_index {
            return 0;
        }
        let index = (index - self.snapshot_index) as usize;
        let deleted = self.logs.drain(index..).count();
        info!(target: "raft::log",
            deleted = deleted;
            "delete logs[{}..]", index
        );
        deleted
    }

    pub fn commit_index(&self) -> LogIndex {
        self.commit_index
    }

    pub fn snapshot_index(&self) -> LogIndex {
        self.snapshot_index
    }

    pub async fn commit(&mut self, index: LogIndex, ch: &mpsc::Sender<Arc<Vec<u8>>>) {
        if index <= self.commit_index {
            return;
        }

        info!(target: "raft::log",
            "commit logs[{}..{}]",
            self.commit_index, index
        );
        // Since commit_index is the index that already committed,
        // we need to start from commit_index + 1
        let start = (self.commit_index - self.snapshot_index + 1) as usize;
        let end = (index - self.snapshot_index + 1) as usize;
        for log in if end >= self.logs.len() {
            &self.logs[start..]
        } else {
            &self.logs[start..end]
        }
        .iter()
        {
            ch.send(log.data.clone()).await.unwrap();
        }
        self.commit_index = index;
    }
}
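
Not part of the PR: a sketch of a unit test that could sit at the bottom of src/raft/log.rs, showing the append/commit flow end to end. It assumes Term and LogIndex are u64 aliases and that tokio's test macro is available as a dev-dependency.

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;

    #[tokio::test]
    async fn commit_sends_entries_in_order() {
        // new() seeds a sentinel entry at index 0, so real entries start at 1.
        let mut lm = LogManager::new();
        lm.append(1, b"set x".to_vec());
        lm.append(1, b"set y".to_vec());
        assert_eq!(lm.latest(), Some((2, 1))); // last index 2, term 1

        let (tx, mut rx) = mpsc::channel(8);
        lm.commit(2, &tx).await; // delivers logs[1..=2] on the channel
        assert_eq!(rx.recv().await.unwrap().as_slice(), b"set x");
        assert_eq!(rx.recv().await.unwrap().as_slice(), b"set y");
        assert_eq!(lm.commit_index(), 2);
    }
}
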
44 changes: 29 additions & 15 deletions src/raft/service.rs
@@ -10,7 +10,7 @@ use anyhow::Result;
 use log::{info, trace};
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::{Mutex, RwLock};
+use tokio::sync::{mpsc, Mutex, RwLock};
 use tonic::transport::{Channel, Endpoint, Server};
 use tonic::{Request, Response, Status};
 
@@ -20,19 +20,22 @@ pub struct RaftService {
     listen_addr: String,
     context: Arc<RwLock<Context>>,
     state: Arc<Mutex<Arc<Box<dyn State>>>>,
+
+    close_ch: Arc<Mutex<Option<mpsc::Sender<()>>>>,
 }
 
 impl RaftService {
-    pub fn new(cfg: Config) -> Self {
+    pub fn new(cfg: Config, commit_ch: mpsc::Sender<Arc<Vec<u8>>>) -> Self {
         let Config {
             id, listen_addr, ..
         } = cfg.clone();
-        let (context, state) = state::init(cfg);
+        let (context, state) = state::init(cfg, commit_ch);
         RaftService {
             id,
             listen_addr,
             context,
             state,
+            close_ch: Arc::new(Mutex::new(None)),
         }
     }
 
@@ -44,27 +47,38 @@
         self.state.clone()
     }
 
+    pub async fn append_command(&self, cmd: Vec<u8>) -> Result<()> {
+        info!(target: "raft::service", id = self.id; "append command");
+        let state = self.state.lock().await;
+        let new_state = state.on_command(self.context.clone(), cmd).await?;
+        if state::transition(state, new_state, self.context.clone()).await {
+            Err(anyhow::anyhow!("not leader"))
+        } else {
+            Ok(())
+        }
+    }
+
     pub async fn serve(&self) -> Result<()> {
+        let (close_tx, mut close_rx) = mpsc::channel::<()>(1);
+        *self.close_ch.lock().await = Some(close_tx);
         let addr = self.listen_addr.parse()?;
         info!(target: "raft::service", id = self.id; "raft gRPC server listening on {addr}");
         Server::builder()
             .add_service(RaftServer::new(self.clone()))
-            .serve(addr)
+            .serve_with_shutdown(addr, async move {
+                close_rx.recv().await;
+            })
             .await?;
         Ok(())
    }
 
-    pub async fn serve_with_shutdown<F: std::future::Future<Output = ()>>(
-        &self,
-        f: F,
-    ) -> Result<()> {
-        let addr = self.listen_addr.parse()?;
-        info!(target: "raft::service", id = self.id; "raft gRPC server listening on {addr}");
-        Server::builder()
-            .add_service(RaftServer::new(self.clone()))
-            .serve_with_shutdown(addr, f)
-            .await?;
-        Ok(())
+    pub async fn close(&self) {
+        if let Some(ch) = self.close_ch.lock().await.take() {
+            ch.send(()).await.unwrap();
+        }
+        let ctx = self.context.read().await;
+        ctx.cancel_timeout().await;
+        ctx.stop_tick().await;
     }
 }
 
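
For illustration, not part of this PR: how an embedding binary might drive the new constructor, commit channel, append_command, and close(). The config path and apply logic are placeholders; RaftService is cloneable (serve() itself calls self.clone()), and the ctrl_c call assumes tokio's signal feature.

// Hypothetical embedding of RaftService with the new commit channel and
// shutdown path.
use radis::conf::Config;
use radis::raft::RaftService;
use std::sync::Arc;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cfg = Config::from_path("conf/node1.toml")?; // placeholder path
    let (commit_tx, mut commit_rx) = mpsc::channel::<Arc<Vec<u8>>>(64);

    let service = RaftService::new(cfg, commit_tx);
    let server = service.clone();
    let serve_handle = tokio::spawn(async move {
        if let Err(e) = server.serve().await {
            eprintln!("raft server stopped: {e}");
        }
    });

    // Drain committed commands; a real embedding would apply them to its
    // state machine here.
    tokio::spawn(async move {
        while let Some(cmd) = commit_rx.recv().await {
            println!("apply {}-byte command", cmd.len());
        }
    });

    // append_command returns an error when this node is not the leader.
    if let Err(e) = service.append_command(b"set x = 1".to_vec()).await {
        eprintln!("append rejected: {e}");
    }

    tokio::signal::ctrl_c().await?;
    service.close().await; // shuts down the gRPC server and stops the timers
    serve_handle.await?;
    Ok(())
}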