
Commit

Enhance XLS harvester error date handling and logging for dataset processing
mjanez committed Dec 2, 2024
1 parent e940fa2 commit 7077a88
Showing 1 changed file with 85 additions and 41 deletions.
ckanext/schemingdcat/harvesters/xls.py
@@ -843,11 +843,17 @@ def gather_stage(self, harvest_job):
            for error_msg in after_cleaning_errors:
                self._save_gather_error(error_msg, harvest_job)

+        # Log the length of clean_datasets after after_cleaning
+        log.debug(f"Length of clean_datasets after_cleaning ISQLHarvester: {len(clean_datasets)}")

        # Add datasets to the database
        try:
            log.debug('Adding datasets to DB')
            datasets_to_harvest = {}
            source_dataset = model.Package.get(harvest_job.source.id)
+            skipped_datasets = 0  # Counter for omitted datasets
+            identifier_counts = {}  # To track the frequency of identifiers

            for dataset in clean_datasets:

                # Set and update translated fields
@@ -865,8 +871,15 @@ def gather_stage(self, harvest_job):
                    # If the dataset has no identifier, use the name
                    if not dataset.get('identifier'):
                        dataset['identifier'] = self._generate_identifier(dataset)

                except Exception as e:
-                    self._save_gather_error('Error for the dataset identifier %s [%r]' % (dataset['identifier'], e), harvest_job)
+                    skipped_datasets += 1
+                    self._save_gather_error('Error for the dataset identifier %s [%r]' % (dataset.get('identifier'), e), harvest_job)
+                    continue

+                if not dataset.get('identifier'):
+                    skipped_datasets += 1
+                    self._save_gather_error('Missing identifier for dataset with title: %s' % dataset.get('title'), harvest_job)
+                    continue

                # Check if a dataset with the same identifier exists can be overridden if necessary
@@ -875,62 +888,92 @@ def gather_stage(self, harvest_job):

# Unless already set by the dateutil.parser.parser, get the owner organization (if any)
# from the harvest source dataset
if not dataset.get('owner_org'):
if source_dataset.owner_org:
dataset['owner_org'] = source_dataset.owner_org

if not dataset.get('owner_org') and source_dataset.owner_org:
dataset['owner_org'] = source_dataset.owner_org

if 'extras' not in dataset:
dataset['extras'] = []

# if existing_dataset:
# dataset['identifier'] = existing_dataset['identifier']

identifier = dataset['identifier']
# Track the frequency of each identifier
identifier_counts[identifier] = identifier_counts.get(identifier, 0) + 1
if identifier_counts[identifier] > 1:
log.warning(f'Duplicate identifier detected: {identifier}. This dataset will overwrite the previous one.')

# guids_in_db.add(dataset['identifier'])

guids_in_harvest.add(dataset['identifier'])
datasets_to_harvest[dataset['identifier']] = dataset

guids_in_harvest.add(identifier)
datasets_to_harvest[identifier] = dataset

# Register duplicate identifiers
duplicates = [id for id, count in identifier_counts.items() if count > 1]
if duplicates:
log.warning(f"The following duplicate identifiers {len(duplicates)} are found: {duplicates}")

except Exception as e:
self._save_gather_error('Error when processsing dataset: %r / %s' % (e, traceback.format_exc()),
harvest_job)
self._save_gather_error('Error when processing dataset: %r / %s' % (e, traceback.format_exc()), harvest_job)
return []

# Check guids to create/update/delete
new = guids_in_harvest - guids_in_db
# Get objects/datasets to delete (ie in the DB but not in the source)
delete = set(guids_in_db) - set(guids_in_harvest)
change = guids_in_db & guids_in_harvest

log.debug('new: %s, delete: %s and change: %s', new, delete, change)

log.debug(f"Number of skipped datasets: {skipped_datasets}")
log.debug(f'guids_in_harvest ({len(guids_in_harvest)})')
log.debug(f'guids_in_db ({len(guids_in_db)}): {guids_in_db}')
log.debug(f'new ({len(new)})')
log.debug(f'delete ({len(delete)})')
log.debug(f'change ({len(change)})')

        ids = []
        for guid in new:
-            obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
-                                extras=[HarvestObjectExtra(key='status', value='new')])
-            obj.save()
-            ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})
+            dataset = datasets_to_harvest.get(guid)
+            if dataset:
+                obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
+                                    extras=[HarvestObjectExtra(key='status', value='new')])
+                obj.save()
+                ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
+            else:
+                log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')

        for guid in change:
-            obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
-                                package_id=guid_to_package_id[guid],
-                                extras=[HarvestObjectExtra(key='status', value='change')])
-            obj.save()
-            ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})
+            dataset = datasets_to_harvest.get(guid)
+            if dataset:
+                obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
+                                    package_id=guid_to_package_id[guid],
+                                    extras=[HarvestObjectExtra(key='status', value='change')])
+                obj.save()
+                ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
+            else:
+                log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')

        for guid in delete:
-            obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
-                                package_id=guid_to_package_id[guid],
-                                extras=[HarvestObjectExtra(key='status', value='delete')])
-            model.Session.query(HarvestObject).\
-                filter_by(guid=guid).\
-                update({'current': False}, False)
-            obj.save()
-            ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})
+            dataset = datasets_to_harvest.get(guid)
+            if dataset:
+                obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
+                                    package_id=guid_to_package_id[guid],
+                                    extras=[HarvestObjectExtra(key='status', value='delete')])
+                model.Session.query(HarvestObject).\
+                    filter_by(guid=guid).\
+                    update({'current': False}, False)
+                obj.save()
+                ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
+            else:
+                log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')

        log.debug('Number of elements in clean_datasets: %s and object_ids: %s', len(clean_datasets), len(ids))

        # Log clean_datasets/ ids
        #self._log_export_clean_datasets_and_ids(harvest_source_title, clean_datasets, ids)

        return [id_dict['id'] for id_dict in ids]


    def fetch_stage(self, harvest_object):
        # Nothing to do here - we got the package dict in the search in the gather stage
        return True
@@ -1103,48 +1146,49 @@ def import_stage(self, harvest_object):

        elif status == 'change':
            # Check if the modified date is more recent
-            if not self.force_import and previous_object and dateutil.parser.parse(harvest_object.metadata_modified_date) <= previous_object.metadata_modified_date:
+            if not self.force_import and previous_object and previous_object.metadata_modified_date and dateutil.parser.parse(harvest_object.metadata_modified_date) <= previous_object.metadata_modified_date:
                log.info('Package with GUID: %s unchanged, skipping...' % harvest_object.guid)
                return 'unchanged'
            else:
-                log.info("Dataset dates - Harvest date: %s and Previous date: %s", harvest_object.metadata_modified_date, previous_object.metadata_modified_date)
+                log.info("Dataset dates - Harvest date: %s and Previous date: %s", harvest_object.metadata_modified_date, previous_object.metadata_modified_date if previous_object else 'None')

                # update_package_schema_for_update interface
                package_schema = logic.schema.default_update_package_schema()
                for harvester in p.PluginImplementations(ISchemingDCATHarvester):
                    if hasattr(harvester, 'update_package_schema_for_update'):
                        package_schema = harvester.update_package_schema_for_update(package_schema)
                context['schema'] = package_schema

                package_dict['id'] = harvest_object.package_id

                try:
                    # before_update interface
                    for harvester in p.PluginImplementations(ISchemingDCATHarvester):
                        if hasattr(harvester, 'before_update'):
                            err = harvester.before_update(harvest_object, package_dict, harvester_tmp_dict)

                            if err:
                                self._save_object_error(f'TableHarvester plugin error: {err}', harvest_object, 'Import')
                                return False

                    result = self._create_or_update_package(
                        package_dict, harvest_object,
                        package_dict_form='package_show')

                    # after_update interface
                    for harvester in p.PluginImplementations(ISchemingDCATHarvester):
                        if hasattr(harvester, 'after_update'):
                            err = harvester.after_update(harvest_object, package_dict, harvester_tmp_dict)

                            if err:
                                self._save_object_error(f'TableHarvester plugin error: {err}', harvest_object, 'Import')
                                return False

                    log.info('Updated package %s with GUID: %s' % (package_dict["id"], harvest_object.guid))

                except p.toolkit.ValidationError as e:
                    error_message = ', '.join(f'{k}: {v}' for k, v in e.error_dict.items())
                    self._save_object_error(f'Validation Error: {error_message}', harvest_object, 'Import')
                    return False

-                return result
+            return result
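
The heart of the gather_stage change is the identifier bookkeeping: every identifier is counted, a warning is emitted when a later dataset will overwrite an earlier one with the same identifier, and records that end up with no identifier are skipped rather than aborting the whole gather. Below is a minimal standalone sketch of that pattern, assuming a hypothetical track_identifiers helper in place of the harvester loop and collections.Counter in place of the plain dict the commit uses:

import logging
from collections import Counter

log = logging.getLogger(__name__)

def track_identifiers(clean_datasets):
    """Sketch of the bookkeeping added in this commit: count identifiers,
    warn on duplicates (later datasets overwrite earlier ones), and skip
    datasets that carry no identifier at all."""
    identifier_counts = Counter()
    datasets_to_harvest = {}
    skipped_datasets = 0

    for dataset in clean_datasets:
        identifier = dataset.get('identifier')
        if not identifier:
            skipped_datasets += 1  # the real code also calls _save_gather_error here
            continue
        identifier_counts[identifier] += 1
        if identifier_counts[identifier] > 1:
            log.warning('Duplicate identifier detected: %s. '
                        'This dataset will overwrite the previous one.', identifier)
        datasets_to_harvest[identifier] = dataset  # last one wins

    duplicates = [i for i, c in identifier_counts.items() if c > 1]
    return datasets_to_harvest, duplicates, skipped_datasets

Because datasets_to_harvest is keyed by identifier, duplicates silently collapse to the most recent row; the added warnings make that collapse visible in the harvest log instead of leaving it implicit.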
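In the delete branch, model.Session.query(HarvestObject).filter_by(guid=guid).update({'current': False}, False) passes SQLAlchemy's synchronize_session argument positionally: False skips reconciling in-memory objects with the bulk UPDATE, which is presumably acceptable here because the updated rows are not re-read in the same session. A self-contained sketch of the same idiom against a hypothetical stub table (not the real HarvestObject model):

from sqlalchemy import Boolean, Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class HarvestObjectStub(Base):
    # Hypothetical stand-in for ckanext-harvest's HarvestObject
    __tablename__ = 'harvest_object_stub'
    id = Column(String, primary_key=True)
    guid = Column(String)
    current = Column(Boolean, default=True)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([HarvestObjectStub(id='1', guid='g1'),
                     HarvestObjectStub(id='2', guid='g1')])
    session.commit()
    # Same shape as the harvester's call: one bulk UPDATE marks every
    # object for this GUID stale; the positional False is
    # synchronize_session=False.
    session.query(HarvestObjectStub).filter_by(guid='g1').update(
        {'current': False}, False)
    session.commit()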
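Finally, the import_stage guard now also requires previous_object.metadata_modified_date to be set before comparing dates: in Python 3, comparing a datetime against None with <= raises TypeError, so a previous object with no stored modification date would raise an uncaught error in the 'change' path instead of falling through to the update. A sketch of the comparison in isolation (the is_unchanged helper and its parameters are illustrative, not part of the harvester):

from datetime import datetime
from typing import Optional

import dateutil.parser

def is_unchanged(harvest_modified: str,
                 previous_modified: Optional[datetime],
                 force_import: bool = False) -> bool:
    """Return True when the incoming record is not newer than the stored one.

    datetime <= None raises TypeError in Python 3, so the stored date must
    be checked before the comparison -- the guard this commit adds."""
    if force_import or previous_modified is None:
        return False
    return dateutil.parser.parse(harvest_modified) <= previous_modified

# A record with no stored date is always treated as changed and re-imported:
assert is_unchanged('2024-12-02T10:00:00', None) is False
assert is_unchanged('2024-12-02T10:00:00', datetime(2024, 12, 3)) is True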
