Commit

Merge branch 'tickets/DM-47634'
srp3rd committed Nov 20, 2024
2 parents 3af4c3f + 7f9d06f commit de1224a
Showing 3 changed files with 35 additions and 17 deletions.
6 changes: 3 additions & 3 deletions etc/msg_oods.yaml
@@ -18,11 +18,11 @@ ingester:
 class:
   import : lsst.ctrl.oods.messageAttendant
   name : MessageAttendant
-repoDirectory : /tmp/repo/LATISS
+repoDirectory : /tmp/repo/LSSTCAM
 collections:
-  - LATISS/raw/all
+  - LSSTCam/raw/all
 cleanCollections:
-  - collection: LATISS/raw/all
+  - collection: LSSTCam/raw/all
     filesOlderThan:
       <<: *interval
       seconds: 30
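The hunk above moves the OODS message ingester from LATISS to LSSTCam paths. Below is a minimal sketch (not part of the commit) of how the renamed keys could be sanity-checked after the edit, assuming PyYAML is available, that ingester: is a top-level mapping in the full file, and that the *interval anchor referenced by cleanCollections is defined elsewhere in the same YAML:

# Hypothetical check script; the file path and key layout are taken from the diff,
# everything else is an assumption for illustration.
import yaml

with open("etc/msg_oods.yaml") as f:
    config = yaml.safe_load(f)

ingester = config["ingester"]
print(ingester["repoDirectory"])  # expected: /tmp/repo/LSSTCAM

# every raw collection should now carry the LSSTCam prefix instead of LATISS
for name in ingester["collections"]:
    assert name.startswith("LSSTCam/"), f"stale collection name: {name}"
for entry in ingester["cleanCollections"]:
    assert entry["collection"].startswith("LSSTCam/"), f"stale clean collection: {entry}"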
26 changes: 14 additions & 12 deletions python/lsst/ctrl/oods/butlerAttendant.py
@@ -245,14 +245,15 @@ def cleanCollection(self, collection, olderThan):
         )
         t = t - td

-        LOGGER.info("about to createButler()")
+        LOGGER.info("cleaning collections")
+        LOGGER.debug("about to createButler()")
         butler = self.createButler()

-        LOGGER.info("about to refresh()")
+        LOGGER.debug("about to refresh()")
         butler.registry.refresh()

         # get all datasets in these collections
-        LOGGER.info("about to call queryDatasets")
+        LOGGER.debug("about to call queryDatasets")
         all_datasets = set(
             butler.registry.queryDatasets(
                 datasetType=...,
@@ -261,25 +262,25 @@ def cleanCollection(self, collection, olderThan):
bind={"ref_date": t},
)
)
LOGGER.info("done calling queryDatasets")
LOGGER.debug("done calling queryDatasets")

# get all TAGGED collections
LOGGER.info("about to call queryCollections")
LOGGER.debug("about to call queryCollections")
tagged_cols = list(butler.registry.queryCollections(collectionTypes=CollectionType.TAGGED))
LOGGER.info("done calling queryCollections")
LOGGER.debug("done calling queryCollections")

# Note: The code below is to get around an issue where passing
# an empty list as the collections argument to queryDatasets
# returns all datasets.
if tagged_cols:
# get all TAGGED datasets
LOGGER.info("about to run queryDatasets for TAGGED collections")
LOGGER.debug("about to run queryDatasets for TAGGED collections")
tagged_datasets = set(butler.registry.queryDatasets(datasetType=..., collections=tagged_cols))
LOGGER.info("done running queryDatasets for TAGGED collections; differencing datasets")
LOGGER.debug("done running queryDatasets for TAGGED collections; differencing datasets")

# get a set of datasets in all_datasets, but not in tagged_datasets
ref = all_datasets.difference(tagged_datasets)
LOGGER.info("done differencing datasets")
LOGGER.debug("done differencing datasets")
else:
# no TAGGED collections, so use all_datasets
ref = all_datasets
@@ -305,9 +306,10 @@ def cleanCollection(self, collection, olderThan):
             except Exception as e:
                 LOGGER.warning("couldn't remove %s: %s", uri, e)

-        LOGGER.info("about to run pruneDatasets")
+        LOGGER.debug("about to run pruneDatasets")
         butler.pruneDatasets(ref, purge=True, unstore=True)
-        LOGGER.info("done running pruneDatasets")
+        LOGGER.debug("done running pruneDatasets")
+        LOGGER.info("done cleaning collections")

     def rawexposure_info(self, data):
         """Return a sparsely initialized dictionary
@@ -359,6 +361,6 @@ def definer_run(self, file_datasets):
                 refs = fds.refs
                 ids = [ref.dataId for ref in refs]
                 self.visit_definer.run(ids)
-                LOGGER.info("Defined visits for %s", ids)
+                LOGGER.debug("Defined visits for %s", ids)
         except Exception as e:
             LOGGER.exception(e)
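
The comment block in cleanCollection explains why the TAGGED-collection query is skipped when no TAGGED collections exist: passing an empty list as the collections argument to queryDatasets would return every dataset, making the difference empty and leaving nothing eligible for pruning. Below is a standalone sketch of that guard using plain sets and hypothetical names (this is not the Butler registry API, just the set logic):

def datasets_to_prune(all_datasets, tagged_collections, query_tagged):
    # all_datasets: datasets older than the cutoff, from the first query
    # tagged_collections: names of TAGGED collections; may be empty
    # query_tagged: callable returning the set of datasets in those collections
    if tagged_collections:
        tagged_datasets = query_tagged(tagged_collections)
        # prune only datasets that are not protected by a TAGGED collection
        return all_datasets - tagged_datasets
    # no TAGGED collections exist, so nothing is protected
    return all_datasets

# example: dataset "b" is tagged, so only "a" and "c" remain prunable
print(datasets_to_prune({"a", "b", "c"}, ["keepers"], lambda cols: {"b"}))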
20 changes: 18 additions & 2 deletions python/lsst/ctrl/oods/msgIngester.py
@@ -21,6 +21,8 @@

 import asyncio
 import logging
+import os
+import re

 from confluent_kafka import KafkaError
 from lsst.ctrl.oods.bucketMessage import BucketMessage
@@ -85,6 +87,9 @@ def __init__(self, mainConfig, csc=None):
         self.tasks = []
         self.dequeue_task = None

+        self.regex = re.compile(os.environ.get("DATASET_REGEXP", r".*\.(fits|fits.fz)$"))
+        LOGGER.info(f"Ingesting files matching regular expression {self.regex.pattern}")
+
     def get_butler_clean_tasks(self):
         """Get a list of all butler run_task methods
@@ -140,6 +145,9 @@ def stop_tasks(self):
             task.cancel()
         self.tasks = []

+    def filter_by_regex(self, files):
+        return [s for s in files if self.regex.search(s)]
+
     async def dequeue_and_ingest_files(self):
         self.running = True
         while self.running:
@@ -152,6 +160,8 @@ async def dequeue_and_ingest_files(self):
                     else:
                         raise Exception(f"KafkaError = {m.error().code()}")
                 rps = self._gather_all_resource_paths(m)
+                if rps is None:
+                    continue
                 resources.extend(rps)
             await self.ingest(resources)

@@ -162,6 +172,12 @@ def _gather_all_resource_paths(self, m):
         # extract all urls within this message
         msg = BucketMessage(m.value())

-        rp_list = [ResourcePath(url) for url in msg.extract_urls()]
+        urls = [url for url in msg.extract_urls()]

-        return rp_list
+        filtered_urls = self.filter_by_regex(urls)
+
+        if filtered_urls:
+            rps = [ResourcePath(url) for url in filtered_urls]
+            return rps
+
+        return None
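
The reworked _gather_all_resource_paths builds ResourcePath objects only for URLs that pass the new regular-expression filter and returns None when nothing survives, which the dequeue loop now skips. Below is a minimal, self-contained sketch of that filter using the same default pattern as the commit; the bucket URLs are invented for illustration:

import os
import re

# same default as the commit; override with the DATASET_REGEXP environment variable
regex = re.compile(os.environ.get("DATASET_REGEXP", r".*\.(fits|fits.fz)$"))

def filter_by_regex(files):
    # keep only entries whose path matches the configured pattern
    return [s for s in files if regex.search(s)]

urls = [
    "s3://bucket/LSSTCam/raw/exp_000123.fits",
    "s3://bucket/LSSTCam/raw/exp_000124.fits.fz",
    "s3://bucket/LSSTCam/raw/exp_000123.json",  # sidecar file, filtered out
]
print(filter_by_regex(urls))
# ['s3://bucket/LSSTCam/raw/exp_000123.fits', 's3://bucket/LSSTCam/raw/exp_000124.fits.fz']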
