Skip to content

Commit

Permalink
Partially fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hweawer committed Aug 30, 2024
1 parent f525f67 commit 4a9ff6f
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 75 deletions.
11 changes: 6 additions & 5 deletions src/bots/depositor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
)
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.data_bus import DataBusProvider, DataBusSinks
from transport.msg_providers.kafka import KafkaMessageProvider
from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
from transport.msg_types.common import get_messages_sign_filter
Expand Down Expand Up @@ -95,12 +95,13 @@ def __init__(
)
)

if TransportType.DATA_BUS in variables.MESSAGE_TRANSPORTS:
if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
DataBusProvider(
w3=Web3(FallbackProvider(variables.WEB3_RPC_GNOSIS_ENDPOINTS)),
OnchainTransportProvider(
w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
sinks=[DataBusSinks.DEPOSIT_V1, DataBusSinks.PING_V1],
sinks=[OnchainTransportSinks.DEPOSIT_V1, OnchainTransportSinks.PING_V1],
)
)

Expand Down
11 changes: 6 additions & 5 deletions src/bots/pauser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from metrics.metrics import UNEXPECTED_EXCEPTIONS
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.data_bus import DataBusProvider, DataBusSinks
from transport.msg_providers.kafka import KafkaMessageProvider
from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
from transport.msg_types.common import get_messages_sign_filter
Expand Down Expand Up @@ -58,12 +58,13 @@ def __init__(self, w3: Web3):
)
)

if TransportType.DATA_BUS in variables.MESSAGE_TRANSPORTS:
if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
DataBusProvider(
w3=Web3(FallbackProvider(variables.WEB3_RPC_GNOSIS_ENDPOINTS)),
OnchainTransportProvider(
w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(PauseMessageSchema, PingMessageSchema)),
sinks=[DataBusSinks.PAUSE_V2, DataBusSinks.PAUSE_V3, DataBusSinks.PING_V1],
sinks=[OnchainTransportSinks.PAUSE_V2, OnchainTransportSinks.PAUSE_V3, OnchainTransportSinks.PING_V1],
)
)

Expand Down
11 changes: 6 additions & 5 deletions src/bots/unvetter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from metrics.metrics import UNEXPECTED_EXCEPTIONS
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.data_bus import DataBusProvider, DataBusSinks
from transport.msg_providers.kafka import KafkaMessageProvider
from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
from transport.msg_types.common import get_messages_sign_filter
Expand Down Expand Up @@ -63,12 +63,13 @@ def prepare_transport_bus(self):
)
)

if TransportType.DATA_BUS in variables.MESSAGE_TRANSPORTS:
if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
DataBusProvider(
w3=Web3(FallbackProvider(variables.WEB3_RPC_GNOSIS_ENDPOINTS)),
OnchainTransportProvider(
w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(UnvetMessageSchema, PingMessageSchema)),
sinks=[DataBusSinks.UNVET_V1, DataBusSinks.PING_V1],
sinks=[OnchainTransportSinks.UNVET_V1, OnchainTransportSinks.PING_V1],
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import abc
import logging
from collections import deque
from enum import Enum
from enum import StrEnum
from typing import List, Optional

import variables
from eth_account.account import VRS
from eth_typing import HexStr
from eth_typing import ChecksumAddress, HexStr
from schema import Schema
from transport.msg_providers.common import BaseMessageProvider
from transport.msg_providers.rabbit import MessageType
Expand All @@ -22,7 +21,7 @@

logger = logging.getLogger(__name__)

_MESSAGE = {
MESSAGE_EVENT_ABI = {
'anonymous': True,
'inputs': [
{'indexed': True, 'name': 'eventId', 'type': 'bytes32'},
Expand All @@ -47,11 +46,11 @@ def signature_to_r_vs(signature: bytes) -> tuple[VRS, VRS]:
return HexStr(bytes_to_hex_string(r)), HexStr(bytes_to_hex_string(_vs))


class LogParser(abc.ABC):
class EventParser(abc.ABC):
def __init__(self, w3: Web3, schema: str):
self._w3 = w3
self._schema = schema
self._message_abi = w3.eth.contract(abi=[_MESSAGE]).events.Message().abi
self._message_abi = w3.eth.contract(abi=[MESSAGE_EVENT_ABI]).events.Message().abi

@abc.abstractmethod
def _create_message(self, parsed_data: dict, guardian: str) -> dict:
Expand All @@ -70,7 +69,7 @@ def parse(self, log: LogReceipt) -> Optional[dict]:

# event MessageDepositV1(address indexed guardianAddress, (bytes32 depositRoot, uint256 nonce, uint256 blockNumber, bytes32 blockHash,
# bytes signature, uint256 stakingModuleId, (bytes32 version) app) data)",
class DepositParser(LogParser):
class DepositParser(EventParser):
def __init__(self, w3: Web3):
super().__init__(w3, DEPOSIT_V1_DATA_SCHEMA)

Expand All @@ -97,7 +96,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
# bytes signature, bytes32 operatorIds, bytes32 vettedKeysByOperator, (bytes32 version) app) data)",


class UnvetParser(LogParser):
class UnvetParser(EventParser):
def __init__(self, w3: Web3):
super().__init__(w3, UNVET_V1_DATA_SCHEMA)

Expand All @@ -122,7 +121,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
# event MessagePingV1(address indexed guardianAddress, (uint256 blockNumber, (bytes32 version) app) data)",


class PingParser(LogParser):
class PingParser(EventParser):
def __init__(self, w3: Web3):
super().__init__(w3, PING_V1_DATA_SCHEMA)

Expand All @@ -139,7 +138,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
# bytes signature, uint256 stakingModuleId, (bytes32 version) app) data)",


class PauseV2Parser(LogParser):
class PauseV2Parser(EventParser):
def __init__(self, w3: Web3):
super().__init__(w3, PAUSE_V2_DATA_SCHEMA)

Expand All @@ -161,7 +160,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
# event MessagePauseV3(address indexed guardianAddress, (uint256 blockNumber, bytes signature, (bytes32 version) app) data)",


class PauseV3Parser(LogParser):
class PauseV3Parser(EventParser):
def __init__(self, w3: Web3):
super().__init__(w3, PAUSE_V3_DATA_SCHEMA)

Expand All @@ -179,57 +178,51 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
)


class DataBusSinks(str, Enum):
class OnchainTransportSinks(StrEnum):
DEPOSIT_V1 = f'MessageDepositV1(address,{DEPOSIT_V1_DATA_SCHEMA})'
PAUSE_V2 = f'MessagePauseV2(address,{PAUSE_V2_DATA_SCHEMA})'
PAUSE_V3 = f'MessagePauseV3(address,{PAUSE_V3_DATA_SCHEMA})'
PING_V1 = f'MessagePingV1(address,{PING_V1_DATA_SCHEMA})'
UNVET_V1 = f'MessageUnvetV1(address,{UNVET_V1_DATA_SCHEMA})'


def _construct_parsers(w3: Web3, sinks: List[DataBusSinks]) -> List[LogParser]:
parser_mapping = {
DataBusSinks.DEPOSIT_V1: DepositParser,
DataBusSinks.PAUSE_V2: PauseV2Parser,
DataBusSinks.PAUSE_V3: PauseV3Parser,
DataBusSinks.PING_V1: PingParser,
DataBusSinks.UNVET_V1: UnvetParser,
}
class OnchainTransportProvider(BaseMessageProvider):
STANDARD_OFFSET: int = 256

parsers = []
for sink in sinks:
parser_class = parser_mapping.get(sink)
if parser_class:
parsers.append(parser_class(w3))
else:
raise ValueError(f'Invalid sink in Data Bus sinks: {sink}')

return parsers


class DataBusProvider(BaseMessageProvider):
def __init__(self, w3: Web3, message_schema: Schema, sinks: [DataBusSinks]):
def __init__(self, w3: Web3, onchain_address: ChecksumAddress, message_schema: Schema, sinks: list[OnchainTransportSinks]):
super().__init__('', message_schema)
if len(sinks) == 0:
raise ValueError('There must be at least a single sink for Data Bus provider')

self._onchain_address = onchain_address
if not sinks:
raise ValueError('There must be at least one sink for Data Bus provider')

self._STANDARD_OFFSET = 256
raise ValueError('There must be at least a single sink for Data Bus provider')
self._latest_block = -1
self._queue: deque = deque()

logger.info('Data bus client initialized.')

self._w3 = w3
self._topics = [self._w3.keccak(text=sink) for sink in sinks]
self._parsers: List[LogParser] = _construct_parsers(self._w3, sinks)
self._parsers: List[EventParser] = self._construct_parsers(sinks)

def _construct_parsers(self, sinks: List[OnchainTransportSinks]) -> List[EventParser]:
parser_mapping = {
OnchainTransportSinks.DEPOSIT_V1: DepositParser,
OnchainTransportSinks.PAUSE_V2: PauseV2Parser,
OnchainTransportSinks.PAUSE_V3: PauseV3Parser,
OnchainTransportSinks.PING_V1: PingParser,
OnchainTransportSinks.UNVET_V1: UnvetParser,
}

parsers = []
for sink in sinks:
parser_class = parser_mapping.get(sink)
if parser_class:
parsers.append(parser_class(self._w3))
else:
raise ValueError(f'Invalid sink in Data Bus sinks: {sink}')

return parsers

def _receive_message(self) -> Optional[LogReceipt]:
if not self._w3.is_connected():
raise ConnectionError('Connection Data Bus was lost.')

if not self._queue:
self._fetch_logs_into_queue()
try:
Expand All @@ -240,11 +233,11 @@ def _receive_message(self) -> Optional[LogReceipt]:
def _fetch_logs_into_queue(self):
try:
latest_block_number = self._w3.eth.block_number
from_block = max(0, latest_block_number - self._STANDARD_OFFSET) if self._latest_block == -1 else self._latest_block
from_block = max(0, latest_block_number - self.STANDARD_OFFSET) if self._latest_block == -1 else self._latest_block

filter_params = FilterParams(
fromBlock=from_block,
address=variables.DATA_BUS_ADDRESS,
address=self._onchain_address,
topics=[self._topics],
)

Expand Down
2 changes: 1 addition & 1 deletion src/transport/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class TransportType:
RABBIT = 'rabbit'
KAFKA = 'kafka'
DATA_BUS = 'data_bus'
ONCHAIN_TRANSPORT = 'onchain_transport'
2 changes: 1 addition & 1 deletion src/utils/bytes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
def from_hex_string_to_bytes(hex_string):
def from_hex_string_to_bytes(hex_string: str) -> bytes:
if hex_string.startswith('0x'):
return bytes.fromhex(hex_string[2:])
return bytes.fromhex(hex_string)
Expand Down
10 changes: 5 additions & 5 deletions src/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@

# data bus
# gnosis nodes
WEB3_RPC_GNOSIS_ENDPOINTS = os.getenv('WEB3_RPC_GNOSIS_ENDPOINTS', '').split(',')
ONCHAIN_TRANSPORT_RPC_ENDPOINTS = os.getenv('ONCHAIN_TRANSPORT_RPC_ENDPOINTS', '').split(',')

DATA_BUS_ADDRESS = os.getenv('DATA_BUS_ADDRESS', None)
if DATA_BUS_ADDRESS:
ONCHAIN_TRANSPORT_ADDRESS = os.getenv('ONCHAIN_TRANSPORT_ADDRESS', None)
if ONCHAIN_TRANSPORT_ADDRESS:
# bot will throw exception if there is unexpected str and it's ok
DATA_BUS_ADDRESS = Web3.to_checksum_address(DATA_BUS_ADDRESS)
ONCHAIN_TRANSPORT_ADDRESS = Web3.to_checksum_address(ONCHAIN_TRANSPORT_ADDRESS)

# Transactions settings
CREATE_TRANSACTIONS = os.getenv('CREATE_TRANSACTIONS') == 'true'
Expand All @@ -76,7 +76,7 @@
MAX_PRIORITY_FEE = Web3.to_wei(*os.getenv('MAX_PRIORITY_FEE', '10 gwei').split(' '))

MAX_GAS_FEE = Web3.to_wei(*os.getenv('MAX_GAS_FEE', '100 gwei').split(' '))
CONTRACT_GAS_LIMIT = int(os.getenv('CONTRACT_GAS_LIMIT', 15 * 10 ** 6))
CONTRACT_GAS_LIMIT = int(os.getenv('CONTRACT_GAS_LIMIT', 15 * 10**6))

# Mainnet: "https://relay.flashbots.net",
# Holesky: "https://relay-holesky.flashbots.net",
Expand Down
24 changes: 16 additions & 8 deletions tests/transport/test_data_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@

import pytest
import variables
from eth_typing import ChecksumAddress, HexAddress, HexStr
from schema import Or, Schema
from transport.msg_providers.data_bus import DEPOSIT_V1_DATA_SCHEMA, PING_V1_DATA_SCHEMA, DataBusProvider, DataBusSinks
from transport.msg_providers.onchain_transport import (
DEPOSIT_V1_DATA_SCHEMA,
PING_V1_DATA_SCHEMA,
OnchainTransportProvider,
OnchainTransportSinks,
)
from transport.msg_types.deposit import DepositMessageSchema
from transport.msg_types.ping import PingMessageSchema
from web3 import Web3
Expand All @@ -22,12 +28,13 @@ def test_data_bus_provider():
"""
Utilise this function for an adhoc testing of data bus transport
"""
variables.WEB3_RPC_GNOSIS_ENDPOINTS = ['http://127.0.0.1:8888']
variables.DATA_BUS_ADDRESS = '0x5FbDB2315678afecb367f032d93F642f64180aa3'
provider = DataBusProvider(
w3=Web3(FallbackProvider(variables.WEB3_RPC_GNOSIS_ENDPOINTS)),
variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS = ['http://127.0.0.1:8888']
variables.ONCHAIN_TRANSPORT_ADDRESS = ChecksumAddress(HexAddress(HexStr('0x5FbDB2315678afecb367f032d93F642f64180aa3')))
provider = OnchainTransportProvider(
w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
sinks=[DataBusSinks.DEPOSIT_V1, DataBusSinks.PING_V1],
sinks=[OnchainTransportSinks.DEPOSIT_V1, OnchainTransportSinks.PING_V1],
)
messages = provider.get_messages()
for mes in messages:
Expand Down Expand Up @@ -62,10 +69,11 @@ def test_data_bus_mock_responses(web3_lido_unit):
web3_lido_unit.eth.get_logs = Mock(side_effect=[receipts, None])
web3_lido_unit.is_connected = Mock(return_value=True)
web3_lido_unit.eth.get_block_number = Mock(return_value=1)
provider = DataBusProvider(
provider = OnchainTransportProvider(
w3=web3_lido_unit,
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
sinks=[DataBusSinks.DEPOSIT_V1, DataBusSinks.PING_V1],
sinks=[OnchainTransportSinks.DEPOSIT_V1, OnchainTransportSinks.PING_V1],
)

for parser in provider._parsers:
Expand Down

0 comments on commit 4a9ff6f

Please sign in to comment.