Merge pull request #3 from ASjet/feat/logging
refactor: add structured logging
ASjet authored Jul 8, 2024
2 parents 87314e3 + 84ac963 commit ce1fcc8
Showing 12 changed files with 333 additions and 77 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -17,3 +17,4 @@ Cargo.lock
 # Added by cargo
 
 /target
+/logs
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -20,11 +20,13 @@
     "derive",
 ]}
 futures = "0.3.30"
+log = { version = "0.4.22", features = ["kv"] }
 prost = "0.12.6"
 rand = "0.8.5"
 serde = {version = "1.0.203", features = [
     "derive",
 ]}
+structured-logger = "1.0.3"
 tokio = {version = "1.38.0", features = [
     "rt-multi-thread",
     "macros",
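
The two new dependencies work together: the `kv` feature of the log crate enables the key-value logging syntax used throughout this change, and structured-logger renders the captured pairs as JSON. A minimal sketch of the pattern, using the same Builder API this PR adopts (the target and field names here are illustrative, not from the diff):

    use log::info;
    use structured_logger::{json::new_writer, Builder};

    fn main() {
        // Route all log targets ("*") to a JSON writer on stdout.
        Builder::with_level("info")
            .with_target_writer("*", new_writer(std::io::stdout()))
            .init();

        // Pairs before the `;` become structured fields, not message text.
        info!(target: "raft::example", term = 3, peer = "node0"; "example event");
    }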
26 changes: 26 additions & 0 deletions run.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+set -e
+
+NODES=${1:-3}
+NODE_PREFIX=${2:-node}
+LOG_DIR=${3:-./logs}
+
+mkdir -p $LOG_DIR
+
+echo "Running $NODES nodes with prefix $NODE_PREFIX"
+
+cargo run --bin mkconf -- -p $NODES
+cargo build
+for node in $(seq 1 $NODES); do
+    node_name="${NODE_PREFIX}$(($node - 1))"
+    ./target/debug/radis -c ${node_name}.toml > ${LOG_DIR}/${node_name}.log &
+done
+
+function cleanup {
+    pkill -9 radis
+    rm -f ${NODE_PREFIX}*.toml
+    exit 0
+}
+
+trap 'cleanup 2>/dev/null' INT
+wait < <(jobs -p)
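
For example, `./run.sh 5` generates five node configs via mkconf, builds the workspace, launches node0 through node4 in the background with each node's output redirected to ./logs/nodeN.log, and then blocks; Ctrl-C fires the trap, which kills the radis processes and removes the generated *.toml configs.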
12 changes: 4 additions & 8 deletions src/bin/radis/main.rs
@@ -1,8 +1,7 @@
 use clap::Parser;
 use radis::conf::Config;
-use radis::raft::{RaftServer, RaftService};
+use radis::raft::RaftService;
 use tokio;
-use tonic::transport::Server;
 
 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None)]
@@ -14,17 +13,14 @@ struct Args {
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    radis::init_logger("info");
+
     let args = Args::parse();
     let cfg = Config::from_path(&args.conf)?;
     let addr = cfg.listen_addr.parse()?;
-    println!("radis node <{}> listening on {}", cfg.id, addr);
 
     let srv = RaftService::new(cfg);
-
-    Server::builder()
-        .add_service(RaftServer::new(srv))
-        .serve(addr)
-        .await?;
+    srv.serve(addr).await?;
 
     Ok(())
 }
21 changes: 20 additions & 1 deletion src/lib.rs
@@ -1,4 +1,5 @@
 pub mod conf;
+pub mod timer;
 
 pub mod raft {
     tonic::include_proto!("raft");
@@ -13,4 +14,22 @@ pub mod raft {
     pub use service::RaftService;
 }
 
-pub mod timer;
+#[cfg(feature = "async_log")]
+pub fn init_logger(level: &str) {
+    use structured_logger::{async_json::new_writer, Builder};
+    use tokio::io;
+
+    Builder::with_level(level)
+        .with_target_writer("*", new_writer(io::stdout()))
+        .init()
+}
+
+#[cfg(not(feature = "async_log"))]
+pub fn init_logger(level: &str) {
+    use std::io;
+    use structured_logger::{json::new_writer, Builder};
+
+    Builder::with_level(level)
+        .with_target_writer("*", new_writer(io::stdout()))
+        .init()
+}
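
Both variants expose the same signature, so callers are unaffected by the writer choice; an `async_log` build routes records through tokio's stdout writer instead of the blocking std one. A sketch of a caller, assuming `async_log` is declared in the crate's [features] table (that part of Cargo.toml is not shown in this diff):

    // With `cargo run --features async_log` this uses the tokio-backed
    // writer; otherwise it falls back to the synchronous std writer.
    #[tokio::main]
    async fn main() {
        radis::init_logger("debug");
        log::info!(target: "raft::example", node = "node0"; "logger ready");
    }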
14 changes: 13 additions & 1 deletion src/raft/context.rs
@@ -2,6 +2,7 @@ use super::config::REQUEST_TIMEOUT;
 use super::service::PeerClient;
 use crate::conf::Config;
 use crate::timer::{OneshotTimer, PeriodicTimer};
+use log::debug;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc::Sender;
@@ -32,7 +33,16 @@ impl Context {
             id,
             peers: peer_addrs
                 .iter()
-                .map(|addr| Arc::new(Mutex::new(PeerClient::new(addr, timeout))))
+                .enumerate()
+                .map(|(i, addr)| {
+                    debug!(target: "raft::context",
+                        peer_index = i,
+                        peer_addr = addr,
+                        timeout:serde = timeout;
+                        "init peer client"
+                    );
+                    Arc::new(Mutex::new(PeerClient::new(addr, timeout)))
+                })
                 .collect(),
             timeout: Arc::new(OneshotTimer::new(timeout_event)),
             tick: Arc::new(PeriodicTimer::new(tick_event)),
@@ -43,9 +53,11 @@ impl Context {
         let timeout = self.timeout.clone();
         let tick = self.tick.clone();
         tokio::spawn(async move {
+            debug!(target: "raft::context", timer = "timeout"; "start timer");
             timeout.start().await;
         });
         tokio::spawn(async move {
+            debug!(target: "raft::context", timer = "tick"; "start timer");
             tick.start().await;
         });
     }
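
The `timeout:serde = timeout` pair above uses one of the log crate's value-capture modifiers: `:serde` records the value through its serde::Serialize impl (a std Duration serializes as its secs/nanos fields), letting non-primitive values land in the JSON output as structure rather than a formatted string. A small illustration of the same pattern (function and target names are placeholders):

    use log::debug;
    use std::time::Duration;

    fn illustrate(timeout: Duration) {
        // `:serde` captures via Serialize; `:?` would capture via Debug.
        debug!(target: "raft::example", timeout:serde = timeout; "init peer client");
    }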
66 changes: 59 additions & 7 deletions src/raft/service.rs
@@ -1,30 +1,41 @@
 use super::context::Context;
 use super::state::{self, FollowerState, State};
 use super::{
-    AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs, InstallSnapshotReply,
+    AppendEntriesArgs, AppendEntriesReply, InstallSnapshotArgs, InstallSnapshotReply, RaftServer,
     RequestVoteArgs, RequestVoteReply,
 };
 use super::{Raft, RaftClient};
 use crate::conf::Config;
+use log::{info, trace};
+use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{mpsc, Mutex, RwLock};
-use tonic::transport::{Channel, Endpoint};
+use tonic::transport::{self, Channel, Endpoint, Server};
 use tonic::{Request, Response, Status};
 
+#[derive(Clone)]
 pub struct RaftService {
+    id: String,
     context: Arc<RwLock<Context>>,
     state: Arc<Mutex<Arc<Box<dyn State>>>>,
 }
 
 impl RaftService {
     pub fn new(cfg: Config) -> Self {
+        let id = cfg.id.clone();
         let (timeout_tx, timeout_rx) = mpsc::channel(1);
         let (tick_tx, tick_rx) = mpsc::channel(1);
         let context = Arc::new(RwLock::new(Context::new(cfg, timeout_tx, tick_tx)));
-        let state = Arc::new(Mutex::new(FollowerState::new(0, None)));
+
+        let init_state = FollowerState::new(0, None);
+        info!(target: "raft::state",
+            state:serde = (&init_state as &Box<dyn State>);
+            "init raft state"
+        );
+        let state = Arc::new(Mutex::new(init_state));
         state::handle_timer(state.clone(), context.clone(), timeout_rx, tick_rx);
-        RaftService { context, state }
+        RaftService { id, context, state }
     }
 
     pub fn context(&self) -> Arc<RwLock<Context>> {
@@ -34,6 +45,14 @@ impl RaftService {
     pub fn state(&self) -> Arc<Mutex<Arc<Box<dyn State>>>> {
         self.state.clone()
     }
+
+    pub async fn serve(&self, addr: SocketAddr) -> Result<(), transport::Error> {
+        info!(target: "raft::service", id = self.id; "raft gRPC server listening on {addr}");
+        Server::builder()
+            .add_service(RaftServer::new(self.clone()))
+            .serve(addr)
+            .await
+    }
 }
 
 #[tonic::async_trait]
@@ -42,9 +61,18 @@ impl Raft for RaftService {
         &self,
         request: Request<RequestVoteArgs>,
     ) -> Result<Response<RequestVoteReply>, Status> {
+        let request = request.into_inner();
+        trace!(
+            target: "raft::rpc",
+            term = request.term,
+            candidate_id = request.candidate_id,
+            last_log_index = request.last_log_index,
+            last_log_term = request.last_log_term;
+            "received RequestVote request",
+        );
         let state = self.state.lock().await;
         let (resp, new_state) = state
-            .handle_request_vote(self.context.clone(), request.into_inner())
+            .handle_request_vote(self.context.clone(), request)
             .await;
         state::transition(state, new_state, self.context.clone()).await;
         Ok(Response::new(resp))
@@ -54,9 +82,19 @@
         &self,
         request: Request<AppendEntriesArgs>,
     ) -> Result<Response<AppendEntriesReply>, Status> {
+        let request = request.into_inner();
+        trace!(
+            target: "raft::rpc",
+            term = request.term,
+            leader_id = request.leader_id,
+            prev_log_index = request.prev_log_index,
+            prev_log_term = request.prev_log_term,
+            entries = request.entries.len();
+            "received AppendEntries request",
+        );
         let state = self.state.lock().await;
         let (resp, new_state) = state
-            .handle_append_entries(self.context.clone(), request.into_inner())
+            .handle_append_entries(self.context.clone(), request)
            .await;
         state::transition(state, new_state, self.context.clone()).await;
         Ok(Response::new(resp))
@@ -66,9 +104,19 @@
         &self,
         request: Request<InstallSnapshotArgs>,
     ) -> Result<Response<InstallSnapshotReply>, Status> {
+        let request = request.into_inner();
+        trace!(
+            target: "raft::rpc",
+            term = request.term,
+            leader_id = request.leader_id,
+            last_included_index = request.last_included_index,
+            last_included_term = request.last_included_term,
+            snapshot_size = request.snapshot.len();
+            "received InstallSnapshot request",
+        );
         let state = self.state.lock().await;
         let (resp, new_state) = state
-            .handle_install_snapshot(self.context.clone(), request.into_inner())
+            .handle_install_snapshot(self.context.clone(), request)
             .await;
         state::transition(state, new_state, self.context.clone()).await;
         Ok(Response::new(resp))
@@ -101,6 +149,10 @@ impl PeerClient {
             .map_err(|e| Status::unavailable(e.to_string()))?;
         // Once connect() returns successfully, the connection
         // reliability is handled by the underlying gRPC library.
+        info!(target: "raft::rpc",
+            peer_addr = self.endpoint.uri().to_string();
+            "connected to peer"
+        );
         self.cli = Some(RaftClient::new(channel));
 