AB#101818 Only export actual record for csv and jsonlines #532

Merged: 3 commits, Dec 15, 2023
8 changes: 8 additions & 0 deletions CHANGES.md
@@ -1,3 +1,11 @@
# 2023-12-15 (5.20.0)

* Changed the export to only use active records for csv and jsonlines,
  so no historical records are included. Also brought the export more
  in line with the csv export of the DSO-API:
  - headers using `capitalize()`
  - date-times in ISO notation
  - foreign keys only with an `identificatie` (no `volgnummer`)
# 2023-12-05 (5.19.1)

* Updated Django version > 4.2
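As a sketch of the header convention the changelog describes (not part of the diff): `csv_header` below is a hypothetical stand-in for schematools' `toCamelCase` followed by `str.capitalize()`, which also lowercases everything after the first character.

```python
# Hypothetical stand-in for toCamelCase() + capitalize(); illustrates the
# DSO-API-style csv headers the changelog refers to.
def csv_header(db_name: str) -> str:
    first, *rest = db_name.split("_")
    camel = first + "".join(part.title() for part in rest)
    return camel.capitalize()

print(csv_header("merk_omschrijving"))  # Merkomschrijving
print(csv_header("ligt_in_buurt_id"))   # Ligtinbuurtid
```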
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = amsterdam-schema-tools
version = 5.19.1
version = 5.20.0
url = https://github.com/amsterdam/schema-tools
license = Mozilla Public 2.0
author = Team Data Diensten, van het Dataplatform onder de Directie Digitale Voorzieningen (Gemeente Amsterdam)
29 changes: 26 additions & 3 deletions src/schematools/exports/__init__.py
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import date
from pathlib import Path
from typing import IO

@@ -48,6 +49,7 @@ def __init__(
table_ids: list[str] | None = None,
scopes: list[str] | None = None,
size: int | None = None,
temporal_date: date = date.today(),
):
"""Constructor.

@@ -66,6 +68,7 @@ def __init__(
self.table_ids = table_ids
self.scopes = set(scopes)
self.size = size
self.temporal_date = temporal_date

self.base_dir = Path(output)
self.tables = (
@@ -77,8 +80,14 @@ def __init__(

def _get_column(self, sa_table: Table, field: DatasetFieldSchema):
column = getattr(sa_table.c, field.db_name)
processor = self.geo_modifier if field.is_geo else lambda col, _fn: col
return processor(column, field.db_name)
# apply all processors
for processor in self.processors:
column = processor(field, column)

return column

# processor = self.geo_modifier if field.is_geo else lambda col, _fn: col
# return processor(column, field.db_name)

def _get_columns(self, sa_table: Table, table: DatasetTableSchema):
for field in _get_fields(self.dataset_schema, table, self.scopes):
@@ -87,13 +96,25 @@ def _get_columns(self, sa_table: Table, table: DatasetTableSchema):
except AttributeError:
pass # skip unavailable columns

def _get_temporal_clause(self, sa_table: Table, table: DatasetTableSchema):
if not table.is_temporal:
return None
temporal = table.temporal
for dimension in temporal.dimensions.values():
start = getattr(sa_table.c, dimension.start.db_name)
end = getattr(sa_table.c, dimension.end.db_name)
return (start <= self.temporal_date) & ((end > self.temporal_date) | (end == None))
return None

def export_tables(self):
for table in self.tables:
srid = table.crs.split(":")[1] if table.crs else None
if table.has_geometry_fields and srid is None:
raise ValueError("Table has geo fields, but srid is None.")
sa_table = self.sa_tables[table.id]
with open(self.base_dir / f"{table.db_name}.{self.extension}", "w") as file_handle:
with open(
self.base_dir / f"{table.db_name}.{self.extension}", "w", encoding="utf8"
) as file_handle:
self.write_rows(file_handle, table, sa_table, srid)

def write_rows(
@@ -107,5 +128,7 @@ def _get_fields(dataset_schema: DatasetSchema, table: DatasetTableSchema, scopes
for field in table.fields:
if field.is_array:
continue
if field.is_internal:
continue
if parent_scopes | set(field.auth) - {_PUBLIC_SCOPE} <= set(scopes):
yield field
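A rough sketch of the predicate `_get_temporal_clause` builds, using a plain SQLAlchemy table; the table and column names here are hypothetical examples, not taken from a real schema.

```python
# Sketch of the validity-window filter: a record is "actual" when its
# window contains temporal_date; an open end (NULL) also counts.
from datetime import date

from sqlalchemy import Column, Date, MetaData, Table, select

t = Table(
    "ggwgebieden",
    MetaData(),
    Column("begin_geldigheid", Date),
    Column("eind_geldigheid", Date),
)
today = date(2023, 12, 15)
clause = (t.c.begin_geldigheid <= today) & (
    (t.c.eind_geldigheid > today) | (t.c.eind_geldigheid == None)  # noqa: E711
)
# Roughly: SELECT ... WHERE begin_geldigheid <= :date
#          AND (eind_geldigheid > :date OR eind_geldigheid IS NULL)
print(select(t).where(clause))
```

Note that the `end == None` comparison in the diff is intentional: SQLAlchemy compiles it to `IS NULL`.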
36 changes: 30 additions & 6 deletions src/schematools/exports/csv.py
@@ -1,31 +1,52 @@
from __future__ import annotations

import csv
from datetime import date
from typing import IO

from geoalchemy2 import functions as func # ST_AsEWKT
from sqlalchemy import MetaData, Table, select
from geoalchemy2 import functions as gfunc # ST_AsEWKT
from sqlalchemy import MetaData, Table, func, select
from sqlalchemy.engine import Connection

from schematools.exports import BaseExporter, enable_datetime_cast
from schematools.naming import toCamelCase
from schematools.types import DatasetSchema
from schematools.types import DatasetFieldSchema, DatasetSchema

metadata = MetaData()


class CsvExporter(BaseExporter): # noqa: D101
extension = "csv"
geo_modifier = staticmethod(lambda col, fn: func.ST_AsEWKT(col).label(fn))

def geo_modifier(field: DatasetFieldSchema, column):
if not field.is_geo:
return column
return gfunc.ST_AsEWKT(column).label(field.db_name)

def id_modifier(field: DatasetFieldSchema, column):
if field.table.is_temporal and field.is_composite_key:
return func.split_part(column, ".", 1).label(field.db_name)
return column

def datetime_modifier(field: DatasetFieldSchema, column):
if field.type == "string" and field.format == "date-time":
return func.to_char(column, 'YYYY-MM-DD"T"HH24:MI:SS').label(field.db_name)
return column

processors = (geo_modifier, id_modifier, datetime_modifier)

def write_rows( # noqa: D102
self, file_handle: IO[str], table: DatasetSchema, sa_table: Table, srid: str
):
columns = list(self._get_columns(sa_table, table))
field_names = [c.name for c in columns]
writer = csv.DictWriter(file_handle, field_names, extrasaction="ignore")
writer.writerow({fn: toCamelCase(fn) for fn in field_names})
# Use capitalize() on headers, because the DSO-API csv export does the same
writer.writerow({fn: toCamelCase(fn).capitalize() for fn in field_names})
query = select(self._get_columns(sa_table, table))
temporal_clause = self._get_temporal_clause(sa_table, table)
if temporal_clause is not None:
query = query.where(temporal_clause)
if self.size is not None:
query = query.limit(self.size)
result = self.connection.execution_options(yield_per=1000).execute(query)
@@ -41,8 +62,11 @@ def export_csvs(
table_ids: list[str],
scopes: list[str],
size: int,
temporal_date: date = date.today(),
):
"""Utility function to wrap the Exporter."""
enable_datetime_cast()
exporter = CsvExporter(connection, dataset_schema, output, table_ids, scopes, size)
exporter = CsvExporter(
connection, dataset_schema, output, table_ids, scopes, size, temporal_date
)
exporter.export_tables()
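For temporal tables the composite key is stored as a single `identificatie.volgnummer` string, and `id_modifier` keeps only the part before the dot via Postgres `split_part`. A pure-Python stand-in for illustration (the real work happens in SQL):

```python
# Pure-Python mirror of Postgres split_part(value, '.', 1): split_part is
# 1-indexed and returns '' when the requested field does not exist.
def split_part(value: str, sep: str, n: int) -> str:
    parts = value.split(sep)
    return parts[n - 1] if n <= len(parts) else ""

print(split_part("03630950000000.2", ".", 1))  # 03630950000000
```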
19 changes: 15 additions & 4 deletions src/schematools/exports/jsonlines.py
@@ -10,7 +10,7 @@
from sqlalchemy.engine import Connection

from schematools.exports import BaseExporter
from schematools.exports.csv import enable_datetime_cast
from schematools.exports.csv import DatasetFieldSchema, enable_datetime_cast
from schematools.naming import toCamelCase
from schematools.types import DatasetSchema, DatasetTableSchema

@@ -34,9 +34,20 @@ def _dumps(obj: Any) -> str:

class JsonLinesExporter(BaseExporter): # noqa: D101
extension = "jsonl"
geo_modifier = staticmethod(
lambda col, fn: func.ST_AsGeoJSON(func.ST_Transform(col, 4326)).label(fn)
)

def geo_modifier(field: DatasetFieldSchema, column):
if not field.is_geo:
return column
return func.ST_AsGeoJSON(func.ST_Transform(column, 4326)).label(field.db_name)

def id_modifier(field: DatasetFieldSchema, column):
if field.table.is_temporal and field.is_composite_key:
return func.split_part(column, ".", 1).label(field.db_name)
return column

# We do not use the ISO formatter for datetimes here; the JSON serializer already handles this.

processors = (geo_modifier, id_modifier)

def _get_row_modifier(self, table: DatasetTableSchema):
lookup = {}
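The jsonlines `geo_modifier` differs from the csv one: it reprojects to WGS84 (SRID 4326) and serializes as GeoJSON rather than EWKT in the source projection. A sketch of the two column expressions, assuming geoalchemy2 and a hypothetical `geometrie` column in the meetbouten SRID (28992):

```python
# Sketch comparing the jsonlines and csv geometry expressions; the table
# here is a hypothetical example.
from geoalchemy2 import Geometry
from geoalchemy2 import functions as gfunc
from sqlalchemy import Column, MetaData, Table

t = Table("meetbouten", MetaData(), Column("geometrie", Geometry(srid=28992)))

jsonl_col = gfunc.ST_AsGeoJSON(gfunc.ST_Transform(t.c.geometrie, 4326))
csv_col = gfunc.ST_AsEWKT(t.c.geometrie)
print(jsonl_col)  # ST_AsGeoJSON(ST_Transform(meetbouten.geometrie, :param))
print(csv_col)    # ST_AsEWKT(meetbouten.geometrie)
```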
5 changes: 5 additions & 0 deletions src/schematools/types.py
@@ -1434,6 +1434,11 @@ def is_identifier_part(self) -> bool:
self.id == "id" or self._id in self._parent_table.identifier
) and self._parent_field is None

@cached_property
def is_internal(self) -> bool:
"""Id fields for table with composite key is only for internal (Django) use."""
return self.is_primary and self._parent_table.has_composite_key

@cached_property
def relation(self) -> str | None:
"""Give the 1:N relation, if it exists."""
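The new `is_internal` property pairs with the `field.is_internal` check added to `_get_fields` above: for a composite-key (temporal) table the primary `id` column is a Django-only surrogate and is now excluded from exports. A trivial mirror of the predicate:

```python
# Pure-Python mirror of DatasetFieldSchema.is_internal: the surrogate
# primary key of a composite-key table is for internal (Django) use only.
def is_internal(is_primary: bool, has_composite_key: bool) -> bool:
    return is_primary and has_composite_key

print(is_internal(True, True))   # True  -> surrogate "id" is skipped
print(is_internal(False, True))  # False -> "identificatie" etc. are exported
```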
2 changes: 2 additions & 0 deletions tests/files/data/ggwgebieden-history.ndjson
@@ -0,0 +1,2 @@
{"identificatie": "03630950000000", "volgnummer": 1, "registratiedatum": "2020-07-21T13:39:23.856580", "naam": "Centrum-West", "code": "DX01", "begingeldigheid": "2014-02-20", "eindgeldigheid": "2019-10-03", "documentdatum": "2017-10-10", "ligtinstadsdeel": {"identificatie": "03630000000018", "volgnummer": 3}, "bestaatuitbuurten": [{"identificatie": "03630023753960", "volgnummer": 1}, {"identificatie": "03630023753950", "volgnummer": 1}, {"identificatie": "03630000000080", "volgnummer": 2}], "schema": "gebieden_ggwgebieden"}
{"identificatie": "03630950000000", "volgnummer": 2, "registratiedatum": "2020-07-21T13:39:23.856580", "naam": "Centrum-West", "code": "DX01", "begingeldigheid": "2019-10-03", "eindgeldigheid": null, "documentdatum": "2017-10-10", "ligtinstadsdeel": {"identificatie": "03630000000018", "volgnummer": 3}, "bestaatuitbuurten": [{"identificatie": "03630023753960", "volgnummer": 1}, {"identificatie": "03630023753950", "volgnummer": 1}, {"identificatie": "03630000000080", "volgnummer": 2}], "schema": "gebieden_ggwgebieden"}
15 changes: 14 additions & 1 deletion tests/test_export.py
@@ -22,11 +22,24 @@ def test_csv_export(here, engine, meetbouten_schema, dbsession, tmp_path):
export_csvs(engine, meetbouten_schema, str(tmp_path), [], [], 1)
with open(tmp_path / "meetbouten_meetbouten.csv") as out_file:
assert out_file.read() == (
"identificatie,ligtInBuurtId,merkCode,merkOmschrijving,geometrie\n"
"Identificatie,Ligtinbuurtid,Merkcode,Merkomschrijving,Geometrie\n"
"1,10180001.1,12,De meetbout,SRID=28992;POINT(119434 487091.6)\n"
)


def test_csv_export_only_actual(here, engine, ggwgebieden_schema, dbsession, tmp_path):
"""Prove that csv export contains only the actual records, not the history."""
ndjson_path = here / "files" / "data" / "ggwgebieden-history.ndjson"
importer = NDJSONImporter(ggwgebieden_schema, engine)
importer.generate_db_objects("ggwgebieden", truncate=True, ind_extra_index=False)
importer.load_file(ndjson_path)
export_csvs(engine, ggwgebieden_schema, str(tmp_path), [], [], 1)
with open(tmp_path / "ggwgebieden_ggwgebieden.csv") as out_file:
lines = out_file.readlines()
assert len(lines) == 2  # includes the header line
assert lines[1].split(",")[0] == "2" # volgnummer == 2


def test_jsonlines_export(here, engine, meetbouten_schema, dbsession, tmp_path):
"""Prove that jsonlines export contains the correct content."""
_load_meetbouten_content(here, engine, meetbouten_schema)