-
Notifications
You must be signed in to change notification settings - Fork 4
Update the log listener to publish v2 events as well #84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,15 +14,20 @@ | |
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Log listener module.""" | ||
|
||
import json | ||
import logging | ||
import os | ||
import pathlib | ||
import threading | ||
import traceback | ||
import time | ||
from typing import Optional | ||
|
||
from eiffellib.events import EiffelTestExecutionRecipeCollectionCreatedEvent | ||
from etos_lib.lib.config import Config | ||
from etos_lib.messaging.events import parse | ||
from etos_lib.messaging.v2alpha.publisher import Publisher | ||
|
||
from .log_subscriber import LogSubscriber | ||
|
||
|
@@ -47,6 +52,9 @@ def __init__(self, lock: threading.Lock, log_file: pathlib.Path, event_file: pat | |
with self.lock: | ||
with self.event_file.open() as _event_file: | ||
self.id = len(_event_file.readlines()) + 1 | ||
self.v2client = Publisher( | ||
Config().etos_rabbitmq_publisher_uri(), Config().etos_stream_name() | ||
) | ||
|
||
@property | ||
def identifier(self) -> str: | ||
|
@@ -69,7 +77,13 @@ def new_event(self, event: dict, _: Optional[str] = None) -> None: | |
"""Get a new event from the internal RabbitMQ bus and write it to file.""" | ||
if event.get("event") is None: | ||
event = {"event": "message", "data": event} | ||
self.__write(**event) | ||
self.__write(event.get("event", ""), event.get("data", "")) | ||
try: | ||
self.v2client.publish(self.identifier, parse(event)) | ||
except: # pylint:disable=bare-except | ||
# A catch-all exception here because if v2 publish fails, this | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If v2 events suddenly stop getting published, we won't be able to see why. Is this correct? Is there a risk that this function will be called endlessly? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I have added traceback.print_exc() locally that will be pushed, but no production release will be made using SSEv2 with the log listener. We'll still use v1 in production until all services publish v2.
Not anymore, since I added the bare except. |
||
# function will be called again with the same event. | ||
traceback.print_exc() | ||
|
||
def __write(self, event: str, data: str) -> None: | ||
"""Write an event, and its data, to a file.""" | ||
|
@@ -119,8 +133,10 @@ def run(self) -> None: | |
self.rabbitmq.subscribe("*", self.new_event) | ||
self.rabbitmq.start() | ||
self.rabbitmq.wait_start() | ||
self.v2client.start() | ||
while self.rabbitmq.is_alive() and not self.__stop: | ||
time.sleep(0.1) | ||
self.v2client.close() | ||
self.rabbitmq.stop() | ||
self.rabbitmq.wait_close() | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
identifier()
function will be called again. Then__write()
will be executed twice?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If v2 event publishing fails this function will not be called again since we catch all exceptions when publishing to v2.