diff --git a/.gitignore b/.gitignore
index d922073..252c807 100644
--- a/.gitignore
+++ b/.gitignore
@@ -130,3 +130,4 @@
 dmypy.json
 .docker-compose-volumes
 .idea
+/.data/
diff --git a/Dockerfile b/Dockerfile
index b40eec4..bad391a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,9 +1,10 @@
-FROM akariv/dgp-app:a4b48191b99a39068ec4f2c98a578d51add17d42
+# Pulled Jun 20, 2024
+FROM akariv/dgp-app@sha256:58fdcb67d72e337818b9a0d67d6705ae95e215c0d0d53202565f577f56f89d91
 USER root
 
-RUN apt-get update && apt-get install -y git
+RUN apt-get update && apt-get install -y git zip libgdal-dev build-essential
 COPY requirements.txt .
-RUN pip install -r requirements.txt
+RUN pip install --upgrade pip && pip install -r requirements.txt
 COPY taxonomies taxonomies
 COPY configuration.template.json .
 COPY datacity_ckan_entrypoint.sh .
diff --git a/configuration.template.json b/configuration.template.json
index 8409d9b..7603855 100644
--- a/configuration.template.json
+++ b/configuration.template.json
@@ -103,6 +103,33 @@
           "type": "text"
         }
       ]
+    },
+    {
+      "name": "gis_fetcher",
+      "display": "Fetch data from GIS Server into a CKAN Package",
+      "fields": [
+        {
+          "name": "gis_url",
+          "display": "GIS URL",
+          "type": "text"
+        },
+        {
+          "name": "target_instance_name",
+          "display": "Target CKAN Instance",
+          "type": "enum",
+          "options": ["__CKAN_INSTANCES__"]
+        },
+        {
+          "name": "target_package_id",
+          "display": "ID of Package to Update (or Create)",
+          "type": "text"
+        },
+        {
+          "name": "target_organization_id",
+          "display": "Owner Organization of created package",
+          "type": "text"
+        }
+      ]
+    }
   ],
   "theme": {
diff --git a/datacity_ckan_dgp/ckan/__init__.py b/datacity_ckan_dgp/ckan/__init__.py
index 0af37e6..77ccfef 100644
--- a/datacity_ckan_dgp/ckan/__init__.py
+++ b/datacity_ckan_dgp/ckan/__init__.py
@@ -70,12 +70,26 @@ def package_show(instance_name, package_name):
         return None
 
 
+def resource_show(instance_name, resource_id):
+    res = api_get(instance_name, 'resource_show', auth=True, params={'id': resource_id})
+    if res['success']:
+        return res['result']
+    else:
+        return None
+
+
 def package_search(instance_name, params):
     res = api_get(instance_name, 'package_search', params=params)
     assert res['success'], res
     return res['result']
 
 
+def resource_search(instance_name, params):
+    res = api_get(instance_name, 'resource_search', params=params)
+    assert res['success'], res
+    return res['result']
+
+
 def package_create(instance_name, data):
     return api_post(instance_name, 'package_create', json=data)
 
diff --git a/datacity_ckan_dgp/operators/gis_fetcher.py b/datacity_ckan_dgp/operators/gis_fetcher.py
new file mode 100644
index 0000000..7117814
--- /dev/null
+++ b/datacity_ckan_dgp/operators/gis_fetcher.py
@@ -0,0 +1,309 @@
+import os
+import sys
+import json
+import shutil
+import hashlib
+import tempfile
+import requests
+import contextlib
+
+import geopandas
+from osgeo import ogr
+import dataflows as DF
+
+from datacity_ckan_dgp.package_processing_tasks.geojson import projector
+from datacity_ckan_dgp import ckan
+
+
+def gis_query_geojson_iterate_all(gis_url):
+    url = gis_url.rstrip('/') + '/query'
+    result_offset = 0
+    result_record_count = 1000
+    while True:
+        params = {
+            'where': '1=1',
+            'outFields': '*',
+            'f': 'geojson',
+            'resultOffset': result_offset,
+            'resultRecordCount': result_record_count
+        }
+        res = requests.get(url, params=params)
+        res.raise_for_status()
+        data = res.json()
+        assert data['type'] == 'FeatureCollection'
+        if not data['features']:
+            break
+        for feature in data['features']:
+            yield feature
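+        # advance the paging window by the page size (ArcGIS resultOffset / resultRecordCount pagination)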
+        result_offset += result_record_count
+
+
+def iterate_gis_jsonlines(tmpdir):
+    with open(os.path.join(tmpdir, 'gis.jsonlines'), 'r') as f:
+        for line in f:
+            if line.strip():
+                try:
+                    yield json.loads(line)
+                except Exception as e:
+                    raise Exception(f'failed to parse json line: {line}') from e
+
+
+def fetch_gis_json(gis_url):
+    gis_json_url = f'{gis_url}?f=pjson'
+    print(f'fetching gis json from {gis_json_url}')
+    res = requests.get(gis_json_url)
+    res.raise_for_status()
+    return res.json()
+
+
+@contextlib.contextmanager
+def tempdir(tmpdir):
+    if tmpdir:
+        os.makedirs(tmpdir, exist_ok=True)
+        yield tmpdir
+    else:
+        with tempfile.TemporaryDirectory() as tmpdir:
+            yield tmpdir
+
+
+class FailedToConvertFeature(Exception):
+    pass
+
+
+def geojson_feature_to_itm(feature):
+    if feature['type'] != 'Feature':
+        raise FailedToConvertFeature(f'feature is not a Feature: {feature}')
+    if feature['geometry']['type'] == 'Polygon':
+        new_coordinates = []
+        for ring in feature['geometry']['coordinates']:
+            new_ring = []
+            for coord in ring:
+                new_ring.append([*projector(*coord)])
+            new_coordinates.append(new_ring)
+        feature['geometry']['coordinates'] = new_coordinates
+    elif feature['geometry']['type'] == 'MultiPolygon':
+        new_coordinates = []
+        for polygon in feature['geometry']['coordinates']:
+            new_polygon = []
+            for ring in polygon:
+                new_ring = []
+                for coord in ring:
+                    new_ring.append([*projector(*coord)])
+                new_polygon.append(new_ring)
+            new_coordinates.append(new_polygon)
+        feature['geometry']['coordinates'] = new_coordinates
+    else:
+        raise FailedToConvertFeature(f'unsupported geometry type: {feature["geometry"]["type"]}')
+    return feature
+
+
+def features_to_csv(features, fields=None):
+    for feature in features:
+        if not fields:
+            yield feature['properties']
+        else:
+            yield {k: str(feature['properties'].get(k) or '') for k in fields}
+
+
+def geojson_to_kml(geojson_path, kml_path):
+    ogr.RegisterAll()
+    geojson_ds = ogr.Open(geojson_path)
+    kml_driver = ogr.GetDriverByName('KML')
+    kml_ds = kml_driver.CreateDataSource(kml_path)
+    for i in range(geojson_ds.GetLayerCount()):
+        layer = geojson_ds.GetLayerByIndex(i)
+        kml_layer = kml_ds.CreateLayer(layer.GetName(), geom_type=layer.GetGeomType())
+        layer_defn = layer.GetLayerDefn()
+        for j in range(layer_defn.GetFieldCount()):
+            field_defn = layer_defn.GetFieldDefn(j)
+            kml_layer.CreateField(field_defn)
+        for feature in layer:
+            kml_layer.CreateFeature(feature.Clone())
+
+
+def get_geoxml_coordinates_item(root_item):
+    res = '<item>'
+    for item in root_item:
+        if isinstance(item, list):
+            res += get_geoxml_coordinates_item(item)
+        else:
+            res += f'<value>{item}</value>'
+    res += '</item>'
+    return res
+
+
+def geojson_to_geoxml(features, geoxml_path, itm=False):
+    with open(geoxml_path, 'w') as f:
+        f.write('<?xml version="1.0" encoding="UTF-8"?>\n')
+        f.write('<root>\n')
+        f.write('    <type>FeatureCollection</type>\n')
+        f.write('    <features>\n')
+        for feature in features:
+            if itm:
+                feature = geojson_feature_to_itm(feature)
+            f.write('        <feature>\n')
+            if feature['type'] != 'Feature':
+                raise FailedToConvertFeature(f'feature is not a Feature: {feature}')
+            f.write('            <type>Feature</type>\n')
+            geometry = feature['geometry']
+            f.write('            <geometry>\n')
+            if feature['geometry']['type'] not in ['Polygon', 'MultiPolygon']:
+                raise FailedToConvertFeature(f'unsupported geometry type: {feature["geometry"]["type"]}')
+            f.write(f'                <type>{geometry["type"]}</type>\n')
+            f.write('                <coordinates>\n')
+            for item in geometry['coordinates']:
+                f.write(f'                    {get_geoxml_coordinates_item(item)}\n')
+            f.write('                </coordinates>\n')
+            f.write('            </geometry>\n')
+            f.write('        </feature>\n')
+        f.write('    </features>\n')
+        f.write('</root>\n')
+
+
+def create_gis_data(gis_url, tmpdir):
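+    # snapshot the raw service metadata (f=pjson); operator() later reads the service name from this file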
+    with open(os.path.join(tmpdir, 'gis.json'), 'w') as f:
+        json.dump(fetch_gis_json(gis_url), f, ensure_ascii=False, indent=2)
+    with open(os.path.join(tmpdir, 'gis.jsonlines'), 'w') as f:
+        for feature in gis_query_geojson_iterate_all(gis_url):
+            f.write(json.dumps(feature, ensure_ascii=False) + '\n')
+    print("Create geojson")
+    with open(os.path.join(tmpdir, 'gis.geojson'), 'w') as f:
+        f.write('{"type": "FeatureCollection","features": [\n')
+        for i, feature in enumerate(iterate_gis_jsonlines(tmpdir)):
+            if i > 0:
+                f.write(',\n')
+            f.write('    ' + json.dumps(feature, ensure_ascii=False))
+        f.write(']}')
+    print("Create gis.itm.geojson")
+    try:
+        with open(os.path.join(tmpdir, 'gis.itm.geojson'), 'w') as f:
+            f.write('{"type": "FeatureCollection","features": [\n')
+            for i, feature in enumerate(iterate_gis_jsonlines(tmpdir)):
+                if i > 0:
+                    f.write(',\n')
+                feature = geojson_feature_to_itm(feature)
+                f.write('    ' + json.dumps(feature, ensure_ascii=False))
+            f.write(']}')
+    except FailedToConvertFeature as e:
+        print(str(e))
+        print('failed to convert feature to itm')
+        os.unlink(os.path.join(tmpdir, 'gis.itm.geojson'))
+    print("Create shapefile.zip")
+    geojson_data = geopandas.read_file(os.path.join(tmpdir, 'gis.geojson'))
+    os.makedirs(os.path.join(tmpdir, 'shapefile'), exist_ok=True)
+    geojson_data.to_file(os.path.join(tmpdir, 'shapefile/gis.shp'), driver='ESRI Shapefile')
+    os.system(f'cd {tmpdir} && zip -r shapefile.zip shapefile')
+    print("Create gis.csv, gis.xlsx")
+    feature_properties = set()
+    for row in features_to_csv(iterate_gis_jsonlines(tmpdir)):
+        feature_properties.update(row.keys())
+    DF.Flow(
+        features_to_csv(iterate_gis_jsonlines(tmpdir), fields=feature_properties),
+        DF.dump_to_path(os.path.join(tmpdir, 'csv')),
+        DF.dump_to_path(os.path.join(tmpdir, 'xlsx'), format='xlsx')
+    ).process()
+    shutil.copyfile(os.path.join(tmpdir, 'csv', 'res_1.csv'), os.path.join(tmpdir, 'gis.csv'))
+    shutil.copyfile(os.path.join(tmpdir, 'xlsx', 'res_1.xlsx'), os.path.join(tmpdir, 'gis.xlsx'))
+    print("Create gis.xml")
+    with open(os.path.join(tmpdir, 'gis.xml'), 'w') as f:
+        f.write('<?xml version="1.0" encoding="UTF-8"?>\n')
+        f.write('<rows>\n')
+        for properties in features_to_csv(iterate_gis_jsonlines(tmpdir), fields=feature_properties):
+            f.write('    <row>\n')
+            for k, v in properties.items():
+                v = v.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;').replace('"', '&quot;').replace(
+                    '\'', '&#39;')
+                f.write(f'        <{k} type="str">{v}</{k}>\n')
+            f.write('    </row>\n')
+        f.write('</rows>\n')
+    print("Create gis.kml")
+    geojson_to_kml(os.path.join(tmpdir, 'gis.geojson'), os.path.join(tmpdir, 'gis.kml'))
+    print("Create gis.geoxml")
+    try:
+        geojson_to_geoxml(iterate_gis_jsonlines(tmpdir), os.path.join(tmpdir, 'gis.geoxml'))
+    except FailedToConvertFeature as e:
+        print(str(e))
+        print('failed to convert feature to geoxml')
+        os.unlink(os.path.join(tmpdir, 'gis.geoxml'))
+    print("Create gis.itm.geoxml")
+    try:
+        geojson_to_geoxml(iterate_gis_jsonlines(tmpdir), os.path.join(tmpdir, 'gis.itm.geoxml'), itm=True)
+    except FailedToConvertFeature as e:
+        print(str(e))
+        print('failed to convert feature to geoxml')
+        os.unlink(os.path.join(tmpdir, 'gis.itm.geoxml'))
+
+
+def update_resource(target_instance_name, package, format_, resource_name, file_path):
+    print(f'updating resource {resource_name}...')
+    if os.path.exists(file_path):
+        existing_resource_id = None
+        existing_resource_hash = None
+        for resource in package['resources']:
+            if resource['name'] == resource_name:
+                existing_resource_id = resource['id']
+                existing_resource_hash = resource.get('hash')
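+        # hash the freshly generated file so an unchanged resource can skip the re-upload below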
+        new_resource_hash = hashlib.md5(open(file_path, 'rb').read()).hexdigest()
+        if not existing_resource_id:
+            print('no existing resource found, creating new resource')
+            res = ckan.resource_create(target_instance_name, {
+                'package_id': package['id'],
+                'format': format_,
+                'name': resource_name,
+                'hash': new_resource_hash
+            }, files=[('upload', open(file_path, 'rb'))])
+            assert res['success'], str(res)
+        elif existing_resource_hash != new_resource_hash:
+            print('existing resource found, but hash is different, updating resource data')
+            res = ckan.resource_update(target_instance_name, {
+                'id': existing_resource_id,
+                'hash': new_resource_hash
+            }, files=[('upload', open(file_path, 'rb'))])
+            assert res['success'], str(res)
+        else:
+            print('existing resource found, and hash is the same, skipping resource update')
+    else:
+        print(f'file {file_path} does not exist, skipping resource update')
+
+
+def operator(name, params):
+    gis_url = params['gis_url']
+    target_instance_name = params['target_instance_name']
+    target_package_id = params['target_package_id']
+    target_organization_id = params['target_organization_id']
+    tmpdir = params.get('tmpdir')
+    with tempdir(tmpdir) as tmpdir:
+        print('starting gis_fetcher operator')
+        print(f'gis_url={gis_url} target_instance_name={target_instance_name} target_package_id={target_package_id} target_organization_id={target_organization_id}')
+        print(f'tmpdir={tmpdir}')
+        create_gis_data(gis_url, tmpdir)
+        with open(os.path.join(tmpdir, 'gis.json'), 'r') as f:
+            gis_json = json.load(f)
+        name = gis_json['name']
+        print(f'gis name={name}')
+        package = ckan.package_show(target_instance_name, target_package_id)
+        if not package:
+            res = ckan.package_create(target_instance_name, {
+                'name': target_package_id,
+                'title': name,
+                'owner_org': target_organization_id
+            })
+            assert res['success'], str(res)
+            package = res['result']
+        for format_, resource_name, file_name in [
+            ('shapefile', 'SHP', 'shapefile.zip'),
+            ('csv', 'CSV', 'gis.csv'),
+            ('xlsx', 'XLSX', 'gis.xlsx'),
+            ('geojson', 'GeoJSON', 'gis.geojson'),
+            ('geojson', 'GeoJSON-ITM', 'gis.itm.geojson'),
+            ('xml', 'XML', 'gis.xml'),
+            ('kml', 'KML', 'gis.kml'),
+            ('geoxml', 'GeoXML', 'gis.geoxml'),
+            ('geoxml', 'GeoXML-ITM', 'gis.itm.geoxml'),
+        ]:
+            update_resource(target_instance_name, package, format_, resource_name, os.path.join(tmpdir, file_name))
+        print('gis_fetcher operator completed successfully')
+
+
+# python3 -m datacity_ckan_dgp.operators.gis_fetcher '{"gis_url": "https://gisserver.haifa.muni.il/arcgiswebadaptor/rest/services/PublicSite/Haifa_Eng_Public/MapServer/13", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "yeudei_karka", "target_organization_id": "muni", "tmpdir": ".data/gis_fetcher_tmpdir"}'
+if __name__ == '__main__':
+    operator('_', json.loads(sys.argv[1]))
diff --git a/datacity_ckan_dgp/package_processing_tasks/xlsx.py b/datacity_ckan_dgp/package_processing_tasks/xlsx.py
index 2f39e77..b27efc3 100644
--- a/datacity_ckan_dgp/package_processing_tasks/xlsx.py
+++ b/datacity_ckan_dgp/package_processing_tasks/xlsx.py
@@ -2,7 +2,6 @@
 import sys
 from glob import glob
 
-import dataflows_xlsx
 import dataflows as DF
 
 from datacity_ckan_dgp import ckan
@@ -14,7 +13,7 @@ def process_resource(instance_name, package, resource, package_extras_processed_
     with utils.tempdir() as tmpdir:
         DF.Flow(
             DF.load(resource['url'], infer_strategy=DF.load.INFER_STRINGS),
-            dataflows_xlsx.dump_to_path(os.path.join(tmpdir), format='xlsx')
+            DF.dump_to_path(os.path.join(tmpdir), format='xlsx')
         ).process()
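+        # dataflows dumps a single res_1.xlsx into tmpdir; the glob picks it up for upload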
         filenames = list(glob(os.path.join(tmpdir, "*.xlsx")))
         assert len(filenames) == 1
diff --git a/requirements.txt b/requirements.txt
index 0a9ff3a..436196c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,8 @@
-requests==2.24.0
-pyproj==2.6.1
-geojson==2.5.0
-ruamel.yaml==0.16.12
--e git+https://github.com/OriHoch/dataflows-xlsx.git@99b02e09d85ab5cc2e0fa5e4a26affdcabdcd827#egg=dataflows-xlsx
+requests==2.32.2
+pyproj==3.6.1
+dataflows[speedup]==0.5.5
+pykml==0.2.0
+gdal==3.6.2
+geopandas==0.14.4
+ruamel.yaml==0.18.6
+geojson==3.1.0