diff --git a/src/bots/depositor.py b/src/bots/depositor.py index a05fc94..568759c 100644 --- a/src/bots/depositor.py +++ b/src/bots/depositor.py @@ -22,7 +22,7 @@ ) from metrics.transport_message_metrics import message_metrics_filter from schema import Or, Schema -from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks +from transport.msg_providers.onchain_transport import DepositParser, OnchainTransportProvider, PingParser from transport.msg_providers.rabbit import MessageType, RabbitProvider from transport.msg_storage import MessageStorage from transport.msg_types.common import get_messages_sign_filter @@ -91,7 +91,7 @@ def __init__( w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)), onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS, message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)), - sinks=[OnchainTransportSinks.DEPOSIT_V1, OnchainTransportSinks.PING_V1], + parsers_providers=[DepositParser, PingParser], ) ) diff --git a/src/bots/pauser.py b/src/bots/pauser.py index f6d216d..198b566 100644 --- a/src/bots/pauser.py +++ b/src/bots/pauser.py @@ -9,7 +9,7 @@ from metrics.metrics import UNEXPECTED_EXCEPTIONS from metrics.transport_message_metrics import message_metrics_filter from schema import Or, Schema -from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks +from transport.msg_providers.onchain_transport import OnchainTransportProvider, PauseV2Parser, PauseV3Parser, PingParser from transport.msg_providers.rabbit import MessageType, RabbitProvider from transport.msg_storage import MessageStorage from transport.msg_types.common import get_messages_sign_filter @@ -54,7 +54,7 @@ def __init__(self, w3: Web3): w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)), onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS, message_schema=Schema(Or(PauseMessageSchema, PingMessageSchema)), - sinks=[OnchainTransportSinks.PAUSE_V2, OnchainTransportSinks.PAUSE_V3, OnchainTransportSinks.PING_V1], + parsers_providers=[PauseV2Parser, PauseV3Parser, PingParser], ) ) diff --git a/src/bots/unvetter.py b/src/bots/unvetter.py index c7385b2..5bf87a9 100644 --- a/src/bots/unvetter.py +++ b/src/bots/unvetter.py @@ -7,7 +7,7 @@ from metrics.metrics import UNEXPECTED_EXCEPTIONS from metrics.transport_message_metrics import message_metrics_filter from schema import Or, Schema -from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks +from transport.msg_providers.onchain_transport import OnchainTransportProvider, PingParser, UnvetParser from transport.msg_providers.rabbit import MessageType, RabbitProvider from transport.msg_storage import MessageStorage from transport.msg_types.common import get_messages_sign_filter @@ -59,7 +59,7 @@ def prepare_transport_bus(self): w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)), onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS, message_schema=Schema(Or(UnvetMessageSchema, PingMessageSchema)), - sinks=[OnchainTransportSinks.UNVET_V1, OnchainTransportSinks.PING_V1], + parsers_providers=[UnvetParser, PingParser], ) ) diff --git a/src/transport/msg_providers/onchain_transport.py b/src/transport/msg_providers/onchain_transport.py index 0933f77..c7e8094 100644 --- a/src/transport/msg_providers/onchain_transport.py +++ b/src/transport/msg_providers/onchain_transport.py @@ -1,7 +1,6 @@ import abc import logging -from enum import StrEnum -from typing import List, Optional +from typing import Callable, List, Optional from eth_typing import ChecksumAddress from schema import Schema @@ -83,6 +82,11 @@ def __init__(self, w3: Web3, schema: str): def _create_message(self, parsed_data: dict, guardian: str) -> dict: pass + @property + @abc.abstractmethod + def message_abi(self): + pass + def _decode_event(self, log: LogReceipt) -> EventData: return get_event_data(self._w3.codec, self._message_abi, log) @@ -98,6 +102,7 @@ def parse(self, log: LogReceipt) -> Optional[dict]: # uint256 stakingModuleId, uint256 nonce, (bytes32 r, bytes32 vs) signature, (bytes32 version) app) data), class DepositParser(EventParser): DEPOSIT_V1_DATA_SCHEMA = '(uint256,bytes32,bytes32,uint256,uint256,(bytes32,bytes32),(bytes32))' + message_abi = f'MessageDepositV1(address,{DEPOSIT_V1_DATA_SCHEMA})' def __init__(self, w3: Web3): super().__init__(w3, self.DEPOSIT_V1_DATA_SCHEMA) @@ -123,6 +128,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> DepositMessage: # bytes operatorIds, bytes vettedKeysByOperator, (bytes32 r, bytes32 vs) signature, (bytes32 version) app) data) class UnvetParser(EventParser): UNVET_V1_DATA_SCHEMA = '(uint256,bytes32,uint256,uint256,bytes,bytes,(bytes32,bytes32),(bytes32))' + message_abi = f'MessageUnvetV1(address,{UNVET_V1_DATA_SCHEMA})' def __init__(self, w3: Web3): super().__init__(w3, self.UNVET_V1_DATA_SCHEMA) @@ -147,6 +153,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> UnvetMessage: # event MessagePingV1(address indexed guardianAddress, (uint256 blockNumber, (bytes32 version) app) data)", class PingParser(EventParser): PING_V1_DATA_SCHEMA = '(uint256,(bytes32))' + message_abi = f'MessagePingV1(address,{PING_V1_DATA_SCHEMA})' def __init__(self, w3: Web3): super().__init__(w3, self.PING_V1_DATA_SCHEMA) @@ -164,6 +171,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> PingMessage: # uint256 stakingModuleId, (bytes32 version) app) data) class PauseV2Parser(EventParser): PAUSE_V2_DATA_SCHEMA = '(uint256,bytes32,(bytes32,bytes32),uint256,(bytes32))' + message_abi = f'MessagePauseV2(address,{PAUSE_V2_DATA_SCHEMA})' def __init__(self, w3: Web3): super().__init__(w3, self.PAUSE_V2_DATA_SCHEMA) @@ -186,6 +194,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict: # (bytes32 version) app) data) class PauseV3Parser(EventParser): PAUSE_V3_DATA_SCHEMA = '(uint256,bytes32,(bytes32,bytes32),(bytes32))' + message_abi = f'MessagePauseV3(address,{PAUSE_V3_DATA_SCHEMA})' def __init__(self, w3: Web3): super().__init__(w3, self.PAUSE_V3_DATA_SCHEMA) @@ -203,48 +212,27 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict: ) -class OnchainTransportSinks(StrEnum): - DEPOSIT_V1 = f'MessageDepositV1(address,{DepositParser.DEPOSIT_V1_DATA_SCHEMA})' - PAUSE_V2 = f'MessagePauseV2(address,{PauseV2Parser.PAUSE_V2_DATA_SCHEMA})' - PAUSE_V3 = f'MessagePauseV3(address,{PauseV3Parser.PAUSE_V3_DATA_SCHEMA})' - PING_V1 = f'MessagePingV1(address,{PingParser.PING_V1_DATA_SCHEMA})' - UNVET_V1 = f'MessageUnvetV1(address,{UnvetParser.UNVET_V1_DATA_SCHEMA})' - - class OnchainTransportProvider(BaseMessageProvider): STANDARD_OFFSET: int = 256 - def __init__(self, w3: Web3, onchain_address: ChecksumAddress, message_schema: Schema, sinks: list[OnchainTransportSinks]): + def __init__( + self, + w3: Web3, + onchain_address: ChecksumAddress, + message_schema: Schema, + parsers_providers: list[Callable[[Web3], EventParser]], + ): super().__init__(message_schema) self._onchain_address = onchain_address - if not sinks: - raise ValueError('There must be at least a single sink for Data Bus provider') + if not parsers_providers: + raise ValueError('There must be at least a single parser for Data Bus provider') self._latest_block = -1 logger.info('Data bus client initialized.') self._w3 = w3 - self._topics = [self._w3.keccak(text=sink) for sink in sinks] - self._parsers: List[EventParser] = self._construct_parsers(sinks) - - def _construct_parsers(self, sinks: List[OnchainTransportSinks]) -> List[EventParser]: - parser_mapping = { - OnchainTransportSinks.DEPOSIT_V1: DepositParser, - OnchainTransportSinks.PAUSE_V2: PauseV2Parser, - OnchainTransportSinks.PAUSE_V3: PauseV3Parser, - OnchainTransportSinks.PING_V1: PingParser, - OnchainTransportSinks.UNVET_V1: UnvetParser, - } - - parsers = [] - for sink in sinks: - parser_class = parser_mapping.get(sink) - if parser_class: - parsers.append(parser_class(self._w3)) - else: - raise ValueError(f'Invalid sink in Data Bus sinks: {sink}') - - return parsers + self._parsers: List[EventParser] = [provider(w3) for provider in parsers_providers] + self._topics = [self._w3.keccak(text=parser.message_abi) for parser in self._parsers] def _fetch_messages(self) -> list: latest_block_number = self._w3.eth.block_number diff --git a/tests/transport/onchain_sender.py b/tests/transport/onchain_sender.py index c839b63..989b0d7 100644 --- a/tests/transport/onchain_sender.py +++ b/tests/transport/onchain_sender.py @@ -1,7 +1,6 @@ from blockchain.contracts.data_bus import DataBusContract from transport.msg_providers.onchain_transport import ( DepositParser, - OnchainTransportSinks, PauseV2Parser, PauseV3Parser, PingParser, @@ -27,7 +26,7 @@ def __init__(self, w3: Web3, data_bus_contract: DataBusContract): self._data_bus = data_bus_contract def send_deposit(self, deposit_mes: DepositMessage): - deposit_topic = self._w3.keccak(text=OnchainTransportSinks.DEPOSIT_V1) + deposit_topic = self._w3.keccak(text=DepositParser.message_abi) deposit_root, nonce, block_number, block_hash, staking_module_id, app = ( deposit_mes['depositRoot'], deposit_mes['nonce'], @@ -44,7 +43,7 @@ def send_deposit(self, deposit_mes: DepositMessage): return tx.transact() def send_pause_v2(self, pause_mes: PauseMessage): - pause_topic = self._w3.keccak(text=OnchainTransportSinks.PAUSE_V2) + pause_topic = self._w3.keccak(text=PauseV2Parser.message_abi) block_number, staking_module_id, app = ( pause_mes['blockNumber'], pause_mes['stakingModuleId'], @@ -58,7 +57,7 @@ def send_pause_v2(self, pause_mes: PauseMessage): return tx.transact() def send_pause_v3(self, pause_mes: PauseMessage): - pause_topic = self._w3.keccak(text=OnchainTransportSinks.PAUSE_V3) + pause_topic = self._w3.keccak(text=PauseV3Parser.message_abi) block_number, version = pause_mes['blockNumber'], (1).to_bytes(32) mes = self._w3.codec.encode( types=[PauseV3Parser.PAUSE_V3_DATA_SCHEMA], args=[(block_number, self._DEFAULT_BLOCK_HASH, self._DEFAULT_SIGNATURE, (version,))] @@ -67,7 +66,7 @@ def send_pause_v3(self, pause_mes: PauseMessage): return tx.transact() def send_unvet(self, unvet_mes: UnvetMessage): - unvet_topic = self._w3.keccak(text=OnchainTransportSinks.UNVET_V1) + unvet_topic = self._w3.keccak(text=UnvetParser.message_abi) nonce, block_number, block_hash, staking_module_id, operator_ids, vetted_keys, version = ( unvet_mes['nonce'], unvet_mes['blockNumber'], @@ -85,7 +84,7 @@ def send_unvet(self, unvet_mes: UnvetMessage): return tx.transact() def send_ping(self, ping_mes: PingMessage): - ping_topic = self._w3.keccak(text=OnchainTransportSinks.PING_V1) + ping_topic = self._w3.keccak(text=PingParser.message_abi) block_number, version = ping_mes['blockNumber'], (1).to_bytes(32) mes = self._w3.codec.encode(types=[PingParser.PING_V1_DATA_SCHEMA], args=[(block_number, (version,))]) tx = self._data_bus.functions.sendMessage(ping_topic, mes) diff --git a/tests/transport/test_data_bus.py b/tests/transport/test_data_bus.py index 8c89fe1..f20afe9 100644 --- a/tests/transport/test_data_bus.py +++ b/tests/transport/test_data_bus.py @@ -9,7 +9,6 @@ from transport.msg_providers.onchain_transport import ( DepositParser, OnchainTransportProvider, - OnchainTransportSinks, PingParser, ) from transport.msg_types.deposit import DepositMessage, DepositMessageSchema @@ -79,7 +78,7 @@ def test_data_bus_provider(web3_transaction_integration): w3=web3_transaction_integration, onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS, message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)), - sinks=[OnchainTransportSinks.DEPOSIT_V1, OnchainTransportSinks.PING_V1], + parsers_providers=[DepositParser, PingParser], ) messages = provider.get_messages() assert messages @@ -100,7 +99,7 @@ def test_data_bus_mock_responses(web3_lido_unit): w3=web3_lido_unit, onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS, message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)), - sinks=[OnchainTransportSinks.DEPOSIT_V1, OnchainTransportSinks.PING_V1], + parsers_providers=[DepositParser, PingParser], ) for parser in provider._parsers: