Skip to content

Commit

Permalink
Initial vesion of xmatch_too_targets
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Feb 13, 2024
1 parent 55dc4c3 commit 5211b32
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 7 deletions.
15 changes: 8 additions & 7 deletions src/too/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import pathlib

import adbc_driver_postgresql.dbapi as dbapi
import peewee
import polars
from sdssdb.peewee import BaseModel
Expand Down Expand Up @@ -63,6 +62,8 @@ class ToO_Target(ToOBaseModel):
expiration_date = peewee.IntegerField()
observed = peewee.BooleanField()

_meta: peewee.Metadata

class Meta:
table_name = "too_target"

Expand Down Expand Up @@ -193,12 +194,12 @@ def load_too_targets(
else:
raise ValueError(f"Invalid file type {path.suffix!r}")

with dbapi.connect(database_uri) as conn:
current_targets = polars.read_database(
"SELECT * from catalogdb.too_target",
conn, # type: ignore
)
current_targets = current_targets.cast(too_dtypes) # type: ignore
current_targets = polars.read_database_uri(
"SELECT * from catalogdb.too_target",
database_uri,
engine="adbc",
)
current_targets = current_targets.cast(too_dtypes) # type: ignore

new_targets = targets.filter(~polars.col.too_id.is_in(current_targets["too_id"]))

Expand Down
131 changes: 131 additions & 0 deletions src/too/xmatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2024-02-12
# @Filename: xmatch.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import peewee
import polars
from sdssdb.connection import PeeweeDatabaseConnection
from sdssdb.peewee.sdss5db.catalogdb import Version

from too import log
from too.database import ToO_Target, get_database_uri


def xmatch_too_targets(
database: PeeweeDatabaseConnection,
version_plan: str | None = None,
):
"""Performs a cross-match of the ToO targets with the SDSS catalogues.
The routine performs the folling steps:
- Parses the ``too_target`` table.
- Identifies ToO entries with either a ``catalogid`` or an ``sdss_id`` and
manually them to the ``catalog_to_too_target`` table (if they are not
already there).
- For the remaining ToO entries, performs a cross-match with the SDSS
catalogues and adds the matches to the ``catalog_to_too_target`` table.
This is accomplished calling ``target_selection.XMatch`` in "addendum"
mode.
"""

assert database.connected, "Database is not connected"

too_target_schema: str = ToO_Target._meta.schema
too_target_table_name: str = ToO_Target._meta.table_name
catalog_to_target_table_name: str = f"catalog_to_{too_target_table_name}"

database_uri = get_database_uri(database.dbname, **database.connect_params)

# Some basic checks.
assert database.table_exists(
too_target_table_name,
schema=too_target_schema,
), f"Table {too_target_schema}.{too_target_table_name} does not exist."

assert database.table_exists(
catalog_to_target_table_name,
schema=too_target_schema,
), f"Table {too_target_schema}.{catalog_to_target_table_name} does not exist."

# Get version_id. This is all a bit silly since version_id has to be 31/1.0.0.
if version_plan is None:
version_id: int = Version.select(peewee.fn.MAX(Version.id)).scalar()
else:
version_id: int = Version.get(plan=version_plan).id

assert version_id == 31, "version_id must be 31 to allow using sdss_id."

too_unmatched = polars.read_database_uri(
f"""
SELECT t.* FROM {too_target_schema}.{too_target_table_name} t
LEFT OUTER JOIN {too_target_schema}.{catalog_to_target_table_name} c2t
ON (c2t.target_id = t.too_id
AND c2t.best IS TRUE
AND c2t.version_id = {version_id})
WHERE c2t.catalogid IS NULL
ORDER BY t.too_id
""",
database_uri,
engine="adbc",
)

if len(too_unmatched) == 0:
log.warning("All ToO targets are already matched.")
return

# Step 1: select targets with sdss_id and without catalogid.
# Populate the catalogid column.
too_catalogid = too_unmatched.filter(
polars.col("sdss_id").is_not_null() | polars.col("catalogid").is_not_null()
).select("too_id", "sdss_id", "catalogid")

too_sdss_id_catalogid = polars.read_database_uri(
f"""
SELECT t.too_id, s.sdss_id, s.catalogid31 AS catalogid
FROM {too_target_schema}.{too_target_table_name} t
JOIN catalogdb.sdss_id_stacked s ON (t.sdss_id = s.sdss_id)
ORDER BY t.too_id
""",
database_uri,
engine="adbc",
)
too_sdss_id_catalogid = too_sdss_id_catalogid.filter(
~polars.col("catalogid").is_in(too_catalogid["catalogid"])
)

too_catalogid = too_catalogid.join(
too_sdss_id_catalogid[["too_id", "catalogid"]],
on="too_id",
how="left",
)
too_catalogid = too_catalogid.with_columns(
catalogid=polars.col.catalogid.fill_null(polars.col.catalogid_right)
)
too_catalogid = too_catalogid.select("too_id", "catalogid")

# Step 2: insert the targets with catalogid into the catalog_to_too_target table.
log.info(
f"Adding {len(too_catalogid)} ToO targets with catalogid to "
f"{too_target_schema}.{catalog_to_target_table_name}."
)

too_catalogid = too_catalogid.rename({"too_id": "target_id"})
too_catalogid = too_catalogid.with_columns(
best=True,
version_id=polars.lit(version_id, dtype=polars.Int16),
)

too_catalogid.write_database(
f"{too_target_schema}.{catalog_to_target_table_name}",
database_uri,
if_table_exists="append",
engine="adbc",
)
51 changes: 51 additions & 0 deletions tests/test_xmatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego ([email protected])
# @Date: 2024-02-13
# @Filename: test_xmatch.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import polars
from conftest import DBNAME
from sdssdb.peewee.sdss5db import catalogdb

from too.database import get_database_uri, load_too_targets
from too.xmatch import xmatch_too_targets


def test_xmatch(too_mock: polars.DataFrame):
too_mock_sample = too_mock[0:10000]

# In the mock all entries with sdss_id also has a catalogid. Manually set
# one to null to test joining with sdss_id_mock.
n_catalogid = too_mock_sample.filter(polars.col.catalogid.is_not_null()).height

row_remove = too_mock_sample.filter(polars.col.catalogid.is_not_null()).head(1)
too_mock_sample.with_columns(
catalogid=polars.when(polars.col.too_id == row_remove[0, "too_id"])
.then(None)
.otherwise(polars.col.catalogid)
)

load_too_targets(too_mock_sample, get_database_uri(DBNAME))

xmatch_too_targets(catalogdb.database)

CatalogToToO_Target = catalogdb.database.models["catalogdb.catalog_to_too_target"]
n_catalogid_after = CatalogToToO_Target.select().count()

assert n_catalogid_after == n_catalogid

# Assert that the row from which we removed the catalogid has been added and its
# catalogid is the original one.
assert (
CatalogToToO_Target.select()
.where(
CatalogToToO_Target.target_id == row_remove[0, "too_id"],
CatalogToToO_Target.catalogid == row_remove[0, "catalogid"],
)
.count()
) == 1

0 comments on commit 5211b32

Please sign in to comment.