diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
new file mode 100644
index 00000000..59770d0b
--- /dev/null
+++ b/.github/workflows/build.yaml
@@ -0,0 +1,26 @@
+name: CI build of all containers
+on:
+ push:
+ branches:
+ - main
+ tags:
+ - "*"
+ pull_request:
+
+jobs:
+ push:
+ runs-on: ubuntu-latest
+ permissions:
+ packages: write
+ contents: read
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+
+ - name: Build hinfo
+ uses: lsst-sqre/build-and-push-to-ghcr@v1
+ with:
+ image: ${{ github.repository }}-hinfo
+ github_token: ${{ secrets.GITHUB_TOKEN }}
+ dockerfile: Dockerfile.hinfo
diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml
index ce34a7bf..3733b421 100644
--- a/.github/workflows/lint.yaml
+++ b/.github/workflows/lint.yaml
@@ -8,12 +8,12 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v4
- name: Set up Python
- uses: actions/setup-python@v2
+ uses: actions/setup-python@v5
with:
- python-version: 3.9
+ python-version: '3.11'
- name: Install
run: pip install -r <(curl https://raw.githubusercontent.com/lsst/linting/main/requirements.txt)
diff --git a/Dockerfile.hinfo b/Dockerfile.hinfo
new file mode 100644
index 00000000..e3155876
--- /dev/null
+++ b/Dockerfile.hinfo
@@ -0,0 +1,22 @@
+ARG RUBINENV_VERSION=8.0.0
+FROM lsstsqre/newinstall:${RUBINENV_VERSION}
+ARG OBS_LSST_VERSION
+ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_06}
+USER lsst
+RUN source loadLSST.bash && mamba install aiokafka httpx
+RUN source loadLSST.bash && pip install kafkit
+RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst
+COPY python/lsst/consdb/hinfo.py ./hinfo/
+
+# Environment variables that must be set:
+# INSTRUMENT: LATISS, LSSTComCam, LSSTComCamSim, LSSTCam
+# POSTGRES_URL: SQLAlchemy connection URL
+# KAFKA_BOOTSTRAP: host:port of bootstrap server
+# KAFKA_PASSWORD: password for SASL_PLAIN authentication
+# SCHEMA_URL: Kafkit registry schema URL
+# Optional environment variables:
+# BUCKET_PREFIX: set to "rubin:" at USDF, default is ""
+# KAFKA_GROUP_ID: name of consumer group, default is "consdb-consumer"
+# KAFKA_USERNAME: username for SASL_PLAIN authentication, default is "consdb"
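+#
+# Example settings (hypothetical values, for illustration only):
+#   INSTRUMENT=LATISS
+#   POSTGRES_URL=postgresql://user:pass@db.example.org/dbname
+#   KAFKA_BOOTSTRAP=kafka.example.org:9092
+#   KAFKA_PASSWORD=<secret>
+#   SCHEMA_URL=https://kafka-schema-registry.example.org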
+
+ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python ./hinfo/hinfo.py" ]
diff --git a/Dockerfile.server b/Dockerfile.server
new file mode 100644
index 00000000..9c2dd9c5
--- /dev/null
+++ b/Dockerfile.server
@@ -0,0 +1,8 @@
+FROM python:3.11
+RUN pip install flask gunicorn sqlalchemy
+WORKDIR /consdb-server
+COPY src/server.py /consdb-server/
+# Environment variables that must be set:
+# POSTGRES_URL
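+#   e.g. POSTGRES_URL=postgresql://user:pass@host:5432/dbname (illustrative)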
+ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "server:app" ]
+
diff --git a/README.rst b/README.rst
index 4cebb847..20c06bb2 100644
--- a/README.rst
+++ b/README.rst
@@ -2,4 +2,6 @@
consdb
######
-This CSC listens for SAL events, executes EFD queries when they arrive, and writes results to columns in relational database tables in the Consolidated Database.
+Scripts and services for generating the Summit Visit Database and the Consolidated Database (ConsDB), including summarization of the Engineering and Facilities Database (EFD).
+
+See also https://dmtn-227.lsst.io/
diff --git a/SConstruct b/SConstruct
deleted file mode 100644
index 378aa934..00000000
--- a/SConstruct
+++ /dev/null
@@ -1,4 +0,0 @@
-# -*- python -*-
-from lsst.sconsUtils import scripts
-# Python-only package
-scripts.BasicSConstruct("consdb", disableCc=True, noCfgFile=True)
diff --git a/bin.src/SConscript b/bin.src/SConscript
deleted file mode 100644
index e00724c6..00000000
--- a/bin.src/SConscript
+++ /dev/null
@@ -1,3 +0,0 @@
-# -*- python -*-
-from lsst.sconsUtils import scripts
-scripts.BasicSConscript.shebang()
diff --git a/python/lsst/consdb/__init__.py b/python/lsst/consdb/__init__.py
index 810dc34f..073f2658 100644
--- a/python/lsst/consdb/__init__.py
+++ b/python/lsst/consdb/__init__.py
@@ -19,7 +19,4 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-try:
- from .version import * # Generated by sconsUtils
-except ImportError:
- __version__ = "?"
+# from .version import * # Generated by sconsUtils
\ No newline at end of file
diff --git a/python/lsst/consdb/client.py b/python/lsst/consdb/client.py
new file mode 100644
index 00000000..fa7d8fcb
--- /dev/null
+++ b/python/lsst/consdb/client.py
@@ -0,0 +1,58 @@
+import os
+from pandas import DataFrame
+import requests
+from requests.exceptions import RequestException
+from typing import Any, Iterable
+from urllib.parse import urljoin
+
+session = requests.Session()
+base_url = os.environ["CONSDB_URL"]
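+
+# Minimal usage sketch (table and column names are illustrative; CONSDB_URL
+# must point at a running consdb server):
+#
+#     from lsst.consdb import client
+#     client.insert("exposure", {"exposure_id": 2024022800001, "exptime": 30.0})
+#     df = client.query("exposure", ["exposure_id", "exptime"],
+#                       where="day_obs = 20240228")
+#     print(client.schema("exposure"))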
+
+
+def insert(table: str, values: dict[str, Any], **kwargs):
+ values.update(kwargs)
+ # check values against schema for table
+ data = {"table": table, "values": values}
+ url = urljoin(base_url, "insert")
+ try:
+        response = session.post(url, json=data)
+ except RequestException as e:
+ raise e
+ response.raise_for_status()
+
+
+def query(
+ tables: str | Iterable[str],
+ columns: str | Iterable[str],
+ *,
+ where: str | None = None,
+ join: str | None = None
+) -> DataFrame:
+ if isinstance(tables, str):
+ tables = [tables]
+ if isinstance(columns, str):
+ columns = [columns]
+ url = urljoin(base_url, "query")
+ data = {"tables": tables, "columns": columns, "where": where, "join": join}
+ try:
+        response = session.post(url, json=data)
+ except RequestException as e:
+ raise e
+ try:
+ response.raise_for_status()
+ except Exception as ex:
+ print(response.content.decode())
+ raise ex
+ arr = response.json()
+ return DataFrame(arr[1:], columns=arr[0])
+
+
+def schema(table: str):
+ url = urljoin(base_url, "schema/")
+ url = urljoin(url, table)
+ try:
+        response = session.get(url)
+ except RequestException as e:
+ raise e
+ response.raise_for_status()
+ return response.json()
diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py
new file mode 100644
index 00000000..9e06e817
--- /dev/null
+++ b/python/lsst/consdb/hinfo.py
@@ -0,0 +1,363 @@
+import asyncio
+import logging
+import os
+import random
+import re
+import sys
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any, Sequence
+
+import aiokafka
+import astropy.time
+import httpx
+import kafkit.registry
+import kafkit.registry.httpx
+import yaml
+from astro_metadata_translator import ObservationInfo
+from lsst.resources import ResourcePath
+from sqlalchemy import MetaData, Table, create_engine
+from sqlalchemy.dialects.postgresql import insert
+
+###############################
+# Header Processing Functions #
+###############################
+
+
+def ninety_minus(angle: float) -> float:
+ return 90.0 - angle
+
+
+def tai_convert(t: str) -> datetime:
+ return astropy.time.Time(t, format="isot", scale="tai").datetime
+
+
+def tai_mean(start: str, end: str) -> datetime:
+ s = astropy.time.Time(start, format="isot", scale="tai")
+ e = astropy.time.Time(end, format="isot", scale="tai")
+ return (s + (e - s) / 2).datetime
+
+
+def mean(*iterable: float) -> Any:
+ return sum(iterable) / len(iterable)
+
+
+def logical_or(*bools: int | str | None) -> bool:
+ return any([b == 1 or b == "1" for b in bools])
+
+
+#################################
+# Header Mapping Configurations #
+#################################
+
+# Non-instrument-specific mapping to column name from Header Service keyword
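+# Values are either a single keyword or a tuple of (function, argument, ...),
+# where each argument is itself a keyword or a nested tuple; see
+# process_column() below.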
+KW_MAPPING: dict[str, str | Sequence] = {
+ "exposure_name": "OBSID",
+ "controller": "CONTRLLR",
+ "seq_num": "SEQNUM",
+ "band": "FILTBAND",
+ "ra": "RA",
+ "decl": "DEC",
+ "skyrotation": "ROTPA",
+ "azimuth_start": "AZSTART",
+ "azimuth_end": "AZEND",
+ "azimuth": (mean, "AZSTART", "AZEND"),
+ "altitude_start": (ninety_minus, "ELSTART"),
+ "altitude_end": (ninety_minus, "ELEND"),
+ "altitude": (mean, (ninety_minus, "ELSTART"), (ninety_minus, "ELEND")),
+ "zenithdistance_start": "ELSTART",
+ "zenithdistance_end": "ELEND",
+ "zenithdistance": (mean, "ELSTART", "ELEND"),
+ "expmidpt": (tai_mean, "DATE-BEG", "DATE-END"),
+ "expmidptmjd": (mean, "MJD-BEG", "MJD-END"),
+ "obsstart": (tai_convert, "DATE-BEG"),
+ "obsstartmjd": "MJD-BEG",
+ "obsend": (tai_convert, "DATE-END"),
+ "obsendmjd": "MJD-END",
+ "exptime": "EXPTIME",
+ "shuttime": "SHUTTIME",
+ "darktime": "DARKTIME",
+ "group_id": "GROUPID",
+ "curindex": "CURINDEX",
+ "maxindex": "MAXINDEX",
+ "imgtype": "IMGTYPE",
+ "emulated": (logical_or, "EMUIMAGE"),
+ "science_program": "PROGRAM",
+ "observation_reason": "REASON",
+ "target_name": "OBJECT",
+ "airtemp": "AIRTEMP",
+ "pressure": "PRESSURE",
+ "humidity": "HUMIDITY",
+ "wind_speed": "WINDSPD",
+ "wind_dir": "WINDDIR",
+ "dimm_seeing": "SEEING",
+}
+
+# Instrument-specific mapping to column name from Header Service keyword
+LATISS_MAPPING: dict[str, str | Sequence] = {
+ "focus_z": "FOCUSZ",
+ "dome_azimuth": "DOMEAZ",
+ "shut_lower": "SHUTLOWR",
+ "shut_upper": "SHUTUPPR",
+ # "temp_set": "TEMP_SET",
+ "simulated": (
+ logical_or,
+ "SIMULATE ATMCS",
+ "SIMULATE ATHEXAPOD",
+ "SIMULAT ATPNEUMATICS",
+ "SIMULATE ATDOME",
+ "SIMULATE ATSPECTROGRAPH",
+ ),
+}
+
+LSSTCOMCAM_MAPPING: dict[str, str | Sequence] = {}
+LSSTCOMCAMSIM_MAPPING: dict[str, str | Sequence] = {}
+LSSTCAM_MAPPING: dict[str, str | Sequence] = {}
+
+# LATISS_DETECTOR_MAPPING = {
+# "ccdtemp": "CCDTEMP",
+# }
+
+# Mapping to column name from ObservationInfo keyword
+OI_MAPPING = {
+ "exposure_id": "exposure_id",
+ "physical_filter": "physical_filter",
+ "airmass": "boresight_airmass",
+ "day_obs": "observing_day",
+}
+
+# Mapping from instrument name to Header Service topic name
+TOPIC_MAPPING = {
+ "LATISS": "ATHeaderService",
+ "LSSTComCam": "CCHeaderService",
+ "LSSTComCamSim": "CCHeaderService",
+ "LSSTCam": "MTHeaderService",
+}
+
+
+########################
+# Processing Functions #
+########################
+
+
+def process_column(column_def: str | Sequence, info: dict) -> Any:
+ """Generate a column value from one or more keyword values in a dict.
+
+ The dict may contain FITS headers or ObservationInfo.
+
+ Parameters
+ ----------
+ column_def: `str`
+ Definition of the column. Either a string specifying the info keyword
+ to use as the column value, or a tuple containing a function to apply
+ to the values of one or more info keywords or function application
+ tuples.
+ info: `dict`
+ A dictionary containing keyword/value pairs.
+
+ Returns
+ -------
+ column_value: `Any`
+ The value to use for the column.
+ None if any input value is missing.
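+
+    Examples
+    --------
+    With ``info = {"ELSTART": 30.0, "ELEND": 40.0}`` (illustrative values):
+    ``"ELSTART"`` yields ``30.0``, ``(ninety_minus, "ELSTART")`` yields
+    ``60.0``, and ``(mean, (ninety_minus, "ELSTART"), (ninety_minus,
+    "ELEND"))`` yields ``55.0``.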
+ """
+ if type(column_def) is str:
+ if column_def in info:
+ return info[column_def]
+ elif type(column_def) is tuple:
+ fn = column_def[0]
+ arg_values = [process_column(a, info) for a in column_def[1:]]
+        # Zero is a valid value; only missing (None) inputs disqualify.
+        if all(v is not None for v in arg_values):
+            return fn(*arg_values)
+
+
+def process_resource(resource: ResourcePath) -> None:
+ """Process a header resource.
+
+ Uses configured mappings and the ObservationInfo translator to generate
+ column values that are inserted into the exposure table.
+
+ Parameters
+ ----------
+ resource: `ResourcePath`
+ Path to the Header Service header resource.
+ """
+ global KW_MAPPING, OI_MAPPING, instrument_mapping, translator
+ global engine, exposure_table
+
+ exposure_rec = dict()
+
+ info = dict()
+ content = yaml.safe_load(resource.read())
+ for header in content["PRIMARY"]:
+ info[header["keyword"]] = header["value"]
+ for column, column_def in KW_MAPPING.items():
+ exposure_rec[column] = process_column(column_def, info)
+ for column, column_def in instrument_mapping.items():
+ exposure_rec[column] = process_column(column_def, info)
+
+ obs_info_obj = ObservationInfo(info, translator_class=translator)
+ obs_info = dict()
+ for keyword in OI_MAPPING.values():
+ obs_info[keyword] = getattr(obs_info_obj, keyword)
+ for field, keyword in OI_MAPPING.items():
+ exposure_rec[field] = process_column(keyword, obs_info)
+
+ logging.debug(f"Inserting {exposure_rec}")
+ stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing()
+ with engine.begin() as conn:
+ conn.execute(stmt)
+
+ # TODO: exposure_detector table processing
+ # det_info = dict()
+ # for header in content["R00S00_PRIMARY"]:
+ # det_info[header["keyword"]] = header["value"]
+ # for field, keyword in LATISS_DETECTOR_MAPPING.items():
+ # det_exposure_rec[field] = process_column(keyword, det_info)
+
+
+def process_date(day_obs: str) -> None:
+ """Process all headers from a given observation day (as YYYY-MM-DD).
+
+ Parameters
+ ----------
+ day_obs: `str`
+ Observation day to process, as YYYY-MM-DD.
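+
+    Notes
+    -----
+    For example, with ``instrument = "LATISS"`` and an empty bucket prefix,
+    ``day_obs = "2024-02-28"`` reads headers from
+    ``s3://rubinobs-lfa-cp/ATHeaderService/header/2024/02/28/``.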
+ """
+ global TOPIC_MAPPING, bucket_prefix, instrument
+
+ date = "/".join(day_obs.split("-"))
+ d = ResourcePath(
+ f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/"
+ )
+ for dirpath, dirnames, filenames in d.walk():
+ for fname in filenames:
+ process_resource(d.join(fname))
+
+
+##################
+# Initialization #
+##################
+
+
+@dataclass
+class KafkaConfig:
+ """Class for configuring Kafka-related items."""
+
+ bootstrap: str
+ group_id: str
+ username: str
+ password: str
+ schema_url: str
+
+
+def get_kafka_config() -> KafkaConfig:
+ return KafkaConfig(
+ bootstrap=os.environ["KAFKA_BOOTSTRAP"],
+ group_id=os.environ.get("KAFKA_GROUP_ID", "consdb-consumer"),
+ username=os.environ.get("KAFKA_USERNAME", "consdb"),
+ password=os.environ["KAFKA_PASSWORD"],
+ schema_url=os.environ["SCHEMA_URL"],
+ )
+
+
+logging.basicConfig(stream=sys.stderr, level=logging.INFO)
+
+instrument = os.environ.get("INSTRUMENT", "LATISS")
+match instrument:
+ case "LATISS":
+ from lsst.obs.lsst.translators import LatissTranslator
+
+ translator = LatissTranslator
+ instrument_mapping = LATISS_MAPPING
+ case "LSSTComCam":
+ from lsst.obs.lsst.translators import LsstComCamTranslator
+
+ translator = LsstComCamTranslator
+ instrument_mapping = LSSTCOMCAM_MAPPING
+ case "LSSTComCamSim":
+ from lsst.obs.lsst.translators import LsstComCamSimTranslator
+
+ translator = LsstComCamSimTranslator
+ instrument_mapping = LSSTCOMCAMSIM_MAPPING
+ case "LSSTCam":
+ from lsst.obs.lsst.translators import LsstCamTranslator
+
+ translator = LsstCamTranslator
+        instrument_mapping = LSSTCAM_MAPPING
+    case _:
+        raise ValueError(f"Unrecognized instrument: {instrument}")
+logging.info(f"Instrument = {instrument}")
+
+host = os.environ.get("DB_HOST")
+passwd = os.environ.get("DB_PASS")
+user = os.environ.get("DB_USER")
+dbname = os.environ.get("DB_NAME")
+pg_url = ""
+if host and passwd and user and dbname:
+ logging.info(f"Connecting to {host} as {user} to {dbname}")
+ pg_url = f"postgresql://{user}:{passwd}@{host}/{dbname}"
+else:
+ pg_url = os.environ.get(
+ "POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1"
+ )
+ logging.info(f"Using POSTGRES_URL {user} {host} {dbname}")
+engine = create_engine(pg_url)
+metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}")
+exposure_table = Table("exposure", metadata_obj, autoload_with=engine)
+
+
+bucket_prefix = os.environ.get("BUCKET_PREFIX", "")
+if bucket_prefix:
+ os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1"
+
+
+topic = f"lsst.sal.{TOPIC_MAPPING[instrument]}.logevent_largeFileObjectAvailable"
+
+
+#################
+# Main Function #
+#################
+
+
+async def main() -> None:
+ """Handle Header Service largeFileObjectAvailable messages."""
+ global bucket_prefix
+
+ kafka_config = get_kafka_config()
+ async with httpx.AsyncClient() as client:
+ schema_registry = kafkit.registry.httpx.RegistryApi(
+ http_client=client, url=kafka_config.schema_url
+ )
+ deserializer = kafkit.registry.Deserializer(registry=schema_registry)
+
+ consumer = aiokafka.AIOKafkaConsumer(
+ topic,
+ bootstrap_servers=kafka_config.bootstrap,
+ group_id=kafka_config.group_id,
+ auto_offset_reset="earliest",
+ isolation_level="read_committed",
+ security_protocol="SASL_PLAINTEXT",
+ sasl_mechanism="SCRAM-SHA-512",
+ sasl_plain_username=kafka_config.username,
+ sasl_plain_password=kafka_config.password,
+ )
+
+ await consumer.start()
+ logging.info("Consumer started")
+ try:
+ async for msg in consumer:
+ message = (await deserializer.deserialize(msg.value))["message"]
+ logging.debug(f"Received message {message}")
+ url = message["url"]
+ if bucket_prefix:
+ url = re.sub(r"s3://", "s3://" + bucket_prefix, url)
+ resource = ResourcePath(url)
+ logging.info(f"Waiting for {url}")
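+            # The notification can arrive before the header object is visible
+            # in the bucket, so poll (with jitter) until it appears.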
+ while not resource.exists():
+ await asyncio.sleep(random.uniform(0.1, 2.0))
+ process_resource(resource)
+ logging.info(f"Processed {url}")
+ finally:
+ await consumer.stop()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/python/lsst/consdb/server.py b/python/lsst/consdb/server.py
new file mode 100644
index 00000000..5344db69
--- /dev/null
+++ b/python/lsst/consdb/server.py
@@ -0,0 +1,56 @@
+import os
+
+from flask import Flask, request
+from sqlalchemy import create_engine, MetaData
+import sqlalchemy.exc
+
+
+app = Flask(__name__)
+engine = create_engine(
+    os.environ.get(
+        "POSTGRES_URL", "postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1"
+    )
+)
+metadata_obj = MetaData(schema="cdb_latiss")
+metadata_obj.reflect(engine)
+
+
+@app.post("/insert")
+def insert():
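+    # Expected JSON body (illustrative values):
+    #   {"table": "exposure",
+    #    "values": {"exposure_id": 2024022800001, "exptime": 30.0}}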
+ info = request.json
+ table = info["table"]
+ valdict = info["values"]
+ keylist = list(valdict.keys())
+ valuelist = list(valdict.values())
+ placeholders = ",".join(["?"] * len(valdict))
+ # check schema
+
+ with engine.begin() as conn:
+ conn.exec_driver_sql(
+ f"INSERT OR UPDATE INTO ? ({placeholders}) VALUES ({placeholders})",
+ [table] + keylist + valuelist,
+ )
+ return ("OK", 200)
+
+
+@app.post("/query")
+def query():
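+    # Expected JSON body (illustrative):
+    #   {"tables": ["exposure"], "columns": ["exposure_id", "exptime"],
+    #    "where": "day_obs = 20240228"}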
+ info = request.json
+ tables = ",".join(info["tables"])
+ columns = ",".join(info["columns"])
+ if "where" in info:
+ where = "WHERE " + info["where"]
+ if ";" in where:
+ return ("Cannot create query containing more than one statement", 403)
+ with engine.begin() as conn:
+ try:
+ cursor = conn.exec_driver_sql(f"SELECT {columns} FROM {tables} {where}")
+ first = True
+ result = []
+ for row in cursor:
+ if first:
+ result.append(row._fields)
+ first = False
+ result.append(list(row))
+ return result
+ except sqlalchemy.exc.DBAPIError as e:
+ return (str(e), 500)
+
+
+@app.get("/schema/
")
+def schema(table: str):
+ return [(c.name, str(c.type), c.doc) for c in metadata_obj.tables[table.lower()].columns]
diff --git a/python/lsst/consdb/summarize_efd.py b/python/lsst/consdb/summarize_efd.py
new file mode 100644
index 00000000..f8da6ae0
--- /dev/null
+++ b/python/lsst/consdb/summarize_efd.py
@@ -0,0 +1,185 @@
+from __future__ import annotations
+
+import argparse
+from typing import TYPE_CHECKING, Any, Callable
+
+import astropy.time
+import lsst_efd_client
+import yaml
+from lsst.daf.butler import Butler
+from sqlalchemy import create_engine
+
+if TYPE_CHECKING:
+ import lsst.daf.butler
+ import pandas
+ import sqlalchemy
+
+
+class Summary:
+ # TODO define summary
+ pass
+
+
+# TODO add all summarizing functions
+def gen_mean(
+ config: dict[str, Any]
+) -> Callable[[pandas.DataFrame, astropy.time.Time, astropy.time.Time], Summary]:
+ def do(
+ series: pandas.DataFrame, start: astropy.time.Time, end: astropy.time.Time
+ ) -> Summary:
+ return Summary()
+
+ return do
+
+
+FUNCTION_GENERATORS = dict(mean=gen_mean)
+
+
+class EfdValues:
+ def __init__(
+ self,
+ config: dict[str, Any],
+ window: astropy.time.TimeDelta,
+ series: pandas.DataFrame,
+ ):
+ self._entries = series
+ self._sum_function = FUNCTION_GENERATORS[config["function"]](config)
+ self._window = window
+
+ def summarize(self, start: astropy.time.Time, end: astropy.time.Time) -> Any:
+ return self._sum_function(
+ self._entries, start - self._window, end + self._window
+ )
+
+
+class Records:
+ def __init__(self, db: sqlalchemy.Engine):
+ self._db = db
+
+ def add(
+ self, dim: lsst.daf.butler.DimensionRecord, topic: dict[str, Any], summary: Any
+ ) -> None:
+ pass
+
+ def write(self, table: str) -> None:
+ pass
+
+
+def read_config(config_name: str) -> dict[str, Any]:
+ with open(config_name, "r") as f:
+ return yaml.safe_load(f)
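+
+
+# Illustrative configuration structure (keys inferred from this module; the
+# topic and field names below are hypothetical):
+#
+#   exposure_table: exposure_efd
+#   visit_table: visit1_efd
+#   topics:
+#     - name: lsst.sal.ESS.temperature
+#       fields: [temperatureItem0]
+#       function: mean
+#       window: 1.0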
+
+
+def get_efd_values(
+ efd: lsst_efd_client.EfdClient,
+ topic: dict[str, Any],
+ start: astropy.time.Time,
+ end: astropy.time.Time,
+) -> EfdValues:
+ window = astropy.time.TimeDelta(topic.get("window", 0.0), format="sec")
+ series = efd.select_time_series(
+ topic["name"],
+ topic["fields"],
+ start - window,
+ end + window,
+ topic.get("index", None),
+ )
+ return EfdValues(topic, window, series)
+
+
+def process_interval(
+ butler: Butler,
+ db: sqlalchemy.Engine,
+ efd: lsst_efd_client.EfdClient,
+ config: dict[str, Any],
+ instrument: str,
+ start_time: str,
+ end_time: str,
+) -> None:
+ start = astropy.time.Time(start_time, format="isot")
+ end = astropy.time.Time(end_time, format="isot")
+
+ exposure_list = []
+ visit_list = []
+ min_topic_time = end
+ max_topic_time = start
+
+ where_clause = "instrument=instr and timespan OVERLAPS (start, end)"
+
+    for e in butler.registry.queryDimensionRecords(
+ "exposure",
+ where=where_clause,
+ bind=dict(instr=instrument, start=start, end=end),
+ ):
+ if e.timespan.end < end:
+ exposure_list.append(e)
+ min_topic_time = min(e.timespan.begin, min_topic_time)
+            max_topic_time = max(e.timespan.end, max_topic_time)
+
+    for v in butler.registry.queryDimensionRecords(
+ "visit", where=where_clause, bind=dict(instr=instrument, start=start, end=end)
+ ):
+ if v.timespan.end < end:
+ visit_list.append(v)
+ min_topic_time = min(v.timespan.begin, min_topic_time)
+            max_topic_time = max(v.timespan.end, max_topic_time)
+
+ exposure_records = Records(db)
+ visit_records = Records(db)
+ for topic in config["topics"]:
+ efd_values = get_efd_values(efd, topic, min_topic_time, max_topic_time)
+ for e in exposure_list:
+ summary = efd_values.summarize(e.timespan.begin, e.timespan.end)
+ exposure_records.add(e, topic, summary)
+ for v in visit_list:
+ summary = efd_values.summarize(v.timespan.begin, v.timespan.end)
+ visit_records.add(v, topic, summary)
+
+ exposure_records.write(config["exposure_table"])
+ visit_records.write(config["visit_table"])
+
+
+def build_argparser() -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser(description="Summarize EFD topics in a time range")
+ parser.add_argument(
+ "-c", "--config", dest="config_name", required=True, help="config YAML"
+ )
+ parser.add_argument(
+ "-i", "--instrument", dest="instrument", required=True, help="instrument name"
+ )
+ parser.add_argument(
+ "-s",
+ "--start",
+ dest="start_time",
+ required=True,
+ help="start time (ISO, YYYY-MM-DDTHH:MM:SS)",
+ )
+ parser.add_argument(
+ "-e",
+ "--end",
+ dest="end_time",
+ required=True,
+ help="end time (ISO, YYYY-MM-DDTHH:MM:SS)",
+ )
+ parser.add_argument("-r", "--repo", dest="repo", required=True, help="Butler repo")
+ parser.add_argument(
+ "-d",
+ "--db",
+ dest="db_conn_str",
+ required=True,
+ help="Consolidated Database connection string",
+ )
+ parser.add_argument(
+ "-E", "--efd", dest="efd_conn_str", required=True, help="EFD connection string"
+ )
+ return parser
+
+
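+# Example invocation (illustrative arguments):
+#   python summarize_efd.py -c efd_config.yaml -i LATISS \
+#       -s 2024-02-28T00:00:00 -e 2024-02-29T00:00:00 \
+#       -r /repo/embargo -d postgresql://user:pass@host/dbname -E usdf_efd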
+def main() -> None:
+ parser = build_argparser()
+ args = parser.parse_args()
+ butler = Butler(args.repo)
+ db = create_engine(args.db_conn_str)
+ efd = lsst_efd_client.EfdClient(args.efd_conn_str)
+ config = read_config(args.config_name)
+ process_interval(
+ butler, db, efd, config, args.instrument, args.start_time, args.end_time
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/ups/consdb.table b/ups/consdb.table
deleted file mode 100644
index e3b19238..00000000
--- a/ups/consdb.table
+++ /dev/null
@@ -1,9 +0,0 @@
-# List EUPS dependencies of this package here.
-# - Any package whose API is used directly should be listed explicitly.
-# - Common third-party packages can be assumed to be recursively included by
-# the "sconsUtils" package.
-setupRequired(sconsUtils)
-
-# The following is boilerplate for all packages.
-# See https://dmtn-001.lsst.io for details on LSST_LIBRARY_PATH.
-envPrepend(PYTHONPATH, ${PRODUCT_DIR}/python)