Skip to content

Commit

Permalink
Merge branch 'tickets/DM-49010'
Browse files Browse the repository at this point in the history
  • Loading branch information
srp3rd committed Feb 20, 2025
2 parents 37de38f + 23085bd commit e19fe9e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
3 changes: 2 additions & 1 deletion etc/msg_oods.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ ingester:
class:
import : lsst.ctrl.oods.messageAttendant
name : MessageAttendant
repoDirectory : /tmp/repo/LSSTCAM
repoDirectory : s3://testprofile@oods
s3profile: testprofile
collections:
- LSSTCam/raw/all
cleanCollections:
Expand Down
14 changes: 11 additions & 3 deletions python/lsst/ctrl/oods/butlerAttendant.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, config, csc=None):
self.scanInterval = self.config["scanInterval"]
self.collections = self.config["collections"]
self.cleanCollections = self.config.get("cleanCollections")
self.s3profile = self.config.get("s3profile", None)

LOGGER.info(f"Using Butler repo located at {repo}")
self.butlerConfig = repo
Expand Down Expand Up @@ -105,11 +106,15 @@ async def ingest(self, file_list):

# Ingest images.
await asyncio.sleep(0)
new_list = file_list
if self.s3profile:
# rewrite URI to add s3profile
new_list = [s.replace(netloc=f"{self.s3profile}@{s.netloc}") for s in file_list]
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
try:
LOGGER.info("about to ingest")
await loop.run_in_executor(executor, self.task.run, file_list)
await loop.run_in_executor(executor, self.task.run, new_list)
LOGGER.info("done with ingest")
except RuntimeError as re:
LOGGER.info(f"{re}")
Expand Down Expand Up @@ -230,8 +235,11 @@ async def clean(self):
collection = entry["collection"]
olderThan = entry["filesOlderThan"]
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
await loop.run_in_executor(executor, self.cleanCollection, collection, olderThan)
try:
with ThreadPoolExecutor() as executor:
await loop.run_in_executor(executor, self.cleanCollection, collection, olderThan)
except Exception as e:
LOGGER.error(e)

async def send_status_task(self):
LOGGER.debug("send_status_task started")
Expand Down

0 comments on commit e19fe9e

Please sign in to comment.