Skip to content

Commit

Permalink
Merge branch 'tickets/DM-47539'
Browse files Browse the repository at this point in the history
  • Loading branch information
srp3rd committed Nov 14, 2024
2 parents 5d582b3 + 742df0b commit 9f1831a
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 19 deletions.
21 changes: 14 additions & 7 deletions bin.src/standalone
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import asyncio
import logging
import os
import sys

import lsst.log as lsstlog
import yaml
Expand Down Expand Up @@ -82,10 +83,10 @@ class Standalone(object):
self.ingester = MsgIngester(self.config, self)

self.task_list = self.ingester.run_tasks()
return self.task_list

async def stop_services(self):
"""Stop all cleanup and archiving services"""
print("calling stop_tasks")
self.ingester.stop_tasks()

async def done(self):
Expand All @@ -94,14 +95,20 @@ class Standalone(object):
await asyncio.sleep(3600)
print("done!")

async def call_fault(self, code, report):
LOGGER.info(f"{code}: {report}")
await self.stop_services()

async def main(self):
await self.start_services()
group = asyncio.gather(self.done())
print("gathering")
await group
print("finished")
tasks = await self.start_services()
try:
group = await asyncio.gather(*tasks)
except asyncio.exceptions.CancelledError:
LOGGER.info("cancelled")
except Exception as e:
LOGGER.info(e)
LOGGER.info("exiting")
await self.stop_services()
print("complete")


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/oods/butlerAttendant.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async def ingest(self, file_list):
await loop.run_in_executor(executor, self.task.run, file_list)
LOGGER.info("done with ingest")
except Exception as e:
print(f"Exception! {e=}")
LOGGER.exception(f"Exception! {e=}")

def create_bad_dirname(self, bad_dir_root, staging_dir_root, original):
"""Create a full path to a directory contained in the
Expand Down
6 changes: 6 additions & 0 deletions python/lsst/ctrl/oods/msgIngester.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import asyncio
import logging

from confluent_kafka import KafkaError
from lsst.ctrl.oods.bucketMessage import BucketMessage
from lsst.ctrl.oods.butlerProxy import ButlerProxy
from lsst.ctrl.oods.msgQueue import MsgQueue
Expand Down Expand Up @@ -145,6 +146,11 @@ async def dequeue_and_ingest_files(self):
message_list = await self.msgQueue.dequeue_messages()
resources = []
for m in message_list:
if m.error():
if m.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
raise Exception("The topic or partition does not exist")
else:
raise Exception(f"KafkaError = {m.error().code()}")
rps = self._gather_all_resource_paths(m)
resources.extend(rps)
await self.ingest(resources)
Expand Down
41 changes: 30 additions & 11 deletions python/lsst/ctrl/oods/msgQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,39 @@ def __init__(self, brokers, group_id, topics, max_messages):
self.msgList = list()
self.condition = asyncio.Condition()

username = os.environ.get(USERNAME_KEY, "USERNAME_NOT_CONFIGURED")
password = os.environ.get(PASSWORD_KEY, "PASSWORD_NOT_CONFIGURED")
username = os.environ.get(USERNAME_KEY, None)
password = os.environ.get(PASSWORD_KEY, None)
mechanism = os.environ.get(MECHANISM_KEY, SASL_MECHANISM)
protocol = os.environ.get(PROTOCOL_KEY, SECURITY_PROTOCOL)

config = {
"bootstrap.servers": ",".join(self.brokers),
"group.id": self.group_id,
"auto.offset.reset": "earliest",
"security.protocol": protocol,
"sasl.mechanism": mechanism,
"sasl.username": username,
"sasl.password": password,
}
use_auth = True
if username is None:
LOGGER.info(f"{USERNAME_KEY} has not been set.")
use_auth = False
if password is None:
LOGGER.info(f"{PASSWORD_KEY} has not been set.")
use_auth = False

if use_auth:
LOGGER.info("{MECHANISM_KEY} set to {mechanism}")
LOGGER.info("{PROTOCOL_KEY} set to {protocol}")
config = {
"bootstrap.servers": ",".join(self.brokers),
"group.id": self.group_id,
"auto.offset.reset": "earliest",
"security.protocol": protocol,
"sasl.mechanism": mechanism,
"sasl.username": username,
"sasl.password": password,
}
else:
LOGGER.info("Defaulting to no authentication to Kafka")
config = {
"bootstrap.servers": ",".join(self.brokers),
"group.id": self.group_id,
"auto.offset.reset": "earliest",
}

# note: this is done because mocking a cimpl is...tricky
self.createConsumer(config, topics)
self.running = True
Expand Down

0 comments on commit 9f1831a

Please sign in to comment.