Skip to content

Commit

Permalink
Track transport and type in message metrics (#262)
Browse files Browse the repository at this point in the history
* Track transport and type in message metrics

* -1

* Metadata

* Str cast

* fix imports

* Fix
  • Loading branch information
hweawer authored Oct 9, 2024
1 parent 1b26811 commit 86e706c
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 130 deletions.
48 changes: 3 additions & 45 deletions src/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@
DEPOSIT_MESSAGES = Gauge(
'deposit_messages',
'Guardians deposit messages',
['address', 'module_id', 'version'],
['address', 'module_id', 'version', 'transport', 'chain_id'],
namespace=PROMETHEUS_PREFIX,
)
PAUSE_MESSAGES = Gauge(
'pause_messages',
'Guardians pause messages',
['address', 'module_id', 'version'],
['address', 'module_id', 'version', 'transport', 'chain_id'],
namespace=PROMETHEUS_PREFIX,
)
PING_MESSAGES = Gauge(
'ping_messages',
'Guardians ping messages',
['address', 'version'],
['address', 'version', 'transport', 'chain_id'],
namespace=PROMETHEUS_PREFIX,
)
# Carries the same label set as DEPOSIT_MESSAGES and PAUSE_MESSAGES:
# message_metrics_filter labels every gauge in its dispatch map with
# 'transport' and 'chain_id', and prometheus_client raises ValueError when the
# keyword names do not match the declared label names exactly.
UNVET_MESSAGES = Gauge(
    'unvet_messages',
    'Guardian unvet messages',
    ['address', 'module_id', 'version', 'transport', 'chain_id'],
)
Expand Down Expand Up @@ -116,48 +116,6 @@

MODULES = Gauge('modules', 'Modules gauge', ['module_id'], namespace=PROMETHEUS_PREFIX)

ONCHAIN_TRANSPORT_FETCHED_MESSAGES = Gauge(
'onchain_fetched_messages',
'Total count of fetched onchain messages',
['chain_id'],
namespace=PROMETHEUS_PREFIX,
)

ONCHAIN_TRANSPORT_PROCESSED_MESSAGES = Gauge(
'onchain_processed_messages',
'Total count of processed onchain messages',
['chain_id'],
namespace=PROMETHEUS_PREFIX,
)

ONCHAIN_TRANSPORT_VALID_MESSAGES = Gauge(
'onchain_valid_messages',
'Total count of valid onchain messages',
['chain_id'],
namespace=PROMETHEUS_PREFIX,
)

RABBIT_TRANSPORT_FETCHED_MESSAGES = Gauge(
'rabbit_fetched_messages',
'Total count of fetched rabbit messages',
[],
namespace=PROMETHEUS_PREFIX,
)

RABBIT_TRANSPORT_PROCESSED_MESSAGES = Gauge(
'rabbit_processed_messages',
'Total count of processed rabbit messages',
[],
namespace=PROMETHEUS_PREFIX,
)

RABBIT_TRANSPORT_VALID_MESSAGES = Gauge(
'rabbit_valid_messages',
'Total count of valid rabbit messages',
[],
namespace=PROMETHEUS_PREFIX,
)

for module_id in DEPOSIT_MODULES_WHITELIST:
MODULES.labels(module_id).set(1)

Expand Down
50 changes: 34 additions & 16 deletions src/metrics/transport_message_metrics.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,51 @@
import logging
from typing import TypedDict

from metrics.metrics import DEPOSIT_MESSAGES, PAUSE_MESSAGES, PING_MESSAGES, UNVET_MESSAGES
from transport.msg_providers.rabbit import MessageType
from transport.msg_types.deposit import DepositMessage

logger = logging.getLogger(__name__)


def message_metrics_filter(msg: DepositMessage) -> bool:
msg_type = msg.get('type', None)
logger.info({'msg': 'Guardian message received.', 'value': msg, 'type': msg_type})

address, version = msg.get('guardianAddress'), msg.get('app', {}).get('version')
def message_metrics_filter(msg: TypedDict) -> bool:
"""
Processes guardian messages and updates Prometheus metrics based on the message type.
Returns True for valid message types to allow further processing, and False for messages
that should be filtered (such as PING messages).
if msg_type == MessageType.PAUSE:
PAUSE_MESSAGES.labels(address, msg.get('stakingModuleId', -1), version).inc()
return True
Args:
msg: A dictionary containing message details.
if msg_type == MessageType.DEPOSIT:
DEPOSIT_MESSAGES.labels(address, msg.get('stakingModuleId', -1), version).inc()
return True
Returns:
bool: True if the message should be processed, False otherwise.
"""
msg_type = msg.get('type')
logger.info({'msg': 'Guardian message received.', 'value': msg, 'type': msg_type})

if msg_type == MessageType.UNVET:
UNVET_MESSAGES.labels(address, msg.get('stakingModuleId', -1), version).inc()
address = msg.get('guardianAddress')
version = msg.get('app', {}).get('version')
transport = msg.get('transport', '')
chain_id = msg.get('chain_id', '')
staking_module_id = msg.get('stakingModuleId', -1)

metrics_map = {
MessageType.PAUSE: PAUSE_MESSAGES,
MessageType.DEPOSIT: DEPOSIT_MESSAGES,
MessageType.UNVET: UNVET_MESSAGES,
}

if msg_type in metrics_map:
metrics_map[msg_type].labels(
address=address,
staking_module_id=staking_module_id,
version=version,
transport=transport,
chain_id=chain_id,
).inc()
return True

if msg_type == MessageType.PING:
# Filter all ping messages, because we use them only for metrics
PING_MESSAGES.labels(address, version).inc()
PING_MESSAGES.labels(address=address, version=version, transport=transport, chain_id=chain_id).inc()
return False

logger.warning({'msg': 'Received unexpected msg type.', 'value': msg, 'type': msg_type})
Expand Down
22 changes: 1 addition & 21 deletions src/transport/msg_providers/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
from typing import Any

from prometheus_client import Gauge
from schema import Schema, SchemaError

logger = logging.getLogger(__name__)
Expand All @@ -24,12 +23,8 @@ def get_messages(self) -> list[dict]:
List[Dict]: A list of processed and valid messages.
"""
fetched = self._fetch_messages()
self.fetched_messages_metric.set(len(fetched))
processed = [self._process_msg(m) for m in fetched]
self.processed_messages_metric.set(len(processed))
valid = [msg for msg in processed if msg and self._is_valid(msg)]
self.valid_messages_metric.set(len(valid))
return valid
return [msg for msg in processed if msg and self._is_valid(msg)]

@abc.abstractmethod
def _fetch_messages(self) -> list:
Expand All @@ -48,18 +43,3 @@ def _is_valid(self, msg: dict):
return False

return True

@property
@abc.abstractmethod
def fetched_messages_metric(self) -> Gauge:
raise NotImplementedError('fetched_messages_metric')

@property
@abc.abstractmethod
def processed_messages_metric(self) -> Gauge:
raise NotImplementedError('processed_messages_metric')

@property
@abc.abstractmethod
def valid_messages_metric(self) -> Gauge:
raise NotImplementedError('valid_messages_metric')
22 changes: 7 additions & 15 deletions src/transport/msg_providers/onchain_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

from eth_typing import ChecksumAddress
from eth_utils import to_bytes
from metrics.metrics import ONCHAIN_TRANSPORT_FETCHED_MESSAGES, ONCHAIN_TRANSPORT_PROCESSED_MESSAGES, ONCHAIN_TRANSPORT_VALID_MESSAGES
from prometheus_client import Gauge
from schema import Schema
from transport.msg_providers.common import BaseMessageProvider
from transport.msg_providers.rabbit import MessageType
Expand Down Expand Up @@ -240,7 +238,7 @@ def __init__(
logger.info('Data bus client initialized.')

self._w3 = w3
self._chain_id = self._w3.eth.chain_id
self._chain_id = str(self._w3.eth.chain_id)
self._allowed_guardians_provider = allowed_guardians_provider
self._parsers: List[EventParser] = [provider(w3) for provider in parsers_providers]

Expand Down Expand Up @@ -281,6 +279,12 @@ def _fetch_messages(self) -> list:
return []

def _process_msg(self, log: LogReceipt) -> Optional[dict]:
    """
    Parse an onchain log and tag the result with its origin.

    Args:
        log: Event log receipt fetched from the chain.

    Returns:
        The parsed message with 'chain_id' and 'transport' set, or None when
        no parser could decode the log.
    """
    parsed = self._parse_log(log)
    # _parse_log returns None when every parser fails; tagging None would
    # raise TypeError and abort the whole batch in get_messages, which already
    # filters out falsy entries.
    if parsed is None:
        return None
    parsed['chain_id'] = self._chain_id
    parsed['transport'] = 'onchain'
    return parsed

def _parse_log(self, log: LogReceipt) -> Optional[dict]:
for parser in self._parsers:
try:
return parser.parse(log)
Expand All @@ -294,15 +298,3 @@ def _process_msg(self, log: LogReceipt) -> Optional[dict]:
}
)
return None

@property
def fetched_messages_metric(self) -> Gauge:
return ONCHAIN_TRANSPORT_FETCHED_MESSAGES.labels(self._chain_id)

@property
def processed_messages_metric(self) -> Gauge:
return ONCHAIN_TRANSPORT_PROCESSED_MESSAGES.labels(self._chain_id)

@property
def valid_messages_metric(self) -> Gauge:
return ONCHAIN_TRANSPORT_VALID_MESSAGES.labels(self._chain_id)
24 changes: 7 additions & 17 deletions src/transport/msg_providers/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from typing import List, Optional

import variables
from metrics.metrics import RABBIT_TRANSPORT_FETCHED_MESSAGES, RABBIT_TRANSPORT_PROCESSED_MESSAGES, RABBIT_TRANSPORT_VALID_MESSAGES
from prometheus_client import Gauge
from schema import Schema
from transport.msg_providers.common import BaseMessageProvider
from transport.msg_providers.stomp.client import Client
Expand Down Expand Up @@ -100,23 +98,15 @@ def _receive_message_from_queue(self, body):
self._queue.append(body)

def _process_msg(self, msg: str) -> Optional[dict]:
    """
    Parse a raw Rabbit message body and tag it with its transport.

    Args:
        msg: Raw message body taken from the queue.

    Returns:
        The parsed message with 'transport' set to 'rabbit'. None is returned
        when the body is not valid JSON. A JSON value that is not an object
        is returned untagged so that downstream validation rejects it.
    """
    parsed = self._parse_message(msg)
    # _parse_message yields None for broken JSON, and json.loads may produce a
    # list, string or number; only a dict can carry the transport key, so
    # anything else is passed through untouched instead of raising TypeError.
    if isinstance(parsed, dict):
        parsed['transport'] = 'rabbit'
    return parsed

@staticmethod
def _parse_message(msg: str) -> Optional[dict]:
try:
value = json.loads(msg)
return json.loads(msg)
except ValueError as error:
# ignore not json msg
logger.warning({'msg': 'Broken message in Rabbit', 'value': str(msg), 'error': str(error)})
return None

return value

@property
def fetched_messages_metric(self) -> Gauge:
return RABBIT_TRANSPORT_FETCHED_MESSAGES

@property
def processed_messages_metric(self) -> Gauge:
return RABBIT_TRANSPORT_PROCESSED_MESSAGES

@property
def valid_messages_metric(self) -> Gauge:
return RABBIT_TRANSPORT_VALID_MESSAGES
6 changes: 6 additions & 0 deletions src/transport/msg_types/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ class Signature(TypedDict):
r: VRS
s: VRS
_vs: str


# Fields shared by every guardian message type: the deposit, pause, ping and
# unvet message TypedDicts inherit from this class.
class Metadata(TypedDict):
# Message kind; message_metrics_filter compares it against MessageType members.
type: str
# Delivery channel, set by the message providers ('onchain' or 'rabbit').
transport: str
# Chain id as a string. Only the onchain provider sets it; the rabbit provider
# does not, so message_metrics_filter reads it with a '' default.
# NOTE(review): consider declaring this key as not required.
chain_id: str
6 changes: 2 additions & 4 deletions src/transport/msg_types/deposit.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import logging
from typing import TypedDict

from schema import And, Schema
from transport.msg_types.base import ADDRESS_REGREX, HASH_REGREX, Signature, SignatureSchema
from transport.msg_types.base import ADDRESS_REGREX, HASH_REGREX, Metadata, Signature, SignatureSchema

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,8 +43,7 @@
)


class DepositMessage(TypedDict):
type: str
class DepositMessage(Metadata):
depositRoot: str
nonce: int
blockNumber: int
Expand Down
6 changes: 2 additions & 4 deletions src/transport/msg_types/pause.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import logging
from typing import TypedDict

from schema import And, Schema
from transport.msg_types.base import ADDRESS_REGREX, Signature, SignatureSchema
from transport.msg_types.base import ADDRESS_REGREX, Metadata, Signature, SignatureSchema

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -35,8 +34,7 @@
)


class PauseMessage(TypedDict):
type: str
class PauseMessage(Metadata):
blockNumber: int
guardianAddress: str
signature: Signature
Expand Down
6 changes: 2 additions & 4 deletions src/transport/msg_types/ping.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import logging
from typing import TypedDict

from schema import And, Schema
from transport.msg_types.base import ADDRESS_REGREX
from transport.msg_types.base import ADDRESS_REGREX, Metadata
from web3 import Web3

logger = logging.getLogger(__name__)
Expand All @@ -17,8 +16,7 @@
)


class PingMessage(TypedDict):
type: str
class PingMessage(Metadata):
blockNumber: int
guardianAddress: str

Expand Down
6 changes: 2 additions & 4 deletions src/transport/msg_types/unvet.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import logging
from typing import TypedDict

from eth_typing import Hash32
from schema import And, Schema
from transport.msg_types.base import ADDRESS_REGREX, HASH_REGREX, HEX_BYTES_REGREX, Signature, SignatureSchema
from transport.msg_types.base import ADDRESS_REGREX, HASH_REGREX, HEX_BYTES_REGREX, Metadata, Signature, SignatureSchema

logger = logging.getLogger(__name__)

Expand All @@ -22,8 +21,7 @@
)


class UnvetMessage(TypedDict):
type: str
class UnvetMessage(Metadata):
blockNumber: int
blockHash: Hash32
guardianAddress: str
Expand Down

0 comments on commit 86e706c

Please sign in to comment.