From 254cfbe04e5476377ec0fa470bb18cd9b82bc0ff Mon Sep 17 00:00:00 2001 From: Stephen R Pietrowicz Date: Tue, 19 Nov 2024 13:16:10 -0600 Subject: [PATCH 1/3] Add regular expression matching to check file extensions on ingest Fix collection name in example configuration file Change info logger messages to debug --- etc/msg_oods.yaml | 4 ++-- python/lsst/ctrl/oods/butlerAttendant.py | 26 +++++++++++++----------- python/lsst/ctrl/oods/msgIngester.py | 18 +++++++++++++++- 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/etc/msg_oods.yaml b/etc/msg_oods.yaml index 649e5e5..b304972 100644 --- a/etc/msg_oods.yaml +++ b/etc/msg_oods.yaml @@ -20,9 +20,9 @@ ingester: name : MessageAttendant repoDirectory : /tmp/repo/LATISS collections: - - LATISS/raw/all + - LSSTCam/raw/all cleanCollections: - - collection: LATISS/raw/all + - collection: LSSTCam/raw/all filesOlderThan: <<: *interval seconds: 30 diff --git a/python/lsst/ctrl/oods/butlerAttendant.py b/python/lsst/ctrl/oods/butlerAttendant.py index f92e399..9b11ce5 100644 --- a/python/lsst/ctrl/oods/butlerAttendant.py +++ b/python/lsst/ctrl/oods/butlerAttendant.py @@ -245,14 +245,15 @@ def cleanCollection(self, collection, olderThan): ) t = t - td - LOGGER.info("about to createButler()") + LOGGER.info("cleaning collections") + LOGGER.debug("about to createButler()") butler = self.createButler() - LOGGER.info("about to refresh()") + LOGGER.debug("about to refresh()") butler.registry.refresh() # get all datasets in these collections - LOGGER.info("about to call queryDatasets") + LOGGER.debug("about to call queryDatasets") all_datasets = set( butler.registry.queryDatasets( datasetType=..., @@ -261,25 +262,25 @@ def cleanCollection(self, collection, olderThan): bind={"ref_date": t}, ) ) - LOGGER.info("done calling queryDatasets") + LOGGER.debug("done calling queryDatasets") # get all TAGGED collections - LOGGER.info("about to call queryCollections") + LOGGER.debug("about to call queryCollections") tagged_cols = list(butler.registry.queryCollections(collectionTypes=CollectionType.TAGGED)) - LOGGER.info("done calling queryCollections") + LOGGER.debug("done calling queryCollections") # Note: The code below is to get around an issue where passing # an empty list as the collections argument to queryDatasets # returns all datasets. if tagged_cols: # get all TAGGED datasets - LOGGER.info("about to run queryDatasets for TAGGED collections") + LOGGER.debug("about to run queryDatasets for TAGGED collections") tagged_datasets = set(butler.registry.queryDatasets(datasetType=..., collections=tagged_cols)) - LOGGER.info("done running queryDatasets for TAGGED collections; differencing datasets") + LOGGER.debug("done running queryDatasets for TAGGED collections; differencing datasets") # get a set of datasets in all_datasets, but not in tagged_datasets ref = all_datasets.difference(tagged_datasets) - LOGGER.info("done differencing datasets") + LOGGER.debug("done differencing datasets") else: # no TAGGED collections, so use all_datasets ref = all_datasets @@ -305,9 +306,10 @@ def cleanCollection(self, collection, olderThan): except Exception as e: LOGGER.warning("couldn't remove %s: %s", uri, e) - LOGGER.info("about to run pruneDatasets") + LOGGER.debug("about to run pruneDatasets") butler.pruneDatasets(ref, purge=True, unstore=True) - LOGGER.info("done running pruneDatasets") + LOGGER.debug("done running pruneDatasets") + LOGGER.info("done cleaning collections") def rawexposure_info(self, data): """Return a sparsely initialized dictionary @@ -359,6 +361,6 @@ def definer_run(self, file_datasets): refs = fds.refs ids = [ref.dataId for ref in refs] self.visit_definer.run(ids) - LOGGER.info("Defined visits for %s", ids) + LOGGER.debug("Defined visits for %s", ids) except Exception as e: LOGGER.exception(e) diff --git a/python/lsst/ctrl/oods/msgIngester.py b/python/lsst/ctrl/oods/msgIngester.py index 9917bbf..f6aaa3b 100644 --- a/python/lsst/ctrl/oods/msgIngester.py +++ b/python/lsst/ctrl/oods/msgIngester.py @@ -21,6 +21,8 @@ import asyncio import logging +import os +import re from confluent_kafka import KafkaError from lsst.ctrl.oods.bucketMessage import BucketMessage @@ -85,6 +87,9 @@ def __init__(self, mainConfig, csc=None): self.tasks = [] self.dequeue_task = None + self.regex = re.compile(os.environ.get("DATASET_REGEXP", r".*\.(fits|fits.fz)$")) + LOGGER.info(f"Ingesting files matching regular expression {self.regex.pattern}") + def get_butler_clean_tasks(self): """Get a list of all butler run_task methods @@ -140,6 +145,9 @@ def stop_tasks(self): task.cancel() self.tasks = [] + def filter_by_regex(self, files): + return [s for s in files if self.regex.search(s)] + async def dequeue_and_ingest_files(self): self.running = True while self.running: @@ -152,6 +160,8 @@ async def dequeue_and_ingest_files(self): else: raise Exception(f"KafkaError = {m.error().code()}") rps = self._gather_all_resource_paths(m) + if rps is None: + continue resources.extend(rps) await self.ingest(resources) @@ -162,6 +172,12 @@ def _gather_all_resource_paths(self, m): # extract all urls within this message msg = BucketMessage(m.value()) - rp_list = [ResourcePath(url) for url in msg.extract_urls()] + names = [url for url in msg.extract_urls()] + + name_list = self.filter_by_regex(names) + if len(name_list) == 0: + return None + + rp_list = [ResourcePath(url) for url in name_list] return rp_list From a584a0b7e4271e56172366b5abfcc02e7942bc47 Mon Sep 17 00:00:00 2001 From: Stephen R Pietrowicz Date: Tue, 19 Nov 2024 16:19:27 -0600 Subject: [PATCH 2/3] Change some variable names to make them clearer. Make if statement more pythonic --- python/lsst/ctrl/oods/msgIngester.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/lsst/ctrl/oods/msgIngester.py b/python/lsst/ctrl/oods/msgIngester.py index f6aaa3b..a55a718 100644 --- a/python/lsst/ctrl/oods/msgIngester.py +++ b/python/lsst/ctrl/oods/msgIngester.py @@ -172,12 +172,12 @@ def _gather_all_resource_paths(self, m): # extract all urls within this message msg = BucketMessage(m.value()) - names = [url for url in msg.extract_urls()] + urls = [url for url in msg.extract_urls()] - name_list = self.filter_by_regex(names) - if len(name_list) == 0: - return None + filtered_urls = self.filter_by_regex(urls) - rp_list = [ResourcePath(url) for url in name_list] + if filtered_urls: + rps = [ResourcePath(url) for url in filtered_urls] + return rps - return rp_list + return None From 7f9d06f77350e462d55e20a922fa5b631b115b90 Mon Sep 17 00:00:00 2001 From: Stephen R Pietrowicz Date: Wed, 20 Nov 2024 09:52:44 -0600 Subject: [PATCH 3/3] Change repo name --- etc/msg_oods.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/msg_oods.yaml b/etc/msg_oods.yaml index b304972..738e07f 100644 --- a/etc/msg_oods.yaml +++ b/etc/msg_oods.yaml @@ -18,7 +18,7 @@ ingester: class: import : lsst.ctrl.oods.messageAttendant name : MessageAttendant - repoDirectory : /tmp/repo/LATISS + repoDirectory : /tmp/repo/LSSTCAM collections: - LSSTCam/raw/all cleanCollections: