Revert "Merge pull request #313 from IATI/network_data__new_update" #327

Merged: 2 commits, Apr 18, 2024
requirements.in (2 changes: 1 addition & 1 deletion)
@@ -6,6 +6,6 @@ azure-storage-queue==12.2.0
lxml
psycopg2
requests
pysolr==3.9.0 # If this is upgraded, the Solrize needs revisiting, because 3.9 doesn't escape delete calls, so we've done it manually.
pysolr
chardet
python-dateutil
src/library/solrize.py (151 changes: 56 additions & 95 deletions)
@@ -6,7 +6,6 @@
from library.logger import getLogger
from constants.config import config
from azure.storage.blob import BlobServiceClient
from xml.sax.saxutils import escape
import library.db as db
import pysolr
import library.utils as utils
@@ -88,54 +87,6 @@ def validateLatLon(point_pos):
pass
return None

def delete_from_solr(cores: dict, file_id: str, file_hash: str, query: dict):
"""Delete from the Solr cores"""

for core_name in cores:
# if one of these deletions fails, the docs from last time in
# core which failed to delete will be cleaned up at end of solrize
# process
try:
cores[core_name].delete(**query)
except Exception as e:
e_message = e.args[0] if hasattr(e, 'args') else ''
raise SolrError('DELETING hash: ' + file_hash + ' and id: ' + file_id +
', from collection with name ' + core_name + ': ' + e_message)

def get_blob_data(blob_client: object, conn: object, file_id: str, file_hash: str,
fa: dict, hashed_identifier: str, blob_type: str):
"""Downloads and returns activity blob data of the specified type"""

blob_name = '{}/{}.{}'.format(file_id, hashed_identifier, blob_type)

try:
blob_client = blob_client.get_blob_client(
container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
blob=blob_name)
downloader = blob_client.download_blob()
except:
db.resetUnfoundLakify(conn, file_id)
raise SolrizeSourceError((
"Could not download {} activity blob: {}"
", file hash: {}, iati-identifier: {}."
" Sending back to Lakify."
).format(blob_type, blob_name, file_hash, fa['iati_identifier'])
)

try:
return utils.get_text_from_blob(downloader, blob_name)
except:
raise SolrizeSourceError((
"Could not identify charset for {} blob: {}"
", file hash: {}, iati-identifier: {}"
).format(blob_type, blob_name, file_hash, fa['iati_identifier']))


def escape_param_for_pysolr_delete(query_param: str):
# escape is used to do XML escaping, since PySolr doesn't do it for delete
# queries, and the \ and " escaping is to escape for a param for a Solr query
# where the param will be in double quotes
return escape(query_param).replace("\\", "\\\\").replace("\"", "\\\"")
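
For context on the helper above (which this revert removes): pysolr 3.9 does not XML-escape delete queries itself, so values were escaped manually before being interpolated into a double-quoted Solr term. A minimal usage sketch, assuming a local Solr core; the URL and identifier value are illustrative, not from the project:

import pysolr
from xml.sax.saxutils import escape

def escape_param_for_pysolr_delete(query_param: str):
    # XML-escape first (pysolr 3.9 does not escape delete queries), then escape
    # backslashes and double quotes so the value is safe inside a quoted Solr term.
    return escape(query_param).replace("\\", "\\\\").replace("\"", "\\\"")

solr = pysolr.Solr("http://localhost:8983/solr/activity")  # illustrative core URL
identifier = 'AA-ABC-123 "odd" & <chars>'                  # illustrative identifier
solr.delete(q='iati_identifier_exact:"{}"'.format(
    escape_param_for_pysolr_delete(identifier)))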

def process_hash_list(document_datasets):
"""
@@ -175,46 +126,69 @@ def process_hash_list(document_datasets):

db.updateSolrizeStartDate(conn, file_id)

# check whether existing DS data and new dataset contain only "good" data:
# - flattened activities have any duplicated IATI identifiers in them?
# - existing Solr-docs in solr for this dataset have repeated IDs
new_identifiers = [fa['iati_identifier'] for fa in flattened_activities[0]]
has_dupes = len(set(new_identifiers)) != len(new_identifiers) or \
solr_cores['activity'].search("id:" + file_id + "--*--1", rows=0).hits > 0
logger.info('Removing all docs for doc with hash: ' +
file_hash + ' and id: ' + file_id)

identifiers_seen = []
identifier_indices = {}
for core_name in solr_cores:
try:
solr_cores[core_name].delete(
q='iati_activities_document_id:' + file_id)
except Exception as e:
e_message = e.args[0] if hasattr(e, 'args') else ''
raise SolrError('DELETING hash: ' + file_hash + ' and id: ' + file_id +
', from collection with name ' + core_name + ': ' + e_message)

logger.info('Adding docs for hash: ' + file_hash + ' and id: ' + file_id)

# log to say which update method to be used, but outside the activity
# processing loop to avoid thousands of log messages for large files
if has_dupes:
logger.info(('File with id: {} (hash: {}) had or has dupes, so removing all '
'activity, budget, transaction Solr-docs at start of processing '
'for each activity').format(file_id, file_hash))
else:
logger.info(('File with id: {} (hash: {}) had and has no dupes, so updating '
'activities in-place and removing budget, transaction Solr-docs '
'on a per-activity basis').format(file_id, file_hash))
identifier_indices = {}

for fa in flattened_activities[0]:
hashed_iati_identifier = utils.get_hash_for_identifier(fa['iati_identifier'])
blob_name = '{}/{}.xml'.format(file_id, hashed_iati_identifier)

try:
blob_client = blob_service_client.get_blob_client(
container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
blob=blob_name)
downloader = blob_client.download_blob()
except:
db.resetUnfoundLakify(conn, file_id)
raise SolrizeSourceError(
'Could not download XML activity blob: ' + blob_name +
', file hash: ' + file_hash +
', iati-identifier: ' + fa['iati_identifier'] +
'. Sending back to Lakify.'
)

# if first time seeing identifier in dataset, delete docs from Solr
if has_dupes and (fa['iati_identifier'] not in identifiers_seen):
delete_from_solr(solr_cores, file_id, file_hash, {
'q': ('iati_activities_document_id:"{}" AND '
'iati_identifier_exact:"{}"').format(file_id,
escape_param_for_pysolr_delete(fa['iati_identifier']))})
try:
fa['iati_xml'] = utils.get_text_from_blob(downloader, blob_name)
except:
raise SolrizeSourceError('Could not identify charset for blob: ' + blob_name +
', file hash: ' + file_hash + ', iati-identifier: ' + fa['iati_identifier'])

identifiers_seen.append(fa['iati_identifier'])
json_blob_name = '{}/{}.json'.format(file_id, hashed_iati_identifier)

try:
json_blob_client = blob_service_client.get_blob_client(
container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
blob=json_blob_name)
json_downloader = json_blob_client.download_blob()
except:
db.resetUnfoundLakify(conn, file_id)
raise SolrizeSourceError(
'Could not download JSON activity blob: ' + json_blob_name +
', file hash: ' + file_hash +
', iati-identifier: ' + fa['iati_identifier'] +
'. Sending back to Lakify.'
)

# add the Activity XML/JSON blobs to the flattened activity
fa['iati_xml'] = get_blob_data(blob_service_client, conn, file_id, file_hash,
fa, hashed_iati_identifier, 'xml')
fa['iati_json'] = get_blob_data(blob_service_client, conn, file_id, file_hash,
fa, hashed_iati_identifier, 'json')
try:
fa['iati_json'] = utils.get_text_from_blob(
json_downloader, json_blob_name)
except:
raise SolrizeSourceError('Could not identify charset for blob: ' + blob_name +
', file hash: ' + file_hash + ', iati-identifier: ' + fa['iati_identifier'])

# add id and hash
fa['iati_activities_document_id'] = file_id
fa['iati_activities_document_hash'] = file_hash

@@ -232,7 +206,7 @@
except KeyError:
pass

# Remove sub lists from flattened activity, saving data for use later
# Remove sub lists
sub_list_data = {}
for element_name in explode_elements:
if isinstance(fa.get('@'+element_name), list):
@@ -251,24 +225,11 @@
del fa['iati_xml']
del fa['iati_json']

# For budget, transaction cores, delete docs (if data had & has no dupes), then re-add
# Now index explode_elements
for element_name, element_data in sub_list_data.items():
if not has_dupes:
delete_from_solr({element_name: solr_cores[element_name]}, file_id, file_hash, {
'q': ('iati_activities_document_id:"{}" AND '
'iati_identifier_exact:"{}"').format(file_id,
escape_param_for_pysolr_delete(fa['iati_identifier']))})
results = get_explode_element_data(element_name, element_data, fa)
addToSolr(element_name, results, file_hash, file_id)

# now do cleanup delete for docs from activities that have been deleted
logger.info(('Removing remaining old Solr-docs for file with id: {} '
'where the hash is not equal to current hash: {}').format(file_id, file_hash))
delete_from_solr(solr_cores, file_id, file_hash, {
'q': ('iati_activities_document_id:"{}" AND '
'NOT(iati_activities_document_hash:"{}")').format(file_id,
file_hash)})

logger.info('Updating DB with successful Solrize for hash: ' +
file_hash + ' and id: ' + file_id)
db.completeSolrize(conn, file_id)
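
To summarise the behavioural change, a minimal sketch (not the project's code; the Solr URL, document id and hash are illustrative): the restored code deletes every Solr doc for a dataset up front and then re-adds everything, while the reverted #313 approach updated docs in place and finished by deleting only leftover docs whose stored document hash no longer matched the current one.

import pysolr

solr = pysolr.Solr("http://localhost:8983/solr/activity")  # illustrative core URL
file_id, file_hash = "dataset-123", "abc123"               # illustrative values

# Restored (pre-#313) behaviour: wipe all docs for the dataset before re-adding,
# so the dataset is briefly absent from the index while it is re-indexed.
solr.delete(q="iati_activities_document_id:" + file_id)

# Reverted (#313) behaviour: add/update docs in place, then clean up only docs
# whose stored hash differs from the current document hash.
solr.delete(q='iati_activities_document_id:"{}" AND NOT(iati_activities_document_hash:"{}")'.format(
    file_id, file_hash))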