Skip to content

Commit

Permalink
Rework buffer for providers
Browse files Browse the repository at this point in the history
  • Loading branch information
hweawer committed Sep 5, 2024
1 parent 9bf3920 commit 9341074
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 29 deletions.
28 changes: 6 additions & 22 deletions src/transport/msg_providers/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import abc
import logging
from collections import deque
from typing import Any, List, Optional

from schema import Schema, SchemaError
Expand All @@ -15,30 +14,15 @@ class BaseMessageProvider(abc.ABC):

def __init__(self, message_schema: Schema):
self.message_schema = message_schema
self._queue: deque[Any] = deque([])

def get_messages(self) -> List[dict]:
messages = []
"""
Fetches new messages, processes them, and filters out only the valid ones.
for _ in range(self.MAX_MESSAGES_RECEIVE):
msg = self._fetch_message()

if msg is None:
break

value = self._process_msg(msg)

if value and self._is_valid(value):
messages.append(value)

return messages

def _fetch_message(self) -> Optional[Any]:
if not self._queue:
messages = self._fetch_messages()
if messages:
self._queue.extend(messages)
return None if not self._queue else self._queue.popleft()
Returns:
List[Dict]: A list of processed and valid messages.
"""
return [msg for msg in (self._process_msg(m) for m in self._fetch_messages()) if msg and self._is_valid(msg)]

@abc.abstractmethod
def _fetch_messages(self) -> List[Any]:
Expand Down
13 changes: 10 additions & 3 deletions src/transport/msg_providers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, message_schema: Schema, client: str):
def __del__(self):
self.kafka.close()

def _receive_message(self) -> Optional[dict]:
def _receive_message(self) -> Optional[str]:
msg = self.kafka.poll(timeout=1)
if msg is None:
return None
Expand All @@ -63,8 +63,15 @@ def _receive_message(self) -> Optional[dict]:
return msg.value()

def _fetch_messages(self) -> List[Any]:
msg = self._receive_message()
return [] if msg is None else [msg]
messages = []

for _ in range(self.MAX_MESSAGES_RECEIVE):
msg = self._receive_message()
if msg is None:
break
messages.append(msg)

return messages

def _process_msg(self, msg: str) -> Optional[dict]:
try:
Expand Down
15 changes: 11 additions & 4 deletions src/transport/msg_providers/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ def _recreate_client(self):
def __del__(self):
self.client.disconnect()

def _fetch_messages(self) -> List[Any]:
messages = []

for _ in range(self.MAX_MESSAGES_RECEIVE):
msg = self._receive_message()
if msg is None:
break
messages.append(msg)

return messages

def _receive_message(self) -> Optional[dict]:
if not self.connection:
raise ConnectionError('Connection RabbitMQ was lost.')
Expand All @@ -83,10 +94,6 @@ def _receive_message(self) -> Optional[dict]:
except IndexError:
return None

def _fetch_messages(self) -> List[Any]:
msg = self._receive_message()
return [] if msg is None else [msg]

def _receive_message_from_queue(self, body):
self._queue.append(body)

Expand Down

0 comments on commit 9341074

Please sign in to comment.