Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hweawer committed Sep 6, 2024
1 parent dcce2f9 commit 2d4a2f4
Show file tree
Hide file tree
Showing 18 changed files with 35 additions and 173 deletions.
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ LIDO_LOCATOR=0x1eDf09b5023DC86737b59dE68a8130De878984f5
# Görli: 0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b
DEPOSIT_CONTRACT=0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b

# rabbit / kafka / rabbit,kafka
# rabbit
MESSAGE_TRANSPORTS=rabbit

# rabbit secrets
Expand Down
33 changes: 15 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@

## Description

Depositor and pauser bots are parts of [Deposit Security Module](https://github.com/lidofinance/lido-improvement-proposals/blob/develop/LIPS/lip-5.md#mitigations-for-deposit-front-running-vulnerability).
Depositor and pauser bots are parts
of [Deposit Security Module](https://github.com/lidofinance/lido-improvement-proposals/blob/develop/LIPS/lip-5.md#mitigations-for-deposit-front-running-vulnerability).

**The Depositor Bot** obtains signed deposit messages from Council Daemons.
Once a sufficient number of messages is collected to constitute a quorum, the bot proceeds to initiate a deposit into the designated staking module.
**The Depositor Bot** obtains signed deposit messages from Council Daemons.
Once a sufficient number of messages is collected to constitute a quorum, the bot proceeds to initiate a deposit into the designated staking
module.
This deposit is executed using the depositBufferedEther function within the "DepositSecurityModule" smart contract.

Direct deposit is a mechanism that allows depositors to use side vault facilities for deposits. This process transfers ETH from the vault and facilitates the deposit to specified in side vault staking module, preventing funds from being stuck in the withdrawal queue.
Direct deposit is a mechanism that allows depositors to use side vault facilities for deposits. This process transfers ETH from the vault
and facilitates the deposit to specified in side vault staking module, preventing funds from being stuck in the withdrawal queue.

**The Pauser Bot** obtains pause message from Council Daemon and enacts pause deposits on protocol. Pause can occurs when Lido detects stealing.
**The Pauser Bot** obtains pause message from Council Daemon and enacts pause deposits on protocol. Pause can occurs when Lido detects
stealing.

**The Unvetting Bot** obtains unvet message from Council Daemon and enacts unvet on the specified node operator.
Unvetting is the proces of decreasing approved depositable signing keys.
Expand All @@ -19,13 +23,13 @@ Unvetting is the proces of decreasing approved depositable signing keys.

- [Running Daemon](#running-daemon)
- [Variables](#variables)
- [Required variables](#required-variables)
- [Additional variables](#additional-variables)
- [Required variables](#required-variables)
- [Additional variables](#additional-variables)
- [Metrics and logs](#metrics-and-logs)
- [Development](#development)
- [Install](#install)
- [Tests](#tests)
- [Release flow](#release-flow)
- [Install](#install)
- [Tests](#tests)
- [Release flow](#release-flow)
- [Annotations to code](#annotations-to-code)

## Running Daemon
Expand Down Expand Up @@ -54,17 +58,10 @@ Unvetting is the proces of decreasing approved depositable signing keys.
| DEPOSIT_CONTRACT | 0x00000000219ab540356cBB839Cbe05303d7705Fa | Ethereum deposit contract address |
| DEPOSIT_MODULES_WHITELIST | 1 | List of staking module's ids in which the depositor bot will make deposits |
| --- | --- | --- |
| MESSAGE_TRANSPORTS | - | Transports used in bot. One of/or both: rabbit/kafka |
| MESSAGE_TRANSPORTS | - | Transports used in bot. One of/or both: rabbit/onchain_transport |
| RABBIT_MQ_URL | - | RabbitMQ url |
| RABBIT_MQ_USERNAME | - | RabbitMQ username for virtualhost |
| RABBIT_MQ_PASSWORD | - | RabbitMQ password for virtualhost |
| --- | --- _kafka is not used at the moment_ --- | --- |
| KAFKA_BROKER_ADDRESS_1 | - | Kafka servers url and port |
| KAFKA_USERNAME | - | Kafka username |
| KAFKA_PASSWORD | - | Password for kafka |
| KAFKA_NETWORK | - | Network type (mainnet or goerli) |
| KAFKA_TOPIC | - | Kafka topic name (for msg receiving) |
| KAFKA_GROUP_PREFIX | - | Just for staging (staging-) |

### Additional variables

Expand Down
9 changes: 0 additions & 9 deletions src/bots/depositor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
)
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.kafka import KafkaMessageProvider
from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
Expand Down Expand Up @@ -86,14 +85,6 @@ def __init__(
)
)

if TransportType.KAFKA in variables.MESSAGE_TRANSPORTS:
transports.append(
KafkaMessageProvider(
client=f'{variables.KAFKA_GROUP_PREFIX}deposit',
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
)
)

if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
OnchainTransportProvider(
Expand Down
9 changes: 0 additions & 9 deletions src/bots/pauser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from metrics.metrics import UNEXPECTED_EXCEPTIONS
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.kafka import KafkaMessageProvider
from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
Expand Down Expand Up @@ -49,14 +48,6 @@ def __init__(self, w3: Web3):
)
)

if TransportType.KAFKA in variables.MESSAGE_TRANSPORTS:
transports.append(
KafkaMessageProvider(
client=f'{variables.KAFKA_GROUP_PREFIX}pause',
message_schema=Schema(Or(PauseMessageSchema, PingMessageSchema)),
)
)

if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
OnchainTransportProvider(
Expand Down
9 changes: 0 additions & 9 deletions src/bots/unvetter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from metrics.metrics import UNEXPECTED_EXCEPTIONS
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.kafka import KafkaMessageProvider
from transport.msg_providers.onchain_transport import OnchainTransportProvider, OnchainTransportSinks
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
Expand Down Expand Up @@ -54,14 +53,6 @@ def prepare_transport_bus(self):
)
)

if TransportType.KAFKA in variables.MESSAGE_TRANSPORTS:
transports.append(
KafkaMessageProvider(
client=f'{variables.KAFKA_GROUP_PREFIX}unvet',
message_schema=Schema(Or(UnvetMessageSchema, PingMessageSchema)),
)
)

if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
OnchainTransportProvider(
Expand Down
9 changes: 5 additions & 4 deletions src/transport/msg_providers/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
import logging
from typing import Any, Optional
from typing import Any

from schema import Schema, SchemaError

Expand All @@ -22,13 +22,14 @@ def get_messages(self) -> list[dict]:
Returns:
List[Dict]: A list of processed and valid messages.
"""
return [msg for msg in (self._process_msg(m) for m in self._fetch_messages()) if msg and self._is_valid(msg)]
processed = [self._process_msg(m) for m in self._fetch_messages()]
return [msg for msg in processed if msg and self._is_valid(msg)]

@abc.abstractmethod
def _fetch_messages(self) -> list[Any]:
def _fetch_messages(self) -> list:
raise NotImplementedError('Receive message from transport.')

def _process_msg(self, msg: Any) -> Optional[dict]:
def _process_msg(self, msg: Any) -> dict | None:
# Overwrite this method to add msg serialization.
# Return None if message is not serializable
return msg
Expand Down
84 changes: 0 additions & 84 deletions src/transport/msg_providers/kafka.py

This file was deleted.

7 changes: 2 additions & 5 deletions src/transport/msg_providers/onchain_transport.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import abc
import logging
from collections import deque
from enum import StrEnum
from typing import Any, List, Optional
from typing import List, Optional

from cryptography.verify_signature import compute_vs
from eth_account.account import VRS
Expand Down Expand Up @@ -93,7 +92,6 @@ def _create_message(self, parsed_data: tuple, guardian: str) -> dict:
'r': r,
'_vs': _vs,
},
app={'version': app[0]},
)


Expand Down Expand Up @@ -200,7 +198,6 @@ def __init__(self, w3: Web3, onchain_address: ChecksumAddress, message_schema: S
if not sinks:
raise ValueError('There must be at least a single sink for Data Bus provider')
self._latest_block = -1
self._queue: deque = deque()

logger.info('Data bus client initialized.')

Expand All @@ -227,7 +224,7 @@ def _construct_parsers(self, sinks: List[OnchainTransportSinks]) -> List[EventPa

return parsers

def _fetch_messages(self) -> List[Any]:
def _fetch_messages(self) -> list:
latest_block_number = self._w3.eth.block_number
from_block = max(0, latest_block_number - self.STANDARD_OFFSET) if self._latest_block == -1 else self._latest_block
# If block distance is 0, then skip fetching to avoid looping on a single block
Expand Down
6 changes: 3 additions & 3 deletions src/transport/msg_providers/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
import time
from typing import Any, List, Optional
from typing import List, Optional

import variables
from schema import Schema
Expand Down Expand Up @@ -74,7 +74,7 @@ def _recreate_client(self):
def __del__(self):
self.client.disconnect()

def _fetch_messages(self) -> List[Any]:
def _fetch_messages(self) -> list:
messages = []

for _ in range(self.MAX_MESSAGES_RECEIVE):
Expand Down Expand Up @@ -102,7 +102,7 @@ def _process_msg(self, msg: str) -> Optional[dict]:
value = json.loads(msg)
except ValueError as error:
# ignore not json msg
logger.warning({'msg': 'Broken message in Kafka', 'value': str(msg), 'error': str(error)})
logger.warning({'msg': 'Broken message in Rabbit', 'value': str(msg), 'error': str(error)})
return None

return value
1 change: 0 additions & 1 deletion src/transport/msg_types/deposit.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,3 @@ class DepositMessage(TypedDict):
guardianAddress: str
signature: Signature
stakingModuleId: int
app: dict
1 change: 0 additions & 1 deletion src/transport/msg_types/pause.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,3 @@ class PauseMessage(TypedDict):
guardianAddress: str
signature: Signature
stakingModuleId: int
app: dict
1 change: 0 additions & 1 deletion src/transport/msg_types/ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class PingMessage(TypedDict):
type: str
blockNumber: int
guardianAddress: str
app: dict


def to_check_sum_address(msg: dict):
Expand Down
1 change: 0 additions & 1 deletion src/transport/msg_types/unvet.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,3 @@ class UnvetMessage(TypedDict):
nonce: int
operatorIds: str
vettedKeysByOperator: str
app: dict
1 change: 0 additions & 1 deletion src/transport/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
class TransportType:
RABBIT = 'rabbit'
KAFKA = 'kafka'
ONCHAIN_TRANSPORT = 'onchain_transport'
16 changes: 1 addition & 15 deletions src/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,9 @@
MELLOW_CONTRACT_ADDRESS = Web3.to_checksum_address(MELLOW_CONTRACT_ADDRESS)
VAULT_DIRECT_DEPOSIT_THRESHOLD = Web3.to_wei(*os.getenv('VAULT_DIRECT_DEPOSIT_THRESHOLD', '1 ether').split(' '))

# rabbit / kafka / rabbit,kafka
# rabbit
MESSAGE_TRANSPORTS = os.getenv('MESSAGE_TRANSPORTS', '').split(',')

# Kafka secrets
KAFKA_BROKER_ADDRESS_1 = os.getenv('KAFKA_BROKER_ADDRESS_1')
KAFKA_USERNAME = os.getenv('KAFKA_USERNAME')
KAFKA_PASSWORD = os.getenv('KAFKA_PASSWORD')
KAFKA_NETWORK = os.getenv('KAFKA_NETWORK', 'mainnet') # or goerli
KAFKA_TOPIC = os.getenv('KAFKA_TOPIC')
KAFKA_GROUP_PREFIX = os.getenv('KAFKA_GROUP_PREFIX', '')

# rabbit secrets
RABBIT_MQ_URL = os.getenv('RABBIT_MQ_URL', 'ws://127.0.0.1:15674/ws')
RABBIT_MQ_USERNAME = os.getenv('RABBIT_MQ_USERNAME', 'guest')
Expand Down Expand Up @@ -127,12 +119,6 @@
PRIVATE_ENV_VARS = {
'WEB3_RPC_ENDPOINTS': WEB3_RPC_ENDPOINTS,
'WALLET_PRIVATE_KEY': WALLET_PRIVATE_KEY,
'KAFKA_BROKER_ADDRESS_1': KAFKA_BROKER_ADDRESS_1,
'KAFKA_USERNAME': KAFKA_USERNAME,
'KAFKA_PASSWORD': KAFKA_PASSWORD,
'KAFKA_NETWORK': KAFKA_NETWORK,
'KAFKA_TOPIC': KAFKA_TOPIC,
'KAFKA_GROUP_PREFIX': KAFKA_GROUP_PREFIX,
'RABBIT_MQ_URL': RABBIT_MQ_URL,
'RABBIT_MQ_USERNAME': RABBIT_MQ_USERNAME,
'RABBIT_MQ_PASSWORD': RABBIT_MQ_PASSWORD,
Expand Down
Loading

0 comments on commit 2d4a2f4

Please sign in to comment.