Skip to content

Commit

Permalink
Rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
hweawer committed Sep 30, 2024
1 parent 306e3c5 commit 0d06f6b
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 49 deletions.
4 changes: 2 additions & 2 deletions src/bots/depositor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks
from transport.msg_providers.onchain_transport import DepositParser, OnchainTransportProvider, PingParser
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
from transport.msg_types.common import get_messages_sign_filter
Expand Down Expand Up @@ -91,7 +91,7 @@ def __init__(
w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
sinks=[OnchainTransportSinks.DEPOSIT_V1, OnchainTransportSinks.PING_V1],
parsers_providers=[DepositParser, PingParser],
)
)

Expand Down
4 changes: 2 additions & 2 deletions src/bots/pauser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from metrics.metrics import UNEXPECTED_EXCEPTIONS
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks
from transport.msg_providers.onchain_transport import OnchainTransportProvider, PauseV2Parser, PauseV3Parser, PingParser
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
from transport.msg_types.common import get_messages_sign_filter
Expand Down Expand Up @@ -54,7 +54,7 @@ def __init__(self, w3: Web3):
w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(PauseMessageSchema, PingMessageSchema)),
sinks=[OnchainTransportSinks.PAUSE_V2, OnchainTransportSinks.PAUSE_V3, OnchainTransportSinks.PING_V1],
parsers_providers=[PauseV2Parser, PauseV3Parser, PingParser],
)
)

Expand Down
4 changes: 2 additions & 2 deletions src/bots/unvetter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from metrics.metrics import UNEXPECTED_EXCEPTIONS
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks
from transport.msg_providers.onchain_transport import OnchainTransportProvider, PingParser, UnvetParser
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
from transport.msg_types.common import get_messages_sign_filter
Expand Down Expand Up @@ -59,7 +59,7 @@ def prepare_transport_bus(self):
w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(UnvetMessageSchema, PingMessageSchema)),
sinks=[OnchainTransportSinks.UNVET_V1, OnchainTransportSinks.PING_V1],
parsers_providers=[UnvetParser, PingParser],
)
)

Expand Down
56 changes: 22 additions & 34 deletions src/transport/msg_providers/onchain_transport.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import abc
import logging
from enum import StrEnum
from typing import List, Optional
from typing import Callable, List, Optional

from eth_typing import ChecksumAddress
from schema import Schema
Expand Down Expand Up @@ -83,6 +82,11 @@ def __init__(self, w3: Web3, schema: str):
def _create_message(self, parsed_data: dict, guardian: str) -> dict:
pass

@property
@abc.abstractmethod
def message_abi(self):
pass

def _decode_event(self, log: LogReceipt) -> EventData:
return get_event_data(self._w3.codec, self._message_abi, log)

Expand All @@ -98,6 +102,7 @@ def parse(self, log: LogReceipt) -> Optional[dict]:
# uint256 stakingModuleId, uint256 nonce, (bytes32 r, bytes32 vs) signature, (bytes32 version) app) data),
class DepositParser(EventParser):
DEPOSIT_V1_DATA_SCHEMA = '(uint256,bytes32,bytes32,uint256,uint256,(bytes32,bytes32),(bytes32))'
message_abi = f'MessageDepositV1(address,{DEPOSIT_V1_DATA_SCHEMA})'

def __init__(self, w3: Web3):
super().__init__(w3, self.DEPOSIT_V1_DATA_SCHEMA)
Expand All @@ -123,6 +128,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> DepositMessage:
# bytes operatorIds, bytes vettedKeysByOperator, (bytes32 r, bytes32 vs) signature, (bytes32 version) app) data)
class UnvetParser(EventParser):
UNVET_V1_DATA_SCHEMA = '(uint256,bytes32,uint256,uint256,bytes,bytes,(bytes32,bytes32),(bytes32))'
message_abi = f'MessageUnvetV1(address,{UNVET_V1_DATA_SCHEMA})'

def __init__(self, w3: Web3):
super().__init__(w3, self.UNVET_V1_DATA_SCHEMA)
Expand All @@ -147,6 +153,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> UnvetMessage:
# event MessagePingV1(address indexed guardianAddress, (uint256 blockNumber, (bytes32 version) app) data)",
class PingParser(EventParser):
PING_V1_DATA_SCHEMA = '(uint256,(bytes32))'
message_abi = f'MessagePingV1(address,{PING_V1_DATA_SCHEMA})'

def __init__(self, w3: Web3):
super().__init__(w3, self.PING_V1_DATA_SCHEMA)
Expand All @@ -164,6 +171,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> PingMessage:
# uint256 stakingModuleId, (bytes32 version) app) data)
class PauseV2Parser(EventParser):
PAUSE_V2_DATA_SCHEMA = '(uint256,bytes32,(bytes32,bytes32),uint256,(bytes32))'
message_abi = f'MessagePauseV2(address,{PAUSE_V2_DATA_SCHEMA})'

def __init__(self, w3: Web3):
super().__init__(w3, self.PAUSE_V2_DATA_SCHEMA)
Expand All @@ -186,6 +194,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
# (bytes32 version) app) data)
class PauseV3Parser(EventParser):
PAUSE_V3_DATA_SCHEMA = '(uint256,bytes32,(bytes32,bytes32),(bytes32))'
message_abi = f'MessagePauseV3(address,{PAUSE_V3_DATA_SCHEMA})'

def __init__(self, w3: Web3):
super().__init__(w3, self.PAUSE_V3_DATA_SCHEMA)
Expand All @@ -203,48 +212,27 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
)


class OnchainTransportSinks(StrEnum):
DEPOSIT_V1 = f'MessageDepositV1(address,{DepositParser.DEPOSIT_V1_DATA_SCHEMA})'
PAUSE_V2 = f'MessagePauseV2(address,{PauseV2Parser.PAUSE_V2_DATA_SCHEMA})'
PAUSE_V3 = f'MessagePauseV3(address,{PauseV3Parser.PAUSE_V3_DATA_SCHEMA})'
PING_V1 = f'MessagePingV1(address,{PingParser.PING_V1_DATA_SCHEMA})'
UNVET_V1 = f'MessageUnvetV1(address,{UnvetParser.UNVET_V1_DATA_SCHEMA})'


class OnchainTransportProvider(BaseMessageProvider):
STANDARD_OFFSET: int = 256

def __init__(self, w3: Web3, onchain_address: ChecksumAddress, message_schema: Schema, sinks: list[OnchainTransportSinks]):
def __init__(
self,
w3: Web3,
onchain_address: ChecksumAddress,
message_schema: Schema,
parsers_providers: list[Callable[[Web3], EventParser]],
):
super().__init__(message_schema)
self._onchain_address = onchain_address
if not sinks:
raise ValueError('There must be at least a single sink for Data Bus provider')
if not parsers_providers:
raise ValueError('There must be at least a single parser for Data Bus provider')
self._latest_block = -1

logger.info('Data bus client initialized.')

self._w3 = w3
self._topics = [self._w3.keccak(text=sink) for sink in sinks]
self._parsers: List[EventParser] = self._construct_parsers(sinks)

def _construct_parsers(self, sinks: List[OnchainTransportSinks]) -> List[EventParser]:
parser_mapping = {
OnchainTransportSinks.DEPOSIT_V1: DepositParser,
OnchainTransportSinks.PAUSE_V2: PauseV2Parser,
OnchainTransportSinks.PAUSE_V3: PauseV3Parser,
OnchainTransportSinks.PING_V1: PingParser,
OnchainTransportSinks.UNVET_V1: UnvetParser,
}

parsers = []
for sink in sinks:
parser_class = parser_mapping.get(sink)
if parser_class:
parsers.append(parser_class(self._w3))
else:
raise ValueError(f'Invalid sink in Data Bus sinks: {sink}')

return parsers
self._parsers: List[EventParser] = [provider(w3) for provider in parsers_providers]
self._topics = [self._w3.keccak(text=parser.message_abi) for parser in self._parsers]

def _fetch_messages(self) -> list:
latest_block_number = self._w3.eth.block_number
Expand Down
11 changes: 5 additions & 6 deletions tests/transport/onchain_sender.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from blockchain.contracts.data_bus import DataBusContract
from transport.msg_providers.onchain_transport import (
DepositParser,
OnchainTransportSinks,
PauseV2Parser,
PauseV3Parser,
PingParser,
Expand All @@ -27,7 +26,7 @@ def __init__(self, w3: Web3, data_bus_contract: DataBusContract):
self._data_bus = data_bus_contract

def send_deposit(self, deposit_mes: DepositMessage):
deposit_topic = self._w3.keccak(text=OnchainTransportSinks.DEPOSIT_V1)
deposit_topic = self._w3.keccak(text=DepositParser.message_abi)
deposit_root, nonce, block_number, block_hash, staking_module_id, app = (
deposit_mes['depositRoot'],
deposit_mes['nonce'],
Expand All @@ -44,7 +43,7 @@ def send_deposit(self, deposit_mes: DepositMessage):
return tx.transact()

def send_pause_v2(self, pause_mes: PauseMessage):
pause_topic = self._w3.keccak(text=OnchainTransportSinks.PAUSE_V2)
pause_topic = self._w3.keccak(text=PauseV2Parser.message_abi)
block_number, staking_module_id, app = (
pause_mes['blockNumber'],
pause_mes['stakingModuleId'],
Expand All @@ -58,7 +57,7 @@ def send_pause_v2(self, pause_mes: PauseMessage):
return tx.transact()

def send_pause_v3(self, pause_mes: PauseMessage):
pause_topic = self._w3.keccak(text=OnchainTransportSinks.PAUSE_V3)
pause_topic = self._w3.keccak(text=PauseV3Parser.message_abi)
block_number, version = pause_mes['blockNumber'], (1).to_bytes(32)
mes = self._w3.codec.encode(
types=[PauseV3Parser.PAUSE_V3_DATA_SCHEMA], args=[(block_number, self._DEFAULT_BLOCK_HASH, self._DEFAULT_SIGNATURE, (version,))]
Expand All @@ -67,7 +66,7 @@ def send_pause_v3(self, pause_mes: PauseMessage):
return tx.transact()

def send_unvet(self, unvet_mes: UnvetMessage):
unvet_topic = self._w3.keccak(text=OnchainTransportSinks.UNVET_V1)
unvet_topic = self._w3.keccak(text=UnvetParser.message_abi)
nonce, block_number, block_hash, staking_module_id, operator_ids, vetted_keys, version = (
unvet_mes['nonce'],
unvet_mes['blockNumber'],
Expand All @@ -85,7 +84,7 @@ def send_unvet(self, unvet_mes: UnvetMessage):
return tx.transact()

def send_ping(self, ping_mes: PingMessage):
ping_topic = self._w3.keccak(text=OnchainTransportSinks.PING_V1)
ping_topic = self._w3.keccak(text=PingParser.message_abi)
block_number, version = ping_mes['blockNumber'], (1).to_bytes(32)
mes = self._w3.codec.encode(types=[PingParser.PING_V1_DATA_SCHEMA], args=[(block_number, (version,))])
tx = self._data_bus.functions.sendMessage(ping_topic, mes)
Expand Down
5 changes: 2 additions & 3 deletions tests/transport/test_data_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from transport.msg_providers.onchain_transport import (
DepositParser,
OnchainTransportProvider,
OnchainTransportSinks,
PingParser,
)
from transport.msg_types.deposit import DepositMessage, DepositMessageSchema
Expand Down Expand Up @@ -79,7 +78,7 @@ def test_data_bus_provider(web3_transaction_integration):
w3=web3_transaction_integration,
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
sinks=[OnchainTransportSinks.DEPOSIT_V1, OnchainTransportSinks.PING_V1],
parsers_providers=[DepositParser, PingParser],
)
messages = provider.get_messages()
assert messages
Expand All @@ -100,7 +99,7 @@ def test_data_bus_mock_responses(web3_lido_unit):
w3=web3_lido_unit,
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
sinks=[OnchainTransportSinks.DEPOSIT_V1, OnchainTransportSinks.PING_V1],
parsers_providers=[DepositParser, PingParser],
)

for parser in provider._parsers:
Expand Down

0 comments on commit 0d06f6b

Please sign in to comment.