Skip to content

Commit

Permalink
Add casting to lz
Browse files Browse the repository at this point in the history
  • Loading branch information
grimadas committed Sep 22, 2023
1 parent f350f72 commit 775b0ac
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 9 deletions.
44 changes: 38 additions & 6 deletions src/bami/lz/community.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import heapq
import random
from binascii import unhexlify
from collections import defaultdict
import random
from typing import Iterable, List, NewType, Set, Tuple, Union

from ipv8.lazy_community import lazy_wrapper, lazy_wrapper_unsigned
from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.payload import IntroductionRequestPayload
from ipv8.peer import Peer
from ipv8.types import AnyPayload
Expand Down Expand Up @@ -121,6 +121,7 @@ def start_tasks(self):
if self.settings.enable_client:
self.start_tx_creation()
else:
self.start_batch_making()
self.start_reconciliation()

def create_introduction_request(self, socket_address, extra_bytes=b'', new_style=False, prefix=None):
Expand Down Expand Up @@ -183,15 +184,14 @@ def create_transaction(self):
self.on_transaction_created(t_id)

# This is a new transaction - push to neighbors
selected = random.sample(self.get_full_nodes(),
min(self.settings.initial_fanout,
len(self.get_full_nodes())))
selected = random.sample(self.get_full_nodes(), min(self.settings.initial_fanout,
len(self.get_full_nodes())))

for p in selected:
self.logger.debug("Sending transaction to {}".format(p))
self.ez_send(p, new_tx)

def get_full_nodes(self) -> Iterable[Peer]:
def get_full_nodes(self) -> List[Peer]:
return [p for p in self.get_peers() if p.public_key.key_to_bin() not in self.connected_clients]

def start_tx_creation(self):
Expand All @@ -217,6 +217,37 @@ def received_transaction(self, _: Peer, payload: TransactionPayload):
def on_process_new_transaction(self, t_id: int, tx_payload: TransactionPayload):
pass

def seal_new_batch(self):
if len(self.pending_transactions) > 0:
# select first k transactions for the batch
selected = self.pending_transactions[:self.settings.batch_size]
batch = TransactionBatchPayload(selected)
self.broadcast(batch)
self.pending_transactions = self.pending_transactions[self.settings.batch_size:]

def broadcast(self, message_payload):
"""Broadcast message to all peers and return the awaited id for acknowledgement"""
for p in self.get_full_nodes():
self.ez_send(p, message_payload)

def feed_batch_maker(self, new_tx: TransactionPayload):
self.pending_transactions.append(new_tx)
if len(self.pending_transactions) >= self.settings.batch_size:
self.replace_task(
"batch_maker",
self.seal_new_batch,
interval=self.settings.batch_freq,
delay=0,
)

def start_batch_making(self):
self.register_task(
"batch_maker",
self.seal_new_batch,
interval=self.settings.batch_freq,
delay=random.random() * self.settings.batch_delay,
)

def process_transaction(self, payload: TransactionPayload):

t_id = bytes_to_uint(payload.t_id, self.settings.tx_id_size)
Expand All @@ -225,6 +256,7 @@ def process_transaction(self, payload: TransactionPayload):
return

if not self.memcache.get_tx_payload(t_id):
self.feed_batch_maker(payload)
self.memcache.add_tx_payload(t_id, payload)
self.memcache.peer_clock(self.my_peer_id).increment(t_id)
self.on_process_new_transaction(t_id, payload)
Expand Down
11 changes: 8 additions & 3 deletions src/bami/lz/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ class ClientSettings:
@dataclasses.dataclass
class ReconciliationSettings:
# Reconciliation settings
recon_freq = 5 # Reconciliation round frequency
recon_fanout = 10 # selected peers for reconciliation
recon_delay = 0 # start first round after the delay
recon_freq = 1 # Reconciliation round frequency
recon_fanout = 5 # selected peers for reconciliation
recon_delay = 1 # start first round after the delay

max_pending_requests = 1

Expand Down Expand Up @@ -64,6 +64,11 @@ class LZSettings(ClientSettings,
sketch_size: int = 100
max_sections = 500

# BatchMaker parameters
batch_size: int = 250 # number of transactions
batch_freq: float = 0.5 # in seconds
batch_delay: float = 0.6 # in seconds, delay before starting batch creation

settle_freq = 5
settle_delay = 2
settle_strategy = SettlementStrategy.LOCAL_ORDER
Expand Down

0 comments on commit 775b0ac

Please sign in to comment.