diff --git a/projects/log_listener/pyproject.toml b/projects/log_listener/pyproject.toml index 5a99e95..136ef8e 100644 --- a/projects/log_listener/pyproject.toml +++ b/projects/log_listener/pyproject.toml @@ -16,9 +16,9 @@ classifiers = [ ] requires-python = ">=3.9" dependencies = [ - "eiffellib[rabbitmq]==2.4.1", - "fastapi==0.109.1", - "uvicorn==0.22.0", + "etos_lib==5.1.0", + "fastapi==0.109.1", + "uvicorn==0.22.0", ] [project.urls] @@ -49,4 +49,4 @@ testpaths = ["tests"] root = "../.." [tool.setuptools.packages] -find = { where = ["src"] } \ No newline at end of file +find = { where = ["src"] } diff --git a/projects/log_listener/src/log_listener/listener.py b/projects/log_listener/src/log_listener/listener.py index 02c91d2..f6ecd42 100644 --- a/projects/log_listener/src/log_listener/listener.py +++ b/projects/log_listener/src/log_listener/listener.py @@ -14,15 +14,20 @@ # See the License for the specific language governing permissions and # limitations under the License. """Log listener module.""" + import json import logging import os import pathlib import threading +import traceback import time from typing import Optional from eiffellib.events import EiffelTestExecutionRecipeCollectionCreatedEvent +from etos_lib.lib.config import Config +from etos_lib.messaging.events import parse +from etos_lib.messaging.v2alpha.publisher import Publisher from .log_subscriber import LogSubscriber @@ -47,6 +52,9 @@ def __init__(self, lock: threading.Lock, log_file: pathlib.Path, event_file: pat with self.lock: with self.event_file.open() as _event_file: self.id = len(_event_file.readlines()) + 1 + self.v2client = Publisher( + Config().etos_rabbitmq_publisher_uri(), Config().etos_stream_name() + ) @property def identifier(self) -> str: @@ -69,7 +77,13 @@ def new_event(self, event: dict, _: Optional[str] = None) -> None: """Get a new event from the internal RabbitMQ bus and write it to file.""" if event.get("event") is None: event = {"event": "message", "data": event} - self.__write(**event) + self.__write(event.get("event", ""), event.get("data", "")) + try: + self.v2client.publish(self.identifier, parse(event)) + except: # pylint:disable=bare-except + # A catch-all exception here because if v2 publish fails, this + # function will be called again with the same event. + traceback.print_exc() def __write(self, event: str, data: str) -> None: """Write an event, and its data, to a file.""" @@ -119,8 +133,10 @@ def run(self) -> None: self.rabbitmq.subscribe("*", self.new_event) self.rabbitmq.start() self.rabbitmq.wait_start() + self.v2client.start() while self.rabbitmq.is_alive() and not self.__stop: time.sleep(0.1) + self.v2client.close() self.rabbitmq.stop() self.rabbitmq.wait_close()