diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..074fcef6 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "bento"] + path = bento + url = https://github.com/CBIIT/bento-common.git diff --git a/bento b/bento new file mode 160000 index 00000000..5babf3cf --- /dev/null +++ b/bento @@ -0,0 +1 @@ +Subproject commit 5babf3cfa00afc5b589d76ec25d914c276c61d54 diff --git a/common/__init__.py b/common/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/common/config.py b/common/config.py deleted file mode 100644 index d1961a8f..00000000 --- a/common/config.py +++ /dev/null @@ -1,53 +0,0 @@ -from configparser import ConfigParser -import os, sys - -import yaml - -from .utils import get_logger - -PSWD_ENV = 'NEO_PASSWORD' -util_log = get_logger('Utils') -config = ConfigParser() -CONFIG_FILE_ENV_VAR = 'ICDC_DATA_LOADER_CONFIG' -config_file = os.environ.get(CONFIG_FILE_ENV_VAR, 'config/config.ini') -if config_file and os.path.isfile(config_file): - config.read(config_file) -else: - util_log.error('Can\'t find configuration file! Make a copy of config.sample.ini to config.ini' - + ' or specify config file in Environment variable {}'.format(CONFIG_FILE_ENV_VAR)) - sys.exit(1) - -LOG_LEVEL = os.environ.get('DL_LOG_LEVEL', config.get('log', 'log_level')) -ICDC_DOMAIN = config.get('main', 'domain') -QUEUE_LONG_PULL_TIME = int(config.get('sqs', 'long_pull_time')) -VISIBILITY_TIMEOUT = int(config.get('sqs', 'visibility_timeout')) - -TEMP_FOLDER = config.get('main', 'temp_folder') -BACKUP_FOLDER = config.get('main', 'backup_folder') -INDEXD_GUID_PREFIX = config.get('indexd', 'GUID_prefix') -INDEXD_MANIFEST_EXT = config.get('indexd', 'ext') - -if not INDEXD_MANIFEST_EXT.startswith('.'): - INDEXD_MANIFEST_EXT = '.' + INDEXD_MANIFEST_EXT -os.makedirs(BACKUP_FOLDER, exist_ok=True) -if not os.path.isdir(BACKUP_FOLDER): - util_log.error('{} is not a folder!'.format(BACKUP_FOLDER)) - sys.exit(1) - -SLACK_URL = config.get('slack', 'url') - -PROP_FILE_ENV_VAR = 'ICDC_DATA_LOADER_PROP' -property_file = os.environ.get(PROP_FILE_ENV_VAR, 'config/props.yml') -if property_file and os.path.isfile(property_file): - with open(property_file) as prop_file: - PROPS = yaml.safe_load(prop_file)['Properties'] - if not PROPS: - util_log.error('Can\'t read property file!') - sys.exit(1) -else: - util_log.error( - 'Can\'t find property file! Get a copy of prop.yml or specify property file in Environment variable {}'.format( - PROP_FILE_ENV_VAR)) - sys.exit(1) - - diff --git a/common/data_loader.py b/common/data_loader.py deleted file mode 100644 index 92f58fe6..00000000 --- a/common/data_loader.py +++ /dev/null @@ -1,725 +0,0 @@ -#!/usr/bin/env python3 - -import os -from collections import deque -import csv -from datetime import datetime, timedelta -import re - -from neo4j import Driver, Session, Transaction -from timeit import default_timer as timer - -from .icdc_schema import ICDC_Schema, get_uuid_for_node -from .utils import DATE_FORMAT, get_logger, NODES_CREATED, RELATIONSHIP_CREATED, UUID, \ - is_parent_pointer, RELATIONSHIP_TYPE, MULTIPLIER, ONE_TO_ONE, DEFAULT_MULTIPLIER, UPSERT_MODE, \ - NEW_MODE, DELETE_MODE, NODES_DELETED, RELATIONSHIP_DELETED -from .config import PROPS - -NODE_TYPE = 'type' -VISIT_NODE = 'visit' -VISIT_ID = 'visit_id' -VISIT_DATE = 'visit_date' -PROP_TYPE = 'Type' -PARENT_TYPE = 'parent_type' -PARENT_ID_FIELD = 'parent_id_field' -PARENT_ID = 'parent_id' -START_DATE = 'date_of_cycle_start' -END_DATE = 'date_of_cycle_end' -OF_CYCLE = 'of_cycle' -CYCLE_NODE = 'cycle' -excluded_fields = {NODE_TYPE} -CASE_NODE = 'case' -CASE_ID = 'case_id' -PREDATE = 7 -FOREVER = '9999-12-31' -INFERRED = 'inferred' -CREATED = 'created' -UPDATED = 'updated' -RELATIONSHIPS = 'relationships' -VISITS_CREATED = 'visits_created' -PROVIDED_PARENTS = 'provided_parents' - - -class DataLoader: - def __init__(self, driver, schema): - if not schema or not isinstance(schema, ICDC_Schema): - raise Exception('Invalid ICDC_Schema object') - self.log = get_logger('Data Loader') - self.driver = driver - self.schema = schema - - def check_files(self, file_list): - if not file_list: - self.log.error('Invalid file list') - return False - elif file_list: - for data_file in file_list: - if not os.path.isfile(data_file): - self.log.error('File "{}" doesn\'t exist'.format(data_file)) - return False - return True - - def validate_files(self, cheat_mode, file_list, max_violations): - if not cheat_mode: - validation_failed = False - for txt in file_list: - if not self.validate_file(txt, max_violations): - self.log.error('Validating file "{}" failed!'.format(txt)) - validation_failed = True - return not validation_failed - else: - self.log.info('Cheat mode enabled, all validations skipped!') - return True - - def load(self, file_list, cheat_mode, dry_run, loading_mode, wipe_db, max_violations): - if not self.check_files(file_list): - return False - start = timer() - if not self.validate_files(cheat_mode, file_list, max_violations): - return False - - if dry_run: - end = timer() - self.log.info('Dry run mode, no nodes or relationships loaded.') # Time in seconds, e.g. 5.38091952400282 - self.log.info('Running time: {:.2f} seconds'.format(end - start)) # Time in seconds, e.g. 5.38091952400282 - return {NODES_CREATED: 0, RELATIONSHIP_CREATED: 0} - - self.nodes_created = 0 - self.relationships_created = 0 - self.nodes_deleted = 0 - self.relationships_deleted = 0 - self.nodes_stat = {} - self.relationships_stat = {} - self.nodes_deleted_stat = {} - self.relationships_deleted_stat = {} - if not self.driver or not isinstance(self.driver, Driver): - self.log.error('Invalid Neo4j Python Driver!') - return False - with self.driver.session() as session: - tx = session.begin_transaction() - try: - if wipe_db: - self.wipe_db(tx) - - for txt in file_list: - self.load_nodes(tx, txt, loading_mode) - if loading_mode != DELETE_MODE: - for txt in file_list: - self.load_relationships(tx, txt, loading_mode) - tx.commit() - except Exception as e: - tx.rollback() - self.log.exception(e) - return False - end = timer() - - # Print statistics - for node in sorted(self.nodes_stat.keys()): - count = self.nodes_stat[node] - self.log.info('Node: (:{}) loaded: {}'.format(node, count)) - for rel in sorted(self.relationships_stat.keys()): - count = self.relationships_stat[rel] - self.log.info('Relationship: [:{}] loaded: {}'.format(rel, count)) - self.log.info('{} nodes and {} relationships loaded!'.format(self.nodes_created, self.relationships_created)) - self.log.info('{} nodes and {} relationships deleted!'.format(self.nodes_deleted, self.relationships_deleted)) - self.log.info('Loading time: {:.2f} seconds'.format(end - start)) # Time in seconds, e.g. 5.38091952400282 - return {NODES_CREATED: self.nodes_created, RELATIONSHIP_CREATED: self.relationships_created, - NODES_DELETED: self.nodes_deleted, RELATIONSHIP_DELETED: self.relationships_deleted} - - - - # Remove extra spaces at begining and end of the keys and values - @staticmethod - def cleanup_node(node): - obj = {} - for key, value in node.items(): - obj[key.strip()] = value.strip() - return obj - - # Cleanup values for Boolean, Int and Float types - # Add uuid to nodes if one not exists - # Add parent id(s) - # Add extra properties for "value with unit" properties - def prepare_node(self, node): - obj = self.cleanup_node(node) - - node_type = obj.get(NODE_TYPE, None) - # Cleanup values for Boolean, Int and Float types - if node_type: - for key, value in obj.items(): - key_type = self.schema.get_prop_type(node_type, key) - if key_type == 'Boolean': - cleaned_value = None - if isinstance(value, str): - if re.search(r'yes|true', value, re.IGNORECASE): - cleaned_value = True - elif re.search(r'no|false', value, re.IGNORECASE): - cleaned_value = False - else: - self.log.debug('Unsupported Boolean value: "{}"'.format(value)) - cleaned_value = None - obj[key] = cleaned_value - elif key_type == 'Int': - try: - if value is None: - cleaned_value = None - else: - cleaned_value = int(value) - except Exception: - cleaned_value = None - obj[key] = cleaned_value - elif key_type == 'Float': - try: - if value is None: - cleaned_value = None - else: - cleaned_value = float(value) - except Exception: - cleaned_value = None - obj[key] = cleaned_value - - if UUID not in obj: - id_field = self.schema.get_id_field(obj) - id_value = self.schema.get_id(obj) - node_type = obj.get(NODE_TYPE) - if node_type: - if not id_value: - obj[UUID] = get_uuid_for_node(node_type, self.get_signature(obj)) - elif id_field != UUID: - obj[UUID] = get_uuid_for_node(node_type, id_value) - else: - raise Exception('No "type" property in node') - - obj2 = {} - for key, value in obj.items(): - obj2[key] = value - # Add parent id field(s) into node - if is_parent_pointer(key): - header = key.split('.') - if len(header) > 2: - self.log.warning('Column header "{}" has multiple periods!'.format(key)) - field_name = header[1] - parent = header[0] - combined = '{}_{}'.format(parent, field_name) - if field_name in obj: - self.log.warning('"{}" field is in both current node and parent "{}", use {} instead !'.format(key, parent, combined)) - field_name = combined - # Add an value for parent id - obj2[field_name] = value - # Add extra properties if any - for extra_prop_name, extra_value in self.schema.get_extra_props(node_type, key, value).items(): - obj2[extra_prop_name] = extra_value - - return obj2 - - @staticmethod - def get_signature(node): - result = [] - for key, value in node.items(): - result.append('{}: {}'.format(key, value)) - return '{{ {} }}'.format(', '.join(result)) - - # Validate all cases exist in a data (TSV/TXT) file - def validate_cases_exist_in_file(self, file_name, max_violations): - if not self.driver or not isinstance(self.driver, Driver): - self.log.error('Invalid Neo4j Python Driver!') - return False - with self.driver.session() as session: - with open(file_name) as in_file: - self.log.info('Validating relationships in file "{}" ...'.format(file_name)) - reader = csv.DictReader(in_file, delimiter='\t') - line_num = 1 - validation_failed = False - violations = 0 - for org_obj in reader: - obj = self.prepare_node(org_obj) - line_num += 1 - # Validate parent exist - if CASE_ID in obj: - case_id = obj[CASE_ID] - if not self.node_exists(session, CASE_NODE, CASE_ID, case_id): - self.log.error( - 'Invalid data at line {}: Parent (:{} {{ {}: "{}" }}) doesn\'t exist!'.format( - line_num, CASE_NODE, CASE_ID, case_id)) - validation_failed = True - violations += 1 - if violations >= max_violations: - return False - return not validation_failed - - # Validate all parents exist in a data (TSV/TXT) file - def validate_parents_exist_in_file(self, file_name, max_violations): - validation_failed = True - if not self.driver or not isinstance(self.driver, Driver): - self.log.error('Invalid Neo4j Python Driver!') - return False - with self.driver.session() as session: - with open(file_name) as in_file: - self.log.info('Validating relationships in file "{}" ...'.format(file_name)) - reader = csv.DictReader(in_file, delimiter='\t') - line_num = 1 - validation_failed = False - violations = 0 - for org_obj in reader: - line_num += 1 - obj = self.prepare_node(org_obj) - results = self.collect_relationships(obj, session, False, line_num) - relationships = results[RELATIONSHIPS] - provided_parents = results[PROVIDED_PARENTS] - if provided_parents > 0: - if len(relationships) == 0: - self.log.error('Invalid data at line {}: No parents found!'.format(line_num)) - validation_failed = True - violations += 1 - if violations >= max_violations: - return False - else: - self.log.info('Line: {} - No parents found'.format(line_num)) - - return not validation_failed - - # Validate file - def validate_file(self, file_name, max_violations): - with open(file_name) as in_file: - self.log.info('Validating file "{}" ...'.format(file_name)) - reader = csv.DictReader(in_file, delimiter='\t') - line_num = 1 - validation_failed = False - violations = 0 - IDs = {} - for org_obj in reader: - obj = self.cleanup_node(org_obj) - line_num += 1 - id_field = self.schema.get_id_field(obj) - node_id = self.schema.get_id(obj) - if node_id: - if node_id in IDs: - validation_failed = True - self.log.error('Invalid data at line {}: duplicate {}: {}, found in line: {}'.format(line_num, - id_field, node_id, ', '.join(IDs[node_id]))) - IDs[node_id].append(str(line_num)) - else: - IDs[node_id] = [str(line_num)] - - validate_result = self.schema.validate_node(obj[NODE_TYPE], obj) - if not validate_result['result']: - for msg in validate_result['messages']: - self.log.error('Invalid data at line {}: "{}"!'.format(line_num, msg)) - validation_failed = True - violations += 1 - if violations >= max_violations: - return False - return not validation_failed - - @staticmethod - def get_new_statement(node_type, obj): - # statement is used to create current node - prop_stmts = [] - - for key in obj.keys(): - if key in excluded_fields: - continue - elif is_parent_pointer(key): - continue - - prop_stmts.append('{0}: {{{0}}}'.format(key)) - - statement = 'CREATE (:{0} {{ {1} }})'.format(node_type, ' ,'.join(prop_stmts)) - return statement - - @staticmethod - def get_upsert_statement(node_type, id_field, obj): - # statement is used to create current node - statement = '' - prop_stmts = [] - - for key in obj.keys(): - if key in excluded_fields: - continue - elif key == id_field: - continue - elif is_parent_pointer(key): - continue - - prop_stmts.append('n.{0} = {{{0}}}'.format(key)) - - statement += 'MERGE (n:{0} {{ {1}: {{{1}}} }})'.format(node_type, id_field) - statement += ' ON CREATE ' + 'SET n.{} = datetime(), '.format(CREATED) + ' ,'.join(prop_stmts) - statement += ' ON MATCH ' + 'SET n.{} = datetime(), '.format(UPDATED) + ' ,'.join(prop_stmts) - return statement - - # Delete a node and children with no other parents recursively - def delete_node(self, session, node): - delete_queue = deque([node]) - node_deleted = 0 - relationship_deleted = 0 - while len(delete_queue) > 0: - root = delete_queue.popleft() - delete_queue.extend(self.get_children_with_single_parent(session, root)) - n_deleted, r_deleted = self.delete_single_node(session, root) - node_deleted += n_deleted - relationship_deleted += r_deleted - return (node_deleted, relationship_deleted) - - - # Return children of node without other parents - def get_children_with_single_parent(self, session, node): - node_type = node[NODE_TYPE] - statement = 'MATCH (n:{0} {{ {1}: {{{1}}} }})<--(m)'.format(node_type, self.schema.get_id_field(node)) - statement += ' WHERE NOT (n)<--(m)-->() RETURN m' - result = session.run(statement, node) - children = [] - for obj in result: - children.append(self.get_node_from_result(obj, 'm')) - return children - - @staticmethod - def get_node_from_result(record, name): - node = record.data()[name] - result = dict(node.items()) - for label in node.labels: - result[NODE_TYPE] = label - break - return result - - - # Simple delete given node, and it's relationships - def delete_single_node(self, session, node): - node_type = node[NODE_TYPE] - statement = 'MATCH (n:{0} {{ {1}: {{{1}}} }}) detach delete n'.format(node_type, self.schema.get_id_field(node)) - result = session.run(statement, node) - nodes_deleted = result.summary().counters.nodes_deleted - self.nodes_deleted += nodes_deleted - self.nodes_deleted_stat[node_type] = self.nodes_deleted_stat.get(node_type, 0) + nodes_deleted - relationship_deleted = result.summary().counters.relationships_deleted - self.relationships_deleted += relationship_deleted - return (nodes_deleted, relationship_deleted) - - # load file - def load_nodes(self, session, file_name, loading_mode): - if loading_mode == NEW_MODE: - action_word = 'Loading new' - elif loading_mode == UPSERT_MODE: - action_word = 'Loading' - elif loading_mode == DELETE_MODE: - action_word = 'Deleting' - else: - raise Exception('Wrong loading_mode: {}'.format(loading_mode)) - self.log.info('{} nodes from file: {}'.format(action_word, file_name)) - - with open(file_name) as in_file: - reader = csv.DictReader(in_file, delimiter='\t') - nodes_created = 0 - nodes_deleted = 0 - relationship_deleted = 0 - line_num = 1 - for org_obj in reader: - line_num += 1 - obj = self.prepare_node(org_obj) - node_type = obj[NODE_TYPE] - node_id = self.schema.get_id(obj) - if not node_id: - raise Exception('Line:{}: No ids found!'.format(line_num)) - id_field = self.schema.get_id_field(obj) - if loading_mode == UPSERT_MODE: - statement = self.get_upsert_statement(node_type, id_field, obj) - elif loading_mode == NEW_MODE: - if self.node_exists(session, node_type, id_field, node_id): - raise Exception('Line: {}: Node (:{} {{ {}: {} }}) exists! Abort loading!'.format(line_num, node_type, id_field, node_id)) - else: - statement = self.get_new_statement(node_type, obj) - elif loading_mode == DELETE_MODE: - n_deleted, r_deleted = self.delete_node(session, obj) - nodes_deleted += n_deleted - relationship_deleted += r_deleted - else: - raise Exception('Wrong loading_mode: {}'.format(loading_mode)) - - if loading_mode != DELETE_MODE: - result = session.run(statement, obj) - count = result.summary().counters.nodes_created - self.nodes_created += count - nodes_created += count - self.nodes_stat[node_type] = self.nodes_stat.get(node_type, 0) + count - if loading_mode == DELETE_MODE: - self.log.info('{} node(s) deleted'.format(nodes_deleted)) - self.log.info('{} relationship(s) deleted'.format(relationship_deleted)) - else: - self.log.info('{} (:{}) node(s) loaded'.format(nodes_created, node_type)) - - - def node_exists(self, session, label, prop, value): - statement = 'MATCH (m:{0} {{ {1}: {{{1}}} }}) return m'.format(label, prop) - result = session.run(statement, {prop: value}) - count = result.detach() - if count > 1: - self.log.warning('More than one nodes found! ') - return count >= 1 - - def collect_relationships(self, obj, session, create_visit, line_num): - node_type = obj[NODE_TYPE] - relationships = [] - visits_created = 0 - provided_parents = 0 - for key, value in obj.items(): - if is_parent_pointer(key): - provided_parents += 1 - other_node, other_id = key.split('.') - relationship = self.schema.get_relationship(node_type, other_node) - relationship_name = relationship[RELATIONSHIP_TYPE] - multiplier = relationship[MULTIPLIER] - if not relationship_name: - self.log.error('Line: {}: Relationship not found!'.format(line_num)) - raise Exception('Undefined relationship, abort loading!') - if not self.node_exists(session, other_node, other_id, value): - if other_node == 'visit' and create_visit: - if self.create_visit(session, line_num, other_node, value, obj): - visits_created += 1 - relationships.append({PARENT_TYPE: other_node, PARENT_ID_FIELD: other_id, PARENT_ID: value, - RELATIONSHIP_TYPE: relationship_name, MULTIPLIER: multiplier}) - else: - self.log.error( - 'Line: {}: Couldn\'t create {} node automatically!'.format(line_num, VISIT_NODE)) - else: - self.log.warning( - 'Line: {}: Parent node (:{} {{{}: "{}"}} not found in DB!'.format(line_num, other_node, other_id, - value)) - else: - if multiplier == ONE_TO_ONE and self.parent_already_has_child(session, node_type, obj, relationship_name, other_node, other_id, value): - self.log.error('Line: {}: one_to_one relationship failed, parent already has a child!'.format(line_num)) - else: - relationships.append({PARENT_TYPE: other_node, PARENT_ID_FIELD: other_id, PARENT_ID: value, - RELATIONSHIP_TYPE: relationship_name, MULTIPLIER: multiplier}) - return {RELATIONSHIPS: relationships, VISITS_CREATED: visits_created, PROVIDED_PARENTS: provided_parents} - - def parent_already_has_child(self, session, node_type, node, relationship_name, parent_type, parent_id_field, parent_id): - statement = 'MATCH (n:{})-[r:{}]->(m:{} {{ {}: {{parent_id}} }}) return n'.format(node_type, relationship_name, parent_type, parent_id_field) - result = session.run(statement, {"parent_id": parent_id}) - if result: - child = result.single() - if child: - find_current_node_statement = 'MATCH (n:{0} {{ {1}: {{{1}}} }}) return n'.format(node_type, self.schema.get_id_field(node)) - current_node_result = session.run(find_current_node_statement, node) - if current_node_result: - current_node = current_node_result.single() - return child[0].id != current_node[0].id - else: - self.log.error('Could NOT find current node!') - - return False - - # Check if a relationship of same type exists, if so, return a statement which can delete it, otherwise return False - def has_existing_relationship(self, session, node_type, node, relationship, count_same_parent=False): - relationship_name = relationship[RELATIONSHIP_TYPE] - parent_type = relationship[PARENT_TYPE] - parent_id_field = relationship[PARENT_ID_FIELD] - - base_statement = 'MATCH (n:{0} {{ {1}: {{{1}}} }})-[r:{2}]->(m:{3})'.format(node_type, - self.schema.get_id_field(node), - relationship_name, parent_type) - statement = base_statement + ' return m.{} AS {}'.format(parent_id_field, PARENT_ID) - result = session.run(statement, node) - if result: - old_parent = result.single() - if old_parent: - if count_same_parent: - del_statement = base_statement + ' delete r' - return del_statement - else: - old_parent_id = old_parent[PARENT_ID] - if old_parent_id != relationship[PARENT_ID]: - self.log.warning('Old parent is different from new parent, delete relationship to old parent:' - + ' (:{} {{ {}: "{}" }})!'.format(parent_type, parent_id_field, old_parent_id)) - del_statement = base_statement + ' delete r' - return del_statement - else: - self.log.error('Remove old relationship failed: Query old relationship failed!') - - return False - - def remove_old_relationship(self, session, node_type, node, relationship): - del_statement = self.has_existing_relationship(session, node_type, node, relationship) - if del_statement: - del_result = session.run(del_statement, node) - if not del_result: - self.log.error('Delete old relationship failed!') - - def load_relationships(self, session, file_name, loading_mode): - if loading_mode == NEW_MODE: - action_word = 'Loading new' - elif loading_mode == UPSERT_MODE: - action_word = 'Loading' - else: - raise Exception('Wrong loading_mode: {}'.format(loading_mode)) - self.log.info('{} relationships from file: {}'.format(action_word, file_name)) - - with open(file_name) as in_file: - reader = csv.DictReader(in_file, delimiter='\t') - relationships_created = {} - visits_created = 0 - line_num = 1 - for org_obj in reader: - line_num += 1 - obj = self.prepare_node(org_obj) - node_type = obj[NODE_TYPE] - results = self.collect_relationships(obj, session, True, line_num) - relationships = results[RELATIONSHIPS] - visits_created += results[VISITS_CREATED] - provided_parents = results[PROVIDED_PARENTS] - if provided_parents > 0: - if len(relationships) == 0: - raise Exception('Line: {}: No parents found, abort loading!'.format(line_num)) - - for relationship in relationships: - relationship_name = relationship[RELATIONSHIP_TYPE] - multiplier = relationship[MULTIPLIER] - parent_node = relationship[PARENT_TYPE] - parent_id_field = relationship[PARENT_ID_FIELD] - if multiplier in [DEFAULT_MULTIPLIER, ONE_TO_ONE]: - if loading_mode == UPSERT_MODE: - self.remove_old_relationship(session, node_type, obj, relationship) - elif loading_mode == NEW_MODE: - if self.has_existing_relationship(session, node_type, obj, relationship, True): - raise Exception('Line: {}: Relationship already exists, abort loading!'.format(line_num)) - else: - raise Exception('Wrong loading_mode: {}'.format(loading_mode)) - else: - self.log.info('Multiplier: {}, no action needed!'.format(multiplier)) - statement = 'MATCH (m:{0} {{ {1}: {{{1}}} }}) '.format(parent_node, parent_id_field) - statement += 'MATCH (n:{0} {{ {1}: {{{1}}} }}) '.format(node_type, self.schema.get_id_field(obj)) - statement += 'MERGE (n)-[r:{}]->(m)'.format(relationship_name) - statement += ' ON CREATE SET r.{} = datetime()'.format(CREATED) - statement += ' ON MATCH SET r.{} = datetime()'.format(UPDATED) - - result = session.run(statement, obj) - count = result.summary().counters.relationships_created - self.relationships_created += count - relationship_pattern = '(:{})->[:{}]->(:{})'.format(node_type, relationship_name, parent_node) - relationships_created[relationship_pattern] = relationships_created.get(relationship_pattern, 0) + count - self.relationships_stat[relationship_name] = self.relationships_stat.get(relationship_name, 0) + count - - for rel, count in relationships_created.items(): - self.log.info('{} {} relationship(s) loaded'.format(count, rel)) - if visits_created > 0: - self.log.info('{} (:{}) node(s) loaded'.format(visits_created, VISIT_NODE)) - - return True - - def create_visit(self, session, line_num, node_type, node_id, src): - if node_type != VISIT_NODE: - self.log.error("Line: {}: Can't create (:{}) node for type: '{}'".format(line_num, VISIT_NODE, node_type)) - return False - if not node_id: - self.log.error("Line: {}: Can't create (:{}) node for id: '{}'".format(line_num, VISIT_NODE, node_id)) - return False - if not src: - self.log.error("Line: {}: Can't create (:{}) node for empty object".format(line_num, VISIT_NODE)) - return False - if not session or (not isinstance(session, Session) and not isinstance(session, Transaction)): - self.log.error("Neo4j session is not valid!") - return False - date_map = PROPS['visit_date_in_nodes'] - if NODE_TYPE not in src: - self.log.error('Line: {}: Given object doesn\'t have a "{}" field!'.format(line_num, NODE_TYPE)) - return False - source_type = src[NODE_TYPE] - date = src[date_map[source_type]] - if not date: - self.log.error('Line: {}: Visit date is empty!'.format(line_num)) - return False - if NODE_TYPE not in src: - self.log.error('Line: {}: Given object doesn\'t have a "{}" field!'.format(line_num, NODE_TYPE)) - return False - statement = 'MERGE (v:{} {{ {}: {{node_id}}, {}: {{date}}, {}: true, {}: {{{}}} }})'.format( - VISIT_NODE, VISIT_ID, VISIT_DATE, INFERRED, UUID, UUID) - statement += ' ON CREATE SET v.{} = datetime()'.format(CREATED) - statement += ' ON MATCH SET v.{} = datetime()'.format(UPDATED) - - result = session.run(statement, {"node_id": node_id, "date": date, UUID: get_uuid_for_node(VISIT_NODE, node_id)}) - if result: - count = result.summary().counters.nodes_created - self.nodes_created += count - self.nodes_stat[VISIT_NODE] = self.nodes_stat.get(VISIT_NODE, 0) + count - if count > 0: - case_id = src[CASE_ID] - if not self.connect_visit_to_cycle(session, line_num, node_id, case_id, date): - self.log.error('Line: {}: Visit: "{}" does NOT belong to a cycle!'.format(line_num, node_id)) - return True - else: - return False - - def connect_visit_to_cycle(self, session, line_num, visit_id, case_id, visit_date): - find_cycles_stmt = 'MATCH (c:cycle) WHERE c.case_id = {case_id} RETURN c ORDER BY c.date_of_cycle_start' - result = session.run(find_cycles_stmt, {'case_id': case_id}) - if result: - first_date = None - pre_date = None - relationship_name = self.schema.get_relationship(VISIT_NODE, CYCLE_NODE)[RELATIONSHIP_TYPE] - if not relationship_name: - return False - for record in result.records(): - cycle = record.data()['c'] - date = datetime.strptime(visit_date, DATE_FORMAT) - start_date = datetime.strptime(cycle[START_DATE], DATE_FORMAT) - if not first_date: - first_date = start_date - pre_date = first_date - timedelta(days=PREDATE) - if cycle[END_DATE]: - end_date = datetime.strptime(cycle[END_DATE], DATE_FORMAT) - else: - self.log.warning('Line: {}: No end dates for cycle started on {} for {}'.format(line_num, start_date.strftime(DATE_FORMAT), case_id)) - end_date = datetime.strptime(FOREVER, DATE_FORMAT) - if (date >= start_date and date <= end_date) or (date < first_date and date >= pre_date): - if date < first_date and date >= pre_date: - self.log.info('Line: {}: Date: {} is before first cycle, but within {}'.format(line_num, visit_date, PREDATE) - + ' days before first cycle started: {}, connected to first cycle'.format(first_date.strftime(DATE_FORMAT))) - cycle_id = cycle.id - connect_stmt = 'MATCH (v:{} {{ {}: {{visit_id}} }}) '.format(VISIT_NODE, VISIT_ID) - connect_stmt += 'MATCH (c:{}) WHERE id(c) = {{cycle_id}} '.format(CYCLE_NODE) - connect_stmt += 'MERGE (v)-[r:{} {{ {}: true }}]->(c)'.format(relationship_name, INFERRED) - connect_stmt += ' ON CREATE SET r.{} = datetime()'.format(CREATED) - connect_stmt += ' ON MATCH SET r.{} = datetime()'.format(UPDATED) - - cnt_result = session.run(connect_stmt, {'visit_id': visit_id, 'cycle_id': cycle_id}) - relationship_created = cnt_result.summary().counters.relationships_created - if relationship_created > 0: - self.relationships_created += relationship_created - self.relationships_stat[relationship_name] = self.relationships_stat.get(relationship_name, 0) + relationship_created - return True - else: - self.log.error('Line: {}: Create (:visit)-[:of_cycle]->(:cycle) relationship failed!'.format(line_num)) - return False - self.log.warning('Line: {}: Date: {} does not belong to any cycles, connected to case {} directly!'.format( - line_num, visit_date, case_id)) - return self.connect_visit_to_case(session, line_num, visit_id, case_id) - else: - self.log.error('Line: {}: No cycles found for case: {}'.format(line_num, case_id)) - return False - - def connect_visit_to_case(self, session, line_num, visit_id, case_id): - relationship_name = self.schema.get_relationship(VISIT_NODE, CASE_NODE)[RELATIONSHIP_TYPE] - if not relationship_name: - return False - cnt_statement = 'MATCH (c:case {{ case_id: {{case_id}} }}) MATCH (v:visit {{ {}: {{visit_id}} }}) '.format(VISIT_ID) - cnt_statement += 'MERGE (c)<-[r:{} {{ {}: true }}]-(v)'.format(relationship_name, INFERRED) - cnt_statement += ' ON CREATE SET r.{} = datetime()'.format(CREATED) - cnt_statement += ' ON MATCH SET r.{} = datetime()'.format(UPDATED) - - result = session.run(cnt_statement, {'case_id': case_id, 'visit_id': visit_id}) - relationship_created = result.summary().counters.relationships_created - if relationship_created > 0: - self.relationships_created += relationship_created - self.relationships_stat[relationship_name] = self.relationships_stat.get(relationship_name, 0) + relationship_created - return True - else: - self.log.error('Line: {}: Create (:{})-[:{}]->(:{}) relationship failed!'.format(line_num, VISIT_NODE, relationship_name, CASE_NODE)) - return False - - def wipe_db(self, session): - cleanup_db = 'MATCH (n) DETACH DELETE n' - - result = session.run(cleanup_db) - self.log.info('{} nodes deleted!'.format(result.summary().counters.nodes_deleted)) - self.log.info('{} relationships deleted!'.format(result.summary().counters.relationships_deleted)) - - diff --git a/common/icdc_schema.py b/common/icdc_schema.py deleted file mode 100644 index 709226ef..00000000 --- a/common/icdc_schema.py +++ /dev/null @@ -1,454 +0,0 @@ -from datetime import datetime -import os -import re -import sys -import uuid - -import yaml - -from .utils import get_logger, MULTIPLIER, DEFAULT_MULTIPLIER, RELATIONSHIP_TYPE, is_parent_pointer, DATE_FORMAT -from .config import ICDC_DOMAIN, PROPS - -NODES = 'Nodes' -RELATIONSHIPS = 'Relationships' -PROPERTIES = 'Props' -PROP_DEFINITIONS = 'PropDefinitions' -DEFAULT_TYPE = 'String' -PROP_TYPE = 'Type' -END_POINTS = 'Ends' -SRC = 'Src' -DEST = 'Dst' -VALUE_TYPE = 'value_type' -LABEL_NEXT = 'next' -NEXT_RELATIONSHIP = 'next' -UNITS = 'units' -REQUIRED = 'Req' -NODE_TYPE = 'type' -ENUM = 'enum' -DEFAULT_VALUE = 'default_value' -HAS_UNIT = 'has_unit' - - -def get_uuid_for_node(node_type, signature): - """Generate V5 UUID for a node - Arguments: - node_type - a string represents type of a node, e.g. case, study, file etc. - signature - a string that can uniquely identify a node within it's type, e.g. case_id, clinical_study_designation etc. - or a long string with all properties and values concat together if no id available - - """ - log = get_logger('Utils') - icdc_base_uuid = uuid.uuid5(uuid.NAMESPACE_URL, ICDC_DOMAIN) - # log.debug('Base UUID: {}'.format(icdc_base_uuid)) - type_uuid = uuid.uuid5(icdc_base_uuid, node_type) - # log.debug('Type UUID: {}'.format(type_uuid)) - node_uuid = uuid.uuid5(type_uuid, signature) - log.debug('Node UUID: {}'.format(node_uuid)) - return str(node_uuid) - - -class ICDC_Schema: - def __init__(self, files): - if not files: - raise Exception('File list is empty, couldn\'t initialize ICDC_Schema object!') - else: - for data_file in files: - if not os.path.isfile(data_file): - raise Exception('File "{}" doesn\'t exist'.format(data_file)) - self.log = get_logger('ICDC Schema') - self.org_schema = {} - for aFile in files: - try: - self.log.info('Reading schema file: {} ...'.format(aFile)) - if os.path.isfile(aFile): - with open(aFile) as schema_file: - schema = yaml.safe_load(schema_file) - if schema: - self.org_schema.update(schema) - except Exception as e: - self.log.exception(e) - - self.nodes = {} - self.relationships = {} - self.num_relationship = 0 - - self.log.debug("-------------processing nodes-----------------") - if NODES not in self.org_schema: - self.log.error('Can\'t load any nodes!') - sys.exit(1) - - elif PROP_DEFINITIONS not in self.org_schema: - self.log.error('Can\'t load any properties!') - sys.exit(1) - - for key, value in self.org_schema[NODES].items(): - # Assume all keys start with '_' are not regular nodes - if not key.startswith('_'): - self.process_node(key, value) - self.log.debug("-------------processing edges-----------------") - if RELATIONSHIPS in self.org_schema: - for key, value in self.org_schema[RELATIONSHIPS].items(): - # Assume all keys start with '_' are not regular nodes - if not key.startswith('_'): - self.num_relationship += self.process_edges(key, value) - - - def process_node(self, name, desc): - # Gather properties - props = {} - required = set() - if desc[PROPERTIES]: - for prop in desc[PROPERTIES]: - prop_type = self.get_type(prop) - props[prop] = prop_type - value_unit_props = self.process_value_unit_type(prop, prop_type) - if value_unit_props: - props.update(value_unit_props) - if self.is_required_prop(prop): - required.add(prop) - - self.nodes[name] = { PROPERTIES: props, REQUIRED: required } - - def process_edges(self, name, desc): - count = 0 - if MULTIPLIER in desc: - multiplier = desc[MULTIPLIER] - else: - multiplier = DEFAULT_MULTIPLIER - - if END_POINTS in desc: - for end_points in desc[END_POINTS]: - src = end_points[SRC] - dest = end_points[DEST] - if MULTIPLIER in end_points: - actual_multiplier = end_points[MULTIPLIER] - self.log.debug('End point multiplier: "{}" overriding relationship multiplier: "{}"'.format(actual_multiplier, multiplier)) - else: - actual_multiplier = multiplier - if src not in self.relationships: - self.relationships[src] = {} - self.relationships[src][dest] = { RELATIONSHIP_TYPE: name, MULTIPLIER: actual_multiplier } - - count += 1 - if src in self.nodes: - self.add_relationship_to_node(src, actual_multiplier, name, dest) - # nodes[src][self.plural(dest)] = '[{}] @relation(name:"{}")'.format(dest, name) - else: - self.log.error('Source node "{}" not found!'.format(src)) - if dest in self.nodes: - self.add_relationship_to_node(dest, actual_multiplier, name, src, True) - # nodes[dest][self.plural(src)] = '[{}] @relation(name:"{}", direction:IN)'.format(src, name) - else: - self.log.error('Destination node "{}" not found!'.format(dest)) - return count - - # Process singular/plural array/single value based on relationship multipliers like many-to-many, many-to-one etc. - # Return a relationship property to add into a node - def add_relationship_to_node(self, name, multiplier, relationship, otherNode, dest=False): - node = self.nodes[name] - if multiplier == 'many_to_one': - if dest: - node[PROPERTIES][self.plural(otherNode)] = { PROP_TYPE: '[{}] @relation(name:"{}", direction:IN)'.format(otherNode, relationship) } - else: - node[PROPERTIES][otherNode] = {PROP_TYPE: '{} @relation(name:"{}", direction:OUT)'.format(otherNode, relationship) } - elif multiplier == 'one_to_one': - if relationship == NEXT_RELATIONSHIP: - if dest: - node[PROPERTIES]['prior_' + otherNode] = {PROP_TYPE: '{} @relation(name:"{}", direction:IN)'.format(otherNode, relationship) } - else: - node[PROPERTIES]['next_' + otherNode] = {PROP_TYPE: '{} @relation(name:"{}", direction:OUT)'.format(otherNode, relationship) } - else: - if dest: - node[PROPERTIES][otherNode] = {PROP_TYPE: '{} @relation(name:"{}", direction:IN)'.format(otherNode, relationship) } - else: - node[PROPERTIES][otherNode] = {PROP_TYPE: '{} @relation(name:"{}", direction:OUT)'.format(otherNode, relationship) } - elif multiplier == 'many_to_many': - if dest: - node[PROPERTIES][self.plural(otherNode)] = {PROP_TYPE: '[{}] @relation(name:"{}", direction:IN)'.format(otherNode, relationship) } - else: - node[PROPERTIES][self.plural(otherNode)] = {PROP_TYPE: '[{}] @relation(name:"{}", direction:OUT)'.format(otherNode, relationship) } - else: - self.log.warning('Unsupported relationship multiplier: "{}"'.format(multiplier)) - - def is_required_prop(self, name): - result = False - if name in self.org_schema[PROP_DEFINITIONS]: - prop = self.org_schema[PROP_DEFINITIONS][name] - result = prop.get(REQUIRED, False) - return result - - def get_prop_type(self, node_type, prop): - if node_type in self.nodes: - node = self.nodes[node_type] - if prop in node[PROPERTIES]: - return node[PROPERTIES][prop][PROP_TYPE] - return DEFAULT_TYPE - - def get_type(self, name): - result = { PROP_TYPE: DEFAULT_TYPE } - if name in self.org_schema[PROP_DEFINITIONS]: - prop = self.org_schema[PROP_DEFINITIONS][name] - if PROP_TYPE in prop: - prop_desc = prop[PROP_TYPE] - if isinstance(prop_desc, str): - result[PROP_TYPE] = self.map_type(prop_desc) - elif isinstance(prop_desc, dict): - if VALUE_TYPE in prop_desc: - result[PROP_TYPE] = self.map_type(prop_desc[VALUE_TYPE]) - if UNITS in prop_desc: - result[HAS_UNIT] = True - elif isinstance(prop_desc, list): - enum = set() - for t in prop_desc: - if not re.search(r'://', t): - enum.add(t) - if len(enum) > 0: - result[ENUM] = enum - else: - self.log.debug('Property type: "{}" not supported, use default type: "{}"'.format(prop_desc, DEFAULT_TYPE)) - - return result - - def get_prop(self, node_name, name): - if node_name in self.nodes: - node = self.nodes[node_name] - if name in node[PROPERTIES]: - return node[PROPERTIES][name] - return None - - def get_default_value(self, node_name, name): - prop = self.get_prop(node_name, name) - if prop: - return prop.get(DEFAULT_VALUE, None) - - def get_default_unit(self, node_name, name): - unit_prop_name = self.get_unit_property_name(name) - return self.get_default_value(node_name, unit_prop_name) - - - def get_valid_values(self, node_name, name): - prop = self.get_prop(node_name, name) - if prop: - return prop.get(ENUM, None) - - def get_valid_units(self, node_name, name): - unit_prop_name = self.get_unit_property_name(name) - return self.get_valid_values(node_name, unit_prop_name) - - def get_extra_props(self, node_name, name, value): - results = {} - prop = self.get_prop(node_name, name) - if prop and HAS_UNIT in prop and prop[HAS_UNIT]: - # For MVP use default unit for all values - results[self.get_unit_property_name(name)] = self.get_default_unit(node_name, name) - org_prop_name = self.get_original_value_property_name(name) - # For MVP use value is same as original value - results[org_prop_name] = value - results[self.get_unit_property_name(org_prop_name)] = self.get_default_unit(node_name, name) - return results - - def process_value_unit_type(self, name, prop_type): - results = {} - if name in self.org_schema[PROP_DEFINITIONS]: - prop = self.org_schema[PROP_DEFINITIONS][name] - if PROP_TYPE in prop: - prop_desc = prop[PROP_TYPE] - if isinstance(prop_desc, dict): - if UNITS in prop_desc: - units = prop_desc[UNITS] - if units: - enum = set(units) - unit_prop_name = self.get_unit_property_name(name) - results[unit_prop_name] = {PROP_TYPE: DEFAULT_TYPE, ENUM: enum, DEFAULT_VALUE: units[0]} - org_prop_name = self.get_original_value_property_name(name) - org_unit_prop_name = self.get_unit_property_name(org_prop_name) - results[org_prop_name] = prop_type - results[org_unit_prop_name] = {PROP_TYPE: DEFAULT_TYPE, ENUM: enum, DEFAULT_VALUE: units[0]} - return results - - @staticmethod - def get_unit_property_name(name): - return name + '_unit' - - @staticmethod - def get_original_value_property_name(name): - return name + '_original' - - def validate_node(self, model_type, obj): - if not model_type or model_type not in self.nodes: - return {'result': False, 'messages': ['Node type: "{}" doesn\'t exist!'.format(model_type)]} - if not obj: - return {'result': False, 'messages': ['Node is empty!']} - - if not isinstance(obj, dict): - return {'result': False, 'messages': ['Node is not a dict!']} - - # Make sure all required properties exist, and are not empty - result = {'result': True, 'messages': []} - for prop in self.nodes[model_type].get(REQUIRED, set()): - if prop not in obj: - result['result'] = False - result['messages'].append('Missing required property: "{}"!'.format(prop)) - elif not obj[prop]: - result['result'] = False - result['messages'].append('Required property: "{}" is empty!'.format(prop)) - - properties = self.nodes[model_type][PROPERTIES] - # Validate all properties in given object - for key, value in obj.items(): - if key == NODE_TYPE: - continue - elif is_parent_pointer(key): - continue - elif key not in properties: - self.log.debug('Property "{}" is not in data model!'.format(key)) - else: - model_type = properties[key] - if not self.valid_type(model_type, value): - result['result'] = False - result['messages'].append('Property: "{}":"{}" is not a valid "{}" type!'.format(key, value, model_type)) - - return result - - @staticmethod - def valid_type(model_type, value): - if model_type[PROP_TYPE] == 'Float': - try: - if value: - float(value) - except ValueError: - return False - elif model_type[PROP_TYPE] == 'Int': - try: - if value: - int(value) - except ValueError: - return False - elif model_type[PROP_TYPE] == 'Boolean': - if (value and not re.match(r'\byes\b|\btrue\b', value, re.IGNORECASE) - and not re.match(r'\bno\b|\bfalse\b', value, re.IGNORECASE) - and not re.match(r'\bltf\b', value, re.IGNORECASE)): - return False - elif model_type[PROP_TYPE] == 'Array': - if not isinstance(value, list): - return False - elif model_type[PROP_TYPE] == 'Object': - if not isinstance(value, dict): - return False - elif model_type[PROP_TYPE] == 'String': - if ENUM in model_type: - if not isinstance(value, str): - return False - if value not in model_type[ENUM]: - return False - elif model_type[PROP_TYPE] == 'Date': - if not isinstance(value, str): - return False - try: - if value.strip() != '': - datetime.strptime(value, DATE_FORMAT) - except ValueError: - return False - elif model_type[PROP_TYPE] == 'DateTime': - if not isinstance(value, str): - return False - try: - if value.strip() != '': - datetime.strptime(value, DATE_FORMAT) - except ValueError: - return False - return True - - # Find relationship type from src to dest - def get_relationship(self, src, dest): - if src in self.relationships: - relationships = self.relationships[src] - if relationships and dest in relationships: - return relationships[dest] - else: - self.log.error('No relationships found for "{}"-->"{}"'.format(src, dest)) - return None - else: - self.log.debug('No relationships start from "{}"'.format(src)) - return None - - # Find destination node name from (:src)-[:name]->(:dest) - def get_dest_node_for_relationship(self, src, name): - if src in self.relationships: - relationships = self.relationships[src] - if relationships: - for dest, rel in relationships.items(): - if rel[RELATIONSHIP_TYPE] == name: - return dest - else: - self.log.error('Couldn\'t find any relationship from (:{})'.format(src)) - return None - - - # Get type info from description - def map_type(self, type_name): - mapping = PROPS['type_mapping'] - result = DEFAULT_TYPE - - if type_name in mapping: - result = mapping[type_name] - else: - self.log.debug('Type: "{}" has no mapping, use default type: "{}"'.format(type_name, DEFAULT_TYPE)) - - return result - - def plural(self, word): - plurals = PROPS['plurals'] - if word in plurals: - return plurals[word] - else: - self.log.warning('Plural for "{}" not found!'.format(word)) - return 'NONE' - - # Get all node names, sorted - def get_node_names(self): - return sorted(self.nodes.keys()) - - def node_count(self): - return len(self.nodes) - - def relationship_count(self): - return self.num_relationship - - # Get all properties of a node (name) - def get_props_for_node(self, node_name): - if node_name in self.nodes: - return self.nodes[node_name][PROPERTIES] - else: - return None - - # Get node's id field, such as case_id for case node, or clinical_study_designation for study node - def get_id_field(self, obj): - if NODE_TYPE not in obj: - self.log.error('get_id_field: there is no "{}" field in node, can\'t retrieve id!'.format(NODE_TYPE)) - return None - node_type = obj[NODE_TYPE] - id_fields = PROPS['id_fields'] - if node_type: - return id_fields.get(node_type, 'uuid') - else: - self.log.error('get_id_field: "{}" field is empty'.format(NODE_TYPE)) - return None - - # Find node's id - def get_id(self, obj): - id_field = self.get_id_field(obj) - if not id_field: - return None - if id_field not in obj: - return None - else: - return obj[id_field] - - -if __name__ == '__main__': - files = ['/Users/yingm3/work/icdc/code/model-tool/model-desc/icdc-model.yml', '/Users/yingm3/work/icdc/code/model-tool/model-desc/icdc-model-props.yml'] - - schema = ICDC_Schema(files) - for key in schema.org_schema: - print(key) \ No newline at end of file diff --git a/common/s3.py b/common/s3.py deleted file mode 100755 index 320daee8..00000000 --- a/common/s3.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python -import boto3 -import botocore -from .utils import get_logger -import os - -class S3Bucket: - def __init__(self, bucket): - self.bucket_name = bucket - self.client = boto3.client('s3') - self.s3 = boto3.resource('s3') - self.bucket = self.s3.Bucket(bucket) - self.log = get_logger('S3 Bucket') - - def upload_file_obj(self, key, data): - return self.bucket.put_object(Key=key, Body=data) - - def download_file(self, key, filename): - return self.bucket.download_file(key, filename) - - def download_file_obj(self, key, obj): - self.bucket.download_fileobj(key, obj) - - def delete_file(self, key): - response = self.bucket.delete_objects( - Delete={ - 'Objects': [ - { - 'Key': key - } - ] - } - ) - if 'Errors' in response: - self.log.error('S3: delete file {} failed!'.format(key)) - return False - else: - return True - - def upload_file(self, key, fileName): - with open(fileName, 'rb') as data: - obj = self.upload_file_obj(key, data) - if obj: - return {'bucket': self.bucket.name, 'key': key} - else: - message = "Upload file {} to S3 failed!".format(fileName) - self.log.error(message) - return None - - def download_files_in_folder(self, folder, local_path): - try: - self.client.head_bucket(Bucket=self.bucket_name) - result = self.client.list_objects_v2(Bucket=self.bucket_name, Prefix=folder) - for file in result.get('Contents', []): - if file['Size'] > 0: - key = file['Key'] - base_name = os.path.basename(key) - file_name = os.path.join(local_path, base_name) - self.log.info('Downloading "{}" from AWS S3'.format(base_name)) - self.download_file(key, file_name) - return True - except botocore.exceptions.ClientError as e: - # If a client error is thrown, then check that it was a 404 error. - # If it was a 404 error, then the bucket does not exist. - error_code = int(e.response['Error']['Code']) - if error_code == 403: - self.log.error('Don\'t have permission to access for Bucket: "{}"'.format(self.bucket_name)) - elif error_code == 404: - self.log.error('Bucket: "{}" does NOT exist!'.format(self.bucket_name)) - return False diff --git a/common/sqs.py b/common/sqs.py deleted file mode 100755 index 154cc9a8..00000000 --- a/common/sqs.py +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env python -import boto3 -import json -from threading import Timer - -from .utils import get_logger -from .config import QUEUE_LONG_PULL_TIME - -class Queue: - def __init__(self, queue_name): - self.log = get_logger('SQS') - self.sqs = boto3.resource('sqs') - self.queue = self.sqs.get_queue_by_name(QueueName=queue_name) - - def sendMsgToQueue(self, msg, msg_id): - response = self.queue.send_message(MessageBody=json.dumps(msg), - MessageGroupId=msg_id, - MessageDeduplicationId=msg_id) - self.log.debug(response.get('MessageId')) - - def receiveMsgs(self, visibilityTimeOut): - return self.queue.receive_messages(VisibilityTimeout = visibilityTimeOut, - WaitTimeSeconds = QUEUE_LONG_PULL_TIME, - MaxNumberOfMessages = 1) - - def getApproximateNumberOfMessages(self): - return self.queue.attributes.get('ApproximateNumberOfMessages', -1) - -# Automatically extend visibility timeout every timeOutValue // 2 seconds -class VisibilityExtender: - def __init__(self, msg, timeOutValue): - self._timeOutValue = timeOutValue if timeOutValue > 2 else 2 - self._currentTimeOut = self._timeOutValue - self._interval = int(timeOutValue // 2) if timeOutValue > 2 else 1 - self._msg = msg - self._timer = None - self.is_running = False - self.log = get_logger('Visibility Extender') - self.start() - - def _run(self): - try: - if self._msg: - self.is_running = False - self.start() - self._currentTimeOut += self._interval - self.log.info('Processing job ..., visibility timeout = {}s'.format(self._currentTimeOut)) - self._msg.change_visibility(VisibilityTimeout = self._currentTimeOut) - except Exception as e: - self.log.exception(e) - - - def start(self): - if not self.is_running: - self._timer = Timer(self._interval, self._run) - self._timer.start() - self.is_running = True - - def stop(self): - self._timer.cancel() - self.is_running = False - - def __del__(self): - self.stop() diff --git a/common/utils.py b/common/utils.py deleted file mode 100644 index f00b3b5e..00000000 --- a/common/utils.py +++ /dev/null @@ -1,73 +0,0 @@ -import logging -import os -import re -from urllib.parse import urlparse - -from requests import post - - -def get_logger(name): - formatter = logging.Formatter('%(asctime)s %(levelname)s: (%(name)s) - %(message)s') - # formatter = logging.Formatter('[%(levelname)s] %(module)s - %(message)s') - log_level = os.environ.get('DL_LOG_LEVEL', 'INFO') - log = logging.getLogger(name) - log.setLevel(log_level) - std_handler = logging.StreamHandler() - std_handler.setFormatter(formatter) - log.addHandler(std_handler) - return log - - -def removeTrailingSlash(uri): - if uri.endswith('/'): - return re.sub('/+$', '', uri) - else: - return uri - -def is_parent_pointer(field_name): - return re.fullmatch(r'\w+\.\w+', field_name) is not None - - -def get_host(uri): - parts = urlparse(uri) - return parts.hostname - -def check_schema_files(schemas, log): - if not schemas: - log.error('Please specify schema file(s) with -s or --schema argument') - return False - - for schema_file in schemas: - if not os.path.isfile(schema_file): - log.error('{} is not a file'.format(schema_file)) - return False - return True - - -def send_slack_message(url, messaage, log): - if url: - headers = {"Content-type": "application/json"} - result = post(url, json=messaage, headers=headers) - if not result or result.status_code != 200: - log.error('Sending Slack messages failed!') - if result: - log.error(result.content) - else: - log.error('Slack URL not set in configuration file!') - - -NODES_CREATED = 'nodes_created' -RELATIONSHIP_CREATED = 'relationship_created' -NODES_DELETED = 'nodes_deleted' -RELATIONSHIP_DELETED = 'relationship_deleted' -BLOCK_SIZE = 65536 -DATE_FORMAT = '%Y-%m-%d' -DATETIME_FORMAT = '%Y%m%d-%H%M%S' -RELATIONSHIP_TYPE = 'relationship_type' -MULTIPLIER = 'Mul' -DEFAULT_MULTIPLIER = 'many_to_one' -ONE_TO_ONE = 'one_to_one' -UUID = 'uuid' -NEW_MODE = 'new' -UPSERT_MODE = 'upsert' -DELETE_MODE = 'delete' diff --git a/file_loader.py b/file_loader.py index 19f8304d..3d525962 100755 --- a/file_loader.py +++ b/file_loader.py @@ -20,13 +20,13 @@ import boto3 from botocore.exceptions import ClientError -from common.utils import UUID, NODES_CREATED, RELATIONSHIP_CREATED, removeTrailingSlash,\ +from bento.common.utils import UUID, NODES_CREATED, RELATIONSHIP_CREATED, removeTrailingSlash,\ get_logger, UPSERT_MODE, send_slack_message -from common.config import INDEXD_GUID_PREFIX, INDEXD_MANIFEST_EXT, VISIBILITY_TIMEOUT, \ +from bento.common.config import INDEXD_GUID_PREFIX, INDEXD_MANIFEST_EXT, VISIBILITY_TIMEOUT, \ TEMP_FOLDER, PSWD_ENV, SLACK_URL -from common.sqs import Queue, VisibilityExtender -from common.data_loader import DataLoader -from common.icdc_schema import ICDC_Schema, get_uuid_for_node +from bento.common.sqs import Queue, VisibilityExtender +from bento.common.data_loader import DataLoader +from bento.common.icdc_schema import ICDC_Schema, get_uuid_for_node RAW_PREFIX = 'RAW' FINAL_PREFIX = 'Final' diff --git a/loader.py b/loader.py index a9a9d4b0..21d4fc55 100755 --- a/loader.py +++ b/loader.py @@ -7,12 +7,12 @@ from neo4j import GraphDatabase, ServiceUnavailable -from common.icdc_schema import ICDC_Schema -from common.utils import get_logger, removeTrailingSlash, check_schema_files, DATETIME_FORMAT, get_host, \ +from bento.common.icdc_schema import ICDC_Schema +from bento.common.utils import get_logger, removeTrailingSlash, check_schema_files, DATETIME_FORMAT, get_host, \ UPSERT_MODE, NEW_MODE, DELETE_MODE -from common.config import BACKUP_FOLDER, PSWD_ENV -from common.data_loader import DataLoader -from common.s3 import S3Bucket +from bento.common.config import BACKUP_FOLDER, PSWD_ENV +from bento.common.data_loader import DataLoader +from bento.common.s3 import S3Bucket def parse_arguments(): parser = argparse.ArgumentParser(description='Load TSV(TXT) files (from Pentaho) into Neo4j') diff --git a/model-converter.py b/model-converter.py index 98b0e1c4..f2a2d7fc 100755 --- a/model-converter.py +++ b/model-converter.py @@ -4,8 +4,8 @@ import argparse import sys -from common.icdc_schema import ICDC_Schema, PROP_TYPE -from common.utils import check_schema_files, get_logger +from bento.common.icdc_schema import ICDC_Schema, PROP_TYPE +from bento.common.utils import check_schema_files, get_logger if __name__ == '__main__': diff --git a/tests/test_file_loader.py b/tests/test_file_loader.py index 62f6030e..b93fd33e 100644 --- a/tests/test_file_loader.py +++ b/tests/test_file_loader.py @@ -3,8 +3,8 @@ import os from neo4j import GraphDatabase from file_loader import FileLoader -from common.icdc_schema import ICDC_Schema -from common.data_loader import DataLoader +from bento.common.icdc_schema import ICDC_Schema +from bento.common.data_loader import DataLoader class TestLambda(unittest.TestCase): diff --git a/tests/test_loader.py b/tests/test_loader.py index 71279d1a..453fc7d0 100644 --- a/tests/test_loader.py +++ b/tests/test_loader.py @@ -1,8 +1,8 @@ import unittest import os -from common.utils import get_logger, removeTrailingSlash, UUID -from common.data_loader import DataLoader -from common.icdc_schema import ICDC_Schema +from bento.common.utils import get_logger, removeTrailingSlash, UUID +from bento.common.data_loader import DataLoader +from bento.common.icdc_schema import ICDC_Schema from neo4j import GraphDatabase diff --git a/tests/test_reloading_data.py b/tests/test_reloading_data.py index 5feced2b..80ab36e7 100644 --- a/tests/test_reloading_data.py +++ b/tests/test_reloading_data.py @@ -1,7 +1,7 @@ import unittest -from common.utils import get_logger, NODES_CREATED, RELATIONSHIP_CREATED, NODES_DELETED, RELATIONSHIP_DELETED -from common.data_loader import DataLoader -from common.icdc_schema import ICDC_Schema +from bento.common.utils import get_logger, NODES_CREATED, RELATIONSHIP_CREATED, NODES_DELETED, RELATIONSHIP_DELETED +from bento.common.data_loader import DataLoader +from bento.common.icdc_schema import ICDC_Schema import os from neo4j import GraphDatabase diff --git a/tests/test_schema.py b/tests/test_schema.py index 68071649..0510c840 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -1,5 +1,5 @@ import unittest -from common.icdc_schema import ICDC_Schema +from bento.common.icdc_schema import ICDC_Schema class TestSchema(unittest.TestCase): diff --git a/tests/test_utils.py b/tests/test_utils.py index 99a44159..5a6f6689 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,5 +1,5 @@ import unittest -from common.icdc_schema import get_uuid_for_node +from bento.common.icdc_schema import get_uuid_for_node class TestUtils(unittest.TestCase): def test_uuid(self):