Commit
Merge pull request #119 from mjanez/develop
Fix xls, postgres harvesters
mjanez authored Nov 15, 2024
2 parents 88ccdef + 41fb759 commit a147dc2
Showing 3 changed files with 99 additions and 35 deletions.
66 changes: 48 additions & 18 deletions ckanext/schemingdcat/harvesters/sql/base.py
@@ -5,6 +5,7 @@
import dateutil
from urllib.parse import urlparse, urlunparse
from abc import ABC, abstractmethod
import uuid

from ckan import logic
import ckan.plugins as p
@@ -210,6 +211,7 @@ def gather_stage(self, harvest_job):
self._credentials = self.config.get("credentials")
credential_keys = ', '.join(self._credentials .keys())
log.debug('Loaded credentials with keys: %s', credential_keys)
dataset_id_colname = self.config.get("dataset_id_colname", "dataset_id")
else:
err_msg = f'The credentials are not provided. The harvest source: "{harvest_source_title}" has finished.'
log.error(err_msg)
@@ -334,68 +336,96 @@ def gather_stage(self, harvest_job):
for harvester in p.PluginImplementations(ISQLHarvester):
if hasattr(harvester, 'after_cleaning'):
clean_datasets, after_cleaning_errors = harvester.after_cleaning(clean_datasets)

for error_msg in after_cleaning_errors:
self._save_gather_error(error_msg, harvest_job)


# Log the length of clean_datasets after after_cleaning
log.debug(f"Length of clean_datasets after_cleaning ISQLHarvester: {len(clean_datasets)}")

# Add datasets to the database
try:
log.debug('Adding datasets to DB')
datasets_to_harvest = {}
source_dataset = model.Package.get(harvest_job.source.id)
skipped_datasets = 0 # Counter for omitted datasets
identifier_counts = {} # To track the frequency of identifiers

for dataset in clean_datasets:
#log.debug('dataset: %s', dataset)

# Set and update translated fields
dataset = self._set_translated_fields(dataset)

# Using name as identifier. Remote table datasets doesnt have identifier
try:
if not dataset.get('name'):
dataset['name'] = self._gen_new_name(dataset['title'])
while dataset['name'] in self._names_taken:
suffix = sum(name.startswith(dataset['name'] + '-') for name in self._names_taken) + 1
dataset['name'] = '{}-{}'.format(dataset['name'], suffix)
self._names_taken.append(dataset['name'])

# If the dataset has no identifier, use the name
# If the dataset has no identifier, use an UUID
if not dataset.get('identifier'):
dataset['identifier'] = self._generate_identifier(dataset)
dataset['identifier'] = str(uuid.uuid4())

except Exception as e:
self._save_gather_error('Error for the dataset identifier %s [%r]' % (dataset['identifier'], e), harvest_job)
skipped_datasets += 1
self._save_gather_error('Error for the dataset identifier %s [%r]' % (dataset.get('identifier'), e), harvest_job)
continue

if not dataset.get('identifier'):
skipped_datasets += 1
self._save_gather_error('Missing identifier for dataset with title: %s' % dataset.get('title'), harvest_job)
continue

# Check if a dataset with the same identifier exists can be overridden if necessary
#existing_dataset = self._check_existing_package_by_ids(dataset)
#log.debug('existing_dataset: %s', existing_dataset)

# Unless already set by the dateutil.parser.parser, get the owner organization (if any)
# from the harvest source dataset
if not dataset.get('owner_org'):
if source_dataset.owner_org:
dataset['owner_org'] = source_dataset.owner_org

if not dataset.get('owner_org') and source_dataset.owner_org:
dataset['owner_org'] = source_dataset.owner_org

if 'extras' not in dataset:
dataset['extras'] = []

# if existing_dataset:
# dataset['identifier'] = existing_dataset['identifier']

identifier = dataset['identifier']
# Track the frequency of each identifier
identifier_counts[identifier] = identifier_counts.get(identifier, 0) + 1
if identifier_counts[identifier] > 1:
log.warning(f'Duplicate identifier detected: {identifier}. This dataset will overwrite the previous one.')

# guids_in_db.add(dataset['identifier'])

guids_in_harvest.add(dataset['identifier'])
datasets_to_harvest[dataset['identifier']] = dataset

guids_in_harvest.add(identifier)
datasets_to_harvest[identifier] = dataset

# Register duplicate identifiers
duplicates = [id for id, count in identifier_counts.items() if count > 1]
if duplicates:
log.warning(f"The following duplicate identifiers {len(duplicates)} are found: {duplicates}")

except Exception as e:
self._save_gather_error('Error when processsing dataset: %r / %s' % (e, traceback.format_exc()),
harvest_job)
self._save_gather_error('Error when processing dataset: %r / %s' % (e, traceback.format_exc()), harvest_job)
return []

# Check guids to create/update/delete
new = guids_in_harvest - guids_in_db
# Get objects/datasets to delete (ie in the DB but not in the source)
delete = set(guids_in_db) - set(guids_in_harvest)
change = guids_in_db & guids_in_harvest

log.debug('new: %s, delete: %s and change: %s', new, delete, change)

log.debug(f"Number of skipped datasets: {skipped_datasets}")
log.debug(f'guids_in_harvest ({len(guids_in_harvest)})')
log.debug(f'guids_in_db ({len(guids_in_db)}): {guids_in_db}')
log.debug(f'new ({len(new)})')
log.debug(f'delete ({len(delete)})')
log.debug(f'change ({len(change)})')

ids = []
for guid in new:
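
The hunk above changes how gather_stage assigns dataset identifiers: a missing identifier is now filled with a random UUID instead of a value derived from the dataset name, a per-identifier tally logs duplicates rather than letting them silently overwrite each other, and a skipped-dataset counter records rows that could not be processed. A minimal sketch of the identifier and duplicate handling in isolation, using plain dataset dicts (the function name and logger setup are illustrative, not part of the extension's API):

import logging
import uuid

log = logging.getLogger(__name__)

def assign_identifiers(clean_datasets):
    """Illustrative only: mirror the new identifier handling in gather_stage."""
    identifier_counts = {}    # frequency of each identifier seen so far
    datasets_to_harvest = {}  # identifier -> dataset; later entries overwrite earlier ones

    for dataset in clean_datasets:
        # Fall back to a random UUID when the source row has no identifier.
        if not dataset.get('identifier'):
            dataset['identifier'] = str(uuid.uuid4())

        identifier = dataset['identifier']
        identifier_counts[identifier] = identifier_counts.get(identifier, 0) + 1
        if identifier_counts[identifier] > 1:
            log.warning('Duplicate identifier detected: %s. This dataset will overwrite the previous one.', identifier)

        datasets_to_harvest[identifier] = dataset

    duplicates = [i for i, count in identifier_counts.items() if count > 1]
    if duplicates:
        log.warning('Found %s duplicate identifiers: %s', len(duplicates), duplicates)
    return datasets_to_harvest
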
5 changes: 5 additions & 0 deletions ckanext/schemingdcat/harvesters/sql/postgres.py
@@ -147,6 +147,11 @@ def _read_remote_database(self, field_mappings, conn_url):
for mapping, query in self._queries.items():
results, column_names = self.db_manager.execute_query(query)
results_df = pd.DataFrame(results, columns=column_names, dtype=str).fillna('')

# # Exporting the DataFrame to a CSV file
# log.debug('export to output.csv')
# filename = f'output_{mapping}.csv'
# results_df.to_csv(filename, index=False)

# Map the result to the correct category in the results dictionary
# Only add results_df if it is not empty
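
The postgres harvester hunk only adds a commented-out debugging aid: each mapping's query result, already loaded into a pandas DataFrame, can be dumped to a CSV file named after the mapping for offline inspection. Uncommented, it would behave roughly like this sketch (the helper function is hypothetical; only the DataFrame construction and file-name pattern come from the diff):

import pandas as pd

def dump_query_results(results, column_names, mapping):
    """Illustrative debugging helper: write one query's rows to a CSV file."""
    results_df = pd.DataFrame(results, columns=column_names, dtype=str).fillna('')
    filename = f'output_{mapping}.csv'  # same naming pattern as the commented-out lines
    results_df.to_csv(filename, index=False)
    return results_df
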
63 changes: 46 additions & 17 deletions ckanext/schemingdcat/harvesters/xls.py
@@ -723,6 +723,7 @@ def gather_stage(self, harvest_job):
self._storage_type = self.config.get("storage_type")
self._auth = self.config.get("auth")
self._credentials = self.config.get("credentials")
dataset_id_colname = self.config.get("dataset_id_colname", "dataset_id")

# Get URLs for remote file
remote_xls_base_url = self._get_storage_base_url(source_url, self._storage_type)
@@ -843,31 +844,44 @@ def gather_stage(self, harvest_job):

for error_msg in after_cleaning_errors:
self._save_gather_error(error_msg, harvest_job)


# Log the length of clean_datasets after after_cleaning
log.debug(f"Length of clean_datasets after_cleaning ISQLHarvester: {len(clean_datasets)}")

# Add datasets to the database
try:
log.debug('Adding datasets to DB')
datasets_to_harvest = {}
source_dataset = model.Package.get(harvest_job.source.id)
skipped_datasets = 0 # Counter for omitted datasets
identifier_counts = {} # To track the frequency of identifiers

for dataset in clean_datasets:
#log.debug('dataset: %s', dataset)

# Set and update translated fields
dataset = self._set_translated_fields(dataset)

# Using name as identifier. Remote table datasets doesnt have identifier
try:
if not dataset.get('name'):
dataset['name'] = self._gen_new_name(dataset['title'])
while dataset['name'] in self._names_taken:
suffix = sum(name.startswith(dataset['name'] + '-') for name in self._names_taken) + 1
dataset['name'] = '{}-{}'.format(dataset['name'], suffix)
self._names_taken.append(dataset['name'])

# If the dataset has no identifier, use the name
# If the dataset has no identifier, use an UUID
if not dataset.get('identifier'):
dataset['identifier'] = self._generate_identifier(dataset)
dataset['identifier'] = str(uuid.uuid4())

except Exception as e:
self._save_gather_error('Error for the dataset identifier %s [%r]' % (dataset['identifier'], e), harvest_job)
skipped_datasets += 1
self._save_gather_error('Error for the dataset identifier %s [%r]' % (dataset.get('identifier'), e), harvest_job)
continue

if not dataset.get('identifier'):
skipped_datasets += 1
self._save_gather_error('Missing identifier for dataset with title: %s' % dataset.get('title'), harvest_job)
continue

# Check if a dataset with the same identifier exists can be overridden if necessary
@@ -876,32 +890,47 @@ def gather_stage(self, harvest_job):

# Unless already set by the dateutil.parser.parser, get the owner organization (if any)
# from the harvest source dataset
if not dataset.get('owner_org'):
if source_dataset.owner_org:
dataset['owner_org'] = source_dataset.owner_org

if not dataset.get('owner_org') and source_dataset.owner_org:
dataset['owner_org'] = source_dataset.owner_org

if 'extras' not in dataset:
dataset['extras'] = []

# if existing_dataset:
# dataset['identifier'] = existing_dataset['identifier']

identifier = dataset['identifier']
# Track the frequency of each identifier
identifier_counts[identifier] = identifier_counts.get(identifier, 0) + 1
if identifier_counts[identifier] > 1:
log.warning(f'Duplicate identifier detected: {identifier}. This dataset will overwrite the previous one.')

# guids_in_db.add(dataset['identifier'])

guids_in_harvest.add(dataset['identifier'])
datasets_to_harvest[dataset['identifier']] = dataset

guids_in_harvest.add(identifier)
datasets_to_harvest[identifier] = dataset

# Register duplicate identifiers
duplicates = [id for id, count in identifier_counts.items() if count > 1]
if duplicates:
log.warning(f"The following duplicate identifiers {len(duplicates)} are found: {duplicates}")

except Exception as e:
self._save_gather_error('Error when processsing dataset: %r / %s' % (e, traceback.format_exc()),
harvest_job)
self._save_gather_error('Error when processing dataset: %r / %s' % (e, traceback.format_exc()), harvest_job)
return []

# Check guids to create/update/delete
new = guids_in_harvest - guids_in_db
# Get objects/datasets to delete (ie in the DB but not in the source)
delete = set(guids_in_db) - set(guids_in_harvest)
change = guids_in_db & guids_in_harvest

log.debug('new: %s, delete: %s and change: %s', new, delete, change)

log.debug(f"Number of skipped datasets: {skipped_datasets}")
log.debug(f'guids_in_harvest ({len(guids_in_harvest)})')
log.debug(f'guids_in_db ({len(guids_in_db)}): {guids_in_db}')
log.debug(f'new ({len(new)})')
log.debug(f'delete ({len(delete)})')
log.debug(f'change ({len(change)})')

ids = []
for guid in new:
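
Both gather stages finish by comparing the identifiers collected from the source against those already stored for the harvest source, using plain set arithmetic to decide which datasets are new, which must be deleted, and which should be updated; the added debug lines then log the size of each set together with the skipped-dataset count. A short example of that set arithmetic with made-up GUIDs:

# Hypothetical GUIDs, only to illustrate the new/delete/change computation.
guids_in_db = {'a', 'b', 'c'}        # datasets already harvested
guids_in_harvest = {'b', 'c', 'd'}   # identifiers found in the source this run

new = guids_in_harvest - guids_in_db               # {'d'}       -> create
delete = set(guids_in_db) - set(guids_in_harvest)  # {'a'}       -> remove
change = guids_in_db & guids_in_harvest            # {'b', 'c'}  -> update

print(f'new ({len(new)}): {new}')
print(f'delete ({len(delete)}): {delete}')
print(f'change ({len(change)}): {change}')
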
