Skip to content

Commit

Permalink
add materialised queries runner
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesamcl committed Dec 22, 2024
1 parent 4776df7 commit 46d2509
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 163 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

use flate2::write::ZlibEncoder;
use flate2::Compression;
use grebi_shared::get_id;
use std::io::BufReader;
use std::io::BufRead;
use std::io::BufWriter;
use std::io::StdinLock;
use std::io;
use std::io::Write;

Expand Down Expand Up @@ -50,3 +50,4 @@ fn main() {
}

}

82 changes: 36 additions & 46 deletions dataload/08_run_queries/run_queries.dockerpy
Original file line number Diff line number Diff line change
@@ -1,54 +1,44 @@
#!/usr/bin/env python3

import os
from py2neo import Graph
from pandas import DataFrame
from pathlib import Path
import glob
import sys
import sqlite3
import subprocess
from pandas import DataFrame
import json
from timeit import default_timer as timer

os.system('echo "dbms.security.auth_enabled=false" >> /var/lib/neo4j/conf/neo4j.conf')
os.system('neo4j start')
os.system('sleep 20')
os.system("neo4j start")
os.system("sleep 20")

from py2neo import Graph
import yaml
graph = Graph("bolt://localhost:7687")

# NOTE(review): this span is the REMOVED (pre-commit) implementation as rendered
# by the diff; indentation was lost in extraction. Kept byte-identical, comments only.
# Old flow: run each /mnt/*.cypher query, dump to CSV, then build a sqlite DB
# from the CSVs plus /mnt/*.json metadata via the sqlite3 CLI.
for file in glob.glob("/mnt/*.cypher"):
print("Running query: " + file)
# file handle from open() is never closed — leaked until GC
query = open(file).read()
df = DataFrame(graph.run(query).data())
df.to_csv("/out/" + os.path.basename(file).split(".")[0]+".csv", index=False)

os.system('neo4j stop')
os.system('sleep 20')


os.system('rm -f /out/materialised_queries.sqlite')
con = sqlite3.connect("/out/materialised_queries.sqlite")
cur = con.cursor()

cur.execute("CREATE TABLE metadata(json BLOB);")
for file in glob.glob("/mnt/*.json"):
cur.execute("INSERT INTO metadata VALUES (?);", (open(file).read(),))

for file in glob.glob("/out/*.csv"):
table_name = os.path.basename(file).split(".")[0]
with open(file) as f:
# NOTE(review): readline() keeps the trailing newline, so the LAST column
# name created below contains "\n" — confirm whether this ever worked.
header = f.readline()
columns = header.split(",")
# NOTE(review): DDL built by string concatenation from CSV headers —
# injection-prone if headers are not trusted.
cur.execute("CREATE TABLE "
+ table_name
+ " (" + ", ".join([c + " TEXT" for c in columns]) + ");")

# NOTE(review): close() without commit() — the metadata INSERTs above may be
# rolled back depending on sqlite3 autocommit behaviour; verify.
con.close()

# Bulk-load each CSV via the sqlite3 CLI (skipping the header row).
for file in glob.glob("/out/*.csv"):
table_name = os.path.basename(file).split(".")[0]
result = subprocess.run(['sqlite3',
'/out/materialised_queries.sqlite',
'-cmd',
'.mode csv',
'.import --skip 1 ' + file + ' ' + table_name],
capture_output=True)
# Run every materialised query definition mounted at /materialised_queries:
# each <query_id>.yaml supplies a 'cypher_query' plus metadata. For each query
# we record wall-clock timing, write the result rows to /out/<query_id>.csv.gz,
# and dump the (timing-annotated) definition to /out/<query_id>.json.
for file in os.listdir("/materialised_queries"):
    if not file.endswith(".yaml"):
        continue

    query_id = Path(file).stem

    # Load the query definition; the context manager closes the handle
    # promptly (the original leaked an open() file object here).
    with open(f"/materialised_queries/{file}") as qf:
        query = yaml.safe_load(qf)

    start_time = timer()

    print(f"Running query {query_id}")
    df = DataFrame(graph.run(query['cypher_query']).data())

    end_time = timer()

    # Annotate the definition in-place with timing metadata so the JSON dump
    # below records how long the query took.
    query['start_time'] = start_time
    query['end_time'] = end_time
    query['time'] = end_time - start_time

    print(f"Saving {len(df)} rows to {Path(f'/out/{query_id}.csv.gz')}")
    df.to_csv(Path(f"/out/{query_id}.csv.gz"), index=False, compression="gzip")

    with open(f"/out/{query_id}.json", "w") as f:
        json.dump(query, f)

os.system("sleep 20")
os.system("neo4j stop")
os.system("sleep 20")

40 changes: 0 additions & 40 deletions dataload/08_run_queries/run_queries.py

This file was deleted.

53 changes: 27 additions & 26 deletions dataload/08_run_queries/run_queries.slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,62 @@
import shlex
import time
import glob
import argparse
from subprocess import Popen, PIPE, STDOUT

# NOTE(review): this region is a unified-diff rendering — removed (pre-commit) and
# added (post-commit) lines are interleaved below with no +/- markers, and the
# scrape dropped indentation, so the text is not runnable as-is. Comments mark
# which variant each run of lines appears to belong to — confirm against the repo.
def main():

# removed: old argv-based CLI (replaced by argparse below)
if len(sys.argv) < 2:
print("Usage: run_queries.py <grebi_config.json>")
exit(1)
# added: argparse CLI taking the neo4j DB path and the sqlite output dir
parser = argparse.ArgumentParser(description='Create Neo4j DB')
parser.add_argument('--in-db-path', type=str, help='Path with the neo4j database to query', required=True)
parser.add_argument('--out-sqlites-path', type=str, help='Path for the output sqlite files of materialised results', required=True)
args = parser.parse_args()

# kept: pick container runtime based on singularity availability
has_singularity = os.system('which singularity') == 0

# removed: config-file handling (new version takes paths from CLI args)
config_filename = os.path.abspath(sys.argv[1])
print(get_time() + " --- Config filename: " + config_filename, flush=True)
# added:
print(get_time() + " --- Run queries")

# removed:
with open(config_filename, 'r') as f:
config = json.load(f)
# added: derive neo4j data/log dirs from --in-db-path
neo_path = args.in_db_path
neo_data_path = os.path.abspath(os.path.join(neo_path, "data"))
neo_logs_path = os.path.abspath(os.path.join(neo_path, "logs"))

# removed: env-var-derived paths (GREBI_HPS_TMP / GREBI_CONFIG)
neo_path = os.path.abspath(os.path.join(os.environ['GREBI_HPS_TMP'], os.environ['GREBI_CONFIG'], "07_create_db", "neo4j"))
neo_data_path = os.path.abspath(os.path.join(os.environ['GREBI_HPS_TMP'], os.environ['GREBI_CONFIG'], "07_create_db", "neo4j", "data"))
neo_logs_path = os.path.abspath(os.path.join(os.environ['GREBI_HPS_TMP'], os.environ['GREBI_CONFIG'], "07_create_db", "neo4j", "logs"))
out_path = os.path.abspath(os.path.join(os.environ['GREBI_HPS_TMP'], os.environ['GREBI_CONFIG'], "08_run_queries"))
# added:
sqlites_path = args.out_sqlites_path

# removed: out_path cleanup (out_path no longer exists in the new version)
os.system('rm -rf ' + shlex.quote(out_path))
os.makedirs(out_path, exist_ok=True)
# added: NOTE(review): no exist_ok=True — fails if the dir already exists; confirm intended
os.makedirs(sqlites_path)

# removed condition, replaced by has_singularity:
if os.environ['GREBI_USE_SLURM'] == "1":
if has_singularity:
cmd = ' '.join([
'JAVA_OPTS=\'-server -Xms150g -Xmx150g\'',
# added: heap reduced 150g -> 50g
'JAVA_OPTS=\'-server -Xms50g -Xmx50g\'',
'singularity run',
# removed bind mounts:
'--bind ' + os.path.abspath("./queries") + ':/mnt',
'--bind ' + shlex.quote(out_path) + ':/out',
# added bind mounts:
'--bind ' + os.path.abspath(".") + ':/mnt',
'--bind ' + shlex.quote(neo_data_path) + ':/data',
'--bind ' + shlex.quote(neo_logs_path) + ':/logs',
# removed:
'--bind ' + os.path.abspath('./08_run_queries/run_queries.dockerpy') + ':/run_queries.py',
# added: NOTE(review): script is bound INTO the container at /run_queries.py ...
'--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '08_run_queries/run_queries.dockerpy')) + ':/run_queries.py',
'--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], 'materialised_queries', os.environ['GREBI_SUBGRAPH'] )) + ':/materialised_queries',
'--bind ' + os.path.abspath(args.out_sqlites_path) + ':/out',
'--writable-tmpfs',
'--network=none',
'--env NEO4J_AUTH=none',
# removed:
'docker://ghcr.io/ebispot/grebi_neo4j_with_extras:latest',
'python3 /run_queries.py'
# added: image pinned to 5.18.0
'docker://ghcr.io/ebispot/grebi_neo4j_with_extras:5.18.0',
# NOTE(review): ... but the new command runs /run_queries.dockerpy, which was never
# bound — one of the two paths looks wrong; confirm which was intended.
'python3 /run_queries.dockerpy'
])
else:
# removed:
os.system('chmod 777 ' + shlex.quote(out_path))
cmd = ' '.join([
'docker run',
# removed mounts:
'-v ' + os.path.abspath("./queries") + ':/mnt',
'-v ' + shlex.quote(out_path) + ':/out',
# NOTE(review): missing trailing comma — this string silently concatenates with
# the next '-v ...' element, producing one malformed docker argument.
'--user="$(id -u):$(id -g)"'
'-v ' + shlex.quote(neo_data_path) + ':/data',
'-v ' + shlex.quote(neo_logs_path) + ':/logs',
# removed:
'-v ' + os.path.abspath('./08_run_queries/run_queries.dockerpy') + ':/run_queries.py',
# added:
'-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '08_run_queries/run_queries.dockerpy')) + ':/run_queries.py',
'-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], 'materialised_queries', os.environ['GREBI_SUBGRAPH'] )) + ':/materialised_queries',
'-v ' + os.path.abspath(args.out_sqlites_path) + ':/out',
'-e NEO4J_AUTH=none',
# removed / added image tags:
'ghcr.io/ebispot/grebi_neo4j_with_extras:latest',
'ghcr.io/ebispot/grebi_neo4j_with_extras:5.18.0',
'python3 /run_queries.py'
])

print(cmd)

if os.system(cmd) != 0:
# removed:
print("run queries failed")
# added: NOTE(review): message says "import" but this step runs queries — misleading
print("neo4j import failed")
exit(1)


Expand Down
Empty file.
29 changes: 29 additions & 0 deletions dataload/materialised_queries/impc_x_gwas/impc_x_gwas.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Materialised-query definition: run_queries.dockerpy loads each such .yaml,
# executes its cypher_query, and copies the whole document (plus timing fields)
# into the output <query_id>.json alongside the gzipped CSV of result rows.
# NOTE(review): cypher body indentation was lost in extraction — restore the
# two-space block-scalar indent when applying.
title: Human GWAS variants to mouse models in IMPC
description: Connects human GWAS variants to mouse models in IMPC using multiple different graph paths through phenotype and disease ontologies
uses_datasources:
- IMPC
- GWAS
- OLS.mp
- OLS.hp
- OLS.upheno
- OLS.efo
- OLS.mondo
- OLS.doid
- OLS.oba
# The graph_path column below tags every row with the single path materialised
# by this query: gwas->oba->upheno->mp->impc.
cypher_query: |-
MATCH (snp:`gwas:SNP`)-[:`gwas:associated_with`]->(trait)
MATCH (trait)<-[:`upheno:phenotypeToTrait`]-(speciesNeutralPhenotype)
MATCH (speciesNeutralPhenotype)<-[:`biolink:broad_match`]-(descendantPhenotype)-[:sourceId]->(descendantSourceId)
WHERE "OLS.mp" IN descendantPhenotype.`grebi:datasources`
MATCH (descendantPhenotype)<-[:`impc:phenotype`]-(mouseGene)
RETURN "gwas->oba->upheno->mp->impc" AS graph_path,
[id in snp.id WHERE id =~ "rs[0-9]*" | id][0] AS gwas_variant,
[id in trait.id WHERE id =~ "oba:.*" | id][0] AS trait_id,
trait.`grebi:name`[0] as trait_name,
[id in speciesNeutralPhenotype.id WHERE id =~ "upheno:[0-9]*" | id][0] AS species_neutral_phenotype_id,
speciesNeutralPhenotype.`grebi:name`[0] AS species_neutral_phenotype_name,
[id in descendantPhenotype.id WHERE id =~ "mp:[0-9]*" | id][0] AS mouse_phenotype,
descendantPhenotype.`grebi:name`[0] AS mouse_phenotype_name,
mouseGene.`grebi:name`[0] AS mouse_gene_name,
[id in mouseGene.id WHERE id =~ "mgi:[0-9]*" | id][0] AS mouse_gene_id
33 changes: 31 additions & 2 deletions dataload/nextflow/load_subgraph.nf
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ workflow {
prepare_neo.out.nodes.collect() +
prepare_neo.out.edges.collect() +
prepare_neo.out.id_edges.collect() +
ids_csv.collect()
)
ids_csv.collect()
)

mat_queries_sqlites = run_materialised_queries(neo_db)

solr_inputs = prepare_solr(materialise.out.nodes, materialise.out.edges)
solr_nodes_core = create_solr_nodes_core(prepare_solr.out.nodes.collect(), indexed.names_txt)
Expand Down Expand Up @@ -433,6 +435,33 @@ process create_neo {
"""
}

// Runs the materialised Cypher queries against a copy of the Neo4j database.
// The DB is copied into ${task.neo_tmp_path} (tmpfs) before querying, and the
// resulting "materialised_queries" directory is published to the tmp tree.
process run_materialised_queries {
cache "lenient"
memory "8 GB"
time "8h"
cpus "8"
// NOTE(review): neo_tmp_path is not a built-in Nextflow directive — presumably a
// custom setting resolved via task.neo_tmp_path (overridden per host in the
// nextflow config); confirm it is actually honoured.
neo_tmp_path "/dev/shm"

publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true

input:
path(neo_db)

output:
path("materialised_queries")

// NOTE(review): this commit deletes 08_run_queries/run_queries.py; the runner kept
// in the repo is run_queries.slurm.py — confirm which path should be invoked here.
script:
"""
#!/usr/bin/env bash
set -Eeuo pipefail
cp -r ${neo_db}/* ${task.neo_tmp_path}
PYTHONUNBUFFERED=true python3 ${params.home}/08_run_queries/run_queries.py \
--in-db-path ${task.neo_tmp_path} \
--out-sqlites-path materialised_queries
"""
}


process create_solr_nodes_core {
cache "lenient"
memory "4 GB"
Expand Down
7 changes: 7 additions & 0 deletions dataload/nextflow/saturos_nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,12 @@ process {
}
}

// Host-specific (saturos) overrides for the materialised-queries step.
process {
withName: run_materialised_queries {
// Much larger than the process default (8 GB) — presumably because the whole
// Neo4j DB is copied into /dev/shm, which counts against the job's memory; confirm.
memory = 1500.GB
cpus = 8
// Consumed by the process body as task.neo_tmp_path.
neo_tmp_path = "/dev/shm"
}
}


10 changes: 0 additions & 10 deletions dataload/queries/genes_to_diseases_ranked_by_otar_score.cypher

This file was deleted.

6 changes: 0 additions & 6 deletions dataload/queries/gwas_snps_to_human_diseases.cypher

This file was deleted.

3 changes: 0 additions & 3 deletions dataload/queries/mouse_genes_to_human_diseases.cypher

This file was deleted.

4 changes: 0 additions & 4 deletions dataload/queries/mouse_genes_to_human_diseases.json

This file was deleted.

4 changes: 0 additions & 4 deletions dataload/queries/mouse_phenotypes_by_system.cypher

This file was deleted.

4 changes: 0 additions & 4 deletions dataload/queries/mouse_phenotypes_by_system.json

This file was deleted.

16 changes: 0 additions & 16 deletions dataload/queries/run_query.py

This file was deleted.

2 changes: 1 addition & 1 deletion dataload/scripts/dataload_codon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export GREBI_TIMESTAMP=$(date +%Y_%m_%d__%H_%M)
export GREBI_MAX_ENTITIES=1000000000
export GREBI_NEXTFLOW_CONFIG=$GREBI_DATALOAD_HOME/nextflow/codon_nextflow.config
module load nextflow-22.10.1-gcc-11.2.0-ju5saqw
module load python
module load python-3.10.2-gcc-9.3.0-gswnsij
source /nfs/production/parkinso/spot/grebi/.venv/bin/activate
cd /hps/nobackup/parkinso/spot/grebi/
export PYTHONUNBUFFERED=true
Expand Down

0 comments on commit 46d2509

Please sign in to comment.