diff --git a/dataload/06_prepare_db_import/grebi_make_compressed_blob/src/main.rs b/dataload/06_prepare_db_import/grebi_make_compressed_blob/src/main.rs
index 03946dd..d7c470a 100644
--- a/dataload/06_prepare_db_import/grebi_make_compressed_blob/src/main.rs
+++ b/dataload/06_prepare_db_import/grebi_make_compressed_blob/src/main.rs
@@ -1,10 +1,10 @@
-
 use flate2::write::ZlibEncoder;
 use flate2::Compression;
 use grebi_shared::get_id;
 use std::io::BufReader;
 use std::io::BufRead;
 use std::io::BufWriter;
+use std::io::StdinLock;
 use std::io;
 use std::io::Write;
@@ -50,3 +50,4 @@ fn main() {
 
     }
 }
+
diff --git a/dataload/08_run_queries/run_queries.dockerpy b/dataload/08_run_queries/run_queries.dockerpy
index 34690d4..a49e875 100644
--- a/dataload/08_run_queries/run_queries.dockerpy
+++ b/dataload/08_run_queries/run_queries.dockerpy
@@ -1,54 +1,44 @@
 #!/usr/bin/env python3
 
 import os
-from py2neo import Graph
-from pandas import DataFrame
 from pathlib import Path
-import glob
-import sys
-import sqlite3
-import subprocess
+from pandas import DataFrame
+import json
+from timeit import default_timer as timer
 
-os.system('echo "dbms.security.auth_enabled=false" >> /var/lib/neo4j/conf/neo4j.conf')
-os.system('neo4j start')
-os.system('sleep 20')
+os.system("neo4j start")
+os.system("sleep 20")
+
+from py2neo import Graph
+import yaml
 
 graph = Graph("bolt://localhost:7687")
 
-for file in glob.glob("/mnt/*.cypher"):
-    print("Running query: " + file)
-    query = open(file).read()
-    df = DataFrame(graph.run(query).data())
-    df.to_csv("/out/" + os.path.basename(file).split(".")[0]+".csv", index=False)
-
-os.system('neo4j stop')
-os.system('sleep 20')
-
-
-os.system('rm -f /out/materialised_queries.sqlite')
-con = sqlite3.connect("/out/materialised_queries.sqlite")
-cur = con.cursor()
-
-cur.execute("CREATE TABLE metadata(json BLOB);")
-for file in glob.glob("/mnt/*.json"):
-    cur.execute("INSERT INTO metadata VALUES (?);", (open(file).read(),))
-
-for file in glob.glob("/out/*.csv"):
-    table_name = os.path.basename(file).split(".")[0]
-    with open(file) as f:
-        header = f.readline()
-        columns = header.split(",")
-        cur.execute("CREATE TABLE "
-            + table_name
-            + " (" + ", ".join([c + " TEXT" for c in columns]) + ");")
-
-con.close()
-
-for file in glob.glob("/out/*.csv"):
-    table_name = os.path.basename(file).split(".")[0]
-    result = subprocess.run(['sqlite3',
-        '/out/materialised_queries.sqlite',
-        '-cmd',
-        '.mode csv',
-        '.import --skip 1 ' + file + ' ' + table_name],
-        capture_output=True)
+for file in os.listdir("/materialised_queries"):
+    if not file.endswith(".yaml"):
+        continue
+
+    query_id = Path(file).stem
+
+    query = yaml.safe_load(open(f"/materialised_queries/{file}"))
+
+    start_time = timer()
+
+    print(f"Running query {query_id}")
+    df = DataFrame(graph.run(query['cypher_query']).data())
+
+    end_time = timer()
+
+    query['start_time'] = start_time
+    query['end_time'] = end_time
+    query['time'] = end_time - start_time
+
+    print(f"Saving {len(df)} rows to {Path(f'/out/{query_id}.csv.gz')}")
+    df.to_csv(Path(f"/out/{query_id}.csv.gz"), index=False, compression="gzip")
+
+    with open(f"/out/{query_id}.json", "w") as f:
+        json.dump(query, f)
+
+os.system("sleep 20")
+os.system("neo4j stop")
+os.system("sleep 20")
+
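For context on the new output layout: each materialised query now produces a `<query_id>.csv.gz` result table and a `<query_id>.json` sidecar carrying the query definition plus timings. A minimal sketch of a downstream reader, assuming the same `/out` layout; the consumer script itself is hypothetical and not part of this change:

```python
# Hypothetical downstream consumer (not part of this diff): for each
# materialised query, run_queries.dockerpy above writes <query_id>.csv.gz
# (the result rows) and <query_id>.json (the query YAML plus timings).
import json
from pathlib import Path

from pandas import read_csv

out_dir = Path("/out")  # the directory bound as /out in the container
for meta_path in sorted(out_dir.glob("*.json")):
    with open(meta_path) as f:
        query = json.load(f)
    # pandas infers gzip compression from the .csv.gz extension
    df = read_csv(meta_path.with_suffix(".csv.gz"))
    print(f"{meta_path.stem}: {len(df)} rows in {query['time']:.1f}s")
```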
diff --git a/dataload/08_run_queries/run_queries.py b/dataload/08_run_queries/run_queries.py
deleted file mode 100644
index 0a4e63e..0000000
--- a/dataload/08_run_queries/run_queries.py
+++ /dev/null
@@ -1,40 +0,0 @@
-
-
-import json
-import os
-import sys
-import shlex
-import time
-import glob
-from subprocess import Popen, PIPE, STDOUT
-
-def main():
-    config_filename = os.path.abspath(os.path.join('./configs/pipeline_configs/', os.environ['GREBI_CONFIG'] + '.json'))
-    print(get_time() + " --- Config filename: " + config_filename, flush=True)
-
-    with open(config_filename, 'r') as f:
-        config = json.load(f)
-
-    if os.environ['GREBI_USE_SLURM'] == "1":
-        cmd = ' '.join([
-            'srun -t 23:0:0 --mem 180g -c 32',
-            'python3',
-            './08_run_queries/run_queries.slurm.py',
-            config_filename
-        ])
-    else:
-        cmd = ' '.join([
-            'python3',
-            './08_run_queries/run_queries.slurm.py',
-            config_filename
-        ])
-
-    if os.system(cmd) != 0:
-        print("run queries failed")
-        exit(1)
-
-def get_time():
-    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
-
-if __name__=="__main__":
-    main()
diff --git a/dataload/08_run_queries/run_queries.slurm.py b/dataload/08_run_queries/run_queries.slurm.py
index 48977fd..0cc7e4e 100644
--- a/dataload/08_run_queries/run_queries.slurm.py
+++ b/dataload/08_run_queries/run_queries.slurm.py
@@ -5,61 +5,62 @@ import shlex
 import time
 import glob
+import argparse
 from subprocess import Popen, PIPE, STDOUT
 
 def main():
-    if len(sys.argv) < 2:
-        print("Usage: run_queries.py ")
-        exit(1)
+    parser = argparse.ArgumentParser(description='Run materialised queries')
+    parser.add_argument('--in-db-path', type=str, help='Path with the neo4j database to query', required=True)
+    parser.add_argument('--out-sqlites-path', type=str, help='Path for the output sqlite files of materialised results', required=True)
+    args = parser.parse_args()
+
+    has_singularity = os.system('which singularity') == 0
 
-    config_filename = os.path.abspath(sys.argv[1])
-    print(get_time() + " --- Config filename: " + config_filename, flush=True)
+    print(get_time() + " --- Run queries")
 
-    with open(config_filename, 'r') as f:
-        config = json.load(f)
+    neo_path = args.in_db_path
+    neo_data_path = os.path.abspath(os.path.join(neo_path, "data"))
+    neo_logs_path = os.path.abspath(os.path.join(neo_path, "logs"))
 
-    neo_path = os.path.abspath(os.path.join(os.environ['GREBI_HPS_TMP'], os.environ['GREBI_CONFIG'], "07_create_db", "neo4j"))
-    neo_data_path = os.path.abspath(os.path.join(os.environ['GREBI_HPS_TMP'], os.environ['GREBI_CONFIG'], "07_create_db", "neo4j", "data"))
-    neo_logs_path = os.path.abspath(os.path.join(os.environ['GREBI_HPS_TMP'], os.environ['GREBI_CONFIG'], "07_create_db", "neo4j", "logs"))
-    out_path = os.path.abspath(os.path.join(os.environ['GREBI_HPS_TMP'], os.environ['GREBI_CONFIG'], "08_run_queries"))
+    sqlites_path = args.out_sqlites_path
 
-    os.system('rm -rf ' + shlex.quote(out_path))
-    os.makedirs(out_path, exist_ok=True)
+    os.makedirs(sqlites_path)
 
-    if os.environ['GREBI_USE_SLURM'] == "1":
+    if has_singularity:
         cmd = ' '.join([
-            'JAVA_OPTS=\'-server -Xms150g -Xmx150g\'',
+            'JAVA_OPTS=\'-server -Xms50g -Xmx50g\'',
             'singularity run',
-            '--bind ' + os.path.abspath("./queries") + ':/mnt',
-            '--bind ' + shlex.quote(out_path) + ':/out',
+            '--bind ' + os.path.abspath(".") + ':/mnt',
             '--bind ' + shlex.quote(neo_data_path) + ':/data',
             '--bind ' + shlex.quote(neo_logs_path) + ':/logs',
-            '--bind ' + os.path.abspath('./08_run_queries/run_queries.dockerpy') + ':/run_queries.py',
+            '--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '08_run_queries/run_queries.dockerpy')) + ':/run_queries.py',
+            '--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], 'materialised_queries', os.environ['GREBI_SUBGRAPH'])) + ':/materialised_queries',
+            '--bind ' + os.path.abspath(args.out_sqlites_path) + ':/out',
             '--writable-tmpfs',
             '--network=none',
             '--env NEO4J_AUTH=none',
-            'docker://ghcr.io/ebispot/grebi_neo4j_with_extras:latest',
+            'docker://ghcr.io/ebispot/grebi_neo4j_with_extras:5.18.0',
             'python3 /run_queries.py'
         ])
     else:
-        os.system('chmod 777 ' + shlex.quote(out_path))
         cmd = ' '.join([
             'docker run',
-            '-v ' + os.path.abspath("./queries") + ':/mnt',
-            '-v ' + shlex.quote(out_path) + ':/out',
+            '--user="$(id -u):$(id -g)"',
             '-v ' + shlex.quote(neo_data_path) + ':/data',
             '-v ' + shlex.quote(neo_logs_path) + ':/logs',
-            '-v ' + os.path.abspath('./08_run_queries/run_queries.dockerpy') + ':/run_queries.py',
+            '-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '08_run_queries/run_queries.dockerpy')) + ':/run_queries.py',
+            '-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], 'materialised_queries', os.environ['GREBI_SUBGRAPH'])) + ':/materialised_queries',
+            '-v ' + os.path.abspath(args.out_sqlites_path) + ':/out',
             '-e NEO4J_AUTH=none',
-            'ghcr.io/ebispot/grebi_neo4j_with_extras:latest',
+            'ghcr.io/ebispot/grebi_neo4j_with_extras:5.18.0',
             'python3 /run_queries.py'
         ])
 
     print(cmd)
 
     if os.system(cmd) != 0:
         print("run queries failed")
         exit(1)
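A usage sketch for the reworked CLI, with placeholder paths and subgraph name; the flags are the ones defined by the argparse parser above, and `GREBI_DATALOAD_HOME`/`GREBI_SUBGRAPH` select which `materialised_queries/<subgraph>/` directory gets mounted into the container:

```python
# Hypothetical local invocation of the reworked script, run from the
# dataload/ directory. Paths and the subgraph name are placeholders.
import os
import subprocess

env = dict(os.environ,
           GREBI_DATALOAD_HOME="/path/to/grebi/dataload",  # placeholder
           GREBI_SUBGRAPH="impc_x_gwas")                   # placeholder
subprocess.run(
    ["python3", "08_run_queries/run_queries.slurm.py",
     "--in-db-path", "/scratch/neo4j",                 # must contain data/ and logs/
     "--out-sqlites-path", "/scratch/query_results"],  # must not exist yet (os.makedirs)
    env=env, check=True)
```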
diff --git a/dataload/materialised_queries/ebi_monarch_xspecies/.gitkeep b/dataload/materialised_queries/ebi_monarch_xspecies/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/dataload/materialised_queries/impc_x_gwas/impc_x_gwas.yaml b/dataload/materialised_queries/impc_x_gwas/impc_x_gwas.yaml
new file mode 100644
index 0000000..afc2151
--- /dev/null
+++ b/dataload/materialised_queries/impc_x_gwas/impc_x_gwas.yaml
@@ -0,0 +1,29 @@
+title: Human GWAS variants to mouse models in IMPC
+description: Connects human GWAS variants to mouse models in IMPC using multiple different graph paths through phenotype and disease ontologies
+uses_datasources:
+  - IMPC
+  - GWAS
+  - OLS.mp
+  - OLS.hp
+  - OLS.upheno
+  - OLS.efo
+  - OLS.mondo
+  - OLS.doid
+  - OLS.oba
+cypher_query: |-
+  MATCH (snp:`gwas:SNP`)-[:`gwas:associated_with`]->(trait)
+  MATCH (trait)<-[:`upheno:phenotypeToTrait`]-(speciesNeutralPhenotype)
+  MATCH (speciesNeutralPhenotype)<-[:`biolink:broad_match`]-(descendantPhenotype)-[:sourceId]->(descendantSourceId)
+  WHERE "OLS.mp" IN descendantPhenotype.`grebi:datasources`
+  MATCH (descendantPhenotype)<-[:`impc:phenotype`]-(mouseGene)
+  RETURN "gwas->oba->upheno->mp->impc" AS graph_path,
+         [id in snp.id WHERE id =~ "rs[0-9]*" | id][0] AS gwas_variant,
+         [id in trait.id WHERE id =~ "oba:.*" | id][0] AS trait_id,
+         trait.`grebi:name`[0] AS trait_name,
+         [id in speciesNeutralPhenotype.id WHERE id =~ "upheno:[0-9]*" | id][0] AS species_neutral_phenotype_id,
+         speciesNeutralPhenotype.`grebi:name`[0] AS species_neutral_phenotype_name,
+         [id in descendantPhenotype.id WHERE id =~ "mp:[0-9]*" | id][0] AS mouse_phenotype,
+         descendantPhenotype.`grebi:name`[0] AS mouse_phenotype_name,
+         mouseGene.`grebi:name`[0] AS mouse_gene_name,
+         [id in mouseGene.id WHERE id =~ "mgi:[0-9]*" | id][0] AS mouse_gene_id
+
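Since a malformed query file only fails after Neo4j has already been started inside the container, a pre-flight lint could be worth having. A minimal sketch, assuming the key set of the example above; only `cypher_query` is actually read by run_queries.dockerpy, so the remaining keys are a convention rather than a requirement:

```python
# Hypothetical pre-flight lint (not part of this diff): check each query
# YAML before committing to a long Neo4j run. The REQUIRED set follows
# the impc_x_gwas.yaml example above.
import sys
from pathlib import Path

import yaml

REQUIRED = {"title", "description", "uses_datasources", "cypher_query"}

def lint(path: Path) -> bool:
    with open(path) as f:
        query = yaml.safe_load(f)
    missing = REQUIRED - set(query or {})
    if missing:
        print(f"{path}: missing keys {sorted(missing)}", file=sys.stderr)
    return not missing

paths = Path("dataload/materialised_queries").rglob("*.yaml")
sys.exit(0 if all([lint(p) for p in paths]) else 1)  # list() so every file is reported
```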
diff --git a/dataload/nextflow/load_subgraph.nf b/dataload/nextflow/load_subgraph.nf
index 3ed44ab..32e1631 100644
--- a/dataload/nextflow/load_subgraph.nf
+++ b/dataload/nextflow/load_subgraph.nf
@@ -42,8 +42,10 @@ workflow {
         prepare_neo.out.nodes.collect() +
         prepare_neo.out.edges.collect() +
         prepare_neo.out.id_edges.collect() +
-        ids_csv.collect()
-)
+        ids_csv.collect()
+    )
+
+    mat_queries_sqlites = run_materialised_queries(neo_db)
 
     solr_inputs = prepare_solr(materialise.out.nodes, materialise.out.edges)
     solr_nodes_core = create_solr_nodes_core(prepare_solr.out.nodes.collect(), indexed.names_txt)
@@ -433,6 +435,33 @@ process create_neo {
     """
 }
 
+process run_materialised_queries {
+    cache "lenient"
+    memory "8 GB"
+    time "8h"
+    cpus "8"
+    neo_tmp_path "/dev/shm"
+
+    publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true
+
+    input:
+        path(neo_db)
+
+    output:
+        path("materialised_queries")
+
+    script:
+    """
+    #!/usr/bin/env bash
+    set -Eeuo pipefail
+    cp -r ${neo_db}/* ${task.neo_tmp_path}
+    PYTHONUNBUFFERED=true python3 ${params.home}/08_run_queries/run_queries.slurm.py \
+        --in-db-path ${task.neo_tmp_path} \
+        --out-sqlites-path materialised_queries
+    """
+}
+
+
 process create_solr_nodes_core {
     cache "lenient"
     memory "4 GB"
diff --git a/dataload/nextflow/saturos_nextflow.config b/dataload/nextflow/saturos_nextflow.config
index 0fec219..d6d1431 100644
--- a/dataload/nextflow/saturos_nextflow.config
+++ b/dataload/nextflow/saturos_nextflow.config
@@ -63,5 +63,12 @@ process {
     }
 }
 
+process {
+    withName: run_materialised_queries {
+        memory = 1500.GB
+        cpus = 8
+        neo_tmp_path = "/dev/shm"
+    }
+}
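One operational note on `run_materialised_queries`: it copies the whole Neo4j store into the tmpfs at `${task.neo_tmp_path}` (`/dev/shm`), so the store has to fit in memory alongside the allocation configured above. A hypothetical guard, not part of this change:

```python
# Hypothetical guard (not part of this diff): run_materialised_queries
# copies the whole Neo4j store into the tmpfs at /dev/shm, so it is
# worth confirming the store fits before the cp -r starts.
import shutil
from pathlib import Path

def fits_in_tmpfs(neo_db: str, tmpfs: str = "/dev/shm") -> bool:
    # total size of the Neo4j store on disk
    db_bytes = sum(p.stat().st_size for p in Path(neo_db).rglob("*") if p.is_file())
    return db_bytes < shutil.disk_usage(tmpfs).free

print(fits_in_tmpfs("/scratch/neo4j"))  # placeholder path
```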
diff --git a/dataload/queries/genes_to_diseases_ranked_by_otar_score.cypher b/dataload/queries/genes_to_diseases_ranked_by_otar_score.cypher
deleted file mode 100644
index a5e6ef7..0000000
--- a/dataload/queries/genes_to_diseases_ranked_by_otar_score.cypher
+++ /dev/null
@@ -1,10 +0,0 @@
-MATCH (d:`biolink:Disease`)-[:id]->(id:Id {id: "mondo:0005044"})
-WITH d
-MATCH (d)<-[r1:`gwas:associated_with`]-(s:`gwas:SNP`)-[]->(g:`hgnc:Gene`)
-WITH d,s,r1,g
-MATCH (s)-[r2]-(o:`otar:Evidence`)
-WHERE o.`otar:variantEffect` is not null
-WITH d,s, o,r1,r2,g
-ORDER BY o.`otar:score` DESC
-RETURN DISTINCT(g.`hgnc:symbol`[0]) as gene_symbol, d.`grebi:name`[0] as disease_name, o.`otar:variantEffect`[0] as variant_effect, toFloat(o.`otar:score`[0]) as otar_score
-LIMIT 10
\ No newline at end of file
diff --git a/dataload/queries/gwas_snps_to_human_diseases.cypher b/dataload/queries/gwas_snps_to_human_diseases.cypher
deleted file mode 100644
index c9da736..0000000
--- a/dataload/queries/gwas_snps_to_human_diseases.cypher
+++ /dev/null
@@ -1,6 +0,0 @@
-MATCH (d:`biolink:Disease`)-[:id]->(id:Id {id: "mondo:0005044"})
-WITH d
-MATCH (s:`gwas:SNP`)-[]->(d)
-WITH s, d
-MATCH p = (d)<-[]-(s)-[]->(g:`hgnc:Gene`)
-RETURN p
\ No newline at end of file
diff --git a/dataload/queries/mouse_genes_to_human_diseases.cypher b/dataload/queries/mouse_genes_to_human_diseases.cypher
deleted file mode 100644
index c04e71d..0000000
--- a/dataload/queries/mouse_genes_to_human_diseases.cypher
+++ /dev/null
@@ -1,3 +0,0 @@
-MATCH (mouse_gene:`impc:MouseGene`)-[:`impc:humanGeneOrthologues`]->(human_gene:`hgnc:Gene`)<-[:`otar:targetId`]-(evidence:`otar:Evidence`)-[:`otar:diseaseId`]->(disease:`ols:Class`)
-RETURN mouse_gene.`impc:name`[0] as mouse_gene_name, disease.`ols:label`[0] as disease, evidence.`otar:score`[0] as score
-ORDER BY score DESC
diff --git a/dataload/queries/mouse_genes_to_human_diseases.json b/dataload/queries/mouse_genes_to_human_diseases.json
deleted file mode 100644
index 43f70b3..0000000
--- a/dataload/queries/mouse_genes_to_human_diseases.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
-    "title": "What are the disease associations for IMPC genes from OpenTargets?",
-    "datasources": ["IMPC","OpenTargets","OLS.mondo"]
-}
\ No newline at end of file
diff --git a/dataload/queries/mouse_phenotypes_by_system.cypher b/dataload/queries/mouse_phenotypes_by_system.cypher
deleted file mode 100644
index f606e7e..0000000
--- a/dataload/queries/mouse_phenotypes_by_system.cypher
+++ /dev/null
@@ -1,4 +0,0 @@
-MATCH (g:`impc:MouseGene`)-[:`biolink:has_phenotype`]->(phenotype:`ols:Class`)-[:`upheno:0000001`]->(anatomical_entity:`ols:Class`)-[:`bfo:part_of`]->(system:`ols:Class`)
-WHERE "uberon:0000467" IN system.`ols:directAncestor`
-RETURN g.`impc:name`[0] AS gene, system.`ols:label`[0] AS system, count(phenotype) as n_phenotype
-ORDER BY n_phenotype DESC
diff --git a/dataload/queries/mouse_phenotypes_by_system.json b/dataload/queries/mouse_phenotypes_by_system.json
deleted file mode 100644
index b49ef53..0000000
--- a/dataload/queries/mouse_phenotypes_by_system.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
-    "title": "Which mouse genes have the most and least phenotypes by system?",
-    "datasources": ["IMPC","OLS.uberon","OLS.mp"]
-}
\ No newline at end of file
diff --git a/dataload/queries/run_query.py b/dataload/queries/run_query.py
deleted file mode 100644
index 392cefe..0000000
--- a/dataload/queries/run_query.py
+++ /dev/null
@@ -1,16 +0,0 @@
-
-import sys
-import os
-from pathlib import Path
-from pandas import DataFrame
-
-f = open(sys.argv[1], "r")
-query = f.read()
-
-from py2neo import Graph
-graph = Graph("bolt://localhost:7687")
-df = DataFrame(graph.run(query).data())
-
-df.to_csv(Path(sys.argv[1]).with_suffix('.csv'), index=False)
-
-
diff --git a/dataload/scripts/dataload_codon.sh b/dataload/scripts/dataload_codon.sh
index f45fce4..65a0dc6 100755
--- a/dataload/scripts/dataload_codon.sh
+++ b/dataload/scripts/dataload_codon.sh
@@ -7,7 +7,7 @@ export GREBI_TIMESTAMP=$(date +%Y_%m_%d__%H_%M)
 export GREBI_MAX_ENTITIES=1000000000
 export GREBI_NEXTFLOW_CONFIG=$GREBI_DATALOAD_HOME/nextflow/codon_nextflow.config
 module load nextflow-22.10.1-gcc-11.2.0-ju5saqw
-module load python
+module load python-3.10.2-gcc-9.3.0-gswnsij
 source /nfs/production/parkinso/spot/grebi/.venv/bin/activate
 cd /hps/nobackup/parkinso/spot/grebi/
 export PYTHONUNBUFFERED=true