AB#101818 Only export actual records for csv and jsonlines
Most users are not interested in the historical records, and the export is
done every night anyway. This brings the export in line with the DSO API export.

Changes:

- Only select current records for temporal tables
- Change headers (capitalize) to be in line with the DSO API export
  (although it seems a bit strange to me to use capitalize)
- Change datetime notation to use ISO format
- Foreign keys only use the identificatie (not the dot-volgnummer)

To do:
Maybe we should also export only current records for the geopackages.
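
For context, a minimal sketch of the "current record" condition this commit
introduces (a sketch only; the begin_geldigheid/eind_geldigheid column names
are illustrative, the real names come from the table's temporal dimensions):

# Sketch, assuming a reflected temporal table: a record is "actual" when its
# validity interval covers today; open-ended records have a NULL end date.
from datetime import date

from sqlalchemy import MetaData, Table, create_engine, select

engine = create_engine("postgresql:///example")  # hypothetical DSN
sa_table = Table("gebieden_ggwgebieden", MetaData(), autoload_with=engine)
today = date.today()
is_actual = (sa_table.c.begin_geldigheid <= today) & (
    (sa_table.c.eind_geldigheid > today) | (sa_table.c.eind_geldigheid == None)  # noqa: E711
)
query = select(sa_table).where(is_actual)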
jjmurre committed Dec 15, 2023
1 parent 97900d7 commit 5ba84fe
Showing 7 changed files with 99 additions and 15 deletions.
8 changes: 8 additions & 0 deletions CHANGES.md
@@ -1,3 +1,11 @@
# 2023-12-15 (5.20.0)

* Change export to only use active records for csv and jsonlines,
so no historical records. Also brought the export more in line
with the csv export of the DSO-API:
- headers using capitalize()
- date-time in iso notation
- foreign keys only with an `identificatie` (no `volgnummer`)
# 2023-12-05 (5.19.1)

* Updated Django version > 4.2
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = amsterdam-schema-tools
version = 5.19.1
version = 5.20.0
url = https://github.com/amsterdam/schema-tools
license = Mozilla Public 2.0
author = Team Data Diensten, van het Dataplatform onder de Directie Digitale Voorzieningen (Gemeente Amsterdam)
27 changes: 24 additions & 3 deletions src/schematools/exports/__init__.py
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import date
from pathlib import Path
from typing import IO

@@ -48,6 +49,7 @@ def __init__(
table_ids: list[str] | None = None,
scopes: list[str] | None = None,
size: int | None = None,
temporal_date: date = date.today(),
):
"""Constructor.
@@ -66,6 +68,7 @@ def __init__(
self.table_ids = table_ids
self.scopes = set(scopes)
self.size = size
self.temporal_date = temporal_date

self.base_dir = Path(output)
self.tables = (
@@ -77,8 +80,14 @@ def __init__(

def _get_column(self, sa_table: Table, field: DatasetFieldSchema):
column = getattr(sa_table.c, field.db_name)
processor = self.geo_modifier if field.is_geo else lambda col, _fn: col
return processor(column, field.db_name)
# apply all processors
for processor in self.processors:
column = processor(field, column)

return column

# processor = self.geo_modifier if field.is_geo else lambda col, _fn: col
# return processor(column, field.db_name)

def _get_columns(self, sa_table: Table, table: DatasetTableSchema):
for field in _get_fields(self.dataset_schema, table, self.scopes):
@@ -87,13 +96,25 @@ def _get_columns(self, sa_table: Table, table: DatasetTableSchema):
except AttributeError:
pass # skip unavailable columns

def _get_temporal_clause(self, sa_table: Table, table: DatasetTableSchema):
if not table.is_temporal:
return None
temporal = table.temporal
for dimension in temporal.dimensions.values():
start = getattr(sa_table.c, dimension.start.db_name)
end = getattr(sa_table.c, dimension.end.db_name)
return (start <= self.temporal_date) & ((end > self.temporal_date) | (end == None))
return None

def export_tables(self):
for table in self.tables:
srid = table.crs.split(":")[1] if table.crs else None
if table.has_geometry_fields and srid is None:
raise ValueError("Table has geo fields, but srid is None.")
sa_table = self.sa_tables[table.id]
with open(self.base_dir / f"{table.db_name}.{self.extension}", "w") as file_handle:
with open(
self.base_dir / f"{table.db_name}.{self.extension}", "w", encoding="utf8"
) as file_handle:
self.write_rows(file_handle, table, sa_table, srid)

def write_rows(
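
A usage sketch of the new temporal_date parameter, via the export_csvs helper
shown further down (hypothetical invocation; engine and dataset_schema are
assumed to exist, and per the code above size=None disables the row limit):

# Export only the records that are valid on 2023-12-15.
from datetime import date

from schematools.exports.csv import export_csvs

export_csvs(
    engine,          # SQLAlchemy engine/connection, as in the tests below
    dataset_schema,  # a DatasetSchema instance
    "/tmp/export",   # output directory
    [],              # table_ids, [] as in the tests
    [],              # scopes
    None,            # size: None means no LIMIT on the query
    temporal_date=date(2023, 12, 15),
)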
39 changes: 33 additions & 6 deletions src/schematools/exports/csv.py
@@ -1,31 +1,55 @@
from __future__ import annotations

import csv
from datetime import date
from typing import IO

from geoalchemy2 import functions as func # ST_AsEWKT
from sqlalchemy import MetaData, Table, select
from geoalchemy2 import functions as gfunc # ST_AsEWKT
from sqlalchemy import MetaData, Table, func, select
from sqlalchemy.engine import Connection

from schematools.exports import BaseExporter, enable_datetime_cast
from schematools.naming import toCamelCase
from schematools.types import DatasetSchema
from schematools.types import DatasetFieldSchema, DatasetSchema

metadata = MetaData()


class CsvExporter(BaseExporter): # noqa: D101
extension = "csv"
geo_modifier = staticmethod(lambda col, fn: func.ST_AsEWKT(col).label(fn))

@staticmethod
def geo_modifier(field: DatasetFieldSchema, column):
if not field.is_geo:
return column
return gfunc.ST_AsEWKT(column).label(field.db_name)

@staticmethod
def id_modifier(field: DatasetFieldSchema, column):
if field.table.is_temporal and field.is_composite_key:
return func.split_part(column, ".", 1).label(field.db_name)
return column

@staticmethod
def datetime_modifier(field: DatasetFieldSchema, column):
if field.type == "string" and field.format == "date-time":
return func.to_char(column, 'YYYY-MM-DD"T"HH24:MI:SS').label(field.db_name)
return column

processors = (geo_modifier, id_modifier, datetime_modifier)

def write_rows( # noqa: D102
self, file_handle: IO[str], table: DatasetSchema, sa_table: Table, srid: str
):
columns = list(self._get_columns(sa_table, table))
field_names = [c.name for c in columns]
writer = csv.DictWriter(file_handle, field_names, extrasaction="ignore")
writer.writerow({fn: toCamelCase(fn) for fn in field_names})
# Use capitalize() on headers, because csv export does the same
writer.writerow({fn: toCamelCase(fn).capitalize() for fn in field_names})
query = select(self._get_columns(sa_table, table))
temporal_clause = self._get_temporal_clause(sa_table, table)
if temporal_clause is not None:
query = query.where(temporal_clause)
if self.size is not None:
query = query.limit(self.size)
result = self.connection.execution_options(yield_per=1000).execute(query)
@@ -41,8 +65,11 @@ def export_csvs(
table_ids: list[str],
scopes: list[str],
size: int,
temporal_date: date = date.today(),
):
"""Utility function to wrap the Exporter."""
enable_datetime_cast()
exporter = CsvExporter(connection, dataset_schema, output, table_ids, scopes, size)
exporter = CsvExporter(
connection, dataset_schema, output, table_ids, scopes, size, temporal_date
)
exporter.export_tables()
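
One note on the new headers: str.capitalize() lowercases everything after the
first character, which is why camelCase names flatten out. A quick illustration
(the toCamelCase result is an assumption, inferred from the old expected header
in the tests below):

from schematools.naming import toCamelCase

assert toCamelCase("ligt_in_buurt_id") == "ligtInBuurtId"  # assumed; matches the old test expectation
assert "ligtInBuurtId".capitalize() == "Ligtinbuurtid"     # matches the new expected CSV header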
21 changes: 17 additions & 4 deletions src/schematools/exports/jsonlines.py
@@ -10,7 +10,7 @@
from sqlalchemy.engine import Connection

from schematools.exports import BaseExporter
from schematools.exports.csv import enable_datetime_cast
from schematools.exports.csv import DatasetFieldSchema, enable_datetime_cast
from schematools.naming import toCamelCase
from schematools.types import DatasetSchema, DatasetTableSchema

@@ -34,9 +34,22 @@ def _dumps(obj: Any) -> str:

class JsonLinesExporter(BaseExporter): # noqa: D101
extension = "jsonl"
geo_modifier = staticmethod(
lambda col, fn: func.ST_AsGeoJSON(func.ST_Transform(col, 4326)).label(fn)
)

@staticmethod
def geo_modifier(field: DatasetFieldSchema, column):
if not field.is_geo:
return column
return func.ST_AsGeoJSON(func.ST_Transform(column, 4326)).label(field.db_name)

@staticmethod
def id_modifier(field: DatasetFieldSchema, column):
if field.table.is_temporal and field.is_composite_key:
return func.split_part(column, ".", 1).label(field.db_name)
return column

    # We do not use the ISO datetime format here, because JSON notation already handles this.

processors = (geo_modifier, id_modifier)

def _get_row_modifier(self, table: DatasetTableSchema):
lookup = {}
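
A small sketch of what id_modifier does to the dotted composite keys visible in
the fixture below: split_part keeps only the identificatie part before the dot
(the column name here is illustrative):

# '03630000000018.3' -> '03630000000018'
from sqlalchemy import column, func, select

fk = column("ligt_in_stadsdeel_id")  # hypothetical FK column storing 'identificatie.volgnummer'
print(select(func.split_part(fk, ".", 1).label("ligt_in_stadsdeel_id")))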
2 changes: 2 additions & 0 deletions tests/files/data/ggwgebieden-history.ndjson
@@ -0,0 +1,2 @@
{"identificatie": "03630950000000", "volgnummer": 1, "registratiedatum": "2020-07-21T13:39:23.856580", "naam": "Centrum-West", "code": "DX01", "begingeldigheid": "2014-02-20", "eindgeldigheid": "2019-10-03", "documentdatum": "2017-10-10", "ligtinstadsdeel": {"identificatie": "03630000000018", "volgnummer": 3}, "bestaatuitbuurten": [{"identificatie": "03630023753960", "volgnummer": 1}, {"identificatie": "03630023753950", "volgnummer": 1}, {"identificatie": "03630000000080", "volgnummer": 2}], "schema": "gebieden_ggwgebieden"}
{"identificatie": "03630950000000", "volgnummer": 2, "registratiedatum": "2020-07-21T13:39:23.856580", "naam": "Centrum-West", "code": "DX01", "begingeldigheid": "2019-10-03", "eindgeldigheid": null, "documentdatum": "2017-10-10", "ligtinstadsdeel": {"identificatie": "03630000000018", "volgnummer": 3}, "bestaatuitbuurten": [{"identificatie": "03630023753960", "volgnummer": 1}, {"identificatie": "03630023753950", "volgnummer": 1}, {"identificatie": "03630000000080", "volgnummer": 2}], "schema": "gebieden_ggwgebieden"}
15 changes: 14 additions & 1 deletion tests/test_export.py
@@ -22,11 +22,24 @@ def test_csv_export(here, engine, meetbouten_schema, dbsession, tmp_path):
export_csvs(engine, meetbouten_schema, str(tmp_path), [], [], 1)
with open(tmp_path / "meetbouten_meetbouten.csv") as out_file:
assert out_file.read() == (
"identificatie,ligtInBuurtId,merkCode,merkOmschrijving,geometrie\n"
"Identificatie,Ligtinbuurtid,Merkcode,Merkomschrijving,Geometrie\n"
"1,10180001.1,12,De meetbout,SRID=28992;POINT(119434 487091.6)\n"
)


def test_csv_export_only_actual(here, engine, ggwgebieden_schema, dbsession, tmp_path):
"""Prove that csv export contains only the actual records, not the history."""
ndjson_path = here / "files" / "data" / "ggwgebieden-history.ndjson"
importer = NDJSONImporter(ggwgebieden_schema, engine)
importer.generate_db_objects("ggwgebieden", truncate=True, ind_extra_index=False)
importer.load_file(ndjson_path)
export_csvs(engine, ggwgebieden_schema, str(tmp_path), [], [], 1)
with open(tmp_path / "ggwgebieden_ggwgebieden.csv") as out_file:
lines = out_file.readlines()
assert len(lines) == 2 # includes the headerline
assert lines[1].split(",")[1] == "2" # volgnummer == 2


def test_jsonlines_export(here, engine, meetbouten_schema, dbsession, tmp_path):
"""Prove that jsonlines export contains the correct content."""
_load_meetbouten_content(here, engine, meetbouten_schema)
