Skip to content

Commit

Permalink
Merge branch 'tickets/DM-47588'
Browse files Browse the repository at this point in the history
  • Loading branch information
srp3rd committed Nov 18, 2024
2 parents 9f1831a + 882b683 commit 3af4c3f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 5 deletions.
2 changes: 1 addition & 1 deletion etc/msg_oods.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ ingester:
max_messages: 10
butlers:
- butler:
instrument: lsst.obs.lsst.Latiss
instrument: lsst.obs.lsst.LsstCam
class:
import : lsst.ctrl.oods.messageAttendant
name : MessageAttendant
Expand Down
4 changes: 3 additions & 1 deletion python/lsst/ctrl/oods/butlerAttendant.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ async def ingest(self, file_list):
LOGGER.info("about to ingest")
await loop.run_in_executor(executor, self.task.run, file_list)
LOGGER.info("done with ingest")
except RuntimeError as re:
LOGGER.info(f"{re}")
except Exception as e:
LOGGER.exception(f"Exception! {e=}")

Expand Down Expand Up @@ -195,7 +197,7 @@ def transmit_status(self, metadata, code, description):
msg["MSG_TYPE"] = "IMAGE_IN_OODS"
msg["STATUS_CODE"] = code
msg["DESCRIPTION"] = description
LOGGER.info("msg: %s, code: %s, description: %s", msg, code, description)
LOGGER.debug("msg: %s, code: %s, description: %s", msg, code, description)
if self.csc is None:
self.print_msg(msg)
return
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/ctrl/oods/msgQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ def __init__(self, brokers, group_id, topics, max_messages):
use_auth = False

if use_auth:
LOGGER.info("{MECHANISM_KEY} set to {mechanism}")
LOGGER.info("{PROTOCOL_KEY} set to {protocol}")
LOGGER.info(f"{MECHANISM_KEY} set to {mechanism}")
LOGGER.info(f"{PROTOCOL_KEY} set to {protocol}")
config = {
"bootstrap.servers": ",".join(self.brokers),
"group.id": self.group_id,
Expand Down
44 changes: 44 additions & 0 deletions tests/data/kafka_msg2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"Records": [
{
"eventVersion": "2.2",
"eventSource": "ceph:s3",
"awsRegion": "s3-butler",
"eventTime": "2024-11-13T17:52:22.138912Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "lsstcam"
},
"requestParameters": {
"sourceIPAddress": ""
},
"responseElements": {
"x-amz-request-id": "1cad9458-45e6-48cf-ae5a-2bb195396da7.41369686.13031161180866775831",
"x-amz-id-2": "2774056-s3-butler-s3-butler"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "lsst.s3.raw.lsstcam",
"bucket": {
"name": "rubinobs-raw-lsstcam",
"ownerIdentity": {
"principalId": "lsstcam"
},
"arn": "arn:aws:s3:s3-butler::rubinobs-raw-lsstcam",
"id": "1cad9458-45e6-48cf-ae5a-2bb195396da7.28735610.1"
},
"object": {
"key": "test.txt",
"size": 0,
"eTag": "d41d8cd98f00b204e9800998ecf8427e",
"versionId": "",
"sequencer": "56E7346729097308",
"metadata": [],
"tags": []
}
},
"eventId": "1731520342.141756.d41d8cd98f00b204e9800998ecf8427e",
"opaqueData": "&mechanism=SCRAM-SHA-512"
}
]
}
13 changes: 12 additions & 1 deletion tests/test_bucket_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def createBucketMessage(self, msg_file):
bucket_message = BucketMessage(message)
return bucket_message

def testBucketMessage(self):
def testBucketMessage1(self):
"""test that the message we're extracting is expected"""
bucket_message = self.createBucketMessage("kafka_msg.json")
url_list = list()
Expand All @@ -58,6 +58,17 @@ def testBucketMessage(self):
f = "s3://rubin-pp/HSC/73/2023061400090/0/6140090/HSC-Z/HSC-2023061400090-0-6140090-HSC-Z-73.fz"
self.assertEqual(url_list[0], f)

def testBucketMessage2(self):
"""test that the message we're extracting is expected"""
bucket_message = self.createBucketMessage("kafka_msg2.json")
url_list = list()
for url in bucket_message.extract_urls():
url_list.append(url)

self.assertEqual(len(url_list), 1)
f = "s3://rubinobs-raw-lsstcam/test.txt"
self.assertEqual(url_list[0], f)

def testBadBucketMessage(self):
"""test that a bad message throws an exception"""
bucket_message = self.createBucketMessage("bad_kafka_msg.json")
Expand Down

0 comments on commit 3af4c3f

Please sign in to comment.