diff --git a/api/requirements/dev-requirements.txt b/api/requirements/dev-requirements.txt
index ae2c5c15..ccf511ff 100644
--- a/api/requirements/dev-requirements.txt
+++ b/api/requirements/dev-requirements.txt
@@ -14,8 +14,6 @@ argon2-cffi==23.1.0
     # via minio
 argon2-cffi-bindings==21.2.0
     # via argon2-cffi
-brotli==1.1.0
-    # via py7zr
 cachetools==5.5.0
     # via tox
 certifi==2024.8.30
@@ -101,8 +99,6 @@ idna==3.8
     # via
     #   email-validator
     #   httpx
     #   requests
-inflate64==1.0.0
-    # via py7zr
 jinja2==3.1.4
     # via
     #   data-inclusion-api (setup.py)
@@ -115,8 +111,6 @@ markupsafe==2.1.5
     # via
     #   mako
 minio==7.2.8
     # via data-inclusion-api (setup.py)
-multivolumefile==0.2.3
-    # via py7zr
 nodeenv==1.9.1
     # via pre-commit
 numpy==2.1.1
@@ -149,26 +143,18 @@ pluggy==1.5.0
     # via tox
 pre-commit==3.8.0
     # via data-inclusion-api (setup.py)
-psutil==6.0.0
-    # via py7zr
 psycopg2==2.9.9
     # via data-inclusion-api (setup.py)
-py7zr==0.22.0
-    # via data-inclusion-api (setup.py)
 pyarrow==17.0.0
     # via data-inclusion-api (setup.py)
 pyasn1==0.6.1
     # via
     #   python-jose
     #   rsa
-pybcj==1.0.2
-    # via py7zr
 pycparser==2.22
     # via cffi
 pycryptodome==3.20.0
     # via minio
-pycryptodomex==3.20.0
-    # via py7zr
 pydantic==2.9.1
     # via
     #   data-inclusion-api (setup.py)
@@ -190,8 +176,6 @@ pyinstrument==4.7.3
     # via fastapi-debug-toolbar
 pyogrio==0.9.0
     # via geopandas
-pyppmd==1.1.0
-    # via py7zr
 pyproj==3.6.1
     # via geopandas
 pyproject-api==1.7.1
@@ -215,8 +199,6 @@ pyyaml==6.0.2
     # via
     #   pre-commit
     #   uvicorn
-pyzstd==0.16.1
-    # via py7zr
 requests==2.32.3
     # via data-inclusion-api (setup.py)
 rsa==4.9
@@ -246,8 +228,6 @@ sqlparse==0.5.1
     # via fastapi-debug-toolbar
 starlette==0.38.5
     # via fastapi
-texttable==1.7.0
-    # via py7zr
 tox==4.18.1
     # via data-inclusion-api (setup.py)
 tqdm==4.66.5
diff --git a/api/requirements/requirements.txt b/api/requirements/requirements.txt
index 7a2aaca9..43cf156c 100644
--- a/api/requirements/requirements.txt
+++ b/api/requirements/requirements.txt
@@ -13,8 +13,6 @@ argon2-cffi==23.1.0
     # via minio
 argon2-cffi-bindings==21.2.0
     # via argon2-cffi
-brotli==1.1.0
-    # via py7zr
 certifi==2024.8.30
     # via
     #   data-inclusion-api (setup.py)
@@ -81,8 +79,6 @@ idna==3.8
     # via
     #   email-validator
     #   httpx
     #   requests
-inflate64==1.0.0
-    # via py7zr
 jinja2==3.1.4
     # via data-inclusion-api (setup.py)
 mako==1.3.5
     # via
@@ -93,8 +89,6 @@ markupsafe==2.1.5
     # via
     #   mako
 minio==7.2.8
     # via data-inclusion-api (setup.py)
-multivolumefile==0.2.3
-    # via py7zr
 numpy==2.1.1
     # via
     #   data-inclusion-api (setup.py)
@@ -115,26 +109,18 @@ pandas==2.2.2
     # via
     #   data-inclusion-api (setup.py)
     #   geopandas
-psutil==6.0.0
-    # via py7zr
 psycopg2==2.9.9
     # via data-inclusion-api (setup.py)
-py7zr==0.22.0
-    # via data-inclusion-api (setup.py)
 pyarrow==17.0.0
     # via data-inclusion-api (setup.py)
 pyasn1==0.6.1
     # via
     #   python-jose
     #   rsa
-pybcj==1.0.2
-    # via py7zr
 pycparser==2.22
     # via cffi
 pycryptodome==3.20.0
     # via minio
-pycryptodomex==3.20.0
-    # via py7zr
 pydantic==2.9.1
     # via
     #   data-inclusion-api (setup.py)
@@ -148,8 +134,6 @@ pydantic-settings==2.5.2
     # via data-inclusion-api (setup.py)
 pyogrio==0.9.0
     # via geopandas
-pyppmd==1.1.0
-    # via py7zr
 pyproj==3.6.1
     # via geopandas
 python-dateutil==2.9.0.post0
@@ -169,8 +153,6 @@ pytz==2024.2
     #   pandas
 pyyaml==6.0.2
     # via uvicorn
-pyzstd==0.16.1
-    # via py7zr
 requests==2.32.3
     # via data-inclusion-api (setup.py)
 rsa==4.9
@@ -196,8 +178,6 @@ sqlalchemy==2.0.34
     #   geoalchemy2
 starlette==0.38.5
     # via fastapi
-texttable==1.7.0
-    # via py7zr
 tqdm==4.66.5
     # via data-inclusion-api (setup.py)
 typing-extensions==4.12.2
diff --git a/api/requirements/test-requirements.txt b/api/requirements/test-requirements.txt
index f6d43ea0..ef8e8174 100644
--- a/api/requirements/test-requirements.txt
+++ b/api/requirements/test-requirements.txt
@@ -14,8 +14,6 @@ argon2-cffi==23.1.0
     # via minio
 argon2-cffi-bindings==21.2.0
     # via argon2-cffi
-brotli==1.1.0
-    # via py7zr
 certifi==2024.8.30
     # via
     #   data-inclusion-api (setup.py)
@@ -89,8 +87,6 @@ idna==3.8
     # via
     #   email-validator
     #   httpx
     #   requests
-inflate64==1.0.0
-    # via py7zr
 iniconfig==2.0.0
     # via pytest
 jinja2==3.1.4
@@ -105,8 +101,6 @@ markupsafe==2.1.5
     # via
     #   mako
 minio==7.2.8
     # via data-inclusion-api (setup.py)
-multivolumefile==0.2.3
-    # via py7zr
 numpy==2.1.1
     # via
     #   data-inclusion-api (setup.py)
@@ -130,26 +124,18 @@ pandas==2.2.2
     #   geopandas
 pluggy==1.5.0
     # via pytest
-psutil==6.0.0
-    # via py7zr
 psycopg2==2.9.9
     # via data-inclusion-api (setup.py)
-py7zr==0.22.0
-    # via data-inclusion-api (setup.py)
 pyarrow==17.0.0
     # via data-inclusion-api (setup.py)
 pyasn1==0.6.1
     # via
     #   python-jose
     #   rsa
-pybcj==1.0.2
-    # via py7zr
 pycparser==2.22
     # via cffi
 pycryptodome==3.20.0
     # via minio
-pycryptodomex==3.20.0
-    # via py7zr
 pydantic==2.9.1
     # via
     #   data-inclusion-api (setup.py)
@@ -171,8 +157,6 @@ pyinstrument==4.7.3
     # via fastapi-debug-toolbar
 pyogrio==0.9.0
     # via geopandas
-pyppmd==1.1.0
-    # via py7zr
 pyproj==3.6.1
     # via geopandas
 pytest==8.3.3
@@ -200,8 +184,6 @@ pytz==2024.2
     #   pandas
 pyyaml==6.0.2
     # via uvicorn
-pyzstd==0.16.1
-    # via py7zr
 requests==2.32.3
     # via data-inclusion-api (setup.py)
 rsa==4.9
@@ -231,8 +213,6 @@ starlette==0.38.5
     # via fastapi
 syrupy==4.7.1
     # via data-inclusion-api (setup.py)
-texttable==1.7.0
-    # via py7zr
 tqdm==4.66.5
     # via data-inclusion-api (setup.py)
 typing-extensions==4.12.2
diff --git a/api/setup.py b/api/setup.py
index 0bb275f9..627f32de 100644
--- a/api/setup.py
+++ b/api/setup.py
@@ -27,7 +27,6 @@
         "numpy",
         "pandas",
         "psycopg2",
-        "py7zr",
         "pyarrow",
         "pydantic[email]>=2.5.0",
         "pydantic-settings",
diff --git a/api/src/alembic/env.py b/api/src/alembic/env.py
index 825098d4..427cb41b 100644
--- a/api/src/alembic/env.py
+++ b/api/src/alembic/env.py
@@ -3,9 +3,9 @@
 from alembic import context
 from sqlalchemy import engine_from_config, pool
 
-from data_inclusion.api.code_officiel_geo import models as _  # noqa: F401 F811
 from data_inclusion.api.config import settings
 from data_inclusion.api.core import db
+from data_inclusion.api.decoupage_administratif import models as _  # noqa: F401 F811
 from data_inclusion.api.inclusion_data import models as _  # noqa: F401 F811
 from data_inclusion.api.request import models as _  # noqa: F401 F811
 
revision = "9f9a66546e3a" down_revision = "170af30febde" @@ -21,26 +16,6 @@ def upgrade() -> None: - conn = op.get_bind() - - # must clean up the data before adding the foreign key - for model in [Structure, Service]: - # remove district codes - for k, v in constants._DISTRICTS_BY_CITY.items(): - conn.execute( - sa.update(model) - .where(model.code_insee.startswith(v[0][:3])) - .values({model.code_insee: k}) - .returning(1) - ) - - # remove invalid codes - conn.execute( - sa.update(model) - .where(model.code_insee.not_in(sa.select(Commune.code))) - .values({model.code_insee: None}) - ) - op.create_foreign_key( op.f("fk_api__structures__code_insee__api__communes"), "api__structures", diff --git a/api/src/alembic/versions/20240830_175854_e3f3dfa4ad01_modified_api__communes.py b/api/src/alembic/versions/20240830_175854_e3f3dfa4ad01_modified_api__communes.py new file mode 100644 index 00000000..b20db25f --- /dev/null +++ b/api/src/alembic/versions/20240830_175854_e3f3dfa4ad01_modified_api__communes.py @@ -0,0 +1,78 @@ +"""Modified api__communes + +Revision ID: e3f3dfa4ad01 +Revises: 517603187775 +Create Date: 2024-08-30 17:58:54.747630 + +""" + +import geoalchemy2 +import sqlalchemy as sa +from alembic import op + +from data_inclusion.api.core.db import SortedTextArray + +# revision identifiers, used by Alembic. +revision = "e3f3dfa4ad01" +down_revision = "517603187775" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.alter_column( + "api__communes", "siren_epci", existing_type=sa.VARCHAR(), nullable=True + ) + op.add_column( + "api__communes", + sa.Column( + "codes_postaux", + SortedTextArray(sa.Text()), + nullable=True, + ), + ) + op.add_column( + "api__communes", + sa.Column( + "centre", + geoalchemy2.types.Geometry( + srid=4326, from_text="ST_GeomFromEWKT", name="geometry" + ), + nullable=True, + ), + ) + op.drop_index("ix_api__communes__geography", table_name="api__communes") + op.drop_column("api__communes", "geom") + + +def downgrade() -> None: + op.add_column( + "api__communes", + sa.Column( + "geom", + geoalchemy2.types.Geometry( + srid=4326, + spatial_index=False, + from_text="ST_GeomFromEWKT", + name="geometry", + _spatial_index_reflected=True, + ), + autoincrement=False, + nullable=True, + ), + ) + op.create_index( + "ix_api__communes__geography", + "api__communes", + [ + sa.text( + "(st_simplify(geom, 0.01::double precision)::geography(Geometry,4326))" + ) + ], + unique=False, + ) + op.drop_column("api__communes", "centre") + op.drop_column("api__communes", "codes_postaux") + op.alter_column( + "api__communes", "siren_epci", existing_type=sa.VARCHAR(), nullable=False + ) diff --git a/api/src/data_inclusion/api/cli.py b/api/src/data_inclusion/api/cli.py index da48e401..9e239d64 100644 --- a/api/src/data_inclusion/api/cli.py +++ b/api/src/data_inclusion/api/cli.py @@ -3,7 +3,7 @@ import click from data_inclusion.api import auth -from data_inclusion.api.code_officiel_geo.commands import import_admin_express +from data_inclusion.api.decoupage_administratif.commands import import_communes from data_inclusion.api.inclusion_data.commands import load_inclusion_data logger = logging.getLogger(__name__) @@ -34,17 +34,17 @@ def _generate_token_for_user( click.echo(auth.create_access_token(subject=email, admin=admin)) -@cli.command(name="import_admin_express") -def _import_admin_express(): - """Import the latest Admin Express COG database""" - click.echo(import_admin_express()) - - @cli.command(name="load_inclusion_data") def _load_inclusion_data(): """Load the latest 
inclusion data""" click.echo(load_inclusion_data()) +@cli.command(name="import_communes") +def _import_communes(): + """Import the communes from the Decoupage Administratif API""" + click.echo(import_communes()) + + if __name__ == "__main__": cli() diff --git a/api/src/data_inclusion/api/code_officiel_geo/commands.py b/api/src/data_inclusion/api/code_officiel_geo/commands.py deleted file mode 100644 index 09eb9f5c..00000000 --- a/api/src/data_inclusion/api/code_officiel_geo/commands.py +++ /dev/null @@ -1,99 +0,0 @@ -import logging -import pathlib -import tempfile - -import geopandas -import httpx -import py7zr -from sqlalchemy.dialects.postgresql import insert -from tqdm import tqdm - -from data_inclusion.api.code_officiel_geo import models -from data_inclusion.api.core import db - -logger = logging.getLogger(__name__) - -IGN_ADMIN_EXPRESS_FILE_URL = "http://files.opendatarchives.fr/professionnels.ign.fr/adminexpress/ADMIN-EXPRESS_3-1__SHP__FRA_WM_2022-09-20.7z" - - -# TODO(vmttn): use https://geo.api.gouv.fr/ - - -def download(url: str, output_path: pathlib.Path): - with httpx.stream("GET", url) as response: - total = int(response.headers["Content-Length"]) - - response.raise_for_status() - - with output_path.open("wb") as fp: - with tqdm( - total=total, unit_scale=True, unit_divisor=1024, unit="B" - ) as progress: - num_bytes_downloaded = response.num_bytes_downloaded - for chunck in response.iter_bytes(chunk_size=32768): - fp.write(chunck) - progress.update( - response.num_bytes_downloaded - num_bytes_downloaded - ) - num_bytes_downloaded = response.num_bytes_downloaded - - -def load_communes(file_path: pathlib.Path): - for i in tqdm(range(100)): - chunck_df = geopandas.read_file(file_path, rows=slice(1000 * i, 1000 * (i + 1))) - - if len(chunck_df) == 0: - break - - chunck_df = chunck_df.rename( - columns={ - "INSEE_COM": "code", - "NOM": "nom", - "INSEE_DEP": "departement", - "INSEE_REG": "region", - "SIREN_EPCI": "siren_epci", - } - ) - - chunck_df = chunck_df.rename_geometry("geom") - - column_names = ["code", "nom", "departement", "region", "siren_epci", "geom"] - chunck_df = chunck_df[column_names] - - chunck_df = chunck_df.to_wkt() - - # optimize future lookups - chunck_df = chunck_df.sort_values(by="code") - - commune_data_list = chunck_df.to_dict(orient="records") - - stmt = insert(models.Commune).values(commune_data_list) - - stmt = stmt.on_conflict_do_update( - index_elements=[models.Commune.code], - set_={col: getattr(stmt.excluded, col) for col in column_names}, - ) - - with db.SessionLocal() as session: - session.execute(stmt) - session.commit() - - -def import_admin_express(): - with tempfile.TemporaryDirectory() as tmp_dir_name: - tmp_dir_path = pathlib.Path(tmp_dir_name) - archive_path = tmp_dir_path / "ign_admin_express.7z" - extract_dir_path = tmp_dir_path - logger.info(f"Downloading to {archive_path}") - - download(IGN_ADMIN_EXPRESS_FILE_URL, archive_path) - - logger.info(f"Extracting to {extract_dir_path}") - - with py7zr.SevenZipFile(archive_path, "r") as archive: - archive.extractall(extract_dir_path) - - logger.info("Loading communes") - - communes_file_path = next(tmp_dir_path.glob("**/COMMUNE.shp")) - load_communes(communes_file_path) diff --git a/api/src/data_inclusion/api/code_officiel_geo/models.py b/api/src/data_inclusion/api/code_officiel_geo/models.py deleted file mode 100644 index 58154e46..00000000 --- a/api/src/data_inclusion/api/code_officiel_geo/models.py +++ /dev/null @@ -1,85 +0,0 @@ -import geoalchemy2 -import sqlalchemy as sqla -from 
diff --git a/api/src/data_inclusion/api/code_officiel_geo/models.py b/api/src/data_inclusion/api/code_officiel_geo/models.py
deleted file mode 100644
index 58154e46..00000000
--- a/api/src/data_inclusion/api/code_officiel_geo/models.py
+++ /dev/null
@@ -1,85 +0,0 @@
-import geoalchemy2
-import sqlalchemy as sqla
-from sqlalchemy.orm import Mapped, mapped_column
-
-from data_inclusion.api.core.db import Base
-
-# all fields are nullable or have a default value. These models will only be used to
-# query valid data coming from the data pipeline.
-
-
-class Commune(Base):
-    code: Mapped[str] = mapped_column(primary_key=True)
-    nom: Mapped[str]
-    departement: Mapped[str]
-    region: Mapped[str]
-    siren_epci: Mapped[str]
-    geom = mapped_column(
-        geoalchemy2.Geometry("Geometry", srid=4326, spatial_index=False)
-    )
-
-    __table_args__ = (
-        sqla.Index(
-            "ix_api__communes__geography",
-            sqla.text("CAST(ST_Simplify(geom, 0.01) AS geography(geometry, 4326))"),
-        ),
-    )
-
-    def __repr__(self) -> str:
-        return f""
-
-
-class EPCI(Base):
-    code: Mapped[str] = mapped_column(primary_key=True)
-    nom: Mapped[str]
-    nature: Mapped[str]
-    geom = mapped_column(
-        geoalchemy2.Geometry("Geometry", srid=4326, spatial_index=False)
-    )
-
-    __table_args__ = (
-        sqla.Index(
-            "ix_api__epcis__geography",
-            sqla.text("CAST(ST_Simplify(geom, 0.01) AS geography(geometry, 4326))"),
-        ),
-    )
-
-    def __repr__(self) -> str:
-        return f""
-
-
-class Departement(Base):
-    code: Mapped[str] = mapped_column(primary_key=True)
-    nom: Mapped[str]
-    insee_reg: Mapped[str]
-    geom = mapped_column(
-        geoalchemy2.Geometry("Geometry", srid=4326, spatial_index=False)
-    )
-
-    __table_args__ = (
-        sqla.Index(
-            "ix_api__departements__geography",
-            sqla.text("CAST(ST_Simplify(geom, 0.01) AS geography(geometry, 4326))"),
-        ),
-    )
-
-    def __repr__(self) -> str:
-        return f""
-
-
-class Region(Base):
-    code: Mapped[str] = mapped_column(primary_key=True)
-    nom: Mapped[str]
-    geom = mapped_column(
-        geoalchemy2.Geometry("Geometry", srid=4326, spatial_index=False)
-    )
-
-    __table_args__ = (
-        sqla.Index(
-            "ix_api__regions__geography",
-            sqla.text("CAST(ST_Simplify(geom, 0.01) AS geography(geometry, 4326))"),
-        ),
-    )
-
-    def __repr__(self) -> str:
-        return f""
diff --git a/api/src/data_inclusion/api/code_officiel_geo/__init__.py b/api/src/data_inclusion/api/decoupage_administratif/__init__.py
similarity index 100%
rename from api/src/data_inclusion/api/code_officiel_geo/__init__.py
rename to api/src/data_inclusion/api/decoupage_administratif/__init__.py
diff --git a/api/src/data_inclusion/api/decoupage_administratif/commands.py b/api/src/data_inclusion/api/decoupage_administratif/commands.py
new file mode 100644
index 00000000..5e968ad6
--- /dev/null
+++ b/api/src/data_inclusion/api/decoupage_administratif/commands.py
@@ -0,0 +1,80 @@
+import logging
+
+import numpy as np
+import pandas as pd
+from furl import furl
+from sqlalchemy import types
+from sqlalchemy.dialects.postgresql import insert
+
+from data_inclusion.api.core import db
+from data_inclusion.api.inclusion_data import models
+
+logger = logging.getLogger(__name__)
+
+
+def import_communes():
+    query_params = {
+        "fields": ("nom,code,codesPostaux,codeDepartement,codeRegion,codeEpci,centre"),
+        "format": "json",
+    }
+    dtypes = {
+        "code": types.TEXT,
+        "codeEpci": types.TEXT,
+        "departement": types.TEXT,
+        "region": types.TEXT,
+        "centre": types.JSON,
+        "codesPostaux": types.ARRAY(types.TEXT),
+    }
+
+    communes_url = (
+        furl("https://geo.api.gouv.fr/communes").set(query_params=query_params).url
+    )
+    communes_df = pd.read_json(communes_url, dtype=dtypes)
+
+    districts_url = (
+        furl("https://geo.api.gouv.fr/communes")
+        .set(query_params=query_params | {"type": "arrondissement-municipal"})
+        .url
+    )
+    districts_df = pd.read_json(districts_url, dtype=dtypes)
+
+    df = pd.concat([communes_df, districts_df])
+
+    df = df.rename(
+        columns={
+            "codesPostaux": "codes_postaux",
+            "codeDepartement": "departement",
+            "codeRegion": "region",
+            "codeEpci": "siren_epci",
+        }
+    )
+
+    def create_point(geom):
+        return f"POINT({geom['coordinates'][0]} {geom['coordinates'][1]})"
+
+    df["centre"] = df["centre"].apply(create_point)
+
+    df = df.replace({np.nan: None})
+
+    df = df.sort_values(by="code")
+
+    commune_data_list = df.to_dict(orient="records")
+    stmt = insert(models.Commune).values(commune_data_list)
+
+    column_names = [
+        "code",
+        "nom",
+        "departement",
+        "region",
+        "siren_epci",
+        "centre",
+        "codes_postaux",
+    ]
+    stmt = stmt.on_conflict_do_update(
+        index_elements=[models.Commune.code],
+        set_={col: getattr(stmt.excluded, col) for col in column_names},
+    )
+
+    with db.SessionLocal() as session:
+        session.execute(stmt)
+        session.commit()
diff --git a/api/src/data_inclusion/api/code_officiel_geo/constants.py b/api/src/data_inclusion/api/decoupage_administratif/constants.py
similarity index 84%
rename from api/src/data_inclusion/api/code_officiel_geo/constants.py
rename to api/src/data_inclusion/api/decoupage_administratif/constants.py
index c50f2078..8bdbe8a1 100644
--- a/api/src/data_inclusion/api/code_officiel_geo/constants.py
+++ b/api/src/data_inclusion/api/decoupage_administratif/constants.py
@@ -162,70 +162,3 @@ class RegionEnum(Enum):
     "RegionCodeEnum",
     {member.name: member.value.code for member in RegionEnum},
 )
-
-
-# based on
-# https://github.com/gip-inclusion/dora-back/blob/main/dora/admin_express/utils.py
-
-_DISTRICTS_BY_CITY = {
-    # Paris
-    "75056": [
-        "75101",
-        "75102",
-        "75103",
-        "75104",
-        "75105",
-        "75106",
-        "75107",
-        "75108",
-        "75109",
-        "75110",
-        "75111",
-        "75112",
-        "75113",
-        "75114",
-        "75115",
-        "75116",
-        "75117",
-        "75118",
-        "75119",
-        "75120",
-    ],
-    # Lyon
-    "69123": [
-        "69381",
-        "69382",
-        "69383",
-        "69384",
-        "69385",
-        "69386",
-        "69387",
-        "69388",
-        "69389",
-    ],
-    # Marseille
-    "13055": [
-        "13201",
-        "13202",
-        "13203",
-        "13204",
-        "13205",
-        "13206",
-        "13207",
-        "13208",
-        "13209",
-        "13210",
-        "13211",
-        "13212",
-        "13213",
-        "13214",
-        "13215",
-        "13216",
-    ],
-}
-
-CODE_COMMUNE_BY_CODE_ARRONDISSEMENT = {
-    code_arrondissement: code_commune
-    for code_commune, codes_arrondissements in _DISTRICTS_BY_CITY.items()
-    for code_arrondissement in codes_arrondissements
-}
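
For context on the import_communes command above: reruns stay idempotent because it goes through PostgreSQL's INSERT ... ON CONFLICT DO UPDATE via SQLAlchemy. A minimal standalone sketch of that pattern — the table and rows here are illustrative, not taken from the codebase:

    import sqlalchemy as sa
    from sqlalchemy.dialects.postgresql import insert

    metadata = sa.MetaData()
    communes = sa.Table(
        "communes",
        metadata,
        sa.Column("code", sa.Text, primary_key=True),
        sa.Column("nom", sa.Text),
    )

    stmt = insert(communes).values(
        [{"code": "59350", "nom": "Lille"}, {"code": "75056", "nom": "Paris"}]
    )
    # On a primary key collision, overwrite the existing row with the
    # incoming ("excluded") values instead of failing.
    stmt = stmt.on_conflict_do_update(
        index_elements=[communes.c.code],
        set_={"nom": stmt.excluded.nom},
    )
    # engine = sa.create_engine("postgresql:///...")  # hypothetical DSN
    # with engine.begin() as conn:
    #     conn.execute(stmt)
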
diff --git a/api/src/data_inclusion/api/decoupage_administratif/models.py b/api/src/data_inclusion/api/decoupage_administratif/models.py
new file mode 100644
index 00000000..425dcd39
--- /dev/null
+++ b/api/src/data_inclusion/api/decoupage_administratif/models.py
@@ -0,0 +1,21 @@
+import geoalchemy2
+from sqlalchemy.orm import Mapped, mapped_column
+
+from data_inclusion.api.core.db import Base
+
+
+class Commune(Base):
+    code: Mapped[str] = mapped_column(primary_key=True)
+    nom: Mapped[str]
+    departement: Mapped[str]
+    region: Mapped[str]
+    siren_epci: Mapped[str | None]
+    # FIXME(vperron) : This column should have an index (spatial_index=True)
+    # but let's do it in a future migration
+    centre = mapped_column(geoalchemy2.Geometry("Geometry", srid=4326))
+    # FIXME(vperron) : This should not be nullable but at the time of migration
+    # it's necessary as the info is not there yet. Let's clean up later.
+    codes_postaux: Mapped[list[str] | None]
+
+    def __repr__(self) -> str:
+        return f""
diff --git a/api/src/data_inclusion/api/code_officiel_geo/utils.py b/api/src/data_inclusion/api/decoupage_administratif/utils.py
similarity index 91%
rename from api/src/data_inclusion/api/code_officiel_geo/utils.py
rename to api/src/data_inclusion/api/decoupage_administratif/utils.py
index 64ac2ff0..eff0ffe2 100644
--- a/api/src/data_inclusion/api/code_officiel_geo/utils.py
+++ b/api/src/data_inclusion/api/decoupage_administratif/utils.py
@@ -1,4 +1,4 @@
-from data_inclusion.api.code_officiel_geo import constants
+from data_inclusion.api.decoupage_administratif import constants
 
 
 def get_departement_by_code_or_slug(
diff --git a/api/src/data_inclusion/api/inclusion_data/commands.py b/api/src/data_inclusion/api/inclusion_data/commands.py
index 0e89635a..b0c047b0 100644
--- a/api/src/data_inclusion/api/inclusion_data/commands.py
+++ b/api/src/data_inclusion/api/inclusion_data/commands.py
@@ -11,7 +11,6 @@
 from tqdm import tqdm
 
 from data_inclusion import schema
-from data_inclusion.api.code_officiel_geo import constants
 from data_inclusion.api.config import settings
 from data_inclusion.api.core import db
 from data_inclusion.api.inclusion_data import models
@@ -124,14 +123,6 @@ def load_inclusion_data():
     structures_df = structures_df.replace({np.nan: None})
     services_df = services_df.replace({np.nan: None})
 
-    # TODO(vperron) : To remove when we handle the city districts
-    structures_df = structures_df.assign(
-        code_insee=structures_df.code_insee.apply(clean_up_code_insee),
-    )
-    services_df = services_df.assign(
-        code_insee=services_df.code_insee.apply(clean_up_code_insee),
-    )
-
     structures_df = structures_df.drop(columns=["_di_geocodage_score"])
     services_df = services_df.drop(columns=["_di_geocodage_score"])
 
@@ -196,10 +187,6 @@ def load_inclusion_data():
         connection.execute(sqla.text("VACUUM ANALYZE api__services"))
 
 
-def clean_up_code_insee(v) -> str | None:
-    return constants.CODE_COMMUNE_BY_CODE_ARRONDISSEMENT.get(v, v)
-
-
 def validate_data(model_schema, data):
     try:
         model_schema(**data)
diff --git a/api/src/data_inclusion/api/inclusion_data/models.py b/api/src/data_inclusion/api/inclusion_data/models.py
index 9d49fa36..bf975841 100644
--- a/api/src/data_inclusion/api/inclusion_data/models.py
+++ b/api/src/data_inclusion/api/inclusion_data/models.py
@@ -3,8 +3,8 @@
 import sqlalchemy as sqla
 from sqlalchemy.orm import Mapped, mapped_column, relationship
 
-from data_inclusion.api.code_officiel_geo.models import Commune
 from data_inclusion.api.core.db import Base
+from data_inclusion.api.decoupage_administratif.models import Commune
 
 # all fields are nullable or have a default value. These models will only be used to
 # query valid data coming from the data pipeline.
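
The Commune model above now stores a point-typed `centre` instead of a full boundary geometry. A sketch of reading it back as a shapely point with geoalchemy2 — the DSN and commune code are placeholders, and shapely must be installed:

    from geoalchemy2.shape import to_shape
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session

    from data_inclusion.api.decoupage_administratif.models import Commune

    engine = create_engine("postgresql:///data_inclusion")  # placeholder DSN
    with Session(engine) as session:
        commune = session.get(Commune, "59350")
        if commune is not None and commune.centre is not None:
            point = to_shape(commune.centre)  # WKBElement -> shapely Point
            print(point.x, point.y)  # longitude, latitude (EPSG:4326)
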
diff --git a/api/src/data_inclusion/api/inclusion_data/routes.py b/api/src/data_inclusion/api/inclusion_data/routes.py
index d833944a..c97c27ad 100644
--- a/api/src/data_inclusion/api/inclusion_data/routes.py
+++ b/api/src/data_inclusion/api/inclusion_data/routes.py
@@ -6,20 +6,19 @@
 
 from data_inclusion import schema as di_schema
 from data_inclusion.api import auth
-from data_inclusion.api.code_officiel_geo.constants import (
-    CODE_COMMUNE_BY_CODE_ARRONDISSEMENT,
+from data_inclusion.api.config import settings
+from data_inclusion.api.core import db
+from data_inclusion.api.decoupage_administratif.constants import (
     DepartementCodeEnum,
     DepartementSlugEnum,
     RegionCodeEnum,
     RegionSlugEnum,
 )
-from data_inclusion.api.code_officiel_geo.models import Commune
-from data_inclusion.api.code_officiel_geo.utils import (
+from data_inclusion.api.decoupage_administratif.models import Commune
+from data_inclusion.api.decoupage_administratif.utils import (
     get_departement_by_code_or_slug,
     get_region_by_code_or_slug,
 )
-from data_inclusion.api.config import settings
-from data_inclusion.api.core import db
 from data_inclusion.api.inclusion_data import schemas, services
 from data_inclusion.api.utils import pagination, soliguide
 
@@ -412,9 +411,6 @@ def search_services_endpoint(
     commune_instance = None
     search_point = None
     if code_commune is not None:
-        code_commune = CODE_COMMUNE_BY_CODE_ARRONDISSEMENT.get(
-            code_commune, code_commune
-        )
         commune_instance = db_session.get(Commune, code_commune)
         if commune_instance is None:
             raise fastapi.HTTPException(
diff --git a/api/src/data_inclusion/api/inclusion_data/services.py b/api/src/data_inclusion/api/inclusion_data/services.py
index fa5419d4..2c91b660 100644
--- a/api/src/data_inclusion/api/inclusion_data/services.py
+++ b/api/src/data_inclusion/api/inclusion_data/services.py
@@ -15,12 +15,11 @@
 from fastapi_pagination.ext.sqlalchemy import paginate
 
 from data_inclusion import schema as di_schema
-from data_inclusion.api.code_officiel_geo.constants import (
-    CODE_COMMUNE_BY_CODE_ARRONDISSEMENT,
+from data_inclusion.api.decoupage_administratif.constants import (
     Departement,
     Region,
 )
-from data_inclusion.api.code_officiel_geo.models import Commune
+from data_inclusion.api.decoupage_administratif.models import Commune
 from data_inclusion.api.inclusion_data import models
 
 logger = logging.getLogger(__name__)
@@ -198,9 +197,6 @@ def list_structures(
         query = query.filter_by(id=id_)
 
     if commune_code is not None:
-        commune_code = CODE_COMMUNE_BY_CODE_ARRONDISSEMENT.get(
-            commune_code, commune_code
-        )
         query = query.filter(models.Structure.code_insee == commune_code)
 
     if departement is not None:
@@ -328,10 +324,6 @@
             query = query.filter(Commune.region == region.code)
 
     if code_commune is not None:
-        code_commune = CODE_COMMUNE_BY_CODE_ARRONDISSEMENT.get(
-            code_commune, code_commune
-        )
-
         query = query.filter(models.Service.code_insee == code_commune)
 
     query = filter_services(
@@ -415,16 +407,7 @@
     if search_point is not None:
         dest_geometry = search_point
     else:
-        dest_geometry = (
-            sqla.select(
-                sqla.cast(
-                    geoalchemy2.functions.ST_Simplify(Commune.geom, 0.01),
-                    geoalchemy2.Geography(geometry_type="GEOMETRY", srid=4326),
-                )
-            )
-            .filter(Commune.code == commune_instance.code)
-            .scalar_subquery()
-        )
+        dest_geometry = commune_instance.centre
 
     query = query.filter(
         sqla.or_(
@@ -432,7 +415,7 @@
             geoalchemy2.functions.ST_DWithin(
                 src_geometry,
                 dest_geometry,
-                50_000,  # meters or 50km
+                50_000,  # meters
             ),
             # or `a-distance`
             models.Service.modes_accueil.contains(
@@ -454,8 +437,8 @@
                         src_geometry,
                         dest_geometry,
                     )
-                    / 1000
-                ).cast(sqla.Integer),  # conversion to kms
+                    / 1000  # conversion to kms
+                ).cast(sqla.Integer),
             ),
             else_=sqla.null().cast(sqla.Integer),
         )
diff --git a/api/tests/communes.parquet.gzip b/api/tests/communes.parquet.gzip
index 1df74eb0..24c86ead 100644
Binary files a/api/tests/communes.parquet.gzip and b/api/tests/communes.parquet.gzip differ
diff --git a/api/tests/conftest.py b/api/tests/conftest.py
index 3201d2ad..6ef12bed 100644
--- a/api/tests/conftest.py
+++ b/api/tests/conftest.py
@@ -10,9 +10,9 @@
 from fastapi.testclient import TestClient
 
 from data_inclusion.api.app import create_app
-from data_inclusion.api.code_officiel_geo import models
 from data_inclusion.api.config import settings
 from data_inclusion.api.core import db
+from data_inclusion.api.decoupage_administratif.models import Commune
 
 from . import factories
 
@@ -121,7 +121,7 @@ def communes(db_connection):
     df = df.to_wkt()
 
     commune_data_list = df.to_dict(orient="records")
-    db_connection.execute(sqla.insert(models.Commune).values(commune_data_list))
+    db_connection.execute(sqla.insert(Commune).values(commune_data_list))
     db_connection.commit()
diff --git a/api/tests/e2e/api/test_inclusion_data.py b/api/tests/e2e/api/test_inclusion_data.py
index eb83ac02..28ca6c2f 100644
--- a/api/tests/e2e/api/test_inclusion_data.py
+++ b/api/tests/e2e/api/test_inclusion_data.py
@@ -5,7 +5,7 @@
 import pytest
 
 from data_inclusion import schema
-from data_inclusion.api.code_officiel_geo.constants import RegionEnum
+from data_inclusion.api.decoupage_administratif.constants import RegionEnum
 from data_inclusion.api.utils import soliguide
 
 from ... import factories
@@ -15,6 +15,7 @@
 LILLE = {"code_insee": "59350", "latitude": 50.633333, "longitude": 3.066667}
 MAUBEUGE = {"code_insee": "59392"}
 PARIS = {"code_insee": "75056", "latitude": 48.866667, "longitude": 2.333333}
+PARIS_11 = {"code_insee": "75111", "latitude": 48.86010, "longitude": 2.38160}
 ROUBAIX = {"code_insee": "59512"}
 
@@ -614,7 +615,8 @@ def test_can_filter_resources_by_slug_region(api_client, url, factory):
         (None, DUNKERQUE["code_insee"], False),
         (DUNKERQUE["code_insee"], DUNKERQUE["code_insee"], True),
         (DUNKERQUE["code_insee"], "62041", False),
-        (PARIS["code_insee"], "75101", True),
+        (PARIS["code_insee"], "75056", True),
+        (PARIS_11["code_insee"], "75111", True),
     ],
 )
 def test_can_filter_resources_by_code_commune(
@@ -837,8 +839,8 @@ def test_can_filter_resources_by_sources(api_client, url, factory):
         (None, DUNKERQUE["code_insee"], False),
         (DUNKERQUE, DUNKERQUE["code_insee"], True),
         (DUNKERQUE, MAUBEUGE["code_insee"], False),
-        (PARIS, "75101", True),
-        pytest.param(PARIS, PARIS["code_insee"], True, marks=pytest.mark.xfail),
+        (PARIS, "75056", True),
+        (PARIS_11, "75111", True),
     ],
 )
 @pytest.mark.with_token
@@ -1224,7 +1226,7 @@ def test_search_services_with_code_commune_sample_distance(api_client):
     resp_data = response.json()
     assert_paginated_response_data(resp_data, total=1)
     assert resp_data["items"][0]["service"]["id"] == service_1.id
-    assert resp_data["items"][0]["distance"] == 35
+    assert resp_data["items"][0]["distance"] == 39
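
With the services.py change above, radius search measures from the commune's `centre` point instead of a simplified-polygon subquery — consistent with the sampled distance in the test moving from 35 to 39 km. The integer-kilometre expression, written as a standalone helper sketch whose argument names mirror the variables in search_services:

    import sqlalchemy as sqla
    from geoalchemy2 import functions as geo_func

    def distance_km(src_geometry, dest_geometry):
        """ST_Distance in meters, divided down and truncated to whole km."""
        return sqla.cast(
            geo_func.ST_Distance(src_geometry, dest_geometry) / 1000,
            sqla.Integer,
        )
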
diff --git a/pipeline/dags/import_admin_express.py b/pipeline/dags/import_admin_express.py
deleted file mode 100644
index 1cea2eb4..00000000
--- a/pipeline/dags/import_admin_express.py
+++ /dev/null
@@ -1,318 +0,0 @@
-import logging
-
-import pendulum
-
-import airflow
-from airflow.operators import bash, empty, python
-
-from dag_utils import date
-from dag_utils.virtualenvs import PIPX_PYTHON_BIN_PATH, PYTHON_BIN_PATH
-
-logger = logging.getLogger(__name__)
-
-default_args = {}
-
-
-def _download_dataset():
-    import requests
-
-    from airflow.models import Variable
-
-    with requests.get(
-        Variable.get("IGN_ADMIN_EXPRESS_FILE_URL"), stream=True
-    ) as response:
-        response.raise_for_status()
-        with open("/tmp/ign_admin_express.7z", "wb") as fp:
-            for chunck in response.iter_content(chunk_size=32768):
-                fp.write(chunck)
-
-
-def _load_communes():
-    import pathlib
-    import textwrap
-
-    import geopandas
-
-    from airflow.providers.postgres.hooks import postgres
-
-    pg_hook = postgres.PostgresHook(postgres_conn_id="pg")
-    engine = pg_hook.get_sqlalchemy_engine()
-
-    tmp_dir_path = (
-        pathlib.Path("/tmp")
-        / "ADMIN-EXPRESS-COG_3-0__SHP__FRA_2021-05-19"
-        / "ADMIN-EXPRESS-COG"
-        / "1_DONNEES_LIVRAISON_2021-05-19"
-        / "ADECOG_3-0_SHP_WGS84G_FRA"
-    )
-
-    tmp_file_path = tmp_dir_path / "COMMUNE.shp"
-
-    target_table = "admin_express_communes"
-
-    with engine.connect() as conn:
-        with conn.begin():
-            for i in range(100):
-                chunck_df = geopandas.read_file(
-                    tmp_file_path, rows=slice(1000 * i, 1000 * (i + 1))
-                )
-
-                if len(chunck_df) == 0:
-                    break
-
-                chunck_df = chunck_df.rename(
-                    columns={
-                        "INSEE_COM": "code",
-                        "NOM": "nom",
-                        "INSEE_DEP": "departement",
-                        "INSEE_REG": "region",
-                        "SIREN_EPCI": "siren_epci",
-                    }
-                )
-                chunck_df = chunck_df.rename_geometry("geom")
-                chunck_df = chunck_df[
-                    ["code", "nom", "departement", "region", "siren_epci", "geom"]
-                ]
-
-                # optimize future lookups
-                chunck_df = chunck_df.sort_values(by="code")
-
-                chunck_df.to_postgis(
-                    target_table,
-                    con=conn,
-                    if_exists="replace" if i == 0 else "append",
-                    index=False,
-                )
-
-            conn.execute(f"ALTER TABLE {target_table} ADD PRIMARY KEY (code);")
-            # create an index on simplified geography elements, for fast radius search
-            conn.execute(
-                textwrap.dedent(
-                    f"""
-                    CREATE INDEX idx_admin_express_communes_simple_geog
-                    ON {target_table}
-                    USING GIST(
-                        CAST(
-                            ST_Simplify(geom, 0.01) AS geography(geometry, 4326)
-                        )
-                    );
-                    """
-                )
-            )
-
-
-def _load_epcis():
-    import pathlib
-    import textwrap
-
-    import geopandas
-
-    from airflow.providers.postgres.hooks import postgres
-
-    pg_hook = postgres.PostgresHook(postgres_conn_id="pg")
-    tmp_dir_path = (
-        pathlib.Path("/tmp")
-        / "ADMIN-EXPRESS-COG_3-0__SHP__FRA_2021-05-19"
-        / "ADMIN-EXPRESS-COG"
-        / "1_DONNEES_LIVRAISON_2021-05-19"
-        / "ADECOG_3-0_SHP_WGS84G_FRA"
-    )
-
-    tmp_file_path = tmp_dir_path / "EPCI.shp"
-    df = geopandas.read_file(tmp_file_path)
-    df = df.rename(
-        columns={
-            "CODE_SIREN": "code",
-            "NOM": "nom",
-            "NATURE": "nature",
-        }
-    )
-    df = df.rename_geometry("geom")
-    df = df[["code", "nom", "nature", "geom"]]
-    df = df.sort_values(by="code")  # optimize future lookups
-
-    engine = pg_hook.get_sqlalchemy_engine()
-    target_table = "admin_express_epcis"
-
-    with engine.connect() as conn:
-        with conn.begin():
-            df.to_postgis(target_table, con=conn, if_exists="replace", index=False)
-            conn.execute(f"ALTER TABLE {target_table} ADD PRIMARY KEY (code);")
-            # create an index on simplified geography elements, for fast radius search
-            conn.execute(
-                textwrap.dedent(
-                    f"""
-                    CREATE INDEX idx_admin_express_epcis_simple_geog
-                    ON {target_table}
-                    USING GIST(
-                        CAST(
-                            ST_Simplify(geom, 0.01) AS geography(geometry, 4326)
-                        )
-                    );
-                    """
-                )
-            )
-
-
-def _load_departements():
-    import pathlib
-    import textwrap
-
-    import geopandas
-
-    from airflow.providers.postgres.hooks import postgres
-
-    pg_hook = postgres.PostgresHook(postgres_conn_id="pg")
-    tmp_dir_path = (
-        pathlib.Path("/tmp")
-        / "ADMIN-EXPRESS-COG_3-0__SHP__FRA_2021-05-19"
-        / "ADMIN-EXPRESS-COG"
-        / "1_DONNEES_LIVRAISON_2021-05-19"
-        / "ADECOG_3-0_SHP_WGS84G_FRA"
-    )
-
-    tmp_file_path = tmp_dir_path / "DEPARTEMENT.shp"
-    df = geopandas.read_file(tmp_file_path)
-    df = df.rename(
-        columns={
-            "INSEE_DEP": "code",
-            "NOM": "nom",
-            "INSEE_REG": "insee_reg",
-        }
-    )
-    df = df.rename_geometry("geom")
-    df = df[["code", "nom", "insee_reg", "geom"]]
-    df = df.sort_values(by="code")  # optimize future lookups
-
-    engine = pg_hook.get_sqlalchemy_engine()
-    target_table = "admin_express_departements"
-
-    with engine.connect() as conn:
-        with conn.begin():
-            df.to_postgis(target_table, con=conn, if_exists="replace", index=False)
-            conn.execute(f"ALTER TABLE {target_table} ADD PRIMARY KEY (code);")
-            # create an index on simplified geography elements, for fast radius search
-            conn.execute(
-                textwrap.dedent(
-                    f"""
-                    CREATE INDEX idx_admin_express_departements_simple_geog
-                    ON {target_table}
-                    USING GIST(
-                        CAST(
-                            ST_Simplify(geom, 0.01) AS geography(geometry, 4326)
-                        )
-                    );
-                    """
-                )
-            )
-
-
-def _load_regions():
-    import pathlib
-    import textwrap
-
-    import geopandas
-
-    from airflow.providers.postgres.hooks import postgres
-
-    pg_hook = postgres.PostgresHook(postgres_conn_id="pg")
-    tmp_dir_path = (
-        pathlib.Path("/tmp")
-        / "ADMIN-EXPRESS-COG_3-0__SHP__FRA_2021-05-19"
-        / "ADMIN-EXPRESS-COG"
-        / "1_DONNEES_LIVRAISON_2021-05-19"
-        / "ADECOG_3-0_SHP_WGS84G_FRA"
-    )
-
-    tmp_file_path = tmp_dir_path / "REGION.shp"
-    df = geopandas.read_file(tmp_file_path)
-    df = df.rename(
-        columns={
-            "INSEE_REG": "code",
-            "NOM": "nom",
-        }
-    )
-    df = df.rename_geometry("geom")
-    df = df[["code", "nom", "geom"]]
-    df = df.sort_values(by="code")  # optimize future lookups
-
-    engine = pg_hook.get_sqlalchemy_engine()
-    target_table = "admin_express_regions"
-
-    with engine.connect() as conn:
-        with conn.begin():
-            df.to_postgis(target_table, con=conn, if_exists="replace", index=False)
-            conn.execute(f"ALTER TABLE {target_table} ADD PRIMARY KEY (code);")
-            # create an index on simplified geography elements, for fast radius search
-            conn.execute(
-                textwrap.dedent(
-                    f"""
-                    CREATE INDEX idx_admin_express_regions_simple_geog
-                    ON {target_table}
-                    USING GIST(
-                        CAST(
-                            ST_Simplify(geom, 0.01) AS geography(geometry, 4326)
-                        )
-                    );
-                    """
-                )
-            )
-
-
-with airflow.DAG(
-    dag_id="import_admin_express",
-    start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
-    default_args=default_args,
-    schedule="@once",
-    catchup=False,
-) as dag:
-    start = empty.EmptyOperator(task_id="start")
-    end = empty.EmptyOperator(task_id="end")
-
-    download_dataset = python.ExternalPythonOperator(
-        task_id="download",
-        python=str(PYTHON_BIN_PATH),
-        python_callable=_download_dataset,
-    )
-
-    extract_dataset = bash.BashOperator(
-        task_id="extract",
-        bash_command=(
-            f"{PIPX_PYTHON_BIN_PATH.parent / 'pipx'} "
-            "run py7zr x /tmp/ign_admin_express.7z"
-        ),
-        append_env=True,
-        cwd="/tmp",
-    )
-
-    load_communes = python.ExternalPythonOperator(
-        task_id="load_communes",
-        python=str(PYTHON_BIN_PATH),
-        python_callable=_load_communes,
-    )
-
-    load_epcis = python.ExternalPythonOperator(
-        task_id="load_epcis",
-        python=str(PYTHON_BIN_PATH),
-        python_callable=_load_epcis,
-    )
-
-    load_departements = python.ExternalPythonOperator(
-        task_id="load_departements",
-        python=str(PYTHON_BIN_PATH),
-        python_callable=_load_departements,
-    )
-
-    load_regions = python.ExternalPythonOperator(
-        task_id="load_regions",
-        python=str(PYTHON_BIN_PATH),
-        python_callable=_load_regions,
-    )
-
-    (
-        start
-        >> download_dataset
-        >> extract_dataset
-        >> [load_communes, load_epcis, load_departements, load_regions]
-        >> end
-    )
diff --git a/pipeline/dags/import_decoupage_administratif.py b/pipeline/dags/import_decoupage_administratif.py
new file mode 100755
index 00000000..6c1797b8
--- /dev/null
+++ b/pipeline/dags/import_decoupage_administratif.py
@@ -0,0 +1,109 @@
+import pendulum
+
+from airflow.decorators import dag, task
+from airflow.operators import empty
+
+from dag_utils import date, dbt, notifications
+from dag_utils.virtualenvs import PYTHON_BIN_PATH
+
+
+@task.external_python(
+    python=str(PYTHON_BIN_PATH),
+    retries=2,
+)
+def extract_and_load():
+    import pandas as pd
+    import sqlalchemy as sqla
+    from furl import furl
+
+    from dag_utils import pg
+
+    base_url = furl("https://geo.api.gouv.fr")
+    # the default zone parameter is inconsistent between resources
+    # so we explicitly set it for all resources
+    base_url.set({"zone": ",".join(["metro", "drom", "com"])})
+    URL_BY_RESOURCE = {
+        "regions": base_url / "regions",
+        "departements": base_url / "departements",
+        "epcis": base_url / "epcis",
+        "communes": (base_url / "communes").set(
+            {
+                # explicitly list the retrieved fields
+                # to include the "centre" field
+                "fields": ",".join(
+                    [
+                        "nom",
+                        "code",
+                        "centre",
+                        "codesPostaux",
+                        "codeEpci",
+                        "codeDepartement",
+                        "codeRegion",
+                    ]
+                )
+            }
+        ),
+    }
+
+    # arrondissements do not have a dedicated endpoint
+    # they are retrieved using an opt-in parameter
+    # on the communes endpoint
+    URL_BY_RESOURCE["arrondissements"] = (
+        URL_BY_RESOURCE["communes"].copy().add({"type": "arrondissement-municipal"})
+    )
+
+    schema = "decoupage_administratif"
+    pg.create_schema(schema)
+
+    for resource, url in URL_BY_RESOURCE.items():
+        print(f"Fetching resource={resource} from url={url}")
+        df = pd.read_json(str(url), dtype=False)
+
+        fq_table_name = f"{schema}.{resource}"
+        print(f"Loading to {fq_table_name}")
+        with pg.connect_begin() as conn:
+            df.to_sql(
+                f"{resource}_tmp",
+                con=conn,
+                schema=schema,
+                if_exists="replace",
+                index=False,
+                dtype={
+                    "centre": sqla.JSON,
+                    "codesPostaux": sqla.ARRAY(sqla.TEXT),
+                }
+                if resource in ["communes", "arrondissements"]
+                else None,
+            )
+            conn.execute(
+                f"""\
+                CREATE TABLE IF NOT EXISTS {fq_table_name}
+                (LIKE {fq_table_name}_tmp);
+                TRUNCATE {fq_table_name};
+                INSERT INTO {fq_table_name}
+                (SELECT * FROM {fq_table_name}_tmp);
+                DROP TABLE {fq_table_name}_tmp;
+                """
+            )
+
+
+@dag(
+    start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
+    default_args=notifications.notify_failure_args(),
+    schedule="@monthly",
+    catchup=False,
+)
+def import_decoupage_administratif():
+    start = empty.EmptyOperator(task_id="start")
+    end = empty.EmptyOperator(task_id="end")
+
+    dbt_build_staging = dbt.dbt_operator_factory(
+        task_id="dbt_build_staging",
+        command="build",
+        select="path:models/staging/decoupage_administratif",
+    )
+
+    start >> extract_and_load() >> dbt_build_staging >> end
+
+
+import_decoupage_administratif()
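
The extract_and_load task above loads each resource through a `*_tmp` staging table and swaps its contents into the durable table, so the target is only truncated once replacement data has landed. The same pattern as a standalone sketch — the `pg` helpers are project-specific, so this assumes a plain SQLAlchemy connection run inside a transaction:

    import pandas as pd
    import sqlalchemy as sqla

    def replace_table(df: pd.DataFrame, conn, schema: str, name: str) -> None:
        """Replace schema.name's contents with df via a staging table."""
        df.to_sql(
            f"{name}_tmp", con=conn, schema=schema, if_exists="replace", index=False
        )
        conn.execute(sqla.text(
            f"""
            CREATE TABLE IF NOT EXISTS {schema}.{name} (LIKE {schema}.{name}_tmp);
            TRUNCATE {schema}.{name};
            INSERT INTO {schema}.{name} SELECT * FROM {schema}.{name}_tmp;
            DROP TABLE {schema}.{name}_tmp;
            """
        ))
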
diff --git a/pipeline/dags/import_insee_code_officiel_geographique.py b/pipeline/dags/import_insee_code_officiel_geographique.py
deleted file mode 100644
index 0c619812..00000000
--- a/pipeline/dags/import_insee_code_officiel_geographique.py
+++ /dev/null
@@ -1,77 +0,0 @@
-import logging
-
-import pendulum
-
-import airflow
-from airflow.operators import empty, python
-
-from dag_utils import date, dbt
-from dag_utils.virtualenvs import PYTHON_BIN_PATH
-
-logger = logging.getLogger(__name__)
-
-default_args = {}
-
-
-def _import_dataset_ressource():
-    from urllib.parse import urljoin
-
-    import pandas as pd
-
-    from airflow.models import Variable
-
-    from dag_utils import pg
-
-    base_url = Variable.get("INSEE_COG_DATASET_URL")
-
-    pg.create_schema("insee")
-
-    for resource in ["region", "departement", "commune"]:
-        schema, table_name = "insee", f"{resource}s"
-        url = urljoin(base_url, f"v_{resource}_2024.csv")
-
-        print(f"Extracting {url}...")
-        df = pd.read_csv(url, sep=",", dtype=str)
-
-        print(f"Loading to {schema}.{table_name}")
-        with pg.connect_begin() as conn:
-            df.to_sql(
-                schema=schema,
-                name=f"{table_name}_tmp",
-                con=conn,
-                if_exists="replace",
-                index=False,
-            )
-
-            conn.execute(
-                f"""\
-                CREATE TABLE IF NOT EXISTS {schema}.{table_name} (LIKE {schema}.{table_name}_tmp);
-                TRUNCATE {schema}.{table_name};
-                INSERT INTO {schema}.{table_name} SELECT * FROM {schema}.{table_name}_tmp;
-                DROP TABLE {schema}.{table_name}_tmp;"""  # noqa: E501
-            )
-
-
-with airflow.DAG(
-    dag_id="import_insee_code_officiel_geographique",
-    start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE),
-    default_args=default_args,
-    schedule="@once",
-    catchup=False,
-) as dag:
-    start = empty.EmptyOperator(task_id="start")
-    end = empty.EmptyOperator(task_id="end")
-
-    import_insee_dataset = python.ExternalPythonOperator(
-        task_id="import_insee_dataset",
-        python=str(PYTHON_BIN_PATH),
-        python_callable=_import_dataset_ressource,
-    )
-
-    dbt_build_staging = dbt.dbt_operator_factory(
-        task_id="dbt_build_staging",
-        command="build",
-        select="path:models/staging/code_officiel_geographique",
-    )
-
-    start >> import_insee_dataset >> dbt_build_staging >> end
diff --git a/pipeline/dbt/models/_sources.yml b/pipeline/dbt/models/_sources.yml
index 89fa5ba7..1ffa198e 100644
--- a/pipeline/dbt/models/_sources.yml
+++ b/pipeline/dbt/models/_sources.yml
@@ -23,12 +23,9 @@ sources:
   - name: insee
     schema: insee
    tables:
-      - name: regions
-      - name: departements
-      - name: communes
+      - name: etat_civil_prenoms
       - name: sirene_etablissement_historique
       - name: sirene_etablissement_succession
-      - name: etat_civil_prenoms
 
   - name: dora
     schema: dora
@@ -210,3 +207,12 @@
       - name: services
         meta:
           kind: service
+
+  - name: decoupage_administratif
+    schema: decoupage_administratif
+    tables:
+      - name: regions
+      - name: departements
+      - name: epcis
+      - name: communes
+      - name: arrondissements
diff --git a/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql b/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql
index 4b295bc7..a02857e2 100644
--- a/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql
+++ b/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql
@@ -11,7 +11,7 @@ adresses AS (
 ),
 
 departements AS (
-    SELECT * FROM {{ source('insee', 'departements') }}
+    SELECT * FROM {{ ref('stg_decoupage_administratif__departements') }}
 ),
 
 -- TODO: Refactoring needed to be able to do geocoding per source and then use the result in the mapping
@@ -25,8 +25,8 @@ services_with_zone_diffusion AS (
         END AS "zone_diffusion_code",
         CASE
             WHEN services.source = ANY(ARRAY['monenfant', 'soliguide']) THEN adresses.commune
-            WHEN services.source = ANY(ARRAY['reseau-alpha', 'action-logement']) THEN (SELECT departements."LIBELLE" FROM departements WHERE departements."DEP" = LEFT(adresses.code_insee, 2))
-            WHEN services.source = 'mediation-numerique' THEN (SELECT departements."LIBELLE" FROM departements WHERE departements."DEP" = services.zone_diffusion_code)
+            WHEN services.source = ANY(ARRAY['reseau-alpha', 'action-logement']) THEN (SELECT departements."nom" FROM departements WHERE departements."code" = LEFT(adresses.code_insee, 2))
+            WHEN services.source = 'mediation-numerique' THEN (SELECT departements."nom" FROM departements WHERE departements."code" = services.zone_diffusion_code)
             ELSE services.zone_diffusion_nom
         END AS "zone_diffusion_nom"
     FROM
diff --git a/pipeline/dbt/models/intermediate/sources/france_travail/int_france_travail__adresses.sql b/pipeline/dbt/models/intermediate/sources/france_travail/int_france_travail__adresses.sql
index 44b6463f..84db300b 100644
--- a/pipeline/dbt/models/intermediate/sources/france_travail/int_france_travail__adresses.sql
+++ b/pipeline/dbt/models/intermediate/sources/france_travail/int_france_travail__adresses.sql
@@ -3,30 +3,22 @@ WITH agences AS (
 ),
 
 communes AS (
-    SELECT * FROM {{ source('insee', 'communes') }}
+    SELECT * FROM {{ ref('stg_decoupage_administratif__communes') }}
 ),
 
--- exclude communes déléguées (duplicated codes) ; should maybe be a permanent INSEE view ?
-filtered_communes AS (
-    SELECT *
-    FROM communes
-    WHERE "TYPECOM" != 'COMD'
-),
-
-
 final AS (
     SELECT
-        agences.id                  AS "id",
-        agences.longitude           AS "longitude",
-        agences.latitude            AS "latitude",
-        agences._di_source_id       AS "source",
-        agences.complement_adresse  AS "complement_adresse",
-        agences.adresse             AS "adresse",
-        agences.code_postal         AS "code_postal",
-        agences.code_insee          AS "code_insee",
-        filtered_communes."LIBELLE" AS "commune"
+        agences.id                 AS "id",
+        agences.longitude          AS "longitude",
+        agences.latitude           AS "latitude",
+        agences._di_source_id      AS "source",
+        agences.complement_adresse AS "complement_adresse",
+        agences.adresse            AS "adresse",
+        agences.code_postal        AS "code_postal",
+        agences.code_insee         AS "code_insee",
+        communes.nom               AS "commune"
     FROM agences
-    LEFT JOIN filtered_communes ON agences.code_insee = filtered_communes."COM"
+    LEFT JOIN communes ON agences.code_insee = communes.code
 )
 
 SELECT * FROM final
diff --git a/pipeline/dbt/models/staging/code_officiel_geographique/_code_officiel_geographique__models.yml b/pipeline/dbt/models/staging/code_officiel_geographique/_code_officiel_geographique__models.yml
deleted file mode 100644
index f986599b..00000000
--- a/pipeline/dbt/models/staging/code_officiel_geographique/_code_officiel_geographique__models.yml
+++ /dev/null
@@ -1,50 +0,0 @@
-version: 2
-
-models:
-  - name: stg_code_officiel_geographique__communes
-    columns:
-      - name: type_commune
-        data_tests:
-          - not_null
-          - dbt_utils.not_constant
-          - accepted_values:
-              values: ['commune', 'commune-associee', 'arrondissement-municipal']
-      - name: code
-        data_tests:
-          - unique
-          - not_null
-          - dbt_utils.not_constant
-          - dbt_utils.not_empty_string
-      - name: libelle
-        data_tests:
-          - not_null
-          - dbt_utils.not_constant
-          - dbt_utils.not_empty_string
-
-  - name: stg_code_officiel_geographique__departements
-    columns:
-      - name: code
-        data_tests:
-          - unique
-          - not_null
-          - dbt_utils.not_constant
-          - dbt_utils.not_empty_string
-      - name: libelle
-        data_tests:
-          - not_null
-          - dbt_utils.not_constant
-          - dbt_utils.not_empty_string
-
-  - name: stg_code_officiel_geographique__regions
-    columns:
-      - name: code
-        data_tests:
-          - unique
-          - not_null
-          - dbt_utils.not_constant
-          - dbt_utils.not_empty_string
-      - name: libelle
-        data_tests:
-          - not_null
-          - dbt_utils.not_constant
-          - dbt_utils.not_empty_string
diff --git a/pipeline/dbt/models/staging/code_officiel_geographique/stg_code_officiel_geographique__communes.sql b/pipeline/dbt/models/staging/code_officiel_geographique/stg_code_officiel_geographique__communes.sql
deleted file mode 100644
index 2c447d49..00000000
--- a/pipeline/dbt/models/staging/code_officiel_geographique/stg_code_officiel_geographique__communes.sql
+++ /dev/null
@@ -1,27 +0,0 @@
-WITH source AS (
-    SELECT * FROM {{ source('insee', 'communes') }}
-),
-
-communes AS (
-    SELECT
-        "COM"     AS "code",
-        "LIBELLE" AS "libelle",
-        CASE "TYPECOM"
-            WHEN 'COM' THEN 'commune'
-            WHEN 'COMA' THEN 'commune-associee'
-            WHEN 'COMD' THEN 'commune-deleguee'
-            WHEN 'ARM' THEN 'arrondissement-municipal'
-        END       AS "type_commune"
-    FROM source
-),
-
--- drop communes deleguees
--- they use the same code as their parent commune
--- this creates duplicates
-final AS (
-    SELECT *
-    FROM communes
-    WHERE type_commune != 'commune-deleguee'
-)
-
-SELECT * FROM final
diff --git a/pipeline/dbt/models/staging/code_officiel_geographique/stg_code_officiel_geographique__departements.sql b/pipeline/dbt/models/staging/code_officiel_geographique/stg_code_officiel_geographique__departements.sql
deleted file mode 100644
index dd9e4770..00000000
--- a/pipeline/dbt/models/staging/code_officiel_geographique/stg_code_officiel_geographique__departements.sql
+++ /dev/null
@@ -1,12 +0,0 @@
-WITH source AS (
-    SELECT * FROM {{ source('insee', 'departements') }}
-),
-
-final AS (
-    SELECT
-        "DEP"     AS "code",
-        "LIBELLE" AS "libelle"
-    FROM source
-)
-
-SELECT * FROM final
diff --git a/pipeline/dbt/models/staging/code_officiel_geographique/stg_code_officiel_geographique__regions.sql b/pipeline/dbt/models/staging/code_officiel_geographique/stg_code_officiel_geographique__regions.sql
deleted file mode 100644
index 500c8b47..00000000
--- a/pipeline/dbt/models/staging/code_officiel_geographique/stg_code_officiel_geographique__regions.sql
+++ /dev/null
@@ -1,12 +0,0 @@
-WITH source AS (
-    SELECT * FROM {{ source('insee', 'regions') }}
-),
-
-final AS (
-    SELECT
-        "REG"     AS "code",
-        "LIBELLE" AS "libelle"
-    FROM source
-)
-
-SELECT * FROM final
diff --git a/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml b/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml
new file mode 100644
index 00000000..664acfcf
--- /dev/null
+++ b/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml
@@ -0,0 +1,73 @@
+version: 2
+
+models:
+  - name: stg_decoupage_administratif__regions
+    columns:
+      - name: code
+        data_tests:
+          - not_null
+          - dbt_utils.not_constant
+          - dbt_utils.not_empty_string
+      - name: nom
+        data_tests:
+          - not_null
+          - dbt_utils.not_constant
+          - dbt_utils.not_empty_string
+
+  - name: stg_decoupage_administratif__departements
+    columns:
+      - name: code
+        data_tests:
+          - not_null
+          - dbt_utils.not_constant
+          - dbt_utils.not_empty_string
+      - name: nom
+        data_tests:
+          - not_null
+          - dbt_utils.not_constant
+          - dbt_utils.not_empty_string
+      - name: code_region
+        data_tests:
+          - not_null
+          - dbt_utils.not_constant
+          - dbt_utils.not_empty_string
+
+  - name: stg_decoupage_administratif__communes
+    columns:
+      - name: code
+        data_tests:
+          - unique
+          - not_null
+      - name: nom
+        data_tests:
+          - not_null
+          - dbt_utils.not_constant
+          - dbt_utils.not_empty_string
+      - name: code_departement
+        data_tests:
+          - not_null
+          - dbt_utils.not_constant
+          - dbt_utils.not_empty_string
+          - relationships:
+              to: ref('stg_decoupage_administratif__departements')
+              field: code
+      - name: code_region
+        data_tests:
+          - not_null
+          - dbt_utils.not_constant
+          - dbt_utils.not_empty_string
+          - relationships:
+              to: ref('stg_decoupage_administratif__regions')
+              field: code
+      - name: code_epci
+        data_tests:
+          # NOTE(vperron): the test is against a source, since the EPCIs are
+          # very rarely used yet and it does not seem worth it to create a
+          # staging view for them yet.
+          - relationships:
+              to: source('decoupage_administratif', 'epcis')
+              field: code
+      - name: centre
+        data_tests:
+          - not_null
+          - dbt_utils.not_constant
diff --git a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__communes.sql b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__communes.sql
new file mode 100644
index 00000000..fa385dac
--- /dev/null
+++ b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__communes.sql
@@ -0,0 +1,40 @@
+WITH source_communes AS (
+    {{ stg_source_header('decoupage_administratif', 'communes') }}
+),
+
+source_arrondissements AS (
+    {{ stg_source_header('decoupage_administratif', 'arrondissements') }}
+),
+
+communes AS (
+    SELECT
+        source_communes.code                       AS "code",
+        source_communes.nom                        AS "nom",
+        source_communes."codeRegion"               AS "code_region",
+        source_communes."codeDepartement"          AS "code_departement",
+        source_communes."codeEpci"                 AS "code_epci",
+        ST_GEOMFROMGEOJSON(source_communes.centre) AS "centre",
+        source_communes."codesPostaux"             AS "codes_postaux"
+    FROM source_communes
+),
+
+arrondissements AS (
+    SELECT
+        source_arrondissements.code                       AS "code",
+        source_arrondissements.nom                        AS "nom",
+        source_arrondissements."codeRegion"               AS "code_region",
+        source_arrondissements."codeDepartement"          AS "code_departement",
+        NULL                                              AS "code_epci",
+        ST_GEOMFROMGEOJSON(source_arrondissements.centre) AS "centre",
+        source_arrondissements."codesPostaux"             AS "codes_postaux"
+    FROM source_arrondissements
+),
+
+final AS (
+    SELECT * FROM communes
+    UNION ALL
+    SELECT * FROM arrondissements
+    ORDER BY code
+)
+
+SELECT * FROM final
diff --git a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departements.sql b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departements.sql
new file mode 100644
index 00000000..0c59e169
--- /dev/null
+++ b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__departements.sql
@@ -0,0 +1,14 @@
+WITH source AS (
+    {{ stg_source_header('decoupage_administratif', 'departements') }}
+),
+
+final AS (
+    SELECT
+        code         AS "code",
+        nom          AS "nom",
+        "codeRegion" AS "code_region"
+    FROM source
+    ORDER BY code
+)
+
+SELECT * FROM final
diff --git a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__regions.sql b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__regions.sql
new file mode 100644
index 00000000..95c6ae3d
--- /dev/null
+++ b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__regions.sql
@@ -0,0 +1,13 @@
+WITH source AS (
+    {{ stg_source_header('decoupage_administratif', 'regions') }}
+),
+
+final AS (
+    SELECT
+        code AS "code",
+        nom  AS "nom"
+    FROM source
+    ORDER BY code
+)
+
+SELECT * FROM final
diff --git a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__communes.sql b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__communes.sql
index d10ab56f..08502540 100644
--- a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__communes.sql
+++ b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__communes.sql
@@ -2,7 +2,7 @@
 
 {% set table_exists = adapter.get_relation(database=source_model.database, schema=source_model.schema, identifier=source_model.name) is not none %}
 
--- depends_on: {{ ref('stg_code_officiel_geographique__communes') }}
+-- depends_on: {{ ref('stg_decoupage_administratif__communes') }}
 
 {% if table_exists %}
 
@@ -11,7 +11,7 @@
 ),
 
 communes AS (
-    SELECT * FROM {{ ref('stg_code_officiel_geographique__communes') }}
+    SELECT * FROM {{ ref('stg_decoupage_administratif__communes') }}
 ),
 
 final AS (
@@ -20,7 +20,7 @@
         source."ID_COM"           AS "id_com",
         source."ID_RES"           AS "id_res",
         source."CODE_COMMUNE_COM" AS "code_commune_com",
-        communes.libelle          AS "libelle"
+        communes.nom              AS "libelle"
     FROM source
     LEFT JOIN communes ON source."CODE_COMMUNE_COM" = communes.code
 )
diff --git a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__departements.sql b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__departements.sql
index f3138c02..a239b15d 100644
--- a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__departements.sql
+++ b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__departements.sql
@@ -2,7 +2,7 @@
 
 {% set table_exists = adapter.get_relation(database=source_model.database, schema=source_model.schema, identifier=source_model.name) is not none %}
 
--- depends_on: {{ ref('stg_code_officiel_geographique__departements') }}
+-- depends_on: {{ ref('stg_decoupage_administratif__departements') }}
 
 {% if table_exists %}
 
@@ -11,7 +11,7 @@
 ),
 
 departements AS (
-    SELECT * FROM {{ ref('stg_code_officiel_geographique__departements') }}
+    SELECT * FROM {{ ref('stg_decoupage_administratif__departements') }}
 ),
 
 final AS (
@@ -20,7 +20,7 @@
         source."ID_DPT"               AS "id_dpt",
        source."ID_RES"               AS "id_res",
         source."CODE_DEPARTEMENT_DPT" AS "code_departement_dpt",
-        departements.libelle          AS "libelle"
+        departements.nom              AS "libelle"
     FROM source
     LEFT JOIN departements ON source."CODE_DEPARTEMENT_DPT" = departements.code
 )
diff --git a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__regions.sql b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__regions.sql
index b400f946..4ac0bda1 100644
--- a/pipeline/dbt/models/staging/sources/odspep/stg_odspep__regions.sql
+++ b/pipeline/dbt/models/staging/sources/odspep/stg_odspep__regions.sql
@@ -10,7 +10,7 @@
     ) is not none)
 -%}
 
--- depends_on: {{ ref('stg_code_officiel_geographique__regions') }}
+-- depends_on: {{ ref('stg_decoupage_administratif__regions') }}
 
 {% if tables_exist %}
 
@@ -23,7 +23,7 @@
 ),
 
 regions AS (
-    SELECT * FROM {{ ref('stg_code_officiel_geographique__regions') }}
+    SELECT * FROM {{ ref('stg_decoupage_administratif__regions') }}
 ),
 
 final AS (
@@ -32,12 +32,10 @@
         source."ID_REG"          AS "id_reg",
         source."ID_RES"          AS "id_res",
         source."CODE_REGION_REG" AS "code_region_reg",
-        'Région'                 AS "zone_diffusion_type",
-        regions.libelle          AS "libelle"
-
+        'Région'                 AS "zone_diffusion_type",
+        regions.nom              AS "libelle"
     FROM source
     LEFT JOIN regions ON source."CODE_REGION_REG" = regions.code
-
 )
 
 SELECT * FROM final
diff --git a/pipeline/dbt/models/staging/sources/soliguide/_soliguide__models.yml b/pipeline/dbt/models/staging/sources/soliguide/_soliguide__models.yml
index 142f2650..35876bd9 100644
--- a/pipeline/dbt/models/staging/sources/soliguide/_soliguide__models.yml
+++ b/pipeline/dbt/models/staging/sources/soliguide/_soliguide__models.yml
@@ -54,7 +54,7 @@ models:
           - not_null
           - dbt_utils.not_empty_string
          - relationships:
-              to: ref('stg_code_officiel_geographique__communes')
+              to: ref('stg_decoupage_administratif__communes')
               field: code
               config:
                 severity: warn
diff --git a/pipeline/defaults.env b/pipeline/defaults.env
index bd61b1dc..73ad9860 100644
--- a/pipeline/defaults.env
+++ b/pipeline/defaults.env
@@ -16,9 +16,7 @@ AIRFLOW_VAR_DORA_PREPROD_API_URL=https://api.dora.incubateur.net/api/v2/
 AIRFLOW_VAR_EMPLOIS_API_URL=https://emplois.inclusion.beta.gouv.fr/api/v1/structures/
 AIRFLOW_VAR_ETAB_PUB_FILE_URL=https://www.data.gouv.fr/fr/datasets/r/73302880-e4df-4d4c-8676-1a61bb997f3d
 AIRFLOW_VAR_FINESS_FILE_URL=https://www.data.gouv.fr/fr/datasets/r/3dc9b1d5-0157-440d-a7b5-c894fcfdfd45
-AIRFLOW_VAR_IGN_ADMIN_EXPRESS_FILE_URL=https://files.opendatarchives.fr/professionnels.ign.fr/adminexpress/ADMIN-EXPRESS-COG_3-0__SHP__FRA_WM_2021-05-19.7z
 AIRFLOW_VAR_INSEE_FIRSTNAME_FILE_URL=https://www.insee.fr/fr/statistiques/fichier/2540004/nat2021_csv.zip
-AIRFLOW_VAR_INSEE_COG_DATASET_URL=https://www.insee.fr/fr/statistiques/fichier/7766585/
 AIRFLOW_VAR_MEDNUM_API_URL=https://cartographie.societenumerique.gouv.fr/api/v0/
 AIRFLOW_VAR_MES_AIDES_AIDES_URL=https://airtable.com/appRga7C9USklxYiV/tblcAC5yMV3Ftzv5c/viwMte3unsIYXxY9a
 AIRFLOW_VAR_MES_AIDES_GARAGES_URL=https://airtable.com/appRga7C9USklxYiV/tblfhYoBpcQoJwGIv/viwoJsw0vsAnU0fAo
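
For reference, the upstream call that both the API command and the new DAG now depend on can be reproduced with plain httpx as a quick sanity check, independent of the project helpers:

    import httpx

    resp = httpx.get(
        "https://geo.api.gouv.fr/communes",
        params={
            "fields": "nom,code,codesPostaux,codeDepartement,codeRegion,codeEpci,centre",
            "format": "json",
        },
    )
    resp.raise_for_status()
    # Each record carries the GeoJSON "centre" point and the postal codes
    # that now land in the api__communes table.
    print(resp.json()[0])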