Skip to content

Commit

Permalink
feat: add source for data edited by us
Browse files Browse the repository at this point in the history
  • Loading branch information
vmttn committed Aug 16, 2023
1 parent 9ca7a7c commit b0d0b7f
Show file tree
Hide file tree
Showing 15 changed files with 315 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .template.env
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ AIRFLOW_CONN_S3_SOURCES=
BAN_API_URL=https://api-adresse.data.gouv.fr
CD35_FILE_URL=https://data.ille-et-vilaine.fr/dataset/8d5ec0f0-ebe1-442d-9d99-655b37d5ad07/resource/8b781e9d-e11d-486c-98cf-0f63abfae8ed/download/annuaire_sociale_fixe.csv
CD72_FILE_URL=
DI_EXTRA_SERVICES_FILE_URL=https://data-inclusion-lake.s3.fr-par.scw.cloud/sources/data-inclusion/2023-08-16/services.json
DI_EXTRA_STRUCTURES_FILE_URL=https://data-inclusion-lake.s3.fr-par.scw.cloud/sources/data-inclusion/2023-08-16/structures.json
DORA_API_TOKEN=
DORA_API_URL=https://api.dora.inclusion.beta.gouv.fr/api/v2/
EMPLOIS_API_TOKEN=
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ x-airflow-common:
AIRFLOW_VAR_DATAGOUV_API_URL: ${DATAGOUV_API_URL}
AIRFLOW_VAR_DATAGOUV_DI_DATASET_ID: ${DATAGOUV_DI_DATASET_ID}
AIRFLOW_VAR_DATAGOUV_DI_RESOURCE_IDS: ${DATAGOUV_DI_RESOURCE_IDS}
AIRFLOW_VAR_DI_EXTRA_SERVICES_FILE_URL: ${DI_EXTRA_SERVICES_FILE_URL}
AIRFLOW_VAR_DI_EXTRA_STRUCTURES_FILE_URL: ${DI_EXTRA_STRUCTURES_FILE_URL}
AIRFLOW_VAR_DORA_API_TOKEN: ${DORA_API_TOKEN}
AIRFLOW_VAR_DORA_API_URL: ${DORA_API_URL}
AIRFLOW_VAR_EMPLOIS_API_TOKEN: ${EMPLOIS_API_TOKEN}
Expand Down
17 changes: 17 additions & 0 deletions pipeline/dags/dags/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,21 @@
},
],
},
{
"id": "data-inclusion",
"schedule_interval": "@once",
"snapshot": False,
"streams": [
{
"id": "services",
"filename": "services.json",
"url": Variable.get("DI_EXTRA_SERVICES_FILE_URL", None),
},
{
"id": "structures",
"filename": "structures.json",
"url": Variable.get("DI_EXTRA_STRUCTURES_FILE_URL", None),
},
],
},
]
2 changes: 2 additions & 0 deletions pipeline/dags/import_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def _extract(
"annuaire-du-service-public": utils.extract_http_content,
"cd35": utils.extract_http_content,
"cd72": utils.extract_http_content,
"data-inclusion": utils.extract_http_content,
"dora": dora.extract,
"emplois-de-linclusion": emplois_de_linclusion.extract,
"finess": utils.extract_http_content,
Expand Down Expand Up @@ -138,6 +139,7 @@ def _load(
"annuaire-du-service-public": annuaire_du_service_public.read,
"cd35": lambda path: utils.read_csv(path, sep=";"),
"cd72": lambda path: utils.read_excel(path, sheet_name="Structures"),
"data-inclusion": utils.read_json,
"dora": utils.read_json,
"emplois-de-linclusion": utils.read_json,
"finess": lambda path: utils.read_csv(path, sep=","),
Expand Down
8 changes: 8 additions & 0 deletions pipeline/dbt/models/_sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ sources:
- name: extra__geocoded_results
- name: adresses_geocoded

- name: data_inclusion_extra
schema: data_inclusion
tables:
- name: structures
description: Entered by the data.inclusion team.
- name: services
description: Entered by the data.inclusion team.

- name: insee
schema: insee
tables:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,46 @@
version: 2

# Properties and tests for intermediate models.
# Each model name must be declared only once in this file.
models:
  - name: int_siretisation__annotations

  - name: int_data_inclusion__adresses
    columns:
      # primary key: unique, non-null and non-empty
      - name: id
        tests:
          - unique
          - not_null
          - dbt_utils.not_empty_string

  - name: int_data_inclusion__services
    columns:
      # primary key: unique, non-null and non-empty
      - name: id
        tests:
          - unique
          - not_null
          - dbt_utils.not_empty_string
      # every service must point to an existing structure
      - name: structure_id
        tests:
          - not_null
          - relationships:
              to: ref('int_data_inclusion__structures')
              field: id
      # every service must point to an existing address
      - name: adresse_id
        tests:
          - not_null
          - relationships:
              to: ref('int_data_inclusion__adresses')
              field: id

  - name: int_data_inclusion__structures
    columns:
      # primary key: unique, non-null and non-empty
      - name: id
        tests:
          - unique
          - not_null
          - dbt_utils.not_empty_string
      # every structure must point to an existing address
      - name: adresse_id
        tests:
          - not_null
          - relationships:
              to: ref('int_data_inclusion__adresses')
              field: id
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- Address rows for the data_inclusion source: one row per staged structure
-- and one row per staged service, stacked together. The address id is the id
-- of the structure or service it was taken from.
WITH stg_structures AS (
    SELECT * FROM {{ ref('stg_data_inclusion__structures') }}
),

stg_services AS (
    SELECT * FROM {{ ref('stg_data_inclusion__services') }}
),

-- address fields carried by structures
adresses_from_structures AS (
    SELECT
        stg_structures.id                 AS "id",
        stg_structures.longitude          AS "longitude",
        stg_structures.latitude           AS "latitude",
        stg_structures._di_source_id      AS "source",
        stg_structures.complement_adresse AS "complement_adresse",
        stg_structures.commune            AS "commune",
        stg_structures.adresse            AS "adresse",
        stg_structures.code_postal        AS "code_postal",
        stg_structures.code_insee         AS "code_insee"
    FROM stg_structures
),

-- address fields carried by services (same column layout as above)
adresses_from_services AS (
    SELECT
        stg_services.id                 AS "id",
        stg_services.longitude          AS "longitude",
        stg_services.latitude           AS "latitude",
        stg_services._di_source_id      AS "source",
        stg_services.complement_adresse AS "complement_adresse",
        stg_services.commune            AS "commune",
        stg_services.adresse            AS "adresse",
        stg_services.code_postal        AS "code_postal",
        stg_services.code_insee         AS "code_insee"
    FROM stg_services
)

SELECT
    "id",
    "longitude",
    "latitude",
    "source",
    "complement_adresse",
    "commune",
    "adresse",
    "code_postal",
    "code_insee"
FROM adresses_from_structures
UNION ALL
SELECT
    "id",
    "longitude",
    "latitude",
    "source",
    "complement_adresse",
    "commune",
    "adresse",
    "code_postal",
    "code_insee"
FROM adresses_from_services
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
-- Maps the staged data_inclusion services onto the common intermediate
-- services layout (one output row per staged service).
WITH services AS (
    SELECT * FROM {{ ref('stg_data_inclusion__services') }}
),

-- Lookup from the profil labels (first column) to the profil slugs
-- (second column) emitted by this model. Labels without an entry here
-- are dropped from the resulting "profils" array.
di_profil_by_dora_profil AS (
    SELECT x.*
    FROM (
        VALUES
        ('Adultes', 'adultes'),
        ('Femmes', 'femmes'),
        ('Public bénéficiaire du Revenu de Solidarité Active (RSA)', 'beneficiaire-rsa'),
        ('Demandeur d''emploi', 'demandeur-demploi')
    ) AS x (dora_profil, di_profil)
),

final AS (
    SELECT
        -- the service id doubles as the id of its address row
        id                      AS "adresse_id",
        contact_public          AS "contact_public",
        -- NOTE(review): contact name, email and phone are always NULL here
        -- although the staging model exposes them; confirm this is intended
        NULL                    AS "contact_nom_prenom",
        NULL                    AS "courriel",
        cumulable               AS "cumulable",
        date_creation::DATE     AS "date_creation",
        date_maj::DATE          AS "date_maj",
        date_suspension::DATE   AS "date_suspension",
        formulaire_en_ligne     AS "formulaire_en_ligne",
        frais_autres            AS "frais_autres",
        id                      AS "id",
        justificatifs           AS "justificatifs",
        NULL                    AS "lien_source",  -- ignored
        modes_accueil           AS "modes_accueil",
        NULL::TEXT []           AS "modes_orientation_accompagnateur",
        NULL::TEXT []           AS "modes_orientation_beneficiaire",
        nom                     AS "nom",
        presentation_resume     AS "presentation_resume",
        presentation_detail     AS "presentation_detail",
        prise_rdv               AS "prise_rdv",
        -- translate each known profil label into its slug
        ARRAY(
            SELECT di_profil_by_dora_profil.di_profil
            FROM di_profil_by_dora_profil
            WHERE di_profil_by_dora_profil.dora_profil = ANY(services.profils)
        )::TEXT []              AS "profils",
        recurrence              AS "recurrence",
        _di_source_id           AS "source",
        structure_id            AS "structure_id",
        NULL                    AS "telephone",
        thematiques             AS "thematiques",
        types                   AS "types",
        zone_diffusion_code     AS "zone_diffusion_code",
        zone_diffusion_nom      AS "zone_diffusion_nom",
        zone_diffusion_type     AS "zone_diffusion_type",
        pre_requis              AS "pre_requis",
        -- wrap the scalar frais in an array, but keep a missing value as NULL
        -- instead of producing a one-element array that holds NULL
        CASE WHEN frais IS NOT NULL THEN ARRAY[frais] END AS "frais"
    FROM services
)

SELECT * FROM final
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Maps the staged data_inclusion structures onto the common intermediate
-- structures layout (one output row per staged structure).
WITH stg_structures AS (
    SELECT * FROM {{ ref('stg_data_inclusion__structures') }}
)

SELECT
    s.accessibilite          AS "accessibilite",
    -- the structure id doubles as the id of its address row
    s.id                     AS "adresse_id",
    s.antenne                AS "antenne",
    s.courriel               AS "courriel",
    CAST(s.date_maj AS DATE) AS "date_maj",
    s.horaires_ouverture     AS "horaires_ouverture",
    s.id                     AS "id",
    s.labels_autres          AS "labels_autres",
    s.labels_nationaux       AS "labels_nationaux",
    s.lien_source            AS "lien_source",
    s.nom                    AS "nom",
    s.presentation_detail    AS "presentation_detail",
    s.presentation_resume    AS "presentation_resume",
    s.rna                    AS "rna",
    s.siret                  AS "siret",
    s.site_web               AS "site_web",
    s._di_source_id          AS "source",
    s.telephone              AS "telephone",
    -- thematiques are emitted as a typed NULL, whatever the staged value is
    CAST(NULL AS TEXT [])    AS "thematiques",
    s.typologie              AS "typologie"
FROM stg_structures AS s
1 change: 1 addition & 0 deletions pipeline/dbt/models/intermediate/int__adresses.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ WITH adresses AS (
dbt_utils.union_relations(
relations=[
ref('int_agefiph__adresses'),
ref('int_data_inclusion__adresses'),
ref('int_dora__adresses'),
ref('int_cd35__adresses'),
ref('int_cd72__adresses'),
Expand Down
1 change: 1 addition & 0 deletions pipeline/dbt/models/intermediate/int__services.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ WITH services AS (
dbt_utils.union_relations(
relations=[
ref('int_agefiph__services'),
ref('int_data_inclusion__services'),
ref('int_dora__services'),
ref('int_mediation_numerique__services'),
ref('int_monenfant__services'),
Expand Down
1 change: 1 addition & 0 deletions pipeline/dbt/models/intermediate/int__structures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ WITH structures AS (
dbt_utils.union_relations(
relations=[
ref('int_agefiph__structures'),
ref('int_data_inclusion__structures'),
ref('int_dora__structures'),
ref('int_cd35__structures'),
ref('int_cd72__structures'),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: 2

# Properties and tests for the data_inclusion staging models.
models:
  - name: stg_data_inclusion__services
    config:
      tags: [data_inclusion]
    columns:
      # primary key: unique, non-null and non-empty
      - name: id
        tests:
          - unique
          - not_null
          - dbt_utils.not_empty_string

  - name: stg_data_inclusion__structures
    config:
      tags: [data_inclusion]
    columns:
      # primary key: unique, non-null and non-empty
      - name: id
        tests:
          - unique
          - not_null
          - dbt_utils.not_empty_string
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
-- Staging model for the data_inclusion services source: unpacks the JSONB
-- "data" column into typed columns, one output row per source row.
WITH raw_services AS (
    SELECT * FROM {{ source('data_inclusion_extra', 'services') }}
)

SELECT
    src._di_source_id                                                       AS "_di_source_id",
    -- booleans, timestamps and coordinates are cast from their text form
    CAST(src.data ->> 'contact_public' AS BOOLEAN)                          AS "contact_public",
    CAST(src.data ->> 'cumulable' AS BOOLEAN)                               AS "cumulable",
    CAST(src.data ->> 'date_creation' AS TIMESTAMP WITH TIME ZONE)          AS "date_creation",
    CAST(src.data ->> 'date_maj' AS TIMESTAMP WITH TIME ZONE)               AS "date_maj",
    CAST(src.data ->> 'date_suspension' AS TIMESTAMP WITH TIME ZONE)        AS "date_suspension",
    CAST(src.data ->> 'latitude' AS FLOAT)                                  AS "latitude",
    CAST(src.data ->> 'longitude' AS FLOAT)                                 AS "longitude",
    -- JSON arrays are expanded element by element into TEXT arrays
    CAST(
        ARRAY(
            SELECT items.item
            FROM JSONB_ARRAY_ELEMENTS_TEXT(src.data -> 'modes_accueil') AS items (item)
        ) AS TEXT []
    )                                                                       AS "modes_accueil",
    CAST(
        ARRAY(
            SELECT items.item
            FROM JSONB_ARRAY_ELEMENTS_TEXT(src.data -> 'profils') AS items (item)
        ) AS TEXT []
    )                                                                       AS "profils",
    CAST(
        ARRAY(
            SELECT items.item
            FROM JSONB_ARRAY_ELEMENTS_TEXT(src.data -> 'thematiques') AS items (item)
        ) AS TEXT []
    )                                                                       AS "thematiques",
    CAST(
        ARRAY(
            SELECT items.item
            FROM JSONB_ARRAY_ELEMENTS_TEXT(src.data -> 'types') AS items (item)
        ) AS TEXT []
    )                                                                       AS "types",
    -- NULLIF(TRIM(...), '') turns blank or whitespace-only strings into NULL
    NULLIF(TRIM(src.data ->> 'justificatifs'), '')                          AS "justificatifs",
    NULLIF(TRIM(src.data ->> 'pre_requis'), '')                             AS "pre_requis",
    src.data ->> 'adresse'                                                  AS "adresse",
    src.data ->> 'code_insee'                                               AS "code_insee",
    src.data ->> 'code_postal'                                              AS "code_postal",
    src.data ->> 'commune'                                                  AS "commune",
    src.data ->> 'complement_adresse'                                       AS "complement_adresse",
    NULLIF(TRIM(src.data ->> 'contact_nom'), '')                            AS "contact_nom",
    NULLIF(TRIM(src.data ->> 'contact_prenom'), '')                         AS "contact_prenom",
    NULLIF(TRIM(src.data ->> 'courriel'), '')                               AS "courriel",
    src.data ->> 'formulaire_en_ligne'                                      AS "formulaire_en_ligne",
    src.data ->> 'frais_autres'                                             AS "frais_autres",
    src.data ->> 'frais'                                                    AS "frais",
    src.data ->> 'id'                                                       AS "id",
    src.data ->> 'lien_source'                                              AS "lien_source",
    src.data ->> 'nom'                                                      AS "nom",
    src.data ->> 'presentation_resume'                                      AS "presentation_resume",
    src.data ->> 'presentation_detail'                                      AS "presentation_detail",
    src.data ->> 'prise_rdv'                                                AS "prise_rdv",
    src.data ->> 'recurrence'                                               AS "recurrence",
    src.data ->> 'source'                                                   AS "source",
    src.data ->> 'structure_id'                                             AS "structure_id",
    NULLIF(TRIM(src.data ->> 'telephone'), '')                              AS "telephone",
    NULLIF(TRIM(src.data ->> 'zone_diffusion_code'), '')                    AS "zone_diffusion_code",
    NULLIF(TRIM(src.data ->> 'zone_diffusion_nom'), '')                     AS "zone_diffusion_nom",
    src.data ->> 'zone_diffusion_type'                                      AS "zone_diffusion_type"
FROM raw_services AS src
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- Staging model for the data_inclusion structures source: unpacks the JSONB
-- "data" column into typed columns, one output row per source row.
WITH source AS (
    SELECT * FROM {{ source('data_inclusion_extra', 'structures') }}
),

final AS (
    SELECT
        _di_source_id                                                                               AS "_di_source_id",
        (data ->> 'antenne')::BOOLEAN                                                               AS "antenne",
        (data ->> 'date_maj')::TIMESTAMP WITH TIME ZONE                                             AS "date_maj",
        -- JSON arrays are expanded element by element into TEXT arrays
        ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'labels_autres'))::TEXT []            AS "labels_autres",
        ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'labels_nationaux'))::TEXT []         AS "labels_nationaux",
        (data ->> 'latitude')::FLOAT                                                                AS "latitude",
        (data ->> 'longitude')::FLOAT                                                               AS "longitude",
        -- A JSON array rendered as text ('["a", "b"]') is not a valid Postgres
        -- array literal, so casting it straight to TEXT [] raises an error.
        -- Expand the elements instead; a missing key or JSON null stays NULL.
        CASE
            WHEN JSONB_TYPEOF(data -> 'thematiques') = 'array'
                THEN ARRAY(SELECT * FROM JSONB_ARRAY_ELEMENTS_TEXT(data -> 'thematiques'))::TEXT []
        END                                                                                         AS "thematiques",
        data ->> 'accessibilite'                                                                    AS "accessibilite",
        data ->> 'adresse'                                                                          AS "adresse",
        data ->> 'code_insee'                                                                       AS "code_insee",
        data ->> 'code_postal'                                                                      AS "code_postal",
        data ->> 'commune'                                                                          AS "commune",
        -- NULLIF(TRIM(...), '') turns blank or whitespace-only strings into NULL
        NULLIF(TRIM(data ->> 'complement_adresse'), '')                                             AS "complement_adresse",
        data ->> 'courriel'                                                                         AS "courriel",
        data ->> 'horaires_ouverture'                                                               AS "horaires_ouverture",
        data ->> 'id'                                                                               AS "id",
        data ->> 'lien_source'                                                                      AS "lien_source",
        NULLIF(TRIM(data ->> 'nom'), '')                                                            AS "nom",
        data ->> 'presentation_detail'                                                              AS "presentation_detail",
        data ->> 'presentation_resume'                                                              AS "presentation_resume",
        data ->> 'rna'                                                                              AS "rna",
        data ->> 'siret'                                                                            AS "siret",
        data ->> 'site_web'                                                                         AS "site_web",
        data ->> 'source'                                                                           AS "source",
        data ->> 'telephone'                                                                        AS "telephone",
        data ->> 'typologie'                                                                        AS "typologie"
    FROM source
)

SELECT * FROM final

0 comments on commit b0d0b7f

Please sign in to comment.