Skip to content

Commit

Permalink
Buffer in the abstract class
Browse files Browse the repository at this point in the history
  • Loading branch information
hweawer committed Sep 2, 2024
1 parent 650a08b commit 4731925
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 21 deletions.
13 changes: 11 additions & 2 deletions src/transport/msg_providers/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
import logging
from collections import deque
from typing import Any, List, Optional

from schema import Schema, SchemaError
Expand All @@ -14,12 +15,13 @@ class BaseMessageProvider(abc.ABC):

def __init__(self, message_schema: Schema):
self.message_schema = message_schema
self._queue: deque[Any] = deque([])

def get_messages(self) -> List[dict]:
messages = []

for _ in range(self.MAX_MESSAGES_RECEIVE):
msg = self._receive_message()
msg = self._fetch_message()

if msg is None:
break
Expand All @@ -31,8 +33,15 @@ def get_messages(self) -> List[dict]:

return messages

def _fetch_message(self) -> Optional[Any]:
if not self._queue:
messages = self._fetch_messages()
if messages:
self._queue.extend(messages)
return None if not self._queue else self._queue.popleft()

@abc.abstractmethod
def _receive_message(self) -> Any:
def _fetch_messages(self) -> List[Any]:
raise NotImplementedError('Receive message from transport.')

def _process_msg(self, msg: Any) -> Optional[dict]:
Expand Down
16 changes: 11 additions & 5 deletions src/transport/msg_providers/kafka.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import Optional
from typing import Any, List, Optional

from confluent_kafka import Consumer as BaseConsumer
from schema import Schema
Expand Down Expand Up @@ -53,12 +53,18 @@ def __del__(self):

def _receive_message(self) -> Optional[dict]:
msg = self.kafka.poll(timeout=1)
if msg is None:
return None

if msg is not None:
if not msg.error():
return msg.value()

if msg.error():
logger.error({'msg': 'Kafka error', 'error': str(msg.error())})
return None

return msg.value()

def _fetch_messages(self) -> List[Any]:
msg = self._receive_message()
return [] if msg is None else [msg]

def _process_msg(self, msg: str) -> Optional[dict]:
try:
Expand Down
18 changes: 5 additions & 13 deletions src/transport/msg_providers/onchain_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from collections import deque
from enum import StrEnum
from typing import List, Optional
from typing import Any, List, Optional

from eth_account.account import VRS
from eth_typing import ChecksumAddress, HexStr
Expand Down Expand Up @@ -45,7 +45,6 @@


def signature_to_r_vs(signature: bytes) -> tuple[VRS, VRS]:
# 0 byte - 0x
r, _vs = signature[:32], signature[32:]
return HexStr(bytes_to_hex_string(r)), HexStr(bytes_to_hex_string(_vs))

Expand Down Expand Up @@ -226,15 +225,7 @@ def _construct_parsers(self, sinks: List[OnchainTransportSinks]) -> List[EventPa

return parsers

def _receive_message(self) -> Optional[LogReceipt]:
if not self._queue:
self._fetch_logs_into_queue()
try:
return self._queue.popleft()
except IndexError:
return None

def _fetch_logs_into_queue(self):
def _fetch_messages(self) -> List[Any]:
latest_block_number = self._w3.eth.block_number
from_block = max(0, latest_block_number - self.STANDARD_OFFSET) if self._latest_block == -1 else self._latest_block

Expand All @@ -247,23 +238,24 @@ def _fetch_logs_into_queue(self):
try:
logs = self._w3.eth.get_logs(filter_params)
if logs:
self._queue.extend(logs)
self._latest_block = latest_block_number

return logs
except BlockNotFound as e:
logger.error(
{
'msg': 'Block not found',
'err': repr(e),
}
)
return []
except Exception as e:
logger.error(
{
'msg': 'Failed to fetch logs',
'err': repr(e),
}
)
return []

def _process_msg(self, log: LogReceipt) -> Optional[dict]:
for parser in self._parsers:
Expand Down
6 changes: 5 additions & 1 deletion src/transport/msg_providers/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
import time
from typing import List, Optional
from typing import Any, List, Optional

import variables
from schema import Schema
Expand Down Expand Up @@ -83,6 +83,10 @@ def _receive_message(self) -> Optional[dict]:
except IndexError:
return None

def _fetch_messages(self) -> List[Any]:
msg = self._receive_message()
return [] if msg is None else [msg]

def _receive_message_from_queue(self, body):
self._queue.append(body)

Expand Down

0 comments on commit 4731925

Please sign in to comment.