From cecf85cb6227b17ba18c72b2541857d161f6d35b Mon Sep 17 00:00:00 2001
From: Stephen R Pietrowicz
Date: Fri, 1 Nov 2024 16:07:01 -0500
Subject: [PATCH 1/6] Run ingest in smaller batches

---
 python/lsst/ctrl/oods/fileIngester.py | 22 ++++++++++++++++++++--
 python/lsst/ctrl/oods/msgIngester.py  |  4 ++--
 python/lsst/ctrl/oods/msgQueue.py     | 25 +++++--------------------
 tests/test_msgqueue.py                |  2 +-
 4 files changed, 28 insertions(+), 25 deletions(-)

diff --git a/python/lsst/ctrl/oods/fileIngester.py b/python/lsst/ctrl/oods/fileIngester.py
index 0b8cb4e..081a554 100644
--- a/python/lsst/ctrl/oods/fileIngester.py
+++ b/python/lsst/ctrl/oods/fileIngester.py
@@ -23,6 +23,7 @@
 import logging
 import os
 import os.path
+from itertools import islice
 
 from lsst.ctrl.oods.butlerProxy import ButlerProxy
 from lsst.ctrl.oods.cacheCleaner import CacheCleaner
@@ -201,5 +202,22 @@ async def dequeue_and_ingest_files(self):
             # to the area where they're staged for the OODS.
             # Files staged here so the scanning asyncio routine doesn't
             # queue them twice.
-            butler_file_list = self.stageFiles(file_list)
-            await self.ingest(butler_file_list)
+            for files in self._grouped(file_list, 1000):
+                butler_file_list = self.stageFiles(files)
+                await self.ingest(butler_file_list)
+
+    def _grouped(self, file_list, n):
+        # this should be replaced by itertools.batched
+        # when we up-rev to python 3.13
+        """return 'n' element groups from file_list
+
+        Parameters
+        ----------
+        file_list: `list`
+            an iterable data structure
+        n: `int`
+            largest group to return at once
+        """
+        it = iter(file_list)
+        while batch := tuple(islice(it, n)):
+            yield batch
diff --git a/python/lsst/ctrl/oods/msgIngester.py b/python/lsst/ctrl/oods/msgIngester.py
index 778d0a7..9af3ef6 100644
--- a/python/lsst/ctrl/oods/msgIngester.py
+++ b/python/lsst/ctrl/oods/msgIngester.py
@@ -121,8 +121,8 @@ def run_tasks(self):
         # this is split into two tasks so they can run at slightly different
         # cadences. We want to gather as many files as we can before we
         # do the ingest
-        task = asyncio.create_task(self.msgQueue.queue_files())
-        self.tasks.append(task)
+        #task = asyncio.create_task(self.msgQueue.queue_files())
+        #self.tasks.append(task)
         task = asyncio.create_task(self.dequeue_and_ingest_files())
         self.tasks.append(task)
 
diff --git a/python/lsst/ctrl/oods/msgQueue.py b/python/lsst/ctrl/oods/msgQueue.py
index 53134f3..ff95361 100644
--- a/python/lsst/ctrl/oods/msgQueue.py
+++ b/python/lsst/ctrl/oods/msgQueue.py
@@ -73,6 +73,7 @@ def __init__(self, brokers, group_id, topics, max_messages):
         }
         # note: this is done because mocking a cimpl is...tricky
         self.createConsumer(config, topics)
+        self.running = True
 
     def createConsumer(self, config, topics):
         """Create a Kafka Consumer"""
@@ -80,20 +81,6 @@ def createConsumer(self, config, topics):
         self.consumer.subscribe(topics)
         LOGGER.info("subscribed")
 
-    async def queue_files(self):
-        """Queue all files in messages on the subscribed topics"""
-        loop = asyncio.get_running_loop()
-        # now, add all the currently known files to the queue
-        self.running = True
-        while self.running:
-            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
-                message_list = await loop.run_in_executor(pool, self._get_messages)
-
-            if message_list:
-                async with self.condition:
-                    self.msgList.extend(message_list)
-                    self.condition.notify_all()
-
     def _get_messages(self):
         """Return up to max_messages at a time from Kafka"""
         LOGGER.debug("getting more messages")
@@ -109,12 +96,10 @@ def _get_messages(self):
         return m_list
 
     async def dequeue_messages(self):
-        """Return all of the messages retrieved so far"""
-        # get a list of messages, clear the msgList
-        async with self.condition:
-            await self.condition.wait()
-            message_list = list(self.msgList)
-            self.msgList.clear()
+        """Retrieve messages"""
+        loop = asyncio.get_running_loop()
+        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
+            message_list = await loop.run_in_executor(pool, self._get_messages)
         return message_list
 
     def commit(self, message):
diff --git a/tests/test_msgqueue.py b/tests/test_msgqueue.py
index 84e1158..9f15581 100644
--- a/tests/test_msgqueue.py
+++ b/tests/test_msgqueue.py
@@ -51,7 +51,7 @@ async def testMsgQueue(self, MockClass1):
         mq.consumer.close = MagicMock()
 
         task_list = []
-        task_list.append(asyncio.create_task(mq.queue_files()))
+        #task_list.append(asyncio.create_task(mq.queue_files()))
         task_list.append(asyncio.create_task(self.interrupt_me()))
         msg = await mq.dequeue_messages()
         self.assertEqual(len(msg), 1)
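For reference, the batching added in this patch reduces to the following standalone sketch. The function name `grouped` and the demo file names are illustrative only; the patch itself implements this as the `_grouped` method on FileIngester.

    from itertools import islice


    def grouped(iterable, n):
        """Yield successive tuples of at most n items from iterable."""
        it = iter(iterable)
        while batch := tuple(islice(it, n)):
            yield batch


    # With a batch size of 1000, a backlog of 2500 files would be staged and
    # ingested in three passes of 1000, 1000 and 500 files.
    files = [f"file_{i}.fits" for i in range(2500)]
    print([len(batch) for batch in grouped(files, 1000)])  # [1000, 1000, 500]

On Python 3.12 and later, itertools.batched(files, 1000) yields the same groups, which is the replacement the in-code comment anticipates.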
From f83746fc771c6061fe7bed5c916674a15d0109d6 Mon Sep 17 00:00:00 2001
From: Stephen R Pietrowicz
Date: Fri, 1 Nov 2024 16:27:47 -0500
Subject: [PATCH 2/6] Report version

---
 python/lsst/ctrl/oods/dm_csc.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/lsst/ctrl/oods/dm_csc.py b/python/lsst/ctrl/oods/dm_csc.py
index 953842e..f096d4c 100644
--- a/python/lsst/ctrl/oods/dm_csc.py
+++ b/python/lsst/ctrl/oods/dm_csc.py
@@ -45,10 +45,10 @@ class DmCsc(BaseCsc):
 
     valid_simulation_modes = [0]
     version = __version__
-    LOGGER.info(f"{version=}")
 
     def __init__(self, name, initial_state):
         super().__init__(name, initial_state=initial_state)
+        LOGGER.info(f"OODS version: {self.version}")
 
         self.estimated_timeout = 5.0
 

From 98c740b15b5a047e61761a0f805b08b0647aa9d0 Mon Sep 17 00:00:00 2001
From: Stephen R Pietrowicz
Date: Fri, 8 Nov 2024 13:39:38 -0600
Subject: [PATCH 3/6] Make file based ingestion adhere to batchSize limit in configuration file

---
 etc/oods_example.yaml                 | 1 +
 python/lsst/ctrl/oods/fileIngester.py | 9 ++++++++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/etc/oods_example.yaml b/etc/oods_example.yaml
index bbd455b..fd640c2 100644
--- a/etc/oods_example.yaml
+++ b/etc/oods_example.yaml
@@ -5,6 +5,7 @@ defaultInterval: &interval
   seconds: 0
 
 ingester:
+  imageStagingDirectory: /tmp
   butlers:
     - butler:
         collections:
diff --git a/python/lsst/ctrl/oods/fileIngester.py b/python/lsst/ctrl/oods/fileIngester.py
index 081a554..cabab01 100644
--- a/python/lsst/ctrl/oods/fileIngester.py
+++ b/python/lsst/ctrl/oods/fileIngester.py
@@ -33,6 +33,8 @@
 
 LOGGER = logging.getLogger(__name__)
 
+DEFAULT_BATCH_SIZE = 1000
+
 
 class FileIngester(object):
     """Ingest files into the butler specified in the configuration.
@@ -51,6 +53,11 @@ def __init__(self, mainConfig, csc=None):
         self.config = mainConfig["ingester"]
 
         self.image_staging_dir = self.config["imageStagingDirectory"]
+        self.batch_size = self.config.get("batchSize", None)
+        if self.batch_size is None:
+            LOGGER.info(f"configuration 'batchSize' not set, defaulting to {DEFAULT_BATCH_SIZE}")
+            self.batch_size = DEFAULT_BATCH_SIZE
+        LOGGER.info(f'will ingest in groups of batchSize={self.batch_size}')
 
         scanInterval = self.config["scanInterval"]
         seconds = TimeInterval.calculateTotalSeconds(scanInterval)
@@ -202,7 +209,7 @@ async def dequeue_and_ingest_files(self):
             # to the area where they're staged for the OODS.
             # Files staged here so the scanning asyncio routine doesn't
             # queue them twice.
-            for files in self._grouped(file_list, 1000):
+            for files in self._grouped(file_list, self.batch_size):
                 butler_file_list = self.stageFiles(files)
                 await self.ingest(butler_file_list)
 
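The new batchSize handling is a dictionary lookup with a logged fallback. The sketch below mirrors the __init__ change under the assumption that the "ingester" section of the YAML has already been parsed into a dict; resolve_batch_size is a made-up name used here only for illustration.

    import logging

    logging.basicConfig(level=logging.INFO)
    LOGGER = logging.getLogger(__name__)
    DEFAULT_BATCH_SIZE = 1000


    def resolve_batch_size(ingester_config):
        """Return the configured batchSize, or DEFAULT_BATCH_SIZE if unset."""
        batch_size = ingester_config.get("batchSize", None)
        if batch_size is None:
            LOGGER.info("configuration 'batchSize' not set, defaulting to %d", DEFAULT_BATCH_SIZE)
            batch_size = DEFAULT_BATCH_SIZE
        return batch_size


    print(resolve_batch_size({"imageStagingDirectory": "/tmp"}))  # 1000
    print(resolve_batch_size({"batchSize": 20}))                  # 20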
From 79be8b5cb563577fede205553dc040d7c1b9a69b Mon Sep 17 00:00:00 2001
From: Stephen R Pietrowicz
Date: Fri, 8 Nov 2024 13:47:37 -0600
Subject: [PATCH 4/6] Add comments

---
 python/lsst/ctrl/oods/msgQueue.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/python/lsst/ctrl/oods/msgQueue.py b/python/lsst/ctrl/oods/msgQueue.py
index ff95361..25d13b2 100644
--- a/python/lsst/ctrl/oods/msgQueue.py
+++ b/python/lsst/ctrl/oods/msgQueue.py
@@ -103,8 +103,16 @@ async def dequeue_messages(self):
         return message_list
 
     def commit(self, message):
+        """Perform Kafka commit a message
+
+        Parameters
+        ----------
+        message: Kafka message
+            message to commit
+        """
         self.consumer.commit(message=message)
 
     def stop(self):
+        """shut down"""
         self.running = False
         self.consumer.close()

From 94e949daaf0c552e66affca4bf1e139f03a76291 Mon Sep 17 00:00:00 2001
From: Stephen R Pietrowicz
Date: Fri, 8 Nov 2024 14:07:17 -0600
Subject: [PATCH 5/6] Remove unnecessary parameter

---
 etc/msg_oods.yaml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/etc/msg_oods.yaml b/etc/msg_oods.yaml
index bfc96ad..be64e48 100644
--- a/etc/msg_oods.yaml
+++ b/etc/msg_oods.yaml
@@ -29,4 +29,3 @@ ingester:
   scanInterval:
     <<: *interval
     seconds: 10
-  batchSize: 20
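The commit() documented in patch 4, together with the executor-based dequeue from patch 1, amounts to a consume-then-commit flow. A hedged sketch of that flow with confluent_kafka follows; the broker address, group id, topic, and the use of Consumer.consume() for the blocking fetch are assumptions for the demo, not taken from _get_messages().

    import asyncio
    import concurrent.futures

    from confluent_kafka import Consumer


    def get_messages(consumer, max_messages=250):
        # Blocking call: returns up to max_messages within the timeout.
        return consumer.consume(num_messages=max_messages, timeout=1.0)


    async def dequeue_once(consumer):
        # Run the blocking fetch in a worker thread so the event loop stays free.
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return await loop.run_in_executor(pool, get_messages, consumer)


    async def main():
        consumer = Consumer(
            {
                "bootstrap.servers": "localhost:9092",
                "group.id": "oods-demo",
                "enable.auto.commit": False,
            }
        )
        consumer.subscribe(["example-topic"])
        try:
            for message in await dequeue_once(consumer):
                print(message.value())            # stand-in for the actual ingest work
                consumer.commit(message=message)  # commit only after handling succeeds
        finally:
            consumer.close()


    asyncio.run(main())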
From d6dc8c90adc577a3e720206f9e343823f0cb8967 Mon Sep 17 00:00:00 2001
From: Stephen R Pietrowicz
Date: Fri, 8 Nov 2024 14:10:57 -0600
Subject: [PATCH 6/6] Fix lint and formatting issues

---
 python/lsst/ctrl/oods/fileIngester.py | 2 +-
 python/lsst/ctrl/oods/msgIngester.py  | 2 --
 tests/test_msgqueue.py                | 1 -
 3 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/python/lsst/ctrl/oods/fileIngester.py b/python/lsst/ctrl/oods/fileIngester.py
index cabab01..ef9d30f 100644
--- a/python/lsst/ctrl/oods/fileIngester.py
+++ b/python/lsst/ctrl/oods/fileIngester.py
@@ -57,7 +57,7 @@ def __init__(self, mainConfig, csc=None):
         if self.batch_size is None:
             LOGGER.info(f"configuration 'batchSize' not set, defaulting to {DEFAULT_BATCH_SIZE}")
             self.batch_size = DEFAULT_BATCH_SIZE
-        LOGGER.info(f'will ingest in groups of batchSize={self.batch_size}')
+        LOGGER.info(f"will ingest in groups of batchSize={self.batch_size}")
 
         scanInterval = self.config["scanInterval"]
         seconds = TimeInterval.calculateTotalSeconds(scanInterval)
diff --git a/python/lsst/ctrl/oods/msgIngester.py b/python/lsst/ctrl/oods/msgIngester.py
index 9af3ef6..3a27d16 100644
--- a/python/lsst/ctrl/oods/msgIngester.py
+++ b/python/lsst/ctrl/oods/msgIngester.py
@@ -121,8 +121,6 @@ def run_tasks(self):
         # this is split into two tasks so they can run at slightly different
         # cadences. We want to gather as many files as we can before we
         # do the ingest
-        #task = asyncio.create_task(self.msgQueue.queue_files())
-        #self.tasks.append(task)
         task = asyncio.create_task(self.dequeue_and_ingest_files())
         self.tasks.append(task)
 
diff --git a/tests/test_msgqueue.py b/tests/test_msgqueue.py
index 9f15581..27babcf 100644
--- a/tests/test_msgqueue.py
+++ b/tests/test_msgqueue.py
@@ -51,7 +51,6 @@ async def testMsgQueue(self, MockClass1):
         mq.consumer.close = MagicMock()
 
         task_list = []
-        #task_list.append(asyncio.create_task(mq.queue_files()))
         task_list.append(asyncio.create_task(self.interrupt_me()))
         msg = await mq.dequeue_messages()
         self.assertEqual(len(msg), 1)