diff --git a/simulations/broadcast/simulation.py b/simulations/broadcast/simulation.py
index 3cb9cb2..5fba118 100644
--- a/simulations/broadcast/simulation.py
+++ b/simulations/broadcast/simulation.py
@@ -47,28 +47,19 @@ def __init__(self, *args, **kwargs) -> None:
         self.my_peer_num = -1
         super().__init__(*args, **kwargs)
 
-    def on_new_batch_created(self, new_batch: TxBatchPayload):
-        super().on_new_batch_created(new_batch)
-        print("Peer {}: New batch with {} # txs created at time {}".format(self.my_peer_num,
-                                                                           len(new_batch.txs),
-                                                                           get_event_loop().time()))
+        self.tx_start_time = {}
+        self.tx_ready_time = {}
 
     # noinspection PyUnreachableCode
     def on_transaction_created(self, new_tx: TransactionPayload):
         # Write time when we've first seen the transaction
-        print("Peer {}: New transaction {} at time {}".format(self.my_peer_num,
-                                                              new_tx.tx_id,
-                                                              get_event_loop().time()))
+        self.tx_start_time[new_tx.tx_id] = get_event_loop().time()
+        super().on_transaction_created(new_tx)
 
-    def on_new_header(self, new_header: HeaderPayload):
-        super().on_new_header(new_header)
-        # Write the time when transaction is finalized
-        for batch_ack in new_header.batches:
-            batch = self.batches[batch_ack.batch_id]
-            print("Peer {}: Number of transactions finalized {} at time {}".format(
-                self.my_peer_num,
-                len(batch.txs),
-                get_event_loop().time()))
+    def on_transaction_finalized(self, tx: TransactionPayload):
+        # Write time when transaction is finalized
+        self.tx_ready_time[tx.tx_id] = get_event_loop().time()
+        super().on_transaction_finalized(tx)
 
 
 def main(prefix="", sim_settings: SimulationSettings = None):
@@ -77,7 +68,7 @@ def main(prefix="", sim_settings: SimulationSettings = None):
     else:
         LATENCY = "global"
         N_CLIENTS = 10
-        N = 40
+        N = 200
 
     settings = SimulationSettings()
     settings.clients = N_CLIENTS
@@ -98,7 +89,33 @@ def main(prefix="", sim_settings: SimulationSettings = None):
     simulation.loop.run_forever()
 
     for peer_id in simulation.nodes.keys():
-        print(peer_id, len(simulation.nodes[peer_id].overlays[0].batches))
+        print(peer_id,
+              simulation.nodes[peer_id].overlays[0].receive_counter,
+              simulation.nodes[peer_id].overlays[0].send_counter,
+              )
+
+    # Collect transaction start times, merge dictionaries
+    total_tx_start_times = {}
+    for peer_id in simulation.nodes.keys():
+        total_tx_start_times.update(simulation.nodes[peer_id].overlays[0].tx_start_time)
+
+    # Collect transaction ready times, merge dictionaries
+    total_tx_ready_times = {}
+    for peer_id in simulation.nodes.keys():
+        for tx_id, ready_time in simulation.nodes[peer_id].overlays[0].tx_ready_time.items():
+            # Latency relative to the transaction start time
+            latency = ready_time - total_tx_start_times[tx_id]
+            if tx_id not in total_tx_ready_times:
+                total_tx_ready_times[tx_id] = []
+            total_tx_ready_times[tx_id].append(latency)
+    # Report min, avg, max latency for 10 random transactions
+    import random
+    random_tx_ids = random.sample(list(total_tx_ready_times.keys()), min(10, len(total_tx_ready_times.keys())))
+    for tx_id in random_tx_ids:
+        print(len(total_tx_ready_times[tx_id]),
+              min(total_tx_ready_times[tx_id]),
+              sum(total_tx_ready_times[tx_id]) / len(total_tx_ready_times[tx_id]),
+              max(total_tx_ready_times[tx_id]))
 
 
 if __name__ == "__main__":
@@ -108,4 +125,4 @@ def main(prefix="", sim_settings: SimulationSettings = None):
     # input_value = int(sys.argv[1])
     input_value = 1
     prefix = ""
-    main("net_data/" + str(input_value) + prefix)
+    main()
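Note: the report above only samples 10 random transactions. A minimal sketch, not part of the patch, of how the same total_tx_ready_times structure (transaction id mapped to its per-peer finalization latencies) could instead be reduced to overall statistics; aggregate_latency is a hypothetical helper name.

# Illustrative helper, not part of the patch: reduce the per-transaction
# latency lists built in main() above into overall statistics.
from statistics import mean, quantiles

def aggregate_latency(total_tx_ready_times: dict) -> dict:
    # total_tx_ready_times maps tx_id -> list of per-peer finalization latencies
    all_latencies = [lat for lats in total_tx_ready_times.values() for lat in lats]
    return {
        "min": min(all_latencies),
        "avg": mean(all_latencies),
        "p95": quantiles(all_latencies, n=20)[-1],  # 95th percentile
        "max": max(all_latencies),
    }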
diff --git a/simulations/lz/basic_simulation.py b/simulations/lz/basic_simulation.py
index ef68672..4a1be92 100644
--- a/simulations/lz/basic_simulation.py
+++ b/simulations/lz/basic_simulation.py
@@ -50,6 +50,7 @@ def on_discovery_complete(self):
             else:
                 self.nodes[peer_id].overlays[0].start_reconciliation()
                 self.nodes[peer_id].overlays[0].start_periodic_settlement()
+                self.nodes[peer_id].overlays[0].start_batch_making()
 
 
 class SimulateLZCommunity(SimulatedCommunityMixin, SyncCommunity):
@@ -59,6 +60,8 @@ class SimulateLZCommunity(SimulatedCommunityMixin, SyncCommunity):
     on_received_reconciliation_response = time_mark(SyncCommunity.on_received_reconciliation_response)
     reconcile_with_neighbors = time_mark(SyncCommunity.reconcile_with_neighbors)
     on_received_transaction_batch = time_mark(SyncCommunity.on_received_transaction_batch)
+    on_received_transaction = time_mark(SyncCommunity.on_received_transaction)
+    settle_transactions = time_mark(SyncCommunity.settle_transactions)
 
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
@@ -76,53 +79,53 @@ def on_settle_transactions(self, settled_txs: Iterable[int]):
                 out.write("{},{},{}\n".format(hash(self.my_peer), t_id, get_event_loop().time()))
 
 
-def main(sim_settings: SimulationSettings = None):
+def main(prefix="", sim_settings: SimulationSettings = None):
     if sim_settings:
         settings = sim_settings
     else:
         LATENCY = "global"
-        N_CLIENTS = 20
-        N_PEERS = 300
-        N_FAULTS = 10
+        N_CLIENTS = 10
+        N_PEERS = 190
+        N_FAULTS = 0
         N = N_CLIENTS + N_PEERS
 
     settings = SimulationSettings()
     settings.clients = N_CLIENTS
     settings.peers = N
     settings.faulty = N_FAULTS
-    settings.duration = 60
-    d = 30
+    settings.duration = 30
+    d = 8
     settings.topology = random_topology(N, d)
-    settings.logging_level = 'INFO'
+    settings.logging_level = 'WARNING'
     settings.discovery_delay = 5
     settings.location_latency_generator = LocalLocations if LATENCY == 'local' else DefaultLocations
     settings.community_map = {'LZCommunity': SimulateLZCommunity}
 
     class SimSettings(LZSettings):
-        recon_freq = 1
+        recon_freq = 2
         recon_fanout = 8
         tx_batch = 1
-        tx_freq = 2
-        initial_fanout = 4
+        tx_freq = 1 / 10
+        initial_fanout = 5
         sketch_size = 100
         settle_size = 350
-        settle_freq = 14
-        settle_delay = 14
-        settle_strategy = SettlementStrategy.FAIR
+        settle_freq = 1
+        settle_delay = 1
+        settle_strategy = SettlementStrategy.VANILLA
 
     settings.overlay_settings = SimSettings()
 
-    DIR_PREFIX = "../../lz_visualize/data/sim"
+    DIR_PREFIX = "../../lz_visualize/" + prefix
 
-    DATA_FILE = DIR_PREFIX + "_n_{}_t_{}_f_{}_d_{}_t_{:.1f}_s_{}(2)".format(N,
-                                                                            d,
-                                                                            SimSettings.recon_fanout,
-                                                                            SimSettings.recon_freq,
-                                                                            SimSettings.tx_batch / SimSettings.tx_freq * N_CLIENTS,
-                                                                            SimSettings.settle_freq
-                                                                            )
+    DATA_FILE = DIR_PREFIX + "_n_{}_t_{}_f_{}_d_{}_t_{:.1f}_s_{}".format(N,
+                                                                         d,
+                                                                         SimSettings.recon_fanout,
+                                                                         SimSettings.recon_freq,
+                                                                         SimSettings.tx_batch / SimSettings.tx_freq * N_CLIENTS,
+                                                                         SimSettings.settle_freq
+                                                                         )
 
     TX_FILE = DATA_FILE + ".csv"
     SD_FILE = DATA_FILE + "_data.csv"
@@ -137,6 +140,12 @@ class SimSettings(LZSettings):
     for peer_id in simulation.nodes.keys():
         print(peer_id, len(simulation.nodes[peer_id].overlays[0].memcache.tx_payloads))
 
+    for peer_id in simulation.nodes.keys():
+        print(peer_id,
+              simulation.nodes[peer_id].overlays[0].receive_counter,
+              simulation.nodes[peer_id].overlays[0].send_counter,
+              )
+
     SD_FILE = settings.consts.get("SD_FILE")
 
     with open(SD_FILE, "w") as out_data:
@@ -150,4 +159,10 @@ class SimSettings(LZSettings):
 
 
 if __name__ == "__main__":
-    main()
+    import sys
+
+    # Access the input value
+    # input_value = int(sys.argv[1])
+    input_value = 1
+    prefix = ""
+    main("net_data/" + str(input_value) + prefix)
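Note: the patch above keeps input_value hard-coded to 1 and leaves the sys.argv read commented out, even though multi_sim.sh (next file) passes a run index as the first command-line argument. A minimal sketch, not part of the patch, of a __main__ block that consumes that argument and falls back to 1 when the script runs stand-alone:

# Illustrative variant of the __main__ block, not part of the patch:
# read the run index that multi_sim.sh passes as the first CLI argument.
import sys

if __name__ == "__main__":
    input_value = int(sys.argv[1]) if len(sys.argv) > 1 else 1
    prefix = ""
    main("net_data/" + str(input_value) + prefix)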
diff --git a/simulations/lz/multi_sim.sh b/simulations/lz/multi_sim.sh
new file mode 100644
index 0000000..79cb19c
--- /dev/null
+++ b/simulations/lz/multi_sim.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+# Function to run the Python script
+run_script() {
+    input=$1
+    /Users/bulat/Library/Caches/pypoetry/virtualenvs/bami-T1bVtQL0-py3.10/bin/python /Users/bulat/projects/main_bami/simulations/lz/basic_simulation.py "$input"
+}
+
+# Iterate over the input numbers from 1 to 10
+for ((input=1; input<=10; input++)); do
+    run_script "$input" &
+done
+
+# Wait for all the background processes to finish
+wait
+
+echo "All processes finished."
diff --git a/src/bami/broadcast/community.py b/src/bami/broadcast/community.py
index a15b753..7131df7 100644
--- a/src/bami/broadcast/community.py
+++ b/src/bami/broadcast/community.py
@@ -48,6 +48,9 @@ def __init__(self, *args, **kwargs) -> None:
         self.pending_headers = {}
         self.batch_to_header = {}
 
+        self.receive_counter = 0
+        self.send_counter = 0
+
         # Message state machine
         self.add_message_handler(TxBatchPayload, self.receive_new_batch)
         self.add_message_handler(BatchAckPayload, self.receive_batch_ack)
@@ -55,6 +58,14 @@ def __init__(self, *args, **kwargs) -> None:
         self.add_message_handler(BatchRequestPayload, self.receive_batch_request)
         self.add_message_handler(TransactionPayload, self.received_transaction)
 
+    def on_packet(self, packet, warn_unknown=True):
+        self.receive_counter += len(packet)
+        super().on_packet(packet, warn_unknown)
+
+    def ezr_pack(self, msg_num: int, *payloads: AnyPayload, **kwargs) -> bytes:
+        self.send_counter += len(payloads)
+        return super().ezr_pack(msg_num, *payloads, **kwargs)
+
     def ez_send(self, peer: Peer, *payloads: AnyPayload, **kwargs) -> None:
         if self.latency_sim:
             latency = self.sim_net.get_link_latency(self.my_peer_id, peer.public_key.key_to_bin())
@@ -128,12 +139,7 @@ def on_transaction_created(self, new_tx: TransactionPayload):
     def feed_batch_maker(self, new_tx: TransactionPayload):
         self.pending_transactions.append(new_tx)
         if len(self.pending_transactions) >= self.settings.batch_size:
-            self.replace_task(
-                "batch_maker",
-                self.seal_new_batch,
-                interval=self.settings.batch_freq,
-                delay=0,
-            )
+            self.seal_new_batch()
 
     def start_batch_making(self):
         self.register_task(
diff --git a/src/bami/broadcast/settings.py b/src/bami/broadcast/settings.py
index a7f57cc..14827df 100644
--- a/src/bami/broadcast/settings.py
+++ b/src/bami/broadcast/settings.py
@@ -5,17 +5,17 @@ class MempoolBroadcastSettings:
 
     # Transaction creation parameters
     script_size: int = 200  # in bytes
-    tx_freq: float = 1 / 200  # x/k: k transaction in x second
+    tx_freq: float = 1 / 10  # x/k: k transaction in x second
     tx_delay: float = 0.05  # Maximum delay before starting transaction creation
-    initial_fanout: int = 8  # number of peers to send the transaction to
+    initial_fanout: int = 5  # number of peers to send the transaction to
 
     # BatchMaker parameters
     batch_size: int = 250  # number of transactions
-    batch_freq: float = 0.5  # in seconds
+    batch_freq: float = 1.5  # in seconds
     batch_delay: float = 0.6  # in seconds, delay before starting batch creation
 
     # Quorum parameters
-    quorum_threshold: int = 25  # number of nodes required to progress
+    quorum_threshold: int = 130  # number of nodes required to progress
 
     # Sync parameters
     sync_timer_delta: float = 1  # in seconds
@@ -27,4 +27,4 @@ class MempoolBroadcastSettings:
     header_freq: float = 0.2  # in seconds
 
     start_immediately: bool = False
-    simulate_network_latency: bool = True
+    simulate_network_latency: bool = False
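Note: the on_packet / ezr_pack counter overrides added above are repeated verbatim in the LZ community below. A minimal sketch, not part of the patch, of how they could be factored into a shared mixin; TrafficCounterMixin is a hypothetical name, and the counters simply accumulate len() of whatever the base-class hooks receive (the raw packet on receive, the payload tuple on send), so they are best read as relative traffic indicators rather than exact byte counts.

# Illustrative sketch, not part of the patch: shared counter mixin assuming the
# same base-class hooks (on_packet, ezr_pack) that both communities override.
class TrafficCounterMixin:
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.receive_counter = 0
        self.send_counter = 0

    def on_packet(self, packet, warn_unknown=True):
        self.receive_counter += len(packet)
        super().on_packet(packet, warn_unknown)

    def ezr_pack(self, msg_num, *payloads, **kwargs) -> bytes:
        self.send_counter += len(payloads)
        return super().ezr_pack(msg_num, *payloads, **kwargs)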
diff --git a/src/bami/lz/community.py b/src/bami/lz/community.py
index d5eb142..5034f7f 100644
--- a/src/bami/lz/community.py
+++ b/src/bami/lz/community.py
@@ -54,13 +54,16 @@ def __init__(self, *args, **kwargs) -> None:
 
         # Message processing
         # Gossip messages
-        self.add_message_handler(TransactionPayload, self.received_transaction)
+        self.add_message_handler(TransactionPayload, self.on_received_transaction)
         self.add_message_handler(ReconciliationRequestPayload, self.on_received_reconciliation_request)
         self.add_message_handler(ReconciliationResponsePayload, self.on_received_reconciliation_response)
         self.add_message_handler(TransactionsChallengePayload, self.on_received_transactions_challenge)
         self.add_message_handler(TransactionsRequestPayload, self.on_received_transactions_request)
         self.add_message_handler(TransactionBatchPayload, self.on_received_transaction_batch)
 
+        self.receive_counter = 0
+        self.send_counter = 0
+
         self._my_peer_id = self.my_peer.public_key.key_to_bin()
         self.latency_sim = self.settings.simulate_network_latency
         if self.latency_sim:
@@ -101,6 +104,14 @@ def __init__(self, *args, **kwargs) -> None:
         if self.settings.start_immediately:
             self.start_tasks()
 
+    def on_packet(self, packet, warn_unknown=True):
+        self.receive_counter += len(packet)
+        super().on_packet(packet, warn_unknown)
+
+    def ezr_pack(self, msg_num: int, *payloads: AnyPayload, **kwargs) -> bytes:
+        self.send_counter += len(payloads)
+        return super().ezr_pack(msg_num, *payloads, **kwargs)
+
     def ez_send(self, peer: Peer, *payloads: AnyPayload, **kwargs) -> None:
         if self.latency_sim:
             latency = self.sim_net.get_link_latency(self.my_peer_id, peer.public_key.key_to_bin())
@@ -178,16 +189,11 @@ def on_transaction_created(self, tx_id: int):
 
     def create_transaction(self):
         for _ in range(self.settings.tx_batch):
             new_tx = self.create_transaction_payload()
-            self.process_transaction(new_tx)
-            t_id = bytes_to_uint(new_tx.t_id, self.settings.tx_id_size)
-            self.on_transaction_created(t_id)
-            # This is a new transaction - push to neighbors
             selected = random.sample(self.get_full_nodes(),
                                      min(self.settings.initial_fanout, len(self.get_full_nodes())))
-
             for p in selected:
                 self.logger.debug("Sending transaction to {}".format(p))
                 self.ez_send(p, new_tx)
@@ -210,7 +216,7 @@ def start_tx_creation(self):
     # --------- Transaction Processing ---------------
 
     @lazy_wrapper(TransactionPayload)
-    def received_transaction(self, _: Peer, payload: TransactionPayload):
+    def on_received_transaction(self, _: Peer, payload: TransactionPayload):
         if self.is_light_client:
             self.logger.warn("Client received transaction")
         self.process_transaction(payload)
@@ -224,7 +230,7 @@ def seal_new_batch(self):
         selected = self.pending_tx_payloads[:self.settings.batch_size]
         batch = TransactionBatchPayload(selected)
         self.broadcast(batch)
-        self.pending_transactions = self.pending_tx_payloads[self.settings.batch_size:]
+        self.pending_tx_payloads = self.pending_tx_payloads[self.settings.batch_size:]
 
     def broadcast(self, message_payload):
         """Broadcast message to all peers and return the awaited id for acknowledgement"""
@@ -239,12 +245,7 @@ def feed_batch_maker(self, new_tx: TransactionPayload):
         self.pending_tx_payloads.append(new_tx)
         if len(self.pending_tx_payloads) >= self.settings.batch_size:
-            self.replace_task(
-                "batch_maker",
-                self.seal_new_batch,
-                interval=self.settings.batch_freq,
-                delay=0,
-            )
+            self.seal_new_batch()
 
     def start_batch_making(self):
         self.logger.info("Start batch maker")
@@ -280,7 +281,8 @@ def process_transaction(self, payload: TransactionPayload):
             if t.script != payload.script:
                 self.logger.warn("ID collision detected")
             else:
-                self.logger.warn("Received duplicate")
+                # self.logger.warn("Received duplicate")
+                pass
 
     # --------------- Transaction Reconciliation -----------
 
diff --git a/src/bami/lz/settings.py b/src/bami/lz/settings.py
index 2055641..a8748a0 100644
--- a/src/bami/lz/settings.py
+++ b/src/bami/lz/settings.py
@@ -66,8 +66,8 @@ class LZSettings(ClientSettings,
 
     # BatchMaker parameters
     batch_size: int = 250  # number of transactions
-    batch_freq: float = 0.5  # in seconds
-    batch_delay: float = 0.6  # in seconds, delay before starting batch creation
+    batch_freq: float = 1.0  # in seconds
+    batch_delay: float = 0.1  # in seconds, delay before starting batch creation
 
     settle_freq = 5
     settle_delay = 2
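Note: taken together, the batch-maker changes in both communities replace the old "reschedule the periodic task when the queue is full" behavior with an immediate seal, while start_batch_making keeps flushing whatever is pending every batch_freq seconds. A minimal stand-alone sketch of that behavior under the default settings above, not part of the patch; BatchMaker is a hypothetical class used only for illustration.

# Illustrative stand-alone sketch, not part of the patch: seal a batch as soon
# as batch_size transactions are pending, and flush the remainder periodically.
import asyncio

class BatchMaker:
    def __init__(self, batch_size: int = 250, batch_freq: float = 1.0):
        self.batch_size = batch_size
        self.batch_freq = batch_freq
        self.pending = []

    def feed(self, tx) -> None:
        # Mirrors feed_batch_maker: seal immediately once a full batch is pending
        self.pending.append(tx)
        if len(self.pending) >= self.batch_size:
            self.seal()

    def seal(self) -> None:
        # Mirrors seal_new_batch: take at most batch_size pending transactions
        batch, self.pending = self.pending[:self.batch_size], self.pending[self.batch_size:]
        if batch:
            print("sealed batch of {} transactions".format(len(batch)))

    async def run(self) -> None:
        # Mirrors the periodic task registered in start_batch_making
        while True:
            await asyncio.sleep(self.batch_freq)
            self.seal()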