diff --git a/.github/workflows/formatting.yaml b/.github/workflows/formatting.yaml new file mode 100644 index 0000000..27f34a6 --- /dev/null +++ b/.github/workflows/formatting.yaml @@ -0,0 +1,11 @@ +name: Check Python formatting + +on: + push: + branches: + - main + pull_request: + +jobs: + call-workflow: + uses: lsst/rubin_workflows/.github/workflows/formatting.yaml@main diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 2b20981..6c463ee 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -1,22 +1,16 @@ name: lint on: - - push - - pull_request + push: + branches: + - main + pull_request: jobs: - lint: + call-workflow: + uses: lsst/rubin_workflows/.github/workflows/lint.yaml@main + ruff: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - - name: Set up Python - uses: actions/setup-python@v2 - with: - python-version: 3.7 - - - name: Install - run: pip install -r <(curl https://raw.githubusercontent.com/lsst/linting/main/requirements.txt) - - - name: Run linter - run: flake8 + - uses: actions/checkout@v3 + - uses: chartboost/ruff-action@v1 diff --git a/.github/workflows/mypy.yaml b/.github/workflows/mypy.yaml new file mode 100644 index 0000000..0849ea4 --- /dev/null +++ b/.github/workflows/mypy.yaml @@ -0,0 +1,11 @@ +name: Run mypy + +on: + push: + branches: + - main + pull_request: + +jobs: + call-workflow: + uses: lsst/rubin_workflows/.github/workflows/mypy.yaml@main diff --git a/.github/workflows/rebase_checker.yaml b/.github/workflows/rebase_checker.yaml index 62aeca7..65516d9 100644 --- a/.github/workflows/rebase_checker.yaml +++ b/.github/workflows/rebase_checker.yaml @@ -1,4 +1,3 @@ ---- name: Check that 'main' is not merged into the development branch on: pull_request diff --git a/.github/workflows/yamllint.yaml b/.github/workflows/yamllint.yaml new file mode 100644 index 0000000..76ad875 --- /dev/null +++ b/.github/workflows/yamllint.yaml @@ -0,0 +1,11 @@ +name: Lint YAML Files + +on: + push: + branches: + - main + pull_request: + +jobs: + call-workflow: + uses: lsst/rubin_workflows/.github/workflows/yamllint.yaml@main diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..a57c469 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,28 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.5.0 + hooks: + - id: check-yaml + args: + - "--unsafe" + - id: end-of-file-fixer + - id: trailing-whitespace + - repo: https://github.com/psf/black-pre-commit-mirror + rev: 24.2.0 + hooks: + - id: black + # It is recommended to specify the latest version of Python + # supported by your project here, or alternatively use + # pre-commit's default_language_version, see + # https://pre-commit.com/#top_level-default_language_version + language_version: python3.11 + - repo: https://github.com/pycqa/isort + rev: 5.13.2 + hooks: + - id: isort + name: isort (python) + - repo: https://github.com/astral-sh/ruff-pre-commit + # Ruff version. + rev: v0.3.1 + hooks: + - id: ruff diff --git a/README.rst b/README.rst index 5357088..c40ee96 100644 --- a/README.rst +++ b/README.rst @@ -2,6 +2,6 @@ dax_ppdb ######## - +``dax_ppdb`` is a package in the `LSST Science Pipelines `_. .. Add a brief (few sentence) description of what this package provides. 
diff --git a/bin.src/SConscript b/bin.src/SConscript index e00724c..6d4fd52 100644 --- a/bin.src/SConscript +++ b/bin.src/SConscript @@ -1,3 +1,4 @@ # -*- python -*- from lsst.sconsUtils import scripts + scripts.BasicSConscript.shebang() diff --git a/bin.src/ppdb-cli b/bin.src/ppdb-cli new file mode 100644 index 0000000..c1b7bae --- /dev/null +++ b/bin.src/ppdb-cli @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +from lsst.dax.ppdb.cli import ppdb_cli + +if __name__ == "__main__": + ppdb_cli.main() diff --git a/bin.src/ppdb-replication b/bin.src/ppdb-replication new file mode 100644 index 0000000..a0ecc5c --- /dev/null +++ b/bin.src/ppdb-replication @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +from lsst.dax.ppdb.cli import ppdb_replication + +if __name__ == "__main__": + ppdb_replication.main() diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..02dd1d5 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,28 @@ +[mypy] +ignore_errors = False +warn_unused_configs = True +warn_redundant_casts = True +ignore_missing_imports = False +disallow_untyped_defs = True +disallow_incomplete_defs = True + +[mypy-astropy.*] +ignore_missing_imports = True + +[mypy-lsst.daf.*] +ignore_missing_imports = True + +[mypy-lsst.sphgeom] +ignore_missing_imports = True + +[mypy-lsst.dax.ppdb.*] +ignore_missing_imports = False +ignore_errors = False +disallow_untyped_defs = True +disallow_incomplete_defs = True +strict_equality = True +warn_unreachable = True +warn_unused_ignores = True + +[mypy-lsst.dax.ppdb.version] +ignore_errors = True diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..660bcaf --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,111 @@ +[build-system] +requires = ["setuptools", "lsst-versions >= 1.3.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "lsst-dax-ppdb" +description = "Prompt Products Database for LSST AP pipeline." 
+license = {text = "GPLv3+ License"} +readme = "README.md" +authors = [ + {name="Rubin Observatory Data Management", email="dm-admin@lists.lsst.org"}, +] +classifiers = [ + "Intended Audience :: Science/Research", + "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering :: Astronomy", +] +keywords = ["lsst"] +dependencies = [ + "astropy", + "numpy", + "pandas", + "pyyaml >= 5.1", + "sqlalchemy", + "lsst-felis", + "lsst-utils", + "lsst-resources", + "lsst-dax-apdb", +] +dynamic = ["version"] + +[project.urls] +"Homepage" = "https://github.com/lsst/dax_ppdb" + +[project.optional-dependencies] +test = [ + "pytest >= 3.2", + "pytest-openfiles >= 0.5.0" +] + +[tool.setuptools.packages.find] +where = ["python"] + +[tool.setuptools] +zip-safe = true +license-files = ["COPYRIGHT", "LICENSE"] + +[tool.setuptools.package-data] +"lsst.dax.ppdb" = ["py.typed"] + +[tool.setuptools.dynamic] +version = { attr = "lsst_versions.get_lsst_version" } + +[tool.black] +line-length = 110 +target-version = ["py311"] + +[tool.isort] +profile = "black" +line_length = 110 + +[tool.lsst_versions] +write_to = "python/lsst/dax/ppdb/version.py" + +[tool.ruff] +exclude = [ + "__init__.py", + "doc/conf.py", +] +line-length = 110 +target-version = "py311" + +[tool.ruff.lint] +ignore = [ + "N802", + "N803", + "N806", + "N812", + "N815", + "N816", + "N999", + "D107", + "D105", + "D102", + "D104", + "D100", + "D200", + "D205", + "D400", +] +select = [ + "E", # pycodestyle + "F", # pycodestyle + "N", # pep8-naming + "W", # pycodestyle + "D", # pydocstyle +] +extend-select = [ + "RUF100", # Warn about unused noqa +] + +[tool.ruff.lint.pycodestyle] +max-doc-length = 79 + +[tool.ruff.lint.pydocstyle] +convention = "numpy" diff --git a/python/lsst/dax/ppdb/__init__.py b/python/lsst/dax/ppdb/__init__.py index 35961cf..2cb6bd2 100644 --- a/python/lsst/dax/ppdb/__init__.py +++ b/python/lsst/dax/ppdb/__init__.py @@ -19,4 +19,5 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from .ppdb import * from .version import * # Generated by sconsUtils diff --git a/python/lsst/dax/ppdb/_factory.py b/python/lsst/dax/ppdb/_factory.py new file mode 100644 index 0000000..ddbfb4e --- /dev/null +++ b/python/lsst/dax/ppdb/_factory.py @@ -0,0 +1,110 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
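Since ``__init__.py`` now re-exports the public names from the ``ppdb`` module, the package-level import path is the intended entry point. A minimal usage sketch, assuming a configuration file already produced by the CLI tools added below (the file name is a placeholder):

    from lsst.dax.ppdb import Ppdb

    ppdb = Ppdb.from_uri("ppdb-config.yaml")  # SQL configs dispatch to PpdbSql
    print(ppdb.get_replica_chunks())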
+ +from __future__ import annotations + +__all__ = ["config_type_for_name", "ppdb_type", "ppdb_type_for_name"] + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .config import PpdbConfig + from .sql import PpdbSql + + +def ppdb_type(config: PpdbConfig) -> type[PpdbSql]: + """Return Ppdb class matching Ppdb configuration type. + + Parameters + ---------- + config : `PpdbConfig` + Configuration object, sub-class of PpdbConfig. + + Returns + ------- + type : `type` [`Ppdb`] + Subclass of `Ppdb` class. + + Raises + ------ + TypeError + Raised if type of ``config`` does not match any known types. + """ + from .sql import PpdbSqlConfig + + if type(config) is PpdbSqlConfig: + from .sql import PpdbSql + + return PpdbSql + + raise TypeError(f"Unknown type of config object: {type(config)}") + + +def ppdb_type_for_name(type_name: str) -> type[PpdbSql]: + """Return Ppdb class matching type name. + + Parameters + ---------- + type_name : `str` + Short type name of Ppdb implement, for now only "sql" is supported. + + Returns + ------- + type : `type` [`Ppdb`] + Subclass of `Ppdb` class. + + Raises + ------ + TypeError + Raised if ``type_name`` does not match any known types. + """ + if type_name == "sql": + from .sql import PpdbSql + + return PpdbSql + + raise TypeError(f"Unknown type name: {type_name}") + + +def config_type_for_name(type_name: str) -> type[PpdbConfig]: + """Return PpdbConfig class matching type name. + + Parameters + ---------- + type_name : `str` + Short type name of Ppdb implement, for now only "sql" is supported. + + Returns + ------- + type : `type` [`Ppdb`] + Subclass of `PpdbConfig` class. + + Raises + ------ + TypeError + Raised if ``type_name`` does not match any known types. + """ + if type_name == "sql": + from .sql import PpdbSqlConfig + + return PpdbSqlConfig + + raise TypeError(f"Unknown type name: {type_name}") diff --git a/python/lsst/dax/ppdb/cli/__init__.py b/python/lsst/dax/ppdb/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/lsst/dax/ppdb/cli/options.py b/python/lsst/dax/ppdb/cli/options.py new file mode 100644 index 0000000..e7d1731 --- /dev/null +++ b/python/lsst/dax/ppdb/cli/options.py @@ -0,0 +1,89 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
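The factory functions above resolve both the configuration class and the implementation class from a short type name, importing lazily so that module dependencies stay one-way. A quick sketch of how the pieces fit together (the SQLite URL is illustrative only):

    from lsst.dax.ppdb._factory import config_type_for_name, ppdb_type, ppdb_type_for_name

    config_cls = config_type_for_name("sql")               # PpdbSqlConfig
    config = config_cls(db_url="sqlite:///ppdb.sqlite3")   # placeholder URL
    assert ppdb_type(config) is ppdb_type_for_name("sql")  # both resolve to PpdbSql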
+ +from __future__ import annotations + +__all__ = ["felis_schema_options", "sql_db_options", "replication_options"] + +import argparse + + +def felis_schema_options(parser: argparse.ArgumentParser) -> None: + """Define CLI options for Felis schema file.""" + group = parser.add_argument_group("felis schema options") + group.add_argument( + "--felis-path", + help="YAML file with PPDB felis schema (can be same as APDB schema).", + metavar="PATH", + default=None, + ) + group.add_argument( + "--felis-schema", + help="Schema name used in felis YAML file.", + metavar="NAME", + default=None, + ) + + +def sql_db_options(parser: argparse.ArgumentParser) -> None: + """Define CLI options for database connection.""" + group = parser.add_argument_group("database options") + group.add_argument( + "-s", + "--schema", + help="Optional schema name.", + metavar="DB_SCHEMA", + default=None, + ) + + group.add_argument( + "--connection-pool", + help="Enable/disable use of connection pool.", + action=argparse.BooleanOptionalAction, + default=True, + ) + + group.add_argument( + "--isolation-level", + help="Transaction isolation level, allowed values: %(choices)s", + metavar="STRING", + choices=["READ_COMMITTED", "READ_UNCOMMITTED", "REPEATABLE_READ", "SERIALIZABLE"], + default=None, + ) + + group.add_argument( + "--connection-timeout", + type=float, + help="Maximum connection timeout in seconds.", + metavar="SECONDS", + default=None, + ) + + +def replication_options(parser: argparse.ArgumentParser) -> None: + """Define CLI options for replication.""" + group = parser.add_argument_group("replication options") + group.add_argument( + "--single", help="Copy single replication item and stop.", default=False, action="store_true" + ) + group.add_argument( + "--update", help="Allow updates to already replicated data.", default=False, action="store_true" + ) diff --git a/python/lsst/dax/ppdb/cli/ppdb_cli.py b/python/lsst/dax/ppdb/cli/ppdb_cli.py new file mode 100644 index 0000000..82b82e3 --- /dev/null +++ b/python/lsst/dax/ppdb/cli/ppdb_cli.py @@ -0,0 +1,59 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["main"] + +import argparse + +from lsst.dax.apdb.cli.logging_cli import LoggingCli + +from .. import scripts +from . 
import options + + +def main() -> None: + """PPDB command line tools.""" + parser = argparse.ArgumentParser(description="PPDB command line tools") + log_cli = LoggingCli(parser) + + subparsers = parser.add_subparsers(title="available subcommands", required=True) + _create_sql_subcommand(subparsers) + + args = parser.parse_args() + log_cli.process_args(args) + + kwargs = vars(args) + method = kwargs.pop("method") + method(**kwargs) + + +def _create_sql_subcommand(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("create-sql", help="Create new PPDB instance in SQL database.") + parser.add_argument("db_url", help="Database URL in SQLAlchemy format for PPDB instance.") + parser.add_argument("config_path", help="Name of the new configuration file for created PPDB instance.") + options.felis_schema_options(parser) + options.sql_db_options(parser) + parser.add_argument( + "--drop", help="If True then drop existing tables.", default=False, action="store_true" + ) + parser.set_defaults(method=scripts.create_sql) diff --git a/python/lsst/dax/ppdb/cli/ppdb_replication.py b/python/lsst/dax/ppdb/cli/ppdb_replication.py new file mode 100644 index 0000000..ea3fe10 --- /dev/null +++ b/python/lsst/dax/ppdb/cli/ppdb_replication.py @@ -0,0 +1,72 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["main"] + +import argparse + +from lsst.dax.apdb.cli.logging_cli import LoggingCli + +from .. import scripts +from . import options + +_longLogFmt = "%(asctime)s %(levelname)s %(name)s - %(message)s" + + +def main() -> None: + """Commands for managing APDB-to-PPDB replication.""" + parser = argparse.ArgumentParser(description="PPDB command line tools") + log_cli = LoggingCli(parser) + + subparsers = parser.add_subparsers(title="available subcommands", required=True) + _list_chunks_apdb_subcommand(subparsers) + _list_chunks_ppdb_subcommand(subparsers) + _run_subcommand(subparsers) + + args = parser.parse_args() + log_cli.process_args(args) + kwargs = vars(args) + method = kwargs.pop("method") + method(**kwargs) + + +def _list_chunks_apdb_subcommand(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser( + "list-chunks-apdb", help="Print full list of replic chunks existing on APDB side." 
+ ) + parser.add_argument("apdb_config", help="Path to the APDB configuration.") + parser.set_defaults(method=scripts.replication_list_chunks_apdb) + + +def _list_chunks_ppdb_subcommand(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("list-chunks-ppdb", help="List full set of replica chunks in PPDB.") + parser.add_argument("ppdb_config", help="Path to the PPDB configuration.") + parser.set_defaults(method=scripts.replication_list_chunks_ppdb) + + +def _run_subcommand(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("run", help="Run replication from APDB to PPDB.") + parser.add_argument("apdb_config", help="Path to the APDB configuration.") + parser.add_argument("ppdb_config", help="Path to the PPDB configuration.") + options.replication_options(parser) + parser.set_defaults(method=scripts.replication_run) diff --git a/python/lsst/dax/ppdb/config.py b/python/lsst/dax/ppdb/config.py new file mode 100644 index 0000000..3263f49 --- /dev/null +++ b/python/lsst/dax/ppdb/config.py @@ -0,0 +1,64 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["PpdbConfig"] + +from collections.abc import Mapping +from typing import Any + +import yaml +from lsst.resources import ResourcePath, ResourcePathExpression +from pydantic import BaseModel + +from ._factory import config_type_for_name + + +class PpdbConfig(BaseModel): + """Base class for PPDB configuration types.""" + + @classmethod + def from_uri(cls, uri: ResourcePathExpression) -> PpdbConfig: + """Load configuration object from external file. + + Parameters + ---------- + uri : `~lsst.resources.ResourcePathExpression` + Location of the file containing serialized configuration in YAML + format. + + Returns + ------- + config : `PpdbConfig` + PPD configuration object. + """ + path = ResourcePath(uri) + config_str = path.read() + config_object = yaml.safe_load(config_str) + if not isinstance(config_object, Mapping): + raise TypeError("YAML configuration file does not represent valid object") + config_dict: dict[str, Any] = dict(config_object) + type_name = config_dict.pop("implementation_type", None) + if not type_name: + raise LookupError("YAML configuration file does not have `implementation_type` key") + klass = config_type_for_name(type_name) + return klass(**config_dict) diff --git a/python/lsst/dax/ppdb/ppdb.py b/python/lsst/dax/ppdb/ppdb.py new file mode 100644 index 0000000..9177aaf --- /dev/null +++ b/python/lsst/dax/ppdb/ppdb.py @@ -0,0 +1,139 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. 
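``PpdbConfig.from_uri`` above expects a YAML mapping whose ``implementation_type`` key selects the concrete configuration class; the remaining keys are passed to that class. A minimal sketch of such a file and of loading it (file name and connection string are placeholders):

    import yaml

    from lsst.dax.ppdb.config import PpdbConfig

    cfg = {
        "implementation_type": "sql",        # selects PpdbSqlConfig
        "db_url": "sqlite:///ppdb.sqlite3",  # placeholder connection string
    }
    with open("ppdb-config.yaml", "w") as config_file:
        yaml.dump(cfg, config_file)

    config = PpdbConfig.from_uri("ppdb-config.yaml")  # returns a PpdbSqlConfig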
+# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["Ppdb"] + +from abc import ABC, abstractmethod +from dataclasses import dataclass + +import astropy.time +from lsst.dax.apdb import ApdbMetadata, ApdbTableData, ReplicaChunk +from lsst.resources import ResourcePathExpression + +from ._factory import ppdb_type +from .config import PpdbConfig + + +@dataclass(frozen=True) +class PpdbReplicaChunk(ReplicaChunk): + """ReplicaChunk with additional PPDB-specific info.""" + + replica_time: astropy.time.Time + """Time when this bucket was replicated (`astropy.time.Time`).""" + + +class Ppdb(ABC): + """Class defining an interface for PPDB management operations.""" + + @classmethod + def from_config(cls, config: PpdbConfig) -> Ppdb: + """Create Ppdb instance from configuration object. + + Parameters + ---------- + config : `PpdbConfig` + Configuration object, type of this object determines type of the + Ppdb implementation. + + Returns + ------- + ppdb : `Ppdb` + Instance of `Ppdb` class. + """ + # Dispatch to actual implementation class based on config type. + + ppdb_class = ppdb_type(config) + return ppdb_class(config) + + @classmethod + def from_uri(cls, uri: ResourcePathExpression) -> Ppdb: + """Read PPDB configuration from URI and make a Ppdb instance. + + Parameters + ---------- + uri : `~lsst.resources.ResourcePathExpression` + Location of the file containing serialized configuration in YAML + format. + + Returns + ------- + ppdb : `Ppdb` + Instance of `Ppdb` class. + """ + config = PpdbConfig.from_uri(uri) + return cls.from_config(config) + + @property + @abstractmethod + def metadata(self) -> ApdbMetadata: + """Object controlling access to metadata + (`~lsst.dax.apdb.ApdbMetadata`). + """ + raise NotImplementedError() + + @abstractmethod + def get_replica_chunks(self) -> list[PpdbReplicaChunk] | None: + """Return collection of replica chunks known to the database. + + Returns + ------- + chunks : `list` [`PpdbReplicaChunk`] or `None` + List of chunks, they may be time-ordered if database supports + ordering. `None` is returned if database is not configured to store + chunk information. + """ + raise NotImplementedError() + + @abstractmethod + def store( + self, + replica_chunk: ReplicaChunk, + objects: ApdbTableData, + sources: ApdbTableData, + forced_sources: ApdbTableData, + *, + update: bool = False, + ) -> None: + """Copy APDB data to PPDB. + + Parameters + ---------- + replica_chunk : `~lsst.dax.apdb.ReplicaChunk` + Insertion ID for APDB data. + objects : `~lsst.dax.apdb.ApdbTableData` + Matching APDB data for DiaObjects. + sources : `~lsst.dax.apdb.ApdbTableData` + Matching APDB data for DiaSources. 
+ forced_sources : `~lsst.dax.apdb.ApdbTableData` + Matching APDB data for DiaForcedSources. + update : `bool`, optional + If `True` then allow updates for existing data from the same + ``replica_chunk``. + + Notes + ----- + Replication from APDB to PPDB should happen in the same order as + insertion order for APDB, i.e. in the order of increasing + ``replica_chunk.id`` values. + """ + raise NotImplementedError() diff --git a/python/lsst/dax/ppdb/py.typed b/python/lsst/dax/ppdb/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/python/lsst/dax/ppdb/scripts/__init__.py b/python/lsst/dax/ppdb/scripts/__init__.py new file mode 100644 index 0000000..94b85c3 --- /dev/null +++ b/python/lsst/dax/ppdb/scripts/__init__.py @@ -0,0 +1,25 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from .create_sql import create_sql +from .replication_list_chunks_apdb import replication_list_chunks_apdb +from .replication_list_chunks_ppdb import replication_list_chunks_ppdb +from .replication_run import replication_run diff --git a/python/lsst/dax/ppdb/scripts/create_sql.py b/python/lsst/dax/ppdb/scripts/create_sql.py new file mode 100644 index 0000000..2dec70c --- /dev/null +++ b/python/lsst/dax/ppdb/scripts/create_sql.py @@ -0,0 +1,79 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["create_sql"] + +import yaml + +from ..sql import PpdbSql + + +def create_sql( + db_url: str, + schema: str | None, + config_path: str, + felis_path: str, + felis_schema: str, + connection_pool: bool, + isolation_level: str | None, + connection_timeout: float | None, + drop: bool, +) -> None: + """Create new PPDB instance in SQL database. + + Parameters + ---------- + db_url : `str` + SQLAlchemy connection string. 
+ schema : `str` or `None` + Database schema name, `None` to use default schema. + config_path : `str` + Name of the file to write PPDB configuration. + felis_path : `str` + Path to the Felis YAML file with table schema definition. + felis_schema : `str` + Name of the schema defined in felis YAML file. + connection_pool : `bool` + If True then enable connection pool. + isolation_level : `str` or `None` + Transaction isolation level, if unset then backend-default value is + used. + connection_timeout: `float` or `None` + Maximum connection timeout in seconds. + drop : `bool` + If `True` then drop existing tables. + """ + config = PpdbSql.init_database( + db_url=db_url, + schema_name=schema, + schema_file=felis_path, + felis_schema=felis_schema, + use_connection_pool=connection_pool, + isolation_level=isolation_level, + connection_timeout=connection_timeout, + drop=drop, + ) + config_dict = config.model_dump(exclude_unset=True, exclude_defaults=True) + config_dict["implementation_type"] = "sql" + with open(config_path, "w") as config_file: + yaml.dump(config_dict, config_file) diff --git a/python/lsst/dax/ppdb/scripts/replication_list_chunks_apdb.py b/python/lsst/dax/ppdb/scripts/replication_list_chunks_apdb.py new file mode 100644 index 0000000..6cd1560 --- /dev/null +++ b/python/lsst/dax/ppdb/scripts/replication_list_chunks_apdb.py @@ -0,0 +1,50 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["replication_list_chunks_apdb"] + +from lsst.dax.apdb import ApdbReplica + + +def replication_list_chunks_apdb(apdb_config: str) -> None: + """Print full list of replica chunks existing on APDB side. + + Parameters + ---------- + apdb_config : `str` + URL for APDB configuration file. + """ + apdb = ApdbReplica.from_uri(apdb_config) + chunks = apdb.getReplicaChunks() + if chunks is not None: + print(" Chunk Id Update time Unique Id") + sep = "-" * 77 + print(sep) + chunks = sorted(chunks, key=lambda chunk: chunk.id) + for chunk in chunks: + insert_time = chunk.last_update_time + print(f"{chunk.id:10d} {insert_time.tai.isot}/tai {chunk.unique_id}") + print(sep) + print(f"Total: {len(chunks)}") + else: + print("APDB instance does not support InsertIds") diff --git a/python/lsst/dax/ppdb/scripts/replication_list_chunks_ppdb.py b/python/lsst/dax/ppdb/scripts/replication_list_chunks_ppdb.py new file mode 100644 index 0000000..ec5f9c1 --- /dev/null +++ b/python/lsst/dax/ppdb/scripts/replication_list_chunks_ppdb.py @@ -0,0 +1,52 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). 
+# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["replication_list_chunks_ppdb"] + +from ..ppdb import Ppdb + + +def replication_list_chunks_ppdb(ppdb_config: str) -> None: + """Print list of replica chunks existing on PPDB side. + + Parameters + ---------- + ppdb_config : `str` + URL for PPDB configuration file. + """ + ppdb = Ppdb.from_uri(ppdb_config) + chunks = ppdb.get_replica_chunks() + if chunks is not None: + print(" Chunk Id Update time Replica time Unique Id") + sep = "-" * 106 + print(sep) + chunks = sorted(chunks, key=lambda chunk: chunk.id) + for insert in chunks: + print( + f"{insert.id:10d} {insert.last_update_time.tai.isot}/tai " + f"{insert.replica_time.tai.isot}/tai {insert.unique_id}" + ) + print(sep) + print(f"Total: {len(chunks)}") + else: + print("APDB instance does not support InsertIds") diff --git a/python/lsst/dax/ppdb/scripts/replication_run.py b/python/lsst/dax/ppdb/scripts/replication_run.py new file mode 100644 index 0000000..96c71db --- /dev/null +++ b/python/lsst/dax/ppdb/scripts/replication_run.py @@ -0,0 +1,106 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["replication_run"] + +import logging +from typing import TYPE_CHECKING + +from lsst.dax.apdb import ApdbReplica + +from ..ppdb import Ppdb + +if TYPE_CHECKING: + from lsst.dax.apdb import ReplicaChunk + + from ..ppdb import PpdbReplicaChunk + +_LOG = logging.getLogger(__name__) + + +def replication_run( + apdb_config: str, + ppdb_config: str, + single: bool, + update: bool, +) -> None: + """Execute replication process from APDB to PPDB. + + Parameters + ---------- + apdb_config : `str` + URL for APDB configuration file. + ppdb_config : `str` + URL for PPDB configuration file. + single : `bool` + Copy single bucket and stop. + update : `bool` + If `True` then allow updates to previously replicated data. 
+ """ + apdb = ApdbReplica.from_uri(apdb_config) + ppdb = Ppdb.from_uri(ppdb_config) + + chunks = apdb.getReplicaChunks() + if chunks is None: + raise TypeError("APDB implementation does not support replication") + ppdb_chunks = ppdb.get_replica_chunks() + if ppdb_chunks is None: + raise TypeError("PPDB implementation does not support replication") + + ids = _merge_ids(chunks, ppdb_chunks) + + # Check existing PPDB ids for consistency. + for apdb_chunk, ppdb_chunk in ids: + if ppdb_chunk is not None: + if ppdb_chunk.unique_id != apdb_chunk.unique_id: + raise ValueError(f"Inconsistent values of unique ID - APDB: {apdb_chunk} PPDB: {ppdb_chunk}") + + ids_to_copy = [apdb_chunk for apdb_chunk, ppdb_chunk in ids if ppdb_chunk is None] + for apdb_chunk in ids_to_copy: + _LOG.info("Will replicate bucket %s", apdb_chunk) + _replicate_one(apdb, ppdb, apdb_chunk, update) + if single: + break + + +def _merge_ids( + chunks: list[ReplicaChunk], ppdb_chunks: list[PpdbReplicaChunk] +) -> list[tuple[ReplicaChunk, PpdbReplicaChunk | None]]: + """Make a list of pairs (apdb_chunk, ppdb_chunk), if ppdb_chunk does not + exist for apdb_chunk then it will be None. + """ + ppdb_id_map = {ppdb_chunk.id: ppdb_chunk for ppdb_chunk in ppdb_chunks} + apdb_ids = sorted(chunks, key=lambda apdb_chunk: apdb_chunk.id) + return [(apdb_chunk, ppdb_id_map.get(apdb_chunk.id)) for apdb_chunk in apdb_ids] + + +def _replicate_one(apdb: ApdbReplica, ppdb: Ppdb, replica_chunk: ReplicaChunk, update: bool) -> None: + + dia_objects = apdb.getDiaObjectsChunks([replica_chunk.id]) + _LOG.info("Selected %s DiaObjects for replication", len(dia_objects.rows())) + dia_sources = apdb.getDiaSourcesChunks([replica_chunk.id]) + _LOG.info("Selected %s DiaSources for replication", len(dia_sources.rows())) + dia_forced_sources = apdb.getDiaForcedSourcesChunks([replica_chunk.id]) + _LOG.info("Selected %s DiaForcedSources for replication", len(dia_forced_sources.rows())) + + ppdb.store(replica_chunk, dia_objects, dia_sources, dia_forced_sources, update=update) diff --git a/python/lsst/dax/ppdb/sql/__init__.py b/python/lsst/dax/ppdb/sql/__init__.py new file mode 100644 index 0000000..d8ad126 --- /dev/null +++ b/python/lsst/dax/ppdb/sql/__init__.py @@ -0,0 +1,22 @@ +# This file is part of dax_ppdb. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from ._ppdb_sql import PpdbSql, PpdbSqlConfig diff --git a/python/lsst/dax/ppdb/sql/_ppdb_sql.py b/python/lsst/dax/ppdb/sql/_ppdb_sql.py new file mode 100644 index 0000000..590fae6 --- /dev/null +++ b/python/lsst/dax/ppdb/sql/_ppdb_sql.py @@ -0,0 +1,530 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. 
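``replication_run`` above pairs the APDB replica chunks with those already present in PPDB and copies only the missing ones, in increasing chunk-id order. Besides the ``ppdb-replication run`` command, it can be driven directly from Python; a sketch with placeholder configuration paths:

    from lsst.dax.ppdb.scripts import replication_run

    replication_run(
        apdb_config="apdb-config.yaml",  # placeholder APDB configuration path
        ppdb_config="ppdb-config.yaml",  # placeholder PPDB configuration path
        single=False,                    # copy all pending chunks, not just one
        update=False,                    # leave already-replicated chunks untouched
    )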
+# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["PpdbSql", "PpdbSqlConfig"] + +import logging +import os +import sqlite3 +from collections.abc import MutableMapping +from contextlib import closing, suppress +from typing import Any + +import astropy.time +import sqlalchemy +import yaml +from felis.datamodel import Schema, SchemaVersion +from felis.metadata import MetaDataBuilder +from lsst.dax.apdb import ApdbMetadata, ApdbTableData, IncompatibleVersionError, ReplicaChunk, VersionTuple +from lsst.dax.apdb.sql.apdbMetadataSql import ApdbMetadataSql +from lsst.dax.apdb.sql.apdbSqlSchema import GUID +from lsst.resources import ResourcePath +from lsst.utils.iteration import chunk_iterable +from sqlalchemy import sql +from sqlalchemy.pool import NullPool + +from ..config import PpdbConfig +from ..ppdb import Ppdb, PpdbReplicaChunk +from .bulk_insert import make_inserter + +_LOG = logging.getLogger(__name__) + +VERSION = VersionTuple(0, 1, 0) +"""Version for the code defined in this module. This needs to be updated +(following compatibility rules) when schema produced by this code changes. +""" + + +def _onSqlite3Connect( + dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord +) -> None: + # Enable foreign keys + with closing(dbapiConnection.cursor()) as cursor: + cursor.execute("PRAGMA foreign_keys=ON;") + + +class PpdbSqlConfig(PpdbConfig): + db_url: str + """SQLAlchemy database connection URI.""" + + schema_name: str | None = None + """Database schema name, if `None` then default schema is used.""" + + felis_path: str | None = None + """Name of YAML file with ``felis`` schema, if `None` then default schema + file is used. + """ + + felis_schema: str | None = None + """Name of the schema in YAML file, if `None` then file has to contain + single schema. + """ + + use_connection_pool: bool = True + """If True then allow use of connection pool.""" + + isolation_level: str | None = None + """Transaction isolation level, if unset then backend-default value is + used. 
+ """ + + connection_timeout: float | None = None + """Maximum connection timeout in seconds.""" + + +class PpdbSql(Ppdb): + default_felis_schema_file = "${SDM_SCHEMAS_DIR}/yml/apdb.yaml" + + meta_schema_version_key = "version:schema" + """Name of the metadata key to store schema version number.""" + + meta_code_version_key = "version:PpdbSql" + """Name of the metadata key to store code version number.""" + + def __init__(self, config: PpdbConfig): + if not isinstance(config, PpdbSqlConfig): + raise TypeError("Expecting PpdbSqlConfig instance") + self.config = config + + self._sa_metadata, schema_version = self._read_schema( + config.felis_path, config.schema_name, config.felis_schema + ) + + self._engine = self._make_engine(config) + sa_metadata = sqlalchemy.MetaData(schema=config.schema_name) + + meta_table: sqlalchemy.schema.Table | None = None + with suppress(sqlalchemy.exc.NoSuchTableError): + meta_table = sqlalchemy.schema.Table("metadata", sa_metadata, autoload_with=self._engine) + + self._metadata = ApdbMetadataSql(self._engine, meta_table) + + # Check schema version compatibility + if self._metadata.table_exists(): + self._versionCheck(self._metadata, schema_version) + + @classmethod + def init_database( + cls, + db_url: str, + schema_name: str | None, + schema_file: str | None, + felis_schema: str | None, + use_connection_pool: bool, + isolation_level: str | None, + connection_timeout: float | None, + drop: bool, + ) -> PpdbConfig: + """Initialize PPDB database. + + Parameters + ---------- + db_url : `str` + SQLAlchemy database connection URI. + schema_name : `str` or `None` + Database schema name, if `None` then default schema is used. + schema_file : `str` or `None` + Name of YAML file with ``felis`` schema, if `None` then default + schema file is used. + felis_schema : `str` or `None` + Name of the schema in YAML file, if `None` then file has to contain + single schema. + use_connection_pool : `bool` + If True then allow use of connection pool. + isolation_level : `str` or `None` + Transaction isolation level, if unset then backend-default value is + used. + connection_timeout: `float` or `None` + Maximum connection timeout in seconds. + drop : `bool` + If `True` then drop existing tables. + """ + sa_metadata, schema_version = cls._read_schema(schema_file, schema_name, felis_schema) + config = PpdbSqlConfig( + db_url=db_url, + schema_name=schema_name, + felis_path=schema_file, + felis_schema=felis_schema, + use_connection_pool=use_connection_pool, + isolation_level=isolation_level, + connection_timeout=connection_timeout, + ) + cls._make_database(config, sa_metadata, schema_version, drop) + return config + + @classmethod + def _read_schema( + cls, schema_file: str | None, schema_name: str | None, felis_schema: str | None + ) -> tuple[sqlalchemy.schema.MetaData, VersionTuple]: + """Read felis schema definitions for PPDB. + + Parameters + ---------- + schema_file : `str` or `None` + Name of YAML file with ``felis`` schema, if `None` then default + schema file is used. + schema_name : `str` or `None` + Database schema name, if `None` then default schema is used. + felis_schema : `str`, optional + Name of the schema in YAML file, if `None` then file has to contain + single schema. + + Returns + ------- + metadata : `sqlalchemy.schema.MetaData` + SQLAlchemy metadata instance containing information for all tables. + version : `lsst.dax.apdb.VersionTuple` or `None` + Schema version defined in schema or `None` if not defined. 
+ """ + if schema_file is None: + schema_file = os.path.expandvars(cls.default_felis_schema_file) + + res = ResourcePath(schema_file) + schemas_list = list(yaml.load_all(res.read(), Loader=yaml.SafeLoader)) + if not schemas_list: + raise ValueError(f"Schema file {schema_file!r} does not define any schema") + if felis_schema is not None: + schemas_list = [schema for schema in schemas_list if schema.get("name") == felis_schema] + if not schemas_list: + raise ValueError(f"Schema file {schema_file!r} does not define schema {felis_schema!r}") + elif len(schemas_list) > 1: + raise ValueError(f"Schema file {schema_file!r} defines multiple schemas") + schema_dict = schemas_list[0] + + # In case we use APDB schema drop tables that are not needed in PPDB. + filtered_tables = [ + table for table in schema_dict["tables"] if table["name"] not in ("DiaObjectLast",) + ] + schema_dict["tables"] = filtered_tables + schema = Schema.model_validate(schema_dict) + + # Replace schema name with a configured one, this helps in case we + # want to use default schema on database side. + if schema_name: + schema.name = schema_name + metadata = MetaDataBuilder(schema).build() + else: + builder = MetaDataBuilder(schema, apply_schema_to_metadata=False, apply_schema_to_tables=False) + metadata = builder.build() + + # Add table for replication support. + sqlalchemy.schema.Table( + "PpdbReplicaChunk", + metadata, + sqlalchemy.schema.Column( + "apdb_replica_chunk", sqlalchemy.BigInteger, primary_key=True, autoincrement=False + ), + sqlalchemy.schema.Column("last_update_time", sqlalchemy.types.TIMESTAMP, nullable=False), + sqlalchemy.schema.Column("unique_id", GUID, nullable=False), + sqlalchemy.schema.Column("replica_time", sqlalchemy.types.TIMESTAMP, nullable=False), + sqlalchemy.schema.Index("PpdbInsertId_idx_last_update_time", "last_update_time"), + sqlalchemy.schema.Index("PpdbInsertId_idx_replica_time", "replica_time"), + schema=schema_name, + ) + + if isinstance(schema.version, str): + version = VersionTuple.fromString(schema.version) + elif isinstance(schema.version, SchemaVersion): + version = VersionTuple.fromString(schema.version.current) + else: + # Missing schema version is identical to 0.1.0 + version = VersionTuple(0, 1, 0) + + return metadata, version + + @classmethod + def _make_database( + cls, + config: PpdbSqlConfig, + sa_metadata: sqlalchemy.schema.MetaData, + schema_version: VersionTuple | None, + drop: bool, + ) -> None: + """Initialize database schema. + + Parameters + ---------- + db_url : `str` + SQLAlchemy database connection URI. + schema_name : `str` or `None` + Database schema name, if `None` then default schema is used. + sa_metadata : `sqlalchemy.schema.MetaData` + Schema definition. + schema_version : `lsst.dax.apdb.VersionTuple` or `None` + Schema version defined in schema or `None` if not defined. + drop : `bool` + If `True` then drop existing tables before creating new ones. + """ + engine = cls._make_engine(config) + + if config.schema_name is not None: + dialect = engine.dialect + quoted_schema = dialect.preparer(dialect).quote_schema(config.schema_name) + create_schema = sqlalchemy.DDL( + "CREATE SCHEMA IF NOT EXISTS %(schema)s", context={"schema": quoted_schema} + ).execute_if(dialect="postgresql") + sqlalchemy.event.listen(sa_metadata, "before_create", create_schema) + + if drop: + _LOG.info("dropping all tables") + sa_metadata.drop_all(engine) + _LOG.info("creating all tables") + sa_metadata.create_all(engine) + + # Need metadata table to store few items in it, if table exists. 
+ meta_table: sqlalchemy.schema.Table | None = None + for table in sa_metadata.tables.values(): + if table.name == "metadata": + meta_table = table + break + + apdb_meta = ApdbMetadataSql(engine, meta_table) + if apdb_meta.table_exists(): + # Fill version numbers, overwrite if they are already there. + if schema_version is not None: + _LOG.info("Store metadata %s = %s", cls.meta_schema_version_key, schema_version) + apdb_meta.set(cls.meta_schema_version_key, str(schema_version), force=True) + _LOG.info("Store metadata %s = %s", cls.meta_code_version_key, VERSION) + apdb_meta.set(cls.meta_code_version_key, str(VERSION), force=True) + + @classmethod + def _make_engine(cls, config: PpdbSqlConfig) -> sqlalchemy.engine.Engine: + """Make SQLALchemy engine based on configured parameters. + + Parameters + ---------- + config : `PpdbSqlConfig` + Configuration object. + """ + kw: MutableMapping[str, Any] = {} + conn_args: dict[str, Any] = dict() + if not config.use_connection_pool: + kw["poolclass"] = NullPool + if config.isolation_level is not None: + kw.update(isolation_level=config.isolation_level) + elif config.db_url.startswith("sqlite"): # type: ignore + # Use READ_UNCOMMITTED as default value for sqlite. + kw.update(isolation_level="READ_UNCOMMITTED") + if config.connection_timeout is not None: + if config.db_url.startswith("sqlite"): + conn_args.update(timeout=config.connection_timeout) + elif config.db_url.startswith(("postgresql", "mysql")): + conn_args.update(connect_timeout=config.connection_timeout) + kw = {"connect_args": conn_args} + engine = sqlalchemy.create_engine(config.db_url, **kw) + + if engine.dialect.name == "sqlite": + # Need to enable foreign keys on every new connection. + sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect) + + return engine + + def _versionCheck(self, metadata: ApdbMetadataSql, schema_version: VersionTuple) -> None: + """Check schema version compatibility.""" + + def _get_version(key: str, default: VersionTuple) -> VersionTuple: + """Retrieve version number from given metadata key.""" + if metadata.table_exists(): + version_str = metadata.get(key) + if version_str is None: + # Should not happen with existing metadata table. + raise RuntimeError(f"Version key {key!r} does not exist in metadata table.") + return VersionTuple.fromString(version_str) + return default + + # For old databases where metadata table does not exist we assume that + # version of both code and schema is 0.1.0. + initial_version = VersionTuple(0, 1, 0) + db_schema_version = _get_version(self.meta_schema_version_key, initial_version) + db_code_version = _get_version(self.meta_code_version_key, initial_version) + + # For now there is no way to make read-only APDB instances, assume that + # any access can do updates. 
+ if not schema_version.checkCompatibility(db_schema_version, True): + raise IncompatibleVersionError( + f"Configured schema version {schema_version} " + f"is not compatible with database version {db_schema_version}" + ) + if not VERSION.checkCompatibility(db_code_version, True): + raise IncompatibleVersionError( + f"Current code version {VERSION} " + f"is not compatible with database version {db_code_version}" + ) + + def _get_table(self, name: str) -> sqlalchemy.schema.Table: + for table in self._sa_metadata.tables.values(): + if table.name == name: + return table + raise LookupError(f"Unknown table {name}") + + @property + def metadata(self) -> ApdbMetadata: + # docstring is inherited from a base class + return self._metadata + + def get_replica_chunks(self) -> list[PpdbReplicaChunk] | None: + # docstring is inherited from a base class + table = self._get_table("PpdbReplicaChunk") + query = sql.select( + table.columns["apdb_replica_chunk"], + table.columns["last_update_time"], + table.columns["unique_id"], + table.columns["replica_time"], + ).order_by(table.columns["last_update_time"]) + with self._engine.connect() as conn: + result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query) + ids = [] + for row in result: + last_update_time = astropy.time.Time(row[1].timestamp(), format="unix_tai") + replica_time = astropy.time.Time(row[3].timestamp(), format="unix_tai") + ids.append( + PpdbReplicaChunk( + id=row[0], + last_update_time=last_update_time, + unique_id=row[2], + replica_time=replica_time, + ) + ) + return ids + + def store( + self, + replica_chunk: ReplicaChunk, + objects: ApdbTableData, + sources: ApdbTableData, + forced_sources: ApdbTableData, + *, + update: bool = False, + ) -> None: + # docstring is inherited from a base class + + # We want to run all inserts in one transaction. + with self._engine.begin() as connection: + # Check for existing InsertId first, if it does not exist we can + # run more optimal queries. 
+ if update: + table = self._get_table("PpdbReplicaChunk") + query = sql.select(sql.expression.literal(1)).where( + table.columns["apdb_replica_chunk"] == replica_chunk.id + ) + if connection.execute(query).one_or_none() is None: + update = False + + self._store_insert_id(replica_chunk, connection, update) + self._store_objects(objects, connection, update) + self._store_table_data(sources, connection, update, "DiaSource", 100) + self._store_table_data(forced_sources, connection, update, "DiaForcedSource", 1000) + + def _store_insert_id( + self, replica_chunk: ReplicaChunk, connection: sqlalchemy.engine.Connection, update: bool + ) -> None: + """Insert or replace single record in PpdbReplicaChunk table""" + insert_dt = replica_chunk.last_update_time.tai.datetime + now = astropy.time.Time.now().tai.datetime + + table = self._get_table("PpdbReplicaChunk") + + values = {"last_update_time": insert_dt, "unique_id": replica_chunk.unique_id, "replica_time": now} + row = {"apdb_replica_chunk": replica_chunk.id} | values + if update: + # We need UPSERT which is dialect-specific construct + if connection.dialect.name == "sqlite": + insert_sqlite = sqlalchemy.dialects.sqlite.insert(table) + insert_sqlite = insert_sqlite.on_conflict_do_update( + index_elements=table.primary_key, set_=values + ) + connection.execute(insert_sqlite, row) + elif connection.dialect.name == "postgresql": + insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table) + insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values) + connection.execute(insert_pg, row) + else: + raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.") + else: + insert = table.insert() + connection.execute(insert, row) + + def _store_objects( + self, objects: ApdbTableData, connection: sqlalchemy.engine.Connection, update: bool + ) -> None: + """Store or replace DiaObjects.""" + # Store all records. + self._store_table_data(objects, connection, update, "DiaObject", 100) + + table = self._get_table("DiaObject") + + # We need to fill validityEnd column for the previously stored + # objects that have new records. 
Window function is used here to find + # records with validityEnd=NULL, order them and update validityEnd + # of older records from validityStart of newer records + idx = objects.column_names().index("diaObjectId") + ids = sorted(set(row[idx] for row in objects.rows())) + count = 0 + for chunk in chunk_iterable(ids, 1000): + select_cte = sqlalchemy.cte( + sqlalchemy.select( + table.columns["diaObjectId"], + table.columns["validityStart"], + table.columns["validityEnd"], + sqlalchemy.func.rank() + .over( + partition_by=table.columns["diaObjectId"], + order_by=table.columns["validityStart"], + ) + .label("rank"), + ).where(table.columns["diaObjectId"].in_(chunk)) + ) + sub1 = select_cte.alias("s1") + sub2 = select_cte.alias("s2") + new_end = sql.select(sub2.columns["validityStart"]).select_from( + sub1.join( + sub2, + sqlalchemy.and_( + sub1.columns["diaObjectId"] == sub2.columns["diaObjectId"], + sub1.columns["rank"] + sqlalchemy.literal(1) == sub2.columns["rank"], + sub1.columns["validityStart"] == table.columns["validityStart"], + ), + ) + ) + stmt = ( + table.update() + .values(validityEnd=new_end.scalar_subquery()) + .where(table.columns["validityStart"] == None) # noqa: E711 + ) + result = connection.execute(stmt) + count += result.rowcount + _LOG.info("Updated %d rows in DiaObject table with new validityEnd values", count) + + def _store_table_data( + self, + table_data: ApdbTableData, + connection: sqlalchemy.engine.Connection, + update: bool, + table_name: str, + chunk_size: int, + ) -> None: + """Store or replace DiaSources.""" + table = self._get_table(table_name) + inserter = make_inserter(connection) + count = inserter.insert(table, table_data, chunk_size=chunk_size) + _LOG.info("Inserted %d rows into %s table", count, table_name) diff --git a/python/lsst/dax/ppdb/sql/bulk_insert.py b/python/lsst/dax/ppdb/sql/bulk_insert.py new file mode 100644 index 0000000..dd416dd --- /dev/null +++ b/python/lsst/dax/ppdb/sql/bulk_insert.py @@ -0,0 +1,134 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
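The chunk bookkeeping above relies on an upsert into ``PpdbReplicaChunk``, and the upsert construct is dialect-specific. A self-contained sketch of the SQLite variant against an in-memory database (table layout abbreviated, values are placeholders):

    from datetime import datetime

    import sqlalchemy
    from sqlalchemy.dialects.sqlite import insert as sqlite_insert

    metadata = sqlalchemy.MetaData()
    chunks = sqlalchemy.Table(
        "PpdbReplicaChunk",
        metadata,
        sqlalchemy.Column("apdb_replica_chunk", sqlalchemy.BigInteger, primary_key=True),
        sqlalchemy.Column("last_update_time", sqlalchemy.TIMESTAMP, nullable=False),
    )
    engine = sqlalchemy.create_engine("sqlite://")
    metadata.create_all(engine)

    values = {"last_update_time": datetime.now()}
    stmt = sqlite_insert(chunks).values(apdb_replica_chunk=1, **values)
    stmt = stmt.on_conflict_do_update(index_elements=["apdb_replica_chunk"], set_=values)
    with engine.begin() as connection:
        connection.execute(stmt)  # inserts chunk 1, or refreshes it if already present

On PostgreSQL the same pattern goes through ``sqlalchemy.dialects.postgresql`` with the conflict target given as a constraint, which is what ``_store_insert_id`` does.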
diff --git a/python/lsst/dax/ppdb/sql/bulk_insert.py b/python/lsst/dax/ppdb/sql/bulk_insert.py
new file mode 100644
index 0000000..dd416dd
--- /dev/null
+++ b/python/lsst/dax/ppdb/sql/bulk_insert.py
@@ -0,0 +1,134 @@
+# This file is part of dax_ppdb
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from __future__ import annotations
+
+__all__ = ["BulkInserter", "make_inserter"]
+
+import logging
+import tempfile
+from abc import ABC, abstractmethod
+from collections.abc import Sequence
+from typing import Any
+
+import sqlalchemy
+from lsst.dax.apdb import ApdbTableData
+from lsst.utils.iteration import chunk_iterable
+
+from .pg_dump import PgBinaryDumper
+
+_LOG = logging.getLogger(__name__)
+
+
+class BulkInserter(ABC):
+    """Interface for bulk insert operations into a table."""
+
+    @abstractmethod
+    def insert(self, table: sqlalchemy.schema.Table, data: ApdbTableData, *, chunk_size: int = 1000) -> int:
+        """Insert multiple rows into a single table.
+
+        Parameters
+        ----------
+        table : `sqlalchemy.schema.Table`
+            Table to insert data into.
+        data : `ApdbTableData`
+            Data to insert into the table.
+        chunk_size : `int`, optional
+            Number of rows in a single chunk for insertion.
+
+        Returns
+        -------
+        count : `int`
+            Number of rows inserted.
+        """
+        raise NotImplementedError()
+
+
+class _DefaultBulkInserter(BulkInserter):
+
+    def __init__(self, connection: sqlalchemy.engine.Connection):
+        self.connection = connection
+
+    def insert(self, table: sqlalchemy.schema.Table, data: ApdbTableData, *, chunk_size: int = 1000) -> int:
+        # Docstring inherited.
+        table_columns = set(column.name for column in table.columns)
+        data_columns = set(data.column_names())
+        drop_columns = data_columns - table_columns
+        insert = table.insert()
+        count = 0
+        # DiaObject table is very wide, use smaller chunks.
+        for chunk in chunk_iterable(data.rows(), chunk_size):
+            insert_data = [self._row_to_dict(data.column_names(), row, drop_columns) for row in chunk]
+            result = self.connection.execute(insert.values(insert_data))
+            count += result.rowcount
+        return count
+
+    @staticmethod
+    def _row_to_dict(column_names: Sequence[str], row: tuple, drop_columns: set[str]) -> dict[str, Any]:
+        """Convert a TableData row into a dict."""
+        data = dict(zip(column_names, row))
+        for column in drop_columns:
+            del data[column]
+        return data
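# Illustrative usage sketch (hypothetical names, not taken from dax_ppdb):
# driving the inserter with a stand-in for ApdbTableData.  "FakeTableData"
# only provides the two accessors the inserters actually call, column_names()
# and rows(); real code passes genuine ApdbTableData from dax_apdb, and
# make_inserter (defined at the end of this module) picks the implementation.
import sqlalchemy


class FakeTableData:
    def __init__(self, columns: list[str], rows: list[tuple]):
        self._columns = columns
        self._rows = rows

    def column_names(self) -> list[str]:
        return self._columns

    def rows(self) -> list[tuple]:
        return self._rows


metadata = sqlalchemy.MetaData()
sources = sqlalchemy.Table(
    "DiaSource",
    metadata,
    sqlalchemy.Column("diaSourceId", sqlalchemy.BigInteger, primary_key=True),
    sqlalchemy.Column("psfFlux", sqlalchemy.Float),
)

engine = sqlalchemy.create_engine("sqlite://")
metadata.create_all(engine)

data = FakeTableData(["diaSourceId", "psfFlux", "extraColumn"], [(1, 1.5, "x"), (2, 2.5, "y")])
with engine.begin() as connection:
    # A SQLite connection does not use the psycopg2 driver, so the generic
    # inserter applies; "extraColumn" is dropped because the table lacks it.
    inserter = _DefaultBulkInserter(connection)
    count = inserter.insert(sources, data, chunk_size=1000)
print(count)  # 2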
+ _LOG.info("Ingesting %s data to Postgres table", table.name) + stream.seek(0) + cursor.copy_expert(sql, stream, 1024 * 1024) + _LOG.info("Successfully ingested %s data", table.name) + + return len(data.rows()) + + +def make_inserter(connection: sqlalchemy.engine.Connection) -> BulkInserter: + """Make instance of `BulkInserter` suitable for a given connection.""" + if connection.dialect.driver == "psycopg2": + return _Psycopg2BulkInserter(connection) + return _DefaultBulkInserter(connection) diff --git a/python/lsst/dax/ppdb/sql/pg_dump.py b/python/lsst/dax/ppdb/sql/pg_dump.py new file mode 100644 index 0000000..7b6f00b --- /dev/null +++ b/python/lsst/dax/ppdb/sql/pg_dump.py @@ -0,0 +1,274 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ["PgBinaryDumper"] + +import logging +import struct +from abc import ABC, abstractmethod +from datetime import datetime, timedelta, timezone +from typing import Any, BinaryIO, NamedTuple + +import sqlalchemy +import sqlalchemy.dialects.postgresql.types as pg_types +from lsst.dax.apdb import ApdbTableData +from sqlalchemy.sql import sqltypes + +_LOG = logging.getLogger(__name__) + + +class StructData(NamedTuple): + + size: int + format: str + value: Any + + +class _ColumnDataHandler(ABC): + + @abstractmethod + def to_struct(self, column_value: Any) -> StructData: + raise NotImplementedError() + + +class PgBinaryDumper: + """Class that knows how to dump ApdbTableData to binary PostgreSQL file.""" + + _HEADER = b"PGCOPY\n\377\r\n\0" + + def __init__(self, stream: BinaryIO, table: sqlalchemy.schema.Table): + self._stream = stream + self._table = table + + def dump(self, data: ApdbTableData) -> list[str]: + """Dump the whole contents of table data to a file.""" + # Only care about columns that exists in both table and data. + data_column_names = data.column_names() + table_column_names = set(column.name for column in self._table.columns) + _LOG.debug("table_column_names: %s", table_column_names) + + column_indices = [idx for idx, name in enumerate(data_column_names) if name in table_column_names] + types = [self._table.columns[data_column_names[idx]].type for idx in column_indices] + handlers = [_TYPE_MAP[column_type.__class__] for column_type in types] + + # Write PGDUMP header and flags (two 32-bit integers) + self._stream.write(self._HEADER + b"\0\0\0\0\0\0\0\0") + + # Dump all rows. + for row in data.rows(): + + # Buld row struct, it starts with the number of columns as 16-bit + # integer, all data is in network order. 
diff --git a/python/lsst/dax/ppdb/sql/pg_dump.py b/python/lsst/dax/ppdb/sql/pg_dump.py
new file mode 100644
index 0000000..7b6f00b
--- /dev/null
+++ b/python/lsst/dax/ppdb/sql/pg_dump.py
@@ -0,0 +1,274 @@
+# This file is part of dax_ppdb
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from __future__ import annotations
+
+__all__ = ["PgBinaryDumper"]
+
+import logging
+import struct
+from abc import ABC, abstractmethod
+from datetime import datetime, timedelta, timezone
+from typing import Any, BinaryIO, NamedTuple
+
+import sqlalchemy
+import sqlalchemy.dialects.postgresql.types as pg_types
+from lsst.dax.apdb import ApdbTableData
+from sqlalchemy.sql import sqltypes
+
+_LOG = logging.getLogger(__name__)
+
+
+class StructData(NamedTuple):
+    """Size, struct format, and value of a single encoded field."""
+
+    size: int
+    format: str
+    value: Any
+
+
+class _ColumnDataHandler(ABC):
+    """Interface for converting a column value to its binary representation."""
+
+    @abstractmethod
+    def to_struct(self, column_value: Any) -> StructData:
+        raise NotImplementedError()
+
+
+class PgBinaryDumper:
+    """Class that knows how to dump ApdbTableData to a binary PostgreSQL COPY file."""
+
+    _HEADER = b"PGCOPY\n\377\r\n\0"
+
+    def __init__(self, stream: BinaryIO, table: sqlalchemy.schema.Table):
+        self._stream = stream
+        self._table = table
+
+    def dump(self, data: ApdbTableData) -> list[str]:
+        """Dump the whole contents of table data to a file."""
+        # Only care about columns that exist in both the table and the data.
+        data_column_names = data.column_names()
+        table_column_names = set(column.name for column in self._table.columns)
+        _LOG.debug("table_column_names: %s", table_column_names)
+
+        column_indices = [idx for idx, name in enumerate(data_column_names) if name in table_column_names]
+        types = [self._table.columns[data_column_names[idx]].type for idx in column_indices]
+        handlers = [_TYPE_MAP[column_type.__class__] for column_type in types]
+
+        # Write the PGCOPY signature followed by the flags field and the
+        # header-extension length (two 32-bit integers, both zero).
+        self._stream.write(self._HEADER + b"\0\0\0\0\0\0\0\0")
+
+        # Dump all rows.
+        for row in data.rows():
+
+            # Build the row struct; it starts with the number of columns as a
+            # 16-bit integer, and all data is in network byte order.
+            fmt = ["!h"]
+            args = [len(column_indices)]
+            for idx, handler in zip(column_indices, handlers):
+                struct_data = handler.to_struct(row[idx])
+                if struct_data.value is None:
+                    # Null is encoded as size=-1, without data.
+                    fmt.append("i")
+                    args.append(-1)
+                else:
+                    fmt.extend(["i", struct_data.format])
+                    args.extend([struct_data.size, struct_data.value])
+
+            row_bytes = struct.pack("".join(fmt), *args)
+            self._stream.write(row_bytes)
+
+        return [data_column_names[idx] for idx in column_indices]
+
+
+class _FixedColumnDataHandler(_ColumnDataHandler):
+
+    def __init__(self, size: int, format: str):
+        self._size = size
+        self._format = format
+
+    def to_struct(self, column_value: Any) -> StructData:
+        return StructData(size=self._size, format=self._format, value=column_value)
+
+
+class _ByteArrayColumnDataHandler(_ColumnDataHandler):
+
+    def __init__(self, format: str):
+        self._format = format
+
+    def to_struct(self, column_value: Any) -> StructData:
+        if column_value is None:
+            return StructData(size=-1, format=self._format, value=None)
+        format = f"{len(column_value)}{self._format}"
+        return StructData(size=len(column_value), format=format, value=column_value)
+
+
+class _StringColumnDataHandler(_ColumnDataHandler):
+
+    def __init__(self, format: str):
+        self._format = format
+
+    def to_struct(self, column_value: Any) -> StructData:
+        if column_value is None:
+            return StructData(size=-1, format=self._format, value=None)
+        # Assume that utf8 is OK for all string data.
+        assert isinstance(column_value, str), "Expect string instance"
+        value = column_value.encode()
+        format = f"{len(value)}{self._format}"
+        return StructData(size=len(value), format=format, value=value)
+
+
+class _TimestampColumnDataHandler(_ColumnDataHandler):
+
+    epoch_utc = datetime(2000, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
+    epoch_naive = datetime(2000, 1, 1, 0, 0, 0)
+
+    def to_struct(self, column_value: Any) -> StructData:
+        if column_value is None:
+            return StructData(size=-1, format="q", value=None)
+        assert isinstance(column_value, datetime), "Expect datetime instance"
+        # Timestamps are stored internally as microseconds since the
+        # PostgreSQL epoch (2000-01-01).
+        if column_value.tzinfo is None:
+            delta = column_value - self.epoch_naive
+        else:
+            delta = column_value - self.epoch_utc
+        delta_usec = int(delta / timedelta(microseconds=1))
+        return StructData(size=8, format="q", value=delta_usec)
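# Illustrative sketch (hypothetical values, not taken from dax_ppdb): what the
# handlers above produce for one row holding a BIGINT and a NULL, packed the
# same way dump() packs it.
import struct

handler = _FixedColumnDataHandler(8, "q")
big_int = handler.to_struct(1234)  # StructData(size=8, format='q', value=1234)

fmt = ["!h"]  # field count, 16-bit, network byte order
args = [2]    # two fields in this row
fmt.extend(["i", big_int.format])
args.extend([big_int.size, big_int.value])
fmt.append("i")  # NULL field: length -1, no data bytes follow
args.append(-1)

row_bytes = struct.pack("".join(fmt), *args)
print(row_bytes.hex())
# 0002 00000008 00000000000004d2 ffffffff  (spaces added for readability)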
+
+
+# Reference lists of PostgreSQL and generic SQLAlchemy type names; "# done"
+# marks the types that are handled in _TYPE_MAP below.
+_pg_types = [
+    "BIT",
+    "BYTEA",  # done
+    "CIDR",
+    "CITEXT",
+    "INET",
+    "INTERVAL",
+    "MACADDR",
+    "MACADDR8",
+    "MONEY",
+    "OID",
+    "PGBit",
+    "PGCidr",
+    "PGInet",
+    "PGInterval",
+    "PGMacAddr",
+    "PGMacAddr8",
+    "PGUuid",
+    "REGCLASS",
+    "REGCONFIG",
+    "TIME",
+    "TIMESTAMP",
+    "TSQUERY",
+    "TSVECTOR",
+]
+
+_sqltypes = [
+    "ARRAY",
+    "BIGINT",  # done
+    "BINARY",
+    "BLOB",
+    "BOOLEAN",
+    "BigInteger",
+    "Boolean",
+    "CHAR",  # done
+    "CLOB",
+    "DATE",
+    "DATETIME",
+    "DATETIME_TIMEZONE",
+    "DECIMAL",
+    "DOUBLE",
+    "DOUBLE_PRECISION",
+    "Date",
+    "DateTime",
+    "Double",
+    "FLOAT",
+    "Float",
+    "INT",  # done
+    "INTEGER",  # done
+    "Integer",
+    "Interval",
+    "JSON",
+    "LargeBinary",
+    "MATCHTYPE",
+    "MatchType",
+    "NCHAR",
+    "NULLTYPE",
+    "NUMERIC",
+    "NUMERICTYPE",
+    "NVARCHAR",
+    "Numeric",
+    "REAL",
+    "SMALLINT",  # done
+    "STRINGTYPE",
+    "SmallInteger",
+    "String",
+    "TEXT",
+    "TIME",
+    "TIMESTAMP",
+    "TIME_TIMEZONE",
+    "Text",
+    "Time",
+    "Tuple",
+    "TupleType",
+    "Type",
+    "UUID",
+    "Unicode",
+    "UnicodeText",
+    "Uuid",
+    "VARBINARY",
+    "VARCHAR",
+]
+
+_TYPE_MAP = {
+    sqltypes.SMALLINT: _FixedColumnDataHandler(2, "h"),
+    sqltypes.INT: _FixedColumnDataHandler(4, "i"),
+    sqltypes.INTEGER: _FixedColumnDataHandler(4, "i"),
+    sqltypes.BIGINT: _FixedColumnDataHandler(8, "q"),
+    sqltypes.DOUBLE: _FixedColumnDataHandler(8, "d"),
+    sqltypes.DOUBLE_PRECISION: _FixedColumnDataHandler(8, "d"),
+    sqltypes.BOOLEAN: _FixedColumnDataHandler(1, "?"),
+    sqltypes.FLOAT: _FixedColumnDataHandler(4, "f"),
+    sqltypes.CHAR: _StringColumnDataHandler("s"),
+    sqltypes.VARCHAR: _StringColumnDataHandler("s"),
+    pg_types.BYTEA: _ByteArrayColumnDataHandler("s"),
+    pg_types.TIMESTAMP: _TimestampColumnDataHandler(),
+}
+
+
+def _dump_pgdump(filename: str) -> None:
+    """Debug helper: print the header and the first row of a PGCOPY binary file."""
+    with open(filename, "rb") as pgdump:
+        header = pgdump.read(11 + 4 + 4)
+        print("header:", header)
+        buffer = pgdump.read(2)
+        (count,) = struct.unpack("!h", buffer)
+        print("column count:", count)
+        for i in range(count):
+            buffer = pgdump.read(4)
+            (size,) = struct.unpack("!i", buffer)
+            print(f"  {i}: {size}")
+            if size > 0:
+                buffer = pgdump.read(size)
+
+
+if __name__ == "__main__":
+    import sys
+
+    _dump_pgdump(sys.argv[1])
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..22d38ae
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,7 @@
+astropy
+pyyaml >= 5.1
+sqlalchemy
+lsst-dax-apdb @ git+https://github.com/lsst/dax_apdb@main
+lsst-utils @ git+https://github.com/lsst/utils@main
+lsst-resources[s3] @ git+https://github.com/lsst/resources@main
+lsst-felis @ git+https://github.com/lsst/felis@main
diff --git a/setup.cfg b/setup.cfg
index 53fa4eb..bdd0dc0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -8,7 +8,3 @@ exclude =
   **/*/__init__.py,
   **/*/version.py,
   tests/.tests
-
-[tool:pytest]
-addopts = --flake8
-flake8-ignore = E133 E226 E228 N802 N803 N806 N812 N813 N815 N816 W503
diff --git a/ups/dax_ppdb.table b/ups/dax_ppdb.table
index e3b1923..f12decc 100644
--- a/ups/dax_ppdb.table
+++ b/ups/dax_ppdb.table
@@ -1,9 +1,9 @@
-# List EUPS dependencies of this package here.
-# - Any package whose API is used directly should be listed explicitly.
-# - Common third-party packages can be assumed to be recursively included by
-#   the "sconsUtils" package.
+setupRequired(dax_apdb)
+setupRequired(felis)
+setupRequired(resources)
 setupRequired(sconsUtils)
+setupRequired(sdm_schemas)
+setupRequired(utils)
-# The following is boilerplate for all packages.
-# See https://dmtn-001.lsst.io for details on LSST_LIBRARY_PATH.
+envPrepend(PATH, ${PRODUCT_DIR}/bin)
 envPrepend(PYTHONPATH, ${PRODUCT_DIR}/python)