diff --git a/README.md b/README.md index 003c86fa..0cb72324 100644 --- a/README.md +++ b/README.md @@ -6,17 +6,18 @@ This is the documentation index for the NCI ICDC/CTDC Data Loader ## Module List The NCI ICDC/CTDC Data Loader includes multiple data loading modules: -* **Data Loader** - * The Data Loader module is a versatile Python application used to load data into a Neo4j database. - * [Data Loader Documentation](docs/data-loader.md) -* **File Copier** - * The File Copier module copies files from a source URL to a designated AWS S3 Bucket. - * [File Copier Documentation](docs/file-copier.md) +- **Data Loader** + - The Data Loader module is a versatile Python application used to load data into a Neo4j database. + - [Data Loader Documentation](docs/data-loader.md) + +- **File Copier** + - The File Copier module copies files from a source URL to a designated AWS S3 Bucket. + - [File Copier Documentation](docs/file-copier.md) -* **File Loader** - * The File Loader module processes incoming S3 files and then calls the Data Loader module to load the processed file data into a Neo4j database. - * [File Loader Documentation](docs/file-loader.md) +- **File Loader** + - The File Loader module processes incoming S3 files and then calls the Data Loader module to load the processed file data into a Neo4j database. + - [File Loader Documentation](docs/file-loader.md) -* **Model Converter** - * The Model Converter uses a combination of YAML format schema files, a YAML formatted properties files, and a GraphQL formatted queries file to generate a GraphQL formatted schema. - * [Model Converter Documentation](docs/model-converter.md) +- **Model Converter** + - The Model Converter uses a combination of YAML format schema files, a YAML formatted properties files, and a GraphQL formatted queries file to generate a GraphQL formatted schema. 
+ - [Model Converter Documentation](docs/model-converter.md) diff --git a/config.py b/config.py index bfe7b4fb..95800755 100644 --- a/config.py +++ b/config.py @@ -1,4 +1,3 @@ -from configparser import ConfigParser import os import yaml diff --git a/copier.py b/copier.py index f070e26c..d4163548 100644 --- a/copier.py +++ b/copier.py @@ -9,6 +9,37 @@ from bento.common.s3 import S3Bucket +def _is_valid_url(org_url): + return re.search(r'^[^:/]+://', org_url) + + +def _is_local(org_url): + return org_url.startswith('file://') + + +def _get_local_path(org_url): + if _is_local(org_url): + return org_url.replace('file://', '') + else: + raise ValueError(f'{org_url} is not a local file!') + + +def _get_org_md5(org_url, local_file): + """ + Get original MD5, if adapter can't get it, calculate it from original file, download if necessary + :param org_url: + :return: + """ + if _is_local(org_url): + file_path = _get_local_path(org_url) + return get_md5(file_path) + else: + # Download to local and calculate MD5 + stream_download(org_url, local_file) + if not os.path.isfile(local_file): + raise Exception(f'Download file {org_url} to local failed!') + return get_md5(local_file) + class Copier: adapter_attrs = ['load_file_info', 'clear_file_info', 'get_org_url', 'get_file_name', 'get_org_md5', @@ -47,7 +78,7 @@ def __init__(self, bucket_name, prefix, adapter): # Verify adapter has all functions needed for attr in self.adapter_attrs: if not hasattr(adapter, attr): - raise TypeError(f'Adapter doesn\'t have "{attr}" attribute/method') + raise TypeError(f'Adapter does not have "{attr}" attribute/method') self.adapter = adapter self.log = get_logger('Copier') @@ -71,7 +102,7 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False): :param file_info: dict that has file information :param overwrite: overwrite file in S3 bucket even existing file has same size :param dryrun: only do preliminary check, don't copy file - :param verify_md5: verify file size and MD5 in file_info against orginal file + :param verify_md5: verify file size and MD5 in file_info against original file :return: dict """ local_file = None @@ -79,7 +110,7 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False): self.adapter.clear_file_info() self.adapter.load_file_info(file_info) org_url = self.adapter.get_org_url() - if not self._is_valid_url(org_url): + if not _is_valid_url(org_url): self.log.error(f'"{org_url}" is not a valid URL!') return {self.STATUS: False} if not self._file_exists(org_url): @@ -99,11 +130,11 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False): if not org_md5: self.log.info(f'Original MD5 not available, calculate MD5 locally...') local_file = f'tmp/{file_name}' - org_md5 = self._get_org_md5(org_url, local_file) + org_md5 = _get_org_md5(org_url, local_file) elif verify_md5: self.log.info(f'Downloading file and verifying MD5 locally...') local_file = f'tmp/{file_name}' - local_md5 = self._get_org_md5(org_url, local_file) + local_md5 = _get_org_md5(org_url, local_file) if local_md5 != org_md5: self.log.error(f'MD5 verify failed! 
Original MD5: {org_md5}, local MD5: {local_md5}') return {self.STATUS: False} @@ -112,15 +143,13 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False): self.log.info(f'Original MD5 {org_md5}') succeed = {self.STATUS: True, - self.MD5: org_md5, - self.NAME: file_name, - self.KEY: key, - self.FIELDS: self.adapter.get_fields(), - self.ACL: self.adapter.get_acl(), - self.SIZE: org_size - } - - + self.MD5: org_md5, + self.NAME: file_name, + self.KEY: key, + self.FIELDS: self.adapter.get_fields(), + self.ACL: self.adapter.get_acl(), + self.SIZE: org_size + } if dryrun: self.log.info(f'Copying file {key} skipped (dry run)') @@ -132,8 +161,8 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False): self.log.info(f'Copying from {org_url} to s3://{self.bucket_name}/{key} ...') # Original file is local - if self._is_local(org_url): - file_path = self._get_local_path(org_url) + if _is_local(org_url): + file_path = _get_local_path(org_url) with open(file_path, 'rb') as stream: dest_size = self._upload_obj(stream, key, org_size) # Original file has been downloaded to local @@ -160,35 +189,19 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False): os.remove(local_file) def _upload_obj(self, stream, key, org_size): - parts = org_size // self.MULTI_PART_CHUNK_SIZE - chunk_size = self.MULTI_PART_CHUNK_SIZE if parts < self.PARTS_LIMIT else org_size // self.PARTS_LIMIT + parts = org_size // self.MULTI_PART_CHUNK_SIZE + chunk_size = self.MULTI_PART_CHUNK_SIZE if parts < self.PARTS_LIMIT else org_size // self.PARTS_LIMIT - t_config = TransferConfig(multipart_threshold=self.MULTI_PART_THRESHOLD, - multipart_chunksize=chunk_size) - self.bucket._upload_file_obj(key, stream, t_config) - self.files_copied += 1 - self.log.info(f'Copying file {key} SUCCEEDED!') - return self.bucket.get_object_size(key) - - def _get_org_md5(self, org_url, local_file): - """ - Get original MD5, if adapter can't get it, calculate it from original file, download if necessary - :param org_url: - :return: - """ - if self._is_local(org_url): - file_path = self._get_local_path(org_url) - return get_md5(file_path) - else: - # Download to local and calculate MD5 - stream_download(org_url, local_file) - if not os.path.isfile(local_file): - raise Exception(f'Download file {org_url} to local failed!') - return get_md5(local_file) + t_config = TransferConfig(multipart_threshold=self.MULTI_PART_THRESHOLD, + multipart_chunksize=chunk_size) + self.bucket.upload_file_obj(key, stream, t_config) + self.files_copied += 1 + self.log.info(f'Copying file {key} SUCCEEDED!') + return self.bucket.get_object_size(key) def _file_exists(self, org_url): - if self._is_local(org_url): - file_path = self._get_local_path(org_url) + if _is_local(org_url): + file_path = _get_local_path(org_url) if not os.path.isfile(file_path): self.log.error(f'"{file_path}" is not a file!') return False @@ -204,16 +217,3 @@ def _file_exists(self, org_url): else: self.log.error(f'Head file error - {r.status_code}: {org_url}') return False - - def _is_local(self, org_url): - return org_url.startswith('file://') - - def _is_valid_url(self, org_url): - return re.search(r'^[^:/]+://', org_url) - - def _get_local_path(self, org_url): - if self._is_local(org_url): - return org_url.replace('file://', '') - else: - raise ValueError(f'{org_url} is not a local file!') - diff --git a/data_loader.py b/data_loader.py index 19906b87..233dee9a 100644 --- a/data_loader.py +++ b/data_loader.py @@ -14,7 +14,7 @@ from neo4j import Driver -from icdc_schema 
import ICDC_Schema +from icdc_schema import ICDC_Schema, is_parent_pointer, get_list_values from bento.common.utils import get_logger, NODES_CREATED, RELATIONSHIP_CREATED, UUID, \ RELATIONSHIP_TYPE, MULTIPLIER, ONE_TO_ONE, DEFAULT_MULTIPLIER, UPSERT_MODE, \ NEW_MODE, DELETE_MODE, NODES_DELETED, RELATIONSHIP_DELETED, combined_dict_counters, \ @@ -68,7 +68,8 @@ def format_as_tuple(node_name, properties): def backup_neo4j(backup_dir, name, address, log): try: - restore_cmd = 'To restore DB from backup (to remove any changes caused by current data loading, run following commands:\n' + restore_cmd = 'To restore DB from backup (to remove any changes caused by current data loading, run ' \ + 'following commands:\n ' restore_cmd += '#' * 160 + '\n' neo4j_cmd = 'neo4j-admin restore --from={}/{} --force'.format(backup_dir, name) mkdir_cmd = [ @@ -114,8 +115,22 @@ def backup_neo4j(backup_dir, name, address, log): return False +def check_encoding(file_name): + utf8 = 'utf-8' + windows1252 = 'windows-1252' + try: + with open(file_name, encoding=utf8) as file: + for _ in file.readlines(): + pass + return utf8 + except UnicodeDecodeError: + return windows1252 + + class DataLoader: - def __init__(self, driver, schema, plugins=[]): + def __init__(self, driver, schema, plugins=None): + if plugins is None: + plugins = [] if not schema or not isinstance(schema, ICDC_Schema): raise Exception('Invalid ICDC_Schema object') self.log = get_logger('Data Loader') @@ -138,6 +153,15 @@ def __init__(self, driver, schema, plugins=[]): if not hasattr(plugin, 'relationships_created'): raise ValueError('Invalid Plugin!') self.plugins = plugins + self.nodes_created = 0 + self.relationships_created = 0 + self.indexes_created = 0 + self.nodes_deleted = 0 + self.relationships_deleted = 0 + self.nodes_stat = {} + self.relationships_stat = {} + self.nodes_deleted_stat = {} + self.relationships_deleted_stat = {} def check_files(self, file_list): if not file_list: @@ -146,7 +170,7 @@ def check_files(self, file_list): elif file_list: for data_file in file_list: if not os.path.isfile(data_file): - self.log.error('File "{}" doesn\'t exist'.format(data_file)) + self.log.error('File "{}" does not exist'.format(data_file)) return False return True @@ -257,7 +281,7 @@ def _load_all(self, tx, file_list, loading_mode, split, wipe_db): for txt in file_list: self.load_relationships(tx, txt, loading_mode, split) - # Remove extra spaces at begining and end of the keys and values + # Remove extra spaces at beginning and end of the keys and values @staticmethod def cleanup_node(node): obj = {} @@ -278,7 +302,7 @@ def prepare_node(self, node): for key, value in obj.items(): search_node_type = node_type search_key = key - if self.schema.is_parent_pointer(key): + if is_parent_pointer(key): search_node_type, search_key = key.split('.') elif self.schema.is_relationship_property(key): search_node_type, search_key = key.split(self.rel_prop_delimiter) @@ -301,7 +325,7 @@ def prepare_node(self, node): cleaned_value = None else: cleaned_value = int(value) - except Exception: + except ValueError: cleaned_value = None obj[key] = cleaned_value elif key_type == 'Float': @@ -310,28 +334,25 @@ def prepare_node(self, node): cleaned_value = None else: cleaned_value = float(value) - except Exception: + except ValueError: cleaned_value = None obj[key] = cleaned_value elif key_type == 'Array': - items = self.schema.get_list_values(value) + items = get_list_values(value) # todo: need to transform items if item type is not string obj[key] = json.dumps(items) elif 
key_type == 'DateTime' or key_type == 'Date': - try: - if value is None: - cleaned_value = None - else: - cleaned_value = reformat_date(value) - except Exception: + if value is None: cleaned_value = None + else: + cleaned_value = reformat_date(value) obj[key] = cleaned_value obj2 = {} for key, value in obj.items(): obj2[key] = value # Add parent id field(s) into node - if obj[NODE_TYPE] in self.schema.props.save_parent_id and self.schema.is_parent_pointer(key): + if obj[NODE_TYPE] in self.schema.props.save_parent_id and is_parent_pointer(key): header = key.split('.') if len(header) > 2: self.log.warning('Column header "{}" has multiple periods!'.format(key)) @@ -367,7 +388,7 @@ def get_signature(self, node): result = [] for key in sorted(node.keys()): value = node[key] - if not self.schema.is_parent_pointer(key): + if not is_parent_pointer(key): result.append('{}: {}'.format(key, value)) return '{{ {} }}'.format(', '.join(result)) @@ -377,7 +398,7 @@ def validate_cases_exist_in_file(self, file_name, max_violations): self.log.error('Invalid Neo4j Python Driver!') return False with self.driver.session() as session: - file_encoding = self.check_encoding(file_name) + file_encoding = check_encoding(file_name) with open(file_name, encoding=file_encoding) as in_file: self.log.info('Validating relationships in file "{}" ...'.format(file_name)) reader = csv.DictReader(in_file, delimiter='\t') @@ -392,7 +413,7 @@ def validate_cases_exist_in_file(self, file_name, max_violations): case_id = obj[CASE_ID] if not self.node_exists(session, CASE_NODE, CASE_ID, case_id): self.log.error( - 'Invalid data at line {}: Parent (:{} {{ {}: "{}" }}) doesn\'t exist!'.format( + 'Invalid data at line {}: Parent (:{} {{ {}: "{}" }}) does not exist!'.format( line_num, CASE_NODE, CASE_ID, case_id)) validation_failed = True violations += 1 @@ -402,12 +423,11 @@ def validate_cases_exist_in_file(self, file_name, max_violations): # Validate all parents exist in a data (TSV/TXT) file def validate_parents_exist_in_file(self, file_name, max_violations): - validation_failed = True if not self.driver or not isinstance(self.driver, Driver): self.log.error('Invalid Neo4j Python Driver!') return False with self.driver.session() as session: - file_encoding = self.check_encoding(file_name) + file_encoding = check_encoding(file_name) with open(file_name, encoding=file_encoding) as in_file: self.log.info('Validating relationships in file "{}" ...'.format(file_name)) reader = csv.DictReader(in_file, delimiter='\t') @@ -433,16 +453,16 @@ def validate_parents_exist_in_file(self, file_name, max_violations): return not validation_failed def get_node_properties(self, obj): - ''' + """ Generate a node with only node properties from input data :param obj: input data object (dict), may contain parent pointers, relationship properties etc. 
         :return: an object (dict) that only contains properties on this node
-        '''
+        """
         node = {}
         for key, value in obj.items():
-            if self.schema.is_parent_pointer(key):
+            if is_parent_pointer(key):
                 continue
             elif self.schema.is_relationship_property(key):
                 continue
@@ -451,28 +471,16 @@
         return node

-    #Check encoding
-    def check_encoding(self, file_name):
-        utf8 = 'utf-8'
-        windows1252 = 'windows-1252'
-        try:
-            with open(file_name, encoding=utf8) as file:
-                for line in file.readlines():
-                    pass
-            return utf8
-        except UnicodeDecodeError:
-            return windows1252
-
     # Validate file
     def validate_file(self, file_name, max_violations):
-        file_encoding = self.check_encoding(file_name)
+        file_encoding = check_encoding(file_name)
         with open(file_name, encoding=file_encoding) as in_file:
             self.log.info('Validating file "{}" ...'.format(file_name))
             reader = csv.DictReader(in_file, delimiter='\t')
             line_num = 1
             validation_failed = False
             violations = 0
-            IDs = {}
+            ids = {}
             for org_obj in reader:
                 obj = self.cleanup_node(org_obj)
                 props = self.get_node_properties(obj)
@@ -480,18 +488,21 @@
                 id_field = self.schema.get_id_field(obj)
                 node_id = self.schema.get_id(obj)
                 if node_id:
-                    if node_id in IDs:
-                        if props != IDs[node_id]['props']:
+                    if node_id in ids:
+                        if props != ids[node_id]['props']:
                             validation_failed = True
                             self.log.error(
-                                f'Invalid data at line {line_num}: duplicate {id_field}: {node_id}, found in line: {", ".join(IDs[node_id]["lines"])}')
-                            IDs[node_id]['lines'].append(str(line_num))
+                                f'Invalid data at line {line_num}: duplicate {id_field}: {node_id}, found in line: '
+                                f'{", ".join(ids[node_id]["lines"])}')
+                            ids[node_id]['lines'].append(str(line_num))
                         else:
-                            # Same ID exists in same file, but properties are also same, probably it's pointing same object to multiple parents
+                            # Same ID exists in same file, but properties are also same, probably it's pointing same
+                            # object to multiple parents
                             self.log.debug(
-                                f'Duplicated data at line {line_num}: duplicate {id_field}: {node_id}, found in line: {", ".join(IDs[node_id]["lines"])}')
+                                f'Duplicated data at line {line_num}: duplicate {id_field}: {node_id}, found in line: '
+                                f'{", ".join(ids[node_id]["lines"])}')
                     else:
-                        IDs[node_id] = {'props': props, 'lines': [str(line_num)]}
+                        ids[node_id] = {'props': props, 'lines': [str(line_num)]}

                 validate_result = self.schema.validate_node(obj[NODE_TYPE], obj)
                 if not validate_result['result']:
@@ -510,7 +521,7 @@
         for key in obj.keys():
             if key in excluded_fields:
                 continue
-            elif self.schema.is_parent_pointer(key):
+            elif is_parent_pointer(key):
                 continue
             elif self.schema.is_relationship_property(key):
                 continue
@@ -530,7 +541,7 @@
                 continue
             elif key == id_field:
                 continue
-            elif self.schema.is_parent_pointer(key):
+            elif is_parent_pointer(key):
                 continue
             elif self.schema.is_relationship_property(key):
                 continue
@@ -553,7 +564,7 @@
             n_deleted, r_deleted = self.delete_single_node(session, root)
             node_deleted += n_deleted
             relationship_deleted += r_deleted
-        return (node_deleted, relationship_deleted)
+        return node_deleted, relationship_deleted

     # Return children of node without other parents
     def get_children_with_single_parent(self, session, node):
@@ -585,7 +596,7 @@ def delete_single_node(self, session, node):
             self.nodes_deleted_stat[node_type] = self.nodes_deleted_stat.get(node_type, 0) + nodes_deleted
relationship_deleted = result.consume().counters.relationships_deleted self.relationships_deleted += relationship_deleted - return (nodes_deleted, relationship_deleted) + return nodes_deleted, relationship_deleted # load file def load_nodes(self, session, file_name, loading_mode, split=False): @@ -599,7 +610,7 @@ def load_nodes(self, session, file_name, loading_mode, split=False): raise Exception('Wrong loading_mode: {}'.format(loading_mode)) self.log.info('{} nodes from file: {}'.format(action_word, file_name)) - file_encoding = self.check_encoding(file_name) + file_encoding = check_encoding(file_name) with open(file_name, encoding=file_encoding) as in_file: reader = csv.DictReader(in_file, delimiter='\t') nodes_created = 0 @@ -650,13 +661,12 @@ def load_nodes(self, session, file_name, loading_mode, split=False): if split and transaction_counter >= BATCH_SIZE: tx.commit() tx = session.begin_transaction() - self.log.info(f'{line_num -1} rows loaded ...') + self.log.info(f'{line_num - 1} rows loaded ...') transaction_counter = 0 # commit last transaction if split: tx.commit() - if loading_mode == DELETE_MODE: self.log.info('{} node(s) deleted'.format(nodes_deleted)) self.log.info('{} relationship(s) deleted'.format(relationship_deleted)) @@ -678,7 +688,7 @@ def collect_relationships(self, obj, session, create_intermediate_node, line_num provided_parents = 0 relationship_properties = {} for key, value in obj.items(): - if self.schema.is_parent_pointer(key): + if is_parent_pointer(key): provided_parents += 1 other_node, other_id = key.split('.') relationship = self.schema.get_relationship(node_type, other_node) @@ -696,11 +706,13 @@ def collect_relationships(self, obj, session, create_intermediate_node, line_num if plugin.should_run(other_node, MISSING_PARENT): if plugin.create_node(session, line_num, other_node, value, obj): int_node_created += 1 - relationships.append({PARENT_TYPE: other_node, PARENT_ID_FIELD: other_id, PARENT_ID: value, - RELATIONSHIP_TYPE: relationship_name, MULTIPLIER: multiplier}) + relationships.append( + {PARENT_TYPE: other_node, PARENT_ID_FIELD: other_id, PARENT_ID: value, + RELATIONSHIP_TYPE: relationship_name, MULTIPLIER: multiplier}) else: self.log.error( - 'Line: {}: Couldn\'t create {} node automatically!'.format(line_num, other_node)) + 'Line: {}: Could not create {} node automatically!'.format(line_num, + other_node)) else: self.log.warning( 'Line: {}: Parent node (:{} {{{}: "{}"}} not found in DB!'.format(line_num, other_node, @@ -726,14 +738,14 @@ def collect_relationships(self, obj, session, create_intermediate_node, line_num def parent_already_has_child(self, session, node_type, node, relationship_name, parent_type, parent_id_field, parent_id): statement = 'MATCH (n:{})-[r:{}]->(m:{} {{ {}: $parent_id }}) return n'.format(node_type, relationship_name, - parent_type, parent_id_field) + parent_type, parent_id_field) result = session.run(statement, {"parent_id": parent_id}) if result: child = result.single() if child: find_current_node_statement = 'MATCH (n:{0} {{ {1}: ${1} }}) return n'.format(node_type, - self.schema.get_id_field( - node)) + self.schema.get_id_field( + node)) current_node_result = session.run(find_current_node_statement, node) if current_node_result: current_node = current_node_result.single() @@ -750,8 +762,8 @@ def has_existing_relationship(self, session, node_type, node, relationship, coun parent_id_field = relationship[PARENT_ID_FIELD] base_statement = 'MATCH (n:{0} {{ {1}: ${1} }})-[r:{2}]->(m:{3})'.format(node_type, - 
self.schema.get_id_field(node), - relationship_name, parent_type) + self.schema.get_id_field(node), + relationship_name, parent_type) statement = base_statement + ' return m.{} AS {}'.format(parent_id_field, PARENT_ID) result = session.run(statement, node) if result: @@ -788,7 +800,7 @@ def load_relationships(self, session, file_name, loading_mode, split=False): raise Exception('Wrong loading_mode: {}'.format(loading_mode)) self.log.info('{} relationships from file: {}'.format(action_word, file_name)) - file_encoding = self.check_encoding(file_name) + file_encoding = check_encoding(file_name) with open(file_name, encoding=file_encoding) as in_file: reader = csv.DictReader(in_file, delimiter='\t') relationships_created = {} @@ -837,7 +849,7 @@ def load_relationships(self, session, file_name, loading_mode, split=False): prop_statement = ', '.join(self.get_relationship_prop_statements(properties)) statement = 'MATCH (m:{0} {{ {1}: ${1} }})'.format(parent_node, parent_id_field) statement += ' MATCH (n:{0} {{ {1}: ${1} }})'.format(node_type, - self.schema.get_id_field(obj)) + self.schema.get_id_field(obj)) statement += ' MERGE (n)-[r:{}]->(m)'.format(relationship_name) statement += ' ON CREATE SET r.{} = datetime()'.format(CREATED) statement += ', {}'.format(prop_statement) if prop_statement else '' @@ -860,7 +872,7 @@ def load_relationships(self, session, file_name, loading_mode, split=False): if split and transaction_counter >= BATCH_SIZE: tx.commit() tx = session.begin_transaction() - self.log.info(f'{line_num -1} rows loaded ...') + self.log.info(f'{line_num - 1} rows loaded ...') transaction_counter = 0 # commit last transaction if split: @@ -944,4 +956,3 @@ def create_index(self, node_name, node_property, existing, session): session.run(command) self.indexes_created += 1 self.log.info("Index created for \"{}\" on property \"{}\"".format(node_name, node_property)) - diff --git a/es_loader.py b/es_loader.py index 2532395f..9d0cb87a 100755 --- a/es_loader.py +++ b/es_loader.py @@ -1,17 +1,16 @@ #!/user/bin/env python3 -import os import argparse -from neo4j import GraphDatabase +import yaml from elasticsearch import Elasticsearch from elasticsearch.helpers import streaming_bulk -import yaml -import tqdm +from neo4j import GraphDatabase from bento.common.utils import get_logger logger = get_logger('ESLoader') + class ESLoader: def __init__(self, es_host, neo4j_driver): self.neo4j_driver = neo4j_driver @@ -59,7 +58,7 @@ def load(self, index_name, mapping, cypher_query): # progress = tqdm.tqdm(unit="docs", total=number_of_docs) successes = 0 total = 0 - for ok, action in streaming_bulk( + for ok, _ in streaming_bulk( client=self.es_client, index=index_name, actions=self.get_data(cypher_query, mapping.keys()) @@ -96,5 +95,6 @@ def main(): for index in indices: loader.load(index['index_name'], index['mapping'], index['cypher_query']) + if __name__ == '__main__': main() diff --git a/file_copier.py b/file_copier.py index d41354df..717bb5e0 100755 --- a/file_copier.py +++ b/file_copier.py @@ -1,10 +1,9 @@ #!/usr/bin/env python3 -from collections import deque import csv -from importlib import import_module import json import os +from collections import deque from bento.common.sqs import Queue, VisibilityExtender from bento.common.utils import get_logger, get_uuid, LOG_PREFIX, UUID, get_time_stamp, removeTrailingSlash, load_plugin @@ -23,7 +22,6 @@ class FileLoader: - GUID = 'GUID' MD5 = 'md5' SIZE = 'size' @@ -60,14 +58,15 @@ class FileLoader: PREFIX = 'prefix' VERIFY_MD5 = 'verify_md5' - def 
__init__(self, mode, adapter_module=None, adapter_class=None, adapter_params=None, domain=None, bucket=None, prefix=None, pre_manifest=None, first=1, count=-1, job_queue=None, result_queue=None, retry=3, overwrite=False, dryrun=False, verify_md5=False): + def __init__(self, mode, adapter_module=None, adapter_class=None, adapter_params=None, domain=None, bucket=None, + prefix=None, pre_manifest=None, first=1, count=-1, job_queue=None, result_queue=None, retry=3, + overwrite=False, dryrun=False, verify_md5=False): """" :param bucket: string type :param pre_manifest: string type, holds path to pre-manifest :param first: first file of files to process, file 1 is in line 2 of pre-manifest :param count: number of files to process - :param adapter: any object that has following methods/properties defined in adapter_attrs """ if mode not in Config.valid_modes: @@ -95,7 +94,7 @@ def __init__(self, mode, adapter_module=None, adapter_class=None, adapter_params raise ValueError(f'Invalid prefix: "{prefix}"') if not pre_manifest or not os.path.isfile(pre_manifest): - raise ValueError(f'Pre-manifest: "{pre_manifest}" dosen\'t exist') + raise ValueError(f'Pre-manifest: "{pre_manifest}" does not exist') self.pre_manifest = pre_manifest if not domain: @@ -138,11 +137,6 @@ def __init__(self, mode, adapter_module=None, adapter_class=None, adapter_params self.files_failed = 0 def _init_adapter(self, adapter_module, adapter_class, params): - """ - Initialize different adapters base on given adapter_name - :param adapter_name: - :return: - """ self.adapter = load_plugin(adapter_module, adapter_class, params) if not hasattr(self.adapter, 'filter_fields'): @@ -191,20 +185,20 @@ def populate_neo4j_record(self, record, result): @staticmethod def _clean_up_field_names(headers): - ''' + """ Removes leading and trailing spaces from header names :param headers: :return: - ''' + """ return [header.strip() for header in headers] @staticmethod def _clean_up_record(record): - ''' + """ Removes leading and trailing spaces from keys in org_record :param record: :return: - ''' + """ return {key.strip(): value for key, value in record.items()} def _read_pre_manifest(self): @@ -325,7 +319,6 @@ def process_all(self): self.log.debug(e) self.log.critical(f'Process files FAILED! Check debug log for detailed information.') - # read result from result queue - master mode def read_result(self, num_files): if self.mode != MASTER_MODE: @@ -345,19 +338,21 @@ def read_result(self, num_files): count = 0 while count < num_files: - self.log.info(f'Waiting for results on queue: {self.result_queue_name}, {num_files - count} files pending') + self.log.info(f'Waiting for results on queue: {self.result_queue_name}, \ + {num_files - count} files pending') for msg in self.result_queue.receiveMsgs(self.VISIBILITY_TIMEOUT): self.log.info(f'Received a result!') extender = None try: result = json.loads(msg.body) # Make sure result is in correct format - if (result and - Copier.STATUS in result and - Copier.MD5 in result and - Copier.NAME in result and - Copier.KEY in result and - Copier.FIELDS in result + if ( + result and + Copier.STATUS in result and + Copier.MD5 in result and + Copier.NAME in result and + Copier.KEY in result and + Copier.FIELDS in result ): extender = VisibilityExtender(msg, self.VISIBILITY_TIMEOUT) @@ -384,16 +379,15 @@ def read_result(self, num_files): except Exception as e: self.log.debug(e) - self.log.critical(f'Something wrong happened while processing file! 
Check debug log for details.') + self.log.critical( + f'Something wrong happened while processing file! Check debug log for details.') finally: if extender: extender.stop() - extender = None self.log.info(f'All {num_files} files finished!') - # Use this method in slave mode def start_work(self): if self.mode != SLAVE_MODE: @@ -402,7 +396,8 @@ def start_work(self): while True: try: - self.log.info(f'Waiting for jobs on queue: {self.job_queue_name}, {self.files_processed} files have been processed so far') + self.log.info(f'Waiting for jobs on queue: {self.job_queue_name}, ' + f'{self.files_processed} files have been processed so far') for msg in self.job_queue.receiveMsgs(self.VISIBILITY_TIMEOUT): self.log.info(f'Received a job!') extender = None @@ -412,14 +407,14 @@ def start_work(self): self.log.debug(data) # Make sure job is in correct format if ( - self.ADAPTER_CONF in data and - self.BUCKET in data and - self.INFO in data and - self.TTL in data and - self.OVERWRITE in data and - self.PREFIX in data and - self.DRY_RUN in data and - self.VERIFY_MD5 in data + self.ADAPTER_CONF in data and + self.BUCKET in data and + self.INFO in data and + self.TTL in data and + self.OVERWRITE in data and + self.PREFIX in data and + self.DRY_RUN in data and + self.VERIFY_MD5 in data ): extender = VisibilityExtender(msg, self.VISIBILITY_TIMEOUT) dryrun = data[self.DRY_RUN] @@ -446,7 +441,8 @@ def start_work(self): self.prefix = prefix self.copier.set_prefix(prefix) - result = self.copier.copy_file(data[self.INFO], data[self.OVERWRITE], dryrun or self.dryrun, verify_md5) + result = self.copier.copy_file(data[self.INFO], data[self.OVERWRITE], dryrun or self.dryrun, + verify_md5) if result[Copier.STATUS]: self.result_queue.sendMsgToQueue(result, f'{result[Copier.NAME]}_{get_time_stamp()}') @@ -465,14 +461,14 @@ def start_work(self): except Exception as e: self.log.debug(e) - self.log.critical(f'Something wrong happened while processing file! Check debug log for details.') + self.log.critical( + f'Something wrong happened while processing file! 
Check debug log for details.')
                         if data:
                             self._deal_with_failed_file_sqs(data)
                     finally:
                         if extender:
                             extender.stop()
-                            extender = None

             except KeyboardInterrupt:
                 self.log.info('Good bye!')
@@ -492,7 +488,6 @@ def run(self):
             self.start_work()

-

 def main():
     config = Config()
     if not config.validate():
@@ -501,5 +496,6 @@
     loader = FileLoader(**config.data)
     loader.run()

+
 if __name__ == '__main__':
     main()
diff --git a/icdc_schema.py b/icdc_schema.py
index eb8900fb..5bbfd876 100644
--- a/icdc_schema.py
+++ b/icdc_schema.py
@@ -35,20 +35,27 @@
 EX_MAX = 'exclusiveMaximum'

+def get_list_values(list_str):
+    return [item.strip() for item in list_str.split(LIST_DELIMITER) if item.strip()]
+
+
+def is_parent_pointer(field_name):
+    return re.fullmatch(r'\w+\.\w+', field_name) is not None
+

 class ICDC_Schema:
     def __init__(self, yaml_files, props):
-        assert isinstance(props, Props)
+        if not isinstance(props, Props):
+            raise AssertionError
         self.props = props
         self.rel_prop_delimiter = props.rel_prop_delimiter

         if not yaml_files:
-            raise Exception('File list is empty, couldn\'t initialize ICDC_Schema object!')
-            sys.exit(1)
+            raise Exception('File list is empty, could not initialize ICDC_Schema object!')
         else:
             for data_file in yaml_files:
                 if not os.path.isfile(data_file):
-                    raise Exception('File "{}" doesn\'t exist'.format(data_file))
+                    raise Exception('File "{}" does not exist'.format(data_file))
         self.log = get_logger('ICDC Schema')
         self.org_schema = {}
         for aFile in yaml_files:
@@ -89,22 +96,22 @@
             self.num_relationship += self.process_edges(key, value)

     def get_uuid_for_node(self, node_type, signature):
-        """Generate V5 UUID for a node
-        Arguments:
-        node_type - a string represents type of a node, e.g. case, study, file etc.
-        signature - a string that can uniquely identify a node within it's type, e.g. case_id, clinical_study_designation etc.
-        or a long string with all properties and values concat together if no id available
+        """
+        Generate V5 UUID for a node Arguments: node_type - a string represents type of a node, e.g. case, study,
+        file etc. signature - a string that can uniquely identify a node within it's type, e.g. case_id,
+        clinical_study_designation etc. 
or a long string with all properties and values concat together if no id + available """ return get_uuid(self.props.domain, node_type, signature) def _process_properties(self, desc): - ''' + """ Gather properties from description :param desc: description of properties :return: a dict with properties, required property list and private property list - ''' + """ props = {} required = set() private = set() @@ -122,21 +129,20 @@ def _process_properties(self, desc): return {PROPERTIES: props, REQUIRED: required, PRIVATE: private} - def process_node(self, name, desc, isRelationship=False): - ''' + def process_node(self, name, desc, is_relationship=False): + """ Process input node/relationship properties and save it in self.nodes :param name: node/relationship name :param desc: - :param isRelationship: if input is a relationship + :param is_relationship: if input is a relationship :return: - ''' + """ properties = self._process_properties(desc) - # All nodes and relationships that has properties will be save to self.nodes # Relationship without properties will be ignored - if properties[PROPERTIES] or not isRelationship: + if properties[PROPERTIES] or not is_relationship: self.nodes[name] = properties def process_edges(self, name, desc): @@ -150,17 +156,19 @@ def process_edges(self, name, desc): self.relationship_props[name] = properties if END_POINTS in desc: - for end_points in desc[END_POINTS]: + for end_points in desc[END_POINTS]: src = end_points[SRC] dest = end_points[DEST] if MULTIPLIER in end_points: actual_multiplier = end_points[MULTIPLIER] - self.log.debug('End point multiplier: "{}" overriding relationship multiplier: "{}"'.format(actual_multiplier, multiplier)) + self.log.debug( + 'End point multiplier: "{}" overriding relationship multiplier: "{}"'.format(actual_multiplier, + multiplier)) else: actual_multiplier = multiplier if src not in self.relationships: self.relationships[src] = {} - self.relationships[src][dest] = { RELATIONSHIP_TYPE: name, MULTIPLIER: actual_multiplier } + self.relationships[src][dest] = {RELATIONSHIP_TYPE: name, MULTIPLIER: actual_multiplier} count += 1 if src in self.nodes: @@ -177,29 +185,37 @@ def process_edges(self, name, desc): # Process singular/plural array/single value based on relationship multipliers like many-to-many, many-to-one etc. 
# Return a relationship property to add into a node - def add_relationship_to_node(self, name, multiplier, relationship, otherNode, dest=False): + def add_relationship_to_node(self, name, multiplier, relationship, other_node, dest=False): node = self.nodes[name] if multiplier == 'many_to_one': if dest: - node[PROPERTIES][self.plural(otherNode)] = { PROP_TYPE: '[{}] @relation(name:"{}", direction:IN)'.format(otherNode, relationship) } + node[PROPERTIES][self.plural(other_node)] = { + PROP_TYPE: '[{}] @relation(name:"{}", direction:IN)'.format(other_node, relationship)} else: - node[PROPERTIES][otherNode] = {PROP_TYPE: '{} @relation(name:"{}", direction:OUT)'.format(otherNode, relationship) } + node[PROPERTIES][other_node] = { + PROP_TYPE: '{} @relation(name:"{}", direction:OUT)'.format(other_node, relationship)} elif multiplier == 'one_to_one': if relationship == NEXT_RELATIONSHIP: if dest: - node[PROPERTIES]['prior_' + otherNode] = {PROP_TYPE: '{} @relation(name:"{}", direction:IN)'.format(otherNode, relationship) } + node[PROPERTIES]['prior_' + other_node] = { + PROP_TYPE: '{} @relation(name:"{}", direction:IN)'.format(other_node, relationship)} else: - node[PROPERTIES]['next_' + otherNode] = {PROP_TYPE: '{} @relation(name:"{}", direction:OUT)'.format(otherNode, relationship) } + node[PROPERTIES]['next_' + other_node] = { + PROP_TYPE: '{} @relation(name:"{}", direction:OUT)'.format(other_node, relationship)} else: if dest: - node[PROPERTIES][otherNode] = {PROP_TYPE: '{} @relation(name:"{}", direction:IN)'.format(otherNode, relationship) } + node[PROPERTIES][other_node] = { + PROP_TYPE: '{} @relation(name:"{}", direction:IN)'.format(other_node, relationship)} else: - node[PROPERTIES][otherNode] = {PROP_TYPE: '{} @relation(name:"{}", direction:OUT)'.format(otherNode, relationship) } + node[PROPERTIES][other_node] = { + PROP_TYPE: '{} @relation(name:"{}", direction:OUT)'.format(other_node, relationship)} elif multiplier == 'many_to_many': if dest: - node[PROPERTIES][self.plural(otherNode)] = {PROP_TYPE: '[{}] @relation(name:"{}", direction:IN)'.format(otherNode, relationship) } + node[PROPERTIES][self.plural(other_node)] = { + PROP_TYPE: '[{}] @relation(name:"{}", direction:IN)'.format(other_node, relationship)} else: - node[PROPERTIES][self.plural(otherNode)] = {PROP_TYPE: '[{}] @relation(name:"{}", direction:OUT)'.format(otherNode, relationship) } + node[PROPERTIES][self.plural(other_node)] = { + PROP_TYPE: '[{}] @relation(name:"{}", direction:OUT)'.format(other_node, relationship)} else: self.log.warning('Unsupported relationship multiplier: "{}"'.format(multiplier)) @@ -218,14 +234,14 @@ def is_private_prop(self, name): return result def get_prop_type(self, node_type, prop): - if node_type in self.nodes: + if node_type in self.nodes: node = self.nodes[node_type] if prop in node[PROPERTIES]: return node[PROPERTIES][prop][PROP_TYPE] return DEFAULT_TYPE def get_type(self, name): - result = { PROP_TYPE: DEFAULT_TYPE } + result = {PROP_TYPE: DEFAULT_TYPE} if name in self.org_schema[PROP_DEFINITIONS]: prop = self.org_schema[PROP_DEFINITIONS][name] if PROP_TYPE in prop: @@ -248,7 +264,8 @@ def get_type(self, name): if len(enum) > 0: result[ENUM] = enum else: - self.log.debug('Property type: "{}" not supported, use default type: "{}"'.format(prop_desc, DEFAULT_TYPE)) + self.log.debug( + 'Property type: "{}" not supported, use default type: "{}"'.format(prop_desc, DEFAULT_TYPE)) # Add value boundary support if MIN in prop: @@ -294,7 +311,6 @@ def get_default_unit(self, node_name, name): 
unit_prop_name = self.get_unit_property_name(name) return self.get_default_value(node_name, unit_prop_name) - def get_valid_values(self, node_name, name): prop = self.get_prop(node_name, name) if prop: @@ -328,7 +344,7 @@ def process_value_unit_type(self, name, prop_type): if units: enum = set(units) unit_prop_name = self.get_unit_property_name(name) - results[unit_prop_name] = {PROP_TYPE: DEFAULT_TYPE, ENUM: enum, DEFAULT_VALUE: units[0]} + results[unit_prop_name] = {PROP_TYPE: DEFAULT_TYPE, ENUM: enum, DEFAULT_VALUE: units[0]} org_prop_name = self.get_original_value_property_name(name) org_unit_prop_name = self.get_unit_property_name(org_prop_name) results[org_prop_name] = prop_type @@ -345,7 +361,7 @@ def get_original_value_property_name(name): def validate_node(self, model_type, obj): if not model_type or model_type not in self.nodes: - return {'result': False, 'messages': ['Node type: "{}" doesn\'t exist!'.format(model_type)]} + return {'result': False, 'messages': ['Node type: "{}" does not exist!'.format(model_type)]} if not obj: return {'result': False, 'messages': ['Node is empty!']} @@ -367,7 +383,7 @@ def validate_node(self, model_type, obj): for key, value in obj.items(): if key == NODE_TYPE: continue - elif self.is_parent_pointer(key): + elif is_parent_pointer(key): continue elif self.is_relationship_property(key): rel_type, rel_prop = key.split(self.rel_prop_delimiter) @@ -392,19 +408,20 @@ def validate_node(self, model_type, obj): prop_type = properties[key] if not self._validate_type(prop_type, value): result['result'] = False - result['messages'].append('Property: "{}":"{}" is not a valid "{}" type!'.format(key, value, prop_type)) + result['messages'].append( + 'Property: "{}":"{}" is not a valid "{}" type!'.format(key, value, prop_type)) return result @staticmethod def _validate_value_range(model_type, value): - ''' + """ Validate an int of float value, return whether value is in range :param model_type: dict specify value type and boundary/range :param value: value to be validated :return: boolean - ''' + """ if MIN in model_type: if value < model_type[MIN]: @@ -443,7 +460,7 @@ def _validate_type(self, model_type, str_value): and not re.match(r'\bltf\b', str_value, re.IGNORECASE)): return False elif model_type[PROP_TYPE] == 'Array': - for item in self.get_list_values(str_value): + for item in get_list_values(str_value): if not self._validate_type(model_type[ITEM_TYPE], item): return False @@ -451,7 +468,7 @@ def _validate_type(self, model_type, str_value): if not isinstance(str_value, dict): return False elif model_type[PROP_TYPE] == 'String': - if ENUM in model_type: + if ENUM in model_type: if not isinstance(str_value, str): return False if str_value != '' and str_value not in model_type[ENUM]: @@ -474,9 +491,6 @@ def _validate_type(self, model_type, str_value): return False return True - def get_list_values(self, list_str): - return [item.strip() for item in list_str.split(LIST_DELIMITER) if item.strip()] - # Find relationship type from src to dest def get_relationship(self, src, dest): if src in self.relationships: @@ -499,10 +513,9 @@ def get_dest_node_for_relationship(self, src, name): if rel[RELATIONSHIP_TYPE] == name: return dest else: - self.log.error('Couldn\'t find any relationship from (:{})'.format(src)) + self.log.error('Could not find any relationship from (:{})'.format(src)) return None - # Get type info from description def map_type(self, type_name): mapping = self.props.type_mapping @@ -545,7 +558,7 @@ def get_public_props_for_node(self, node_name): if 
node_name in self.nodes:
             props = self.nodes[node_name][PROPERTIES].copy()
             for private_prop in self.nodes[node_name].get(PRIVATE, []):
-                del(props[private_prop])
+                del (props[private_prop])
                 self.log.info('Delete private property: "{}"'.format(private_prop))
             return props
         else:
@@ -575,9 +588,4 @@ def get_id(self, obj):
         return obj[id_field]

     def is_relationship_property(self, key):
         return re.match(r'^.+\{}.+$'.format(self.rel_prop_delimiter), key)
-
-
-    def is_parent_pointer(self, field_name):
-        return re.fullmatch(r'\w+\.\w+', field_name) is not None
-
diff --git a/loader_plugins/individual_creator.py b/loader_plugins/individual_creator.py
index 64881270..dd7fbbd1 100644
--- a/loader_plugins/individual_creator.py
+++ b/loader_plugins/individual_creator.py
@@ -1,7 +1,6 @@
-
 from icdc_schema import ICDC_Schema, NODE_TYPE
 from bento.common.utils import get_logger, NODE_LOADED
-from data_loader import CREATED, UPDATED
+from data_loader import CREATED
 from bento.common.utils import UUID

 REGISTRATION_NODE = 'registration'
@@ -54,7 +53,6 @@ def create_node(self, session, **kwargs):
             msg = f"Line: {line_num}: More than one individuals associated with one dog!"
             self.log.error(msg)
             raise Exception(msg)
-
         elif len(individual_nodes) == 1:
             individual = individual_nodes[0]
             i_id = individual.id
@@ -68,12 +66,12 @@ def create_node(self, session, **kwargs):

         return individual_created

-
     def create_individual(self, session, uuid):
         id_field = self.schema.props.id_fields.get(INDIVIDUAL_NODE)
         statement = f'''
-            MATCH (i:{INDIVIDUAL_NODE}) WITH apoc.number.format(coalesce(max(toInteger(i.canine_individual_id)) + 1, 1), '0000') AS i_id
-            CREATE (i:{INDIVIDUAL_NODE} {{ {id_field}: i_id, {CREATED}: datetime(), {UUID}:${UUID} }})
+            MATCH (i:{INDIVIDUAL_NODE})
+            WITH apoc.number.format(coalesce(max(toInteger(i.canine_individual_id)) + 1, 1), '0000') AS i_id
+            CREATE (i:{INDIVIDUAL_NODE} {{ {id_field}: i_id, {CREATED}: datetime(), {UUID}:${UUID} }})
             RETURN id(i) AS node_id
         '''
         result = session.run(statement, {UUID: uuid})
@@ -101,4 +99,3 @@ def connect_case_to_individual(self, session, c_id, i_id):
         count = result.consume().counters.relationships_created
         self.relationships_created += count
         self.relationships_stat[relationship_name] = self.relationships_stat.get(relationship_name, 0) + count
-
diff --git a/loader_plugins/visit_creator.py b/loader_plugins/visit_creator.py
index 55e84116..b5fa7495 100644
--- a/loader_plugins/visit_creator.py
+++ b/loader_plugins/visit_creator.py
@@ -46,7 +46,7 @@ def should_run(self, node_type, event):

     def create_node(self, session, line_num, node_type, node_id, src):
         if node_type != VISIT_NODE:
-            self.log.debug("Line: {}: Won't create node for type: '{}'".format(line_num, VISIT_NODE, node_type))
+            self.log.debug("Line: {}: Won't create node for type: '{}'".format(line_num, node_type))
             return False
         if not node_id:
             self.log.error("Line: {}: Can't create (:{}) node for id: '{}'".format(line_num, VISIT_NODE, node_id))
diff --git a/uuid_util.py b/uuid_util.py
index fda43285..03ab31e2 100755
--- a/uuid_util.py
+++ b/uuid_util.py
@@ -1,14 +1,14 @@
 #!/usr/bin/env python3

 # This script can be used to verify or replace UUIDs generated by file-copier
-# UUID should be generated by using project's domain name, node type of 'file' and file's file_location (new) or MD5 (legacy used before UBC01 and UBC02) as signature
+# UUID should be generated by using project's domain name, node type of 'file' and file's file_location (new) or MD5
+# (legacy used before UBC01 and UBC02) as signature import argparse import csv import os -from bento.common.utils import LOG_PREFIX, APP_NAME, get_logger, get_uuid, get_time_stamp, removeTrailingSlash, \ - get_log_file, format_bytes +from bento.common.utils import LOG_PREFIX, APP_NAME, get_logger, get_uuid if LOG_PREFIX not in os.environ: os.environ[LOG_PREFIX] = 'UUID_util' @@ -23,6 +23,7 @@ def get_new_manifest_name(manifest): new_name = f"{org_name}_corrected.{ext}" return os.path.join(folder, new_name) + def process_file(file_obj, signature_column, uuid_column, domain, indexd_mode): file_name = file_obj.name log.info(f"Processing {file_name}") @@ -67,7 +68,7 @@ def process_file(file_obj, signature_column, uuid_column, domain, indexd_mode): writer.writerow(obj) log.info("Done") log.info("=" * 70) - return (total, succeeded, failed) + return total, succeeded, failed def main(): @@ -83,7 +84,9 @@ def main(): ) parser.add_argument('-p', '--project', help='Project name', choices=valid_projects.keys(), default='ICDC') parser.add_argument('-u', '--uuid-column', help='column that contains UUID', default='uuid') - parser.add_argument('-s', '--signature-column', help='column that contains file signature new files should use file_location, legacy files (before UBC01) should use md5sum', default='file_location') + parser.add_argument('-s', '--signature-column', help='column that contains file signature new files should use ' + 'file_location, legacy files (before UBC01) should use ' + 'md5sum', default='file_location') parser.add_argument('-i', '--indexd-mode', help='IndexD Mode', action='store_true') args = parser.parse_args()