Skip to content

Commit

Permalink
feat(api) : Restore the import_communes command
Browse files Browse the repository at this point in the history
  • Loading branch information
vperron committed Sep 10, 2024
1 parent 923148d commit 5532082
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 54 deletions.
1 change: 1 addition & 0 deletions api/src/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from data_inclusion.api.config import settings
from data_inclusion.api.core import db
from data_inclusion.api.decoupage_administratif import models as _ # noqa: F401 F811
from data_inclusion.api.inclusion_data import models as _ # noqa: F401 F811
from data_inclusion.api.request import models as _ # noqa: F401 F811

Expand Down
7 changes: 7 additions & 0 deletions api/src/data_inclusion/api/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import click

from data_inclusion.api import auth
from data_inclusion.api.decoupage_administratif.commands import import_communes
from data_inclusion.api.inclusion_data.commands import load_inclusion_data

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -39,5 +40,11 @@ def _load_inclusion_data():
click.echo(load_inclusion_data())


@cli.command(name="import_communes")
def _import_communes():
"""Import the communes from the Decoupage Administratif API"""
click.echo(import_communes())


if __name__ == "__main__":
cli()
80 changes: 80 additions & 0 deletions api/src/data_inclusion/api/decoupage_administratif/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import logging

import numpy as np
import pandas as pd
from furl import furl
from sqlalchemy import types
from sqlalchemy.dialects.postgresql import insert

from data_inclusion.api.core import db
from data_inclusion.api.inclusion_data import models

logger = logging.getLogger(__name__)


def import_communes():
query_params = {
"fields": ("nom,code,codesPostaux,codeDepartement,codeRegion,codeEpci,centre"),
"format": "json",
}
dtypes = {
"code": types.TEXT,
"codeEpci": types.TEXT,
"departement": types.TEXT,
"region": types.TEXT,
"centre": types.JSON,
"codesPostaux": types.ARRAY(types.TEXT),
}

communes_url = (
furl("https://geo.api.gouv.fr/communes").set(query_params=query_params).url
)
communes_df = pd.read_json(communes_url, dtype=dtypes)

districts_url = (
furl("https://geo.api.gouv.fr/communes")
.set(query_params=query_params | {"type": "arrondissement-municipal"})
.url
)
districts_df = pd.read_json(districts_url, dtype=dtypes)

df = pd.concat([communes_df, districts_df])

df = df.rename(
columns={
"codesPostaux": "codes_postaux",
"codeDepartement": "departement",
"codeRegion": "region",
"codeEpci": "siren_epci",
}
)

def create_point(geom):
return f"POINT({geom['coordinates'][0]} {geom['coordinates'][1]})"

df["centre"] = df["centre"].apply(create_point)

df = df.replace({np.nan: None})

df = df.sort_values(by="code")

commune_data_list = df.to_dict(orient="records")
stmt = insert(models.Commune).values(commune_data_list)

column_names = [
"code",
"nom",
"departement",
"region",
"siren_epci",
"centre",
"codes_postaux",
]
stmt = stmt.on_conflict_do_update(
index_elements=[models.Commune.code],
set_={col: getattr(stmt.excluded, col) for col in column_names},
)

with db.SessionLocal() as session:
session.execute(stmt)
session.commit()
21 changes: 21 additions & 0 deletions api/src/data_inclusion/api/decoupage_administratif/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import geoalchemy2
from sqlalchemy.orm import Mapped, mapped_column

from data_inclusion.api.core.db import Base


class Commune(Base):
code: Mapped[str] = mapped_column(primary_key=True)
nom: Mapped[str]
departement: Mapped[str]
region: Mapped[str]
siren_epci: Mapped[str | None]
# FIXME(vperron) : This column should have an index (spatial_index=True)
# but let's do it in a future migration
centre = mapped_column(geoalchemy2.Geometry("Geometry", srid=4326))
# FIXME(vperron) : This should not be nullable but at the time of migration
# it's necessary as the info is not there yet. Let's clean up later.
codes_postaux: Mapped[list[str] | None]

def __repr__(self) -> str:
return f"<Commune(code={self.code}, nom={self.nom})>"
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from data_inclusion.api.code_officiel_geo import constants
from data_inclusion.api.decoupage_administratif import constants


def get_departement_by_code_or_slug(
Expand Down
17 changes: 0 additions & 17 deletions api/src/data_inclusion/api/inclusion_data/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,9 @@ def load_inclusion_data():

structures_df = df_by_ressource["structures"]
services_df = df_by_ressource["services"]
communes_df = df_by_ressource["communes"]

structures_df = structures_df.replace({np.nan: None})
services_df = services_df.replace({np.nan: None})
communes_df = communes_df.replace({np.nan: None})

structures_df = structures_df.drop(columns=["_di_geocodage_score"])
services_df = services_df.drop(columns=["_di_geocodage_score"])
Expand All @@ -149,27 +147,13 @@ def load_inclusion_data():
)
]

commune_data_list = communes_df.to_dict(orient="records")
structure_data_list = structures_df.to_dict(orient="records")
service_data_list = services_df.to_dict(orient="records")

# TODO(vmttn): load in a temporary table, truncate and then insert
with db.SessionLocal() as session:
session.execute(sqla.delete(models.Service))
session.execute(sqla.delete(models.Structure))
session.execute(sqla.delete(models.Commune))

for commune_data in tqdm(commune_data_list):
commune_instance = models.Commune(**commune_data)
try:
with session.begin_nested():
session.add(commune_instance)
except sqla.exc.IntegrityError as exc:
logger.error(
f"commune source={commune_data['source']} "
f"code={commune_data['code']}"
)
logger.info(exc.orig)

for structure_data in tqdm(structure_data_list):
structure_instance = models.Structure(**structure_data)
Expand Down Expand Up @@ -199,7 +183,6 @@ def load_inclusion_data():
with db.default_db_engine.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as connection:
connection.execute(sqla.text("VACUUM ANALYZE api__communes"))
connection.execute(sqla.text("VACUUM ANALYZE api__structures"))
connection.execute(sqla.text("VACUUM ANALYZE api__services"))

Expand Down
19 changes: 1 addition & 18 deletions api/src/data_inclusion/api/inclusion_data/models.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,15 @@
from datetime import date

import geoalchemy2
import sqlalchemy as sqla
from sqlalchemy.orm import Mapped, mapped_column, relationship

from data_inclusion.api.core.db import Base
from data_inclusion.api.decoupage_administratif.models import Commune

# all fields are nullable or have a default value. These models will only be used to
# query valid data coming from the data pipeline.


class Commune(Base):
code: Mapped[str] = mapped_column(primary_key=True)
nom: Mapped[str]
departement: Mapped[str]
region: Mapped[str]
siren_epci: Mapped[str | None]
# FIXME(vperron) : This column should have an index (spatial_index=True)
# but let's do it in a future migration
centre = mapped_column(geoalchemy2.Geometry("Geometry", srid=4326))
# FIXME(vperron) : This should not be nullable but at the time of migration
# it's necessary as the info is not there yet. Let's clean up later.
codes_postaux: Mapped[list[str] | None]

def __repr__(self) -> str:
return f"<Commune(code={self.code}, nom={self.nom})>"


class Structure(Base):
# internal metadata
_di_surrogate_id: Mapped[str] = mapped_column(primary_key=True)
Expand Down
13 changes: 7 additions & 6 deletions api/src/data_inclusion/api/inclusion_data/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@

from data_inclusion import schema as di_schema
from data_inclusion.api import auth
from data_inclusion.api.code_officiel_geo.constants import (
from data_inclusion.api.config import settings
from data_inclusion.api.core import db
from data_inclusion.api.decoupage_administratif.constants import (
DepartementCodeEnum,
DepartementSlugEnum,
RegionCodeEnum,
RegionSlugEnum,
)
from data_inclusion.api.code_officiel_geo.utils import (
from data_inclusion.api.decoupage_administratif.models import Commune
from data_inclusion.api.decoupage_administratif.utils import (
get_departement_by_code_or_slug,
get_region_by_code_or_slug,
)
from data_inclusion.api.config import settings
from data_inclusion.api.core import db
from data_inclusion.api.inclusion_data import models, schemas, services
from data_inclusion.api.inclusion_data import schemas, services
from data_inclusion.api.utils import pagination, soliguide

router = fastapi.APIRouter(tags=["Données"])
Expand Down Expand Up @@ -410,7 +411,7 @@ def search_services_endpoint(
commune_instance = None
search_point = None
if code_commune is not None:
commune_instance = db_session.get(models.Commune, code_commune)
commune_instance = db_session.get(Commune, code_commune)
if commune_instance is None:
raise fastapi.HTTPException(
status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY,
Expand Down
15 changes: 7 additions & 8 deletions api/src/data_inclusion/api/inclusion_data/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
from fastapi_pagination.ext.sqlalchemy import paginate

from data_inclusion import schema as di_schema
from data_inclusion.api.code_officiel_geo.constants import (
from data_inclusion.api.decoupage_administratif.constants import (
Departement,
Region,
)
from data_inclusion.api.decoupage_administratif.models import Commune
from data_inclusion.api.inclusion_data import models

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -202,10 +203,10 @@ def list_structures(
query = query.filter(models.Structure.code_insee.startswith(departement.code))

if region is not None:
query = query.join(models.Commune).options(
query = query.join(Commune).options(
orm.contains_eager(models.Structure.commune_)
)
query = query.filter(models.Commune.region == region.code)
query = query.filter(Commune.region == region.code)

if typologie is not None:
query = query.filter_by(typologie=typologie.value)
Expand Down Expand Up @@ -319,10 +320,8 @@ def list_services(
query = query.filter(models.Service.code_insee.startswith(departement.code))

if region is not None:
query = query.join(models.Commune).options(
orm.contains_eager(models.Service.commune_)
)
query = query.filter(models.Commune.region == region.code)
query = query.join(Commune).options(orm.contains_eager(models.Service.commune_))
query = query.filter(Commune.region == region.code)

if code_commune is not None:
query = query.filter(models.Service.code_insee == code_commune)
Expand Down Expand Up @@ -350,7 +349,7 @@ def search_services(
request: fastapi.Request,
db_session: orm.Session,
sources: list[str] | None = None,
commune_instance: models.Commune | None = None,
commune_instance: Commune | None = None,
thematiques: list[di_schema.Thematique] | None = None,
frais: list[di_schema.Frais] | None = None,
modes_accueil: list[di_schema.ModeAccueil] | None = None,
Expand Down
4 changes: 2 additions & 2 deletions api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from data_inclusion.api.app import create_app
from data_inclusion.api.config import settings
from data_inclusion.api.core import db
from data_inclusion.api.inclusion_data import models
from data_inclusion.api.decoupage_administratif.models import Commune

from . import factories

Expand Down Expand Up @@ -121,7 +121,7 @@ def communes(db_connection):
df = df.to_wkt()
commune_data_list = df.to_dict(orient="records")

db_connection.execute(sqla.insert(models.Commune).values(commune_data_list))
db_connection.execute(sqla.insert(Commune).values(commune_data_list))
db_connection.commit()


Expand Down
2 changes: 1 addition & 1 deletion api/tests/e2e/api/test_inclusion_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest

from data_inclusion import schema
from data_inclusion.api.code_officiel_geo.constants import RegionEnum
from data_inclusion.api.decoupage_administratif.constants import RegionEnum
from data_inclusion.api.utils import soliguide

from ... import factories
Expand Down
1 change: 0 additions & 1 deletion pipeline/dags/dag_utils/marts.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def _export_di_dataset_to_s3(logical_date, run_id):
RESOURCES = {
"structures": "public_marts.marts_inclusion__structures",
"services": "public_marts.marts_inclusion__services",
"communes": "public_staging.stg_decoupage_administratif__communes",
}

for resource_name, sql_table in RESOURCES.items():
Expand Down

0 comments on commit 5532082

Please sign in to comment.