Skip to content

Commit

Permalink
Improve dataset handling in harvesters by adding checks for dataset e…
Browse files Browse the repository at this point in the history
…xistence and logging warnings for missing datasets
  • Loading branch information
mjanez committed Nov 19, 2024
1 parent 41fb759 commit 1758b8c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 44 deletions.
62 changes: 38 additions & 24 deletions ckanext/schemingdcat/harvesters/sql/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,40 +426,54 @@ def gather_stage(self, harvest_job):
log.debug(f'new ({len(new)})')
log.debug(f'delete ({len(delete)})')
log.debug(f'change ({len(change)})')

ids = []
for guid in new:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
extras=[HarvestObjectExtra(key='status', value='new')])
obj.save()
ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})
dataset = datasets_to_harvest.get(guid)
if dataset:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
extras=[HarvestObjectExtra(key='status', value='new')])
obj.save()
ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
else:
log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')

for guid in change:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
package_id=guid_to_package_id[guid],
extras=[HarvestObjectExtra(key='status', value='change')])
obj.save()
ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})
dataset = datasets_to_harvest.get(guid)
if dataset:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
package_id=guid_to_package_id[guid],
extras=[HarvestObjectExtra(key='status', value='change')])
obj.save()
ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
else:
log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')

for guid in delete:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
package_id=guid_to_package_id[guid],
extras=[HarvestObjectExtra(key='status', value='delete')])
model.Session.query(HarvestObject).\
filter_by(guid=guid).\
update({'current': False}, False)
obj.save()
ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})

dataset = datasets_to_harvest.get(guid)
if dataset:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
package_id=guid_to_package_id[guid],
extras=[HarvestObjectExtra(key='status', value='delete')])
model.Session.query(HarvestObject).\
filter_by(guid=guid).\
update({'current': False}, False)
obj.save()
ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
else:
log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')

log.debug('Number of elements in clean_datasets: %s and object_ids: %s', len(clean_datasets), len(ids))

#DEBUG::Log clean_datasets/ ids
# Log clean_datasets/ ids
#self._log_export_clean_datasets_and_ids(harvest_source_title, clean_datasets, ids)

return [id_dict['id'] for id_dict in ids]

def fetch_stage(self, harvest_object):
# Nothing to fetch here: the gather stage already serialized each dataset's
# package dict into the harvest object's content, so this stage always
# returns True without inspecting harvest_object.
return True

#TODO: implement the import stage

def import_stage(self, harvest_object):
"""
Performs the import stage of the SchemingDCATXLSHarvester.
Expand Down
54 changes: 34 additions & 20 deletions ckanext/schemingdcat/harvesters/xls.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,31 +931,45 @@ def gather_stage(self, harvest_job):
log.debug(f'new ({len(new)})')
log.debug(f'delete ({len(delete)})')
log.debug(f'change ({len(change)})')

ids = []
for guid in new:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
extras=[HarvestObjectExtra(key='status', value='new')])
obj.save()
ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})
dataset = datasets_to_harvest.get(guid)
if dataset:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
extras=[HarvestObjectExtra(key='status', value='new')])
obj.save()
ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
else:
log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')

for guid in change:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
package_id=guid_to_package_id[guid],
extras=[HarvestObjectExtra(key='status', value='change')])
obj.save()
ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})
dataset = datasets_to_harvest.get(guid)
if dataset:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
package_id=guid_to_package_id[guid],
extras=[HarvestObjectExtra(key='status', value='change')])
obj.save()
ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
else:
log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')

for guid in delete:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(datasets_to_harvest.get(guid)),
package_id=guid_to_package_id[guid],
extras=[HarvestObjectExtra(key='status', value='delete')])
model.Session.query(HarvestObject).\
filter_by(guid=guid).\
update({'current': False}, False)
obj.save()
ids.append({'id': obj.id, 'name': datasets_to_harvest.get(guid)['name'], 'identifier': datasets_to_harvest.get(guid)['identifier']})

dataset = datasets_to_harvest.get(guid)
if dataset:
obj = HarvestObject(guid=guid, job=harvest_job, content=json.dumps(dataset),
package_id=guid_to_package_id[guid],
extras=[HarvestObjectExtra(key='status', value='delete')])
model.Session.query(HarvestObject).\
filter_by(guid=guid).\
update({'current': False}, False)
obj.save()
ids.append({'id': obj.id, 'name': dataset['name'], 'identifier': dataset['identifier']})
else:
log.warning(f'Dataset for GUID {guid} not found in datasets_to_harvest')

log.debug('Number of elements in clean_datasets: %s and object_ids: %s', len(clean_datasets), len(ids))

# Log clean_datasets/ ids
#self._log_export_clean_datasets_and_ids(harvest_source_title, clean_datasets, ids)

Expand Down

0 comments on commit 1758b8c

Please sign in to comment.