diff --git a/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py b/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py index ee6f2d2..c4324a7 100644 --- a/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py +++ b/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py @@ -1,6 +1,7 @@ import os import json import shutil +import datetime import requests import dataflows as DF @@ -50,6 +51,56 @@ def check_row_kv(row, key, val): return rowval == val +def get_resource_last_modified(resource): + last_modified = resource.get('last_modified') + if last_modified: + try: + return datetime.datetime.strptime(last_modified, '%Y-%m-%dT%H:%M:%S.%f') + except: + pass + return None + + +def find_resource_by_name_and_format(resources, name, format_): + for resource in resources: + if (resource.get('name') or '').lower() == name.lower() and (resource.get('format') or '').lower() == format_.lower(): + return resource + return None + + +def post_processing_resource( + source_resources, existing_target_resources, post_processing, target_package_id, + format_, name, hash_, description, upload_filename, target_instance_name +): + if not post_processing: + post_processing = [] + for post_process in post_processing: + if post_process.get('type') == 'keep-last-updated-per-year': + post_process_month = post_process['month'] + post_process_format = post_process['format'] + post_process_resource_name = post_process['resource_name'] + source_resource = find_resource_by_name_and_format(source_resources.values(), post_process_resource_name, post_process_format) + source_last_modified = get_resource_last_modified(source_resource) if source_resource else None + source_month = source_last_modified.month + year = datetime.datetime.now().year + if ( + name.lower() == post_process_resource_name.lower() and format_.lower() == post_process_format.lower() + and source_last_modified and source_last_modified.year == year + and (post_process_month == source_month or post_process_month + 1 == source_month) + and f'{name} ({year}).{format_}'.lower() not in existing_target_resources + ): + print(f'post process keep-last-updated-per-year: creating yearly resource ({name}.{format_})') + data = { + 'package_id': target_package_id, + 'format': format_, + 'name': f'{name} ({year})', + 'hash': hash_, + 'description': description + } + res = ckan.resource_create(target_instance_name, data, files=[('upload', open(upload_filename, 'rb'))]) + assert res['success'], str(res) + + def filter_rows(source_filter): def filter_row(row): @@ -152,7 +203,7 @@ def get_resources_to_update(resources, tmpdir, headers, existing_target_resource return resources_to_update -def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter): +def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter, post_processing): res = ckan.package_show(target_instance_name, target_package_id) target_package_exists = False existing_target_resources = {} @@ -193,7 +244,12 @@ def fetch(source_url, target_instance_name, target_package_id, target_organizati notes = res['result'].get('notes') or '' if notes: description = f'{notes}\n\n{description}' - resources_to_update = get_resources_to_update(res['result']['resources'], tmpdir, headers, existing_target_resources, source_filter) + source_resources = { + resource['id']: resource for resource in res['result']['resources'] + } + resources_to_update = get_resources_to_update( + res['result']['resources'], tmpdir, headers, existing_target_resources, source_filter + ) if resources_to_update: with instance_package_lock(target_instance_name, target_package_id): print(f'updating {len(resources_to_update)} resources') @@ -216,22 +272,34 @@ def fetch(source_url, target_instance_name, target_package_id, target_organizati print('existing resource found and hash is the same, skipping resource data update') else: print('existing resource found, but hash is different, updating resource data') - res = ckan.resource_update(target_instance_name, { + data = { 'id': existing_target_resources[f'{name}.{format_}'.lower()]['id'], 'hash': hash_, 'description': description - }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))]) + } + upload_filename = f'{tmpdir}/{filename}' + res = ckan.resource_update(target_instance_name, data, files=[('upload', open(upload_filename, 'rb'))]) assert res['success'], str(res) + post_processing_resource( + source_resources, existing_target_resources, post_processing, target_package_id, + format_, name, hash_, description, upload_filename, target_instance_name + ) else: print('no existing resource found, creating new resource') - res = ckan.resource_create(target_instance_name, { + data = { 'package_id': target_package_id, 'format': format_, 'name': name, 'hash': hash_, 'description': description - }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))]) + } + upload_filename = f'{tmpdir}/{filename}' + res = ckan.resource_create(target_instance_name, data, files=[('upload', open(upload_filename, 'rb'))]) assert res['success'], str(res) + post_processing_resource( + source_resources, existing_target_resources, post_processing, target_package_id, + format_, name, hash_, description, upload_filename, target_instance_name + ) run_packages_processing(target_instance_name, target_package_id) print('done, all resources created/updated') else: diff --git a/datacity_ckan_dgp/operators/generic_fetcher.py b/datacity_ckan_dgp/operators/generic_fetcher.py index 2aa6c4f..7fd6756 100644 --- a/datacity_ckan_dgp/operators/generic_fetcher.py +++ b/datacity_ckan_dgp/operators/generic_fetcher.py @@ -33,13 +33,16 @@ def operator(name, params): target_package_id = params['target_package_id'] target_organization_id = params['target_organization_id'] tmpdir = params.get('tmpdir') + post_processing = params.get('post_processing') with tempdir(tmpdir) as tmpdir: print('starting generic_fetcher operator') print(json.dumps(params, ensure_ascii=False)) for fetcher in FETCHERS: assert fetcher['match'].keys() == {'url_contains'}, 'only url_contains match is supported at the moment' if fetcher['match']['url_contains'] in source_url: - import_module(f'datacity_ckan_dgp.generic_fetchers.{fetcher["fetcher"]}_fetcher').fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter) + import_module(f'datacity_ckan_dgp.generic_fetchers.{fetcher["fetcher"]}_fetcher').fetch( + source_url, target_instance_name, target_package_id, target_organization_id, tmpdir, source_filter, post_processing + ) break