From 7077a88fca076336a090a38ba7a05ce648d0a011 Mon Sep 17 00:00:00 2001
From: mjanez <96422458+mjanez@users.noreply.github.com>
Date: Mon, 2 Dec 2024 01:29:59 +0100
Subject: [PATCH] Enhance XLS harvester error date handling and logging for
 dataset processing

---
 ckanext/schemingdcat/harvesters/xls.py | 126 +++++++++++++++++--------
 1 file changed, 85 insertions(+), 41 deletions(-)

diff --git a/ckanext/schemingdcat/harvesters/xls.py b/ckanext/schemingdcat/harvesters/xls.py
index 818462f0..a8df3fe0 100644
--- a/ckanext/schemingdcat/harvesters/xls.py
+++ b/ckanext/schemingdcat/harvesters/xls.py
@@ -843,11 +843,17 @@ def gather_stage(self, harvest_job):
         for error_msg in after_cleaning_errors:
             self._save_gather_error(error_msg, harvest_job)
 
+        # Log the length of clean_datasets after after_cleaning
+        log.debug(f"Length of clean_datasets after_cleaning ISQLHarvester: {len(clean_datasets)}")
+
         # Add datasets to the database
         try:
             log.debug('Adding datasets to DB')
             datasets_to_harvest = {}
             source_dataset = model.Package.get(harvest_job.source.id)
+            skipped_datasets = 0  # Counter for omitted datasets
+            identifier_counts = {}  # To track the frequency of identifiers
+
             for dataset in clean_datasets:
 
                 # Set and update translated fields
@@ -865,8 +871,15 @@ def gather_stage(self, harvest_job):
                     # If the dataset has no identifier, use the name
                     if not dataset.get('identifier'):
                         dataset['identifier'] = self._generate_identifier(dataset)
+
                 except Exception as e:
-                    self._save_gather_error('Error for the dataset identifier %s [%r]' % (dataset['identifier'], e), harvest_job)
+                    skipped_datasets += 1
+                    self._save_gather_error('Error for the dataset identifier %s [%r]' % (dataset.get('identifier'), e), harvest_job)
+                    continue
+
+                if not dataset.get('identifier'):
+                    skipped_datasets += 1
+                    self._save_gather_error('Missing identifier for dataset with title: %s' % dataset.get('title'), harvest_job)
                     continue
 
                 # Check if a dataset with the same identifier exists can be overridden if necessary
@@ -875,23 +888,33 @@ def gather_stage(self, harvest_job):
                 # Unless already set by the dateutil.parser.parser, get the owner organization (if any)
                 # from the harvest source dataset
-                if not dataset.get('owner_org'):
-                    if source_dataset.owner_org:
-                        dataset['owner_org'] = source_dataset.owner_org
-
+                if not dataset.get('owner_org') and source_dataset.owner_org:
+                    dataset['owner_org'] = source_dataset.owner_org
+
                 if 'extras' not in dataset:
                     dataset['extras'] = []
 
                 # if existing_dataset:
                 #     dataset['identifier'] = existing_dataset['identifier']
+
+                identifier = dataset['identifier']
+                # Track the frequency of each identifier
+                identifier_counts[identifier] = identifier_counts.get(identifier, 0) + 1
+                if identifier_counts[identifier] > 1:
+                    log.warning(f'Duplicate identifier detected: {identifier}. This dataset will overwrite the previous one.')
+
                 # guids_in_db.add(dataset['identifier'])
-
-                guids_in_harvest.add(dataset['identifier'])
-                datasets_to_harvest[dataset['identifier']] = dataset
+                guids_in_harvest.add(identifier)
+                datasets_to_harvest[identifier] = dataset
+
+            # Log duplicate identifiers
+            duplicates = [id for id, count in identifier_counts.items() if count > 1]
+            if duplicates:
+                log.warning(f"Found {len(duplicates)} duplicate identifiers: {duplicates}")
+
         except Exception as e:
-            self._save_gather_error('Error when processsing dataset: %r / %s' % (e, traceback.format_exc()),
-                                    harvest_job)
+            self._save_gather_error('Error when processing dataset: %r / %s' % (e, traceback.format_exc()), harvest_job)
             return []
 
         # Check guids to create/update/delete
@@ -899,38 +922,58 @@ def gather_stage(self, harvest_job):
         # Get objects/datasets to delete (ie in the DB but not in the source)
         delete = set(guids_in_db) - set(guids_in_harvest)
         change = guids_in_db & guids_in_harvest
-
-        log.debug('new: %s, delete: %s and change: %s', new, delete, change)
+
+        log.debug(f"Number of skipped datasets: {skipped_datasets}")
+        log.debug(f'guids_in_harvest ({len(guids_in_harvest)})')
+        log.debug(f'guids_in_db ({len(guids_in_db)}): {guids_in_db}')
+        log.debug(f'new ({len(new)})')
+        log.debug(f'delete ({len(delete)})')
+        log.debug(f'change ({len(change)})')
 
         ids = []
         for guid in new:
-            obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
-                                extras=[HarvestObjectExtra(key='status', value='new')])
-            obj.save()
-            ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})
+            dataset = datasets_to_harvest.get(guid)
+            if dataset:
+                obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
+                                    extras=[HarvestObjectExtra(key='status', value='new')])
+                obj.save()
+                ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
+            else:
+                log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')
+
         for guid in change:
-            obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
-                                package_id=guid_to_package_id[guid],
-                                extras=[HarvestObjectExtra(key='status', value='change')])
-            obj.save()
-            ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})
+            dataset = datasets_to_harvest.get(guid)
+            if dataset:
+                obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
+                                    package_id=guid_to_package_id[guid],
+                                    extras=[HarvestObjectExtra(key='status', value='change')])
+                obj.save()
+                ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
+            else:
+                log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')
+
         for guid in delete:
-            obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
-                                package_id=guid_to_package_id[guid],
-                                extras=[HarvestObjectExtra(key='status', value='delete')])
-            model.Session.query(HarvestObject).\
-                filter_by(guid=guid).\
-                update({'current': False}, False)
-            obj.save()
-            ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})
-
+            dataset = datasets_to_harvest.get(guid)
+            if dataset:
+                obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
+                                    package_id=guid_to_package_id[guid],
+                                    extras=[HarvestObjectExtra(key='status', value='delete')])
+                model.Session.query(HarvestObject).\
+                    filter_by(guid=guid).\
+                    update({'current': False}, False)
+                obj.save()
+                ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
+            else:
+                log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')
+
         log.debug('Number of elements in clean_datasets: %s and object_ids: %s', len(clean_datasets), len(ids))
-
+
         # Log clean_datasets/ ids
         #self._log_export_clean_datasets_and_ids(harvest_source_title, clean_datasets, ids)
 
         return [id_dict['id'] for id_dict in ids]
 
+
     def fetch_stage(self, harvest_object):
         # Nothing to do here - we got the package dict in the search in the gather stage
         return True
@@ -1103,26 +1146,27 @@ def import_stage(self, harvest_object):
             elif status == 'change':
                 # Check if the modified date is more recent
-                if not self.force_import and previous_object and dateutil.parser.parse(harvest_object.metadata_modified_date) <= previous_object.metadata_modified_date:
+                if not self.force_import and previous_object and previous_object.metadata_modified_date and dateutil.parser.parse(harvest_object.metadata_modified_date) <= previous_object.metadata_modified_date:
                     log.info('Package with GUID: %s unchanged, skipping...' % harvest_object.guid)
                     return 'unchanged'
                 else:
-                    log.info("Dataset dates - Harvest date: %s and Previous date: %s", harvest_object.metadata_modified_date, previous_object.metadata_modified_date)
-
+                    log.info("Dataset dates - Harvest date: %s and Previous date: %s", harvest_object.metadata_modified_date, previous_object.metadata_modified_date if previous_object else 'None')
+
                     # update_package_schema_for_update interface
                     package_schema = logic.schema.default_update_package_schema()
                     for harvester in p.PluginImplementations(ISchemingDCATHarvester):
                         if hasattr(harvester, 'update_package_schema_for_update'):
                             package_schema = harvester.update_package_schema_for_update(package_schema)
                     context['schema'] = package_schema
-
+
                     package_dict['id'] = harvest_object.package_id
+
                     try:
                         # before_update interface
                         for harvester in p.PluginImplementations(ISchemingDCATHarvester):
                             if hasattr(harvester, 'before_update'):
                                 err = harvester.before_update(harvest_object, package_dict, harvester_tmp_dict)
-
+
                                 if err:
                                     self._save_object_error(f'TableHarvester plugin error: {err}', harvest_object, 'Import')
                                     return False
@@ -1130,21 +1174,21 @@ def import_stage(self, harvest_object):
                         result = self._create_or_update_package(
                             package_dict, harvest_object,
                             package_dict_form='package_show')
-
+
                         # after_update interface
                         for harvester in p.PluginImplementations(ISchemingDCATHarvester):
                             if hasattr(harvester, 'after_update'):
                                 err = harvester.after_update(harvest_object, package_dict, harvester_tmp_dict)
-
+
                                 if err:
                                     self._save_object_error(f'TableHarvester plugin error: {err}', harvest_object, 'Import')
                                     return False
-
+
                         log.info('Updated package %s with GUID: %s' % (package_dict["id"], harvest_object.guid))
 
                     except p.toolkit.ValidationError as e:
                         error_message = ', '.join(f'{k}: {v}' for k, v in e.error_dict.items())
                         self._save_object_error(f'Validation Error: {error_message}', harvest_object, 'Import')
                         return False
-
-        return result
+
+        return result
\ No newline at end of file
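For reference, a minimal standalone sketch (not part of the patch) of the duplicate-identifier bookkeeping that gather_stage() now performs: count each identifier, warn when one repeats, let the later dataset overwrite the earlier one in datasets_to_harvest, and skip datasets with no identifier instead of aborting the job. The helper name track_identifiers() and the sample data are hypothetical; only the counting/skipping logic mirrors the diff.

import logging

log = logging.getLogger(__name__)

def track_identifiers(clean_datasets):
    # Mirrors the new gather_stage() loop in simplified form
    datasets_to_harvest = {}
    identifier_counts = {}
    skipped_datasets = 0

    for dataset in clean_datasets:
        identifier = dataset.get('identifier')
        if not identifier:
            # Datasets without an identifier are counted and skipped
            skipped_datasets += 1
            continue

        # Track how often each identifier appears; a repeat overwrites the earlier dataset
        identifier_counts[identifier] = identifier_counts.get(identifier, 0) + 1
        if identifier_counts[identifier] > 1:
            log.warning('Duplicate identifier detected: %s. This dataset will overwrite the previous one.', identifier)
        datasets_to_harvest[identifier] = dataset

    duplicates = [ident for ident, count in identifier_counts.items() if count > 1]
    return datasets_to_harvest, duplicates, skipped_datasets

# The second 'ds-1' wins; the dataset without an identifier is skipped.
harvest, dups, skipped = track_identifiers([
    {'identifier': 'ds-1', 'title': 'first'},
    {'identifier': 'ds-1', 'title': 'second'},
    {'title': 'no identifier'},
])
assert harvest['ds-1']['title'] == 'second' and dups == ['ds-1'] and skipped == 1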
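The import_stage() change has the same shape: the "unchanged" shortcut now also requires previous_object.metadata_modified_date to be set before the date comparison runs. A rough sketch of that guard follows; is_unchanged() and the SimpleNamespace stand-in are illustrative, not the harvester's API.

from types import SimpleNamespace
import dateutil.parser  # python-dateutil

def is_unchanged(harvest_modified, previous_object, force_import=False):
    # Skip the update only when a previous modification date exists and the
    # harvested date is not newer than it.
    return (
        not force_import
        and previous_object is not None
        and previous_object.metadata_modified_date is not None
        and dateutil.parser.parse(harvest_modified) <= previous_object.metadata_modified_date
    )

prev = SimpleNamespace(metadata_modified_date=dateutil.parser.parse('2024-11-01'))
print(is_unchanged('2024-10-01', prev))   # True  -> nothing newer, skip
print(is_unchanged('2024-12-01', prev))   # False -> newer, proceed with the update
print(is_unchanged('2024-12-01', SimpleNamespace(metadata_modified_date=None)))  # False -> no stored date, update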