diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs index 11a7339b..0d17e047 100644 --- a/src/blockchain/mod.rs +++ b/src/blockchain/mod.rs @@ -621,7 +621,7 @@ impl BlockChain { block_votes_variance += p * (1.0-p); } // using gaussian approximation - let tmp = block_votes_mean - (2.0 * block_votes_variance * LOG_EPSILON).sqrt(); + let tmp = block_votes_mean - (block_votes_variance).sqrt() * (*QUANTILE_EPSILON); if tmp > 0.0 { block_votes_lcb += tmp; } @@ -651,6 +651,7 @@ impl BlockChain { for p_block in &proposer_blocks { // if the below condition is true, then final votes on p_block could overtake new_leader if max_vote_lcb < votes_lcb.get(p_block).unwrap() + remaining_votes && *p_block != new_leader.unwrap() { + println!("Candidate: {:?}, lb={}, second ub={}", new_leader, max_vote_lcb, votes_lcb.get(p_block).unwrap() + remaining_votes); new_leader = None; break; } diff --git a/src/config.rs b/src/config.rs index 16256d01..b3011fe4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,24 +6,25 @@ pub const NETWORK_DELAY: f32 = 1.4; // the expected block propagation delay (in // Design parameters pub const NUM_VOTER_CHAINS: u16 = 1000 as u16; // more chains means better latency pub const TX_BLOCK_SIZE: u32 = 64000; // the maximum size of a transaction block (in Bytes) -pub const TX_THROUGHPUT: u32 = 70000; // the transaction throughput we want to support (in Tx/s) +pub const TX_THROUGHPUT: u32 = 80000; // the transaction throughput we want to support (in Tx/s) pub const TX_BLOCK_TRANSACTIONS: u32 = TX_BLOCK_SIZE / AVG_TX_SIZE; -pub const PROPOSER_BLOCK_TX_REFS: u32 = (TX_MINING_RATE / CHAIN_MINING_RATE * 2.0) as u32; +pub const PROPOSER_BLOCK_TX_REFS: u32 = (TX_MINING_RATE / PROPOSER_CHAIN_MINING_RATE * 2.0) as u32; pub const AVG_TX_SIZE: u32 = 280; // average size of a transaction (in Bytes) pub const TX_MINING_RATE: f32 = TX_THROUGHPUT as f32 / TX_BLOCK_TRANSACTIONS as f32; -pub const CHAIN_MINING_RATE: f32 = 0.074; // mining rate of the proposer chain and each voter chain in Blks/s +pub const VOTER_CHAIN_MINING_RATE: f32 = 0.100; // mining rate of the proposer chain and each voter chain in Blks/s +pub const PROPOSER_CHAIN_MINING_RATE: f32 = 0.100; -pub const ADVERSARY_MINING_POWER: f32 = 0.43; // the adversary power we want to tolerate -pub const LOG_EPSILON: f32 = 10.0; // -ln(1-confirmation_guarantee) -pub const ALPHA: f32 = (CHAIN_MINING_RATE * NETWORK_DELAY) / (1.0 + CHAIN_MINING_RATE * NETWORK_DELAY); // alpha = orphan blocks / total blocks +pub const ADVERSARY_MINING_POWER: f32 = 0.40; // the adversary power we want to tolerate +pub const LOG_EPSILON: f32 = 20.0; // -ln(1-confirmation_guarantee) +pub const ALPHA: f32 = (VOTER_CHAIN_MINING_RATE * NETWORK_DELAY) / (1.0 + VOTER_CHAIN_MINING_RATE * NETWORK_DELAY); // alpha = orphan blocks / total blocks // Do not change from here // Mining rate of each type (Proposer : Voter (all chains) : Transaction, in Blks/s) pub const RATIO: (f32, f32, f32) = ( - CHAIN_MINING_RATE, - CHAIN_MINING_RATE * (NUM_VOTER_CHAINS as f32), + PROPOSER_CHAIN_MINING_RATE, + VOTER_CHAIN_MINING_RATE * (NUM_VOTER_CHAINS as f32), TX_MINING_RATE, ); @@ -43,6 +44,8 @@ pub const PROPOSER_INDEX: u16 = 0; pub const FIRST_VOTER_INDEX: u16 = 2; lazy_static! { + pub static ref QUANTILE_EPSILON: f32 = (2.0 * LOG_EPSILON - (2.0 * LOG_EPSILON).ln() - (2.0 * 3.1416926 as f32).ln()).sqrt(); + pub static ref DEFAULT_DIFFICULTY: H256 = { let raw: [u8; 32] = [255; 32]; raw.into() diff --git a/src/experiment/performance_counter.rs b/src/experiment/performance_counter.rs index 8c2750b6..c3e07054 100644 --- a/src/experiment/performance_counter.rs +++ b/src/experiment/performance_counter.rs @@ -153,6 +153,7 @@ impl Counter { }; match b.content { BlockContent::Transaction(_) => { + println!("Received Transaction Block Delay = {} ms", delay); self.total_transaction_block_delay .fetch_add(delay as usize, Ordering::Relaxed); self.total_transaction_block_squared_delay @@ -161,6 +162,7 @@ impl Counter { .fetch_add(1, Ordering::Relaxed); } BlockContent::Proposer(_) => { + println!("Received Proposer Block Delay = {} ms", delay); self.total_proposer_block_delay .fetch_add(delay as usize, Ordering::Relaxed); self.total_proposer_block_squared_delay @@ -169,6 +171,7 @@ impl Counter { .fetch_add(1, Ordering::Relaxed); } BlockContent::Voter(_) => { + println!("Received Voter Block Delay = {} ms", delay); self.total_voter_block_delay .fetch_add(delay as usize, Ordering::Relaxed); self.total_voter_block_squared_delay diff --git a/src/experiment/transaction_generator.rs b/src/experiment/transaction_generator.rs index 46a0bcb7..217139a7 100644 --- a/src/experiment/transaction_generator.rs +++ b/src/experiment/transaction_generator.rs @@ -103,7 +103,9 @@ impl TransactionGenerator { // TODO: make it flexible let addr = self.wallet.addresses().unwrap()[0]; let mut prev_coin = None; + let mut tx_gen_start: time::Instant = time::Instant::now(); loop { + tx_gen_start = time::Instant::now(); // check the current state and try to receive control message match self.state { State::Continuous(_) | State::Step(_) => match self.control_chan.try_recv() { @@ -168,6 +170,15 @@ impl TransactionGenerator { ArrivalDistribution::Uniform(d) => d.interval, }; let interval = time::Duration::from_micros(interval); + let time_spent = time::Instant::now().duration_since(tx_gen_start); + let interval = { + if interval > time_spent { + interval - time_spent + } + else { + time::Duration::new(0, 0) + } + }; thread::sleep(interval); } }); diff --git a/src/miner/mod.rs b/src/miner/mod.rs index 694459a3..058807e7 100644 --- a/src/miner/mod.rs +++ b/src/miner/mod.rs @@ -195,9 +195,12 @@ impl Context { } let mut rng = rand::thread_rng(); + let mut block_start = time::Instant::now(); // main mining loop loop { + block_start = time::Instant::now(); + // check and react to control signals match self.operating_state { OperatingState::Paused => { @@ -236,13 +239,18 @@ impl Context { // handle context updates let mut touched_content: BTreeSet = BTreeSet::new(); + let mut voter_shift = false; // update voter parents for voter_chain in new_voter_block.iter() { let chain_id: usize = (FIRST_VOTER_INDEX + voter_chain) as usize; let voter_parent = self.blockchain.best_voter(*voter_chain as usize); if let Content::Voter(c) = &mut self.contents[chain_id] { - c.voter_parent = voter_parent; - touched_content.insert(chain_id as u16); + if &voter_parent != &c.voter_parent { + c.voter_parent = voter_parent; + // mark that we have shifted a vote + voter_shift = true; + touched_content.insert(chain_id as u16); + } } else { unreachable!(); } @@ -341,21 +349,23 @@ impl Context { } } } else { - for voter_chain in new_voter_block.iter() { - let chain_id: usize = (FIRST_VOTER_INDEX + voter_chain) as usize; - let voter_parent = if let Content::Voter(c) = &self.contents[chain_id] { - c.voter_parent - } else { - unreachable!(); - }; - if let Content::Voter(c) = &mut self.contents[chain_id] { - c.votes = self - .blockchain - .unvoted_proposer(&voter_parent, &self.header.parent) - .unwrap(); - touched_content.insert(chain_id as u16); - } else { - unreachable!(); + if !new_voter_block.is_empty() { + for voter_chain in 0..NUM_VOTER_CHAINS { + let chain_id: usize = (FIRST_VOTER_INDEX + voter_chain) as usize; + let voter_parent = if let Content::Voter(c) = &self.contents[chain_id] { + c.voter_parent + } else { + unreachable!(); + }; + if let Content::Voter(c) = &mut self.contents[chain_id] { + c.votes = self + .blockchain + .unvoted_proposer(&voter_parent, &self.header.parent) + .unwrap(); + touched_content.insert(chain_id as u16); + } else { + unreachable!(); + } } } } @@ -364,7 +374,7 @@ impl Context { self.header.difficulty = self.get_difficulty(&self.header.parent); // update or rebuild the merkle tree according to what we did in the last stage - if new_proposer_block { + if new_proposer_block || voter_shift { // if there has been a new proposer block, simply rebuild the merkle tree self.content_merkle_tree = MerkleTree::new(&self.contents); } else { @@ -485,7 +495,10 @@ impl Context { let interval_dist = rand::distributions::Exp::new(1.0 / (i as f64)); let interval = interval_dist.sample(&mut rng); let interval = time::Duration::from_micros(interval as u64); - thread::sleep(interval); + let time_spent = time::Instant::now().duration_since(block_start); + if interval > time_spent { + thread::sleep(interval - time_spent); + } } } } diff --git a/testbed/.gitignore b/testbed/.gitignore index 5dcde73c..ffa59843 100644 --- a/testbed/.gitignore +++ b/testbed/.gitignore @@ -1,5 +1,6 @@ *.svg *.png +nodelog data diff --git a/testbed/run.sh b/testbed/run.sh index e50957d3..aee429b3 100755 --- a/testbed/run.sh +++ b/testbed/run.sh @@ -1,6 +1,7 @@ #!/bin/bash -LAUNCH_TEMPLATE=lt-02226ebae5fbef5f3 +#LAUNCH_TEMPLATE=lt-02226ebae5fbef5f3 # Ohio +LAUNCH_TEMPLATE=lt-09d74bbb3e4da1ff9 # N. Virginia function start_instances { @@ -21,12 +22,13 @@ function start_instances echo "Launching $1 AWS EC2 instances" local instances="" local remaining=$1 + batchsize="100" while [ "$remaining" -gt "0" ] do - if [ "10" -gt "$remaining" ]; then + if [ "$batchsize" -gt "$remaining" ]; then local thisbatch="$remaining" else - local thisbatch="10" + local thisbatch="$batchsize" fi tput rc tput el @@ -71,19 +73,32 @@ function start_instances tput setaf 2 echo "Instances started" tput sgr0 - curl -s --form-string "token=$PUSHOVER_TOKEN" --form-string "user=$PUSHOVER_USER" --form-string "title=EC2 Instances Launched" --form-string "message=$1 EC2 instances were just launched by user $(whoami)." https://api.pushover.net/1/messages.json &> /dev/null + #curl -s --form-string "token=$PUSHOVER_TOKEN" --form-string "user=$PUSHOVER_USER" --form-string "title=EC2 Instances Launched" --form-string "message=$1 EC2 instances were just launched by user $(whoami)." https://api.pushover.net/1/messages.json &> /dev/null } function fix_ssh_config { - local instances=`jq -r '.Instances[].InstanceId ' log/aws_start.log` + instances=`aws ec2 describe-instances --query 'Reservations[*].Instances[*].[InstanceId][][]' --filters Name=instance-state-name,Values=running Name=tag-key,Values=prism --output text` rm -f instances.txt rm -f ~/.ssh/config.d/prism echo "Querying public IPs and writing to SSH config" - for instance in $instances ; + while [ 1 ] do - local ip=`aws ec2 describe-instances --instance-ids $instance | jq -r '.Reservations[0].Instances[0].PublicIpAddress'` - local lan=`aws ec2 describe-instances --instance-ids $instance | jq -r '.Reservations[0].Instances[0].PrivateIpAddress'` + rawdetails=`aws ec2 describe-instances --instance-ids $instances --query 'Reservations[*].Instances[*].{publicip:PublicIpAddress,id:InstanceId,privateip:PrivateIpAddress}[]'` + if echo $rawdetails | jq '.[].publicip' | grep null &> /dev/null ; then + echo "Waiting for public IP addresses to be assigned" + sleep 3 + continue + else + details=`echo "$rawdetails" | jq -c '.[]'` + break + fi + done + for instancedetail in $details; + do + local instance=`echo $instancedetail | jq -r '.id'` + local ip=`echo $instancedetail | jq -r '.publicip'` + local lan=`echo $instancedetail | jq -r '.privateip'` echo "$instance,$ip,$lan" >> instances.txt echo "Host $instance" >> ~/.ssh/config.d/prism echo " Hostname $ip" >> ~/.ssh/config.d/prism @@ -93,8 +108,10 @@ function fix_ssh_config echo " UserKnownHostsFile=/dev/null" >> ~/.ssh/config.d/prism echo "" >> ~/.ssh/config.d/prism done + echo "SSH config written, waiting for instances to initialize" + aws ec2 wait instance-running --instance-ids $instances tput setaf 2 - echo "SSH config written" + echo "Instances started" tput sgr0 } @@ -122,7 +139,7 @@ function stop_instances tput setaf 2 echo "Instances terminated" tput sgr0 - curl -s --form-string "token=$PUSHOVER_TOKEN" --form-string "user=$PUSHOVER_USER" --form-string "title=EC2 Instances Stopped" --form-string "message=EC2 instances launched at $(date -r instances.txt) were just terminated by user $(whoami)." https://api.pushover.net/1/messages.json &> /dev/null + #curl -s --form-string "token=$PUSHOVER_TOKEN" --form-string "user=$PUSHOVER_USER" --form-string "title=EC2 Instances Stopped" --form-string "message=EC2 instances launched at $(date -r instances.txt) were just terminated by user $(whoami)." https://api.pushover.net/1/messages.json &> /dev/null } function count_instances @@ -488,12 +505,12 @@ function start_transactions_single { curl -s "http://$3:$4/transaction-generator/set-arrival-distribution?interval=100&distribution=uniform" curl -s "http://$3:$4/transaction-generator/set-value-distribution?min=100&max=100&distribution=uniform" - curl -s "http://$3:$4/transaction-generator/start?throttle=8000" + curl -s "http://$3:$4/transaction-generator/start?throttle=10000" } function start_mining_single { - curl -s "http://$3:$4/miner/start?lambda=258000&lazy=false" + curl -s "http://$3:$4/miner/start?lambda=666519&lazy=false" } function stop_transactions_single @@ -716,6 +733,49 @@ function show_demo ./telematics/telematics log -duration 7200 -grafana } +function set_tx_rate +{ + local nodes=`cat nodes.txt` + local num_nodes=`cat nodes.txt | wc -l` + local txrate=$1 + if [ "$1" -lt "$num_nodes" ]; then + txrate=$num_nodes + fi + local itv=`expr 1000000 / \( $txrate / $num_nodes \)` + local pids='' + for node in $nodes; do + local name + local host + local pubip + local apiport + IFS=',' read -r name host pubip _ _ apiport _ <<< "$node" + curl -s "http://$pubip:$apiport/transaction-generator/set-arrival-distribution?interval=$itv&distribution=uniform" &> /dev/null & + pids="$pids $!" + done + for pid in $pids; do + wait $pid + done +} + +function copy_log +{ + rm -rf nodelog + mkdir -p nodelog + local nodes=`cat nodes.txt` + local num_nodes=`cat nodes.txt | wc -l` + local pids='' + for node in $nodes; do + local name + local host + IFS=',' read -r name host _ _ _ _ _ <<< "$node" + (ssh $host -- "cat log/$name.log | gzip > nodelog.gzip" && scp $host:~/nodelog.gzip nodelog/$name.gzip) &> /dev/null & + pids="$pids $!" + done + for pid in $pids; do + wait $pid + done +} + mkdir -p log case "$1" in help) @@ -749,6 +809,7 @@ case "$1" in show-demo Start the demo workflow stop-tx Stop generating transactions stop-mine Stop mining + tx-rate r Set transaction throughput to r Run Algorand Experiment @@ -765,6 +826,7 @@ case "$1" in profile node f d Capture stack trace for node with frequency f and duration d flamegraph node Generate and download flamegraph for node open-dashboard Open the performance dashboard + copy-log Copy node log to nodelog/ Connect to Testbed @@ -808,6 +870,8 @@ case "$1" in show_demo ;; stop-tx) query_api stop_transactions 0 ;; + tx-rate) + set_tx_rate $2 ;; stop-mine) query_api stop_mining 0 ;; shape-traffic) @@ -846,6 +910,8 @@ case "$1" in scp_from_server $2 $3 $4 ;; read-log) read_log $2 ;; + copy-log) + copy_log ;; *) tput setaf 1 echo "Unrecognized subcommand '$1'" diff --git a/testbed/scripts/gen_prism_payload.py b/testbed/scripts/gen_prism_payload.py index d589c5e8..226f00aa 100644 --- a/testbed/scripts/gen_prism_payload.py +++ b/testbed/scripts/gen_prism_payload.py @@ -4,7 +4,7 @@ import subprocess template = """ -/home/ubuntu/payload/binary/prism --p2p {ip}:{p2p_port} --api {ip}:{api_port} --visual {ip}:{vis_port} --blockdb /tmp/prism/{node_name}-blockdb.rocksdb --blockchaindb /tmp/prism/{node_name}-blockchaindb.rocksdb --utxodb /tmp/prism/{node_name}-utxodb.rocksdb --walletdb /tmp/prism/{node_name}-wallet.rocksdb -vv --load-key /home/ubuntu/payload/prism-payload/{node_name}.pkcs8 {peer_opt} {fund_opt} --fund-coins=100000 --mempool-size=50000 +/home/ubuntu/payload/binary/prism --p2p {ip}:{p2p_port} --api {ip}:{api_port} --visual {ip}:{vis_port} --blockdb /tmp/prism/{node_name}-blockdb.rocksdb --blockchaindb /tmp/prism/{node_name}-blockchaindb.rocksdb --utxodb /tmp/prism/{node_name}-utxodb.rocksdb --walletdb /tmp/prism/{node_name}-wallet.rocksdb -vv --load-key /home/ubuntu/payload/prism-payload/{node_name}.pkcs8 {peer_opt} {fund_opt} --fund-coins=40000 --mempool-size=50000 """ instances_file = sys.argv[1] diff --git a/testbed/scripts/generate_topo.py b/testbed/scripts/generate_topo.py index d3ec9fad..ff84683b 100644 --- a/testbed/scripts/generate_topo.py +++ b/testbed/scripts/generate_topo.py @@ -21,6 +21,8 @@ print("Unrecognized topology") sys.exit(1) +sys.stderr.write(str(nx.algorithms.distance_measures.diameter(graph))) + for node in graph.nodes(): name = "node_" + str(node) nodes.append(name) @@ -33,4 +35,3 @@ }) result = {"nodes": nodes, "connections": connections} print(json.dumps(result, sort_keys=True, indent=4)) -print(nx.complete_graph(100)) diff --git a/testbed/scripts/stop-prism.sh b/testbed/scripts/stop-prism.sh index f5916de4..3ee2b3ef 100644 --- a/testbed/scripts/stop-prism.sh +++ b/testbed/scripts/stop-prism.sh @@ -2,7 +2,7 @@ echo "Killing Prism processes" -pkill prism +pkill -9 prism wait $! # pids=`cat /home/ubuntu/log/prism.pid` diff --git a/testbed/telematics/log.go b/testbed/telematics/log.go index e3160574..4f024b97 100644 --- a/testbed/telematics/log.go +++ b/testbed/telematics/log.go @@ -39,6 +39,7 @@ type Snapshot struct { type expSnapshot struct { time int + generated_tx int confirmed_tx int confirmed_tx_blk int processed_voter int @@ -342,6 +343,7 @@ func log(interval, duration uint, nodesFile, dataDir string, grafana bool) { expStartTime = dur lastSnapshot = expSnapshot{ time: dur, + generated_tx: curr["node_9"].Generated_transactions, confirmed_tx: curr["node_0"].Confirmed_transactions, confirmed_tx_blk: curr["node_0"].Confirmed_transaction_blocks, processed_voter: curr["node_0"].Processed_voter_blocks, @@ -350,7 +352,7 @@ func log(interval, duration uint, nodesFile, dataDir string, grafana bool) { proposer_len: curr["node_0"].Proposer_main_chain_length, latency_sum: curr["node_0"].Total_transaction_block_confirmation_latency, } - expStopAlarm = time.After(300 * time.Second) + expStopAlarm = time.After(600 * time.Second) case <-expStopAlarm: expStarted = true expRunning = false @@ -365,6 +367,7 @@ func log(interval, duration uint, nodesFile, dataDir string, grafana bool) { expRunning = true expStartPerf = expSnapshot{ time: dur, + generated_tx: lastSnapshot.generated_tx, confirmed_tx: lastSnapshot.confirmed_tx, confirmed_tx_blk: lastSnapshot.confirmed_tx_blk, processed_voter: lastSnapshot.processed_voter, @@ -379,6 +382,7 @@ func log(interval, duration uint, nodesFile, dataDir string, grafana bool) { if lastSnapshot.confirmed_tx != curr["node_0"].Confirmed_transactions { expStopPerf = expSnapshot{ time: dur, + generated_tx: lastSnapshot.generated_tx, confirmed_tx: lastSnapshot.confirmed_tx, confirmed_tx_blk: lastSnapshot.confirmed_tx_blk, processed_voter: lastSnapshot.processed_voter, @@ -393,6 +397,7 @@ func log(interval, duration uint, nodesFile, dataDir string, grafana bool) { lastSnapshot = expSnapshot{ time: dur, + generated_tx: curr["node_0"].Generated_transactions, confirmed_tx: curr["node_0"].Confirmed_transactions, confirmed_tx_blk: curr["node_0"].Confirmed_transaction_blocks, processed_voter: curr["node_0"].Processed_voter_blocks, @@ -431,12 +436,13 @@ func log(interval, duration uint, nodesFile, dataDir string, grafana bool) { tm.Printf("Thruput %7.7g\n", float64(expStopPerf.confirmed_tx-expStartPerf.confirmed_tx)/float64(expdur)) tm.Printf("Prop Fork %7.7g\n", float64(expStopPerf.processed_proposer-expStopPerf.proposer_len-expStartPerf.processed_proposer+expStartPerf.proposer_len)/float64(expStopPerf.processed_proposer-expStartPerf.processed_proposer)) tm.Printf("Vote Fork %7.7g\n", float64(expStopPerf.processed_voter-expStopPerf.voter_len_sum-expStartPerf.processed_voter+expStartPerf.voter_len_sum)/float64(expStopPerf.processed_voter-expStartPerf.processed_voter)) + tm.Printf("Generation %7.7g\n", float64(expStopPerf.generated_tx-expStartPerf.generated_tx)/float64(expdur)) } } else { if !expStarted { tm.Printf("Hit x to start a recording\n") } else { - tm.Printf("Experiment running. Remaining time: %v seconds\n", 300-dur+expStartTime) + tm.Printf("Experiment running. Remaining time: %v seconds\n", 600-dur+expStartTime) } } tm.Flush()