From dfc7176114824fe27324d1983dc324bc29f8d63d Mon Sep 17 00:00:00 2001
From: YaphetKG <45075777+YaphetKG@users.noreply.github.com>
Date: Thu, 18 Apr 2024 17:23:10 -0400
Subject: [PATCH] release sync (#100)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Update _version.py (#86)
* Update _version.py
* Rti merge (#84)
* roger cli prepped for Merge Deploy
* Update Makefile to work with python env
* Update redisgraph-bulk-loader to fix issue with loading MODULE LIST
* Revert "Update redisgraph-bulk-loader to fix issue with loading MODULE LIST" This reverts commit 7baf7efa725caac77e5501e948f545a0f4b20e3d.
* Finalized dev deployment of dug inside Catapult Merge, deployment yamls, code changes and configurations
* updated to reflect the Dug-Api updates to FastAPI
* adding multi label redis by removing 'biolink:' on nodes; edges cannot be fixed after update, so they need to be solved either by changing TranQl AND Plater or forking bulk-redisgraph to allow for colons to be added in the edges
* Working multi label redis nodes w/ no biolink label
* Latest code changes to deploy working Roger in Merge
* biolink data move to '.' separator
* updates to include new dug fixes, upgraded redis-bulk-loader and made changes for biolink variables to specify their domain with a 'biolink.' prefix
* adding test roger code
* removed helm deployments
* change docker owner
* remove core.py
* remove dup dev config
* redis graph is not directly used, removing cruft
* remove print statement
* remove logging files
* update requirements
* update requirements
* add redis graph.py
* fix import error for logger
* adding es scheme and ca_path config
* adding es scheme and ca_path config
* adding debug code
* removing debug
* adding nodes args
* adding biolink.
* adding biolink.
* Update requirements.txt
* Update .gitignore
* Update dug_utils.py: handle error when curie not found in validate
* Update __init__.py
* Update config.yaml
* Update dev-config.yaml
* Update docker-compose.yaml
* fixed docker-compose
* adding back postgres volume to docker compose
* env correction, docker compose updates

---------

Co-authored-by: Nathan Braswell
Co-authored-by: esurface
Co-authored-by: braswent

* adding v5.0
* cde-links branch
* pin linkml
* Update config.yaml: collection_action to action
* pop total items before result
* print extracted elements
* Update requirements.txt
* Keep edge provenance (#94)
* Update kgx.py
* Update kgx.py
* Update kgx.py: can't delete edge keys while looping over them.
* just collect then update
* Update requirements.txt (#93)
* Pipeline parameterize restructure (#95)
* roger cli prepped for Merge Deploy
* Update Makefile to work with python env
* Update redisgraph-bulk-loader to fix issue with loading MODULE LIST
* Revert "Update redisgraph-bulk-loader to fix issue with loading MODULE LIST" This reverts commit 7baf7efa725caac77e5501e948f545a0f4b20e3d.
* Finalized dev deployment of dug inside Catapult Merge, deployment yamls, code changes and configurations
* updated to reflect the Dug-Api updates to FastAPI
* adding multi label redis by removing 'biolink:' on nodes; edges cannot be fixed after update, so they need to be solved either by changing TranQl AND Plater or forking bulk-redisgraph to allow for colons to be added in the edges
* Working multi label redis nodes w/ no biolink label
* Latest code changes to deploy working Roger in Merge
* biolink data move to '.' separator
* updates to include new dug fixes, upgraded redis-bulk-loader and made changes for biolink variables to specify their domain with a 'biolink.' prefix
* adding test roger code
* removed helm deployments
* change docker owner
* remove core.py
* remove dup dev config
* redis graph is not directly used, removing cruft
* remove print statement
* remove logging files
* update requirements
* update requirements
* add redis graph.py
* fix import error for logger
* adding es scheme and ca_path config
* adding es scheme and ca_path config
* Parameterized annotate tasks with input_data_path and output_data_path
* adding debug code
* removing debug
* adding nodes args
* adding biolink.
* adding biolink.
* Parameterized annotate tasks with input_data_path and output_data_path (#85)
* adding lakefs changes to roger-2.0
* point avalon to vg1 branch
* change avalon dep
* update airflow
* fix avalon tag typo
* update jenkins to tag version on main branch only
* update jenkins to tag version
* update jenkins to tag version
* psycopg2 installation
* add cncf k8s req
* use airflow non-slim
* simplified for testing
* simplified for testing
* change dag name
* Erroneous parameter passed, should not be None
* adding pre-exec
* adding pre-exec
* adding pre-exec
* typo preexec
* typo preexec
* fix context
* get files from repo
* get files from repo
* get files from repo
* get files from repo
* First shot at moving pipeline into base class and implementing. Anvil pipeline not complete
* Syntax fix, docker image version bump to airflow 2.7.2-python3.11
* update storage dir
* update remove dir code
* update remove dir code
* remote path to *
* fix input dir for annotators
* fix input dir for annotators
* fix input dir for annotators
* kwargs to task
* kwargs to task
* kwargs to task
* kwargs to task
* kwargs to task
* kwargs to task
* kwargs to task
* kwargs to task
* kwargs to task
* kwargs to task
* kwargs to task
* adding branch info on lakefs config
* callback push to branch
* back to relative import
* reformat temp branch name based on unique task id
* add logging
* add logging
* convert posix path to str for avalon
* add extra / to root path
* New dag created using DugPipeline subclasses
* EmptyOperator imported from wrong place
* import and syntax fixes
* utterly silly syntax error
* Added anvil to default input data sets for testing purposes
* adding / to local path
* commit meta task args empty string
* add merge logic
* add merge logic
* upstream task dir pull for downstream task
* Switched from subdag to taskgroup because latest Airflow deprecated subdags
* Added BACPAC pipeline object
* Temporarily ignoring configuration variable for enabled datasets for testing
* Passed dag in to create task group to see if it helps dag errors
* Fixed silly syntax error
* adding input / output dir params for make kgx
* Trying different syntax to make taskgroups work.
* adding input / output dir params for make kgx
* Parsing, syntax, pylint fixes
* adding input / output dir params for make kgx
* Added pipeline name to task group name to ensure uniqueness
* oops, moved something out of scope. Fixed
* Filled out pipeline with methods from dug_utils. Needs data path changes
* Finished implementing input_data_path and output_data_path handling, pylint cleanup
* Update requirements.txt
* adding toggle to avoid sending config obj
* adding toggle to avoid sending config obj
* disable to string for test
* control pipelines for testing
* add self to anvil get files
* add log stream to make it available
* typo fix
* correcting branch id
* adding source repo
* adding source repo
* patch name-resolver response
* don't pass input repo and branch, if not overridden, to pre-exec
* don't pass input repo and branch, if not overridden, to pre-exec
* don't pass input repo and branch, if not overridden, to pre-exec
* dug pipeline edit
* recursively find recursively
* recursively find recursively
* setup output path for crawling
* all task functions should have input and output params
* adding annotation as upstream for validate index
* revamp create task, and task wrapper
* add validate concepts index task
* adding concept validation
* add index_variables task as dependency for validate concepts
* add index_variables task as dependency for validate concepts
* await client exists
* await client exists
* concepts not getting picked up for indexing
* concepts not getting picked up for indexing
* fix search elements
* converting annotation output to json
* json format annotation outputs
* adding support for json format elements and concepts read
* json back to dug objects
* fixing index variables with json objects
* indentation and new line for better change detection :?
* indentation and new line for better change detection
* treat dictionary concepts as dictionary
* read concepts json as a dict
* concepts files are actually file paths
* debug message
* make output jsonable
* clear up dir after commit, and delete unmerged branch even if no changes
* don't clear indexes, parallel dataset processing will be taxed
* memory leak?
* memory leak?
* memory leak?
* dumping pickles to debug locally
* find out why concepts are being added to every other element
* find out why concepts are being added to every other element
* pointless shuffle 🤷‍♂️
* revert back in time
* back to sanitize dug
* output just json for annotation
* adding jsonpickle
* jsonpickle 🥒
* unpickle for index
* unpickle for validate index
* crawling fixes
* crawling fixes
* crawling validation fixes
* fix index concepts
* fix makekgx
* adding other bdc pipelines
* adding pipeline parameters to be able to configure per instance
* fix
* add input dataset for pipelines
* Adding README to document how to create data set-specific pipelines
* catchup on base.py
* Added dbgap and nida pipelines
* fix import errors
* annotator modules added by passing config val (#90)
* annotator modules added by passing config val
* fix merge conflict
* following same pattern as parsers, modify configs
* fix to dug config method
* fix old dug pipeline for backward compatibility
* correct default annotator type
* reflective changes
* typo extra quotes
* annotator type not being picked up from config
* remove annotate simple, log env value for lakefs enabled
* testing lakefs off
* add more logging
* add more logging
* post init for config to parse to boolean
* put back task calls
* revert some changes
* adding new pipeline
* lakefs io support for merge task
* fix name
* add io params for kg tasks
* wire up i/o paths for merge
* fix variable name
* print files
* few debug logs
* few debug logs
* treat path as path not str
* few debug logs
* some fixes
* logging edge files
* bug fix knowledge has edge
* re-org graph structure
* adding pathing for other tasks
* pagination logic fix for avalon
* update lakefs client code
* fix glob for get kgx files
* fix up get merged objects
* send down fake commit id for metadata
* working on edges schema
* bulk create nodes I/O
* find schema file
* bulk create edges I/O
* bulk create edges I/O
* bulk load io
* no outputs for final tasks
* add recursive glob
* fix globbing
* oops
* delete dags
* pin dug to latest release
* cruft cleanup
* re-org kgx config
* add support for multiple initial repos
* fix comma
* create dir to download to
* swap branch and repo
* clean up dirs
* fix up other pipeline 👌

---------

Co-authored-by: YaphetKG

* Add heal parsers (#96)
* annotator modules added by passing config val
* fix merge conflict
* following same pattern as parsers, modify configs
* fix to dug config method
* fix old dug pipeline for backward compatibility
* correct default annotator type
* reflective changes
* typo extra quotes
* annotator type not being picked up from config
* remove annotate simple, log env value for lakefs enabled
* testing lakefs off
* add more logging
* add more logging
* post init for config to parse to boolean
* put back task calls
* revert some changes
* adding new pipeline
* lakefs io support for merge task
* fix name
* add io params for kg tasks
* wire up i/o paths for merge
* fix variable name
* print files
* few debug logs
* few debug logs
* treat path as path not str
* few debug logs
* some fixes
* logging edge files
* bug fix knowledge has edge
* re-org graph structure
* adding pathing for other tasks
* pagination logic fix for avalon
* update lakefs client code
* fix glob for get kgx files
* fix up get merged objects
* send down fake commit id for metadata
* working on edges schema
* bulk create nodes I/O
* find schema file
* bulk create edges I/O
* bulk create edges I/O
* bulk load io
* no outputs for final tasks
* add recursive glob
* fix globbing
* oops
* delete dags
* pin dug to latest release
* cruft cleanup
* re-org kgx config
* add support for multiple initial repos
* fix comma
* create dir to download to
* swap branch and repo
* clean up dirs
* fix up other pipeline 👌
* add remaining pipelines
* adding ctn parser
* change merge strategy
* merge init fix
* debug dir
* fix topmed file read
* fix topmed file read
* return file names as strings
* topmed kgx builder custom
* topmed kgx builder custom
* add skip
* get files pattern recursive
* version pin avalon
* pin dug

---------

Co-authored-by: braswent

* Add heal parsers (#97)
* annotator modules added by passing config val
* fix merge conflict
* following same pattern as parsers, modify configs
* fix to dug config method
* fix old dug pipeline for backward compatibility
* correct default annotator type
* reflective changes
* typo extra quotes
* annotator type not being picked up from config
* remove annotate simple, log env value for lakefs enabled
* testing lakefs off
* add more logging
* add more logging
* post init for config to parse to boolean
* put back task calls
* revert some changes
* adding new pipeline
* lakefs io support for merge task
* fix name
* add io params for kg tasks
* wire up i/o paths for merge
* fix variable name
* print files
* few debug logs
* few debug logs
* treat path as path not str
* few debug logs
* some fixes
* logging edge files
* bug fix knowledge has edge
* re-org graph structure
* adding pathing for other tasks
* pagination logic fix for avalon
* update lakefs client code
* fix glob for get kgx files
* fix up get merged objects
* send down fake commit id for metadata
* working on edges schema
* bulk create nodes I/O
* find schema file
* bulk create edges I/O
* bulk create edges I/O
* bulk load io
* no outputs for final tasks
* add recursive glob
* fix globbing
* oops
* delete dags
* pin dug to latest release
* cruft cleanup
* re-org kgx config
* add support for multiple initial repos
* fix comma
* create dir to download to
* swap branch and repo
* clean up dirs
* fix up other pipeline 👌
* add remaining pipelines
* adding ctn parser
* change merge strategy
* merge init fix
* debug dir
* fix topmed file read
* fix topmed file read
* return file names as strings
* topmed kgx builder custom
* topmed kgx builder custom
* add skip
* get files pattern recursive
* version pin avalon
* pin dug

---------

Co-authored-by: braswent

* Radx pipeline (#99)
* point to large download
* fix schema path
* debug bulk input dir
* fix schema read
* fix schema read
* fix schema read
* commenting setup dir for test
* adding logs
* fix path stuff
* add commented stuff back in
* testing radx parser
* adding parser
* skip indexing vars with no id
* adding indexes as part of bulk loader parameters
* fix id index cli arg
* fix local cli
* dug latest

---------

Co-authored-by: Nathan Braswell
Co-authored-by: esurface
Co-authored-by: braswent
Co-authored-by: Michael T. Bacon
Co-authored-by: Michael T Bacon <110547969+mbacon-renci@users.noreply.github.com>

* pin avalon

---------

Co-authored-by: Nathan Braswell
Co-authored-by: esurface
Co-authored-by: braswent
Co-authored-by: Howard Lander
Co-authored-by: Michael T. Bacon
Co-authored-by: Michael T Bacon <110547969+mbacon-renci@users.noreply.github.com>
---
 .env                                           |   6 +-
 .gitignore                                     |   2 +
 Dockerfile                                     |  10 +-
 Jenkinsfile                                    |  32 +-
 dags/annotate.py                               | 103 --
 dags/annotate_and_index.py                     |  44 +
 dags/dug_helpers/dug_utils.py                  | 201 ++--
 dags/index_dag.py                              |  28 -
 dags/knowledge_graph_build.py                  | 102 ++
 dags/roger/config/__init__.py                  |  42 +-
 dags/roger/config/config.yaml                  |  17 +-
 dags/roger/config/dev-config.yaml              |   2 +-
 dags/roger/core/base.py                        |  36 +-
 dags/roger/core/bulkload.py                    |  71 +-
 dags/roger/core/storage.py                     | 200 ++--
 dags/roger/models/kgx.py                       |  55 +-
 dags/roger/pipelines/README.md                 |  99 ++
 dags/roger/pipelines/__init__.py               |  28 +
 dags/roger/pipelines/anvil.py                  |  24 +
 dags/roger/pipelines/bacpac.py                 |   8 +
 dags/roger/pipelines/base.py                   | 964 ++++++++++++++++++
 dags/roger/pipelines/bdc.py                    |  19 +
 dags/roger/pipelines/crdc.py                   |  19 +
 dags/roger/pipelines/ctn.py                    |  10 +
 dags/roger/pipelines/db_gap.py                 |  10 +
 .../roger/pipelines/heal_research_programs.py  |  16 +
 dags/roger/pipelines/heal_studies.py           |  16 +
 dags/roger/pipelines/kfdrc.py                  |  19 +
 dags/roger/pipelines/nida.py                   |  18 +
 dags/roger/pipelines/radx.py                   |   8 +
 dags/roger/pipelines/sparc.py                  |  17 +
 dags/roger/pipelines/topmed.py                 |  41 +
 dags/roger/tasks.py                            | 393 ++++++-
 dags/tranql_translate.py                       |  43 -
 requirements.txt                               |   5 +-
 35 files changed, 2238 insertions(+), 470 deletions(-)
 delete mode 100755 dags/annotate.py
 create mode 100644 dags/annotate_and_index.py
 delete mode 100755 dags/index_dag.py
 create mode 100644 dags/knowledge_graph_build.py
 create mode 100644 dags/roger/pipelines/README.md
 create mode 100644 dags/roger/pipelines/__init__.py
 create mode 100644 dags/roger/pipelines/anvil.py
 create mode 100644 dags/roger/pipelines/bacpac.py
 create mode 100644 dags/roger/pipelines/base.py
 create mode 100644 dags/roger/pipelines/bdc.py
 create mode 100644 dags/roger/pipelines/crdc.py
 create mode 100644 dags/roger/pipelines/ctn.py
 create mode 100644 dags/roger/pipelines/db_gap.py
 create mode 100644 dags/roger/pipelines/heal_research_programs.py
 create mode 100644 dags/roger/pipelines/heal_studies.py
 create mode 100644 dags/roger/pipelines/kfdrc.py
 create mode 100644 dags/roger/pipelines/nida.py
 create mode 100644 dags/roger/pipelines/radx.py
 create mode 100644 dags/roger/pipelines/sparc.py
 create mode 100644 dags/roger/pipelines/topmed.py
 delete mode 100755 dags/tranql_translate.py

diff --git a/.env b/.env
index 9e2ba0e3..2b42e8d7 100644
--- a/.env
+++ b/.env
@@ -9,9 +9,9 @@
 DATA_DIR=./local_storage
 
 DUG_LOG_LEVEL=INFO
 
-ELASTIC_PASSWORD=12345
-ELASTIC_API_HOST=elasticsearch
-ELASTIC_USERNAME=elastic
+ELASTICSEARCH_PASSWORD=12345
+ELASTICSEARCH_HOST=elasticsearch
+ELASTICSEARCH_USERNAME=elastic
 
 NBOOST_API_HOST=nboost
diff --git a/.gitignore b/.gitignore
index 8ca8ea91..6c46fe7f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
 # Git ignore bioler plate from https://github.com/github/gitignore/blob/master/Python.gitignore
+.secret-env
+.vscode/
 
 # Byte-compiled / optimized / DLL files
 __pycache__/
diff --git a/Dockerfile b/Dockerfile
index 5556709c..49c1fd26 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,11 +1,9 @@
-FROM apache/airflow:2.5.0-python3.10
+FROM apache/airflow:2.7.2-python3.11
+
 USER root
 RUN apt-get update && \
-    apt-get install -y git gcc python3-dev nano vim
+    apt-get install -y git nano vim
 COPY requirements.txt requirements.txt
 USER airflow
-# dependency resolution taking hours eventually failing,
-# @TODO fix click lib dependency
-RUN pip install -r requirements.txt && \
-    pip uninstall -y elasticsearch-dsl
+RUN pip
install -r requirements.txt RUN rm -f requirements.txt diff --git a/Jenkinsfile b/Jenkinsfile index 3372b9e0..a68ba92b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -70,31 +70,15 @@ spec: steps { script { container(name: 'kaniko', shell: '/busybox/sh') { - kaniko.buildAndPush("./Dockerfile", ["$IMAGE_NAME:$TAG1", "$IMAGE_NAME:$TAG2", "$IMAGE_NAME:$TAG3", "$IMAGE_NAME:$TAG4"]) + if (env.BRANCH_NAME == "main") { + // Tag with latest and version iff when pushed to master + kaniko.buildAndPush("./Dockerfile", ["$IMAGE_NAME:$TAG1", "$IMAGE_NAME:$TAG2", "$IMAGE_NAME:$TAG3", "$IMAGE_NAME:$TAG4"]) + } else { + kaniko.buildAndPush("./Dockerfile", ["$IMAGE_NAME:$TAG1", "$IMAGE_NAME:$TAG2"]) + } } } - } - // post { - // always { - // archiveArtifacts artifacts: 'image.tar', onlyIfSuccessful: true - // } - // } - } - // stage('Publish') { - // steps { - // script { - // container(name: 'crane', shell: '/busybox/sh') { - // def imageTagsPushAlways = ["$IMAGE_NAME:$TAG1", "$IMAGE_NAME:$TAG2"] - // def imageTagsPushForDevelopBranch = ["$IMAGE_NAME:$TAG3"] - // def imageTagsPushForMasterBranch = ["$IMAGE_NAME:$TAG3", "$IMAGE_NAME:$TAG4"] - // image.publish( - // imageTagsPushAlways, - // imageTagsPushForDevelopBranch, - // imageTagsPushForMasterBranch - // ) - // } - // } - // } - // } + } + } } } diff --git a/dags/annotate.py b/dags/annotate.py deleted file mode 100755 index e0d4a2e4..00000000 --- a/dags/annotate.py +++ /dev/null @@ -1,103 +0,0 @@ -import os - -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator - -from dug_helpers.dug_utils import ( - DugUtil, - get_topmed_files, - get_dbgap_files, - get_nida_files, - get_sparc_files, - get_anvil_files, - get_cancer_data_commons_files, - get_kids_first_files, - get_sprint_files, - get_bacpac_files, - get_heal_study_files, - get_heal_research_program_files - ) -from roger.tasks import default_args, create_python_task - -DAG_ID = 'annotate_dug' - -""" Build the workflow's tasks and DAG. """ -with DAG( - dag_id=DAG_ID, - default_args=default_args, - schedule_interval=None -) as dag: - - """Build workflow tasks.""" - intro = EmptyOperator(task_id='Intro', dag=dag) - - # Unzip and get files, avoid this because - # 1. it takes a bit of time making the dag itself, webserver hangs - # 2. Every task in this dag would still need to execute this part making it redundant - # 3. tasks like intro would fail because they don't have the data dir mounted. 
- - make_kg_tagged = create_python_task(dag, "make_tagged_kgx", DugUtil.make_kg_tagged) - - dummy_stepover = EmptyOperator(task_id="continue") - - #intro >> run_printlog - envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed") - data_sets = envspec.split(",") - - clear_annotation_items = create_python_task(dag, "clear_annotation_files", DugUtil.clear_annotation_cached) - - for i, data_set in enumerate(data_sets): - annotate_files = None - if data_set.startswith("bdc"): - prepare_files = create_python_task(dag, "get_dbgap_data", get_dbgap_files) - annotate_files = create_python_task(dag, "annotate_db_gap_files", DugUtil.annotate_db_gap_files) - elif data_set.startswith("nida"): - prepare_files = create_python_task(dag, "get_nida_files", get_nida_files) - annotate_files = create_python_task(dag, "annotate_nida_files", DugUtil.annotate_nida_files) - elif data_set.startswith("sparc"): - prepare_files = create_python_task(dag, "get_sparc_files", get_sparc_files) - annotate_files = create_python_task(dag, "annotate_sparc_files", DugUtil.annotate_sparc_files) - elif data_set.startswith("topmed"): - prepare_files = create_python_task(dag, "get_topmed_data", get_topmed_files) - annotate_files = create_python_task(dag, "annotate_topmed_files", DugUtil.annotate_topmed_files) - elif data_set.startswith("anvil"): - prepare_files = create_python_task(dag, "get_anvil_data", get_anvil_files) - annotate_files = create_python_task(dag, "annotate_anvil_files", DugUtil.annotate_anvil_files) - elif data_set.startswith("crdc"): - prepare_files = create_python_task(dag, "get_cancer_commons_files", get_cancer_data_commons_files) - annotate_files = create_python_task(dag, "annotate_cancer_commons_files", - DugUtil.annotate_cancer_commons_files) - elif data_set.startswith("kfdrc"): - prepare_files = create_python_task(dag, "get_kids_first_files", get_kids_first_files) - annotate_files = create_python_task(dag, "annotate_kids_first_files", - DugUtil.annotate_kids_first_files) - elif data_set.startswith("sprint"): - prepare_files = create_python_task(dag, "get_sprint_files", get_sprint_files) - annotate_files = create_python_task(dag, "annotate_sprint_files", - DugUtil.annotate_sprint_files) - elif data_set.startswith("bacpac"): - prepare_files = create_python_task(dag, "get_bacpac_files", get_bacpac_files) - annotate_files = create_python_task(dag, "annotate_bacpac_files", - DugUtil.annotate_bacpac_files) - - elif data_set.startswith("heal-studies"): - prepare_files = create_python_task(dag, "get_heal_study_files", get_heal_study_files) - annotate_files = create_python_task(dag, "annotate_heal_study_files", - DugUtil.annotate_heal_study_files) - # elif data_set.startswith("heal-mds-imports"): - # prepare_files = create_python_task(dag, "get_heal_mds_imports", get_heal_study_files) - - elif data_set.startswith("heal-research-programs"): - prepare_files = create_python_task(dag, "get_heal_research_program_files", get_heal_research_program_files) - annotate_files = create_python_task(dag, "annotate_heal_research_program_files", - DugUtil.annotate_heal_research_program_files) - - - intro >> prepare_files - prepare_files >> clear_annotation_items - - if annotate_files: - clear_annotation_items >> annotate_files - annotate_files >> dummy_stepover - - dummy_stepover >> make_kg_tagged diff --git a/dags/annotate_and_index.py b/dags/annotate_and_index.py new file mode 100644 index 00000000..884cd149 --- /dev/null +++ b/dags/annotate_and_index.py @@ -0,0 +1,44 @@ +"""DAG which performs Dug annotate and index operations + 
+This DAG differes slightly from prior versions of the same functionality in +Roger not only in that the annotation and indexing happen in the same DAG, but +also those tasks are broken out into sub-DAGs organized by dataset. Each dataset +has a subdag for all tasks. +""" + +import os + +from airflow.models import DAG +from airflow.operators.empty import EmptyOperator +from roger.tasks import default_args, create_pipeline_taskgroup + +env_enabled_datasets = os.getenv( + "ROGER_DUG__INPUTS_DATA__SETS", "topmed,anvil").split(",") + +with DAG( + dag_id='annotate_and_index', + default_args=default_args, + schedule_interval=None +) as dag: + init = EmptyOperator(task_id="init", dag=dag) + finish = EmptyOperator(task_id="finish", dag=dag) + + from roger import pipelines + from roger.config import config + envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed:v2.0") + data_sets = envspec.split(",") + pipeline_names = {x.split(':')[0]: x.split(':')[1] for x in data_sets} + for pipeline_class in pipelines.get_pipeline_classes(pipeline_names): + # Only use pipeline classes that are in the enabled datasets list and + # that have a properly defined pipeline_name attribute + + # TODO + # Overriding environment variable just to see if this is working. + # name = getattr(pipeline_class, 'pipeline_name', '*not defined*') + # if not name in env_enabled_datasets: + # continue + + # Do the thing to add the pipeline's subdag to the dag in the right way + # . . . + + init >> create_pipeline_taskgroup(dag, pipeline_class, config) >> finish diff --git a/dags/dug_helpers/dug_utils.py b/dags/dug_helpers/dug_utils.py index 4d417e04..52db9624 100644 --- a/dags/dug_helpers/dug_utils.py +++ b/dags/dug_helpers/dug_utils.py @@ -11,8 +11,9 @@ from typing import Union, List import requests -from dug.core import get_parser, get_plugin_manager, DugConcept -from dug.core.annotate import DugAnnotator, ConceptExpander +from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept +from dug.core.annotators._base import Annotator +from dug.core.concept_expander import ConceptExpander from dug.core.crawler import Crawler from dug.core.factory import DugFactory from dug.core.parsers import Parser, DugElement @@ -44,7 +45,7 @@ def __init__(self, config: RogerConfig, to_string=True): self.string_handler = logging.StreamHandler(self.log_stream) log.addHandler(self.string_handler) - self.annotator: DugAnnotator = self.factory.build_annotator() + self.annotator_name: str = config.annotation.annotator_type self.tranqlizer: ConceptExpander = self.factory.build_tranqlizer() @@ -85,7 +86,8 @@ def __exit__(self, exc_type, exc_val, exc_tb): log.error(f"{exc_val} {exc_val} {exc_tb}") log.exception("Got an exception") - def annotate_files(self, parser_name, parsable_files): + def annotate_files(self, parser_name, parsable_files, + output_data_path=None): """ Annotates a Data element file using a Dug parser. :param parser_name: Name of Dug parser to use. 
@@ -94,14 +96,16 @@ def annotate_files(self, parser_name, parsable_files): """ dug_plugin_manager = get_plugin_manager() parser: Parser = get_parser(dug_plugin_manager.hook, parser_name) - output_base_path = storage.dug_annotation_path('') + annotator: Annotator = get_annotator(dug_plugin_manager.hook, annotator_name=self.annotator_name, config=self.config.to_dug_conf()) + if not output_data_path: + output_data_path = storage.dug_annotation_path('') log.info("Parsing files") for parse_file in parsable_files: log.debug("Creating Dug Crawler object") crawler = Crawler( crawl_file=parse_file, parser=parser, - annotator=self.annotator, + annotator=annotator, tranqlizer='', tranql_queries=[], http_session=self.cached_session @@ -109,7 +113,7 @@ def annotate_files(self, parser_name, parsable_files): # configure output space. current_file_name = '.'.join(os.path.basename(parse_file).split('.')[:-1]) - elements_file_path = os.path.join(output_base_path, current_file_name) + elements_file_path = os.path.join(output_data_path, current_file_name) elements_file_name = 'elements.pickle' concepts_file_name = 'concepts.pickle' @@ -566,141 +570,224 @@ def clear_annotation_cached(config=None, to_string=False): dug.cached_session.cache.clear() @staticmethod - def annotate_db_gap_files(config=None, to_string=False, files=None): + def annotate_db_gap_files(config=None, to_string=False, input_data_path=None, output_data_path=None): with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_dd_xml_objects() + if not input_data_path: + files = storage.dug_dd_xml_objects( + input_data_path=input_data_path) + else: + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) parser_name = "DbGaP" dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def annotate_anvil_files(config=None, to_string=False, files=None): + def annotate_anvil_files(config=None, to_string=False, + input_data_path=None, output_data_path=None): with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_anvil_objects() + if not input_data_path: + files = storage.dug_anvil_objects( + input_data_path=input_data_path) + else: + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) parser_name = "Anvil" dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def annotate_cancer_commons_files(config=None, to_string=False, files=None): + def annotate_cancer_commons_files(config=None, to_string=False, + input_data_path=None, + output_data_path=None): with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_crdc_objects() + if not input_data_path: + files = storage.dug_crdc_objects( + input_data_path=input_data_path) + else: + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) parser_name = "crdc" dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def annotate_kids_first_files(config=None, to_string=False, files=None): + def annotate_kids_first_files(config=None, to_string=False, + input_data_path=None, 
output_data_path=None): with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_kfdrc_objects() + if not input_data_path: + files = storage.dug_kfdrc_objects( + input_data_path=input_data_path) + else: + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) parser_name = "kfdrc" dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def annotate_nida_files(config=None, to_string=False, files=None): + def annotate_nida_files(config=None, to_string=False, + input_data_path=None, output_data_path=None): with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_nida_objects() + if not input_data_path: + files = storage.dug_nida_objects( + input_data_path=input_data_path) + else: + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) parser_name = "NIDA" dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def annotate_sparc_files(config=None, to_string=False, files=None): + def annotate_sparc_files(config=None, to_string=False, + input_data_path=None, output_data_path=None): with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_sparc_objects() + if not input_data_path: + files = storage.dug_sparc_objects( + input_data_path=input_data_path) + else: + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) parser_name = "SciCrunch" dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def annotate_sprint_files(config=None, to_string=False, files=None): + def annotate_sprint_files(config=None, to_string=False, + input_data_path=None, output_data_path=None): with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_sprint_objects() + if not input_data_path: + files = storage.dug_sprint_objects( + input_data_path=input_data_path) + else: + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) parser_name = "SPRINT" dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def annotate_topmed_files(config=None, to_string=False, files=None): + def annotate_topmed_files(config=None, to_string=False, + input_data_path=None, output_data_path=None): with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_topmed_objects() + if not input_data_path: + files = storage.dug_topmed_objects( + input_data_path=None) + else: + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) parser_name = "TOPMedTag" log.info(files) dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def annotate_bacpac_files(config=None, to_string=False, files=None): + def annotate_bacpac_files(config=None, to_string=False, + input_data_path=None, 
output_data_path=None): + + log.info(f"Input data path is: {input_data_path}") with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_bacpac_objects() + files = storage.dug_bacpac_objects( + input_data_path=input_data_path) + parser_name = "BACPAC" log.info(files) dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def annotate_heal_study_files(config=None, to_string=False, files=None): + def annotate_heal_study_files(config=None, to_string=False, + input_data_path=None, output_data_path=None): with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_heal_study_objects() + if not input_data_path: + files = storage.dug_heal_study_objects( + input_data_path=None) + else: + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) parser_name = "heal-studies" log.info(files) dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def annotate_heal_research_program_files(config=None, to_string=False, files=None): + def annotate_heal_research_program_files(config=None, to_string=False, + input_data_path=None, + output_data_path=None): with Dug(config, to_string=to_string) as dug: - if files is None: - files = storage.dug_heal_research_program_objects() - + if not input_data_path: + files = storage.dug_heal_research_program_objects( + input_data_path=None) + else: + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) parser_name = "heal-research" log.info(files) dug.annotate_files(parser_name=parser_name, - parsable_files=files) + parsable_files=files, + output_data_path=output_data_path) output_log = dug.log_stream.getvalue() if to_string else '' return output_log @staticmethod - def make_kg_tagged(config=None, to_string=False): + def make_kg_tagged(config=None, to_string=False, input_data_path=None, output_data_path=None): with Dug(config, to_string=to_string) as dug: - output_base_path = storage.dug_kgx_path("") - storage.clear_dir(output_base_path) + output_base_path = output_data_path + if not output_data_path: + output_base_path = storage.dug_kgx_path("") + storage.clear_dir(output_base_path) log.info("Starting building KGX files") - elements_files = storage.dug_elements_objects() + if not input_data_path: + elements_files = storage.dug_elements_objects() + else: + import glob + glob_pattern = str(input_data_path / "**" / 'elements.pickle') + elements_files = glob.glob(glob_pattern, recursive=True) + log.info(f"making kgx files for the following pickles: {elements_files}") for file in elements_files: elements = storage.read_object(file) if "topmed_" in file: @@ -930,5 +1017,3 @@ def get_heal_study_files(config: RogerConfig, to_string=False) -> List[str]: def get_heal_research_program_files(config: RogerConfig, to_string=False) -> List[str]: return get_versioned_files(config, "heal-research", "heal-research-programs", data_store=config.dug_inputs.data_source, unzip=True) - - diff --git a/dags/index_dag.py b/dags/index_dag.py deleted file mode 100755 index 6ddc4b02..00000000 --- a/dags/index_dag.py +++ /dev/null @@ -1,28 +0,0 @@ -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator - -from roger.tasks import get_executor_config, 
default_args, create_python_task -from dug_helpers.dug_utils import DugUtil - -""" Build the workflow's tasks and DAG. """ -with DAG( - dag_id='index_dug', - default_args=default_args, - schedule_interval=None -) as dag: - - """ Build the workflow tasks. """ - intro = EmptyOperator(task_id='Intro') - index_variables = create_python_task (dag, "IndexVariables", DugUtil.index_variables) - validate_index_variables = create_python_task(dag,"ValidateIndexVariables", DugUtil.validate_indexed_variables) - crawl_tags = create_python_task(dag, "CrawlConcepts", DugUtil.crawl_tranql) - index_concepts = create_python_task(dag, "IndexConcepts", DugUtil.index_concepts) - dummy_stepover = EmptyOperator(task_id="continue") - index_extracted_dug_elements = create_python_task(dag, "IndexExtractedElements", DugUtil.index_extracted_elements) - validate_index_concepts = create_python_task(dag, "ValidateIndexConcepts", DugUtil.validate_indexed_concepts) - finish = EmptyOperator(task_id='Finish') - """ Build the DAG. """ - intro >> index_variables >> validate_index_variables >> finish - intro >> crawl_tags >> index_concepts >> dummy_stepover - intro >> crawl_tags >> index_extracted_dug_elements >> dummy_stepover - dummy_stepover >> validate_index_concepts >> finish \ No newline at end of file diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py new file mode 100644 index 00000000..de28aa78 --- /dev/null +++ b/dags/knowledge_graph_build.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# + +""" +An Airflow workflow for the Roger Translator KGX data pipeline. +""" + +from airflow.models import DAG +from airflow.operators.empty import EmptyOperator +import roger +from roger.tasks import default_args, create_python_task +from roger.config import config + +""" Build the workflow's tasks and DAG. """ +with DAG( + dag_id='knowledge_graph_build', + default_args=default_args, + schedule_interval=None +) as dag: + + """ Build the workflow tasks. """ + intro = EmptyOperator(task_id='Intro') + + # Merge nodes needs inputs from two sources + # 1. baseline and/or CDE KGX files from LakeFS (External repo) + # 2. 
Infer which local kgx files are needed based on dug_inputs and grab them from the current repo + + # build the annotate and index pipeline output locations + #lakefs://yk-heal/main/annotate_and_index/crdc_dataset_pipeline_task_group.make_kgx_crdc/ + working_repo = config.lakefs_config.repo + branch = config.lakefs_config.branch + kgx_repos = config.kgx.data_sets + input_repos = [{ + 'name': repo.split(':')[0], + 'branch': repo.split(':')[1], + 'path': '*' + } for repo in kgx_repos] + + # Figure out a way to extract paths + get_path_on_lakefs = lambda d: f"annotate_and_index/{d}_dataset_pipeline_task_group.make_kgx_{d}/" + + + for dataset in config.dug_inputs.data_sets: + dataset_name = dataset.split(":")[0] + # add datasets from the other pipeline + input_repos.append( + { + 'name': working_repo, + 'branch': branch, + 'path': get_path_on_lakefs(dataset_name) + } + ) + + merge_nodes = create_python_task (dag, name="MergeNodes", + a_callable=roger.merge_nodes, + external_repos=input_repos + ) + + # The rest of these guys can just operate on the local lakefs repo/branch + # we need to add input dir and output dir similar to what we did for dug tasks + + create_nodes_schema = create_python_task(dag, + name="CreateNodesSchema", + a_callable=roger.create_nodes_schema + ) + create_edges_schema = create_python_task(dag, + name="CreateEdgesSchema", + a_callable=roger.create_edges_schema) + + create_bulk_load_nodes = create_python_task(dag, + name="CreateBulkLoadNodes", + a_callable=roger.create_bulk_nodes) + create_bulk_load_edges = create_python_task(dag, + name="CreateBulkLoadEdges", + a_callable=roger.create_bulk_edges) + bulk_load = create_python_task(dag, + name="BulkLoad", + a_callable=roger.bulk_load, + no_output_files=True) + check_tranql = create_python_task(dag, + name="CheckTranql", + a_callable=roger.check_tranql, + no_output_files=True) + validate = create_python_task(dag, + name="Validate", + a_callable=roger.validate, + no_output_files=True) + + + """ Build the DAG. """ + merge_nodes.set_upstream(intro) + create_nodes_schema.set_upstream(merge_nodes) + create_edges_schema.set_upstream(merge_nodes) + create_bulk_load_nodes.set_upstream(create_nodes_schema) + create_bulk_load_nodes.set_upstream(merge_nodes) + create_bulk_load_edges.set_upstream(create_edges_schema) + create_bulk_load_edges.set_upstream(merge_nodes) + bulk_load.set_upstream(create_bulk_load_nodes) + bulk_load.set_upstream(create_bulk_load_edges) + validate.set_upstream(bulk_load) + check_tranql.set_upstream(bulk_load) + diff --git a/dags/roger/config/__init__.py b/dags/roger/config/__init__.py index 403b25b1..ac9eb23a 100644 --- a/dags/roger/config/__init__.py +++ b/dags/roger/config/__init__.py @@ -26,6 +26,21 @@ def __post_init__(self): self.port = int(self.port) +@dataclass +class LakefsConfig(DictLike): + host: str + access_key_id: str + secret_access_key: str + branch: str + repo: str + enabled: bool = False + + def __post_init__(self): + if isinstance(self.enabled, str): + self.enabled = self.enabled.lower() == "true" + + + @dataclass class LoggingConfig(DictLike): level: str = "DEBUG" @@ -35,10 +50,8 @@ class LoggingConfig(DictLike): @dataclass class KgxConfig(DictLike): biolink_model_version: str = "1.5.0" - dataset_version: str = "v1.0" - merge_db_id: int = 1 merge_db_temp_dir: str = "workspace" - data_sets: List = field(default_factory=lambda: ['baseline-graph']) + data_sets: List = field(default_factory=lambda: ['baseline-graph:v5.0']) def __post_init__(self): # Convert strings to list. 
In cases where this is passed as env variable with a single value @@ -77,7 +90,18 @@ class BulkLoaderConfig(DictLike): @dataclass class AnnotationConfig(DictLike): - annotator: str = "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + annotator_type: str = "monarch" + annotator_args: dict = field( + default_factory=lambda: { + "monarch": { + "url": "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + }, + "sapbert": { + "classification_url": "https://med-nemo.apps.renci.org/annotate/", + "annotator_url": "https://babel-sapbert.apps.renci.org/annotate/", + }, + } + ) normalizer: str = "https://nodenormalization-sri.renci.org/get_normalized_nodes?curie=" synonym_service: str = "https://onto.renci.org/synonyms/" ontology_metadata: str = "https://api.monarchinitiative.org/api/bioentity/" @@ -116,7 +140,7 @@ class IndexingConfig(DictLike): "anat_to_disease": ["anatomical_entity", "disease"], "anat_to_pheno": ["anatomical_entity", "phenotypic_feature"], }) - tranql_endpoint: str = "http://tranql:8081/tranql/query?dynamic_id_resolution=true&asynchronous=false" + tranql_endpoint: str = "http://tranql-service/tranql/query?dynamic_id_resolution=true&asynchronous=false" # by default skips node to element queries node_to_element_queries: dict = field(default_factory=lambda: {}) element_mapping: str = "" @@ -170,6 +194,7 @@ def __init__(self, **kwargs): self.annotation_base_data_uri: str = kwargs.pop("annotation_base_data_uri", "") self.validation = kwargs.pop("validation") self.dag_run = kwargs.pop('dag_run', None) + self.lakefs_config = LakefsConfig(**kwargs.pop("lakefs_config")) def to_dug_conf(self) -> DugConfig: return DugConfig( @@ -183,9 +208,8 @@ def to_dug_conf(self) -> DugConfig: redis_port=self.redisgraph.port, nboost_host=self.elasticsearch.nboost_host, preprocessor=self.annotation.preprocessor, - annotator={ - 'url': self.annotation.annotator, - }, + annotator_type=self.annotation.annotator_type, + annotator_args=self.annotation.annotator_args, normalizer={ 'url': self.annotation.normalizer, }, @@ -328,7 +352,7 @@ def update(new_value: Dict): def __str__(self): flat = flatten(Config.__instance__) for k in flat: - if 'PASSWORD' in k or 'password' in k: + if 'PASSWORD' in k or 'password' in k or 'key' in k.lower(): flat[k] = '******' flat = unflatten(flat) result = json.dumps(flat) diff --git a/dags/roger/config/config.yaml b/dags/roger/config/config.yaml index 49bc927b..e9402ce4 100644 --- a/dags/roger/config/config.yaml +++ b/dags/roger/config/config.yaml @@ -16,11 +16,9 @@ annotation_base_data_uri: https://stars.renci.org/var/dug/ kgx: biolink_model_version: v3.1.2 - dataset_version: v5.0 - merge_db_id: 1 merge_db_temp_dir: workspace data_sets: - - baseline-graph + - baseline-graph:v5.0 dug_inputs: data_source: s3 @@ -44,10 +42,17 @@ bulk_loader: annotation: clear_http_cache: false - annotator: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + annotator_type: monarch + annotator_args: + monarch: + url: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + sapbert: + classification_url: 
"https://med-nemo.apps.renci.org/annotate/" + annotator_url: "https://babel-sapbert.apps.renci.org/annotate/" normalizer: "https://nodenormalization-dev.apps.renci.org/get_normalized_nodes?conflate=false&description=true&curie=" synonym_service: "https://name-resolution-sri.renci.org/reverse_lookup" ontology_metadata: "https://api.monarchinitiative.org/api/bioentity/" + preprocessor: debreviator: BMI: "body mass index" @@ -72,7 +77,7 @@ indexing: "small_molecule_to_disease": ["small_molecule", "disease"] "chemical_mixture_to_disease": ["chemical_mixture", "disease"] "phen_to_anat": ["phenotypic_feature", "anatomical_entity"] - tranql_endpoint: "http://tranql:8081/tranql/query?dynamic_id_resolution=true&asynchronous=false" + tranql_endpoint: "http://tranql-service/tranql/query?dynamic_id_resolution=true&asynchronous=false" node_to_element_queries: enabled: false cde: @@ -157,3 +162,5 @@ lakefs_config: access_key_id: "" secret_access_key: "" host: "" + branch: "" + repo: "" diff --git a/dags/roger/config/dev-config.yaml b/dags/roger/config/dev-config.yaml index 91fc0af3..bece11a8 100644 --- a/dags/roger/config/dev-config.yaml +++ b/dags/roger/config/dev-config.yaml @@ -54,7 +54,7 @@ indexing: "phen_to_anat": ["phenotypic_feature", "anatomical_entity"] "anat_to_disease": ["anatomical_entity", "disease"] "anat_to_pheno": ["anatomical_entity", "phenotypic_feature"] - tranql_endpoint: "http://tranql:8081/tranql/query?dynamic_id_resolution=true&asynchronous=false" + tranql_endpoint: "http://tranql-service/tranql/query?dynamic_id_resolution=true&asynchronous=false" elasticsearch: host: elasticsearch diff --git a/dags/roger/core/base.py b/dags/roger/core/base.py index 6696f81d..7ba9409a 100644 --- a/dags/roger/core/base.py +++ b/dags/roger/core/base.py @@ -73,62 +73,68 @@ def create_schema(to_string=False, config=None): output = (o1 + o2 ) if to_string else None return output -def create_edges_schema(to_string=False, config=None): +def create_edges_schema(to_string=False, config=None, input_data_path=None, output_data_path=None): "Create edges schema on KGX object" output = None with Roger(to_string, config=config) as roger: - roger.kgx.create_edges_schema() + roger.kgx.create_edges_schema( + input_data_path=input_data_path, + output_data_path=output_data_path + ) output = roger.log_stream.getvalue() if to_string else None return output -def create_nodes_schema(to_string=False, config=None): +def create_nodes_schema(to_string=False, config=None, input_data_path=None, output_data_path=None): "Create nodes schema on KGX object" output = None with Roger(to_string, config=config) as roger: - roger.kgx.create_nodes_schema() + roger.kgx.create_nodes_schema(input_data_path=input_data_path, + output_data_path=output_data_path) output = roger.log_stream.getvalue() if to_string else None return output -def merge_nodes(to_string=False, config=None): +def merge_nodes(to_string=False, config=None, input_data_path=None, output_data_path=None): "Run KGX merge" output = None with Roger (to_string, config=config) as roger: - roger.kgx.merge() + roger.kgx.merge(input_path=input_data_path, output_path=output_data_path) output = roger.log_stream.getvalue () if to_string else None return output -def create_bulk_load(to_string=False, config=None): +def create_bulk_load(to_string=False, config=None, input_data_path=None, output_data_path=None): "Generate bulk load files" o1 = create_bulk_nodes(to_string=to_string, config=config) o2 = create_bulk_edges(to_string=to_string, config=config) output = (o1 + o2) if to_string 
else None return output -def create_bulk_nodes(to_string=False, config=None): +def create_bulk_nodes(to_string=False, config=None, input_data_path=None, output_data_path=None): "Generate bulk node CSV file" output = None with Roger(to_string, config=config) as roger: - roger.bulk.create_nodes_csv_file() + log.info("input path: %s", input_data_path) + log.info("output path: %s", output_data_path) + roger.bulk.create_nodes_csv_file(input_data_path, output_data_path) output = roger.log_stream.getvalue() if to_string else None return output -def create_bulk_edges(to_string=False, config=None): +def create_bulk_edges(to_string=False, config=None, input_data_path=None, output_data_path=None): "Create bulk edges CSV file" output = None with Roger(to_string, config=config) as roger: - roger.bulk.create_edges_csv_file() + roger.bulk.create_edges_csv_file(input_data_path, output_data_path) output = roger.log_stream.getvalue() if to_string else None return output -def bulk_load(to_string=False, config=None): +def bulk_load(to_string=False, config=None, input_data_path=None, output_data_path=None): "Run bulk load insert process" output = None with Roger (to_string, config=config) as roger: - roger.bulk.insert() + roger.bulk.insert(input_data_path=input_data_path) output = roger.log_stream.getvalue () if to_string else None return output -def validate (to_string=False, config=None): +def validate (to_string=False, config=None, input_data_path=None, output_data_path=None): "Run bulk validate process" output = None with Roger (to_string, config=config) as roger: @@ -136,7 +142,7 @@ def validate (to_string=False, config=None): output = roger.log_stream.getvalue () if to_string else None return output -def check_tranql(to_string=False, config=None): +def check_tranql(to_string=False, config=None, input_data_path=None, output_data_path=None): "Tranql server smoke check" output = None with Roger(to_string, config=config) as roger: diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index 1eca92db..0f0fecee 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -33,31 +33,24 @@ def __init__(self, biolink, config=None): self.separator =(chr(separator) if isinstance(separator, int) else separator) - def tables_up_to_date (self): - return storage.is_up_to_date ( - source=[ - storage.schema_path (f"{SchemaType.PREDICATE.value}-schema.json"), - storage.schema_path (f"{SchemaType.PREDICATE.value}-schema.json") - ] + storage.merged_objects (), - targets=glob.glob (storage.bulk_path ("nodes/**.csv")) + \ - glob.glob (storage.bulk_path ("edges/**.csv"))) - - def create_nodes_csv_file(self): - if self.tables_up_to_date (): - log.info ("up to date.") - return + def create (self): + """Used in the CLI on args.create_bulk""" + self.create_nodes_csv_file() + self.create_edges_csv_file() + + def create_nodes_csv_file(self, input_data_path=None, output_data_path=None): # clear out previous data - bulk_path = storage.bulk_path("nodes") + bulk_path = storage.bulk_path("nodes", output_data_path) if os.path.exists(bulk_path): shutil.rmtree(bulk_path) - categories_schema = storage.read_schema (SchemaType.CATEGORY) + categories_schema = storage.read_schema (SchemaType.CATEGORY, input_data_path) state = defaultdict(lambda: None) log.info(f"processing nodes") """ Write node data for bulk load. 
""" categories = defaultdict(lambda: []) category_error_nodes = set() - merged_nodes_file = storage.merge_path("nodes.jsonl") + merged_nodes_file = storage.merged_objects('nodes', input_data_path) counter = 1 for node in storage.json_line_iter(merged_nodes_file): if not node['category']: @@ -74,7 +67,7 @@ def create_nodes_csv_file(self): f"Showing first 10: {list(category_error_nodes)[:10]}.") # flush every 100K if counter % 100_000 == 0: - self.write_bulk(storage.bulk_path("nodes"), + self.write_bulk(storage.bulk_path("nodes", output_data_path), categories, categories_schema, state=state, is_relation=False) # reset variables. @@ -83,22 +76,19 @@ def create_nodes_csv_file(self): counter += 1 # write back if any thing left. if len(categories): - self.write_bulk(storage.bulk_path("nodes"), + self.write_bulk(storage.bulk_path("nodes", output_data_path), categories, categories_schema, state=state, is_relation=False) - def create_edges_csv_file(self): + def create_edges_csv_file(self, input_data_path=None, output_data_path=None): """ Write predicate data for bulk load. """ - if self.tables_up_to_date (): - log.info ("up to date.") - return # Clear out previous data - bulk_path = storage.bulk_path("edges") + bulk_path = storage.bulk_path("edges", output_data_path) if os.path.exists(bulk_path): shutil.rmtree(bulk_path) - predicates_schema = storage.read_schema(SchemaType.PREDICATE) + predicates_schema = storage.read_schema(SchemaType.PREDICATE, input_data_path) predicates = defaultdict(lambda: []) - edges_file = storage.merge_path('edges.jsonl') + edges_file = storage.merged_objects('edges', input_data_path) counter = 1 state = {} for edge in storage.json_line_iter(edges_file): @@ -106,14 +96,14 @@ def create_edges_csv_file(self): # write out every 100K , to avoid large predicate dict. 
if counter % 100_000 == 0: self.write_bulk( - storage.bulk_path("edges"),predicates, predicates_schema, + storage.bulk_path("edges", output_data_path),predicates, predicates_schema, state=state, is_relation=True) predicates = defaultdict(lambda : []) counter += 1 # if there are some items left (if loop ended before counter reached the # specified value) if len(predicates): - self.write_bulk(storage.bulk_path("edges"), predicates, + self.write_bulk(storage.bulk_path("edges", output_data_path), predicates, predicates_schema,state=state, is_relation=True) @staticmethod @@ -316,17 +306,10 @@ def write_bulk(self, bulk_path, obj_map, schema, state={}, state['processed_id'] = processed_objects_id state['called_times'] = called_x_times - def insert (self): - - redisgraph = { - 'host': os.getenv('REDIS_HOST'), - 'port': os.getenv('REDIS_PORT', '6379'), - 'password': os.getenv('REDIS_PASSWORD'), - 'graph': os.getenv('REDIS_GRAPH'), - } + def insert (self, input_data_path=None): redisgraph = self.config.redisgraph - nodes = sorted(glob.glob (storage.bulk_path ("nodes/**.csv*"))) - edges = sorted(glob.glob (storage.bulk_path ("edges/**.csv*"))) + nodes = sorted(glob.glob (storage.bulk_path ("**/nodes/**.csv*", input_data_path), recursive=True)) + edges = sorted(glob.glob (storage.bulk_path ("**/edges/**.csv*", input_data_path), recursive=True)) graph = redisgraph['graph'] log.info(f"bulk loading \n nodes: {nodes} \n edges: {edges}") @@ -340,8 +323,9 @@ def insert (self): log.info ("bulk loading graph: %s", str(graph)) args = [] if len(nodes) > 0: - bulk_path_root = storage.bulk_path('nodes') + os.path.sep + bulk_path_root = glob.glob(storage.bulk_path('**/nodes', path=input_data_path), recursive=True)[0] + os.path.sep nodes_with_type = [] + collect_labels = set() for x in nodes: """ These lines prep nodes bulk load by: @@ -350,24 +334,29 @@ def insert (self): """ file_name_type_part = x.replace(bulk_path_root, '').split('.')[0].split('~')[1] all_labels = "biolink." + file_name_type_part + ":" + ":".join([f'biolink.{v.lstrip("biolink:")}' for v in self.biolink.toolkit.get_ancestors("biolink:" + file_name_type_part, reflexive=False, formatted=True )] ) + collect_labels.add("biolink." 
+ file_name_type_part) + for v in self.biolink.toolkit.get_ancestors("biolink:" + file_name_type_part, reflexive=False, + formatted=True): + collect_labels.add(f'biolink.{v.lstrip("biolink:")}') nodes_with_type.append(f"{all_labels} {x}") args.extend(("-N " + " -N ".join(nodes_with_type)).split()) if len(edges) > 0: - bulk_path_root = storage.bulk_path('edges') + os.path.sep + bulk_path_root = glob.glob(storage.bulk_path('**/edges', path=input_data_path), recursive=True)[0] + os.path.sep edges_with_type = [f"biolink.{x.replace(bulk_path_root, '').strip(os.path.sep).split('.')[0].split('~')[1]} {x}" for x in edges] # Edge label now no longer has 'biolink:' args.extend(("-R " + " -R ".join(edges_with_type)).split()) args.extend([f"--separator={self.separator}"]) - log.debug(f"--redis-url=redis://:{redisgraph['password']}@{redisgraph['host']}:{redisgraph['port']}") args.extend([f"--redis-url=redis://:{redisgraph['password']}@{redisgraph['host']}:{redisgraph['port']}"]) args.extend(['--enforce-schema']) + for lbl in collect_labels: + args.extend([f'-i `{lbl}`:id', f'-f {lbl}:name', f'-f {lbl}:synonyms']) args.extend([f"{redisgraph['graph']}"]) """ standalone_mode=False tells click not to sys.exit() """ log.debug(f"Calling bulk_insert with extended args: {args}") try: bulk_insert(args, standalone_mode=False) - self.add_indexes() + # self.add_indexes() except Exception as e: log.error(f"Unexpected {e.__class__.__name__}: {e}") raise diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index 48034b07..c1a3e808 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -85,7 +85,7 @@ def read_object(path, key=None): elif path.endswith(".pickle"): with open(file=path, mode="rb") as stream: obj = pickle.load(stream) - elif path.endswith(".jsonl"): + elif path.endswith(".jsonl") or path.endswith('.txt'): obj = read_data(path) return obj @@ -117,11 +117,11 @@ def write_object (obj, path, key=None): yaml.dump (obj, outfile) elif path.endswith (".json"): with open (path, "w", encoding='utf-8') as stream: - stream.write(str(json.dumps (obj).decode('utf-8'))) + stream.write(str(json.dumps (obj, option=json.OPT_INDENT_2).decode('utf-8'))) elif path.endswith(".pickle"): with open (path, "wb") as stream: pickle.dump(obj, file=stream) - elif path.endswith(".jsonl"): + elif path.endswith(".jsonl") or path.endswith('.txt'): with open (path, "w", encoding="utf-8") as stream: stream.write(obj) else: @@ -152,30 +152,50 @@ def kgx_path(name): :path name: Name of the KGX object. """ return str(ROGER_DATA_DIR / "kgx" / name) -def kgx_objects(format_="json"): +def kgx_objects(format_="json", path=None): """ A list of KGX objects. """ kgx_pattern = kgx_path(f"**.{format_}") - return sorted(glob.glob (kgx_pattern)) + if path: + kgx_pattern = f"{path}/**/*.{format_}" + return sorted(glob.glob (kgx_pattern, recursive=True)) -def merge_path(name): +def merge_path(name, path: Path=None): """ Form a merged KGX object path. :path name: Name of the merged KGX object. """ - return str(ROGER_DATA_DIR / 'merge' / name) - -def merged_objects(): + if path is None: + # create output dir + if not os.path.exists(ROGER_DATA_DIR / 'merge'): + os.makedirs(ROGER_DATA_DIR / 'merge') + return str(ROGER_DATA_DIR / 'merge' / name) + return str(path.joinpath(name)) + +def merged_objects(file_type, path=None): """ A list of merged KGX objects. 
""" - merged_pattern = merge_path("**.json") - return sorted(glob.glob (merged_pattern)) + if not path: + merged_pattern = merge_path(f"**/{file_type}.jsonl") + else: + merged_pattern = merge_path(f"**/{file_type}.jsonl", path=path) + # this thing should always return one edges or nodes file (based on file_type) + try: + return sorted(glob.glob(merged_pattern, recursive=True))[0] + except IndexError: + raise ValueError(f"Could not find merged KGX of type {file_type} in {merged_pattern}") -def schema_path(name): + +def schema_path(name, path=None): """ Path to a schema object. :param name: Name of the object to get a path for. """ - return str(ROGER_DATA_DIR / 'schema' / name) + if not path: + return str(ROGER_DATA_DIR / 'schema' / name) + return str (path / 'schema' / name) -def bulk_path(name): +def bulk_path(name, path=None): """ Path to a bulk load object. :param name: Name of the object. """ - return str(ROGER_DATA_DIR / 'bulk' / name) + if not path: + return str(ROGER_DATA_DIR / 'bulk' / name) + else: + return str(path / name) def metrics_path(name): """ @@ -194,10 +214,14 @@ def dug_annotation_path(name): def dug_expanded_concepts_path(name): return str(ROGER_DATA_DIR / 'dug' / 'expanded_concepts' / name) -def dug_expanded_concept_objects(): - file_pattern = dug_expanded_concepts_path( - os.path.join('*','expanded_concepts.pickle')) - return sorted(glob.glob(file_pattern)) +def dug_expanded_concept_objects(data_path=None, format="pickle"): + "Return a list of files containing expaneded concept objects" + if data_path: + file_pattern = os.path.join(data_path, '**', f'expanded_concepts.{format}') + else: + file_pattern = dug_expanded_concepts_path( + os.path.join('*',f'expanded_concepts.{format}')) + return sorted(glob.glob(file_pattern, recursive=True)) def dug_extracted_elements_objects(): file_pattern = dug_expanded_concepts_path( @@ -212,17 +236,25 @@ def dug_kgx_objects(): dug_kgx_pattern = dug_kgx_path("**.json") return sorted(glob.glob(dug_kgx_pattern)) -def dug_concepts_objects(): +def dug_concepts_objects(data_path, format="pickle"): """ A list of dug annotation Objects. """ - concepts_file_path = dug_annotation_path( - os.path.join('*','concepts.pickle')) - return sorted(glob.glob(concepts_file_path)) + if not data_path: + concepts_file_path = dug_annotation_path( + os.path.join('*',f'concepts.{format}')) + else: + concepts_file_path = os.path.join( + data_path, '**', f'concepts.{format}') + return sorted(glob.glob(concepts_file_path, recursive=True)) -def dug_elements_objects(): +def dug_elements_objects(data_path=None, format='pickle'): """ A list of dug annotation Objects. 
""" - concepts_file_path = dug_annotation_path( - os.path.join('*', 'elements.pickle')) - return sorted(glob.glob(concepts_file_path)) + if not data_path: + concepts_file_pattern = dug_annotation_path( + os.path.join('*', f'elements.{format}')) + else: + concepts_file_pattern = os.path.join( + data_path, '**', f'elements.{format}') + return sorted(glob.glob(concepts_file_pattern, recursive=True)) def dug_input_files_path(name) -> pathlib.Path: path = ROGER_DATA_DIR / "dug" / "input_files" / name @@ -233,22 +265,13 @@ def dug_input_files_path(name) -> pathlib.Path: log.info(f"Input file path: {path} already exists") return path -def dug_topmed_path(name): - """ Topmed source files""" - return dug_input_files_path('topmed') / name - -def dug_topmed_objects(): - topmed_file_pattern = str(dug_topmed_path("topmed_*.csv")) +def dug_topmed_objects(input_data_path=None): + "Return list of TOPMed source files" + if not input_data_path: + input_data_path = str(dug_input_files_path('topmed')) + topmed_file_pattern = os.path.join(input_data_path, "topmed_*.csv") return sorted(glob.glob(topmed_file_pattern)) -def dug_nida_path(name): - """ NIDA source files""" - return dug_input_files_path('nida') / name - -def dug_sparc_path(name): - """ NIDA source files""" - return dug_input_files_path('sparc') / name - def dug_anvil_path(): """Anvil source files""" return dug_input_files_path('anvil') @@ -281,62 +304,78 @@ def dug_kfdrc_path(): """Anvil source files""" return dug_input_files_path('kfdrc') -def dug_nida_objects(): - nida_file_pattern = str(dug_nida_path("NIDA-*.xml")) +def dug_nida_objects(input_data_path=None): + "Return list of NIDA source files" + if not input_data_path: + input_data_path = str(dug_input_files_path('nida')) + nida_file_pattern = os.path.join(input_data_path, "NIDA-*.xml") return sorted(glob.glob(nida_file_pattern)) -def dug_sparc_objects(): - file_pattern = str(dug_sparc_path("scicrunch/*.xml")) +def dug_sparc_objects(input_data_path=None): + if not input_data_path: + input_data_path = str(dug_input_files_path('sparc')) + file_pattern = os.path.join(input_data_path, "scicrunch/*.xml") return sorted(glob.glob(file_pattern)) -def dug_anvil_objects(): - file_path = dug_anvil_path() +def dug_anvil_objects(input_data_path=None): + if not input_data_path: + input_data_path = dug_anvil_path() files = get_files_recursive( lambda file_name: ( not file_name.startswith('GapExchange_') and file_name.endswith('.xml')), - file_path) + input_data_path) return sorted([str(f) for f in files]) -def dug_sprint_objects(): - file_path = dug_sprint_path() +def dug_sprint_objects(input_data_path=None): + if not input_data_path: + input_data_path = dug_sprint_path() files = get_files_recursive( - lambda file_name: file_name.endswith('.xml'), file_path) + lambda file_name: file_name.endswith('.xml'), input_data_path) return sorted([str(f) for f in files]) -def dug_bacpac_objects(): - file_path = dug_bacpac_path() +def dug_bacpac_objects(input_data_path=None): + "Return list of BACPAC source files" + if not input_data_path: + input_data_path = dug_bacpac_path() files = get_files_recursive( - lambda file_name: file_name.endswith('.xml'), file_path) + lambda file_name: file_name.endswith('.xml'), input_data_path) return sorted([str(f) for f in files]) -def dug_crdc_objects(): - file_path = dug_crdc_path() +def dug_crdc_objects(input_data_path=None): + if not input_data_path: + input_data_path = dug_crdc_path() files = get_files_recursive( lambda file_name: ( not file_name.startswith('GapExchange_') and 
file_name.endswith('.xml')), - file_path) + input_data_path) return sorted([str(f) for f in files]) -def dug_heal_study_objects(): - file_path = dug_heal_study_path() - files = get_files_recursive(lambda file_name : file_name.endswith('.xml'), file_path) +def dug_heal_study_objects(input_data_path=None): + "Return list of HEAL study source files" + if not input_data_path: + input_data_path = dug_heal_study_path() + files = get_files_recursive(lambda file_name : file_name.endswith('.xml'), + input_data_path) return sorted([str(f) for f in files]) -def dug_heal_research_program_objects(): - file_path = dug_heal_research_program_path() - files = get_files_recursive(lambda file_name : file_name.endswith('.xml'), file_path) +def dug_heal_research_program_objects(input_data_path=None): + "Return list of HEAL research program source files" + if not input_data_path: + input_data_path = dug_heal_research_program_path() + files = get_files_recursive(lambda file_name : file_name.endswith('.xml'), + input_data_path) return sorted([str(f) for f in files]) - -def dug_kfdrc_objects(): - file_path = dug_kfdrc_path() +def dug_kfdrc_objects(input_data_path=None): + if not input_data_path: + input_data_path = dug_kfdrc_path() files = get_files_recursive( lambda file_name: ( not file_name.startswith('GapExchange_') and file_name.endswith('.xml')), - file_path) + input_data_path) return sorted([str(f) for f in files]) @@ -356,23 +395,26 @@ def get_files_recursive(file_name_filter, current_dir): file_paths += [child] return file_paths -def dug_dd_xml_objects(): - file_path = dug_dd_xml_path() +def dug_dd_xml_objects(input_data_path=None): + if not input_data_path: + input_data_path = dug_dd_xml_path() files = get_files_recursive( lambda file_name: ( not file_name.startswith('._') and file_name.endswith('.xml')), - file_path) + input_data_path) return sorted([str(f) for f in files]) def copy_file_to_dir(file_location, dir_name): return shutil.copy(file_location, dir_name) -def read_schema (schema_type: SchemaType): +def read_schema (schema_type: SchemaType, path=None): """ Read a schema object. :param schema_type: Schema type of the object to read. """ - path = schema_path (f"{schema_type.value}-schema.json") - return read_object (path) + if path is not None: + path = path / '**' + location = glob.glob(schema_path (f"{schema_type.value}-schema.json", path=path), recursive=True)[0] + return read_object (location) def get_uri (path, key): """ Build a URI. @@ -392,17 +434,7 @@ def read_relative_object (path): def trunc(text, limit): return ('..' + text[-limit-2:]) if len(text) > limit else text -def is_up_to_date (source, targets): - target_time_list = [ - os.stat (f).st_mtime for f in targets if os.path.exists(f)] - if len(target_time_list) == 0: - log.debug (f"no targets found") - return False - source = [ os.stat (f).st_mtime for f in source if os.path.exists (f) ] - if len(source) == 0: - log.debug ("no source found. 
up to date") - return True - return max(source) < min(target_time_list) + def json_line_iter(jsonl_file_path): f = open(file=jsonl_file_path, mode='r', encoding='utf-8') diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index a8976a00..e80e65db 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -8,9 +8,8 @@ from collections import defaultdict from xxhash import xxh64_hexdigest import orjson as json -import redis import ntpath -from kg_utils.merging import GraphMerger, MemoryGraphMerger, DiskGraphMerger +from kg_utils.merging import DiskGraphMerger from kg_utils.constants import * from roger.config import get_default_config @@ -43,18 +42,11 @@ def __init__(self, biolink=None, config=None): self.merger = DiskGraphMerger(temp_directory=self.temp_directory, chunk_size=5_000_000) self.biolink_version = self.config.kgx.biolink_model_version - self.merge_db_id = self.config.kgx.merge_db_id - self.merge_db_name = f'db{self.merge_db_id}' log.debug(f"Trying to get biolink version : {self.biolink_version}") if biolink is None: self.biolink = BiolinkModel(self.biolink_version) else: self.biolink = biolink - self.redis_conn = redis.Redis( - host=self.config.redisgraph.host, - port=self.config.redisgraph.port, - password=self.config.redisgraph.password, - db=self.merge_db_id) self.enable_metrics = self.config.get('enable_metrics', False) def get_kgx_json_format(self, files: list, dataset_version: str): @@ -301,7 +293,7 @@ def fetch_dug_kgx(self): log.info("Done copying dug KGX files.") return all_kgx_files - def create_nodes_schema(self): + def create_nodes_schema(self, input_data_path=None, output_data_path=None): """ Extracts schema for nodes based on biolink leaf types :return: @@ -309,7 +301,7 @@ def create_nodes_schema(self): category_schemas = defaultdict(lambda: None) category_error_nodes = set() - merged_nodes_file = storage.merge_path("nodes.jsonl") + merged_nodes_file = storage.merged_objects("nodes", input_data_path) log.info(f"Processing : {merged_nodes_file}") counter = 0 for node in storage.json_line_iter(merged_nodes_file): @@ -356,15 +348,15 @@ def create_nodes_schema(self): f"These will be treated as {BiolinkModel.root_type}.") # Write node schemas. - self.write_schema(category_schemas, SchemaType.CATEGORY) + self.write_schema(category_schemas, SchemaType.CATEGORY, output_path=output_data_path) - def create_edges_schema(self): + def create_edges_schema(self, input_data_path=None, output_data_path=None): """ Create unified schema for all edges in an edges jsonl file. :return: """ predicate_schemas = defaultdict(lambda: None) - merged_edges_file = storage.merge_path("edges.jsonl") + merged_edges_file = storage.merged_objects("edges", input_data_path) """ Infer predicate schemas. """ for edge in storage.json_line_iter(merged_edges_file): predicate = edge['predicate'] @@ -378,7 +370,7 @@ def create_edges_schema(self): previous_type = predicate_schemas[predicate][k] predicate_schemas[predicate][k] = compare_types( previous_type, current_type) - self.write_schema(predicate_schemas, SchemaType.PREDICATE) + self.write_schema(predicate_schemas, SchemaType.PREDICATE, output_path=output_data_path) def create_schema (self): """Determine the schema of each type of object. @@ -403,31 +395,40 @@ def schema_up_to_date (self): f"{SchemaType.PREDICATE.value}-schema.json") ]) - def write_schema(self, schema, schema_type: SchemaType): + def write_schema(self, schema, schema_type: SchemaType ,output_path=None): """ Output the schema file. 
:param schema: Schema to get keys from. :param schema_type: Type of schema to write. """ - file_name = storage.schema_path (f"{schema_type.value}-schema.json") + file_name = storage.schema_path (f"{schema_type.value}-schema.json", output_path) log.info("writing schema: %s", file_name) dictionary = { k : v for k, v in schema.items () } storage.write_object (dictionary, file_name) - def merge(self): + def merge(self, input_path=None, output_path=None): """ This version uses the disk merging from the kg_utils module """ - data_set_version = self.config.get('kgx', {}).get('dataset_version') + metrics = {} start = time.time() - json_format_files = storage.kgx_objects("json") - jsonl_format_files = storage.kgx_objects("jsonl") + + log.info(f"Input path = {input_path}, Output path = {output_path}") + + if input_path: + json_format_files = storage.kgx_objects("json", input_path) + jsonl_format_files = storage.kgx_objects("jsonl", input_path) + else: + json_format_files = storage.kgx_objects("json") + jsonl_format_files = storage.kgx_objects("jsonl") # Create lists of the nodes and edges files in both json and jsonl # formats jsonl_node_files = {file for file in jsonl_format_files - if "node" in file} + if "node" in file.split('/')[-1]} jsonl_edge_files = {file for file in jsonl_format_files - if "edge" in file} + if "edge" in file.split('/')[-1]} + log.info(f"Jsonl edge files : {jsonl_edge_files}") + log.info(f"Jsonl node files : {jsonl_node_files}") # Create all the needed iterators and sets thereof jsonl_node_iterators = [storage.jsonl_iter(file_name) @@ -449,13 +450,16 @@ def merge(self): self.merger.merge_nodes(node_iterators) merged_nodes = self.merger.get_merged_nodes_jsonl() + self.merger.merge_edges(edge_iterators) merged_edges = self.merger.get_merged_edges_jsonl() write_merge_metric = {} t = time.time() start_nodes_jsonl = time.time() - nodes_file_path = storage.merge_path("nodes.jsonl") + + + nodes_file_path = storage.merge_path("nodes.jsonl", output_path) # stream out nodes to nodes.jsonl file with open(nodes_file_path, 'w') as stream: @@ -468,7 +472,7 @@ def merge(self): start_edge_jsonl = time.time() # stream out edges to edges.jsonl file - edges_file_path = storage.merge_path("edges.jsonl") + edges_file_path = storage.merge_path("edges.jsonl", output_path) with open(edges_file_path, 'w') as stream: for edges in merged_edges: edges = json.loads(edges) @@ -496,3 +500,4 @@ def merge(self): if self.enable_metrics: metricsfile_path = storage.metrics_path('merge_metrics.yaml') storage.write_object(metrics, metricsfile_path) + diff --git a/dags/roger/pipelines/README.md b/dags/roger/pipelines/README.md new file mode 100644 index 00000000..e77e6a29 --- /dev/null +++ b/dags/roger/pipelines/README.md @@ -0,0 +1,99 @@ +# Building custom Dug data pipelines + +The pipelines submodule is where data pipelines can be defined for specific data +sets with specific, custom behaviors for each one. In previous versions of the +code, customizations for each pipeline were spread across several modules. With +this instantiation, the customizations for each data set pipeline are +consolidated into a single overridden subclass of the DataPipeline class. + +## What the base pipeline does + +The function `roger.tasks.create_pipeline_taskgroup`, when called with the given +data pipeline class, will emit an Airflow task group with the following +structure. 
If Airflow is not being used, another executor should use a similarly
+structured set of calls and dependencies to ensure that the task pipeline
+executes fully and in order.
+
+```mermaid
+graph TD;
+    annotate-->index_variables;
+    annotate-->validate_index_variables;
+    index_variables-->validate_index_variables;
+    annotate-->make_kg_tagged;
+    annotate-->crawl_tranql;
+    annotate-->index_concepts;
+    crawl_tranql-->validate_index_concepts;
+    index_concepts-->validate_index_concepts;
+    annotate-->validate_index_concepts;
+```
+The pipeline steps are briefly described below.
+
+### annotate
+
+By default, `annotate` will call the `get_objects` method to collect a list of
+parsable files. For each of these files, a Dug Crawler object will be created
+which will apply the parser returned by the pipeline class's `get_parser_name`
+method. (This by default will return `parser_name` if it's defined, or will fall
+back to `pipeline_name`.) The results will be written to `elements.json` and
+`concepts.json` as appropriate.
+
+### index_variables
+
+This will load the `elements.json` files from `annotate` and pass them to the
+indexer built from a DugFactory object. (This is sending them to ElasticSearch
+for indexing under the hood.)
+
+### make_kg_tagged
+
+All `elements.json` files will be loaded, and based on the annotations, a
+Translator-compliant knowledge graph will be written to a `_kgx.json` file.
+
+### index_concepts
+
+The `concepts.json` files are read and submitted to ElasticSearch using the
+indexer object derived from the embedded DugFactory object.
+
+### validate_index_concepts
+
+Concepts from `concepts.json` are double-checked to ensure that the ES indexing
+process actually worked.
+
+## Defining a basic pipeline, with no customizations
+
+Simple pipelines, such as that for the BACPAC dataset, need very little
+customization. All pipelines must define a `pipeline_name`, which will be used
+as the default value for a number of other parameters if they are not
+defined. In the case of BACPAC, a difference in case means that both the
+`pipeline_name` and the `parser_name` need to be defined.
+
+```python
+from roger.pipelines import DugPipeline
+
+class BacPacPipeline(DugPipeline):
+    "Pipeline for BACPAC data set"
+    pipeline_name = "bacpac"
+    parser_name = "BACPAC"
+```
+
+This is the full extent of the code needed to adapt the DugPipeline object to
+BACPAC. Other data sets have more specific customizations that need more custom
+code or variables defined.
+
+## More extensive customization
+
+Because the base pipeline (defined in `roger/pipelines/base.py:DugPipeline`) is
+inherited as a subclass for customizing, effectively any part of the pipeline
+that isn't part of Dug proper can be overridden. Here are some common
+customizations that are expected to be necessary for many parts of the process:
+
+### get_objects
+
+The `get_objects` method by default looks in the `input_data_path` that is
+passed to it, and if that is None, loads the default from the `ROGER_DATA_DIR`
+environment variable. By default, it reads all files with the `.xml` extension
+recursively anywhere in that directory or its subdirectories.
+
+One example customization is the anvil data pipeline, which additionally
+excludes any file that starts with 'GapExchange_'. Any overridden method should
+accept an optional `input_data_path` parameter and return a list of files,
+sorted in the order that they should be processed.
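As an illustrative sketch of that kind of override (the `MyStudyPipeline` class, its `pipeline_name`/`parser_name` values, and the `.json` filter below are hypothetical and not part of this patch; only `DugPipeline`, `storage.dug_input_files_path`, and `storage.get_files_recursive` come from the code it ships), a customized subclass might look like this:

```python
from roger.pipelines import DugPipeline
from roger.core import storage

class MyStudyPipeline(DugPipeline):
    "Hypothetical pipeline used only to illustrate a get_objects override"
    pipeline_name = "my-study"
    parser_name = "MyStudy"

    def get_objects(self, input_data_path=None):
        # Fall back to the conventional input directory when the task runner
        # does not hand in an explicit path.
        if not input_data_path:
            input_data_path = storage.dug_input_files_path(self.get_files_dir())
        # Keep only .json metadata files, skipping hidden files.
        files = storage.get_files_recursive(
            lambda file_name: (not file_name.startswith('.')
                               and file_name.endswith('.json')),
            input_data_path)
        return sorted([str(f) for f in files])
```

Because the override still returns a plain, sorted list of file paths, the rest of the pipeline (annotation, indexing, knowledge-graph tagging) needs no further changes.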
diff --git a/dags/roger/pipelines/__init__.py b/dags/roger/pipelines/__init__.py new file mode 100644 index 00000000..d6664b8e --- /dev/null +++ b/dags/roger/pipelines/__init__.py @@ -0,0 +1,28 @@ +"Modules for individual datasets" + +import pkgutil +from pathlib import Path +import importlib + +from .base import DugPipeline + +def get_pipeline_classes(pipeline_names_dict): + """Return a list of all defined pipeline classes + """ + + base_path = Path(__file__).resolve().parent + + for (_, mod_name, _) in pkgutil.iter_modules([base_path]): + if mod_name == 'base': + continue + + # No need to actuall get the module symbol, once it's imported, it will + # show up below in __subclasses__. + importlib.import_module(f"{__name__}.{mod_name}") + pipeline_list = [] + + for subclass in DugPipeline.__subclasses__(): + if getattr(subclass, 'pipeline_name') and getattr(subclass, 'pipeline_name') in pipeline_names_dict.keys(): + subclass.input_version = pipeline_names_dict[getattr(subclass, 'pipeline_name')] + pipeline_list.append(subclass) + return pipeline_list diff --git a/dags/roger/pipelines/anvil.py b/dags/roger/pipelines/anvil.py new file mode 100644 index 00000000..baa82c05 --- /dev/null +++ b/dags/roger/pipelines/anvil.py @@ -0,0 +1,24 @@ +"Pipeline for anvil data" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class AnvilPipeline(DugPipeline): + "Pipeline for Anvil data set" + pipeline_name = 'anvil' + parser_name = 'Anvil' + + def get_objects(self, input_data_path=None): + """Retrieve anvil objects + + This code is imported from roger.core.storage.dug_anvil_objects + """ + if not input_data_path: + input_data_path = storage.dug_input_files_path( + self.files_dir) + files = storage.get_files_recursive( + lambda file_name: ( + not file_name.startswith('GapExchange_') + and file_name.endswith('.xml')), + input_data_path) + return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/bacpac.py b/dags/roger/pipelines/bacpac.py new file mode 100644 index 00000000..495ba3b9 --- /dev/null +++ b/dags/roger/pipelines/bacpac.py @@ -0,0 +1,8 @@ +"Pipeline for BACPAC data" + +from roger.pipelines import DugPipeline + +class BacPacPipeline(DugPipeline): + "Pipeline for BACPAC data set" + pipeline_name = "bacpac" + parser_name = "BACPAC" diff --git a/dags/roger/pipelines/base.py b/dags/roger/pipelines/base.py new file mode 100644 index 00000000..2a1b605e --- /dev/null +++ b/dags/roger/pipelines/base.py @@ -0,0 +1,964 @@ +"Base class for implementing a dataset annotate, crawl, and index pipeline" + +import os +import asyncio +from io import StringIO +import logging +import re +import hashlib +import traceback +from functools import reduce +from pathlib import Path +import tarfile +from typing import Union +import jsonpickle + +import requests + +from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept +from dug.core.concept_expander import ConceptExpander +from dug.core.crawler import Crawler +from dug.core.factory import DugFactory +from dug.core.parsers import Parser, DugElement +from dug.core.annotators import Annotator +from dug.core.async_search import Search +from dug.core.index import Index + +from roger.config import RogerConfig +from roger.core import storage +from roger.models.biolink import BiolinkModel +from roger.logger import get_logger + +from utils.s3_utils import S3Utils + +log = get_logger() + +class PipelineException(Exception): + "Exception raised from DugPipeline and related classes" + +def make_edge(subj, + obj, + 
predicate='biolink:related_to', + predicate_label='related to', + relation='biolink:related_to', + relation_label='related to' + ): + """Create an edge between two nodes. + + :param subj: The identifier of the subject. + :param pred: The predicate linking the subject and object. + :param obj: The object of the relation. + :param predicate: Biolink compatible edge type. + :param predicate_label: Edge label. + :param relation: Ontological edge type. + :param relation_label: Ontological edge type label. + :returns: Returns and edge. + """ + edge_id = hashlib.md5( + f'{subj}{predicate}{obj}'.encode('utf-8')).hexdigest() + return { + "subject": subj, + "predicate": predicate, + "predicate_label": predicate_label, + "id": edge_id, + "relation": relation, + "relation_label": relation_label, + "object": obj, + "provided_by": "renci.bdc.semanticsearch.annotator" + } + +class FileFetcher: + """A basic remote file fetcher class + """ + + def __init__( + self, + remote_host: str, + remote_dir: Union[str, Path], + local_dir: Union[str, Path] = "." + ): + self.remote_host = remote_host + if isinstance(remote_dir, str): + self.remote_dir = remote_dir.rstrip("/") + else: + self.remote_dir = str(remote_dir.as_posix()) + self.local_dir = Path(local_dir).resolve() + + def __call__(self, remote_file_path: Union[str, Path]) -> Path: + remote_path = self.remote_dir + "/" + remote_file_path + local_path = self.local_dir / remote_file_path + url = f"{self.remote_host}{remote_path}" + log.debug("Fetching %s", url) + try: + response = requests.get(url, allow_redirects=True, timeout=60) + except Exception as e: + log.error("Unexpected %s: %s", e.__class__.__name__, str(e)) + raise RuntimeError(f"Unable to fetch {url}") from e + + log.debug("Response: %d", response.status_code) + if response.status_code != 200: + log.debug("Unable to fetch %s: %d", url, response.status_code) + raise RuntimeError(f"Unable to fetch {url}") + + with local_path.open('wb') as file_obj: + file_obj.write(response.content) + return local_path + +class DugPipeline(): + "Base class for dataset pipelines" + + pipeline_name = None + unzip_source = True + input_version = "" + + def __init__(self, config: RogerConfig, to_string=False): + "Set instance variables and check to make sure we're overriden" + if not self.pipeline_name: + raise PipelineException( + "Subclass must at least define pipeline_name as class var") + self.config = config + self.bl_toolkit = BiolinkModel() + dug_conf = config.to_dug_conf() + self.element_mapping = config.indexing.element_mapping + self.factory = DugFactory(dug_conf) + self.cached_session = self.factory.build_http_session() + self.event_loop = asyncio.new_event_loop() + self.log_stream = StringIO() + if to_string: + self.string_handler = logging.StreamHandler(self.log_stream) + log.addHandler(self.string_handler) + self.s3_utils = S3Utils(self.config.s3_config) + + self.tranqlizer: ConceptExpander = self.factory.build_tranqlizer() + + graph_name = self.config["redisgraph"]["graph"] + source = f"redis:{graph_name}" + self.tranql_queries: dict = self.factory.build_tranql_queries(source) + self.node_to_element_queries: list = ( + self.factory.build_element_extraction_parameters(source)) + + indexing_config = config.indexing + self.variables_index = indexing_config.get('variables_index') + self.concepts_index = indexing_config.get('concepts_index') + self.kg_index = indexing_config.get('kg_index') + + self.search_obj: Search = self.factory.build_search_obj([ + self.variables_index, + self.concepts_index, + 
self.kg_index, + ]) + self.index_obj: Index = self.factory.build_indexer_obj([ + self.variables_index, + self.concepts_index, + self.kg_index, + + ]) + + def __enter__(self): + self.event_loop = asyncio.new_event_loop() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # close elastic search connection + self.event_loop.run_until_complete(self.search_obj.es.close()) + # close async loop + if self.event_loop.is_running() and not self.event_loop.is_closed(): + self.event_loop.close() + if exc_type or exc_val or exc_tb: + traceback.print_exc() + log.error("%s %s %s", exc_val, exc_val, exc_tb) + log.exception("Got an exception") + + def get_data_format(self): + """Access method for data_format parameter + + Defaults to pipeline_name unless self.data_format is set. This method + can also be overriden + """ + return getattr(self, 'data_format', self.pipeline_name) + + def get_files_dir(self): + """Access method for files_dir parameter + + Defaults to pipeline_name unless self.files_dir is set. This method can + also be overriden. + """ + return getattr(self, 'files_dir', self.pipeline_name) + + def get_parser_name(self): + """Access method for parser_name + + Defaults to pipeline_name unless self.parser_name is set. This method + can also be overriden. + """ + return getattr(self, 'parser_name', self.pipeline_name) + + def get_annotator_name(self): + """ + Access method for annotator_name + Defaults to annotator_monarch unless specified using annotation.annotator_type in the configuration file. + """ + return self.config.annotation.annotator_type + + + def get_parser(self): + dug_plugin_manager = get_plugin_manager() + parser: Parser = get_parser(dug_plugin_manager.hook, + self.get_parser_name()) + return parser + + def get_annotator(self): + dug_plugin_manager = get_plugin_manager() + annotator: Annotator = get_annotator( + dug_plugin_manager.hook, + self.get_annotator_name(), + self.config.to_dug_conf() + ) + return annotator + + def annotate_files(self, parsable_files, output_data_path=None): + """ + Annotates a Data element file using a Dug parser. + :param parser_name: Name of Dug parser to use. + :param parsable_files: Files to parse. + :return: None. + """ + if not output_data_path: + output_data_path = storage.dug_annotation_path('') + log.info("Parsing files") + for _, parse_file in enumerate(parsable_files): + log.debug("Creating Dug Crawler object on parse_file %s at %d of %d", + parse_file, _ , len(parsable_files)) + parser = self.get_parser() + annotator = self.get_annotator() + crawler = Crawler( + crawl_file=parse_file, + parser=parser, + annotator=annotator, + tranqlizer='', + tranql_queries=[], + http_session=self.cached_session + ) + + # configure output space. + current_file_name = '.'.join( + os.path.basename(parse_file).split('.')[:-1]) + elements_file_path = os.path.join( + output_data_path, current_file_name) + elements_file = os.path.join(elements_file_path, 'elements.txt') + concepts_file = os.path.join(elements_file_path, 'concepts.txt') + # This is a file that the crawler will later populate. We start here + # by creating an empty elements file. + # This also creates output dir if it doesn't exist. + # elements_json = os.path.join(elements_file_path, + # 'element_file.json') + # log.debug("Creating empty file: %s", elements_json) + # storage.write_object({}, elements_json) + + # Use the specified parser to parse the parse_file into elements. 
+ log.debug("Parser is %s", str(parser)) + elements = parser(parse_file) + log.debug("Parsed elements: %s", str(elements)) + + # This inserts the list of elements into the crawler where + # annotate_elements expects to find it. Maybe in some future version + # of Dug this could be a parameter instead of an attribute? + crawler.elements = elements + + # @TODO propose for Dug to make this a crawler class init param(??) + crawler.crawlspace = elements_file_path + log.debug("Crawler annotator: %s", str(crawler.annotator)) + crawler.annotate_elements() + + # Extract out the concepts gotten out of annotation + # Extract out the elements + non_expanded_concepts = crawler.concepts + # The elements object will have been modified by annotate_elements, + # so we want to make sure to catch those modifications. + elements = crawler.elements + + # Write pickles of objects to file + log.info("Parsed and annotated: %s", parse_file) + + storage.write_object(jsonpickle.encode(elements, indent=2), elements_file) + log.info("Serialized annotated elements to : %s", elements_file) + + storage.write_object(jsonpickle.encode(non_expanded_concepts, indent=2), concepts_file) + log.info("Serialized annotated concepts to : %s", concepts_file) + + def convert_to_kgx_json(self, elements, written_nodes=None): + """ + Given an annotated and normalized set of study variables, + generate a KGX compliant graph given the normalized annotations. + Write that grpah to a graph database. + See BioLink Model for category descriptions. + https://biolink.github.io/biolink-model/notes.html + """ + if written_nodes is None: + written_nodes = set() + graph = { + "nodes": [], + "edges": [] + } + edges = graph['edges'] + nodes = graph['nodes'] + + for _, element in enumerate(elements): + # DugElement means a variable (Study variable...) + if not isinstance(element, DugElement): + continue + study_id = element.collection_id + if study_id not in written_nodes: + nodes.append({ + "id": study_id, + "category": ["biolink:Study"], + "name": study_id + }) + written_nodes.add(study_id) + + # connect the study and the variable. + edges.append(make_edge( + subj=element.id, + relation_label='part of', + relation='BFO:0000050', + obj=study_id, + predicate='biolink:part_of', + predicate_label='part of')) + edges.append(make_edge( + subj=study_id, + relation_label='has part', + relation="BFO:0000051", + obj=element.id, + predicate='biolink:has_part', + predicate_label='has part')) + + # a node for the variable. Should be BL compatible + variable_node = { + "id": element.id, + "name": element.name, + "category": ["biolink:StudyVariable"], + # bulk loader parsing issue + "description": ( + element.description.replace("'", '`').replace('\n', ' ')) + } + if element.id not in written_nodes: + nodes.append(variable_node) + written_nodes.add(element.id) + + for identifier, metadata in element.concepts.items(): + identifier_object = metadata.identifiers.get(identifier) + # This logic is treating DBGap files. + # First item in current DBGap xml files is a topmed tag, + # This is treated as a DugConcept Object. But since its not + # a concept we get from annotation (?) its never added to + # variable.concepts.items (Where variable is a DugElement obj) + # The following logic is trying to extract types, and for the + # aformentioned topmed tag it adds + # `biolink:InfomrmationContentEntity` + # Maybe a better solution could be adding types on + # DugConcept objects + # More specifically Biolink compatible types (?) 
+ # + if identifier_object: + category = identifier_object.types + elif identifier.startswith("TOPMED.TAG:"): + category = ["biolink:InformationContentEntity"] + else: + continue + if identifier not in written_nodes: + if isinstance(category, str): + bl_element = self.bl_toolkit.toolkit.get_element( + category) + category = [bl_element.class_uri or bl_element.slot_uri] + nodes.append({ + "id": identifier, + "category": category, + "name": metadata.name + }) + written_nodes.add(identifier) + # related to edge + edges.append(make_edge( + subj=element.id, + obj=identifier + )) + # related to edge + edges.append(make_edge( + subj=identifier, + obj=element.id)) + return graph + + def make_tagged_kg(self, elements): + """ Make a Translator standard knowledge graph representing + tagged study variables. + :param variables: The variables to model. + :param tags: The tags characterizing the variables. + :returns: dict with nodes and edges modeling a Translator/Biolink KG. + """ + graph = { + "nodes": [], + "edges": [] + } + edges = graph['edges'] + nodes = graph['nodes'] + + # Create graph elements to model tags and their + # links to identifiers gathered by semantic tagging + tag_map = {} + # @TODO extract this into config or maybe dug ?? + topmed_tag_concept_type = "TOPMed Phenotype Concept" + nodes_written = set() + for tag in elements: + if not (isinstance(tag, DugConcept) + and tag.type == topmed_tag_concept_type): + continue + tag_id = tag.id + tag_map[tag_id] = tag + nodes.append({ + "id": tag_id, + "name": tag.name, + "description": tag.description.replace("'", "`"), + "category": ["biolink:InformationContentEntity"] + }) + + # Link ontology identifiers we've found for this tag via nlp. + for identifier, metadata in tag.identifiers.items(): + if isinstance(metadata.types, str): + bl_element = self.bl_toolkit.toolkit.get_element( + metadata.types) + category = [bl_element.class_uri or bl_element.slot_uri] + else: + category = metadata.types + synonyms = metadata.synonyms if metadata.synonyms else [] + nodes.append({ + "id": identifier, + "name": metadata.label, + "category": category, + "synonyms": synonyms + }) + nodes_written.add(identifier) + edges.append(make_edge( + subj=tag_id, + obj=identifier)) + edges.append(make_edge( + subj=identifier, + obj=tag_id)) + + concepts_graph = self.convert_to_kgx_json(elements, + written_nodes=nodes_written) + graph['nodes'] += concepts_graph['nodes'] + graph['edges'] += concepts_graph['edges'] + + return graph + + def index_elements(self, elements_file): + "Submit elements_file to ElasticSearch for indexing " + log.info("Indexing %s...", str(elements_file)) + elements =jsonpickle.decode(storage.read_object(elements_file)) + count = 0 + total = len(elements) + # Index Annotated Elements + log.info("found %d from elements files.", len(elements)) + for element in elements: + count += 1 + # Only index DugElements as concepts will be + # indexed differently in next step + if not isinstance(element, DugConcept): + # override data-type with mapping values + if element.type.lower() in self.element_mapping: + element.type = self.element_mapping[element.type.lower()] + if not element.id: + # no id no indexing + continue + # Use the Dug Index object to submit the element to ES + self.index_obj.index_element( + element, index=self.variables_index) + percent_complete = (count / total) * 100 + if percent_complete % 10 == 0: + log.info("%d %%", percent_complete) + log.info("Done indexing %s.", elements_file) + + def validate_indexed_element_file(self, elements_file): 
+ "After submitting elements for indexing, verify that they're available" + elements = [x for x in jsonpickle.decode(storage.read_object(elements_file)) + if not isinstance(x, DugConcept)] + # Pick ~ 10 % + sample_size = int(len(elements) * 0.1) + + # random.choices(elements, k=sample_size) + test_elements = elements[:sample_size] + log.info("Picked %d from %s for validation.", len(test_elements), + elements_file) + for element in test_elements: + # Pick a concept + concepts = [element.concepts[curie] for curie in element.concepts + if element.concepts[curie].name] + + if len(concepts): + # Pick the first concept + concept = concepts[0] + curie = concept.id + search_term = re.sub(r'[^a-zA-Z0-9_\ ]+', '', concept.name) + log.debug("Searching for Concept: %s and Search term: %s", + str(curie), search_term) + all_elements_ids = self._search_elements(curie, search_term) + present = element.id in all_elements_ids + if not present: + log.error("Did not find expected variable %s in search " + "result.", str(element.id)) + log.error("Concept id : %s, Search term: %s", + str(concept.id), search_term) + raise PipelineException( + f"Validation exception - did not find variable " + f"{element.id} from {str(elements_file)}" + f"when searching variable index with Concept ID : " + f"{concept.id} using Search Term : {search_term} ") + else: + log.info( + "%s has no concepts annotated. Skipping validation for it.", + str(element.id)) + + def _search_elements(self, curie, search_term): + "Asynchronously call a search on the curie and search term" + response = self.event_loop.run_until_complete(self.search_obj.search_vars_unscored( + concept=curie, + query=search_term + )) + ids_dict = [] + if 'total_items' in response: + if response['total_items'] == 0: + log.error(f"No search elements returned for variable search: {self.variables_index}.") + log.error(f"Concept id : {curie}, Search term: {search_term}") + raise Exception(f"Validation error - Did not find {curie} for" + f"Search term: {search_term}") + else: + del response['total_items'] + for element_type in response: + all_elements_ids = [e['id'] for e in + reduce(lambda x, y: x + y['elements'], response[element_type], [])] + ids_dict += all_elements_ids + return ids_dict + + def crawl_concepts(self, concepts, data_set_name, output_path=None): + """Adds tranql KG to Concepts + + Terms grabbed from KG are also added as search terms + :param concepts: + :param data_set_name: + :return: + """ + # TODO crawl dir seems to be storaing crawling info to avoid re-crawling, but is that consting us much? , it was when tranql was slow, but + # might right to consider getting rid of it. 
+ crawl_dir = storage.dug_crawl_path('crawl_output') + output_file_name = os.path.join(data_set_name, + 'expanded_concepts.txt') + extracted_dug_elements_file_name = os.path.join(data_set_name, + 'extracted_graph_elements.txt') + if not output_path: + output_file = storage.dug_expanded_concepts_path(output_file_name) + extracted_output_file = storage.dug_expanded_concepts_path( + extracted_dug_elements_file_name + ) + else: + output_file = os.path.join(output_path, output_file_name) + extracted_output_file = os.path.join( output_path, extracted_dug_elements_file_name) + + Path(crawl_dir).mkdir(parents=True, exist_ok=True) + extracted_dug_elements = [] + log.debug("Creating Dug Crawler object") + crawler = Crawler( + crawl_file="", + parser=None, + annotator=None, + tranqlizer=self.tranqlizer, + tranql_queries=self.tranql_queries, + http_session=self.cached_session, + ) + crawler.crawlspace = crawl_dir + counter = 0 + total = len(concepts) + for concept in concepts.values(): + counter += 1 + try: + crawler.expand_concept(concept) + concept.set_search_terms() + concept.set_optional_terms() + except Exception as e: + log.error(concept) + raise e + for query in self.node_to_element_queries: + log.info(query) + casting_config = query['casting_config'] + tranql_source = query['tranql_source'] + dug_element_type = query['output_dug_type'] + extracted_dug_elements += crawler.expand_to_dug_element( + concept=concept, + casting_config=casting_config, + dug_element_type=dug_element_type, + tranql_source=tranql_source + ) + concept.clean() + percent_complete = int((counter / total) * 100) + if percent_complete % 10 == 0: + log.info("%d%%", percent_complete) + log.info("Crawling %s done", data_set_name) + storage.write_object(obj=jsonpickle.encode(concepts, indent=2), path=output_file) + log.info ("Concepts serialized to %s", output_file) + storage.write_object(obj=jsonpickle.encode(extracted_dug_elements, indent=2), + path=extracted_output_file) + log.info("Extracted elements serialized to %s", extracted_output_file) + + def _index_concepts(self, concepts): + "Submit concepts to ElasticSearch for indexing" + log.info("Indexing Concepts") + total = len(concepts) + count = 0 + for concept_id, concept in concepts.items(): + count += 1 + self.index_obj.index_concept(concept, index=self.concepts_index) + # Index knowledge graph answers for each concept + for kg_answer_id, kg_answer in concept.kg_answers.items(): + self.index_obj.index_kg_answer( + concept_id=concept_id, + kg_answer=kg_answer, + index=self.kg_index, + id_suffix=kg_answer_id + ) + percent_complete = int((count / total) * 100) + if percent_complete % 10 == 0: + log.info("%s %%", percent_complete) + log.info("Done Indexing concepts") + + def _validate_indexed_concepts(self, elements, concepts): + """ + Validates linked concepts are searchable + :param elements: Annotated dug elements + :param concepts: Crawled (expanded) concepts + :return: + """ + # 1 . Find concepts with KG <= 10% of all concepts, + # <= because we might have no results for some concepts from tranql + sample_concepts = {key: value for key, value + in concepts.items() if value.kg_answers} + if len(concepts) == 0: + log.info("No Concepts found.") + return + log.info("Found only %d Concepts with Knowledge graph out of %d. %d%%", + len(sample_concepts), len(concepts), + (len(sample_concepts) / len(concepts)) * 100) + # 2. 
pick elements that have concepts in the sample concepts set + sample_elements = {} + for element in elements: + if isinstance(element, DugConcept): + continue + for concept in element.concepts: + # add elements that have kg + if concept in sample_concepts: + sample_elements[concept] = sample_elements.get( + concept, set()) + sample_elements[concept].add(element.id) + + # Time for some validation + for curie in concepts: + concept = concepts[curie] + if not concept.kg_answers: + continue + search_terms = [] + for key in concept.kg_answers: + kg_object = concept.kg_answers[key] + search_terms += kg_object.get_node_names() + search_terms += kg_object.get_node_synonyms() + # reduce(lambda x,y: x + y, [[node.get("name")] + # + node.get("synonyms", []) + # for node in concept.kg_answers[ + # "knowledge_graph"]["nodes"]], []) + # validation here is that for any of these nodes we should get back + # the variable. + # make unique + search_terms_cap = 10 + search_terms = list(set(search_terms))[:search_terms_cap] + log.debug("Using %d Search terms for concept %s", len(search_terms), + str(curie)) + for search_term in search_terms: + # avoids elastic failure due to some reserved characters + # 'search_phase_execution_exception', + # 'token_mgr_error: Lexical error ... + search_term = re.sub(r'[^a-zA-Z0-9_\ ]+', '', search_term) + + searched_element_ids = self._search_elements(curie, search_term) + + if curie not in sample_elements: + log.error("Did not find Curie id %s in Elements.", + str(curie)) + log.error("Concept id : %s, Search term: %s", + str(concept.id), search_term) + raise PipelineException( + f"Validation error - Did not find {curie} for " + f"Concept id : {concept.id}, " + f"Search term: {search_term}") + + present = bool([x for x in sample_elements[curie] + if x in searched_element_ids]) + if not present: + log.error("Did not find expected variable %s " + "in search result.", + str(curie)) + log.error("Concept id : %s, Search term: %s", + str(concept.id), search_term) + raise PipelineException( + f"Validation error - Did not find {curie} for" + f" Concept id : {concept.id}, " + f"Search term: {search_term}") + + def clear_index(self, index_id): + "Delete the index specified by index_id from ES" + exists = self.event_loop.run_until_complete(self.search_obj.es.indices.exists(index=index_id)) + if exists: + log.info("Deleting index %s", str(index_id)) + response = self.event_loop.run_until_complete( + self.search_obj.es.indices.delete(index=index_id)) + log.info("Cleared Elastic : %s", str(response)) + log.info("Re-initializing the indicies") + self.index_obj.init_indices() + + def clear_variables_index(self): + "Delete the variables index from ES" + self.clear_index(self.variables_index) + + def clear_kg_index(self): + "Delete the KG index from ES" + self.clear_index(self.kg_index) + + def clear_concepts_index(self): + "Delete the concepts index from ES" + self.clear_index(self.concepts_index) + + #### + # Methods above this are directly from what used to be + # dug_helpers.dug_utils.Dug. Methods below are consolidated from what used + # to be dug_helpers.dug_utils.DugUtil. These are intented to be the "top + # level" interface to Roger, which Airflow DAGs or other orchestrators can + # call directly. 
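To make that "top level" interface concrete, here is a minimal, hypothetical driver sketch (an editorial illustration, not part of this patch): it assumes a zero-argument `get_default_config()` call (only the import of that helper appears elsewhere in the patch), the default `ROGER_DATA_DIR` directory layout, and the BACPAC pipeline as an arbitrary example; the call order simply mirrors the task-group dependencies described in the pipelines README.

```python
# Hypothetical driver for running one dataset without Airflow.
from roger.config import get_default_config       # zero-arg call is an assumption
from roger.pipelines.bacpac import BacPacPipeline

config = get_default_config()

# DugPipeline supports the context-manager protocol (__enter__/__exit__),
# which manages the async event loop and the ElasticSearch connection.
with BacPacPipeline(config) as pipeline:
    pipeline.annotate()                    # parse + annotate source files
    pipeline.index_variables()             # push annotated elements to ES
    pipeline.validate_indexed_variables()  # spot-check the variable index
    pipeline.make_kg_tagged()              # emit KGX knowledge graph files
    pipeline.crawl_tranql()                # expand concepts via TranQL
    pipeline.index_concepts()              # push expanded concepts to ES
    pipeline.validate_indexed_concepts()   # spot-check the concept index
```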
+ + def _fetch_s3_file(self, filename, output_dir): + "Fetch a file from s3 to output_dir" + log.info("Fetching %s", filename) + output_name = filename.split('/')[-1] + output_path = output_dir / output_name + self.s3_utils.get( + str(filename), + str(output_path), + ) + if self.unzip_source: + log.info("Unzipping %s", str(output_path)) + with tarfile.open(str(output_path)) as tar: + tar.extractall(path=output_dir) + return output_path + + def _fetch_remote_file(self, filename, output_dir, current_version): + "Fetch a file from a location using FileFetcher" + log.info("Fetching %s", filename) + # fetch from stars + remote_host = self.config.annotation_base_data_uri + fetch = FileFetcher( + remote_host=remote_host, + remote_dir=current_version, + local_dir=output_dir) + output_path = fetch(filename) + if self.unzip_source: + log.info("Unzipping %s", str(output_path)) + with tarfile.open(str(output_path)) as tar: + tar.extractall(path=output_dir) + return output_path + + def get_versioned_files(self): + """ Fetches a dug input data files to input file directory + """ + meta_data = storage.read_relative_object("../../metadata.yaml") + output_dir: Path = storage.dug_input_files_path( + self.get_files_dir()) + data_store = self.config.dug_inputs.data_source + + # clear dir + storage.clear_dir(output_dir) + data_sets = self.config.dug_inputs.data_sets + log.info("dataset: %s", data_sets) + pulled_files = [] + for data_set in data_sets: + data_set_name, current_version = data_set.split(':') + for item in meta_data["dug_inputs"]["versions"]: + if (item["version"] == current_version and + item["name"] == data_set_name and + item["format"] == self.get_data_format()): + if data_store == "s3": + for filename in item["files"]["s3"]: + pulled_files.append( + self._fetch_s3_file(filename, output_dir)) + else: + for filename in item["files"]["stars"]: + pulled_files.append( + self.fetch_remote_file(filename, output_dir, + current_version)) + return [str(filename) for filename in pulled_files] + + def get_objects(self, input_data_path=None): + """Retrieve initial source objects for parsing + + This is a default method that will be overridden by subclasses + frequently, it is expected. + """ + if not input_data_path: + input_data_path = storage.dug_input_files_path( + self.get_files_dir()) + files = storage.get_files_recursive( + lambda file_name: file_name.endswith('.xml'), + input_data_path) + return sorted([str(f) for f in files]) + + def annotate(self, to_string=False, files=None, input_data_path=None, + output_data_path=None): + "Annotate files with the appropriate parsers and crawlers" + if files is None: + files = self.get_objects(input_data_path=input_data_path) + self.annotate_files(parsable_files=files, + output_data_path=output_data_path) + output_log = self.log_stream.getvalue() if to_string else '' + return output_log + + def index_variables(self, to_string=False, element_object_files=None, + input_data_path=None, output_data_path=None): + """Index variables from element object files for pipeline + + if element_object_files is specified, only those files are + indexed. Otherwise, if the input_data_path is supplied, elements files + under that path are indexed. If neither is supplied, the default + directory is searched for index files and those are indexed. 
+ """ + # self.clear_variables_index() + if element_object_files is None: + element_object_files = storage.dug_elements_objects(input_data_path,format='txt') + for file_ in element_object_files: + self.index_elements(file_) + output_log = self.log_stream.getvalue() if to_string else '' + return output_log + + def validate_indexed_variables(self, to_string=None, + element_object_files=None, + input_data_path=None, + output_data_path=None): + "Validate output from index variables task for pipeline" + if not element_object_files: + element_object_files = storage.dug_elements_objects(input_data_path, format='txt') + for file_ in element_object_files: + log.info("Validating %s", str(file_)) + self.validate_indexed_element_file(file_) + output_log = self.log_stream.getvalue() if to_string else '' + return output_log + + def validate_indexed_concepts(self, config=None, to_string=None, input_data_path=None, output_data_path=None): + """ + Entry for validate concepts + """ + get_data_set_name = lambda file: os.path.split(os.path.dirname(file))[-1] + expanded_concepts_files_dict = { + get_data_set_name(file): file for file in storage.dug_expanded_concept_objects(data_path=input_data_path, format='txt') + } + annotated_elements_files_dict = { + get_data_set_name(file): file for file in storage.dug_elements_objects(data_path=input_data_path, format='txt') + } + try: + assert len(expanded_concepts_files_dict) == len(annotated_elements_files_dict) + except: + log.error("Files Annotated Elements files and Expanded concepts files, should be pairs") + if len(expanded_concepts_files_dict) > len(annotated_elements_files_dict): + log.error("Some Annotated Elements files (from load_and_annotate task) are missing") + else: + log.error("Some Expanded Concepts files (from crawl task) are missing") + log.error(f"Annotated Datasets : {list(annotated_elements_files_dict.keys())}") + log.error(f"Expanded Concepts Datasets: {list(expanded_concepts_files_dict.keys())}") + exit(-1) + for data_set_name in annotated_elements_files_dict: + log.debug(f"Reading concepts and elements for dataset {data_set_name}") + elements_file_path = annotated_elements_files_dict[data_set_name] + concepts_file_path = expanded_concepts_files_dict[data_set_name] + dug_elements = jsonpickle.decode(storage.read_object(elements_file_path)) + dug_concepts = jsonpickle.decode(storage.read_object(concepts_file_path)) + log.debug(f"Read {len(dug_elements)} elements, and {len(dug_concepts)} Concepts") + log.info(f"Validating {data_set_name}") + self._validate_indexed_concepts(elements=dug_elements, concepts=dug_concepts) + output_log = self.log_stream.getvalue() if to_string else '' + return output_log + + def make_kg_tagged(self, to_string=False, elements_files=None, + input_data_path=None, output_data_path=None): + "Create tagged knowledge graphs from elements" + if not output_data_path: + output_data_path = storage.dug_kgx_path("") + storage.clear_dir(output_data_path) + log.info("Starting building KGX files") + + if not elements_files: + elements_files = storage.dug_elements_objects(input_data_path, format='txt') + log.info(f"found {len(elements_files)} files : {elements_files}") + for file_ in elements_files: + elements = jsonpickle.decode(storage.read_object(file_)) + if "topmed_" in file_: + kg = self.make_tagged_kg(elements) + else: + kg = self.convert_to_kgx_json(elements) + dug_base_file_name = file_.split(os.path.sep)[-2] + output_file_path = os.path.join(output_data_path, + dug_base_file_name + '_kgx.json') + storage.write_object(kg, 
output_file_path) + log.info("Wrote %d and %d edges, to %s", len(kg['nodes']), + len(kg['edges']), output_file_path) + output_log = self.log_stream.getvalue() if to_string else '' + return output_log + + def crawl_tranql(self, to_string=False, concept_files=None, + input_data_path=None, output_data_path=None): + "Perform the tranql crawl" + if not concept_files: + concept_files = storage.dug_concepts_objects(input_data_path, format='txt') + + if output_data_path: + crawl_dir = os.path.join(output_data_path, 'crawl_output') + expanded_concepts_dir = os.path.join(output_data_path, + 'expanded_concepts') + else: + crawl_dir = storage.dug_crawl_path('crawl_output') + expanded_concepts_dir = storage.dug_expanded_concepts_path("") + log.info("Clearing crawl output dir %s", crawl_dir) + storage.clear_dir(crawl_dir) + + log.info("Clearing expanded concepts dir: %s", expanded_concepts_dir) + storage.clear_dir(expanded_concepts_dir) + + log.info("Crawling Dug Concepts, found %d file(s).", + len(concept_files)) + for file_ in concept_files: + objects = storage.read_object(file_) + objects = objects or {} + if not objects: + log.info(f'no concepts in {file_}') + data_set = jsonpickle.decode(objects) + original_variables_dataset_name = os.path.split( + os.path.dirname(file_))[-1] + self.crawl_concepts(concepts=data_set, + data_set_name=original_variables_dataset_name, output_path= output_data_path) + output_log = self.log_stream.getvalue() if to_string else '' + return output_log + + def index_concepts(self, to_string=False, + input_data_path=None, output_data_path=None): + "Index concepts from expanded concept files" + # These are concepts that have knowledge graphs from tranql + # clear out concepts and kg indicies from previous runs + # self.clear_concepts_index() + # self.clear_kg_index() + expanded_concepts_files = storage.dug_expanded_concept_objects( + input_data_path, format="txt") + for file_ in expanded_concepts_files: + concepts = jsonpickle.decode(storage.read_object(file_)) + self._index_concepts(concepts=concepts) + output_log = self.log_stream.getvalue() if to_string else '' + return output_log diff --git a/dags/roger/pipelines/bdc.py b/dags/roger/pipelines/bdc.py new file mode 100644 index 00000000..bc30cf44 --- /dev/null +++ b/dags/roger/pipelines/bdc.py @@ -0,0 +1,19 @@ +"Pipeline for BDC-dbGap data" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class bdcPipeline(DugPipeline): + "Pipeline for BDC-dbGap data set" + pipeline_name = "bdc" + parser_name = "dbgap" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_dd_xml_path() + files = storage.get_files_recursive( + lambda file_name: ( + not file_name.startswith('._') + and file_name.endswith('.xml')), + input_data_path) + return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/crdc.py b/dags/roger/pipelines/crdc.py new file mode 100644 index 00000000..2143cf7b --- /dev/null +++ b/dags/roger/pipelines/crdc.py @@ -0,0 +1,19 @@ +"Pipeline for Cancer Commons data" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class CRDCPipeline(DugPipeline): + "Pipeline for Cancer Commons data set" + pipeline_name = "crdc" + parser_name = "crdc" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_crdc_path() + files = storage.get_files_recursive( + lambda file_name: ( + not file_name.startswith('GapExchange_') + and file_name.endswith('.xml')), + input_data_path) + 
return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/ctn.py b/dags/roger/pipelines/ctn.py new file mode 100644 index 00000000..25918062 --- /dev/null +++ b/dags/roger/pipelines/ctn.py @@ -0,0 +1,10 @@ +"Pipeline for Clinical Trials Network data" + +from roger.pipelines import DugPipeline + +class CTNPipeline(DugPipeline): + "Pipeline for Clinical Trials Network data set" + pipeline_name = "ctn" + parser_name = "ctn" + + diff --git a/dags/roger/pipelines/db_gap.py b/dags/roger/pipelines/db_gap.py new file mode 100644 index 00000000..7c1db504 --- /dev/null +++ b/dags/roger/pipelines/db_gap.py @@ -0,0 +1,10 @@ +"Dug pipeline for dbGaP data set" + +from roger.pipelines import DugPipeline + +class dbGaPPipeline(DugPipeline): + "Pipeline for the dbGaP data set" + + pipeline_name = 'dbGaP' + parser_name = 'DbGaP' + files_dir = 'db_gap' diff --git a/dags/roger/pipelines/heal_research_programs.py b/dags/roger/pipelines/heal_research_programs.py new file mode 100644 index 00000000..d95a4497 --- /dev/null +++ b/dags/roger/pipelines/heal_research_programs.py @@ -0,0 +1,16 @@ +"Pipeline for Heal-research-programs data" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class HealResearchProgramPipeline(DugPipeline): + "Pipeline for Heal-research-programs data set" + pipeline_name = "heal-research-programs" + parser_name = "heal-research" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_heal_research_program_path() + files = storage.get_files_recursive(lambda file_name: file_name.endswith('.xml'), + input_data_path) + return sorted([str(f) for f in files]) \ No newline at end of file diff --git a/dags/roger/pipelines/heal_studies.py b/dags/roger/pipelines/heal_studies.py new file mode 100644 index 00000000..cf78c042 --- /dev/null +++ b/dags/roger/pipelines/heal_studies.py @@ -0,0 +1,16 @@ +"Pipeline for Heal-studies data" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class HealStudiesPipeline(DugPipeline): + "Pipeline for Heal-studies data set" + pipeline_name = "heal-studies" + parser_name = "heal-studies" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_heal_study_path() + files = storage.get_files_recursive(lambda file_name: file_name.endswith('.xml'), + input_data_path) + return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/kfdrc.py b/dags/roger/pipelines/kfdrc.py new file mode 100644 index 00000000..bcb0b7ac --- /dev/null +++ b/dags/roger/pipelines/kfdrc.py @@ -0,0 +1,19 @@ +"Pipeline for KFDRC data" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class kfdrcPipeline(DugPipeline): + "Pipeline for KFDRC data set" + pipeline_name = "kfdrc" + parser_name = "kfdrc" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_kfdrc_path() + files = storage.get_files_recursive( + lambda file_name: ( + not file_name.startswith('GapExchange_') + and file_name.endswith('.xml')), + input_data_path) + return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/nida.py b/dags/roger/pipelines/nida.py new file mode 100644 index 00000000..b2e841bd --- /dev/null +++ b/dags/roger/pipelines/nida.py @@ -0,0 +1,18 @@ +"NIDA data set pipeline definition" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class NIDAPipeline(DugPipeline): + "NIDA data pipeline" + + pipeline_name = 'nida' + parser_name =
'NIDA' + + def get_objects(self, input_data_path=None): + "Return list of NIDA source files" + if not input_data_path: + input_data_path = storage.dug_input_files_path( + self.get_files_dir()) + files = sorted(storage.get_files_recursive(lambda x: 'NIDA-' in x, input_data_path)) + return files diff --git a/dags/roger/pipelines/radx.py b/dags/roger/pipelines/radx.py new file mode 100644 index 00000000..32491fb0 --- /dev/null +++ b/dags/roger/pipelines/radx.py @@ -0,0 +1,8 @@ +"Pipeline for RADx data" + +from roger.pipelines import DugPipeline + +class RadxPipeline(DugPipeline): + "Pipeline for RADx data set" + pipeline_name = "radx" + parser_name = "radx" diff --git a/dags/roger/pipelines/sparc.py b/dags/roger/pipelines/sparc.py new file mode 100644 index 00000000..d1c9c950 --- /dev/null +++ b/dags/roger/pipelines/sparc.py @@ -0,0 +1,17 @@ +"Pipeline for SPARC data" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class SparcPipeline(DugPipeline): + "Pipeline for SPARC data set" + pipeline_name = "sparc" + parser_name = "SciCrunch" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_heal_study_path() + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) + return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/topmed.py b/dags/roger/pipelines/topmed.py new file mode 100644 index 00000000..90b3e515 --- /dev/null +++ b/dags/roger/pipelines/topmed.py @@ -0,0 +1,41 @@ +"Pipeline for TOPMed data" + +from roger.pipelines import DugPipeline +from roger.pipelines.base import log, os +import jsonpickle +from roger.core import storage + + +class TopmedPipeline(DugPipeline): + "Pipeline for TOPMed data set" + pipeline_name = "topmed" + parser_name = "TOPMedTag" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = str(storage.dug_input_files_path('topmed')) + files = storage.get_files_recursive( + lambda file_name: file_name.endswith('.csv'), + input_data_path) + return sorted([str(x) for x in files]) + + def make_kg_tagged(self, to_string=False, elements_files=None, + input_data_path=None, output_data_path=None): + "Create tagged knowledge graphs from elements" + log.info("Override base.make_kg_tagged called") + if not output_data_path: + output_data_path = storage.dug_kgx_path("") + storage.clear_dir(output_data_path) + if not elements_files: + elements_files = storage.dug_elements_objects(input_data_path, format='txt') + for file_ in elements_files: + elements = jsonpickle.decode(storage.read_object(file_)) + kg = self.make_tagged_kg(elements) + dug_base_file_name = file_.split(os.path.sep)[-2] + output_file_path = os.path.join(output_data_path, + dug_base_file_name + '_kgx.json') + storage.write_object(kg, output_file_path) + log.info("Wrote %d nodes and %d edges to %s", len(kg['nodes']), + len(kg['edges']), output_file_path) + output_log = self.log_stream.getvalue() if to_string else '' + return output_log + diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index c2367e32..f5451140 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -1,10 +1,28 @@ +"Tasks and methods related to Airflow implementations of Roger" + import os from airflow.operators.python import PythonOperator +from airflow.operators.empty import EmptyOperator +from airflow.utils.task_group import TaskGroup from airflow.utils.dates import days_ago +from airflow.models import DAG +from airflow.models.dag import DagContext +from
airflow.models.taskinstance import TaskInstance +from typing import Union +from pathlib import Path +import glob +import shutil -from roger.config import config +from roger.config import config, RogerConfig from roger.logger import get_logger +from roger.pipelines.base import DugPipeline +from avalon.mainoperations import put_files, LakeFsWrapper, get_files +from lakefs_sdk.configuration import Configuration +from lakefs_sdk.models.merge import Merge +from functools import partial + +logger = get_logger() default_args = { 'owner': 'RENCI', @@ -21,21 +39,35 @@ def task_wrapper(python_callable, **kwargs): """ # get dag config provided dag_run = kwargs.get('dag_run') - dag_conf = {} - logger = get_logger() - if dag_run: - dag_conf = dag_run.conf - # remove this since to send every other argument to the python callable. - del kwargs['dag_run'] + pass_conf = kwargs.get('pass_conf', True) + if config.lakefs_config.enabled: + # get input path for this task instance + input_data_path = generate_dir_name_from_task_instance(kwargs['ti'], + roger_config=config, + suffix='input') + # get output path from the dag id / task id / run id combination + output_data_path = generate_dir_name_from_task_instance(kwargs['ti'], + roger_config=config, + suffix='output') + else: + input_data_path, output_data_path = None, None + # assemble the keyword arguments passed to the task callable + func_args = { + 'input_data_path': input_data_path, + 'output_data_path': output_data_path, + 'to_string': kwargs.get('to_string') + } + logger.info(f"Task function args: {func_args}") # overrides values config.dag_run = dag_run - return python_callable(to_string=False, config=config) - + if pass_conf: + return python_callable(config=config, **func_args) + return python_callable(**func_args) def get_executor_config(data_path='/opt/airflow/share/data'): """ Get an executor configuration. :param annotations: Annotations to attach to the executor. - :returns: Returns a KubernetesExecutor if K8s is configured and None otherwise. + :returns: Returns a KubernetesExecutor if K8s configured, None otherwise.
""" env_var_prefix = config.OS_VAR_PREFIX # based on environment set on scheduler pod, make secrets for worker pod @@ -70,28 +102,343 @@ def get_executor_config(data_path='/opt/airflow/share/data'): } return k8s_executor_config +def init_lakefs_client(config: RogerConfig) -> LakeFsWrapper: + "Build a LakeFsWrapper client from Roger's lakeFS configuration" + configuration = Configuration() + configuration.username = config.lakefs_config.access_key_id + configuration.password = config.lakefs_config.secret_access_key + configuration.host = config.lakefs_config.host + the_lake = LakeFsWrapper(configuration=configuration) + return the_lake + + +def pagination_helper(page_fetcher, **kwargs): + """Helper function to iterate over paginated results""" + while True: + resp = page_fetcher(**kwargs) + yield from resp.results + if not resp.pagination.has_more: + break + kwargs['after'] = resp.pagination.next_offset + + +def avalon_commit_callback(context: DagContext, **kwargs): + """Push a task's output files to a temporary lakeFS branch and merge it into the working branch.""" + client: LakeFsWrapper = init_lakefs_client(config=config) + # the task has produced its files; start from its output path + local_path = str(generate_dir_name_from_task_instance(context['ti'], + roger_config=config, + suffix='output')).rstrip('/') + '/' + task_id = context['ti'].task_id + dag_id = context['ti'].dag_id + run_id = context['ti'].run_id + # run id looks like 2023-10-18T17:35:14.890186+00:00 + # normalized to 2023_10_18T17_35_14_890186_00_00 + # since a lakefs branch id must consist of letters, digits, underscores and dashes, + # and cannot start with a dash + run_id_normalized = run_id.replace('-','_').replace(':','_').replace('+','_').replace('.','_') + dag_id_normalized = dag_id.replace('-','_').replace(':','_').replace('+','_').replace('.','_') + task_id_normalized = task_id.replace('-','_').replace(':','_').replace('+','_').replace('.','_') + temp_branch_name = f'{dag_id_normalized}_{task_id_normalized}_{run_id_normalized}' + # remote path to upload the files to. + remote_path = f'{dag_id}/{task_id}/' + + # merge destination branch + branch = config.lakefs_config.branch + repo = config.lakefs_config.repo + # Push the output to a temporary branch on the single configured repo. + + # Issue (pointed out by Vladmir): if uploads to a single lakefs branch are + # not finalized with a commit, parallel tasks targeting the same branch + # would produce dirty commits. + + # Solution suggested by the Lakefs team: commit to a separate temp branch + # per task, then merge that branch. This callback does exactly that: + + # 1. put files into a temp branch. + # 2. make sure a commit happens. + # 3. merge that branch into the working branch.
+ logger.info("Pushing local path %s to %s@%s in %s dir", + local_path, repo, temp_branch_name, remote_path) + put_files( + local_path=local_path, + remote_path=remote_path, + task_name=task_id, + task_args=[""], + pipeline_id=dag_id, + task_docker_image="docker-image", + s3storage=False, + lake_fs_client=client, + branch=temp_branch_name, + repo=repo, + # @TODO figure out how to pass real commit id here + commit_id=branch + ) + + # log the changes that are about to be merged from the temp branch into the working branch + for diff in pagination_helper(client._client.refs_api.diff_refs, + repository=repo, left_ref=branch, + right_ref=temp_branch_name): + logger.info("Diff: %s", diff) + + try: + # merge the temp branch into the working branch; with the + # "source-wins" strategy the temp (source) branch wins in case of conflicts + merge = Merge(**{"strategy": "source-wins"}) + client._client.refs_api.merge_into_branch(repository=repo, + source_ref=temp_branch_name, + destination_branch=branch, + merge=merge + ) + + logger.info(f"merged branch {temp_branch_name} into {branch}") + except Exception as e: + # log the merge failure; the temp branch is still deleted below + logger.error(e) + finally: + # delete the temp branch whether or not the merge succeeded + client._client.branches_api.delete_branch( + repository=repo, + branch=temp_branch_name + ) + logger.info(f"deleted temp branch {temp_branch_name}") + logger.info(f"deleting local dir {local_path}") + + clean_up(context, **kwargs) + +def clean_up(context: DagContext, **kwargs): + "Remove the task instance's local input and output directories" + output_dir = str(generate_dir_name_from_task_instance(context['ti'], + roger_config=config, + suffix='output')).rstrip('/') + '/' + input_dir = str(generate_dir_name_from_task_instance(context['ti'], + roger_config=config, + suffix='input')).rstrip('/') + '/' + files_to_clean = glob.glob(input_dir + '**', recursive=True) + [input_dir] + files_to_clean += glob.glob(output_dir + '**', recursive=True) + [output_dir] + for f in files_to_clean: + if os.path.isdir(f): + shutil.rmtree(f) + elif os.path.exists(f): + os.remove(f) + +def generate_dir_name_from_task_instance(task_instance: TaskInstance, + roger_config: RogerConfig, suffix: str): + "Build a unique local working dir path for a task instance" + # if lakefs is not enabled just return None so methods default to using + # the local dir structure. + if not roger_config.lakefs_config.enabled: + return None + root_data_dir = os.getenv("ROGER_DATA_DIR").rstrip('/') + task_id = task_instance.task_id + dag_id = task_instance.dag_id + run_id = task_instance.run_id + try_number = task_instance._try_number + return Path( + f"{root_data_dir}/{dag_id}_{task_id}_{run_id}_{try_number}_{suffix}") + +def setup_input_data(context, exec_conf): + """Pre-execute hook that stages a task's input data. + + - figures out the task instance's input directory + - creates the input dir + - downloads dependency data from lakeFS into it + """ + logger.info("Setting up input data for %s", context['ti'].task_id) + # input_dir is where the files this task will work on are placed, + # computed as ROGER_DATA_DIR/<dag_id>_<task_id>_<run_id>_<try_number>_input + input_dir = str(generate_dir_name_from_task_instance( + context['ti'], roger_config=config, suffix="input")) + + # create the input dir + os.makedirs(input_dir, exist_ok=True) + + # Download files from lakefs and store them in this new input dir + client = init_lakefs_client(config=config) + repos = exec_conf['repos'] + # if no external repo is provided, assume the inputs come from the upstream tasks' outputs.
+ if not repos or len(repos) == 0: + # merge destination branch + branch = config.lakefs_config.branch + repo = config.lakefs_config.repo + task_instance: TaskInstance = context['ti'] + # get upstream ids + upstream_ids = task_instance.task.upstream_task_ids + dag_id = task_instance.dag_id + # calculate remote dirs using dag_id + upstreams + repos = [{ + 'repo': repo, + 'branch': branch, + 'path': f'{dag_id}/{upstream_id}' + } for upstream_id in upstream_ids] -def create_python_task (dag, name, a_callable, func_kwargs=None): + # default any repo entry without an explicit path to everything ('*') + for repo in repos: + if not repo.get('path'): + repo['path'] = '*' + logger.info(f"repos: {repos}") + for r in repos: + logger.info("downloading %s from %s@%s to %s", + r['path'], r['repo'], r['branch'], input_dir) + # create the path to download to ... + if not os.path.exists(input_dir + f'/{r["repo"]}'): + os.mkdir(input_dir + f'/{r["repo"]}') + get_files( + local_path=input_dir + f'/{r["repo"]}', + remote_path=r['path'], + branch=r['branch'], + repo=r['repo'], + changes_only=False, + lake_fs_client=client + ) + + +def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos=None, pass_conf=True, no_output_files=False): """ Create a python task. :param func_kwargs: additional arguments for callable. :param dag: dag to add task to. :param name: The name of the task. :param a_callable: The code to run in this task. """ + # these are actual arguments passed down to the task function op_kwargs = { - "python_callable": a_callable, - "to_string": True - } + "python_callable": a_callable, + "to_string": True, + "pass_conf": pass_conf + } + # update / override some of the args passed to the task function by default if func_kwargs is None: - func_kwargs = dict() + func_kwargs = {} op_kwargs.update(func_kwargs) - return PythonOperator( - task_id=name, - python_callable=task_wrapper, - op_kwargs=op_kwargs, - executor_config=get_executor_config(), - dag=dag, - provide_context=True - ) + # Python operator arguments; for a non-lakeFS config this is all we need. + python_operator_args = { + "task_id": name, + "python_callable": task_wrapper, + "executor_config": get_executor_config(), + "dag": dag, + "provide_context": True + } + + # if lakeFS is enabled... + if config.lakefs_config.enabled: + + # repos and branches for pre-execution, used to download input objects + pre_exec_conf = { + 'repos': [] + } + if external_repos: + # if the task is a root task (beginning of the dag) + # and we want to pull data from a different repo.
+ pre_exec_conf = { + 'repos': [{ + 'repo': r['name'], + 'branch': r['branch'], + 'path': r.get('path', '*') + } for r in external_repos] + } + + pre_exec = partial(setup_input_data, exec_conf=pre_exec_conf) + # add pre_exec partial function as an argument to python executor conf + python_operator_args['pre_execute'] = pre_exec + python_operator_args['on_failure_callback'] = partial(clean_up, kwargs=op_kwargs) + # if the task has output files, we will add a commit callback + if not no_output_files: + python_operator_args['on_success_callback'] = partial(avalon_commit_callback, kwargs=op_kwargs) + + # add kwargs + python_operator_args["op_kwargs"] = op_kwargs + + return PythonOperator(**python_operator_args) + +def create_pipeline_taskgroup( + dag, + pipeline_class: type, + configparam: RogerConfig, + **kwargs): + """Emit an Airflow dag pipeline for the specified pipeline_class + + Extra kwargs are passed to the pipeline class init call. + """ + name = pipeline_class.pipeline_name + input_dataset_version = pipeline_class.input_version + + with TaskGroup(group_id=f"{name}_dataset_pipeline_task_group") as tg: + with pipeline_class(config=configparam, **kwargs) as pipeline: + pipeline: DugPipeline + annotate_task = create_python_task( + dag, + f"annotate_{name}_files", + pipeline.annotate, + external_repos=[{ + 'name': getattr(pipeline_class, 'pipeline_name'), + 'branch': input_dataset_version + }], + pass_conf=False) + + index_variables_task = create_python_task( + dag, + f"index_{name}_variables", + pipeline.index_variables, + pass_conf=False, + # declare that this task will not generate files. + no_output_files=True) + index_variables_task.set_upstream(annotate_task) + + validate_index_variables_task = create_python_task( + dag, + f"validate_{name}_index_variables", + pipeline.validate_indexed_variables, + pass_conf=False, + # declare that this task will not generate files. + no_output_files=True + ) + validate_index_variables_task.set_upstream([annotate_task, index_variables_task]) + + make_kgx_task = create_python_task( + dag, + f"make_kgx_{name}", + pipeline.make_kg_tagged, + pass_conf=False) + make_kgx_task.set_upstream(annotate_task) + + crawl_task = create_python_task( + dag, + f"crawl_{name}", + pipeline.crawl_tranql, + pass_conf=False) + crawl_task.set_upstream(annotate_task) + + index_concepts_task = create_python_task( + dag, + f"index_{name}_concepts", + pipeline.index_concepts, + pass_conf=False, + # declare that this task will not generate files. + no_output_files=True) + index_concepts_task.set_upstream(crawl_task) + + validate_index_concepts_task = create_python_task( + dag, + f"validate_{name}_index_concepts", + pipeline.validate_indexed_concepts, + pass_conf=False, + # declare that this task will not generate files. + no_output_files=True + ) + validate_index_concepts_task.set_upstream([crawl_task, index_concepts_task, annotate_task]) + + + complete_task = EmptyOperator(task_id=f"complete_{name}") + complete_task.set_upstream( + (make_kgx_task, + validate_index_variables_task, validate_index_concepts_task)) + + return tg diff --git a/dags/tranql_translate.py b/dags/tranql_translate.py deleted file mode 100755 index 53394ba2..00000000 --- a/dags/tranql_translate.py +++ /dev/null @@ -1,43 +0,0 @@ -# -*- coding: utf-8 -*- -# - -""" -An Airflow workflow for the Roger Translator KGX data pipeline. 
-""" - -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator -import roger -from roger.tasks import get_executor_config, default_args, create_python_task - -""" Build the workflow's tasks and DAG. """ -with DAG( - dag_id='tranql_translate', - default_args=default_args, - schedule_interval=None, - concurrency=16, -) as dag: - - """ Build the workflow tasks. """ - intro = EmptyOperator(task_id='Intro') - get_kgx = create_python_task (dag, "GetSource", roger.get_kgx) - create_nodes_schema = create_python_task(dag, "CreateNodesSchema", - roger.create_nodes_schema) - create_edges_schema = create_python_task(dag, "CreateEdgesSchema", - roger.create_edges_schema) - continue_task_bulk_load = EmptyOperator(task_id="continueBulkCreate") - continue_task_validate = EmptyOperator(task_id="continueValidation") - merge_nodes = create_python_task (dag, "MergeNodes", roger.merge_nodes) - create_bulk_load_nodes = create_python_task(dag, "CreateBulkLoadNodes", - roger.create_bulk_nodes) - create_bulk_load_edges = create_python_task(dag, "CreateBulkLoadEdges", - roger.create_bulk_edges) - bulk_load = create_python_task(dag, "BulkLoad", roger.bulk_load) - check_tranql = create_python_task(dag, "CheckTranql", - roger.check_tranql) - validate = create_python_task(dag, "Validate", roger.validate) - finish = EmptyOperator(task_id='Finish') - - """ Build the DAG. """ - intro >> get_kgx >> merge_nodes >> [create_nodes_schema, create_edges_schema ] >> continue_task_bulk_load >> \ - [create_bulk_load_nodes, create_bulk_load_edges] >> bulk_load >> continue_task_validate >>[validate, check_tranql ] >> finish diff --git a/requirements.txt b/requirements.txt index 9868ea75..d1b1f68f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,14 @@ boto3==1.18.23 botocore==1.21.23 -#black==21.10b0 elasticsearch==8.5.2 flatten-dict +jsonpickle redisgraph-bulk-loader==0.12.3 pytest PyYAML -git+https://github.com/helxplatform/dug@2.12.0 +git+https://github.com/helxplatform/dug@2.13.1 orjson kg-utils==0.0.6 bmt==1.1.0 +git+https://github.com/helxplatform/avalon.git@v1.0.1 linkml-runtime==1.6.0
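For illustration only, a minimal sketch of how a DAG module might wire one data set into Airflow using the create_pipeline_taskgroup helper added above, in place of the deleted hand-built tranql_translate.py style. The dag_id is hypothetical, bdcPipeline is just one of the DugPipeline subclasses introduced in this patch, and the sketch assumes the DugPipeline base class supplies the input_version attribute that create_pipeline_taskgroup reads.

from airflow.models import DAG

from roger.config import config
from roger.tasks import default_args, create_pipeline_taskgroup
from roger.pipelines.bdc import bdcPipeline  # any DugPipeline subclass works here

with DAG(
    dag_id="bdc_dataset_pipeline",   # hypothetical dag_id
    default_args=default_args,
    schedule_interval=None,
) as dag:
    # Emits the annotate -> (index/validate variables, make_kgx,
    # crawl -> index/validate concepts) -> complete_<name> task group
    # for the BDC data set.
    create_pipeline_taskgroup(dag, bdcPipeline, config)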