Skip to content

Commit

Permalink
Improve lz and narwhal code
Browse files Browse the repository at this point in the history
  • Loading branch information
grimadas committed Sep 26, 2023
1 parent 49be7bd commit b881eec
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 70 deletions.
57 changes: 37 additions & 20 deletions simulations/broadcast/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,19 @@ def __init__(self, *args, **kwargs) -> None:
self.my_peer_num = - 1
super().__init__(*args, **kwargs)

def on_new_batch_created(self, new_batch: TxBatchPayload):
super().on_new_batch_created(new_batch)
print("Peer {}: New batch with {} # txs created at time {}".format(self.my_peer_num,
len(new_batch.txs),
get_event_loop().time()))
self.tx_start_time = {}
self.tx_ready_time = {}

# noinspection PyUnreachableCode
def on_transaction_created(self, new_tx: TransactionPayload):
# Write time when we've first seen the transaction
print("Peer {}: New transaction {} at time {}".format(self.my_peer_num,
new_tx.tx_id,
get_event_loop().time()))
self.tx_start_time[new_tx.tx_id] = get_event_loop().time()
super().on_transaction_created(new_tx)

def on_new_header(self, new_header: HeaderPayload):
super().on_new_header(new_header)
# Write the time when transaction is finalized
for batch_ack in new_header.batches:
batch = self.batches[batch_ack.batch_id]
print("Peer {}: Number of transactions finalized {} at time {}".format(
self.my_peer_num,
len(batch.txs),
get_event_loop().time()))
def on_transaction_finalized(self, tx: TransactionPayload):
# Write time when transaction is finalized
self.tx_ready_time[tx.tx_id] = get_event_loop().time()
super().on_transaction_finalized(tx)


def main(prefix="", sim_settings: SimulationSettings = None):
Expand All @@ -77,7 +68,7 @@ def main(prefix="", sim_settings: SimulationSettings = None):
else:
LATENCY = "global"
N_CLIENTS = 10
N = 40
N = 200

settings = SimulationSettings()
settings.clients = N_CLIENTS
Expand All @@ -98,7 +89,33 @@ def main(prefix="", sim_settings: SimulationSettings = None):
simulation.loop.run_forever()

for peer_id in simulation.nodes.keys():
print(peer_id, len(simulation.nodes[peer_id].overlays[0].batches))
print(peer_id,
simulation.nodes[peer_id].overlays[0].receive_counter,
simulation.nodes[peer_id].overlays[0].send_counter,
)

# Collect transaction start time, merge dictionaries
total_tx_start_times = {}
for peer_id in simulation.nodes.keys():
total_tx_start_times.update(simulation.nodes[peer_id].overlays[0].tx_start_time)

# Collect transaction ready time, merge dictionaries
total_tx_ready_times = {}
for peer_id in simulation.nodes.keys():
for tx_id, ready_time in simulation.nodes[peer_id].overlays[0].tx_ready_time.items():
# Start time
latency = ready_time - total_tx_start_times[tx_id]
if tx_id not in total_tx_ready_times:
total_tx_ready_times[tx_id] = []
total_tx_ready_times[tx_id].append(latency)
# Report for 10 random transaction min, avg, max
import random
random_tx_ids = random.sample(list(total_tx_ready_times.keys()), min(10, len(total_tx_ready_times.keys())))
for tx_id in random_tx_ids:
print(len(total_tx_ready_times[tx_id]),
min(total_tx_ready_times[tx_id]),
sum(total_tx_ready_times[tx_id]) / len(total_tx_ready_times[tx_id]),
max(total_tx_ready_times[tx_id]))


if __name__ == "__main__":
Expand All @@ -108,4 +125,4 @@ def main(prefix="", sim_settings: SimulationSettings = None):
# input_value = int(sys.argv[1])
input_value = 1
prefix = ""
main("net_data/" + str(input_value) + prefix)
main()
59 changes: 37 additions & 22 deletions simulations/lz/basic_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def on_discovery_complete(self):
else:
self.nodes[peer_id].overlays[0].start_reconciliation()
self.nodes[peer_id].overlays[0].start_periodic_settlement()
self.nodes[peer_id].overlays[0].start_batch_making()


class SimulateLZCommunity(SimulatedCommunityMixin, SyncCommunity):
Expand All @@ -59,6 +60,8 @@ class SimulateLZCommunity(SimulatedCommunityMixin, SyncCommunity):
on_received_reconciliation_response = time_mark(SyncCommunity.on_received_reconciliation_response)
reconcile_with_neighbors = time_mark(SyncCommunity.reconcile_with_neighbors)
on_received_transaction_batch = time_mark(SyncCommunity.on_received_transaction_batch)
on_received_transaction = time_mark(SyncCommunity.on_received_transaction)
settle_transactions = time_mark(SyncCommunity.settle_transactions)

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
Expand All @@ -76,53 +79,53 @@ def on_settle_transactions(self, settled_txs: Iterable[int]):
out.write("{},{},{}\n".format(hash(self.my_peer), t_id, get_event_loop().time()))


def main(sim_settings: SimulationSettings = None):
def main(prefix="", sim_settings: SimulationSettings = None):
if sim_settings:
settings = sim_settings
else:
LATENCY = "global"
N_CLIENTS = 20
N_PEERS = 300
N_FAULTS = 10
N_CLIENTS = 10
N_PEERS = 190
N_FAULTS = 0
N = N_CLIENTS + N_PEERS

settings = SimulationSettings()
settings.clients = N_CLIENTS
settings.peers = N
settings.faulty = N_FAULTS
settings.duration = 60
d = 30
settings.duration = 30
d = 8
settings.topology = random_topology(N, d)
settings.logging_level = 'INFO'
settings.logging_level = 'WARNING'
settings.discovery_delay = 5
settings.location_latency_generator = LocalLocations if LATENCY == 'local' else DefaultLocations

settings.community_map = {'LZCommunity': SimulateLZCommunity}

class SimSettings(LZSettings):
recon_freq = 1
recon_freq = 2
recon_fanout = 8
tx_batch = 1
tx_freq = 2
initial_fanout = 4
tx_freq = 1 / 10
initial_fanout = 5

sketch_size = 100
settle_size = 350
settle_freq = 14
settle_delay = 14
settle_strategy = SettlementStrategy.FAIR
settle_freq = 1
settle_delay = 1
settle_strategy = SettlementStrategy.VANILLA

settings.overlay_settings = SimSettings()

DIR_PREFIX = "../../lz_visualize/data/sim"
DIR_PREFIX = "../../lz_visualize/" + prefix

DATA_FILE = DIR_PREFIX + "_n_{}_t_{}_f_{}_d_{}_t_{:.1f}_s_{}(2)".format(N,
d,
SimSettings.recon_fanout,
SimSettings.recon_freq,
SimSettings.tx_batch / SimSettings.tx_freq * N_CLIENTS,
SimSettings.settle_freq
)
DATA_FILE = DIR_PREFIX + "_n_{}_t_{}_f_{}_d_{}_t_{:.1f}_s_{}".format(N,
d,
SimSettings.recon_fanout,
SimSettings.recon_freq,
SimSettings.tx_batch / SimSettings.tx_freq * N_CLIENTS,
SimSettings.settle_freq
)

TX_FILE = DATA_FILE + ".csv"
SD_FILE = DATA_FILE + "_data.csv"
Expand All @@ -137,6 +140,12 @@ class SimSettings(LZSettings):
for peer_id in simulation.nodes.keys():
print(peer_id, len(simulation.nodes[peer_id].overlays[0].memcache.tx_payloads))

for peer_id in simulation.nodes.keys():
print(peer_id,
simulation.nodes[peer_id].overlays[0].receive_counter,
simulation.nodes[peer_id].overlays[0].send_counter,
)

SD_FILE = settings.consts.get("SD_FILE")

with open(SD_FILE, "w") as out_data:
Expand All @@ -150,4 +159,10 @@ class SimSettings(LZSettings):


if __name__ == "__main__":
main()
import sys

# Access the input value
# input_value = int(sys.argv[1])
input_value = 1
prefix = ""
main("net_data/" + str(input_value) + prefix)
17 changes: 17 additions & 0 deletions simulations/lz/multi_sim.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

# Function to run the Python script
run_script() {
input=$1
/Users/bulat/Library/Caches/pypoetry/virtualenvs/bami-T1bVtQL0-py3.10/bin/python /Users/bulat/projects/main_bami/simulations/lz/basic_simulation.py "$input"
}

# Iterate over the input numbers from 1 to 10
for ((input=1; input<=10; input++)); do
run_script "$input" &
done

# Wait for all the background processes to finish
wait

echo "All processes finished."
18 changes: 12 additions & 6 deletions src/bami/broadcast/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,24 @@ def __init__(self, *args, **kwargs) -> None:
self.pending_headers = {}
self.batch_to_header = {}

self.receive_counter = 0
self.send_counter = 0

# Message state machine
self.add_message_handler(TxBatchPayload, self.receive_new_batch)
self.add_message_handler(BatchAckPayload, self.receive_batch_ack)
self.add_message_handler(HeaderPayload, self.receive_header)
self.add_message_handler(BatchRequestPayload, self.receive_batch_request)
self.add_message_handler(TransactionPayload, self.received_transaction)

def on_packet(self, packet, warn_unknown=True):
self.receive_counter += len(packet)
super().on_packet(packet, warn_unknown)

def ezr_pack(self, msg_num: int, *payloads: AnyPayload, **kwargs) -> bytes:
self.send_counter += len(payloads)
return super().ezr_pack(msg_num, *payloads, **kwargs)

def ez_send(self, peer: Peer, *payloads: AnyPayload, **kwargs) -> None:
if self.latency_sim:
latency = self.sim_net.get_link_latency(self.my_peer_id, peer.public_key.key_to_bin())
Expand Down Expand Up @@ -128,12 +139,7 @@ def on_transaction_created(self, new_tx: TransactionPayload):
def feed_batch_maker(self, new_tx: TransactionPayload):
self.pending_transactions.append(new_tx)
if len(self.pending_transactions) >= self.settings.batch_size:
self.replace_task(
"batch_maker",
self.seal_new_batch,
interval=self.settings.batch_freq,
delay=0,
)
self.seal_new_batch()

def start_batch_making(self):
self.register_task(
Expand Down
10 changes: 5 additions & 5 deletions src/bami/broadcast/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
class MempoolBroadcastSettings:
# Transaction creation parameters
script_size: int = 200 # in bytes
tx_freq: float = 1 / 200 # x/k: k transaction in x second
tx_freq: float = 1 / 10 # x/k: k transaction in x second
tx_delay: float = 0.05 # Maximum delay before starting transaction creation
initial_fanout: int = 8 # number of peers to send the transaction to
initial_fanout: int = 5 # number of peers to send the transaction to

# BatchMaker parameters
batch_size: int = 250 # number of transactions
batch_freq: float = 0.5 # in seconds
batch_freq: float = 1.5 # in seconds
batch_delay: float = 0.6 # in seconds, delay before starting batch creation

# Quorum parameters
quorum_threshold: int = 25 # number of nodes required to progress
quorum_threshold: int = 130 # number of nodes required to progress

# Sync parameters
sync_timer_delta: float = 1 # in seconds
Expand All @@ -27,4 +27,4 @@ class MempoolBroadcastSettings:
header_freq: float = 0.2 # in seconds

start_immediately: bool = False
simulate_network_latency: bool = True
simulate_network_latency: bool = False
32 changes: 17 additions & 15 deletions src/bami/lz/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,16 @@ def __init__(self, *args, **kwargs) -> None:

# Message processing
# Gossip messages
self.add_message_handler(TransactionPayload, self.received_transaction)
self.add_message_handler(TransactionPayload, self.on_received_transaction)
self.add_message_handler(ReconciliationRequestPayload, self.on_received_reconciliation_request)
self.add_message_handler(ReconciliationResponsePayload, self.on_received_reconciliation_response)
self.add_message_handler(TransactionsChallengePayload, self.on_received_transactions_challenge)
self.add_message_handler(TransactionsRequestPayload, self.on_received_transactions_request)
self.add_message_handler(TransactionBatchPayload, self.on_received_transaction_batch)

self.receive_counter = 0
self.send_counter = 0

self._my_peer_id = self.my_peer.public_key.key_to_bin()
self.latency_sim = self.settings.simulate_network_latency
if self.latency_sim:
Expand Down Expand Up @@ -101,6 +104,14 @@ def __init__(self, *args, **kwargs) -> None:
if self.settings.start_immediately:
self.start_tasks()

def on_packet(self, packet, warn_unknown=True):
self.receive_counter += len(packet)
super().on_packet(packet, warn_unknown)

def ezr_pack(self, msg_num: int, *payloads: AnyPayload, **kwargs) -> bytes:
self.send_counter += len(payloads)
return super().ezr_pack(msg_num, *payloads, **kwargs)

def ez_send(self, peer: Peer, *payloads: AnyPayload, **kwargs) -> None:
if self.latency_sim:
latency = self.sim_net.get_link_latency(self.my_peer_id, peer.public_key.key_to_bin())
Expand Down Expand Up @@ -178,16 +189,11 @@ def on_transaction_created(self, tx_id: int):
def create_transaction(self):
for _ in range(self.settings.tx_batch):
new_tx = self.create_transaction_payload()
self.process_transaction(new_tx)

t_id = bytes_to_uint(new_tx.t_id, self.settings.tx_id_size)

self.on_transaction_created(t_id)

# This is a new transaction - push to neighbors
selected = random.sample(self.get_full_nodes(), min(self.settings.initial_fanout,
len(self.get_full_nodes())))

for p in selected:
self.logger.debug("Sending transaction to {}".format(p))
self.ez_send(p, new_tx)
Expand All @@ -210,7 +216,7 @@ def start_tx_creation(self):
# --------- Transaction Processing ---------------

@lazy_wrapper(TransactionPayload)
def received_transaction(self, _: Peer, payload: TransactionPayload):
def on_received_transaction(self, _: Peer, payload: TransactionPayload):
if self.is_light_client:
self.logger.warn("Client received transaction")
self.process_transaction(payload)
Expand All @@ -224,7 +230,7 @@ def seal_new_batch(self):
selected = self.pending_tx_payloads[:self.settings.batch_size]
batch = TransactionBatchPayload(selected)
self.broadcast(batch)
self.pending_transactions = self.pending_tx_payloads[self.settings.batch_size:]
self.pending_tx_payloads = self.pending_tx_payloads[self.settings.batch_size:]

def broadcast(self, message_payload):
"""Broadcast message to all peers and return the awaited id for acknowledgement"""
Expand All @@ -239,12 +245,7 @@ def broadcast(self, message_payload):
def feed_batch_maker(self, new_tx: TransactionPayload):
self.pending_tx_payloads.append(new_tx)
if len(self.pending_tx_payloads) >= self.settings.batch_size:
self.replace_task(
"batch_maker",
self.seal_new_batch,
interval=self.settings.batch_freq,
delay=0,
)
self.seal_new_batch()

def start_batch_making(self):
self.logger.info("Start batch maker")
Expand Down Expand Up @@ -280,7 +281,8 @@ def process_transaction(self, payload: TransactionPayload):
if t.script != payload.script:
self.logger.warn("ID collision detected")
else:
self.logger.warn("Received duplicate")
# self.logger.warn("Received duplicate")
pass

# --------------- Transaction Reconciliation -----------

Expand Down
4 changes: 2 additions & 2 deletions src/bami/lz/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class LZSettings(ClientSettings,

# BatchMaker parameters
batch_size: int = 250 # number of transactions
batch_freq: float = 0.5 # in seconds
batch_delay: float = 0.6 # in seconds, delay before starting batch creation
batch_freq: float = 1.0 # in seconds
batch_delay: float = 0.1 # in seconds, delay before starting batch creation

settle_freq = 5
settle_delay = 2
Expand Down

0 comments on commit b881eec

Please sign in to comment.