
Commit d4dd7be

Merge pull request #303 from IATI/develop
Merge develop into main
odscjames authored Nov 29, 2023
2 parents 4465e37 + 982fcfb commit d4dd7be
Showing 53 changed files with 1,461 additions and 81 deletions.
1 change: 1 addition & 0 deletions requirements.in
@@ -8,3 +8,4 @@ psycopg2
requests
pysolr
chardet
python-dateutil
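
python-dateutil becomes a direct runtime dependency here; the rewritten flattener below uses it to parse date values. A minimal sketch of the call it relies on — the timestamp is illustrative:

import dateutil.parser

dt = dateutil.parser.parse('2023-11-29T10:15:00Z')
print(dt.isoformat())  # 2023-11-29T10:15:00+00:00
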
3 changes: 3 additions & 0 deletions requirements.txt
@@ -45,6 +45,8 @@ pycparser==2.21
# via cffi
pysolr==3.9.0
# via -r requirements.in
python-dateutil==2.8.2
# via -r requirements.in
requests==2.31.0
# via
# -r requirements.in
@@ -58,5 +60,6 @@ six==1.16.0
# via
# azure-core
# isodate
# python-dateutil
urllib3==2.0.4
# via requests
3 changes: 3 additions & 0 deletions requirements_dev.txt
@@ -63,6 +63,8 @@ pysolr==3.9.0
# via -r requirements.in
pytest==7.4.2
# via -r requirements_dev.in
python-dateutil==2.8.2
# via -r requirements.in
requests==2.31.0
# via
# -r requirements.in
@@ -76,6 +78,7 @@ six==1.16.0
# via
# azure-core
# isodate
# python-dateutil
urllib3==2.0.6
# via requests
wheel==0.41.2
7 changes: 1 addition & 6 deletions src/constants/config.py
@@ -86,12 +86,7 @@
),
FLATTEN=dict(
# Number of parallel processes to run the flatten loop with
PARALLEL_PROCESSES=1,

# Flattener API URL/key
FLATTENER_URL=os.getenv('FLATTENER_API_URL'),
FLATTENER_KEY_NAME=os.getenv('FLATTENER_KEY_NAME'),
FLATTENER_KEY_VALUE=os.getenv('FLATTENER_KEY_VALUE')
PARALLEL_PROCESSES=1
),
LAKIFY=dict(
# Number of parallel processes to run the lakify loop with
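With the external Flattener API settings gone, the FLATTEN block only carries the worker count. A hedged sketch of how it is read — the import path is an assumption based on src/constants/config.py:

from constants.config import config  # assumed import path for src/constants/config.py

parallel_processes = config['FLATTEN']['PARALLEL_PROCESSES']  # 1 after this change
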
188 changes: 161 additions & 27 deletions src/library/flatten.py
@@ -10,20 +10,161 @@
import json
from json.decoder import JSONDecodeError
import library.utils as utils
from lxml import etree
import re
import tempfile
import os
import dateutil.parser

logger = getLogger("flatten")

config_explode_elements = json.loads(config['SOLRIZE']['EXPLODE_ELEMENTS'])


class Flattener:

def __init__(self, sub_list_elements=config_explode_elements):
self.sub_list_elements = sub_list_elements

def process(self, input_filename):
# Check right type of XML file, get attributes from root
large_parser = etree.XMLParser(huge_tree=True, recover=True, remove_comments=True)
root = etree.parse(input_filename, parser=large_parser).getroot()

if root.tag != 'iati-activities':
raise FlattenerException('Non-IATI XML')

root_attributes = {}

# Previous tool always added version, even if it was blank
self._add_to_output('dataset_version', root.attrib.get("version", ""), root_attributes)
if root.attrib.get("generated-datetime"):
self._add_to_output('dataset_generated_datetime', root.attrib.get("generated-datetime"), root_attributes)
if root.attrib.get("linked-data-default"):
self._add_to_output('dataset_linked_data_default', root.attrib.get("linked-data-default"), root_attributes)

del root

# Start Output
output = []

# Process
context = etree.iterparse(input_filename, tag='iati-activity', huge_tree=True, recover=True, remove_comments=True)
for _, activity in context:
nsmap = activity.nsmap
# Start
activity_output = root_attributes.copy()
# Activity Data
self._process_tag(activity, activity_output, nsmap=nsmap)
# Sub lists?
for child_tag_name in self.sub_list_elements:
child_tags = activity.findall(child_tag_name)
if child_tags:
activity_output["@"+child_tag_name] = []
for child_tag in child_tags:
child_tag_data = {}
# TODO this isn't the most efficient as we are parsing the same tag twice
# (here & above for activity)
# But for now, we'll do this to prove functionality then look at speed.
self._process_tag(child_tag, child_tag_data, prefix=child_tag_name, nsmap=nsmap)
activity_output["@"+child_tag_name].append(child_tag_data)
# We have output
output.append(activity_output)

# Return
return output
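# Illustrative sketch (not part of the diff): for a 2.03 file containing one activity,
# identified as XX-EXAMPLE-1, holding a single <transaction><value currency="USD">1000</value></transaction>,
# and with 'transaction' listed in EXPLODE_ELEMENTS, process() returns roughly:
#   [
#       {
#           'dataset_version': '2.03',
#           'iati_identifier': 'XX-EXAMPLE-1',
#           'transaction_value': '1000',
#           'transaction_value_currency': 'USD',
#           '@transaction': [
#               {'transaction_value': '1000', 'transaction_value_currency': 'USD'}
#           ]
#       }
#   ]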

def _process_tag(self, xml_tag, output, prefix="", nsmap={}):

# Attributes
for attrib_k, attrib_v in xml_tag.attrib.items():

self._add_to_output(
self._convert_name_to_canonical(attrib_k, prefix=prefix, nsmap=nsmap),
attrib_v,
output
)

# Immediate text
if xml_tag.text and xml_tag.text.strip() and prefix:
self._add_to_output(
prefix,
xml_tag.text.strip(),
output
)

# Child tags
for child_xml_tag in xml_tag.getchildren():
self._process_tag(
child_xml_tag,
output,
prefix=self._convert_name_to_canonical(child_xml_tag.tag, prefix=prefix, nsmap=nsmap),
nsmap=nsmap
)

CANONICAL_NAMES_WITH_DATE_TIMES = ['iso_date', 'value_date', 'extraction_date', '_datetime']

def _add_to_output(self, canonical_name, value, output):
# Basic processing
value = value.strip()

# clean iati_identifier so hash matches lakifier
if canonical_name == 'iati_identifier':
value = value.replace("\n", '').replace("\r", '')

# Date time?
if [x for x in self.CANONICAL_NAMES_WITH_DATE_TIMES if x in canonical_name]:
try:
dt_object = utils.parse_xsd_date_value(value) or dateutil.parser.parse(value)
if dt_object:
# This mirrors the output of the old flattener system
value = dt_object.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]+"Z"
else:
# If can't parse, don't add as solr will error
return
except (dateutil.parser.ParserError, dateutil.parser.UnknownTimezoneWarning):
# If can't parse, don't add as solr will error
return
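# Illustrative (not part of the diff): an input of '2023-11-29' parses cleanly and is
# emitted as '2023-11-29T00:00:00.000Z', the timestamp format the old flattener produced.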

# Add to output
if canonical_name in output:
if isinstance(output[canonical_name], list):
output[canonical_name].append(value)
else:
output[canonical_name] = [output[canonical_name], value]
else:
output[canonical_name] = value
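# Illustrative (not part of the diff): repeated names collect into a list, so calling
# _add_to_output('sector_code', '111', out) and then _add_to_output('sector_code', '112', out)
# leaves out['sector_code'] == ['111', '112'], while a single occurrence stays a plain string.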

DEFAULT_NAMESPACES = {
"xml": "http://www.w3.org/XML/1998/namespace"
}

def _convert_name_to_canonical(self, name, prefix="", nsmap={}):
for ns, url in self.DEFAULT_NAMESPACES.items():
if name.startswith("{"+url+"}"):
name = ns + "_" + name[len(url)+2:]
for ns, url in nsmap.items():
if name.startswith("{"+url+"}"):
name = ns + "_" + name[len(url)+2:]
name = name.replace("-", "_")
name = name.replace(":", "_")
return prefix+"_"+name if prefix else name
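# Illustrative conversions (not part of the diff):
#   _convert_name_to_canonical('iati-identifier') -> 'iati_identifier'
#   _convert_name_to_canonical('{http://www.w3.org/XML/1998/namespace}lang',
#                              prefix='title_narrative') -> 'title_narrative_xml_lang'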

class FlattenerException(Exception):
pass

def process_hash_list(document_datasets):

conn = db.getDirectConnection()
flattener = Flattener()

for file_data in document_datasets:
tempfile_name = None
try:
file_hash = file_data[0]
downloaded = file_data[1]
doc_id = file_data[2]
prior_error = file_data[3]
tempfile_handle, tempfile_name = tempfile.mkstemp(prefix='flatten'+file_hash)

# Explicit error codes returned from Flattener
if prior_error == 422 or prior_error == 400 or prior_error == 413:
@@ -51,48 +192,41 @@ def process_hash_list(document_datasets):
'Can not identify charset for hash {} doc id {}'.format(file_hash, doc_id))
continue

headers = {config['FLATTEN']['FLATTENER_KEY_NAME']
: config['FLATTEN']['FLATTENER_KEY_VALUE']}
response = requests.post(
config['FLATTEN']['FLATTENER_URL'], data=payload.encode('utf-8'), headers=headers)
del payload
os.write(tempfile_handle, payload.encode('utf-8'))
os.close(tempfile_handle)

if response.status_code != 200:
logger.warning('Flattener reports error status {} for hash {} doc id {}'.format(
str(response.status_code), file_hash, doc_id))
if response.status_code == 404:
logger.warning('Giving it a chance to come back up...')
time.sleep(360) # Give the thing time to come back up
logger.warning('...and off we go again.')
db.updateFlattenError(conn, doc_id, response.status_code)
continue
del payload

flattenedObject = response.json()
flattenedObject = flattener.process(tempfile_name)

db.completeFlatten(conn, doc_id, json.dumps(flattenedObject))

os.remove(tempfile_name)

except (AzureExceptions.ResourceNotFoundError) as e:
logger.warning('Blob not found for hash ' + file_hash +
' - updating as Not Downloaded for the refresher to pick up.')
db.updateFileAsNotDownloaded(conn, doc_id)
except JSONDecodeError:
logger.warning('Unable to decode JSON output from Flattener for hash {} doc id {}'.format(
file_hash, doc_id))
logger.warning('Assuming soft 404/500, waiting, and retrying...')
time.sleep(360)
logger.warning('...and off we go again.')
db.updateFlattenError(conn, doc_id, 404)
except Exception as e:
except (Exception, FlattenerException) as e:
# Log to logs
logger.error('ERROR with flattening ' + file_hash)
print(traceback.format_exc())
if hasattr(e, 'message'):
logger.error(e.message)
logger.error("ERROR message: " + str(e.message))
if hasattr(e, 'msg'):
logger.error(e.msg)
logger.error("ERROR msg: " + str(e.msg))
try:
logger.warning(e.args[0])
logger.warning("ERROR args: " + str(e.args[0]))
except:
pass
# Log to DB
db.updateFlattenError(conn, doc_id, 1)
# Delete temp file
if tempfile_name:
try:
os.remove(tempfile_name)
except:
pass

conn.close()
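
For reference, a minimal sketch of exercising the new Flattener directly, outside the DB-driven loop above — the import path, the module-level config the module needs at import time, and the file name are assumptions:

from library.flatten import Flattener  # assumes src/ is on the import path and the repo's env vars are set

flattener = Flattener(sub_list_elements=['transaction'])
activities = flattener.process('example-activities.xml')
print(len(activities), 'activities flattened')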

24 changes: 18 additions & 6 deletions src/library/lakify.py
@@ -20,21 +20,33 @@ def clean_identifier(identifier):
def recursive_json_nest(element, output):
element_dict = {'@{}'.format(e_key): element.get(e_key)
for e_key in element.keys()}
element_tag = element.tag
if element_tag is etree.Comment:
element_tag = 'comment()'
elif element_tag is etree.PI:
element_tag = 'PI()'

if element.text is not None and element.text.strip() != '':
element_dict['text()'] = element.text
elif element.tag == 'narrative':
else:
inner_text = None
if element.tag is not etree.Comment and element.tag is not etree.PI:
inner_text = ''.join([inner_string.strip()
for inner_string in element.itertext(tag=element_tag)])
if inner_text is not None and inner_text != '':
element_dict['text()'] = inner_text

if element_tag == 'narrative' and 'text()' not in element_dict:
element_dict['text()'] = ''

for e_child in element.getchildren():
element_dict = recursive_json_nest(e_child, element_dict)
element_tag = element.tag
if element_tag is etree.Comment:
element_tag = 'comment()'
if element_tag is etree.PI:
element_tag = 'PI()'

if element_tag in output.keys():
output[element_tag].append(element_dict)
else:
output[element_tag] = [element_dict]

return output
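
A hedged sketch of how recursive_json_nest treats comment nodes and empty narrative elements after this change — the import path and the XML snippets are assumptions:

from lxml import etree
from library.lakify import recursive_json_nest  # assumes src/ is on the import path

# An empty <narrative/> still gets a text() key
print(recursive_json_nest(etree.fromstring('<narrative/>'), {}))
# {'narrative': [{'text()': ''}]}

# XML comments nest under a 'comment()' key rather than their lxml tag object
element = etree.fromstring('<description><!-- internal note --></description>')
print(recursive_json_nest(element, {}))
# {'description': [{'comment()': [{'text()': ' internal note '}]}]}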

