From 2d4a2f4e3c48061e2941df1e587cfcf1d7ea833a Mon Sep 17 00:00:00 2001 From: hweawer Date: Fri, 6 Sep 2024 17:49:26 +0200 Subject: [PATCH] Fix comments --- .env.example | 2 +- README.md | 33 ++++---- src/bots/depositor.py | 9 -- src/bots/pauser.py | 9 -- src/bots/unvetter.py | 9 -- src/transport/msg_providers/common.py | 9 +- src/transport/msg_providers/kafka.py | 84 ------------------- .../msg_providers/onchain_transport.py | 7 +- src/transport/msg_providers/rabbit.py | 6 +- src/transport/msg_types/deposit.py | 1 - src/transport/msg_types/pause.py | 1 - src/transport/msg_types/ping.py | 1 - src/transport/msg_types/unvet.py | 1 - src/transport/types.py | 1 - src/variables.py | 16 +--- .../transport}/onchain_sender.py | 10 +-- tests/transport/test_base_provider.py | 4 +- tests/transport/test_data_bus.py | 5 +- 18 files changed, 35 insertions(+), 173 deletions(-) delete mode 100644 src/transport/msg_providers/kafka.py rename {src/transport/msg_providers => tests/transport}/onchain_sender.py (92%) diff --git a/.env.example b/.env.example index f72d4d1e..505c704c 100644 --- a/.env.example +++ b/.env.example @@ -15,7 +15,7 @@ LIDO_LOCATOR=0x1eDf09b5023DC86737b59dE68a8130De878984f5 # Görli: 0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b DEPOSIT_CONTRACT=0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b -# rabbit / kafka / rabbit,kafka +# rabbit MESSAGE_TRANSPORTS=rabbit # rabbit secrets diff --git a/README.md b/README.md index 7070aa86..c4528c20 100644 --- a/README.md +++ b/README.md @@ -2,15 +2,19 @@ ## Description -Depositor and pauser bots are parts of [Deposit Security Module](https://github.com/lidofinance/lido-improvement-proposals/blob/develop/LIPS/lip-5.md#mitigations-for-deposit-front-running-vulnerability). +Depositor and pauser bots are parts +of [Deposit Security Module](https://github.com/lidofinance/lido-improvement-proposals/blob/develop/LIPS/lip-5.md#mitigations-for-deposit-front-running-vulnerability). -**The Depositor Bot** obtains signed deposit messages from Council Daemons. -Once a sufficient number of messages is collected to constitute a quorum, the bot proceeds to initiate a deposit into the designated staking module. +**The Depositor Bot** obtains signed deposit messages from Council Daemons. +Once a sufficient number of messages is collected to constitute a quorum, the bot proceeds to initiate a deposit into the designated staking +module. This deposit is executed using the depositBufferedEther function within the "DepositSecurityModule" smart contract. -Direct deposit is a mechanism that allows depositors to use side vault facilities for deposits. This process transfers ETH from the vault and facilitates the deposit to specified in side vault staking module, preventing funds from being stuck in the withdrawal queue. +Direct deposit is a mechanism that allows depositors to use side vault facilities for deposits. This process transfers ETH from the vault +and facilitates the deposit to specified in side vault staking module, preventing funds from being stuck in the withdrawal queue. -**The Pauser Bot** obtains pause message from Council Daemon and enacts pause deposits on protocol. Pause can occurs when Lido detects stealing. +**The Pauser Bot** obtains pause message from Council Daemon and enacts pause deposits on protocol. Pause can occurs when Lido detects +stealing. **The Unvetting Bot** obtains unvet message from Council Daemon and enacts unvet on the specified node operator. Unvetting is the proces of decreasing approved depositable signing keys. @@ -19,13 +23,13 @@ Unvetting is the proces of decreasing approved depositable signing keys. - [Running Daemon](#running-daemon) - [Variables](#variables) - - [Required variables](#required-variables) - - [Additional variables](#additional-variables) + - [Required variables](#required-variables) + - [Additional variables](#additional-variables) - [Metrics and logs](#metrics-and-logs) - [Development](#development) - - [Install](#install) - - [Tests](#tests) - - [Release flow](#release-flow) + - [Install](#install) + - [Tests](#tests) + - [Release flow](#release-flow) - [Annotations to code](#annotations-to-code) ## Running Daemon @@ -54,17 +58,10 @@ Unvetting is the proces of decreasing approved depositable signing keys. | DEPOSIT_CONTRACT | 0x00000000219ab540356cBB839Cbe05303d7705Fa | Ethereum deposit contract address | | DEPOSIT_MODULES_WHITELIST | 1 | List of staking module's ids in which the depositor bot will make deposits | | --- | --- | --- | -| MESSAGE_TRANSPORTS | - | Transports used in bot. One of/or both: rabbit/kafka | +| MESSAGE_TRANSPORTS | - | Transports used in bot. One of/or both: rabbit/onchain_transport | | RABBIT_MQ_URL | - | RabbitMQ url | | RABBIT_MQ_USERNAME | - | RabbitMQ username for virtualhost | | RABBIT_MQ_PASSWORD | - | RabbitMQ password for virtualhost | -| --- | --- _kafka is not used at the moment_ --- | --- | -| KAFKA_BROKER_ADDRESS_1 | - | Kafka servers url and port | -| KAFKA_USERNAME | - | Kafka username | -| KAFKA_PASSWORD | - | Password for kafka | -| KAFKA_NETWORK | - | Network type (mainnet or goerli) | -| KAFKA_TOPIC | - | Kafka topic name (for msg receiving) | -| KAFKA_GROUP_PREFIX | - | Just for staging (staging-) | ### Additional variables diff --git a/src/bots/depositor.py b/src/bots/depositor.py index f30c6daa..a05fc94a 100644 --- a/src/bots/depositor.py +++ b/src/bots/depositor.py @@ -22,7 +22,6 @@ ) from metrics.transport_message_metrics import message_metrics_filter from schema import Or, Schema -from transport.msg_providers.kafka import KafkaMessageProvider from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks from transport.msg_providers.rabbit import MessageType, RabbitProvider from transport.msg_storage import MessageStorage @@ -86,14 +85,6 @@ def __init__( ) ) - if TransportType.KAFKA in variables.MESSAGE_TRANSPORTS: - transports.append( - KafkaMessageProvider( - client=f'{variables.KAFKA_GROUP_PREFIX}deposit', - message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)), - ) - ) - if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS: transports.append( OnchainTransportProvider( diff --git a/src/bots/pauser.py b/src/bots/pauser.py index 84877346..f6d216d5 100644 --- a/src/bots/pauser.py +++ b/src/bots/pauser.py @@ -9,7 +9,6 @@ from metrics.metrics import UNEXPECTED_EXCEPTIONS from metrics.transport_message_metrics import message_metrics_filter from schema import Or, Schema -from transport.msg_providers.kafka import KafkaMessageProvider from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks from transport.msg_providers.rabbit import MessageType, RabbitProvider from transport.msg_storage import MessageStorage @@ -49,14 +48,6 @@ def __init__(self, w3: Web3): ) ) - if TransportType.KAFKA in variables.MESSAGE_TRANSPORTS: - transports.append( - KafkaMessageProvider( - client=f'{variables.KAFKA_GROUP_PREFIX}pause', - message_schema=Schema(Or(PauseMessageSchema, PingMessageSchema)), - ) - ) - if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS: transports.append( OnchainTransportProvider( diff --git a/src/bots/unvetter.py b/src/bots/unvetter.py index 546d9363..c7385b2f 100644 --- a/src/bots/unvetter.py +++ b/src/bots/unvetter.py @@ -7,7 +7,6 @@ from metrics.metrics import UNEXPECTED_EXCEPTIONS from metrics.transport_message_metrics import message_metrics_filter from schema import Or, Schema -from transport.msg_providers.kafka import KafkaMessageProvider from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks from transport.msg_providers.rabbit import MessageType, RabbitProvider from transport.msg_storage import MessageStorage @@ -54,14 +53,6 @@ def prepare_transport_bus(self): ) ) - if TransportType.KAFKA in variables.MESSAGE_TRANSPORTS: - transports.append( - KafkaMessageProvider( - client=f'{variables.KAFKA_GROUP_PREFIX}unvet', - message_schema=Schema(Or(UnvetMessageSchema, PingMessageSchema)), - ) - ) - if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS: transports.append( OnchainTransportProvider( diff --git a/src/transport/msg_providers/common.py b/src/transport/msg_providers/common.py index 3560688e..1944a638 100644 --- a/src/transport/msg_providers/common.py +++ b/src/transport/msg_providers/common.py @@ -1,6 +1,6 @@ import abc import logging -from typing import Any, Optional +from typing import Any from schema import Schema, SchemaError @@ -22,13 +22,14 @@ def get_messages(self) -> list[dict]: Returns: List[Dict]: A list of processed and valid messages. """ - return [msg for msg in (self._process_msg(m) for m in self._fetch_messages()) if msg and self._is_valid(msg)] + processed = [self._process_msg(m) for m in self._fetch_messages()] + return [msg for msg in processed if msg and self._is_valid(msg)] @abc.abstractmethod - def _fetch_messages(self) -> list[Any]: + def _fetch_messages(self) -> list: raise NotImplementedError('Receive message from transport.') - def _process_msg(self, msg: Any) -> Optional[dict]: + def _process_msg(self, msg: Any) -> dict | None: # Overwrite this method to add msg serialization. # Return None if message is not serializable return msg diff --git a/src/transport/msg_providers/kafka.py b/src/transport/msg_providers/kafka.py deleted file mode 100644 index 04184b9c..00000000 --- a/src/transport/msg_providers/kafka.py +++ /dev/null @@ -1,84 +0,0 @@ -import json -import logging -from typing import Any, List, Optional - -from confluent_kafka import Consumer as BaseConsumer -from schema import Schema -from transport.msg_providers.common import BaseMessageProvider -from variables import ( - KAFKA_BROKER_ADDRESS_1, - KAFKA_NETWORK, - KAFKA_PASSWORD, - KAFKA_TOPIC, - KAFKA_USERNAME, -) - -logger = logging.getLogger(__name__) - - -class Consumer(BaseConsumer): - """Lifehack for tests. We can't monkey patch side binaries""" - - def poll(self, timeout=None): - return super().poll(timeout) - - -class KafkaMessageProvider(BaseMessageProvider): - def __init__(self, message_schema: Schema, client: str): - logger.info({'msg': 'Kafka initialize.'}) - - kafka_topic = f'{KAFKA_NETWORK}-{KAFKA_TOPIC}' - - self.kafka = Consumer( - { - 'client.id': kafka_topic + f'-{client}-client', - 'group.id': kafka_topic + f'-{client}-group', - 'bootstrap.servers': KAFKA_BROKER_ADDRESS_1, - 'auto.offset.reset': 'earliest', - 'security.protocol': 'SASL_SSL', - 'session.timeout.ms': 240000, - 'sasl.mechanisms': 'PLAIN', - 'sasl.username': KAFKA_USERNAME, - 'sasl.password': KAFKA_PASSWORD, - } - ) - - logger.info({'msg': f'Subscribe to "{kafka_topic}".'}) - self.kafka.subscribe([kafka_topic]) - - super().__init__(message_schema) - - def __del__(self): - self.kafka.close() - - def _receive_message(self) -> Optional[str]: - msg = self.kafka.poll(timeout=1) - if msg is None: - return None - - if msg.error(): - logger.error({'msg': 'Kafka error', 'error': str(msg.error())}) - return None - - return msg.value() - - def _fetch_messages(self) -> List[Any]: - messages = [] - - for _ in range(self.MAX_MESSAGES_RECEIVE): - msg = self._receive_message() - if msg is None: - break - messages.append(msg) - - return messages - - def _process_msg(self, msg: str) -> Optional[dict]: - try: - value = json.loads(msg) - except ValueError as error: - # ignore not json msg - logger.warning({'msg': 'Broken message in Kafka', 'value': str(msg), 'error': str(error)}) - return None - - return value diff --git a/src/transport/msg_providers/onchain_transport.py b/src/transport/msg_providers/onchain_transport.py index 1bfb2732..dc25d6f2 100644 --- a/src/transport/msg_providers/onchain_transport.py +++ b/src/transport/msg_providers/onchain_transport.py @@ -1,8 +1,7 @@ import abc import logging -from collections import deque from enum import StrEnum -from typing import Any, List, Optional +from typing import List, Optional from cryptography.verify_signature import compute_vs from eth_account.account import VRS @@ -93,7 +92,6 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict: 'r': r, '_vs': _vs, }, - app={'version': app[0]}, ) @@ -200,7 +198,6 @@ def __init__(self, w3: Web3, onchain_address: ChecksumAddress, message_schema: S if not sinks: raise ValueError('There must be at least a single sink for Data Bus provider') self._latest_block = -1 - self._queue: deque = deque() logger.info('Data bus client initialized.') @@ -227,7 +224,7 @@ def _construct_parsers(self, sinks: List[OnchainTransportSinks]) -> List[EventPa return parsers - def _fetch_messages(self) -> List[Any]: + def _fetch_messages(self) -> list: latest_block_number = self._w3.eth.block_number from_block = max(0, latest_block_number - self.STANDARD_OFFSET) if self._latest_block == -1 else self._latest_block # If block distance is 0, then skip fetching to avoid looping on a single block diff --git a/src/transport/msg_providers/rabbit.py b/src/transport/msg_providers/rabbit.py index 92606944..f82d13ef 100644 --- a/src/transport/msg_providers/rabbit.py +++ b/src/transport/msg_providers/rabbit.py @@ -2,7 +2,7 @@ import json import logging import time -from typing import Any, List, Optional +from typing import List, Optional import variables from schema import Schema @@ -74,7 +74,7 @@ def _recreate_client(self): def __del__(self): self.client.disconnect() - def _fetch_messages(self) -> List[Any]: + def _fetch_messages(self) -> list: messages = [] for _ in range(self.MAX_MESSAGES_RECEIVE): @@ -102,7 +102,7 @@ def _process_msg(self, msg: str) -> Optional[dict]: value = json.loads(msg) except ValueError as error: # ignore not json msg - logger.warning({'msg': 'Broken message in Kafka', 'value': str(msg), 'error': str(error)}) + logger.warning({'msg': 'Broken message in Rabbit', 'value': str(msg), 'error': str(error)}) return None return value diff --git a/src/transport/msg_types/deposit.py b/src/transport/msg_types/deposit.py index 006255d4..510cabfe 100644 --- a/src/transport/msg_types/deposit.py +++ b/src/transport/msg_types/deposit.py @@ -53,4 +53,3 @@ class DepositMessage(TypedDict): guardianAddress: str signature: Signature stakingModuleId: int - app: dict diff --git a/src/transport/msg_types/pause.py b/src/transport/msg_types/pause.py index d9f36704..6d58d22d 100644 --- a/src/transport/msg_types/pause.py +++ b/src/transport/msg_types/pause.py @@ -43,4 +43,3 @@ class PauseMessage(TypedDict): guardianAddress: str signature: Signature stakingModuleId: int - app: dict diff --git a/src/transport/msg_types/ping.py b/src/transport/msg_types/ping.py index 7c34bb3d..92768760 100644 --- a/src/transport/msg_types/ping.py +++ b/src/transport/msg_types/ping.py @@ -21,7 +21,6 @@ class PingMessage(TypedDict): type: str blockNumber: int guardianAddress: str - app: dict def to_check_sum_address(msg: dict): diff --git a/src/transport/msg_types/unvet.py b/src/transport/msg_types/unvet.py index d97b23ec..57a6fa0a 100644 --- a/src/transport/msg_types/unvet.py +++ b/src/transport/msg_types/unvet.py @@ -32,4 +32,3 @@ class UnvetMessage(TypedDict): nonce: int operatorIds: str vettedKeysByOperator: str - app: dict diff --git a/src/transport/types.py b/src/transport/types.py index 1bfa116d..d984b2f8 100644 --- a/src/transport/types.py +++ b/src/transport/types.py @@ -1,4 +1,3 @@ class TransportType: RABBIT = 'rabbit' - KAFKA = 'kafka' ONCHAIN_TRANSPORT = 'onchain_transport' diff --git a/src/variables.py b/src/variables.py index a3708361..54a13b29 100644 --- a/src/variables.py +++ b/src/variables.py @@ -44,17 +44,9 @@ MELLOW_CONTRACT_ADDRESS = Web3.to_checksum_address(MELLOW_CONTRACT_ADDRESS) VAULT_DIRECT_DEPOSIT_THRESHOLD = Web3.to_wei(*os.getenv('VAULT_DIRECT_DEPOSIT_THRESHOLD', '1 ether').split(' ')) -# rabbit / kafka / rabbit,kafka +# rabbit MESSAGE_TRANSPORTS = os.getenv('MESSAGE_TRANSPORTS', '').split(',') -# Kafka secrets -KAFKA_BROKER_ADDRESS_1 = os.getenv('KAFKA_BROKER_ADDRESS_1') -KAFKA_USERNAME = os.getenv('KAFKA_USERNAME') -KAFKA_PASSWORD = os.getenv('KAFKA_PASSWORD') -KAFKA_NETWORK = os.getenv('KAFKA_NETWORK', 'mainnet') # or goerli -KAFKA_TOPIC = os.getenv('KAFKA_TOPIC') -KAFKA_GROUP_PREFIX = os.getenv('KAFKA_GROUP_PREFIX', '') - # rabbit secrets RABBIT_MQ_URL = os.getenv('RABBIT_MQ_URL', 'ws://127.0.0.1:15674/ws') RABBIT_MQ_USERNAME = os.getenv('RABBIT_MQ_USERNAME', 'guest') @@ -127,12 +119,6 @@ PRIVATE_ENV_VARS = { 'WEB3_RPC_ENDPOINTS': WEB3_RPC_ENDPOINTS, 'WALLET_PRIVATE_KEY': WALLET_PRIVATE_KEY, - 'KAFKA_BROKER_ADDRESS_1': KAFKA_BROKER_ADDRESS_1, - 'KAFKA_USERNAME': KAFKA_USERNAME, - 'KAFKA_PASSWORD': KAFKA_PASSWORD, - 'KAFKA_NETWORK': KAFKA_NETWORK, - 'KAFKA_TOPIC': KAFKA_TOPIC, - 'KAFKA_GROUP_PREFIX': KAFKA_GROUP_PREFIX, 'RABBIT_MQ_URL': RABBIT_MQ_URL, 'RABBIT_MQ_USERNAME': RABBIT_MQ_USERNAME, 'RABBIT_MQ_PASSWORD': RABBIT_MQ_PASSWORD, diff --git a/src/transport/msg_providers/onchain_sender.py b/tests/transport/onchain_sender.py similarity index 92% rename from src/transport/msg_providers/onchain_sender.py rename to tests/transport/onchain_sender.py index 1aaeec5c..4f59c77f 100644 --- a/src/transport/msg_providers/onchain_sender.py +++ b/tests/transport/onchain_sender.py @@ -36,7 +36,7 @@ def send_deposit(self, deposit_mes: DepositMessage): deposit_mes['blockNumber'], deposit_mes['blockHash'], deposit_mes['stakingModuleId'], - (deposit_mes['app']['version'],), + ((1).to_bytes(32),), ) mes = self._w3.codec.encode( types=[DEPOSIT_V1_DATA_SCHEMA], @@ -53,7 +53,7 @@ def send_pause_v2(self, pause_mes: PauseMessage): pause_mes['blockNumber'], pause_mes['blockHash'], pause_mes['stakingModuleId'], - (pause_mes['app']['version'],), + ((1).to_bytes(32),), ) mes = self._w3.codec.encode( types=[PAUSE_V2_DATA_SCHEMA], @@ -64,7 +64,7 @@ def send_pause_v2(self, pause_mes: PauseMessage): def send_pause_v3(self, pause_mes: PauseMessage): pause_topic = self._w3.keccak(text=OnchainTransportSinks.PAUSE_V3) - block_number, version = pause_mes['blockNumber'], pause_mes['app']['version'] + block_number, version = pause_mes['blockNumber'], (1).to_bytes(32) mes = self._w3.codec.encode(types=[PAUSE_V3_DATA_SCHEMA], args=[(block_number, self._DEFAULT_SIGNATURE, (version,))]) tx = self._data_bus.functions.sendMessage(pause_topic, mes) return tx.transact() @@ -78,7 +78,7 @@ def send_unvet(self, unvet_mes: UnvetMessage): unvet_mes['stakingModuleId'], unvet_mes['operatorIds'], unvet_mes['vettedKeysByOperator'], - unvet_mes['app']['version'], + (1).to_bytes(32), ) mes = self._w3.codec.encode( types=[UNVET_V1_DATA_SCHEMA], @@ -89,7 +89,7 @@ def send_unvet(self, unvet_mes: UnvetMessage): def send_ping(self, ping_mes: PingMessage): ping_topic = self._w3.keccak(text=OnchainTransportSinks.PING_V1) - block_number, version = ping_mes['blockNumber'], ping_mes['app']['version'] + block_number, version = ping_mes['blockNumber'], (1).to_bytes(32) mes = self._w3.codec.encode(types=[PING_V1_DATA_SCHEMA], args=[(block_number, (version,))]) tx = self._data_bus.functions.sendMessage(ping_topic, mes) return tx.transact() diff --git a/tests/transport/test_base_provider.py b/tests/transport/test_base_provider.py index 73072bd6..f9270b67 100644 --- a/tests/transport/test_base_provider.py +++ b/tests/transport/test_base_provider.py @@ -1,5 +1,3 @@ -from typing import Any, List - import pytest from schema import Or, Schema from transport.msg_providers.common import BaseMessageProvider @@ -9,7 +7,7 @@ class FakeMessageProvider(BaseMessageProvider): MAX_MESSAGES_RECEIVE = 1 - def _fetch_messages(self) -> List[Any]: + def _fetch_messages(self) -> list: return [ { 'type': 'deposit', diff --git a/tests/transport/test_data_bus.py b/tests/transport/test_data_bus.py index b355596b..7767a8b6 100644 --- a/tests/transport/test_data_bus.py +++ b/tests/transport/test_data_bus.py @@ -6,7 +6,6 @@ from blockchain.contracts.data_bus import DataBusContract from eth_typing import ChecksumAddress, HexAddress, HexStr from schema import Or, Schema -from transport.msg_providers.onchain_sender import OnchainTransportSender from transport.msg_providers.onchain_transport import ( DEPOSIT_V1_DATA_SCHEMA, PING_V1_DATA_SCHEMA, @@ -19,6 +18,8 @@ from web3 import Web3 from web3.types import EventData +from tests.transport.onchain_sender import OnchainTransportSender + # Started with config: { # NODE_HOST: 'http://127.0.0.1:8888', @@ -106,8 +107,6 @@ def test_data_bus_mock_responses(web3_lido_unit): parser._decode_event = Mock(side_effect=lambda x: x) messages = provider.get_messages() - for mes in messages: - print(mes) assert len(messages) == len(receipts)