diff --git a/configuration.template.json b/configuration.template.json index 7603855..9e23497 100644 --- a/configuration.template.json +++ b/configuration.template.json @@ -130,6 +130,33 @@ "type": "text" } ] + }, + { + "name": "generic_fetcher", + "display": "Fetch/Update data from different source types into a CKAN Package", + "fields": [ + { + "name": "source_url", + "display": "Source URL (source type will be inferred from the URL, see https://github.com/hasadna/datacity-ckan-dgp/blob/main/datacity_ckan_dgp/operators/generic_fetcher.py for details)", + "type": "text" + }, + { + "name": "target_instance_name", + "display": "Target CKAN Instance", + "type": "enum", + "options": ["__CKAN_INSTANCES__"] + }, + { + "name": "target_package_id", + "display": "ID of Package to Update (or Create)", + "type": "text" + }, + { + "name": "target_organization_id", + "display": "Owner Organization of created package", + "type": "text" + } + ] } ], "theme": { diff --git a/datacity_ckan_dgp/generic_fetchers/__init__.py b/datacity_ckan_dgp/generic_fetchers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py b/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py new file mode 100644 index 0000000..3148aad --- /dev/null +++ b/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py @@ -0,0 +1,80 @@ +import os + +import requests + +from .. import ckan +from ..utils import http_stream_download + + +def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir): + res = ckan.package_show(target_instance_name, target_package_id) + target_package_exists = False + existing_target_resources = {} + if res: + target_package_exists = True + for resource in res['resources']: + format_ = resource.get('format') or '' + name = resource.get('name') or '' + hash_ = resource.get('hash') or '' + id_ = resource.get('id') or '' + if format_ and name and hash_ and id_: + existing_target_resources[f'{name}.{format_}'] = {'hash': hash_, 'id': id_} + source_package_id = source_url.split('/dataset/')[1].split('/')[0] + source_instance_baseurl = source_url.split('/dataset/')[0] + if 'data.gov.il' in source_instance_baseurl: + headers = {'user-agent': 'datagov-external-client'} + else: + headers = None + res = requests.get(f'{source_instance_baseurl}/api/3/action/package_show?id={source_package_id}', headers=headers).json() + assert res['success'] + package_title = res['result']['title'] + resources_to_update = [] + for resource in res['result']['resources']: + id_ = resource.get('id') or '' + url = resource.get('url') or '' + if url and id_: + if 'e.data.gov.il' in url: + url = url.replace('e.data.gov.il', 'data.gov.il') + filename = url.split('/')[-1] + source_hash = http_stream_download(f'{tmpdir}/{id_}', {'url': url, 'headers': headers}) + source_format = resource.get('format') or '' + source_name = resource.get('name') or '' + description = resource.get('description') or '' + if existing_target_resources.get(f'{source_name}.{source_format}', {}).get('hash') != source_hash: + resources_to_update.append((id_, source_name, source_format, source_hash, description, filename)) + if resources_to_update: + print(f'updating {len(resources_to_update)} resources') + if not target_package_exists: + print('creating target package') + res = ckan.package_create(target_instance_name, { + 'name': target_package_id, + 'title': package_title, + 'owner_org': target_organization_id + }) + assert res['success'], str(res) + for id_, name, format_, hash_, description, filename in resources_to_update: + print(f'{name}.{format_}') + if os.path.exists(f'{tmpdir}/{filename}'): + os.unlink(f'{tmpdir}/{filename}') + os.rename(f'{tmpdir}/{id_}', f'{tmpdir}/{filename}') + if f'{name}.{format_}' in existing_target_resources: + print('existing resource found, but hash is different, updating resource data') + res = ckan.resource_update(target_instance_name, { + 'id': existing_target_resources[f'{name}.{format_}']['id'], + 'hash': hash_, + 'description': description + }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))]) + assert res['success'], str(res) + else: + print('no existing resource found, creating new resource') + res = ckan.resource_create(target_instance_name, { + 'package_id': target_package_id, + 'format': format_, + 'name': name, + 'hash': hash_, + 'description': description + }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))]) + assert res['success'], str(res) + print('done, all resources created/updated') + else: + print('no resources to create/update') diff --git a/datacity_ckan_dgp/operators/generic_fetcher.py b/datacity_ckan_dgp/operators/generic_fetcher.py new file mode 100644 index 0000000..13eaef0 --- /dev/null +++ b/datacity_ckan_dgp/operators/generic_fetcher.py @@ -0,0 +1,50 @@ +import os +import sys +import json +import tempfile +import contextlib +from importlib import import_module + + +# the source url will be checked against the following types in order to determine which type of source it is +FETCHERS = [ + { + # python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}' + 'fetcher': 'ckan_dataset', + 'match': { + 'url_contains': '/dataset/' + } + } +] + + +@contextlib.contextmanager +def tempdir(tmpdir): + if tmpdir: + os.makedirs(tmpdir, exist_ok=True) + yield tmpdir + else: + with tempfile.TemporaryDirectory() as tmpdir: + yield tmpdir + + +def operator(name, params): + source_url = params['source_url'] + target_instance_name = params['target_instance_name'] + target_package_id = params['target_package_id'] + target_organization_id = params['target_organization_id'] + tmpdir = params.get('tmpdir') + with tempdir(tmpdir) as tmpdir: + print('starting generic_fetcher operator') + print(f'source_url={source_url} target_instance_name={target_instance_name} target_package_id={target_package_id} target_organization_id={target_organization_id}') + print(f'tmpdir={tmpdir}') + for fetcher in FETCHERS: + assert fetcher['match'].keys() == {'url_contains'}, 'only url_contains match is supported at the moment' + if fetcher['match']['url_contains'] in source_url: + import_module(f'datacity_ckan_dgp.generic_fetchers.{fetcher["fetcher"]}_fetcher').fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir) + break + + +# python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}' +if __name__ == '__main__': + operator('_', json.loads(sys.argv[1])) diff --git a/datacity_ckan_dgp/operators/instance_initializer.py b/datacity_ckan_dgp/operators/instance_initializer.py index 2374979..2c28dee 100644 --- a/datacity_ckan_dgp/operators/instance_initializer.py +++ b/datacity_ckan_dgp/operators/instance_initializer.py @@ -1,7 +1,7 @@ import os import traceback from glob import glob -from ruamel import yaml +from ruamel.yaml import YAML from collections import defaultdict import pyproj @@ -11,6 +11,9 @@ from datacity_ckan_dgp import utils +yaml = YAML(typ='safe', pure=True) + + AUTOMATION_GROUP_NAME = 'instance_initializer' ORGANIZATIONS_YAML = os.path.join(os.path.dirname(__file__), '..', 'instance_initializer_organizations.yaml') GROUPS_YAML = os.path.join(os.path.dirname(__file__), '..', 'instance_initializer_groups.yaml') @@ -35,7 +38,7 @@ def init_groups(instance_name): print("Initializing groups") if not ckan.automation_group_get(instance_name, AUTOMATION_GROUP_NAME, 'initialized_groups'): with open(GROUPS_YAML) as f: - groups = yaml.safe_load(f) + groups = yaml.load(f) for group in groups: if not ckan.group_show(instance_name, group['id']): ckan.group_create(instance_name, group['id'], title=group['title'], image_url=group['icon']) @@ -49,7 +52,7 @@ def init_organizations(instance_name, default_organization_title): print("Initializing organizations") if not ckan.automation_group_get(instance_name, AUTOMATION_GROUP_NAME, 'initialized_organizations'): with open(ORGANIZATIONS_YAML) as f: - organizations = yaml.safe_load(f) + organizations = yaml.load(f) for organization in organizations: if not ckan.organization_show(instance_name, organization['id']): title = default_organization_title if organization['id'] == 'muni' else organization['title'] @@ -214,7 +217,7 @@ def init_packages(instance_name, muni_filter_texts, init_package_id=None): print("Initializing packages") if not ckan.automation_group_get(instance_name, AUTOMATION_GROUP_NAME, 'initialized_packages'): with open(PACKAGES_YAML) as f: - packages = yaml.safe_load(f) + packages = yaml.load(f) num_errors = 0 num_success = 0 for package in packages: @@ -250,6 +253,9 @@ def operator(name, params, init_package_id=None): init_packages(instance_name, muni_filter_texts, init_package_id) +# python3 -m datacity_ckan_dgp.operators.instance_initializer '{"instance_name": "local_development", "default_organization_title": "עיריית חיפה", "muni_filter_texts": "חיפה"}' + + if __name__ == '__main__': import sys import json