From 988b07ec4eee3331dd4fe995b694fd91ed5d19cc Mon Sep 17 00:00:00 2001
From: Valentin Matton
Date: Fri, 27 Sep 2024 15:32:21 +0200
Subject: [PATCH 1/4] docs(dbt): start guidelines

---
 pipeline/CONTRIBUTING.md     | 41 +++++++++++++---------------
 pipeline/dbt/CONTRIBUTING.md | 52 ++++++++++++++++++++++++++++++++++++
 2 files changed, 70 insertions(+), 23 deletions(-)
 create mode 100644 pipeline/dbt/CONTRIBUTING.md

diff --git a/pipeline/CONTRIBUTING.md b/pipeline/CONTRIBUTING.md
index ed58fe31..cc8d7950 100644
--- a/pipeline/CONTRIBUTING.md
+++ b/pipeline/CONTRIBUTING.md
@@ -12,19 +12,12 @@ pip install -U pip setuptools wheel
 
 # Install the dev dependencies
 pip install -r requirements/dev/requirements.txt
-```
-
-## Running the test suite
-
-```bash
-# Copy (and optionally edit) the template .env
-cp .template.env .env
-
-# simply use tox (for reproducible environnement, packaging errors, etc.)
-tox
+
+# Install dbt
+pip install -r requirements/tasks/dbt/requirements.txt
 ```
 
-## dbt
+## Running `dbt`
 
 * dbt is configured to target the `target-db` postgres container (see the root `docker-compose.yml`).
 * all dbt commands must be run in the `pipeline/dbt` directory.
@@ -44,20 +37,12 @@ dbt run-operation create_udfs
 # run commands
 dbt ls
 
-# staging, basic processing/mapping:
-# - retrieve data from datalake table
-# - retrieve data from raw dedicated source tables
-# - retrieve data from the Soliguide S3
-dbt run --select staging
-
-# intermediate, specific transformations
-dbt run --select intermediate
-
-# marts, last touch
-dbt run --select marts
+dbt build --select models/staging
+dbt build --select models/intermediate
+dbt build --select models/marts
 ```
 
-## Update schema in dbt seeds
+## Updating schema in dbt seeds
 
 * Required when the schema changes.
@@ -65,7 +50,7 @@ python scripts/update_schema_seeds.py
 ```
 
-## Manage the pipeline requirements
+## Managing the pipeline requirements
 
 In order to prevent conflicts:
@@ -84,3 +69,13 @@ make all
 # to upgrade dependencies
 make upgrade all
 ```
+
+## Running the test suite
+
+```bash
+# Copy (and optionally edit) the template .env
+cp .template.env .env
+
+# simply use tox (for a reproducible environment, packaging errors, etc.)
+tox
+```
diff --git a/pipeline/dbt/CONTRIBUTING.md b/pipeline/dbt/CONTRIBUTING.md
new file mode 100644
index 00000000..49e915bf
--- /dev/null
+++ b/pipeline/dbt/CONTRIBUTING.md
@@ -0,0 +1,52 @@
+# `dbt` guidelines
+
+## testing models
+
+#### `data_tests` vs `unit_tests` vs `contract`:
+
+* with `dbt build`, `data_tests` are run **after** model execution **on the actual data**. A failing test will not prevent the faulty data from being propagated downstream, unless properly managed by the orchestration.
+* with `dbt build`, `unit_tests` are run **before** model execution **on mock-up data**. This is great for testing logic, but requires making assumptions about the input data.
+* `contract`s are enforced using actual DB constraints, **on the actual data**. A failing constraint will stop the model execution and prevent faulty data from being propagated downstream. Unlike `data_tests`, we cannot set a severity level. There is no middle ground. And the faulty data cannot be easily queried.
+
+✅ use `unit_tests` to test **complex logic** on well-defined data (e.g. converting opening hours).
+
+❌ avoid `unit_tests` for simple transformations. They are costly to maintain and will very often just duplicate the implementation.
+
+✅ always add a few `data_tests`.
+
+✅ use `contract`s on `marts`. Marts data can be consumed by clients.
+
+#### which layer (`source`, `staging`, `intermediate`, `marts`) should I test?
+
+It's better to test data early, so we can make assumptions on which we can later build.
+
+Our `source` layer is essentially tables containing the raw data in jsonb `data` columns. While this is very handy for loading data, it is impractical to test with `data_tests`.
+
+Therefore our tests start at the `staging` layer.
+
+✅ `staging`: use `data_tests` extensively. Assumptions on data made in downstream models should be tested.
+
+✅ `intermediate`: use `data_tests` for primary keys and foreign keys. Use the generic tests `check_structure`, `check_service` and `check_address`.
+
+✅ `marts`: use `contracts` + generic tests `check_structure`, `check_service` and `check_address`.
+
+#### which type of `data_tests` should I use?
+
+* to stay manageable, our tests should be more or less uniform across the codebase.
+
+✅ always use native `unique` and `not_null` for primary keys.
+
+✅ always use `relationships` for foreign keys.
+
+✅ use `not_null`, `dbt_utils.not_empty_string` and `dbt_utils.not_constant` when possible.
+
+✅ use `accepted_values` for categorical columns from well-defined data.
+
+❌ avoid `accepted_values` for categorical columns of lower-quality data, or downgrade the test severity to `warn`. Otherwise the test could fail too regularly.
+
+✅ For simple cases, use predefined generic data tests over custom data tests (in `tests/`). They usually require less code and are easier to read, *unless* you want to test complex logic.
+
+## references
+
+* https://www.datafold.com/blog/7-dbt-testing-best-practices
+* https://docs.getdbt.com/best-practices
\ No newline at end of file

From c54975c943a960be71a0a88bde498ed329e99074 Mon Sep 17 00:00:00 2001
From: Valentin Matton
Date: Wed, 11 Sep 2024 21:51:23 +0200
Subject: [PATCH 2/4] feat(pipeline): add scoring model

This implementation uses a plpython udf that calls the scoring api of the
data inclusion schema.
---
 pipeline/dags/dag_utils/dbt.py               |  1 +
 pipeline/dbt/macros/create_udfs.sql          |  1 +
 pipeline/dbt/macros/udfs/udf__score.sql      | 40 +++++++++++++++++++
 pipeline/dbt/models/intermediate/_models.yml | 27 +++++++++++++
 .../intermediate/int__criteres_qualite.sql   | 16 ++++++++
 .../marts/inclusion/_inclusion_models.yml    |  6 +++
 .../inclusion/marts_inclusion__services.sql  | 20 +++++++++-
 7 files changed, 110 insertions(+), 1 deletion(-)
 create mode 100644 pipeline/dbt/macros/udfs/udf__score.sql
 create mode 100644 pipeline/dbt/models/intermediate/int__criteres_qualite.sql

diff --git a/pipeline/dags/dag_utils/dbt.py b/pipeline/dags/dag_utils/dbt.py
index 17453f04..5a0f8656 100644
--- a/pipeline/dags/dag_utils/dbt.py
+++ b/pipeline/dags/dag_utils/dbt.py
@@ -109,6 +109,7 @@ def get_intermediate_tasks():
                     # main since the beginning as it required intermediate data to be
                     # present ?
"path:models/intermediate/int__geocodages.sql", + "path:models/intermediate/int__criteres_qualite.sql", "path:models/intermediate/int__union_contacts.sql", "path:models/intermediate/int__union_adresses.sql", "path:models/intermediate/int__union_services.sql", diff --git a/pipeline/dbt/macros/create_udfs.sql b/pipeline/dbt/macros/create_udfs.sql index 4276c0f9..7fb0f041 100644 --- a/pipeline/dbt/macros/create_udfs.sql +++ b/pipeline/dbt/macros/create_udfs.sql @@ -12,6 +12,7 @@ Another way would be to use the `on-run-start` hook, but it does not play nicely CREATE SCHEMA IF NOT EXISTS processings; {{ udf__geocode() }} +{{ udf__score() }} {{ create_udf_soliguide__new_hours_to_osm_opening_hours() }} {{ create_udf__common_checks() }} diff --git a/pipeline/dbt/macros/udfs/udf__score.sql b/pipeline/dbt/macros/udfs/udf__score.sql new file mode 100644 index 00000000..6dde4097 --- /dev/null +++ b/pipeline/dbt/macros/udfs/udf__score.sql @@ -0,0 +1,40 @@ +{% macro udf__score() %} + +DROP FUNCTION IF EXISTS processings.score; + +CREATE OR REPLACE FUNCTION processings.score(data JSONB) +RETURNS + TABLE( + score_ligne FLOAT, + nom_critere TEXT, + score_critere FLOAT + ) +AS $$ + +import json + +import pydantic + +from data_inclusion.schema import Service, score_qualite + + +# TODO(vmttn): run score *after* pydantic validation then remove this try/except +try: + service = Service(**json.loads(data)) +except pydantic.ValidationError as exc: + return [] + +score, details = score_qualite.score(service) + +return [ + { + "score_ligne": score, + "nom_critere": nom_critere, + "score_critere": score_critere, + } + for nom_critere, score_critere in details.items() +] + +$$ LANGUAGE plpython3u; + +{% endmacro %} \ No newline at end of file diff --git a/pipeline/dbt/models/intermediate/_models.yml b/pipeline/dbt/models/intermediate/_models.yml index 58723ce5..a741fc6a 100644 --- a/pipeline/dbt/models/intermediate/_models.yml +++ b/pipeline/dbt/models/intermediate/_models.yml @@ -129,6 +129,33 @@ models: - locality - municipality + - name: int__criteres_qualite + description: | + Quality criteria scorings for services from all sources. + Each row holds a single criterion score for a service. + For a given service, there is as many rows as defined criteria. + A null score means the criterion is not applicable to the service. + Scoring is done by data-inclusion-schema scoring api in PL/Python. + columns: + - name: service_id + data_tests: + - not_null + - relationships: + to: ref('int__union_services') + field: _di_surrogate_id + - name: nom_critere + description: Name of the criterion. + data_tests: + - not_null + - dbt_utils.not_empty_string + - name: score_critere + description: | + Score for the given criterion and the given service, between 0 and 1. 
+ - name: score_ligne + data_tests: + - dbt_utils.not_constant + + unit_tests: - name: test_geocodages_full_refresh_mode model: int__geocodages diff --git a/pipeline/dbt/models/intermediate/int__criteres_qualite.sql b/pipeline/dbt/models/intermediate/int__criteres_qualite.sql new file mode 100644 index 00000000..03b88ded --- /dev/null +++ b/pipeline/dbt/models/intermediate/int__criteres_qualite.sql @@ -0,0 +1,16 @@ +WITH services AS ( + SELECT * FROM {{ ref('int__union_services__enhanced') }} +), + +final AS ( + SELECT + services._di_surrogate_id AS "service_id", + scores.score_critere AS "score_critere", + scores.nom_critere AS "nom_critere", + scores.score_ligne AS "score_ligne" + FROM + services, + LATERAL (SELECT * FROM processings.score(TO_JSONB(services))) AS scores +) + +SELECT * FROM final diff --git a/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml b/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml index dd1be2e0..dd3ed895 100644 --- a/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml +++ b/pipeline/dbt/models/marts/inclusion/_inclusion_models.yml @@ -229,6 +229,12 @@ models: data_type: text - name: zone_diffusion_nom data_type: text + - name: score_qualite + data_type: float + constraints: + - type: not_null + - type: check + expression: 'score_qualite BETWEEN 0 AND 1' - name: marts_inclusion__services_frais config: diff --git a/pipeline/dbt/models/marts/inclusion/marts_inclusion__services.sql b/pipeline/dbt/models/marts/inclusion/marts_inclusion__services.sql index d20e37db..1bc2f080 100644 --- a/pipeline/dbt/models/marts/inclusion/marts_inclusion__services.sql +++ b/pipeline/dbt/models/marts/inclusion/marts_inclusion__services.sql @@ -4,6 +4,17 @@ WITH services AS ( SELECT * FROM {{ ref('int__union_services__enhanced') }} ), +criteres AS ( + SELECT * FROM {{ ref('int__criteres_qualite') }} +), + +scores AS ( + SELECT DISTINCT ON (1) + criteres.service_id AS "service_id", + criteres.score_ligne AS "score" + FROM criteres +), + final AS ( SELECT {{ @@ -15,8 +26,15 @@ final AS ( 'adresse_id', ] ) - }} + }}, + scores.score AS "score_qualite" FROM services + LEFT JOIN scores ON services._di_surrogate_id = scores.service_id + -- TODO(vmttn): services that pass SQL validation, but fail pydantic validation + -- don't have a score... 
+    -- this filter is a temporary workaround until validation is done consistently
+    -- with pydantic
+    WHERE scores.score IS NOT NULL
 )
 
 SELECT * FROM final

From fa2008a6a968bcb49c1d8d756cd9c1b61ea14a17 Mon Sep 17 00:00:00 2001
From: Valentin Matton
Date: Mon, 16 Sep 2024 10:25:22 +0200
Subject: [PATCH 3/4] wip

---
 datawarehouse/processings/pyproject.toml      |  1 +
 .../requirements/dev-requirements.txt         | 20 ++++++++++++++++++-
 .../processings/requirements/requirements.txt | 20 ++++++++++++++++++-
 .../requirements/test-requirements.txt        | 20 ++++++++++++++++++-
 4 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/datawarehouse/processings/pyproject.toml b/datawarehouse/processings/pyproject.toml
index 98c5ca38..92a9cd44 100644
--- a/datawarehouse/processings/pyproject.toml
+++ b/datawarehouse/processings/pyproject.toml
@@ -10,6 +10,7 @@ dependencies = [
     "pandas~=2.2",
     "requests~=2.31",
     "tenacity",
+    "data-inclusion-schema",
 ]
 
 [project.optional-dependencies]
diff --git a/datawarehouse/processings/requirements/dev-requirements.txt b/datawarehouse/processings/requirements/dev-requirements.txt
index 45d049a9..c919350b 100644
--- a/datawarehouse/processings/requirements/dev-requirements.txt
+++ b/datawarehouse/processings/requirements/dev-requirements.txt
@@ -1,19 +1,29 @@
 # This file was autogenerated by uv via the following command:
 #    uv pip compile pyproject.toml --extra=dev --output-file=requirements/dev-requirements.txt
+annotated-types==0.7.0
+    # via pydantic
 certifi==2024.8.30
     # via requests
 cfgv==3.4.0
     # via pre-commit
 charset-normalizer==3.3.2
     # via requests
+data-inclusion-schema @ https://github.com/gip-inclusion/data-inclusion-schema/archive/vmttn/feat/score-qualite.zip#sha256=148145139b64888d885ea33ccac08d044000c983fcbb0bd3298af579e0931f31
+    # via data-inclusion-processings (pyproject.toml)
 distlib==0.3.8
     # via virtualenv
+dnspython==2.6.1
+    # via email-validator
+email-validator==2.2.0
+    # via pydantic
 filelock==3.16.0
     # via virtualenv
 identify==2.6.0
     # via pre-commit
 idna==3.8
-    # via requests
+    # via
+    #   email-validator
+    #   requests
 nodeenv==1.9.1
     # via pre-commit
 numpy==2.1.1
     # via
     #   data-inclusion-processings (pyproject.toml)
     #   pandas
 pandas==2.2.2
     # via data-inclusion-processings (pyproject.toml)
 platformdirs==4.3.2
     # via virtualenv
 pre-commit==3.8.0
     # via data-inclusion-processings (pyproject.toml)
+pydantic==2.9.1
+    # via data-inclusion-schema
+pydantic-core==2.23.3
+    # via pydantic
 python-dateutil==2.9.0.post0
     # via pandas
 pytz==2024.2
     # via pandas
 requests==2.32.3
     # via data-inclusion-processings (pyproject.toml)
 six==1.16.0
     # via python-dateutil
 tenacity==9.0.0
     # via data-inclusion-processings (pyproject.toml)
+typing-extensions==4.12.2
+    # via
+    #   pydantic
+    #   pydantic-core
 tzdata==2024.1
     # via pandas
 urllib3==2.2.2
     # via requests
diff --git a/datawarehouse/processings/requirements/requirements.txt b/datawarehouse/processings/requirements/requirements.txt
index c4e17e6b..a5eddad6 100644
--- a/datawarehouse/processings/requirements/requirements.txt
+++ b/datawarehouse/processings/requirements/requirements.txt
@@ -1,17 +1,31 @@
 # This file was autogenerated by uv via the following command:
 #    uv pip compile pyproject.toml --output-file=requirements/requirements.txt
+annotated-types==0.7.0
+    # via pydantic
 certifi==2024.8.30
     # via requests
 charset-normalizer==3.3.2
     # via requests
+data-inclusion-schema @ https://github.com/gip-inclusion/data-inclusion-schema/archive/vmttn/feat/score-qualite.zip#sha256=148145139b64888d885ea33ccac08d044000c983fcbb0bd3298af579e0931f31
+    # via data-inclusion-processings (pyproject.toml)
+dnspython==2.6.1
+    # via email-validator
+email-validator==2.2.0
+    # via pydantic
 idna==3.8
-    # via requests
+    # via
+    #   email-validator
+    #   requests
 numpy==2.1.1
     # via
     #   data-inclusion-processings (pyproject.toml)
     #   pandas
 pandas==2.2.2
     # via data-inclusion-processings (pyproject.toml)
+pydantic==2.9.1
+    # via data-inclusion-schema
+pydantic-core==2.23.3
+    # via pydantic
 python-dateutil==2.9.0.post0
     # via pandas
 pytz==2024.2
     # via pandas
 requests==2.32.3
     # via data-inclusion-processings (pyproject.toml)
 six==1.16.0
     # via python-dateutil
 tenacity==9.0.0
     # via data-inclusion-processings (pyproject.toml)
+typing-extensions==4.12.2
+    # via
+    #   pydantic
+    #   pydantic-core
 tzdata==2024.1
     # via pandas
 urllib3==2.2.2
     # via requests
diff --git a/datawarehouse/processings/requirements/test-requirements.txt b/datawarehouse/processings/requirements/test-requirements.txt
index 24517c07..09fb49aa 100644
--- a/datawarehouse/processings/requirements/test-requirements.txt
+++ b/datawarehouse/processings/requirements/test-requirements.txt
@@ -1,11 +1,21 @@
 # This file was autogenerated by uv via the following command:
 #    uv pip compile pyproject.toml --extra=test --output-file=requirements/test-requirements.txt
+annotated-types==0.7.0
+    # via pydantic
 certifi==2024.8.30
     # via requests
 charset-normalizer==3.3.2
     # via requests
+data-inclusion-schema @ https://github.com/gip-inclusion/data-inclusion-schema/archive/vmttn/feat/score-qualite.zip#sha256=148145139b64888d885ea33ccac08d044000c983fcbb0bd3298af579e0931f31
+    # via data-inclusion-processings (pyproject.toml)
+dnspython==2.6.1
+    # via email-validator
+email-validator==2.2.0
+    # via pydantic
 idna==3.8
-    # via requests
+    # via
+    #   email-validator
+    #   requests
 iniconfig==2.0.0
     # via pytest
 numpy==2.1.1
     # via
     #   data-inclusion-processings (pyproject.toml)
     #   pandas
 packaging==24.1
     # via pytest
 pandas==2.2.2
     # via data-inclusion-processings (pyproject.toml)
 pluggy==1.5.0
     # via pytest
+pydantic==2.9.1
+    # via data-inclusion-schema
+pydantic-core==2.23.3
+    # via pydantic
 pytest==8.3.3
     # via data-inclusion-processings (pyproject.toml)
 python-dateutil==2.9.0.post0
     # via pandas
 pytz==2024.2
     # via pandas
 six==1.16.0
     # via python-dateutil
 tenacity==9.0.0
     # via data-inclusion-processings (pyproject.toml)
+typing-extensions==4.12.2
+    # via
+    #   pydantic
+    #   pydantic-core
 tzdata==2024.1
     # via pandas
 urllib3==2.2.2
     # via requests

From 3d61d104693e427e6e7159f02fcbb2ababf3f386 Mon Sep 17 00:00:00 2001
From: Valentin Matton
Date: Tue, 13 Aug 2024 16:46:10 +0200
Subject: [PATCH 4/4] feat(api): add score_qualite field

---
 ...3_151329_68fe052dc63c_add_score_qualite.py | 28 +++++++++++++++++++
 .../api/inclusion_data/models.py              |  1 +
 .../api/inclusion_data/schemas.py             | 16 ++++++++++-
 .../__snapshots__/test_inclusion_data.ambr    | 18 +++++++++++-
 api/tests/e2e/api/test_inclusion_data.py      |  1 +
 api/tests/factories.py                        |  2 ++
 6 files changed, 64 insertions(+), 2 deletions(-)
 create mode 100644 api/src/alembic/versions/20240813_151329_68fe052dc63c_add_score_qualite.py

diff --git a/api/src/alembic/versions/20240813_151329_68fe052dc63c_add_score_qualite.py b/api/src/alembic/versions/20240813_151329_68fe052dc63c_add_score_qualite.py
new file mode 100644
index 00000000..df54f54a
--- /dev/null
+++ b/api/src/alembic/versions/20240813_151329_68fe052dc63c_add_score_qualite.py
@@ -0,0 +1,28 @@
+"""add-score-qualite
+
+Revision ID: 68fe052dc63c
+Revises: e3f3dfa4ad01
+Create Date: 2024-08-13 15:13:29.690054
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "68fe052dc63c"
+down_revision = "e3f3dfa4ad01"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.add_column(
+        "api__services", sa.Column("score_qualite", sa.Float(), nullable=True)
+    )
+    op.execute("UPDATE api__services SET score_qualite = 0.5")
+    op.alter_column("api__services", "score_qualite", nullable=False)
+
+
+def downgrade() -> None:
+    op.drop_column("api__services", "score_qualite")
diff --git a/api/src/data_inclusion/api/inclusion_data/models.py b/api/src/data_inclusion/api/inclusion_data/models.py
index bf975841..894869c9 100644
--- a/api/src/data_inclusion/api/inclusion_data/models.py
+++ b/api/src/data_inclusion/api/inclusion_data/models.py
@@ -101,6 +101,7 @@ class Service(Base):
     zone_diffusion_code: Mapped[str | None]
     zone_diffusion_nom: Mapped[str | None]
     zone_diffusion_type: Mapped[str | None]
+    score_qualite: Mapped[float]
 
     commune_: Mapped[Commune] = relationship(back_populates="services")
diff --git a/api/src/data_inclusion/api/inclusion_data/schemas.py b/api/src/data_inclusion/api/inclusion_data/schemas.py
index 27eed59d..eaf8875e 100644
--- a/api/src/data_inclusion/api/inclusion_data/schemas.py
+++ b/api/src/data_inclusion/api/inclusion_data/schemas.py
@@ -1,4 +1,7 @@
-from pydantic import BaseModel, ConfigDict
+from textwrap import dedent
+from typing import Annotated
+
+from pydantic import BaseModel, ConfigDict, Field
 
 from data_inclusion import schema
 
@@ -17,6 +20,17 @@ class Service(schema.Service):
     formulaire_en_ligne: str | None = None
     lien_source: str | None = None
 
+    score_qualite: Annotated[
+        float,
+        Field(
+            ge=0,
+            le=1,
+            description=dedent("""\
+                Score de qualité du service, défini et calculé par data·inclusion.
+            """),
+        ),
+    ]
+
 
 class Structure(schema.Structure):
     model_config = ConfigDict(from_attributes=True, populate_by_name=True)
diff --git a/api/tests/e2e/api/__snapshots__/test_inclusion_data.ambr b/api/tests/e2e/api/__snapshots__/test_inclusion_data.ambr
index bb8b8269..bae34fb7 100644
--- a/api/tests/e2e/api/__snapshots__/test_inclusion_data.ambr
+++ b/api/tests/e2e/api/__snapshots__/test_inclusion_data.ambr
@@ -1876,6 +1876,13 @@
           ],
           "title": "Modes Orientation Accompagnateur Autres"
         },
+        "score_qualite": {
+          "type": "number",
+          "maximum": 1.0,
+          "minimum": 0.0,
+          "title": "Score Qualite",
+          "description": "Score de qualité du service, défini et calculé par data·inclusion.\n"
+        },
         "structure": {
           "$ref": "#/components/schemas/Structure"
         }
       },
       "type": "object",
       "required": [
         "id",
         "structure_id",
         "source",
         "nom",
+        "score_qualite",
         "structure"
       ],
       "title": "DetailedService"
@@ -2961,6 +2969,13 @@
             }
           ],
           "title": "Modes Orientation Accompagnateur Autres"
+        },
+        "score_qualite": {
+          "type": "number",
+          "maximum": 1.0,
+          "minimum": 0.0,
+          "title": "Score Qualite",
+          "description": "Score de qualité du service, défini et calculé par data·inclusion.\n"
         }
       },
       "type": "object",
       "required": [
         "id",
         "structure_id",
         "source",
-        "nom"
+        "nom",
+        "score_qualite"
       ],
       "title": "Service"
     },
diff --git a/api/tests/e2e/api/test_inclusion_data.py b/api/tests/e2e/api/test_inclusion_data.py
index 28ca6c2f..e070bdfc 100644
--- a/api/tests/e2e/api/test_inclusion_data.py
+++ b/api/tests/e2e/api/test_inclusion_data.py
@@ -289,6 +289,7 @@ def test_list_services_all(api_client):
                 "prise_rdv": "https://teixeira.fr/",
                 "profils": ["femmes"],
                 "recurrence": None,
+                "score_qualite": 0.5,
                 "source": "dora",
                 "structure_id": "prince-point-monde",
                 "telephone": "0102030405",
diff --git a/api/tests/factories.py b/api/tests/factories.py
index 2f5cf1bd..f0365f2f 100644
--- a/api/tests/factories.py
+++ b/api/tests/factories.py
@@ -164,3 +164,5 @@ class Meta:
     zone_diffusion_type = None
     zone_diffusion_code = None
     zone_diffusion_nom = None
+
+    score_qualite = 0.5