diff --git a/Cargo.toml b/Cargo.toml index fb8d1cf..a516138 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nb" -version = "0.2.0" +version = "0.3.0" authors = ["xxchan ", "Keith Null "] edition = "2018" diff --git a/README.md b/README.md index 77c3168..a4e360a 100644 --- a/README.md +++ b/README.md @@ -4,5 +4,5 @@ naïve blockchain in Rust # Roadmap -- [ ] Broadcast update to peers +- [x] Broadcast update to peers - [ ] Reduce data redundancy among nodes diff --git a/src/main.rs b/src/main.rs index c11cb40..a17a769 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,10 +13,6 @@ use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -const MSG_COLOR: &str = "yellow"; -const ERR_COLOR: &str = "red"; -const PROMPT_COLOR: &str = "blue"; - fn main() { let matches = App::new("nb") .version(env!("CARGO_PKG_VERSION")) @@ -40,204 +36,7 @@ fn main() { info!("nb {}", env!("CARGO_PKG_VERSION")); info!("Listening on {}", addr); - run_node(addr.to_owned()); -} - -fn run_node(addr: String) { - let node = Node::new(addr.clone()); - if let Err(e) = node { - error!("Fail to create node: {:?}", e); - std::process::exit(1); - } - let node = Arc::new(Mutex::new(node.unwrap())); - - let listener_node = node.clone(); - thread::spawn(move || { - let node = listener_node; - let addr = addr.clone(); - loop { - let _result = handle_incoming_connections(node.clone(), addr.clone()); - } - }); - let broadcast_node = node.clone(); - thread::spawn(move || handle_broadcast(broadcast_node)); - loop { - let mut input = String::new(); - // a prompt for input - print!("{}", "> ".color(PROMPT_COLOR).bold()); - stdout().flush().expect("flush error"); - - stdin().read_line(&mut input).expect("cannot read input"); - - let input = input.trim(); - let args: Vec<&str> = input.split_whitespace().collect(); - let command = match args.get(0) { - Some(value) => *value, - None => { - continue; - } - }; - - const NEW_TRANS: &str = "new_trans"; - const SEE_BLOCKCHAIN: &str = "list_blocks"; - const ADD_PEER: &str = "add_peer"; - const LIST_PEERS: &str = "list_peers"; - const RESOLVE_CONFLICTS: &str = "resolve"; - const EXIT: &str = "exit"; - const HELP: &str = "help"; - const MINE: &str = "mine"; - - { - let mut node = node.lock().unwrap(); - match command { - NEW_TRANS => { - if args.len() < 4 { - eprintln!("{}", "not enough arguments!".color(ERR_COLOR)); - continue; - } - let sender = *args.get(1).unwrap(); - let receiver = *args.get(2).unwrap(); - let amount: i64; - match (*args.get(3).unwrap()).parse() { - Ok(num) => amount = num, - Err(_) => { - eprintln!("{}", "illegal amount!".color(ERR_COLOR)); - continue; - } - }; - node.create_and_add_new_transaction(sender, receiver, amount); - } - MINE => { - node.mine(); - debug!("{}", "Mined!!!".color(MSG_COLOR)) - } - SEE_BLOCKCHAIN => { - node.display(); - } - ADD_PEER => { - if args.len() < 2 { - eprintln!("{}", "not enough arguments!".color(ERR_COLOR)); - continue; - } - let peer = *args.get(1).unwrap(); - if false == node.greet_and_add_peer(peer) { - eprintln!("{}", "fail to add peer".color(ERR_COLOR)); - } - } - LIST_PEERS => { - node.display_peers(); - } - RESOLVE_CONFLICTS => { - if node.resolve_conflicts() { - println!("node updated"); - } else { - println!("node stays unchanged") - } - } - HELP => { - list_commands(); - } - EXIT => { - break; - } - _ => { - eprintln!( - "{}", - "Command not found. Type 'help' to list commands.".color(ERR_COLOR) - ); - } - } - } - } -} - -fn list_commands() { - println!( - "{}", - concat!("blockchain node commands:\n", - " mine - mines a new block\n", - " new_trans [sender] [receiver] [amount] - adds a new transaction into the local blockchain\n", - " list_blocks - list the local chain blocks\n", - " add_peer [addr:port] - add one node as a peer\n", - " list_peers - list the node's peers\n", - " resolve - apply the consensus algorithm to resolve conflicts\n", - " exit - quit the program") - .color(MSG_COLOR) - ); -} - -fn handle_incoming_connections(node: Arc>, addr: String) -> Result<()> { - let listener = TcpListener::bind(&addr).expect("Fail to bind listener"); - for stream in listener.incoming() { - debug!("new incoming connection"); - match stream { - Ok(mut stream) => { - // There should be only one request, but we have to deserialize from a stream in this way - for request in Deserializer::from_reader(stream.try_clone()?).into_iter::() - { - let request = request - .map_err(|e| failure::err_msg(format!("Deserializing error {}", e)))?; - debug!("request received {:?}", request); - // try to add a new peer from every request - let mut node = node.lock().unwrap(); - let peer_info = request.get_sender_peer_info(); - // this seems not very useful, no new peer will be added except `Hello` request received - if node.add_peer(&peer_info) { - info!("Add one new peer: {:?}", peer_info); - } - let my_info = node.get_basic_info(); - let response = match request { - Request::Hello(peer_info) => { - info!("Get Hello from {:?}, simply ack it", peer_info); - Response::Ack(my_info) - } - Request::NewTransaction(peer_info, transaction) => { - info!( - "Get NewTransaction from {:?}, add the transaction and ack it", - peer_info - ); - node.handle_incoming_transaction(transaction); - Response::Ack(my_info) - } - Request::NewBlock(peer_info, new_block) => { - info!( - "Get NewBlock from {:?}, validate it and possibly add it to our chain", - peer_info - ); - node.handle_incoming_block(new_block); - Response::Ack(my_info) - } - Request::HowAreYou(peer_info) => { - info!( - "Get HowAreYou from {:?}, will respond with all my blocks", - peer_info - ); - Response::MyBlocks(node.get_basic_info(), node.get_blocks()) - } - Request::NewPeer(peer_info, new_peer) => { - info!( - "Get NewPeer from {:?}, new peer is {:?}", - peer_info, new_peer - ); - node.handle_incoming_peer(new_peer); - Response::Ack(my_info) - } - }; - serde_json::to_writer(&mut stream, &response)?; - stream.flush()?; - debug!("response sent {:?}", response); - break; - } - } - Err(e) => error!("Connection failed: {}", e), - } - } - Ok(()) -} - -fn handle_broadcast(node: Arc>) { - loop { - node.lock().unwrap().try_fetch_one_broadcast(); - thread::sleep(Duration::from_secs(3)); + if let Err(e) = Node::run(addr.to_owned()) { + eprintln!("Error when running node: {}", e); } } diff --git a/src/node.rs b/src/node.rs index 3ef0114..3c1ddc5 100644 --- a/src/node.rs +++ b/src/node.rs @@ -1,15 +1,19 @@ //! The blockchain node -//! TODO: Now the nodes should get synced manually. Consider adding auto message broadcasting mechanism use crate::message::{Request, Response}; use crate::*; use serde::{Deserialize, Serialize}; use serde_json::Deserializer; use std::collections::HashSet; -use std::io::{stdout, Write}; -use std::net::{SocketAddr, TcpStream, ToSocketAddrs}; +use std::io::{stdin, stdout, Write}; +use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; use std::sync::mpsc::{channel, Receiver, Sender}; +use std::thread; use uuid::Uuid; +const MSG_COLOR: &str = "yellow"; +const ERR_COLOR: &str = "red"; +const PROMPT_COLOR: &str = "blue"; + // self introduction for others to contact you #[derive(Hash, Eq, PartialEq, Serialize, Deserialize, Debug, Clone)] pub struct PeerInfo { @@ -38,28 +42,265 @@ impl PeerInfo { } } -type ResponseHandler = fn(&Response) -> Result; +enum Event { + Request(TcpStream, Request), + Response(Response), + Broadcast(Request), + Command(Command), +} + +fn handle_incoming_connections(addr: String, sender: Sender) -> Result<()> { + let listener = TcpListener::bind(&addr).expect("Fail to bind listener"); + for stream in listener.incoming() { + debug!("new incoming connection"); + match stream { + Ok(stream) => { + // There should be only one request, but we have to deserialize from a stream in this way + let mut request = None; + for _request in Deserializer::from_reader(stream.try_clone()?).into_iter::() + { + request = Some(_request + .map_err(|e| failure::err_msg(format!("Deserializing error {}", e)))?); + debug!("request received {:?}", request); + break; + } + sender.send(Event::Request(stream, request.unwrap())); + } + Err(e) => error!("Connection failed: {}", e), + } + } + Ok(()) +} + +enum Command { + NewTrans(String, String, i64), // sender, receiver, amount + Display, + AddPeer(String), + DisplayPeers, + Resolve, + Mine, +} + +fn handle_input_commands(sender: Sender) { + loop { + let mut input = String::new(); + // a prompt for input + print!("{}", "> ".color(PROMPT_COLOR).bold()); + stdout().flush().expect("flush error"); + + stdin().read_line(&mut input).expect("cannot read input"); + + let input = input.trim(); + let args: Vec<&str> = input.split_whitespace().collect(); + let command = match args.get(0) { + Some(value) => *value, + None => { + continue; + } + }; + const NEW_TRANS: &str = "new_trans"; + const SEE_BLOCKCHAIN: &str = "list_blocks"; + const ADD_PEER: &str = "add_peer"; + const LIST_PEERS: &str = "list_peers"; + const RESOLVE_CONFLICTS: &str = "resolve"; + const EXIT: &str = "exit"; + const HELP: &str = "help"; + const MINE: &str = "mine"; + + let mut event_cmd = None; + match command { + NEW_TRANS => { + if args.len() < 4 { + eprintln!("{}", "not enough arguments!".color(ERR_COLOR)); + continue; + } + let sender = *args.get(1).unwrap(); + let receiver = *args.get(2).unwrap(); + let amount: i64; + match (*args.get(3).unwrap()).parse() { + Ok(num) => amount = num, + Err(_) => { + eprintln!("{}", "illegal amount!".color(ERR_COLOR)); + continue; + } + }; + event_cmd = Some(Command::NewTrans(sender.to_owned(), receiver.to_owned(), amount)) + } + MINE => { + event_cmd = Some(Command::Mine); + debug!("{}", "Mined!!!".color(MSG_COLOR)) + } + SEE_BLOCKCHAIN => { + event_cmd = Some(Command::Display); + } + ADD_PEER => { + if args.len() < 2 { + eprintln!("{}", "not enough arguments!".color(ERR_COLOR)); + continue; + } + let peer = *args.get(1).unwrap(); + event_cmd = Some(Command::AddPeer(peer.to_owned())); + } + LIST_PEERS => { + event_cmd = Some(Command::DisplayPeers); + } + RESOLVE_CONFLICTS => { + event_cmd = Some(Command::Resolve); + } + HELP => { + list_commands(); + } + EXIT => { + break; + } + _ => { + eprintln!( + "{}", + "Command not found. Type 'help' to list commands.".color(ERR_COLOR) + ); + } + } + if let Some(event_cmd) = event_cmd { + sender.send(Event::Command(event_cmd)).unwrap(); + } + } +} + +fn list_commands() { + println!( + "{}", + concat!("blockchain node commands:\n", + " mine - mines a new block\n", + " new_trans [sender] [receiver] [amount] - adds a new transaction into the local blockchain\n", + " list_blocks - list the local chain blocks\n", + " add_peer [addr:port] - add one node as a peer\n", + " list_peers - list the node's peers\n", + " resolve - apply the consensus algorithm to resolve conflicts\n", + " exit - quit the program") + .color(MSG_COLOR) + ); +} // TODO: add consensus protocol specification -// TODO: mpsc is not needed, since Arc> is used pub struct Node { basic_info: PeerInfo, chain: Blockchain, peers: HashSet, - broadcast_channel_in: Sender, - broadcast_channel_out: Receiver, + broadcast_sender: Sender, + event_receiver: Receiver, } impl Node { - pub fn new(addr: String) -> Result { - let (tx, rx) = channel(); - Ok(Node { + pub fn run(addr: String) -> Result<()> { + let (sender, receiver) = channel(); + let sender1 = sender.clone(); + let sender2 = sender.clone(); + let addr1 = addr.clone(); + thread::spawn(move || handle_incoming_connections(addr1, sender1)); + thread::spawn(move || handle_input_commands(sender2)); + + let mut node = Node { basic_info: PeerInfo::new(addr)?, chain: Blockchain::new(), peers: HashSet::new(), - broadcast_channel_in: tx, - broadcast_channel_out: rx, - }) + broadcast_sender: sender, + event_receiver: receiver, + }; + + loop { + match node.event_receiver.recv().unwrap() { + Event::Request(stream, request) => { + node.serve_request(stream, request); + } + Event::Response(_response) => { + unimplemented!(); + } + Event::Broadcast(request) => { + node.broadcast_request(&request); + } + Event::Command(command) => { + node.serve_command(command); + } + } + } + } + + fn serve_request(&mut self, mut stream: TcpStream, request: Request) -> Result<()> { + let peer_info = request.get_sender_peer_info(); + if self.add_peer(peer_info) { + info!("Add one new peer: {:?}", peer_info); + } + let my_info = self.get_basic_info(); + let mut response = None; + match request { + Request::Hello(peer_info) => { + info!("Get Hello from {:?}, simply ack it", peer_info); + response = Some(Response::Ack(my_info)); + } + Request::HowAreYou(peer_info) => { + info!( + "Get HowAreYou from {:?}, will respond with all my blocks", + peer_info + ); + response = Some(Response::MyBlocks(self.get_basic_info(), self.get_blocks())); + } + Request::NewTransaction(peer_info, transaction) => { + info!( + "Get NewTransaction from {:?}, add the transaction and ack it", + peer_info + ); + self.handle_incoming_transaction(transaction); + } + Request::NewBlock(peer_info, new_block) => { + info!( + "Get NewBlock from {:?}, validate it and possibly add it to our chain", + peer_info + ); + self.handle_incoming_block(new_block); + } + Request::NewPeer(peer_info, new_peer) => { + info!( + "Get NewPeer from {:?}, new peer is {:?}", + peer_info, new_peer + ); + self.handle_incoming_peer(new_peer); + } + }; + if let Some(response) = response { + serde_json::to_writer(&mut stream, &response)?; + stream.flush()?; + debug!("response sent {:?}", response); + }; + Ok(()) + } + + fn serve_command(&mut self, command: Command) -> Result<()> { + match command { + Command::NewTrans(sender, receiver, amount) => { + self.create_and_add_new_transaction(&sender, &receiver, amount); + } + Command::Display => self.display(), + Command::AddPeer(peer) => { + // BLOCKING + if false == self.greet_and_add_peer(&peer) { + eprintln!("{}", "fail to add peer".color(ERR_COLOR)); + } + } + Command::DisplayPeers => self.display_peers(), + Command::Resolve => { + // BLOCKING + if self.resolve_conflicts() { + println!("node updated"); + } else { + println!("node stays unchanged") + } + } + Command::Mine => { + self.mine(); + debug!("{}", "Mined!!!".color(MSG_COLOR)) + } + } + Ok(()) } pub fn get_basic_info(&self) -> PeerInfo { @@ -155,16 +396,20 @@ impl Node { fn async_broadcast_transaction(&self, transaction: Transaction) { // add this transaction to broadcast channel // which will then send it asynchronously - self.broadcast_channel_in - .send(( - Request::NewTransaction(self.basic_info.clone(), transaction) - )) + self.broadcast_sender + .send(Event::Broadcast(Request::NewTransaction( + self.basic_info.clone(), + transaction, + ))) .unwrap(); } fn async_broadcast_block(&self, block: Block) { - self.broadcast_channel_in - .send((Request::NewBlock(self.get_basic_info(), block))) + self.broadcast_sender + .send(Event::Broadcast(Request::NewBlock( + self.get_basic_info(), + block, + ))) .unwrap(); } @@ -173,22 +418,14 @@ impl Node { } fn async_broadcast_peer(&self, peer: PeerInfo) { - self.broadcast_channel_in - .send((Request::NewPeer(self.get_basic_info(), peer))) + self.broadcast_sender + .send(Event::Broadcast(Request::NewPeer( + self.get_basic_info(), + peer, + ))) .unwrap(); } - pub fn try_fetch_one_broadcast(&self) { - trace!("try_fetch_one_broadcast..."); - let recv_res = self.broadcast_channel_out.try_recv(); - match recv_res { - Ok(req) => { - let _ = self.broadcast_request(&req); - } - Err(_) => {} - } - } - // TODO: what does the return value mean? fn broadcast_request(&self, req: &Request) -> Result { debug!("{}", "broadcast begins".color(PROMINENT_COLOR));