Merge branch 'tickets/DM-46719'
srp3rd committed Nov 11, 2024
2 parents 3475eb6 + d6dc8c9 commit 5d582b3
Showing 7 changed files with 42 additions and 27 deletions.
1 change: 0 additions & 1 deletion etc/msg_oods.yaml
@@ -29,4 +29,3 @@ ingester:
   scanInterval:
     <<: *interval
     seconds: 10
-  batchSize: 20
1 change: 1 addition & 0 deletions etc/oods_example.yaml
@@ -5,6 +5,7 @@ defaultInterval: &interval
   seconds: 0
 
 ingester:
+  imageStagingDirectory: /tmp
   butlers:
     - butler:
         collections:
2 changes: 1 addition & 1 deletion python/lsst/ctrl/oods/dm_csc.py
@@ -45,10 +45,10 @@ class DmCsc(BaseCsc):
 
     valid_simulation_modes = [0]
     version = __version__
-    LOGGER.info(f"{version=}")
 
     def __init__(self, name, initial_state):
         super().__init__(name, initial_state=initial_state)
+        LOGGER.info(f"OODS version: {self.version}")
 
         self.estimated_timeout = 5.0
 
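For illustration (not part of this commit): the deleted class-body LOGGER.info ran once, when the module defining DmCsc was imported; the replacement inside __init__ runs each time an instance is constructed. A minimal, self-contained sketch of that difference:

    import logging

    logging.basicConfig(level=logging.INFO)
    LOGGER = logging.getLogger(__name__)


    class Widget:
        # executes once, while the class body is evaluated at import time
        LOGGER.info("class body")

        def __init__(self):
            # executes on every instantiation
            LOGGER.info("init")


    Widget()  # only "init" is logged here; "class body" was logged at definition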
29 changes: 27 additions & 2 deletions python/lsst/ctrl/oods/fileIngester.py
@@ -23,6 +23,7 @@
 import logging
 import os
 import os.path
+from itertools import islice
 
 from lsst.ctrl.oods.butlerProxy import ButlerProxy
 from lsst.ctrl.oods.cacheCleaner import CacheCleaner
@@ -32,6 +33,8 @@
 
 LOGGER = logging.getLogger(__name__)
 
+DEFAULT_BATCH_SIZE = 1000
+
 
 class FileIngester(object):
     """Ingest files into the butler specified in the configuration.
Expand All @@ -50,6 +53,11 @@ def __init__(self, mainConfig, csc=None):
self.config = mainConfig["ingester"]

self.image_staging_dir = self.config["imageStagingDirectory"]
self.batch_size = self.config.get("batchSize", None)
if self.batch_size is None:
LOGGER.info(f"configuration 'batchSize' not set, defaulting to {DEFAULT_BATCH_SIZE}")
self.batch_size = DEFAULT_BATCH_SIZE
LOGGER.info(f"will ingest in groups of batchSize={self.batch_size}")
scanInterval = self.config["scanInterval"]
seconds = TimeInterval.calculateTotalSeconds(scanInterval)

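For illustration (not part of this commit): after this change, batchSize is an optional ingester setting rather than a required one. A hypothetical config that sets it explicitly might look like:

    ingester:
      imageStagingDirectory: /tmp
      # optional; when absent, the ingester falls back to DEFAULT_BATCH_SIZE (1000)
      batchSize: 500
      scanInterval:
        <<: *interval
        seconds: 10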
@@ -201,5 +209,22 @@ async def dequeue_and_ingest_files(self):
             # to the area where they're staged for the OODS.
             # Files staged here so the scanning asyncio routine doesn't
             # queue them twice.
-            butler_file_list = self.stageFiles(file_list)
-            await self.ingest(butler_file_list)
+            for files in self._grouped(file_list, self.batch_size):
+                butler_file_list = self.stageFiles(files)
+                await self.ingest(butler_file_list)
+
+    def _grouped(self, file_list, n):
+        # this should be replaced by itertools.batched
+        # when we up-rev to python 3.13
+        """return 'n' element groups from file_list
+
+        Parameters
+        ----------
+        file_list: `list`
+            an iterable data structure
+        n: `int`
+            largest group to return at once
+        """
+        it = iter(file_list)
+        while batch := tuple(islice(it, n)):
+            yield batch
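For illustration (not part of this commit): the _grouped helper is the standard islice batching idiom, equivalent to itertools.batched, which is available in the standard library from Python 3.12. A self-contained sketch:

    from itertools import islice


    def grouped(iterable, n):
        # yield successive tuples of at most n items each
        it = iter(iterable)
        while batch := tuple(islice(it, n)):
            yield batch


    print(list(grouped(range(7), 3)))  # [(0, 1, 2), (3, 4, 5), (6,)]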
2 changes: 0 additions & 2 deletions python/lsst/ctrl/oods/msgIngester.py
@@ -121,8 +121,6 @@ def run_tasks(self):
         # this is split into two tasks so they can run at slightly different
         # cadences. We want to gather as many files as we can before we
         # do the ingest
-        task = asyncio.create_task(self.msgQueue.queue_files())
-        self.tasks.append(task)
 
         task = asyncio.create_task(self.dequeue_and_ingest_files())
         self.tasks.append(task)
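For illustration (not part of this commit): with the queue_files() producer task removed, consumption collapses into a single pull-style task that fetches a batch and ingests it, repeatedly. A minimal sketch using FakeMsgQueue as a hypothetical stand-in for the real MsgQueue:

    import asyncio


    class FakeMsgQueue:
        # hypothetical stand-in: after this commit, dequeue_messages polls the
        # broker itself, so no separate producer task feeds an internal list
        def __init__(self):
            self.batches = [["a.fits"], ["b.fits"]]

        async def dequeue_messages(self):
            return self.batches.pop(0) if self.batches else []


    async def dequeue_and_ingest():
        queue = FakeMsgQueue()
        while batch := await queue.dequeue_messages():
            print("ingesting", batch)


    asyncio.run(dequeue_and_ingest())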
33 changes: 13 additions & 20 deletions python/lsst/ctrl/oods/msgQueue.py
@@ -73,27 +73,14 @@ def __init__(self, brokers, group_id, topics, max_messages):
         }
         # note: this is done because mocking a cimpl is...tricky
         self.createConsumer(config, topics)
-        self.running = True
 
     def createConsumer(self, config, topics):
         """Create a Kafka Consumer"""
         self.consumer = Consumer(config)
         self.consumer.subscribe(topics)
         LOGGER.info("subscribed")
 
-    async def queue_files(self):
-        """Queue all files in messages on the subscribed topics"""
-        loop = asyncio.get_running_loop()
-        # now, add all the currently known files to the queue
-        self.running = True
-        while self.running:
-            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
-                message_list = await loop.run_in_executor(pool, self._get_messages)
-
-            if message_list:
-                async with self.condition:
-                    self.msgList.extend(message_list)
-                    self.condition.notify_all()
-
     def _get_messages(self):
         """Return up to max_messages at a time from Kafka"""
         LOGGER.debug("getting more messages")
@@ -109,17 +96,23 @@ def _get_messages(self):
         return m_list
 
     async def dequeue_messages(self):
-        """Return all of the messages retrieved so far"""
-        # get a list of messages, clear the msgList
-        async with self.condition:
-            await self.condition.wait()
-            message_list = list(self.msgList)
-            self.msgList.clear()
+        """Retrieve messages"""
+        loop = asyncio.get_running_loop()
+        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
+            message_list = await loop.run_in_executor(pool, self._get_messages)
         return message_list
 
     def commit(self, message):
         """Perform Kafka commit a message
 
         Parameters
         ----------
         message: Kafka message
             message to commit
         """
         self.consumer.commit(message=message)
 
     def stop(self):
         """shut down"""
-        self.running = False
         self.consumer.close()
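For illustration (not part of this commit): the new dequeue_messages wraps the blocking Kafka consume call with loop.run_in_executor, which runs it on a worker thread so the asyncio event loop stays responsive. A self-contained sketch of that pattern, with blocking_poll as a hypothetical stand-in for the real consumer call:

    import asyncio
    import concurrent.futures
    import time


    def blocking_poll():
        # stand-in for a blocking consumer.consume() call
        time.sleep(0.1)
        return ["message-1", "message-2"]


    async def dequeue():
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            # the blocking call runs on a pool thread; the event loop stays free
            return await loop.run_in_executor(pool, blocking_poll)


    print(asyncio.run(dequeue()))  # ['message-1', 'message-2']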
1 change: 0 additions & 1 deletion tests/test_msgqueue.py
@@ -51,7 +51,6 @@ async def testMsgQueue(self, MockClass1):
         mq.consumer.close = MagicMock()
 
         task_list = []
-        task_list.append(asyncio.create_task(mq.queue_files()))
         task_list.append(asyncio.create_task(self.interrupt_me()))
         msg = await mq.dequeue_messages()
         self.assertEqual(len(msg), 1)
