Skip to content

Commit

Permalink
Prototypes for JTM demo.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktlim committed Feb 20, 2024
1 parent e6fd6bd commit 6d4d200
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 5 deletions.
2 changes: 1 addition & 1 deletion python/lsst/consdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from .version import * # Generated by sconsUtils
# from .version import * # Generated by sconsUtils
58 changes: 58 additions & 0 deletions python/lsst/consdb/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import json
import os
from pandas import DataFrame
import requests
from typing import Any, Iterable
from urllib.parse import urljoin

session = requests.Session()
base_url = os.environ["CONSDB_URL"]


def insert(table: str, values: dict[str, Any], **kwargs):
values.update(kwargs)
# check values against schema for table
data = {"table": table, "values": values}
url = urljoin(base_url, "insert")
try:
response = requests.post(url, json=data)
except:
raise
response.raise_for_status()


def query(
tables: str | Iterable[str],
columns: str | Iterable[str],
*,
where: str | None = None,
join: str | None = None
) -> list[Any]:
if isinstance(tables, str):
tables = [tables]
if isinstance(columns, str):
columns = [columns]
url = urljoin(base_url, "query")
data = {"tables": tables, "columns": columns, "where": where, "join": join}
try:
response = requests.post(url, json=data)
except:
raise
try:
response.raise_for_status()
except:
print(response.content.decode())
raise
arr = response.json()
return DataFrame(arr[1:], columns=arr[0])


def schema(table: str):
url = urljoin(base_url, "schema/")
url = urljoin(url, table)
try:
response = requests.get(url)
except:
raise
response.raise_for_status()
return response.json()
10 changes: 6 additions & 4 deletions python/lsst/consdb/header_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ def process_resource(resource: lsst.resources.ResourcePath) -> None:
content = json.loads(resource.read())
with engine.begin() as conn:
# TODO get all fields and tables, do as a transaction
conn.execute(
text("INSERT INTO exposure (a, b, c, d, e)" " VALUES(:a, :b, :c, :d, :e)"),
[dict(a=content["something"], b=2, c=3, d=4, e=5)],
)
# conn.execute(
# text("INSERT INTO exposure (a, b, c, d, e)" " VALUES(:a, :b, :c, :d, :e)"),
# [dict(a=content["something"], b=2, c=3, d=4, e=5)],
# )
print(f"Processing {resource}: {content[0:100]}")
# TODO check result


Expand All @@ -51,6 +52,7 @@ async def main() -> None:
topic,
bootstrap_servers=kafka_cluster,
group_id=kafka_group_id,
auto_offset_reset="earliest",
)
await consumer.start()
try:
Expand Down
144 changes: 144 additions & 0 deletions python/lsst/consdb/hinfo-latiss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
from datetime import datetime
import os
import sys
from typing import Any, Iterable

import yaml
from astropy.time import Time
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.dialects.postgresql import insert

from astro_metadata_translator import ObservationInfo
from lsst.resources import ResourcePath
from lsst.obs.lsst.translators import LatissTranslator

os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1"

engine = create_engine("postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1")
metadata_obj = MetaData(schema="cdb_latiss")
exposure_table = Table("exposure", metadata_obj, autoload_with=engine)

def ninety_minus(angle: float) -> float:
return 90.0 - angle

def tai_convert(t: str) -> datetime:
return Time(t, format="isot", scale="tai").datetime

def tai_mean(start: str, end: str) -> datetime:
s = Time(start, format="isot", scale="tai")
e = Time(end, format="isot", scale="tai")
return (s + (e - s) / 2).datetime

def mean(*iterable: Iterable[Any]) -> Any:
return sum(iterable) / len(iterable)

def logical_or(*bools: Iterable[int | str | None]) -> bool:
return any([b == 1 or b == "1" for b in bools])


KW_MAPPING = {
"controller": "CONTRLLR",
"seq_num": "SEQNUM",
"band": "FILTBAND",
"ra": "RA",
"decl": "DEC",
"skyrotation": "ROTPA",
"azimuth_start": "AZSTART",
"azimuth_end": "AZEND",
"altitude_start": (ninety_minus, "ELSTART"),
"altitude_end": (ninety_minus, "ELEND"),
"zenithdistance_start": "ELSTART",
"zenithdistance_end": "ELEND",
"expmidpt": (tai_mean, "DATE-BEG", "DATE-END"),
"expmidptmjd": (mean, "MJD-BEG", "MJD-END"),
"obsstart": (tai_convert, "DATE-BEG"),
"obsstartmjd": "MJD-BEG",
"obsend": (tai_convert, "DATE-END"),
"obsendmjd": "MJD-END",
"exptime": "EXPTIME",
"shuttime": "SHUTTIME",
"darktime": "DARKTIME",
"group_id": "GROUPID",
"curindex": "CURINDEX",
"maxindex": "MAXINDEX",
"imgtype": "IMGTYPE",
"emulated": (logical_or, "EMUIMAGE"),
"science_program": "PROGRAM",
"observation_reason": "REASON",
"target_name": "OBJECT",
"airtemp": "AIRTEMP",
"pressure": "PRESSURE",
"humidity": "HUMIDITY",
"wind_speed": "WINDSPD",
"wind_dir": "WINDDIR",
"dimm_seeing": "SEEING",
}

LATISS_MAPPING = {
"focus_z": "FOCUSZ",
"dome_azimuth": "DOMEAZ",
"shut_lower": "SHUTLOWR",
"shut_upper": "SHUTUPPR",
# "temp_set": "TEMP_SET",
"simulated": (logical_or, "SIMULATE ATMCS", "SIMULATE ATHEXAPOD", "SIMULAT ATPNEUMATICS", "SIMULATE ATDOME", "SIMULATE ATSPECTROGRAPH"),
}

# LATISS_DETECTOR_MAPPING = {
# "ccdtemp": "CCDTEMP",
# }

OI_MAPPING = {
"exposure_id": "exposure_id",
"physical_filter": "physical_filter",
"airmass": "boresight_airmass",
"day_obs": "observing_day",
}


def process_keyword(keyword: str | tuple, info: dict) -> Any:
if type(keyword) == str:
if keyword in info:
return info[keyword]
elif type(keyword) == tuple:
fn = keyword[0]
args = keyword[1:]
if all([a in info for a in args]):
return fn(*[info[a] for a in args])

def process_resource(resource: ResourcePath) -> None:
content = yaml.safe_load(resource.read())
exposure_rec = dict()

info = dict()
for header in content["PRIMARY"]:
info[header["keyword"]] = header["value"]
for field, keyword in KW_MAPPING.items():
exposure_rec[field] = process_keyword(keyword, info)
for field, keyword in LATISS_MAPPING.items():
exposure_rec[field] = process_keyword(keyword, info)

# det_info = dict()
# for header in content["R00S00_PRIMARY"]:
# det_info[header["keyword"]] = header["value"]
# for field, keyword in LATISS_DETECTOR_MAPPING.items():
# det_exposure_rec[field] = process_keyword(keyword, det_info)

obs_info_obj = ObservationInfo(info, translator_class=LatissTranslator)
obs_info = dict()
for keyword in OI_MAPPING.values():
obs_info[keyword] = getattr(obs_info_obj, keyword)
for field, keyword in OI_MAPPING.items():
exposure_rec[field] = process_keyword(keyword, obs_info)

stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing()
with engine.begin() as conn:
result = conn.execute(stmt)

print(exposure_rec)


date = "/".join(sys.argv[1].split("-"))
d = ResourcePath(f"s3://rubin:rubinobs-lfa-cp/ATHeaderService/header/{date}/")
for dirpath, dirnames, filenames in d.walk():
for fname in filenames:
process_resource(d.join(fname))
57 changes: 57 additions & 0 deletions python/lsst/consdb/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from flask import Flask, request
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.dialects.postgresql import insert
import sqlalchemy.exc


app = Flask(__name__)
engine = create_engine("postgresql://usdf-butler.slac.stanford.edu:5432/lsstdb1")
metadata_obj = MetaData(schema="cdb_latiss")
metadata_obj.reflect(engine)


@app.post("/insert")
def insert():
info = request.json
table = info["table"]
valdict = info["values"]
keylist = list(valdict.keys())
valuelist = list(valdict.values())
placeholders = ",".join(["?"] * len(valdict))
# check schema

with engine.begin() as conn:
conn.exec_driver_sql(
f"INSERT OR UPDATE INTO ? ({placeholders}) VALUES ({placeholders})",
[table] + keylist + valuelist,
)
return ("OK", 200)


@app.post("/query")
def query():
info = request.json
tables = ",".join(info["tables"])
columns = ",".join(info["columns"])
if "where" in info:
where = "WHERE " + info["where"]
if ";" in where:
return ("Cannot create query containing more than one statement", 403)
with engine.begin() as conn:
try:
cursor = conn.exec_driver_sql(f"SELECT {columns} FROM {tables} {where}")
first = True
result = []
for row in cursor:
if first:
result.append(row._fields)
first = False
result.append(list(row))
return result
except sqlalchemy.exc.DBAPIError as e:
return (str(e), 500)


@app.get("/schema/<table>")
def schema(table: str):
return [(c.name, str(c.type), c.doc) for c in metadata_obj.tables[table.lower()].columns]

0 comments on commit 6d4d200

Please sign in to comment.