From 6963625a344b2208ddee73457ba19927a8bdcf9c Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 22 May 2024 16:23:27 +0000 Subject: [PATCH 1/3] Fix ccdname usage. --- python/lsst/consdb/hinfo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index b14d1685..118d0115 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -224,7 +224,7 @@ def fp_region(imgtype: str, ra: float, dec: float, rotpa: float) -> str | None: "ccdexposure_id": (ccdexposure_id, "exposure_id", "detector"), "exposure_id": "exposure_id", "detector": "detector", - "s_region": (ccd_region, "IMGTYPE", "RA", "DEC", "ROTPA", "_CCDNAME"), + "s_region": (ccd_region, "IMGTYPE", "RA", "DEC", "ROTPA", "ccdname"), } # Mapping to column name from ObservationInfo keyword From 3cae561434c7768d80eb31d59821f3acbcdda27b Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 22 May 2024 16:24:32 +0000 Subject: [PATCH 2/3] Fix formatting of docstrings. --- python/lsst/consdb/hinfo.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 118d0115..1a190abe 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -256,17 +256,17 @@ def process_column(column_def: str | Sequence, info: dict) -> Any: Parameters ---------- - column_def: `str` + column_def : `str` Definition of the column. Either a string specifying the info keyword to use as the column value, or a tuple containing a function to apply to the values of one or more info keywords or function application tuples. - info: `dict` + info : `dict` A dictionary containing keyword/value pairs. Returns ------- - column_value: `Any` + column_value : `Any` The value to use for the column. None if any input value is missing. """ @@ -288,7 +288,7 @@ def process_resource(resource: ResourcePath) -> None: Parameters ---------- - resource: `ResourcePath` + resource : `ResourcePath` Path to the Header Service header resource. """ global KW_MAPPING, OI_MAPPING, instrument_mapping, det_mapping, translator @@ -335,7 +335,7 @@ def process_date(day_obs: str) -> None: Parameters ---------- - day_obs: `str` + day_obs : `str` Observation day to process, as YYYY-MM-DD. """ global TOPIC_MAPPING, bucket_prefix, instrument From 7f3e47284200e6c7878d107572b717c1c10e4ff5 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 22 May 2024 16:25:47 +0000 Subject: [PATCH 3/3] Add ccdexposure table and update capability. --- python/lsst/consdb/hinfo.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 1a190abe..19541a6a 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -280,7 +280,7 @@ def process_column(column_def: str | Sequence, info: dict) -> Any: return fn(*arg_values) -def process_resource(resource: ResourcePath) -> None: +def process_resource(resource: ResourcePath, update: bool = False) -> None: """Process a header resource. Uses configured mappings and the ObservationInfo translator to generate @@ -292,7 +292,7 @@ def process_resource(resource: ResourcePath) -> None: Path to the Header Service header resource. """ global KW_MAPPING, OI_MAPPING, instrument_mapping, det_mapping, translator - global engine, exposure_table + global engine, exposure_table, ccdexposure_table exposure_rec = dict() @@ -312,8 +312,12 @@ def process_resource(resource: ResourcePath) -> None: for field, keyword in OI_MAPPING.items(): exposure_rec[field] = process_column(keyword, obs_info) - logging.debug(f"Inserting {exposure_rec}") - stmt = insert(exposure_table).values(exposure_rec).on_conflict_do_nothing() + stmt = insert(exposure_table).values(exposure_rec) + if update: + stmt = stmt.on_conflict_do_update(index_elements=["exposure_id"], set_=exposure_rec) + else: + stmt = stmt.on_conflict_do_nothing() + logging.debug(str(stmt)) with engine.begin() as conn: conn.execute(stmt) @@ -326,11 +330,20 @@ def process_resource(resource: ResourcePath) -> None: det_info["detector"] = camera[ccdname].getId() for header in content[detector]: det_info[header["keyword"]] = header["value"] - for field, keyword in det_mapping.items(): - det_exposure_rec[field] = process_column(keyword, det_info) + for column, column_def in det_mapping.items(): + det_exposure_rec[column] = process_column(column_def, det_info) + + stmt = insert(ccdexposure_table).values(det_exposure_rec) + if update: + stmt = stmt.on_conflict_do_update(index_elements=["ccdexposure_id"], set_=det_exposure_rec) + else: + stmt = stmt.on_conflict_do_nothing() + logging.debug(str(stmt)) + with engine.begin() as conn: + conn.execute(stmt) -def process_date(day_obs: str) -> None: +def process_date(day_obs: str, update: bool = False) -> None: """Process all headers from a given observation day (as YYYY-MM-DD). Parameters @@ -344,7 +357,7 @@ def process_date(day_obs: str) -> None: d = ResourcePath(f"s3://{bucket_prefix}rubinobs-lfa-cp/{TOPIC_MAPPING[instrument]}/header/{date}/") for dirpath, dirnames, filenames in d.walk(): for fname in filenames: - process_resource(d.join(fname)) + process_resource(d.join(fname), update) ################## @@ -412,6 +425,7 @@ def get_kafka_config() -> KafkaConfig: engine = setup_postgres() metadata_obj = MetaData(schema=f"cdb_{instrument.lower()}") exposure_table = Table("exposure", metadata_obj, autoload_with=engine) +ccdexposure_table = Table("ccdexposure", metadata_obj, autoload_with=engine) bucket_prefix = os.environ.get("BUCKET_PREFIX", "")