Skip to content

Commit

Permalink
Merge branch 'main' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Apr 18, 2024
2 parents 934a710 + e2f5bd1 commit cf79627
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 150 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ jobs:

- name: Install Postgresql
uses: ikalnytskyi/action-setup-postgres@v5
with:
username: sdss
id: postgres

- name: Install Q3C
Expand Down
168 changes: 91 additions & 77 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/too/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
)
@server_options.option(
"--user",
default=None,
default="sdss",
help="The user to connect to the database.",
)
@click.option(
Expand All @@ -74,7 +74,7 @@ def too_cli(
dbname: str = "sdss5db",
host: str = "localhost",
port: int | None = None,
user: str | None = None,
user: str = "sdss",
verbose: bool = False,
write_log: str | None = None,
):
Expand Down
24 changes: 17 additions & 7 deletions src/too/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from __future__ import annotations

import os
import pathlib

import polars
Expand All @@ -30,16 +31,22 @@
"get_active_targets",
]

DEFAULT_USER: str = "sdss"
DEFAULT_HOST = "localhost"


def connect_to_database(
dbname: str,
host: str = "localhost",
host: str | None = None,
port: int | None = None,
user: str | None = None,
password: str | None = None,
):
"""Connects the ``sdssdb`` ``sdss5db`` models to the database."""

if port is None:
port = int(os.environ.get("PGPORT", 5432))

catalogdb.database.connect(
dbname,
host=host,
Expand All @@ -56,21 +63,24 @@ def connect_to_database(

def get_database_uri(
dbname: str,
host: str = "localhost",
host: str | None = None,
port: int | None = None,
user: str | None = None,
password: str | None = None,
):
"""Returns the URI to the database."""

if user is None and password is None:
user = user or str(os.environ.get("PGUSER", DEFAULT_USER))
host = host or str(os.environ.get("PGHOST", DEFAULT_HOST))
port = port or int(os.environ.get("PGPORT", 5432))

if user is None and password is None: # pragma: no cover
# This should never happen. user is always set.
auth: str = ""
elif user is not None and password is None:
elif password is None:
auth: str = f"{user}@"
elif user is not None and password is not None:
auth: str = f"{user}:{password}@"
else:
raise ValueError("Passing a password requires also passing a user.")
auth: str = f"{user}:{password}@"

host_port: str = f"{host or ''}" if port is None else f"{host or ''}:{port}"

Expand Down
77 changes: 46 additions & 31 deletions src/too/xmatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@
def xmatch_too_targets(
database: PeeweeDatabaseConnection,
version_plan: str | None = None,
load_catalog: bool = True,
dry_run: bool = False,
overwrite=False,
keep_temp: bool = False,
):
"""Performs a cross-match of the ToO targets with the SDSS catalogues.
Expand All @@ -104,8 +105,11 @@ def xmatch_too_targets(
The database connection to use.
version_plan
The version plan to use. Defaults to the latest plan.
load_catalog
Whether to load the ``catalog`` table after the cross-match.
dry_run
Whether to load the ``catalog`` and ``catalog_to_too_target`` tables after
the cross-match.
overwrite
Deletes the temporary tables if they exist.
keep_temp
Whether to keep the temporary tables after the cross-match.
Expand Down Expand Up @@ -166,6 +170,28 @@ def xmatch_too_targets(
log.warning("All ToO targets are already matched.")
return

# Create the XMatch instance here. We'll need it to get the relational model.
xmatch_planner = XMatchPlanner.read(
database,
plan=TOO_XMATCH_PLAN,
config_file=TOO_XMATCH_CONFIG,
log=log,
log_path=None,
)

# Delete temporary tables or fail.
md5 = xmatch_planner.md5
for temp_table in ["catalog", "catalog_to_too_target"]:
temp_table_name = f"{temp_table}_{md5}"
temp_table_schema = xmatch_planner.temp_schema
temp_table_full = f"{temp_table_schema}.{temp_table_name}"
if database.table_exists(temp_table_name, schema=temp_table_schema):
if overwrite:
log.warning(f"Dropping table {temp_table_full}.")
database.execute_sql(f"DROP TABLE {temp_table_full};")
else:
raise RuntimeError(f"Table {temp_table_full} already exists.")

# Step 1: select targets with sdss_id and without catalogid.
# Populate the catalogid column.
too_catalogid = too_unmatched.filter(
Expand Down Expand Up @@ -196,10 +222,21 @@ def xmatch_too_targets(
)
too_catalogid = too_catalogid.select("too_id", "catalogid")

# Step 2: insert the targets with catalogid into the catalog_to_too_target table.
# Step 2: insert the targets with catalogid into the sanboxed (!)
# catalog_to_too_target table.
if len(too_catalogid) > 0:
rel_model_sb = xmatch_planner.get_relational_model(
ToO_Target,
sandboxed=True,
create=True,
temp=False,
)

rel_model_sb_tn = f"{rel_model_sb._meta.schema}.{rel_model_sb._meta.table_name}"

log.info(
f"Adding {len(too_catalogid)} ToO targets with catalogid to {too_rel_fqtn}."
f"Adding {len(too_catalogid)} ToO targets with "
f"catalogid to {rel_model_sb_tn}."
)

too_catalogid = too_catalogid.rename({"too_id": "target_id"})
Expand All @@ -209,23 +246,16 @@ def xmatch_too_targets(
)

too_catalogid.write_database(
too_rel_fqtn,
rel_model_sb_tn,
database_uri,
if_table_exists="append",
engine="adbc",
)

database.execute_sql(f"VACUUM ANALYZE {too_rel_fqtn};")
database.execute_sql(f"VACUUM ANALYZE {rel_model_sb_tn};")

# Step 3: cross-match the remaining targets.
log.info("Running cross-match for remaining ToO targets.")
xmatch_planner = XMatchPlanner.read(
database,
plan=TOO_XMATCH_PLAN,
config_file=TOO_XMATCH_CONFIG,
log=log,
log_path=None,
)

# Set the starting catalogid as the max of the ToO catalogids. This is because
# we always use run_id=9 for ToOs and we should have plenty of them (and if for
Expand All @@ -249,21 +279,6 @@ def xmatch_too_targets(
if max_cid:
xmatch_planner._max_cid = max_cid + 1

# If we have a temporary catalog table, we also need to check its max catalogid
# as those won't have been added to the real catalog table yet.
temp_table = xmatch_planner._temp_table
if database.table_exists(temp_table, schema="sandbox"):
max_cid_temp = database.execute_sql(
f"SELECT MAX(catalogid) FROM sandbox.{temp_table};"
).fetchone()[0]
if max_cid_temp is not None and max_cid_temp > xmatch_planner._max_cid:
xmatch_planner._max_cid = max_cid_temp + 1

xmatch_planner.run(load_catalog=load_catalog, keep_temp=keep_temp, force=True)
xmatch_planner.run(dry_run=dry_run, keep_temp=keep_temp, force=True)

if load_catalog is False:
from target_selection.xmatch import TempCatalog

return TempCatalog
else:
return Catalog
return xmatch_planner
10 changes: 9 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def max_cid():
def connect_and_revert_database(max_cid: int):
"""Reverts the database to the original state."""

mj5 = "62abc69fd3fad42d"

execute_sql = catalogdb.database.execute_sql

connect_to_database(DBNAME)
Expand All @@ -52,9 +54,15 @@ def connect_and_revert_database(max_cid: int):
execute_sql("TRUNCATE TABLE targetdb.carton;")
execute_sql("TRUNCATE TABLE targetdb.magnitude;")

execute_sql(f"DROP TABLE IF EXISTS sandbox.catalog_{mj5};")
execute_sql(f"DROP TABLE IF EXISTS sandbox.catalog_to_too_{mj5};")

execute_sql("DELETE FROM catalogdb.catalog WHERE lead = 'too_target';")

yield

execute_sql("DROP TABLE IF EXISTS sandbox.catalog_62abc69fd3fad42d;")
execute_sql(f"DROP TABLE IF EXISTS sandbox.catalog_{mj5};")
execute_sql(f"DROP TABLE IF EXISTS sandbox.catalog_to_too_{mj5};")
execute_sql(f"DELETE FROM catalogdb.catalog WHERE catalogid > {max_cid};")


Expand Down
17 changes: 8 additions & 9 deletions tests/scripts/sdss5db_too_test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,14 @@ CREATE TABLE targetdb.design_mode (
apogee_sky_neighbors_targets DOUBLE PRECISION[]
);

\copy catalogdb.catalog FROM PROGRAM 'gzip -dc catalog.csv.gz' WITH CSV HEADER;
\copy catalogdb.sdss_id_stacked FROM PROGRAM 'gzip -dc sdss_id_stacked.csv.gz' WITH CSV HEADER;
\copy catalogdb.catalog_to_gaia_dr3_source FROM PROGRAM 'gzip -dc catalog_to_gaia_dr3_source.csv.gz' WITH CSV HEADER;
-- \copy catalogdb.catalog_to_sdss_dr13_photoobj_primary FROM PROGRAM 'gzip -dc catalog_to_sdss_dr13_photoobj_primary.csv.gz' WITH CSV HEADER;
\copy catalogdb.catalog_to_twomass_psc FROM PROGRAM 'gzip -dc catalog_to_twomass_psc.csv.gz' WITH CSV HEADER;
\copy catalogdb.gaia_dr3_source FROM PROGRAM 'gzip -dc gaia_dr3_source.csv.gz' WITH CSV HEADER;
-- \copy catalogdb.sdss_dr13_photoobj FROM PROGRAM 'gzip -dc sdss_dr13_photoobj.csv.gz' WITH CSV HEADER;
\copy catalogdb.twomass_psc FROM PROGRAM 'gzip -dc twomass_psc.csv.gz' WITH CSV HEADER;

\copy catalogdb.catalog FROM PROGRAM '/usr/bin/gzip -dc catalog.csv.gz' WITH CSV HEADER;
\copy catalogdb.sdss_id_stacked FROM PROGRAM '/usr/bin/gzip -dc sdss_id_stacked.csv.gz' WITH CSV HEADER;
\copy catalogdb.catalog_to_gaia_dr3_source FROM PROGRAM '/usr/bin/gzip -dc catalog_to_gaia_dr3_source.csv.gz' WITH CSV HEADER;
-- \copy catalogdb.catalog_to_sdss_dr13_photoobj_primary FROM PROGRAM '/usr/bin/gzip -dc catalog_to_sdss_dr13_photoobj_primary.csv.gz' WITH CSV HEADER;
\copy catalogdb.catalog_to_twomass_psc FROM PROGRAM '/usr/bin/gzip -dc catalog_to_twomass_psc.csv.gz' WITH CSV HEADER;
\copy catalogdb.gaia_dr3_source FROM PROGRAM '/usr/bin/gzip -dc gaia_dr3_source.csv.gz' WITH CSV HEADER;
-- \copy catalogdb.sdss_dr13_photoobj FROM PROGRAM '/usr/bin/gzip -dc sdss_dr13_photoobj.csv.gz' WITH CSV HEADER;
\copy catalogdb.twomass_psc FROM PROGRAM '/usr/bin/gzip -dc twomass_psc.csv.gz' WITH CSV HEADER;
\copy targetdb.design_mode FROM 'design_mode.csv' WITH CSV HEADER;

ALTER TABLE catalogdb.catalog ADD PRIMARY KEY (catalogid);
Expand Down
14 changes: 7 additions & 7 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from __future__ import annotations

import os

import polars
import pytest
from conftest import DBNAME
Expand All @@ -25,6 +27,9 @@
from too.exceptions import ValidationError


PGPORT = os.environ.get("PGPORT", 5432)


def test_database_exists():
assert catalogdb.Catalog.table_exists()
assert catalogdb.database.dbname == DBNAME
Expand Down Expand Up @@ -61,8 +66,8 @@ def test_models_exist():
@pytest.mark.parametrize(
"dbname,user,password,host,port,expected",
[
("testdb", None, None, "localhost", None, "localhost/testdb"),
("testdb", "user", "1234", "localhost", None, "user:1234@localhost/testdb"),
("testdb", None, None, "localhost", PGPORT, f"sdss@localhost:{PGPORT}/testdb"),
("db", "user", "1234", "10.1.1.1", PGPORT, f"user:1234@10.1.1.1:{PGPORT}/db"),
("testdb", "user", None, "localhost", 5432, "user@localhost:5432/testdb"),
],
)
Expand All @@ -86,11 +91,6 @@ def test_get_database_uri(
assert uri == f"postgresql://{expected}"


def test_get_database_uri_password_fails():
with pytest.raises(ValueError):
get_database_uri("testdb", password="1234")


def test_validate_too_target_passes(too_mock: polars.DataFrame):
assert isinstance(validate_too_targets(too_mock), polars.DataFrame)

Expand Down
50 changes: 34 additions & 16 deletions tests/test_xmatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,24 @@ def test_xmatch_prepare(too_mock: polars.DataFrame):
def test_xmatch_1():
n_too = catalogdb.ToO_Target.select().count()

TempCatalog = xmatch_too_targets(
xmatch_planner = xmatch_too_targets(
catalogdb.database,
dry_run=True,
keep_temp=True,
load_catalog=False,
overwrite=True,
)

assert xmatch_planner is not None

TempCatalog = xmatch_planner.get_output_model(temporary=True)
TempC2T = xmatch_planner.get_relational_model(
catalogdb.ToO_Target,
sandboxed=True,
create=False,
)
assert TempCatalog is not None

CatalogToToO_Target = catalogdb.database.models["catalogdb.catalog_to_too_target"]
n_target_id = (
CatalogToToO_Target.select()
.where(CatalogToToO_Target.best >> 1)
.distinct(CatalogToToO_Target.target_id)
.count()
TempC2T.select().where(TempC2T.best >> 1).distinct(TempC2T.target_id).count()
)

assert n_target_id == n_too
Expand All @@ -61,27 +66,40 @@ def test_xmatch_2(too_mock: polars.DataFrame):
too_mock_sample = too_mock[100000:200000]
load_too_targets(too_mock_sample, catalogdb.database)

TempCatalog = xmatch_too_targets(
xmatch_planner = xmatch_too_targets(
catalogdb.database,
dry_run=True,
keep_temp=True,
load_catalog=False,
overwrite=True,
)

assert xmatch_planner is not None

TempCatalog = xmatch_planner.get_output_model(temporary=True)

assert TempCatalog is not None
assert TempCatalog.select().count() == 62748
assert TempCatalog.select().count() == 62867


def test_xmatch_3():
with pytest.raises(RuntimeError):
xmatch_too_targets(catalogdb.database, overwrite=False)

def test_xmatch_3(too_mock: polars.DataFrame):

def test_xmatch_4(too_mock: polars.DataFrame):
too_mock_sample = too_mock[300000:400000]
load_too_targets(too_mock_sample, catalogdb.database)

Model = xmatch_too_targets(catalogdb.database)
xmatch_planner = xmatch_too_targets(catalogdb.database, overwrite=True)
assert xmatch_planner is not None

Model = xmatch_planner.get_output_model(temporary=False)

assert Model is not None and Model == catalogdb.Catalog
assert Model.select().count() == 3092773
assert Model is not None and Model._meta.table_name == "catalog"
assert Model.select().count() == 3093109


def test_xmatch_4(caplog: pytest.LogCaptureFixture):
def test_xmatch_5(caplog: pytest.LogCaptureFixture):
xmatch_too_targets(catalogdb.database)

assert caplog.record_tuples[-1][2] == "All ToO targets are already matched."

0 comments on commit cf79627

Please sign in to comment.