Skip to content

Commit

Permalink
Fix import
Browse files Browse the repository at this point in the history
  • Loading branch information
hweawer committed Aug 27, 2024
1 parent da2cfe6 commit 937dcc3
Showing 1 changed file with 46 additions and 41 deletions.
87 changes: 46 additions & 41 deletions src/transport/msg_providers/data_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,30 @@
from enum import Enum
from typing import List, Optional

import variables
from schema import Schema
from transport.msg_providers.common import BaseMessageProvider
from transport.msg_types.deposit import DepositMessage
from transport.msg_types.pause import PauseMessage
from transport.msg_types.ping import PingMessageDataBus
from transport.msg_types.unvet import UnvetMessage
from web3 import Web3
from web3._utils.events import get_event_data
from web3.exceptions import BlockNotFound
from web3.types import FilterParams, LogReceipt
from web3_multi_provider import FallbackProvider

import variables
from transport.msg_providers.common import BaseMessageProvider
from transport.msg_types.deposit import DepositMessage
from transport.msg_types.pause import PauseMessageV3, PauseMessage
from transport.msg_types.ping import PingMessageDataBus
from transport.msg_types.unvet import UnvetMessage

logger = logging.getLogger(__name__)

_MESSAGE = {
"anonymous": True,
"inputs": [
{"indexed": True, "name": "eventId", "type": "bytes32"},
{"indexed": True, "name": "sender", "type": "address"},
{"indexed": False, "name": "data", "type": "bytes"},
'anonymous': True,
'inputs': [
{'indexed': True, 'name': 'eventId', 'type': 'bytes32'},
{'indexed': True, 'name': 'sender', 'type': 'address'},
{'indexed': False, 'name': 'data', 'type': 'bytes'},
],
"name": "Message",
"type": "event"
'name': 'Message',
'type': 'event',
}


Expand All @@ -49,12 +48,14 @@ def parse(self, log: LogReceipt) -> Optional[dict]:
decoded_data = self._w3.codec.decode(self._schema, unparsed_event)[0]
return self._create_message(decoded_data, guardian)
except Exception as error:
logger.debug({
'msg': 'Failed to parse log',
'log': log,
'error': str(error),
'parser': type(self).__name__,
})
logger.debug(
{
'msg': 'Failed to parse log',
'log': log,
'error': str(error),
'parser': type(self).__name__,
}
)
return None


Expand Down Expand Up @@ -82,6 +83,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
# event MessageUnvetV1(address indexed guardianAddress, (uint256 nonce, uint256 blockNumber, bytes32 blockHash, uint256 stakingModuleId,
# bytes signature, bytes32 operatorIds, bytes32 vettedKeysByOperator, (bytes32 version) app) data)",


class UnvetParser(LogParser):
def __init__(self, w3: Web3):
super().__init__(w3, '(uint256,uint256,bytes32,uint256,bytes,bytes32,bytes32,(bytes32))')
Expand All @@ -102,6 +104,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:

# event MessagePingV1(address indexed guardianAddress, (uint256 blockNumber, (bytes32 version) app) data)",


class PingParser(LogParser):
def __init__(self, w3: Web3):
super().__init__(w3, '(uint256,(bytes32))')
Expand All @@ -118,6 +121,7 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
# event MessagePauseV2(address indexed guardianAddress, (bytes32 depositRoot, uint256 nonce, uint256 blockNumber, bytes32 blockHash,
# bytes signature, uint256 stakingModuleId, (bytes32 version) app) data)",


class PauseV2Parser(LogParser):
def __init__(self, w3: Web3):
super().__init__(w3, '(bytes32,uint256,uint256,bytes32,bytes,uint256,(bytes32))')
Expand Down Expand Up @@ -173,13 +177,12 @@ def _construct_parsers(w3: Web3, sinks: List[DataBusSinks]) -> List[LogParser]:
if parser_class:
parsers.append(parser_class(w3))
else:
raise ValueError(f"Invalid sink in Data Bus sinks: {sink}")
raise ValueError(f'Invalid sink in Data Bus sinks: {sink}')

return parsers


class DataBusProvider(BaseMessageProvider):

def __init__(self, message_schema: Schema, sinks: [DataBusSinks]):
super().__init__('', message_schema)
if len(sinks) == 0:
Expand Down Expand Up @@ -212,11 +215,7 @@ def _receive_message(self) -> Optional[LogReceipt]:
def _fetch_logs_into_queue(self):
try:
latest_block_number = self._w3.eth.block_number
from_block = (
latest_block_number - self._STANDARD_OFFSET
if self._latest_block == -1
else self._latest_block
)
from_block = latest_block_number - self._STANDARD_OFFSET if self._latest_block == -1 else self._latest_block

filter_params = FilterParams(
fromBlock=from_block,
Expand All @@ -230,25 +229,31 @@ def _fetch_logs_into_queue(self):
self._latest_block = latest_block_number

except BlockNotFound as e:
logger.error({
'msg': 'Block not found',
'err': repr(e),
})
logger.error(
{
'msg': 'Block not found',
'err': repr(e),
}
)
except Exception as e:
logger.error({
'msg': 'Failed to fetch logs',
'err': repr(e),
})
logger.error(
{
'msg': 'Failed to fetch logs',
'err': repr(e),
}
)

def _process_msg(self, log: LogReceipt) -> Optional[dict]:
for parser in self._parsers:
try:
return parser.parse(log)
except Exception as error:
logger.debug({
'msg': 'Data Bus parser failed to parse log',
'log': log,
'error': str(error),
'parser': type(parser).__name__,
})
logger.debug(
{
'msg': 'Data Bus parser failed to parse log',
'log': log,
'error': str(error),
'parser': type(parser).__name__,
}
)
return None

0 comments on commit 937dcc3

Please sign in to comment.