fix pipeline for repo reorganisation and running locally
jamesamcl committed Nov 9, 2024
1 parent c9ffc05 commit fc22223
Showing 22 changed files with 3,197 additions and 1,394 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -10,7 +10,7 @@ The resulting transient databases can be downloaded from https://ftp.ebi.ac.uk/p
| ---------- | ------ | --- | --- | --- |
| `ebi_monarch_xspecies` | All datasources with cross-species phenotype matches merged | ~130m | ~850m | ~900 GB |
| `ebi_monarch` | All datasources with cross-species phenotype matches separated | | | |
| `impc_x_gwas` | Limited to data from IMPC, GWAS Catalog, and related ontologies and mappings | | | |
| `impc_x_gwas` | Limited to data from IMPC, GWAS Catalog, and related ontologies and mappings | ~30m | ~184m | |

Note that the purpose of this pipeline is not to supply another knowledge graph, but to facilitate querying and analysis across existing ones. Consequently the above databases should be considered temporary and are subject to be removed and/or replaced with new ones without warning.

@@ -118,6 +118,9 @@
<field name="grebi__fromNodeId" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="grebi__fromSourceIds" type="string" indexed="true" stored="true" required="false" multiValued="true" />
<field name="grebi__toNodeId" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="grebi__type" type="string" indexed="true" stored="true" required="false" multiValued="true" />
<copyField source="grebi__type" dest="str_grebi__type"/>
<copyField source="grebi__type" dest="lowercase_grebi__type"/>
<field name="grebi__datasources" type="string" indexed="true" stored="true" required="false" multiValued="true" />
<copyField source="grebi__datasources" dest="str_grebi__datasources"/>
<copyField source="grebi__datasources" dest="lowercase_grebi__datasources"/>
8 changes: 4 additions & 4 deletions dataload/07_create_db/neo4j/neo4j_import.dockersh
@@ -35,10 +35,10 @@ sleep 20

echo Creating neo4j indexes...

cypher-shell -a neo4j://127.0.0.1:7687 --non-interactive -f /cypher/ic_scores_1.cypher
sleep 20
cypher-shell -a neo4j://127.0.0.1:7687 --non-interactive -f /cypher/ic_scores_2.cypher
sleep 20
#cypher-shell -a neo4j://127.0.0.1:7687 --non-interactive -f /cypher/ic_scores_1.cypher
#sleep 20
#cypher-shell -a neo4j://127.0.0.1:7687 --non-interactive -f /cypher/ic_scores_2.cypher
#sleep 20
cypher-shell -a neo4j://127.0.0.1:7687 --non-interactive -f /cypher/create_indexes.cypher

echo Creating neo4j indexes done
25 changes: 18 additions & 7 deletions dataload/07_create_db/solr/solr_import.dockerpy
@@ -25,6 +25,8 @@ def main():
port = sys.argv[2]
mem = sys.argv[3]

print(f"solr_import.dockerpy: core {core}, port {port}, mem {mem}")

os.chdir('/opt/solr')

core_data_path = f"/var/solr/data/{core}"
@@ -39,17 +41,19 @@ def main():
os.environ['SOLR_ENABLE_REMOTE_STREAMING'] = 'true'
os.environ['SOLR_SECURITY_MANAGER_ENABLED'] = 'false'

subprocess.run(['solr', 'start', '-m', mem, '-p', port, '-noprompt', '-force'])
cmd = ['solr', 'start', '-m', mem, '-p', port, '-noprompt', '-force']
print(' '.join(cmd))
subprocess.run(cmd)

time.sleep(5)
time.sleep(30)

subprocess.run(['wait-for-solr.sh', '--solr-url', f"http://localhost:{port}/solr/{core}/select?q=*:*"])
subprocess.run(['wait-for-solr.sh', '--solr-url', f"http://localhost:{port}"])

time.sleep(5)
time.sleep(30)

if core == "grebi_autocomplete":
if "_autocomplete" in core:
print("Uploading names.txt")
response = session.get(f"http://localhost:{port}/solr/grebi_autocomplete/update",
response = session.get(f"http://localhost:{port}/solr/{core}/update",
params={
'stream.file': '/names.txt',
'fieldnames': 'label',
@@ -61,8 +65,15 @@ def main():

else:
filenames = glob.glob('/mnt/*.jsonl')
print("ls /mnt")
os.system("ls -hl /mnt")
dbg_filenames=','.join(filenames)
print(f"Found filenames: {dbg_filenames}")
filenames_exist = list(filter(os.path.exists, filenames))
dbg_filenames2=','.join(filenames_exist)
print(f"Existing filenames: {dbg_filenames2}")
with ThreadPoolExecutor(max_workers=WORKERS) as executor:
futures = [executor.submit(upload_file, core, port, filename) for filename in filenames if os.path.exists(filename)]
futures = [executor.submit(upload_file, core, port, filename) for filename in filenames]

time.sleep(5)
response = session.get(f"http://127.0.0.1:{port}/solr/{core}/update",
6 changes: 2 additions & 4 deletions dataload/07_create_db/solr/solr_import.slurm.py
@@ -32,8 +32,7 @@ def main():
'singularity run',
'--env PYTHONUNBUFFERED=TRUE',
'--env NO_PROXY=localhost',
# ] + list(map(lambda f: "--bind " + os.path.abspath(f) + ":/mnt/" + os.path.basename(f), glob.glob(args.in_data + "/solr_*"))) + [
'--bind ' + os.path.abspath(".") + ':/mnt',
] + list(map(lambda f: "--bind " + os.path.abspath(f) + ":/mnt/" + os.path.basename(f), glob.glob(args.in_data + "/solr_*"))) + [
('--bind ' + os.path.abspath(args.in_names_txt) + ':/names.txt') if args.in_names_txt != None else '',
'--bind ' + os.path.abspath(args.solr_config) + ':/config',
'--bind ' + os.path.abspath(args.out_path) + ':/var/solr',
@@ -50,8 +49,7 @@ def main():
'--user="$(id -u):$(id -g)" '
'-e PYTHONUNBUFFERED=TRUE',
'-e NO_PROXY=localhost',
#] + list(map(lambda f: "-v " + os.path.abspath(f) + ":/mnt/" + os.path.basename(f), glob.glob(args.in_data + "/solr_*"))) + [
'-v ' + os.path.abspath(".") + ':/mnt',
] + list(map(lambda f: "-v " + os.path.abspath(f) + ":/mnt/" + os.path.basename(f), glob.glob(args.in_data + "/solr_*"))) + [
('-v ' + os.path.abspath(args.in_names_txt) + ':/names.txt') if args.in_names_txt != None else '',
'-v ' + os.path.abspath(args.solr_config) + ':/config',
'-v ' + os.path.abspath(args.out_path) + ':/var/solr',
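
The restored lines above bind each solr_* artifact from the input directory to /mnt/<basename> inside the container, rather than binding the whole working directory. A minimal standalone sketch of how those bind arguments expand (function name and example paths are illustrative, not from the repo):

import glob
import os

def solr_bind_args(in_data, flag="--bind"):
    # One bind-mount argument per solr_* artifact, mapped to /mnt/<basename>
    # inside the container (mirrors the command construction in solr_import.slurm.py).
    return [
        f"{flag} {os.path.abspath(f)}:/mnt/{os.path.basename(f)}"
        for f in sorted(glob.glob(os.path.join(in_data, "solr_*")))
    ]

# For in_data="." containing solr_edges_00.jsonl and solr_nodes_00.jsonl this yields e.g.:
#   ['--bind /data/solr_edges_00.jsonl:/mnt/solr_edges_00.jsonl',
#    '--bind /data/solr_nodes_00.jsonl:/mnt/solr_nodes_00.jsonl']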
53 changes: 27 additions & 26 deletions dataload/nextflow/01_create_subgraph.nf
@@ -5,6 +5,7 @@ import groovy.json.JsonSlurper
jsonSlurper = new JsonSlurper()

params.tmp = "$GREBI_TMP"
params.fast_tmp = "$GREBI_FAST_TMP"
params.home = "$GREBI_DATALOAD_HOME"
params.config = "$GREBI_CONFIG"
params.subgraph = "$GREBI_SUBGRAPH"
@@ -117,7 +118,7 @@ process ingest {

process build_equiv_groups {
cache "lenient"
memory '16 GB'
memory '4 GB'
time '23h'

input:
@@ -174,7 +175,7 @@ process assign_ids {

process merge_ingests {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "8h"

input:
@@ -199,7 +200,7 @@ process merge_ingests {

process index {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "8h"

publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true
@@ -229,7 +230,7 @@ process index {

process materialise {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "8h"
//time { 1.hour + 8.hour * (task.attempt-1) }
//errorStrategy { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
@@ -267,7 +268,7 @@ process materialise {

process merge_summary_jsons {
cache "lenient"
memory "8 GB"
memory "4 GB"
time "1h"

publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true
Expand All @@ -288,7 +289,7 @@ process merge_summary_jsons {

process create_rocks {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "23h"
cpus "8"
errorStrategy 'retry'
@@ -308,14 +309,14 @@ process create_rocks {
set -Eeuo pipefail
cat ${materialised.iterator().join(" ")} \
| ${params.home}/target/release/grebi_make_rocks \
--rocksdb-path /dev/shm/rocksdb && \
mv /dev/shm/rocksdb ${params.subgraph}_rocksdb
--rocksdb-path ${params.fast_tmp}/rocksdb && \
mv ${params.fast_tmp}/rocksdb ${params.subgraph}_rocksdb
"""
}

process prepare_neo {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "1h"

publishDir "${params.tmp}/${params.config}/${params.subgraph}/neo4j_csv", overwrite: true
@@ -372,7 +373,7 @@ process prepare_solr {

process create_neo_ids_csv {
cache "lenient"
memory "8 GB"
memory "4 GB"
time "8h"
cpus "8"

@@ -392,9 +393,9 @@ process create_neo_ids_csv {

process create_neo {
cache "lenient"
memory "50 GB"
memory "4 GB"
time "8h"
cpus "16"
cpus "8"

publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true

@@ -416,9 +417,9 @@ process create_neo {

process create_solr_nodes_core {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "23h"
cpus "16"
cpus "8"

publishDir "${params.tmp}/${params.config}/${params.subgraph}/solr_cores", overwrite: true, saveAs: { filename -> filename.replace("solr/data/", "") }

@@ -439,15 +440,15 @@ process create_solr_nodes_core {
--in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \
--out-config-dir solr_config
python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \
--solr-config solr_config --core grebi_nodes_${params.subgraph} --in-data . --out-path solr --port 8985 --mem ${task.memory.toGiga()-8}g
--solr-config solr_config --core grebi_nodes_${params.subgraph} --in-data . --out-path solr --port 8985 --mem ${task.memory.toGiga()-2}g
"""
}

process create_solr_edges_core {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "23h"
cpus "16"
cpus "8"

publishDir "${params.tmp}/${params.config}/${params.subgraph}/solr_cores", overwrite: true, saveAs: { filename -> filename.replace("solr/data/", "") }

@@ -468,13 +469,13 @@ process create_solr_edges_core {
--in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \
--out-config-dir solr_config
python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \
--solr-config solr_config --core grebi_edges_${params.subgraph} --in-data . --out-path solr --port 8986 --mem ${task.memory.toGiga()-8}g
--solr-config solr_config --core grebi_edges_${params.subgraph} --in-data . --out-path solr --port 8986 --mem ${task.memory.toGiga()-2}g
"""
}

process create_solr_autocomplete_core {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "4h"
cpus "4"

@@ -495,15 +496,15 @@ process create_solr_autocomplete_core {
--in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \
--out-config-dir solr_config
python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \
--solr-config solr_config --core grebi_autocomplete_${params.subgraph} --in-data . --in-names-txt ${names_txt} --out-path solr --port 8987 --mem ${task.memory.toGiga()-8}g
--solr-config solr_config --core grebi_autocomplete_${params.subgraph} --in-data . --in-names-txt ${names_txt} --out-path solr --port 8987 --mem ${task.memory.toGiga()-2}g
"""
}

process package_neo {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "8h"
cpus "16"
cpus "8"

publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true

@@ -521,9 +522,9 @@ process package_neo {

process package_rocks {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "8h"
cpus "16"
cpus "8"

publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true

@@ -541,9 +542,9 @@ process package_rocks {

process package_solr {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "8h"
cpus "16"
cpus "8"

publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true

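
Running this workflow locally now requires the new GREBI_FAST_TMP variable alongside the other GREBI_* variables read by the params block, since the RocksDB build no longer hard-codes /dev/shm. A hypothetical local invocation (values are examples only; just the variable names come from the workflow):

import os
import subprocess

# Example values; only the GREBI_* variable names mirror the params block
# at the top of 01_create_subgraph.nf.
env = dict(
    os.environ,
    GREBI_TMP="/scratch/grebi/tmp",          # params.tmp
    GREBI_FAST_TMP="/dev/shm/grebi",         # params.fast_tmp (fast scratch for the RocksDB build)
    GREBI_DATALOAD_HOME="/path/to/dataload", # params.home
    GREBI_CONFIG="ebi_monarch",              # params.config (example value)
    GREBI_SUBGRAPH="ebi_monarch",            # params.subgraph (example value)
)
subprocess.run(
    ["nextflow", "run", "dataload/nextflow/01_create_subgraph.nf"],
    env=env,
    check=True,
)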
21 changes: 10 additions & 11 deletions dataload/nextflow/02_create_dbs.nf
@@ -21,8 +21,8 @@ workflow {

neo_db = create_neo(neo_nodes_files.collect() + neo_edges_files.collect() + ids_csv)

solr_tgz = package_solr( Channel.fromPath("${params.tmp}/${params.config}/*/solr_cores/*").collect())
rocks_tgz = package_rocks( Channel.fromPath("${params.tmp}/${params.config}/*/*_rocksdb").collect())
solr_tgz = package_solr( Channel.fromPath("${params.tmp}/${params.config}/*/solr_cores/*", type: 'dir').collect())
rocks_tgz = package_rocks( Channel.fromPath("${params.tmp}/${params.config}/*/*_rocksdb", type: 'dir').collect())

neo_tgz = package_neo(neo_db)

@@ -39,7 +39,7 @@

process create_combined_neo_ids_csv {
cache "lenient"
memory "8 GB"
memory "4 GB"
time "8h"
cpus "8"

@@ -61,9 +61,8 @@ process create_combined_neo_ids_csv {

process create_neo {
cache "lenient"
memory "50 GB"
memory "4 GB"
time "8h"
cpus "16"

publishDir "${params.tmp}/${params.config}", overwrite: true

@@ -85,9 +84,9 @@ process create_neo {

process package_neo {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "8h"
cpus "16"
cpus "8"

publishDir "${params.tmp}/${params.config}", overwrite: true

@@ -105,9 +104,9 @@ process package_neo {

process package_rocks {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "8h"
cpus "16"
cpus "8"

publishDir "${params.tmp}/${params.config}", overwrite: true

@@ -125,9 +124,9 @@ process package_rocks {

process package_solr {
cache "lenient"
memory "16 GB"
memory "4 GB"
time "8h"
cpus "16"
cpus "8"

publishDir "${params.tmp}/${params.config}", overwrite: true
