Skip to content

Commit

Permalink
Rewrite the way nodes run using a 'event-driven'(I'm not sure) pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jun 13, 2020
1 parent 4a6d3df commit 6686ef5
Show file tree
Hide file tree
Showing 4 changed files with 273 additions and 237 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nb"
version = "0.2.0"
version = "0.3.0"
authors = ["xxchan <[email protected]>", "Keith Null <[email protected]>"]
edition = "2018"

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ naïve blockchain in Rust

# Roadmap

- [ ] Broadcast update to peers
- [x] Broadcast update to peers
- [ ] Reduce data redundancy among nodes
205 changes: 2 additions & 203 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

const MSG_COLOR: &str = "yellow";
const ERR_COLOR: &str = "red";
const PROMPT_COLOR: &str = "blue";

fn main() {
let matches = App::new("nb")
.version(env!("CARGO_PKG_VERSION"))
Expand All @@ -40,204 +36,7 @@ fn main() {
info!("nb {}", env!("CARGO_PKG_VERSION"));
info!("Listening on {}", addr);

run_node(addr.to_owned());
}

fn run_node(addr: String) {
let node = Node::new(addr.clone());
if let Err(e) = node {
error!("Fail to create node: {:?}", e);
std::process::exit(1);
}
let node = Arc::new(Mutex::new(node.unwrap()));

let listener_node = node.clone();
thread::spawn(move || {
let node = listener_node;
let addr = addr.clone();
loop {
let _result = handle_incoming_connections(node.clone(), addr.clone());
}
});
let broadcast_node = node.clone();
thread::spawn(move || handle_broadcast(broadcast_node));
loop {
let mut input = String::new();
// a prompt for input
print!("{}", "> ".color(PROMPT_COLOR).bold());
stdout().flush().expect("flush error");

stdin().read_line(&mut input).expect("cannot read input");

let input = input.trim();
let args: Vec<&str> = input.split_whitespace().collect();
let command = match args.get(0) {
Some(value) => *value,
None => {
continue;
}
};

const NEW_TRANS: &str = "new_trans";
const SEE_BLOCKCHAIN: &str = "list_blocks";
const ADD_PEER: &str = "add_peer";
const LIST_PEERS: &str = "list_peers";
const RESOLVE_CONFLICTS: &str = "resolve";
const EXIT: &str = "exit";
const HELP: &str = "help";
const MINE: &str = "mine";

{
let mut node = node.lock().unwrap();
match command {
NEW_TRANS => {
if args.len() < 4 {
eprintln!("{}", "not enough arguments!".color(ERR_COLOR));
continue;
}
let sender = *args.get(1).unwrap();
let receiver = *args.get(2).unwrap();
let amount: i64;
match (*args.get(3).unwrap()).parse() {
Ok(num) => amount = num,
Err(_) => {
eprintln!("{}", "illegal amount!".color(ERR_COLOR));
continue;
}
};
node.create_and_add_new_transaction(sender, receiver, amount);
}
MINE => {
node.mine();
debug!("{}", "Mined!!!".color(MSG_COLOR))
}
SEE_BLOCKCHAIN => {
node.display();
}
ADD_PEER => {
if args.len() < 2 {
eprintln!("{}", "not enough arguments!".color(ERR_COLOR));
continue;
}
let peer = *args.get(1).unwrap();
if false == node.greet_and_add_peer(peer) {
eprintln!("{}", "fail to add peer".color(ERR_COLOR));
}
}
LIST_PEERS => {
node.display_peers();
}
RESOLVE_CONFLICTS => {
if node.resolve_conflicts() {
println!("node updated");
} else {
println!("node stays unchanged")
}
}
HELP => {
list_commands();
}
EXIT => {
break;
}
_ => {
eprintln!(
"{}",
"Command not found. Type 'help' to list commands.".color(ERR_COLOR)
);
}
}
}
}
}

fn list_commands() {
println!(
"{}",
concat!("blockchain node commands:\n",
" mine - mines a new block\n",
" new_trans [sender] [receiver] [amount] - adds a new transaction into the local blockchain\n",
" list_blocks - list the local chain blocks\n",
" add_peer [addr:port] - add one node as a peer\n",
" list_peers - list the node's peers\n",
" resolve - apply the consensus algorithm to resolve conflicts\n",
" exit - quit the program")
.color(MSG_COLOR)
);
}

fn handle_incoming_connections(node: Arc<Mutex<Node>>, addr: String) -> Result<()> {
let listener = TcpListener::bind(&addr).expect("Fail to bind listener");
for stream in listener.incoming() {
debug!("new incoming connection");
match stream {
Ok(mut stream) => {
// There should be only one request, but we have to deserialize from a stream in this way
for request in Deserializer::from_reader(stream.try_clone()?).into_iter::<Request>()
{
let request = request
.map_err(|e| failure::err_msg(format!("Deserializing error {}", e)))?;
debug!("request received {:?}", request);
// try to add a new peer from every request
let mut node = node.lock().unwrap();
let peer_info = request.get_sender_peer_info();
// this seems not very useful, no new peer will be added except `Hello` request received
if node.add_peer(&peer_info) {
info!("Add one new peer: {:?}", peer_info);
}
let my_info = node.get_basic_info();
let response = match request {
Request::Hello(peer_info) => {
info!("Get Hello from {:?}, simply ack it", peer_info);
Response::Ack(my_info)
}
Request::NewTransaction(peer_info, transaction) => {
info!(
"Get NewTransaction from {:?}, add the transaction and ack it",
peer_info
);
node.handle_incoming_transaction(transaction);
Response::Ack(my_info)
}
Request::NewBlock(peer_info, new_block) => {
info!(
"Get NewBlock from {:?}, validate it and possibly add it to our chain",
peer_info
);
node.handle_incoming_block(new_block);
Response::Ack(my_info)
}
Request::HowAreYou(peer_info) => {
info!(
"Get HowAreYou from {:?}, will respond with all my blocks",
peer_info
);
Response::MyBlocks(node.get_basic_info(), node.get_blocks())
}
Request::NewPeer(peer_info, new_peer) => {
info!(
"Get NewPeer from {:?}, new peer is {:?}",
peer_info, new_peer
);
node.handle_incoming_peer(new_peer);
Response::Ack(my_info)
}
};
serde_json::to_writer(&mut stream, &response)?;
stream.flush()?;
debug!("response sent {:?}", response);
break;
}
}
Err(e) => error!("Connection failed: {}", e),
}
}
Ok(())
}

fn handle_broadcast(node: Arc<Mutex<Node>>) {
loop {
node.lock().unwrap().try_fetch_one_broadcast();
thread::sleep(Duration::from_secs(3));
if let Err(e) = Node::run(addr.to_owned()) {
eprintln!("Error when running node: {}", e);
}
}
Loading

1 comment on commit 6686ef5

@izackwu
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job!

Please sign in to comment.