diff --git a/01_ingest/grebi_ingest_ols/src/main.rs b/01_ingest/grebi_ingest_ols/src/main.rs
index d8bee6f..d78df6d 100644
--- a/01_ingest/grebi_ingest_ols/src/main.rs
+++ b/01_ingest/grebi_ingest_ols/src/main.rs
@@ -266,7 +266,7 @@ fn read_entities(json: &mut JsonStreamReader>>, output_n
             output_nodes.write_all(r#","#.as_bytes()).unwrap();
         }
         output_nodes.write_all(r#"""#.as_bytes()).unwrap();
-        output_nodes.write_all(name.as_bytes()).unwrap();
+        write_escaped_string(&name.as_bytes(), output_nodes);
         output_nodes.write_all(r#"""#.as_bytes()).unwrap();
     }
     output_nodes.write_all(r#"]"#.as_bytes()).unwrap();
@@ -434,8 +434,17 @@ fn reprefix_predicate(pred:&str) -> String {
     }
 }
 
-
-
-
+fn write_escaped_string(str:&[u8], writer:&mut BufWriter) {
+    for c in str {
+        match c {
+            b'"' => { writer.write_all(b"\\\"").unwrap(); }
+            b'\\' => { writer.write_all(b"\\\\").unwrap(); }
+            b'\n' => { writer.write_all(b"\\n").unwrap(); }
+            b'\r' => { writer.write_all(b"\\r").unwrap(); }
+            b'\t' => { writer.write_all(b"\\t").unwrap(); }
+            _ => { writer.write_all([*c].as_slice()).unwrap(); }
+        }
+    }
+}
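Note: the new `write_escaped_string` helper protects the JSONL output from names containing quotes, backslashes, newlines or tabs. The same escaping rules, sketched in Python purely for illustration (this is not the Rust implementation; as in the Rust version, other control characters still pass through unchanged):

```python
# Illustration only: mirrors the escaping rules added in grebi_ingest_ols.
ESCAPES = {'"': '\\"', '\\': '\\\\', '\n': '\\n', '\r': '\\r', '\t': '\\t'}

def escape_for_json_string(name: str) -> str:
    """Return `name` with the same characters escaped as the Rust helper."""
    return ''.join(ESCAPES.get(c, c) for c in name)

assert escape_for_json_string('say "hi"\n') == 'say \\"hi\\"\\n'
```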
diff --git a/01_ingest/grebi_ingest_rdf/src/main.rs b/01_ingest/grebi_ingest_rdf/src/main.rs
index 03927c6..2d00a48 100644
--- a/01_ingest/grebi_ingest_rdf/src/main.rs
+++ b/01_ingest/grebi_ingest_rdf/src/main.rs
@@ -74,7 +74,10 @@ struct Args {
     nest_objects_of_predicate:Vec,
 
     #[arg(long)]
-    exclude_objects_of_predicate:Vec // if an object is used with this predicate, ignore the object
+    exclude_objects_of_predicate:Vec, // if an object is used with this predicate, ignore the object
+
+    #[arg(long, default_value_t = false)]
+    rdf_types_are_grebi_types:bool
 }
 
 fn main() -> std::io::Result<()> {
@@ -93,6 +96,7 @@ fn main() -> std::io::Result<()> {
 
     let nest_preds:BTreeSet = args.nest_objects_of_predicate.into_iter().collect();
     let ignore_preds:BTreeSet = args.exclude_objects_of_predicate.into_iter().collect();
+    let rdf_types_are_grebi_types = args.rdf_types_are_grebi_types;
 
     let gr:CustomGraph = match args.rdf_type.as_str() {
         "rdf_triples_xml" => {
@@ -102,14 +106,11 @@ fn main() -> std::io::Result<()> {
         },
         "rdf_quads_nq" => {
-            if args.rdf_graph.len() == 0 {
-                panic!("must specify at least one graph to load for nquads");
-            }
-
             let parser = NQuadsParser {};
             let quad_source = parser.parse(reader);
-            let mut filtered_quads = quad_source.filter_quads(|q| args.rdf_graph.contains(&q.g().unwrap().value().to_string()));
+            let mut filtered_quads = quad_source.filter_quads(|q|
+                args.rdf_graph.len() == 0 || args.rdf_graph.contains(&q.g().unwrap().value().to_string()));
 
             let mut g:CustomGraph = CustomGraph::new();
@@ -160,7 +161,7 @@ fn main() -> std::io::Result<()> {
 
     eprintln!("Building reification index took {} seconds", start_time.elapsed().as_secs());
 
-    write_subjects(ds, &mut output_nodes, &nest_preds, &exclude_subjects, &exclude_subjects_at_toplevel, reifs);
+    write_subjects(ds, &mut output_nodes, &nest_preds, &exclude_subjects, &exclude_subjects_at_toplevel, reifs, rdf_types_are_grebi_types);
 
     eprintln!("Total time elapsed: {} seconds", start_time.elapsed().as_secs());
@@ -189,7 +190,7 @@ fn populate_reifs(
 
         let annotated_predicate = ds.triples_matching(&s, &pred_prop, &ANY).next().unwrap().unwrap().o().clone();
         let annotated_object = ds.triples_matching(&s, &obj_prop, &ANY).next().unwrap().unwrap().o().clone();
 
-        let obj_json = term_to_json(&annotated_object, ds, nest_preds, None).to_string();
+        let obj_json = term_to_json(&annotated_object, ds, nest_preds, None, false).to_string();
 
         let lhs = ReifLhs {
             s: annotated_subject.clone(),
@@ -212,7 +213,14 @@ fn populate_reifs(
 }
 
-fn write_subjects(ds:&CustomGraph, nodes_writer:&mut BufWriter, nest_preds:&BTreeSet, exclude_subjects:&HashSet>>, exclude_subjects_at_toplevel:&HashSet>>, reifs:HashMap>>>) {
+fn write_subjects(
+    ds:&CustomGraph,
+    nodes_writer:&mut BufWriter,
+    nest_preds:&BTreeSet,
+    exclude_subjects:&HashSet>>,
+    exclude_subjects_at_toplevel:&HashSet>>,
+    reifs:HashMap>>>,
+    rdf_types_are_grebi_types:bool) {
 
     let start_time2 = std::time::Instant::now();
@@ -229,7 +237,7 @@ fn write_subjects(ds:&CustomGraph, nodes_writer:&mut BufWriter, nest
             continue;
         }
 
-        let json = term_to_json(s, ds, nest_preds, Some(&reifs));
+        let json = term_to_json(s, ds, nest_preds, Some(&reifs), rdf_types_are_grebi_types);
 
         let json_obj = json.as_object().unwrap();
         let types = json_obj.get("http://www.w3.org/1999/02/22-rdf-syntax-ns#type");
@@ -252,7 +260,13 @@ fn write_subjects(ds:&CustomGraph, nodes_writer:&mut BufWriter, nest
     eprintln!("Writing JSONL took {} seconds", start_time2.elapsed().as_secs());
 }
 
-fn term_to_json(term:&Term>, ds:&CustomGraph, nest_preds:&BTreeSet, reifs:Option<&HashMap>>>>) -> Value {
+fn term_to_json(
+    term:&Term>,
+    ds:&CustomGraph,
+    nest_preds:&BTreeSet,
+    reifs:Option<&HashMap>>>>,
+    rdf_types_are_grebi_types:bool
+) -> Value {
 
     let triples = ds.triples_matching(term, &ANY, &ANY);
 
@@ -285,7 +299,7 @@ fn term_to_json(term:&Term>, ds:&CustomGraph, nest_preds:&BTreeSet
 >, ds:&CustomGraph, nest_preds:&BTreeSet {
-                        let mut obj = term_to_json(o, ds, nest_preds, reifs);
+                        let mut obj = term_to_json(o, ds, nest_preds, reifs, false);
                         let obj_o = obj.as_object_mut().unwrap();
                         obj_o.remove_entry("id");
                         obj
@@ -314,14 +328,14 @@ fn term_to_json(term:&Term>, ds:&CustomGraph, nest_preds:&BTreeSet
 Value::String( o.value().to_string() ),
-                    BlankNode => term_to_json(o, ds, nest_preds, reifs),
+                    BlankNode => term_to_json(o, ds, nest_preds, reifs, false),
                     Variable => todo!(),
                 }
             }
         };
 
     if reif_subj.is_some() {
-        let mut reif_as_json = term_to_json(reif_subj.unwrap(), ds, nest_preds, None);
+        let mut reif_as_json = term_to_json(reif_subj.unwrap(), ds, nest_preds, None, false);
         let reif_as_json_o = reif_as_json.as_object_mut().unwrap();
         reif_as_json_o.remove_entry("http://www.w3.org/1999/02/22-rdf-syntax-ns#type");
         reif_as_json_o.remove_entry("id");
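Note on the grebi_ingest_rdf changes above: the `rdf_quads_nq` branch no longer insists on `--rdf-graph`; with no graphs listed, every quad passes the filter, and the new `--rdf-types-are-grebi-types` flag is threaded through to `term_to_json`. A sketch of how an invocation might look after this change, in Python for consistency with the pipeline scripts (the input file and datasource name are placeholders, and other arguments the Nextflow `ingest` process passes, such as `--filename`, are omitted):

```python
# Hypothetical invocation of the RDF ingest after this change.
import subprocess

cmd = [
    "./target/release/grebi_ingest_rdf",
    "--datasource-name", "HRA",         # placeholder datasource name
    "--rdf-type", "rdf_quads_nq",
    "--rdf-types-are-grebi-types",      # new flag introduced in this diff
    # "--rdf-graph", "<graph iri>",     # now optional; omitting it keeps all graphs
]
with open("hra.nq", "rb") as nquads:    # placeholder input file
    subprocess.run(cmd, stdin=nquads, check=True)
```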
@@ -340,6 +354,10 @@ fn term_to_json(term:&Term>, ds:&CustomGraph, nest_preds:&BTreeSet,
+    #[arg(trailing_var_arg = true, allow_hyphen_values = true, required = true)]
     _files: Vec,
 }
 
@@ -51,6 +54,8 @@ fn main() -> std::io::Result<()> {
     input_filenames.sort();
     input_filenames.dedup();
 
+    let subgraph_name:Option = args.annotate_subgraph_name;
+
     let mut inputs: Vec = input_filenames
         .iter()
         .map(|file| {
@@ -110,7 +115,7 @@ fn main() -> std::io::Result<()> {
             if !id.eq(&cur_id) {
                 // this is a new subject; we have finished the old one (if present)
                 if cur_id.len() > 0 {
-                    write_merged_entity(&lines_to_write, &mut writer, &inputs, &exclude_props);
+                    write_merged_entity(&lines_to_write, &mut writer, &inputs, &exclude_props, &subgraph_name);
                     lines_to_write.clear();
                 }
                 cur_id = id.to_vec();
@@ -143,7 +148,7 @@ fn main() -> std::io::Result<()> {
     }
 
     if cur_id.len() > 0 {
-        write_merged_entity(&lines_to_write, &mut writer, &inputs, &exclude_props);
+        write_merged_entity(&lines_to_write, &mut writer, &inputs, &exclude_props, &subgraph_name);
         lines_to_write.clear();
     }
 
@@ -153,7 +158,7 @@
 }
 
 #[inline(always)]
-fn write_merged_entity(lines_to_write: &Vec, stdout: &mut BufWriter, inputs: &Vec, exclude_props:&BTreeSet>) {
+fn write_merged_entity(lines_to_write: &Vec, stdout: &mut BufWriter, inputs: &Vec, exclude_props:&BTreeSet>, subgraph_name:&Option) {
 
     if lines_to_write.len() == 0 {
         panic!();
@@ -220,6 +225,12 @@ fn write_merged_entity(lines_to_write: &Vec, stdout: &mut BufWrite
     }
     stdout.write_all(r#"]"#.as_bytes()).unwrap();
 
+    if subgraph_name.is_some() {
+        stdout.write_all(r#","grebi:subgraph":""#.as_bytes()).unwrap();
+        stdout.write_all(&subgraph_name.as_ref().unwrap().as_bytes()).unwrap();
+        stdout.write_all(r#"""#.as_bytes()).unwrap();
+    }
+
     // sort by key, then value, then datasource
     merged_props.sort_by(|a, b| {
         match a.1.key.cmp(&b.1.key) {
diff --git a/04_index/grebi_index/src/main.rs b/04_index/grebi_index/src/main.rs
index 84070b3..1eafadb 100644
--- a/04_index/grebi_index/src/main.rs
+++ b/04_index/grebi_index/src/main.rs
@@ -26,6 +26,9 @@ use serde_json::json;
 #[command(author, version, about, long_about = None)]
 struct Args {
 
+    #[arg(long)]
+    subgraph_name: String,
+
     #[arg(long)]
     out_summary_json_path: String,
 
@@ -195,6 +198,7 @@ fn main() {
 
     summary_writer.write_all(
         serde_json::to_string_pretty(&json!({
+            "subgraph_name": args.subgraph_name,
            "entity_props": entity_props_to_count.iter().map(|(k,v)| {
                 return (String::from_utf8(k.to_vec()).unwrap(), json!({
                     "count": v
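Note: with `--annotate-subgraph-name`, the merge step stamps each merged entity with a `grebi:subgraph` field, and grebi_index records the same name in `summary.json`. A hypothetical merged record for illustration; apart from `grebi:subgraph`, the field names and values below are invented examples, not output of the tool:

```python
# Hypothetical merged-entity record; only "grebi:subgraph" is the new field.
import json

entity = {
    "grebi:nodeId": "hra_kg:example_node",   # placeholder id
    "grebi:subgraph": "hra_kg",              # added when --annotate-subgraph-name hra_kg is passed
    "grebi:name": ["example name"],
}
print(json.dumps(entity))
```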
diff --git a/06_prepare_db_import/grebi_make_csv/src/main.rs b/06_prepare_db_import/grebi_make_csv/src/main.rs
index d3d3c9d..dabb751 100644
--- a/06_prepare_db_import/grebi_make_csv/src/main.rs
+++ b/06_prepare_db_import/grebi_make_csv/src/main.rs
@@ -5,6 +5,7 @@ use std::io::BufWriter;
 use std::io::BufReader;
 use std::io::Write;
 use std::io::BufRead;
+use std::collections::HashSet;
 use clap::Parser;
 use grebi_shared::json_lexer::JsonTokenType;
 use grebi_shared::slice_materialised_edge::SlicedEdge;
@@ -28,7 +29,7 @@ struct Args {
     in_edges_jsonl: String,
 
     #[arg(long)]
-    in_summary_json: String,
+    in_summary_jsons: String,
 
     #[arg(long)]
     out_nodes_csv_path: String,
@@ -43,10 +44,21 @@ fn main() -> std::io::Result<()> {
 
     let start_time = std::time::Instant::now();
 
-    let summary:Value = serde_json::from_reader(File::open(args.in_summary_json).unwrap()).unwrap();
-    let all_entity_props: Vec = summary["entity_props"].as_object().unwrap().keys().cloned().collect();
-    let all_edge_props: Vec = summary["edge_props"].as_object().unwrap().keys().cloned().collect();
+    let mut all_entity_props: HashSet = HashSet::new();
+    let mut all_edge_props: HashSet = HashSet::new();
+
+
+    for f in args.in_summary_jsons.split(",") {
+        let summary:Value = serde_json::from_reader(File::open(f).unwrap()).unwrap();
+        for prop in summary["edge_props"].as_object().unwrap().keys() {
+            all_edge_props.insert(prop.to_string());
+        }
+        for prop in summary["entity_props"].as_object().unwrap().keys() {
+            all_entity_props.insert(prop.to_string());
+        }
+    }
+
     let mut nodes_reader = BufReader::new(File::open(args.in_nodes_jsonl).unwrap());
@@ -140,7 +152,7 @@ fn main() -> std::io::Result<()> {
     Ok(())
 }
 
-fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&Vec, nodes_writer:&mut BufWriter<&File>) {
+fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&HashSet, nodes_writer:&mut BufWriter<&File>) {
 
     let refs:Map = serde_json::from_slice(entity._refs.unwrap()).unwrap();
 
@@ -214,7 +226,7 @@ fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&Vec,
     nodes_writer.write_all(b"\n").unwrap();
 }
 
-fn write_edge(src_line:&[u8], edge:SlicedEdge, all_edge_props:&Vec, edges_writer: &mut BufWriter<&File>) {
+fn write_edge(src_line:&[u8], edge:SlicedEdge, all_edge_props:&HashSet, edges_writer: &mut BufWriter<&File>) {
 
     let refs:Map = serde_json::from_slice(edge._refs.unwrap()).unwrap();
 
diff --git a/06_prepare_db_import/make_solr_autocomplete_config.py b/06_prepare_db_import/make_solr_autocomplete_config.py
new file mode 100644
index 0000000..f86cf47
--- /dev/null
+++ b/06_prepare_db_import/make_solr_autocomplete_config.py
@@ -0,0 +1,35 @@
+
+import json
+import os
+import sys
+import shlex
+import time
+import glob
+import argparse
+from pathlib import Path
+from subprocess import Popen, PIPE, STDOUT
+
+
+def main():
+    parser = argparse.ArgumentParser(description='Create Solr autocomplete config')
+    parser.add_argument('--subgraph-name', type=str, help='subgraph name', required=True)
+    parser.add_argument('--in-template-config-dir', type=str, help='Path of config template', required=True)
+    parser.add_argument('--out-config-dir', type=str, help='Path to write config', required=True)
+    args = parser.parse_args()
+
+    os.makedirs(args.out_config_dir)
+
+    autocomplete_core_path = os.path.join(args.out_config_dir, f'grebi_autocomplete_{args.subgraph_name}')
+    os.system('cp -r ' + shlex.quote(os.path.join(args.in_template_config_dir, "grebi_autocomplete")) + ' ' + shlex.quote(autocomplete_core_path))
+
+    os.system('cp ' + shlex.quote(os.path.join(args.in_template_config_dir, "solr.xml")) + ' ' + shlex.quote(args.out_config_dir))
+    os.system('cp ' + shlex.quote(os.path.join(args.in_template_config_dir, "solrconfig.xml")) + ' ' + shlex.quote(args.out_config_dir))
+    os.system('cp ' + shlex.quote(os.path.join(args.in_template_config_dir, "zoo.cfg")) + ' ' + shlex.quote(args.out_config_dir))
+
+    Path(f'{autocomplete_core_path}/core.properties').write_text(f"name=grebi_autocomplete_{args.subgraph_name}\n")
+
+if __name__=="__main__":
+    main()
+
+
+
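Note on the grebi_make_csv change above: it now takes a comma-separated `--in-summary-jsons` and unions the property names across all subgraphs' summaries. The same union logic, sketched in Python for illustration (the file names are hypothetical):

```python
# Sketch of the property-union logic grebi_make_csv now performs in Rust.
import json

def collect_props(summary_paths):
    """Union entity/edge property names across several summary.json files."""
    entity_props, edge_props = set(), set()
    for path in summary_paths:
        with open(path) as f:
            summary = json.load(f)
        entity_props |= set(summary["entity_props"].keys())
        edge_props |= set(summary["edge_props"].keys())
    return entity_props, edge_props

entity_props, edge_props = collect_props(["hra_kg/summary.json", "monarch/summary.json"])
```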
diff --git a/06_prepare_db_import/make_solr_config.py b/06_prepare_db_import/make_solr_config.py
index 11ba4ef..7e155b0 100644
--- a/06_prepare_db_import/make_solr_config.py
+++ b/06_prepare_db_import/make_solr_config.py
@@ -12,18 +12,31 @@ def main():
     parser = argparse.ArgumentParser(description='Create Solr config')
+    parser.add_argument('--subgraph-name', type=str, help='subgraph name', required=True)
     parser.add_argument('--in-summary-json', type=str, help='summary.json', required=True)
     parser.add_argument('--in-template-config-dir', type=str, help='Path of config template', required=True)
     parser.add_argument('--out-config-dir', type=str, help='Path to write config', required=True)
     args = parser.parse_args()
 
-    os.system('cp -r ' + shlex.quote(args.in_template_config_dir) + ' ' + shlex.quote(args.out_config_dir))
+    os.makedirs(args.out_config_dir)
+
+    nodes_core_path = os.path.join(args.out_config_dir, f'grebi_nodes_{args.subgraph_name}')
+    edges_core_path = os.path.join(args.out_config_dir, f'grebi_edges_{args.subgraph_name}')
+    os.system('cp -r ' + shlex.quote(os.path.join(args.in_template_config_dir, "grebi_nodes")) + ' ' + shlex.quote(nodes_core_path))
+    os.system('cp -r ' + shlex.quote(os.path.join(args.in_template_config_dir, "grebi_edges")) + ' ' + shlex.quote(edges_core_path))
+
+    os.system('cp ' + shlex.quote(os.path.join(args.in_template_config_dir, "solr.xml")) + ' ' + shlex.quote(args.out_config_dir))
+    os.system('cp ' + shlex.quote(os.path.join(args.in_template_config_dir, "solrconfig.xml")) + ' ' + shlex.quote(args.out_config_dir))
+    os.system('cp ' + shlex.quote(os.path.join(args.in_template_config_dir, "zoo.cfg")) + ' ' + shlex.quote(args.out_config_dir))
 
     summary = json.load(open(args.in_summary_json))
 
     node_props = map(lambda f: f.replace(':', '__').replace('&', '_'), summary['entity_props'].keys())
     edge_props = map(lambda f: f.replace(':', '__').replace('&', '_'), summary['edge_props'].keys())
 
-    nodes_schema = Path(os.path.join(args.out_config_dir, 'grebi_nodes/conf/schema.xml'))
+    Path(f'{nodes_core_path}/core.properties').write_text(f"name=grebi_nodes_{args.subgraph_name}\n")
+    Path(f'{edges_core_path}/core.properties').write_text(f"name=grebi_edges_{args.subgraph_name}\n")
+
+    nodes_schema = Path(f'{nodes_core_path}/conf/schema.xml')
     nodes_schema.write_text(nodes_schema.read_text().replace('[[GREBI_FIELDS]]', '\n'.join(list(map(
         lambda f: '\n'.join([
             f'',
             f'',
             f''
         ]), node_props)))))
@@ -31,7 +44,7 @@ def main():
-    edges_schema = Path(os.path.join(args.out_config_dir, 'grebi_edges/conf/schema.xml'))
+    edges_schema = Path(f'{edges_core_path}/conf/schema.xml')
     edges_schema.write_text(edges_schema.read_text().replace('[[GREBI_FIELDS]]', '\n'.join(list(map(
         lambda f: '\n'.join([
             f'',
diff --git a/07_create_db/neo4j/create_indexes.cypher b/07_create_db/neo4j/create_indexes.cypher
index 786e533..41b36ee 100644
--- a/07_create_db/neo4j/create_indexes.cypher
+++ b/07_create_db/neo4j/create_indexes.cypher
@@ -1,5 +1,7 @@
 CREATE INDEX node_id FOR (n:GraphNode) ON n.`grebi:nodeId`
 ;
+CREATE INDEX subgraph FOR (n:GraphNode) ON n.`grebi:subgraph`
+;
 CALL db.awaitIndexes(10800)
 ;
diff --git a/07_create_db/solr/solr_import.dockerpy b/07_create_db/solr/solr_import.dockerpy
index 33d7378..26a122c 100644
--- a/07_create_db/solr/solr_import.dockerpy
+++ b/07_create_db/solr/solr_import.dockerpy
@@ -58,7 +58,7 @@ def main():
         })
         print(response.text)
 
-    elif core in {"grebi_nodes", "grebi_edges"}:
+    else:
         filenames = glob.glob('/mnt/*.jsonl')
         with ThreadPoolExecutor(max_workers=WORKERS) as executor:
             futures = [executor.submit(upload_file, core, port, filename) for filename in filenames if os.path.exists(filename)]
@@ -68,10 +68,6 @@ def main():
             params={'commit': 'true'})
         print(response.text)
 
-    else:
-        print("Invalid core")
-        sys.exit(1)
-
     os.environ['SOLR_STOP_WAIT'] = '500'
     subprocess.run(['solr', 'stop', '-p', port])
diff --git a/07_create_db/solr/solr_import.slurm.py b/07_create_db/solr/solr_import.slurm.py
index 85c264c..97cf1c4 100644
--- a/07_create_db/solr/solr_import.slurm.py
+++ b/07_create_db/solr/solr_import.slurm.py
@@ -14,7 +14,7 @@ def main():
     parser.add_argument('--solr-config', type=str, help='Solr config dir', required=True)
     parser.add_argument('--core', type=str, help='Core to import', required=True)
     parser.add_argument('--port', type=str, help='Port to use for temp solr instance', required=True)
-    parser.add_argument('--in-names-txt', type=str, help='Path to names.txt', required=True)
+    parser.add_argument('--in-names-txt', type=str, help='Path to names.txt', required=False)
     parser.add_argument('--in-data', type=str, help='Path to jsonl files to import', required=True)
     parser.add_argument('--out-path', type=str, help='Path to use to store the solr database', required=True)
     args = parser.parse_args()
@@ -33,7 +33,7 @@ def main():
         '--env NO_PROXY=localhost',
         # ] + list(map(lambda f: "--bind " + os.path.abspath(f) + ":/mnt/" + os.path.basename(f), glob.glob(args.in_data + "/solr_*"))) + [
         '--bind ' + os.path.abspath(".") + ':/mnt',
-        '--bind ' + os.path.abspath(args.in_names_txt) + ':/names.txt',
+        ('--bind ' + os.path.abspath(args.in_names_txt) + ':/names.txt') if args.in_names_txt != None else '',
         '--bind ' + os.path.abspath(args.solr_config) + ':/config',
         '--bind ' + os.path.abspath(args.out_path) + ':/var/solr',
         '--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_HOME'], '07_create_db/solr/solr_import.dockerpy')) + ':/import.py',
@@ -51,7 +51,7 @@ def main():
         '-e NO_PROXY=localhost',
         #] + list(map(lambda f: "-v " + os.path.abspath(f) + ":/mnt/" + os.path.basename(f), glob.glob(args.in_data + "/solr_*"))) + [
         '-v ' + os.path.abspath(".") + ':/mnt',
-        '-v ' + os.path.abspath(args.in_names_txt) + ':/names.txt',
+        ('-v ' + os.path.abspath(args.in_names_txt) + ':/names.txt') if args.in_names_txt != None else '',
         '-v ' + os.path.abspath(args.solr_config) + ':/config',
         '-v ' + os.path.abspath(args.out_path) + ':/var/solr',
         '-v ' + os.path.abspath(os.path.join(os.environ['GREBI_HOME'], '07_create_db/solr/solr_import.dockerpy')) + ':/import.py',
diff --git a/configs/datasource_configs/hra_kg.json b/configs/datasource_configs/hra_kg.json
new file mode 100644
index 0000000..050e16e
--- /dev/null
+++ b/configs/datasource_configs/hra_kg.json
@@ -0,0 +1,14 @@
+{
+    "name": "HRA",
+    "enabled": true,
+    "ingests": [
+        {
+            "ingest_files": ["./00_fetch_data/hra_kg/hra.nq.gz"],
+            "ingest_script": "./target/release/grebi_ingest_rdf",
+            "ingest_args": [
+                { "name": "--rdf-type", "value": "rdf_quads_nq" },
+                { "name": "--rdf-types-are-grebi-types", "value": "" }
+            ]
+        }
+    ]
+}
\ No newline at end of file
diff --git a/configs/pipeline_configs/ebi.json b/configs/pipeline_configs/ebi.json
new file mode 100644
index 0000000..6a9da6b
--- /dev/null
+++ b/configs/pipeline_configs/ebi.json
@@ -0,0 +1,7 @@
+{
+    "subgraphs": [
+        "ebi_full_monarch",
+        "monarch",
+        "hra_kg"
+    ]
+}
\ No newline at end of file
diff --git a/configs/pipeline_configs/ebi_test.json b/configs/pipeline_configs/ebi_test.json
deleted file mode 100644
index ac36c41..0000000
--- a/configs/pipeline_configs/ebi_test.json
+++ /dev/null
@@ -1,24 +0,0 @@
-{
-    "bytes_per_merged_file": 1073741824,
-    "equivalence_props": [
-        "owl:equivalentClass",
-        "owl:equivalentProperty",
-        "owl:sameAs",
-        "grebi:equivalentTo",
-        "ols:iri",
-        "hgnc:ensembl_gene_id",
-        "obo:chebi/inchi",
-        "obo:chebi/inchikey",
-        "obo:chebi/smiles",
-        "impc:pmId"
-    ],
-    "exclude_edges": [
-        "ols:iri",
-        "ols:shortForm",
-        "ols:curie",
-        "oboinowl:id"
-    ],
-    "datasource_configs": [
-        "./configs/datasource_configs/ols.json"
-    ]
-}
diff --git a/configs/pipeline_configs/hra_only.json b/configs/pipeline_configs/hra_only.json
new file mode 100644
index 0000000..92f8fd3
--- /dev/null
+++ b/configs/pipeline_configs/hra_only.json
@@ -0,0 +1,5 @@
+{
+    "subgraphs": [
+        "hra_kg"
+    ]
+}
\ No newline at end of file
diff --git a/configs/pipeline_configs/ebi_full_monarch.json b/configs/subgraph_configs/ebi_full_monarch.json
similarity index 95%
rename from configs/pipeline_configs/ebi_full_monarch.json
rename to configs/subgraph_configs/ebi_full_monarch.json
index 68d542e..ef0c647 100644
--- a/configs/pipeline_configs/ebi_full_monarch.json
+++ b/configs/subgraph_configs/ebi_full_monarch.json
@@ -1,4 +1,6 @@
 {
+    "id": "EBI_MONARCH",
+    "name": "EBI Resources and MONARCH Initiative KG",
     "bytes_per_merged_file": 1073741824,
     "equivalence_props": [
         "owl:equivalentClass",
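Note: the pipeline config is now just a list of subgraphs, each of which points at its own subgraph config under `configs/subgraph_configs/`. A sketch of how a driver can expand that two-level structure (it mirrors the `scripts/dataload_*.py` scripts added later in this diff; `GREBI_HOME` and the config name are example values):

```python
# Sketch of consuming the new pipeline_configs -> subgraph_configs layout.
import json, os

GREBI_HOME = os.environ.get("GREBI_HOME", ".")
pipeline = json.load(open(f"{GREBI_HOME}/configs/pipeline_configs/ebi.json"))

for subgraph in pipeline["subgraphs"]:                      # e.g. "hra_kg"
    cfg_path = f"{GREBI_HOME}/configs/subgraph_configs/{subgraph}.json"
    subgraph_cfg = json.load(open(cfg_path))
    print(subgraph, "->", subgraph_cfg["datasource_configs"])
```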
diff --git a/configs/subgraph_configs/hra_kg.json b/configs/subgraph_configs/hra_kg.json
new file mode 100644
index 0000000..30d7e87
--- /dev/null
+++ b/configs/subgraph_configs/hra_kg.json
@@ -0,0 +1,40 @@
+{
+    "id": "HRA_KG",
+    "name": "Human Reference Atlas KG",
+    "bytes_per_merged_file": 1073741824,
+    "equivalence_props": [
+        "owl:equivalentClass",
+        "owl:equivalentProperty",
+        "owl:sameAs",
+        "grebi:equivalentTo",
+        "ols:iri",
+        "ols:shortForm",
+        "hgnc:ensembl_gene_id",
+        "obo:chebi/inchi",
+        "obo:chebi/inchikey",
+        "obo:chebi/smiles",
+        "impc:pmId",
+        "impc:humanGeneAccId",
+        "monarch:iri"
+    ],
+    "additional_equivalence_groups": [
+        ["grebi:name", "ols:label", "rdfs:label", "monarch:name", "impc:name", "reactome:displayName"],
+        ["grebi:description", "iao:definition", "monarch:description", "ols:definition"],
+        ["grebi:synonym", "monarch:synonym", "iao:alternative_label", "ols:synonym", "oboinowl:hasExactSynonym"],
+        ["mondo:0000001", "ogms:0000031"]
+    ],
+    "exclude_props": [
+        "ols:curie",
+        "ols:shortForm",
+        "ols:ontologyPreferredPrefix",
+        "ols:iri",
+        "oboinowl:id",
+        "oboinowl:url",
+        "monarch:iri"
+    ],
+    "exclude_edges": [
+    ],
+    "datasource_configs": [
+        "./configs/datasource_configs/hra_kg.json"
+    ]
+}
diff --git a/configs/pipeline_configs/local_test.json b/configs/subgraph_configs/monarch.json
similarity index 76%
rename from configs/pipeline_configs/local_test.json
rename to configs/subgraph_configs/monarch.json
index 9097c77..46d04ad 100644
--- a/configs/pipeline_configs/local_test.json
+++ b/configs/subgraph_configs/monarch.json
@@ -1,5 +1,7 @@
 {
-    "bytes_per_merged_file": 173741824,
+    "id": "MONARCH",
+    "name": "MONARCH Initiative KG",
+    "bytes_per_merged_file": 1073741824,
     "equivalence_props": [
         "owl:equivalentClass",
         "owl:equivalentProperty",
@@ -16,9 +18,10 @@
         "monarch:iri"
     ],
     "additional_equivalence_groups": [
-        ["grebi:name", "ols:label", "rdfs:label", "monarch:name", "impc:name"],
+        ["grebi:name", "ols:label", "rdfs:label", "monarch:name", "impc:name", "reactome:displayName"],
         ["grebi:description", "iao:definition", "monarch:description", "ols:definition"],
-        ["grebi:synonym", "monarch:synonym", "iao:alternative_label", "ols:synonym"]
+        ["grebi:synonym", "monarch:synonym", "iao:alternative_label", "ols:synonym", "oboinowl:hasExactSynonym"],
+        ["mondo:0000001", "ogms:0000031"]
     ],
     "exclude_props": [
         "ols:curie",
@@ -32,6 +35,6 @@
     "exclude_edges": [
     ],
     "datasource_configs": [
-        "./configs/datasource_configs/gwas.json"
+        "./configs/datasource_configs/monarch.json"
     ]
 }
diff --git a/grebi.nf b/grebi.nf
deleted file mode 100644
index 20d3633..0000000
--- a/grebi.nf
+++ /dev/null
@@ -1,669 +0,0 @@
-
-nextflow.enable.dsl=2
-
-import groovy.json.JsonSlurper
-jsonSlurper = new JsonSlurper()
-
-params.home = "$GREBI_HOME"
-params.config = "$GREBI_CONFIG"
-
-workflow {
-
-    config = (new JsonSlurper().parse(
-        new File(params.home + "/configs/pipeline_configs/" + params.config + ".json")))
-
-    files_listing = prepare() | splitText | map { row -> parseJson(row) }
-
-    ingest(files_listing, Channel.value(config.equivalence_props))
-    groups_txt = build_equiv_groups(ingest.out.equivalences.collect(), Channel.value(config.additional_equivalence_groups))
-    assigned = assign_ids(ingest.out.nodes, groups_txt).collect(flat: false)
-
-    merged = merge_ingests(
-        assigned,
-        Channel.value(config.exclude_props),
-        Channel.value(config.bytes_per_merged_file))
-
-    indexed = index(merged.collect())
-    materialised = materialise(merged.flatten(), indexed, Channel.value(config.exclude_edges + config.equivalence_props))
-
-    rocks_db = create_rocks(materialised.collect())
-
-    neo_input_dir = prepare_neo(indexed, materialised)
-    neo_db = create_neo(prepare_neo.out.nodes.collect() + prepare_neo.out.edges.collect())
-
-    solr_config = make_solr_config(indexed.map { it[1] })
-
-    solr_inputs = prepare_solr(materialised)
-
solr_nodes_core = create_solr_nodes_core(prepare_solr.out.nodes.collect(), indexed.map { it[2] }, solr_config) - solr_edges_core = create_solr_edges_core(prepare_solr.out.edges.collect(), indexed.map { it[2] }, solr_config) - solr_autocomplete_core = create_solr_autocomplete_core(indexed.map { it[2] }, solr_config) - - solr_tgz = package_solr(solr_nodes_core, solr_edges_core, solr_autocomplete_core, solr_config) - neo_tgz = package_neo(neo_db) - rocks_tgz = package_rocks(rocks_db) - - date = get_date() - copy_solr_to_ftp(solr_tgz, date) - copy_neo_to_ftp(neo_tgz, date) - copy_rocks_to_ftp(rocks_tgz, date) - - if(params.config == "ebi_full_monarch") { - copy_solr_to_staging(solr_nodes_core, solr_edges_core, solr_autocomplete_core, solr_config) - copy_neo_to_staging(neo_db) - copy_rocksdb_to_staging(rocks_db) - } -} - -process prepare { - cache "lenient" - memory "4 GB" - time "1h" - - output: - path "datasource_files.jsonl" - - script: - """ - PYTHONUNBUFFERED=true python3 ${params.home}/scripts/dataload_00_prepare.py - """ -} - -process ingest { - cache "lenient" - memory { 4.GB + 32.GB * (task.attempt-1) } - time { 1.hour + 8.hour * (task.attempt-1) } - errorStrategy { task.exitStatus in 137..140 ? 'retry' : 'terminate' } - maxRetries 5 - - input: - val(file_listing) - val(equivalence_props) - - output: - tuple val(file_listing.datasource.name), path("nodes_${task.index}.jsonl.gz"), emit: nodes - path("equivalences_${task.index}.tsv"), emit: equivalences - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - ${getDecompressionCommand(file_listing.filename)} \ - | ${getIngestCommand(file_listing.ingest.ingest_script)} \ - --datasource-name ${file_listing.datasource.name} \ - --filename "${basename(file_listing.filename)}" \ - ${buildIngestArgs(file_listing.ingest.ingest_args)} \ - | ${params.home}/target/release/grebi_normalise_prefixes ${params.home}/prefix_maps/prefix_map_normalise.json \ - | tee >(${params.home}/target/release/grebi_extract_equivalences \ - --equivalence-properties ${equivalence_props.iterator().join(",")} \ - > equivalences_${task.index}.tsv) \ - | pigz --fast > nodes_${task.index}.jsonl.gz - """ -} - -process build_equiv_groups { - cache "lenient" - memory '64 GB' - time '23h' - - input: - path(equivalences_tsv) - val(additional_equivalence_groups) - - output: - path "groups.txt" - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - cat ${equivalences_tsv} \ - | ${params.home}/target/release/grebi_equivalences2groups \ - ${buildAddEquivGroupArgs(additional_equivalence_groups)} \ - > groups.txt - """ -} - -process assign_ids { - cache "lenient" - memory { 32.GB + 32.GB * (task.attempt-1) } - time { 1.hour + 8.hour * (task.attempt-1) } - errorStrategy { task.exitStatus in 137..140 ? 
'retry' : 'terminate' } - maxRetries 5 - - input: - tuple(val(datasource_name), path(nodes_jsonl)) - path groups_txt - - output: - tuple(val(datasource_name), path("nodes_with_ids.sorted.jsonl.gz")) - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - zcat ${nodes_jsonl} \ - | ${params.home}/target/release/grebi_assign_ids \ - --groups-txt ${groups_txt} \ - > nodes_with_ids.jsonl - LC_ALL=C sort -o nodes_with_ids.sorted.jsonl nodes_with_ids.jsonl - rm -f nodes_with_ids.jsonl - pigz --fast nodes_with_ids.sorted.jsonl - """ -} - -process merge_ingests { - cache "lenient" - memory "16 GB" - time "8h" - - input: - val(assigned) - val(exclude_props) - val(bytes_per_merged_file) - - output: - path('merged.jsonl.*') - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - ${params.home}/target/release/grebi_merge \ - --exclude-props ${exclude_props.iterator().join(",")} \ - ${buildMergeArgs(assigned)} \ - | split -a 6 -d -C ${bytes_per_merged_file} - merged.jsonl. - """ -} - -process index { - cache "lenient" - memory "64 GB" - time "8h" - - input: - val(merged_filenames) - - output: - tuple(path("metadata.jsonl"), path("summary.json"), path("names.txt")) - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - cat ${merged_filenames.iterator().join(" ")} \ - | ${params.home}/target/release/grebi_index \ - --out-metadata-jsonl-path metadata.jsonl \ - --out-summary-json-path summary.json \ - --out-names-txt names.txt - """ -} - -process materialise { - cache "lenient" - //memory { 80.GB + 30.GB * (task.attempt-1) } - memory "96 GB" - time "8h" - //time { 1.hour + 8.hour * (task.attempt-1) } - //errorStrategy { task.exitStatus in 137..140 ? 'retry' : 'terminate' } - //maxRetries 5 - - input: - path(merged_filename) - tuple(path(metadata_jsonl), path(summary_json), path(names_txt)) - val(exclude) - - output: - tuple(path("nodes.jsonl"), path("edges.jsonl")) - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - cat ${merged_filename} \ - | ${params.home}/target/release/grebi_materialise \ - --in-metadata-jsonl ${metadata_jsonl} \ - --out-edges-jsonl edges.jsonl \ - --exclude ${exclude.iterator().join(",")} \ - > nodes.jsonl - """ -} - -process create_rocks { - cache "lenient" - memory "800 GB" - time "23h" - cpus "8" - errorStrategy 'retry' - maxRetries 10 - - input: - val(materialised) - - output: - path("rocksdb") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - cat ${materialised.iterator().join(" ")} \ - | ${params.home}/target/release/grebi_make_rocks \ - --rocksdb-path /dev/shm/rocksdb && \ - mv /dev/shm/rocksdb . 
- """ -} - -process prepare_neo { - cache "lenient" - memory "4 GB" - time "1h" - - input: - tuple(path(metadata_jsonl), path(summary_json), path(names_txt)) - tuple(path(nodes_jsonl), path(edges_jsonl)) - - output: - path("neo_nodes_${task.index}.csv"), emit: nodes - path("neo_edges_${task.index}.csv"), emit: edges - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - ${params.home}/target/release/grebi_make_csv \ - --in-summary-json ${summary_json} \ - --in-nodes-jsonl ${nodes_jsonl} \ - --in-edges-jsonl ${edges_jsonl} \ - --out-nodes-csv-path neo_nodes_${task.index}.csv \ - --out-edges-csv-path neo_edges_${task.index}.csv - """ -} - -process make_solr_config { - cache "lenient" - memory "1 GB" - time "1h" - - input: - path(summary_json) - - output: - path("solr_config") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - python3 ${params.home}/06_prepare_db_import/make_solr_config.py \ - --in-summary-json ${summary_json} \ - --in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \ - --out-config-dir solr_config - """ -} - -process prepare_solr { - cache "lenient" - memory "4 GB" - time "1h" - - input: - tuple(path(nodes_jsonl), path(edges_jsonl)) - - output: - path("solr_nodes_${task.index}.jsonl"), emit: nodes - path("solr_edges_${task.index}.jsonl"), emit: edges - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - ${params.home}/target/release/grebi_make_solr \ - --in-nodes-jsonl ${nodes_jsonl} \ - --in-edges-jsonl ${edges_jsonl} \ - --out-nodes-jsonl-path solr_nodes_${task.index}.jsonl \ - --out-edges-jsonl-path solr_edges_${task.index}.jsonl - """ -} - -process create_neo { - cache "lenient" - memory "50 GB" - time "8h" - cpus "32" - - input: - path(neo_inputs) - - output: - path("neo4j") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - PYTHONUNBUFFERED=true python3 ${params.home}/07_create_db/neo4j/neo4j_import.slurm.py \ - --in-csv-path . \ - --out-db-path neo4j - """ -} - -process create_solr_nodes_core { - cache "lenient" - memory "150 GB" - time "23h" - cpus "32" - - input: - path(solr_inputs) - path(names_txt) - path(solr_config) - - output: - path("solr/data/grebi_nodes") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - PYTHONUNBUFFERED=true python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \ - --solr-config solr_config --core grebi_nodes --in-data . --in-names-txt ${names_txt} --out-path solr --port 8985 - """ -} - -process create_solr_edges_core { - cache "lenient" - memory "150 GB" - time "23h" - cpus "32" - - input: - path(solr_inputs) - path(names_txt) - path(solr_config) - - output: - path("solr/data/grebi_edges") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - PYTHONUNBUFFERED=true python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \ - --solr-config solr_config --core grebi_edges --in-data . --in-names-txt ${names_txt} --out-path solr --port 8986 - """ -} - -process create_solr_autocomplete_core { - cache "lenient" - memory "150 GB" - time "4h" - cpus "4" - - input: - path(names_txt) - path(solr_config) - - output: - path("solr/data/grebi_autocomplete") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - PYTHONUNBUFFERED=true python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \ - --solr-config solr_config --core grebi_autocomplete --in-data . 
--in-names-txt ${names_txt} --out-path solr --port 8987 - """ -} - -process package_neo { - cache "lenient" - memory "32 GB" - time "8h" - cpus "32" - - publishDir "${params.home}/release/${params.config}", overwrite: true - - input: - path(neo4j) - - output: - path("neo4j.tgz") - - script: - """ - tar -chf neo4j.tgz --use-compress-program="pigz --fast" neo4j - """ -} - -process package_rocks { - cache "lenient" - memory "32 GB" - time "8h" - cpus "32" - - publishDir "${params.home}/release/${params.config}", overwrite: true - - input: - path(rocks_db) - - output: - path("rocksdb.tgz") - - script: - """ - tar -chf rocksdb.tgz --use-compress-program="pigz --fast" ${rocks_db} - """ -} - -process package_solr { - cache "lenient" - memory "32 GB" - time "8h" - cpus "32" - - publishDir "${params.home}/release/${params.config}", overwrite: true - - input: - path(solr_nodes_core) - path(solr_edges_core) - path(solr_autocomplete_core) - path(solr_config) - - output: - path("solr.tgz") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - cp -f ${solr_config}/*.xml . - cp -f ${solr_config}/*.cfg . - tar -chf solr.tgz --transform 's,^,solr/,' --use-compress-program="pigz --fast" \ - *.xml *.cfg ${solr_nodes_core} ${solr_edges_core} ${solr_autocomplete_core} - """ -} - -process get_date { - - cache "lenient" - memory "1 GB" - time "1h" - - output: - stdout - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - date +%Y_%m_%d__%H_%M - """ -} - -process copy_neo_to_ftp { - - cache "lenient" - memory "4 GB" - time "8h" - queue "datamover" - - input: - path("neo4j.tgz") - val(date) - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - mkdir -p /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()} - cp -f neo4j.tgz /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()}/ - """ -} - -process copy_solr_to_ftp { - - cache "lenient" - memory "4 GB" - time "8h" - queue "datamover" - - input: - path("solr.tgz") - val(date) - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - mkdir -p /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()} - cp -f solr.tgz /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()}/ - """ -} - -process copy_rocks_to_ftp { - - cache "lenient" - memory "4 GB" - time "8h" - queue "datamover" - - input: - path("rocksdb.tgz") - val(date) - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - mkdir -p /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()} - cp -f rocksdb.tgz /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()}/ - """ -} - -process copy_neo_to_staging { - cache "lenient" - memory "4 GB" - time "8h" - queue "datamover" - - input: - path("neo4j") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - rm -rf /nfs/public/rw/ontoapps/grebi/staging/neo4j - cp -LR neo4j /nfs/public/rw/ontoapps/grebi/staging/neo4j - """ -} - -process copy_solr_to_staging { - cache "lenient" - memory "4 GB" - time "8h" - queue "datamover" - - input: - path("grebi_nodes") - path("grebi_edges") - path("grebi_autocomplete") - path(solr_config) - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - rm -rf /nfs/public/rw/ontoapps/grebi/staging/solr && mkdir /nfs/public/rw/ontoapps/grebi/staging/solr - cp -f ${solr_config}/*.xml . - cp -f ${solr_config}/*.cfg . 
- cp -LR * /nfs/public/rw/ontoapps/grebi/staging/solr/ - """ -} - -process copy_rocksdb_to_staging { - cache "lenient" - memory "4 GB" - time "8h" - queue "datamover" - - publishDir "/nfs/public/rw/ontoapps/grebi/staging/rocksdb", mode: 'copy', overwrite: true - - input: - path("rocksdb") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - rm -rf /nfs/public/rw/ontoapps/grebi/staging/rocksdb - cp -LR rocksdb /nfs/public/rw/ontoapps/grebi/staging/rocksdb - """ -} - -def parseJson(json) { - return new JsonSlurper().parseText(json) -} - -def getDecompressionCommand(filename) { - if (filename.startsWith(".")) { - f = new File(params.home, filename).toString() - } else { - f = filename - } - if (f.endsWith(".gz")) { - return "zcat ${f}" - } else if (f.endsWith(".xz")) { - return "xzcat ${f}" - } else { - return "cat ${f}" - } -} - -def getIngestCommand(script) { - return new File(params.home, script) -} - -def buildIngestArgs(ingestArgs) { - res = "" - ingestArgs.each { arg -> res += "${arg.name} ${arg.value} " } - return res -} - -def buildAddEquivGroupArgs(equivGroups) { - res = "" - equivGroups.each { arg -> res += "--add-group ${arg.iterator().join(",")} " } - return res -} - -def buildMergeArgs(assigned) { - res = "" - assigned.each { a -> - res += "${a[0]}:${a[1]} " - } - return res -} - -def basename(filename) { - return new File(filename).name -} diff --git a/nextflow/01_create_subgraph.nf b/nextflow/01_create_subgraph.nf new file mode 100644 index 0000000..ac3ceb2 --- /dev/null +++ b/nextflow/01_create_subgraph.nf @@ -0,0 +1,258 @@ + +nextflow.enable.dsl=2 + +import groovy.json.JsonSlurper +jsonSlurper = new JsonSlurper() + +params.tmp = "$GREBI_TMP" +params.home = "$GREBI_HOME" +params.subgraph = "$GREBI_SUBGRAPH" + +workflow { + + config = (new JsonSlurper().parse(new File(params.home, 'configs/subgraph_configs/' + params.subgraph + '.json'))) + + files_listing = prepare() | splitText | map { row -> parseJson(row) } + + ingest(files_listing, Channel.value(config.equivalence_props)) + groups_txt = build_equiv_groups(ingest.out.equivalences.collect(), Channel.value(config.additional_equivalence_groups)) + assigned = assign_ids(ingest.out.nodes, groups_txt).collect(flat: false) + + merged = merge_ingests( + assigned, + Channel.value(config.exclude_props), + Channel.value(config.bytes_per_merged_file)) + + indexed = index(merged.collect()) + materialise(merged.flatten(), indexed, Channel.value(config.exclude_edges + config.equivalence_props)) +} + +process prepare { + cache "lenient" + memory "4 GB" + time "1h" + + output: + path "datasource_files.jsonl" + + script: + """ + PYTHONUNBUFFERED=true python3 ${params.home}/scripts/dataload_00_prepare.py + """ +} + +process ingest { + cache "lenient" + memory { 4.GB + 32.GB * (task.attempt-1) } + time { 1.hour + 8.hour * (task.attempt-1) } + errorStrategy { task.exitStatus in 137..140 ? 
'retry' : 'terminate' }
+    maxRetries 5
+
+    input:
+    val(file_listing)
+    val(equivalence_props)
+
+    output:
+    tuple val(file_listing.datasource.name), path("nodes_${task.index}.jsonl.gz"), emit: nodes
+    path("equivalences_${task.index}.tsv"), emit: equivalences
+
+    script:
+    """
+    #!/usr/bin/env bash
+    set -Eeuo pipefail
+    ${getDecompressionCommand(file_listing.filename)} \
+        | ${getIngestCommand(file_listing.ingest.ingest_script)} \
+            --datasource-name ${file_listing.datasource.name} \
+            --filename "${basename(file_listing.filename)}" \
+            ${buildIngestArgs(file_listing.ingest.ingest_args)} \
+        | ${params.home}/target/release/grebi_normalise_prefixes ${params.home}/prefix_maps/prefix_map_normalise.json \
+        | tee >(${params.home}/target/release/grebi_extract_equivalences \
+            --equivalence-properties ${equivalence_props.iterator().join(",")} \
+            > equivalences_${task.index}.tsv) \
+        | pigz --fast > nodes_${task.index}.jsonl.gz
+    """
+}
+
+process build_equiv_groups {
+    cache "lenient"
+    memory '64 GB'
+    time '23h'
+
+    input:
+    path(equivalences_tsv)
+    val(additional_equivalence_groups)
+
+    output:
+    path "groups.txt"
+
+    script:
+    """
+    #!/usr/bin/env bash
+    set -Eeuo pipefail
+    cat ${equivalences_tsv} \
+        | ${params.home}/target/release/grebi_equivalences2groups \
+            ${buildAddEquivGroupArgs(additional_equivalence_groups)} \
+        > groups.txt
+    """
+}
+
+process assign_ids {
+    cache "lenient"
+    memory { 32.GB + 32.GB * (task.attempt-1) }
+    time { 1.hour + 8.hour * (task.attempt-1) }
+    errorStrategy { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
+    maxRetries 5
+
+    input:
+    tuple(val(datasource_name), path(nodes_jsonl))
+    path groups_txt
+
+    output:
+    tuple(val(datasource_name), path("nodes_with_ids.sorted.jsonl.gz"))
+
+    script:
+    """
+    #!/usr/bin/env bash
+    set -Eeuo pipefail
+    zcat ${nodes_jsonl} \
+        | ${params.home}/target/release/grebi_assign_ids \
+            --groups-txt ${groups_txt} \
+        > nodes_with_ids.jsonl
+    LC_ALL=C sort -o nodes_with_ids.sorted.jsonl nodes_with_ids.jsonl
+    rm -f nodes_with_ids.jsonl
+    pigz --fast nodes_with_ids.sorted.jsonl
+    """
+}
+
+process merge_ingests {
+    cache "lenient"
+    memory "16 GB"
+    time "8h"
+
+    input:
+    val(assigned)
+    val(exclude_props)
+    val(bytes_per_merged_file)
+
+    output:
+    path('merged.jsonl.*')
+
+    script:
+    """
+    #!/usr/bin/env bash
+    set -Eeuo pipefail
+    ${params.home}/target/release/grebi_merge \
+        --exclude-props ${exclude_props.iterator().join(",")} \
+        ${buildMergeArgs(assigned)} \
+        | split -a 6 -d -C ${bytes_per_merged_file} - merged.jsonl.
+    """
+}
+
+process index {
+    cache "lenient"
+    memory "64 GB"
+    time "8h"
+
+    publishDir "${params.tmp}/${params.subgraph}", overwrite: true
+
+    input:
+    val(merged_filenames)
+
+    output:
+    tuple(path("metadata.jsonl"), path("summary.json"), path("names.txt"))
+
+    script:
+    """
+    #!/usr/bin/env bash
+    set -Eeuo pipefail
+    cat ${merged_filenames.iterator().join(" ")} \
+        | ${params.home}/target/release/grebi_index \
+            --subgraph-name ${params.subgraph} \
+            --out-metadata-jsonl-path metadata.jsonl \
+            --out-summary-json-path summary.json \
+            --out-names-txt names.txt
+    """
+}
+
+process materialise {
+    cache "lenient"
+    //memory { 80.GB + 30.GB * (task.attempt-1) }
+    //memory "32 GB"
+    memory "96 GB"
+    time "8h"
+    //time { 1.hour + 8.hour * (task.attempt-1) }
+    //errorStrategy { task.exitStatus in 137..140 ?
'retry' : 'terminate' } + //maxRetries 5 + + publishDir "${params.tmp}/${params.subgraph}/nodes", overwrite: true, pattern: 'nodes_*' + publishDir "${params.tmp}/${params.subgraph}/edges", overwrite: true, pattern: 'edges_*' + + input: + path(merged_filename) + tuple(path(metadata_jsonl), path(summary_json), path(names_txt)) + val(exclude) + + output: + path("nodes_${task.index}.jsonl") + path("edges_${task.index}.jsonl") + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + cat ${merged_filename} \ + | ${params.home}/target/release/grebi_materialise \ + --in-metadata-jsonl ${metadata_jsonl} \ + --out-edges-jsonl edges_${task.index}.jsonl \ + --exclude ${exclude.iterator().join(",")} \ + > nodes_${task.index}.jsonl + """ +} + +def parseJson(json) { + return new JsonSlurper().parseText(json) +} + +def getDecompressionCommand(filename) { + def f = "" + if (filename.startsWith(".")) { + f = new File(params.home, filename).toString() + } else { + f = filename + } + if (f.endsWith(".gz")) { + return "zcat ${f}" + } else if (f.endsWith(".xz")) { + return "xzcat ${f}" + } else { + return "cat ${f}" + } +} + +def getIngestCommand(script) { + return new File(params.home, script) +} + +def buildIngestArgs(ingestArgs) { + def res = "" + ingestArgs.each { arg -> res += "${arg.name} ${arg.value} " } + return res +} + +def buildAddEquivGroupArgs(equivGroups) { + def res = "" + equivGroups.each { arg -> res += "--add-group ${arg.iterator().join(",")} " } + return res +} + +def buildMergeArgs(assigned) { + def res = "" + assigned.each { a -> + res += "${a[0]}:${a[1]} " + } + return res +} + +def basename(filename) { + return new File(filename).name +} diff --git a/nextflow/02_create_dbs.nf b/nextflow/02_create_dbs.nf new file mode 100644 index 0000000..09753da --- /dev/null +++ b/nextflow/02_create_dbs.nf @@ -0,0 +1,449 @@ + +nextflow.enable.dsl=2 + +import groovy.json.JsonSlurper +jsonSlurper = new JsonSlurper() + +params.tmp = "$GREBI_TMP" +params.home = "$GREBI_HOME" +params.config = "$GREBI_CONFIG" + +workflow { + + rocks_db = create_rocks(Channel.fromPath("${params.tmp}/*/*.jsonl").collect()) + + subgraph_dirs = Channel.fromPath("${params.tmp}/*", type: 'dir') + + nodes_per_subgraph = subgraph_dirs + | flatMap { d -> (d + "/nodes/").listFiles().collect { f -> tuple(d.name, f) } } + + edges_per_subgraph = subgraph_dirs + | flatMap { d -> (d + "/edges/").listFiles().collect { f -> tuple(d.name, f) } } + + neo_input_dir = prepare_neo( + subgraph_dirs.map { d -> "${d}/summary.json" }.collect(), + nodes_per_subgraph.map { t -> t[1] }, + edges_per_subgraph.map { t -> t[1] } + ) + neo_db = create_neo(prepare_neo.out.nodes.collect() + prepare_neo.out.edges.collect()) + + solr_inputs = prepare_solr(nodes_per_subgraph, edges_per_subgraph) + | map { t -> tuple(t[0], t) } + | groupTuple + + solr_nodes_per_subgraph = solr_inputs | map { t -> tuple(t[0], t[1].collect { u -> u[1] }) } + solr_edges_per_subgraph = solr_inputs | map { t -> tuple(t[0], t[1].collect { u -> u[2] }) } + + solr_nodes_cores = create_solr_nodes_core(solr_nodes_per_subgraph) + solr_edges_cores = create_solr_edges_core(solr_edges_per_subgraph) + + solr_autocomplete_cores = create_solr_autocomplete_core( + subgraph_dirs.map { d -> tuple(d.name, "${d}/names.txt") } + ) + + solr_tgz = package_solr( + solr_nodes_cores.collect(), + solr_edges_cores.collect(), + solr_autocomplete_cores.collect()) + + neo_tgz = package_neo(neo_db) + rocks_tgz = package_rocks(rocks_db) + + date = get_date() + copy_solr_to_ftp(solr_tgz, date) + 
copy_neo_to_ftp(neo_tgz, date) + copy_rocks_to_ftp(rocks_tgz, date) + + if(params.config == "ebi_full_monarch") { + copy_solr_to_staging(solr_nodes_cores.collect(), solr_edges_cores.collect(), solr_autocomplete_cores.collect()) + copy_neo_to_staging(neo_db) + copy_rocksdb_to_staging(rocks_db) + } +} + +process create_rocks { + cache "lenient" + memory "800 GB" + time "23h" + cpus "8" + errorStrategy 'retry' + maxRetries 10 + + input: + val(materialised) + + output: + path("rocksdb") + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + cat ${materialised.iterator().join(" ")} \ + | ${params.home}/target/release/grebi_make_rocks \ + --rocksdb-path /dev/shm/rocksdb && \ + mv /dev/shm/rocksdb . + """ +} + +process prepare_neo { + cache "lenient" + memory "4 GB" + time "1h" + + input: + val(summary_jsons) + path(nodes_jsonl) + path(edges_jsonl) + + output: + path("neo_nodes_${task.index}.csv"), emit: nodes + path("neo_edges_${task.index}.csv"), emit: edges + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + ${params.home}/target/release/grebi_make_csv \ + --in-summary-jsons ${summary_jsons.join(",")} \ + --in-nodes-jsonl ${nodes_jsonl} \ + --in-edges-jsonl ${edges_jsonl} \ + --out-nodes-csv-path neo_nodes_${task.index}.csv \ + --out-edges-csv-path neo_edges_${task.index}.csv + """ +} + +process prepare_solr { + cache "lenient" + memory "4 GB" + time "1h" + + input: + tuple val(subgraph_name), val(nodes_jsonl) + tuple val(subgraph_name_), val(edges_jsonl) + + output: + tuple val(subgraph_name), path("solr_nodes_${task.index}.jsonl"), path("solr_edges_${task.index}.jsonl") + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + ${params.home}/target/release/grebi_make_solr \ + --in-nodes-jsonl ${nodes_jsonl} \ + --in-edges-jsonl ${edges_jsonl} \ + --out-nodes-jsonl-path solr_nodes_${task.index}.jsonl \ + --out-edges-jsonl-path solr_edges_${task.index}.jsonl + """ +} + +process create_neo { + cache "lenient" + memory "50 GB" + time "8h" + cpus "32" + + input: + path(neo_inputs) + + output: + path("neo4j") + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + PYTHONUNBUFFERED=true python3 ${params.home}/07_create_db/neo4j/neo4j_import.slurm.py \ + --in-csv-path . \ + --out-db-path neo4j + """ +} + +process create_solr_nodes_core { + cache "lenient" + memory "150 GB" + time "23h" + cpus "32" + + input: + tuple val(subgraph_name), path(solr_inputs) + + output: + path("solr/data/grebi_nodes_${subgraph_name}") + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + python3 ${params.home}/06_prepare_db_import/make_solr_config.py \ + --subgraph-name ${subgraph_name} \ + --in-summary-json ${params.tmp}/${subgraph_name}/summary.json \ + --in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \ + --out-config-dir solr_config + python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \ + --solr-config solr_config --core grebi_nodes_${subgraph_name} --in-data . 
--out-path solr --port 8985 + """ +} + +process create_solr_edges_core { + cache "lenient" + memory "150 GB" + time "23h" + cpus "32" + + input: + tuple val(subgraph_name), path(solr_inputs) + + output: + path("solr/data/grebi_edges_${subgraph_name}") + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + python3 ${params.home}/06_prepare_db_import/make_solr_config.py \ + --subgraph-name ${subgraph_name} \ + --in-summary-json ${params.tmp}/${subgraph_name}/summary.json \ + --in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \ + --out-config-dir solr_config + python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \ + --solr-config solr_config --core grebi_edges_${subgraph_name} --in-data . --out-path solr --port 8986 + """ +} + +process create_solr_autocomplete_core { + cache "lenient" + memory "150 GB" + time "4h" + cpus "4" + + input: + tuple val(subgraph_name), path(names_txt) + + output: + path("solr/data/grebi_autocomplete_${subgraph_name}") + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + python3 ${params.home}/06_prepare_db_import/make_solr_autocomplete_config.py \ + --subgraph-name ${subgraph_name} \ + --in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \ + --out-config-dir solr_config + python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \ + --solr-config solr_config --core grebi_autocomplete_${subgraph_name} --in-data . --in-names-txt ${names_txt} --out-path solr --port 8987 + """ +} + +process package_neo { + cache "lenient" + memory "32 GB" + time "8h" + cpus "32" + + publishDir "${params.home}/release/${params.config}", overwrite: true + + input: + path(neo4j) + + output: + path("neo4j.tgz") + + script: + """ + tar -chf neo4j.tgz --use-compress-program="pigz --fast" neo4j + """ +} + +process package_rocks { + cache "lenient" + memory "32 GB" + time "8h" + cpus "32" + + publishDir "${params.home}/release/${params.config}", overwrite: true + + input: + path(rocks_db) + + output: + path("rocksdb.tgz") + + script: + """ + tar -chf rocksdb.tgz --use-compress-program="pigz --fast" ${rocks_db} + """ +} + +process package_solr { + cache "lenient" + memory "32 GB" + time "8h" + cpus "32" + + publishDir "${params.home}/release/${params.config}", overwrite: true + + input: + path(solr_nodes_cores) + path(solr_edges_cores) + path(solr_autocomplete_cores) + + output: + path("solr.tgz") + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + cp -f ${params.home}/06_prepare_db_import/solr_config_template/*.xml . + cp -f ${params.home}/06_prepare_db_import/solr_config_template/*.cfg . 
+ tar -chf solr.tgz --transform 's,^,solr/,' --use-compress-program="pigz --fast" \ + *.xml *.cfg ${solr_nodes_cores} ${solr_edges_cores} ${solr_autocomplete_cores} + """ +} + +process get_date { + + cache "lenient" + memory "1 GB" + time "1h" + + output: + stdout + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + date +%Y_%m_%d__%H_%M + """ +} + +process copy_neo_to_ftp { + + cache "lenient" + memory "4 GB" + time "8h" + queue "datamover" + + input: + path("neo4j.tgz") + val(date) + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + mkdir -p /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()} + cp -f neo4j.tgz /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()}/ + """ +} + +process copy_solr_to_ftp { + + cache "lenient" + memory "4 GB" + time "8h" + queue "datamover" + + input: + path("solr.tgz") + val(date) + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + mkdir -p /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()} + cp -f solr.tgz /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()}/ + """ +} + +process copy_rocks_to_ftp { + + cache "lenient" + memory "4 GB" + time "8h" + queue "datamover" + + input: + path("rocksdb.tgz") + val(date) + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + mkdir -p /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()} + cp -f rocksdb.tgz /nfs/ftp/public/databases/spot/kg/${params.config}/${date.trim()}/ + """ +} + +process copy_neo_to_staging { + cache "lenient" + memory "4 GB" + time "8h" + queue "datamover" + + input: + path("neo4j") + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + rm -rf /nfs/public/rw/ontoapps/grebi/staging/neo4j + cp -LR neo4j /nfs/public/rw/ontoapps/grebi/staging/neo4j + """ +} + +process copy_solr_to_staging { + cache "lenient" + memory "4 GB" + time "8h" + queue "datamover" + + input: + path(solr_nodes_cores) + path(solr_edges_cores) + path(solr_autocomplete_cores) + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + rm -rf /nfs/public/rw/ontoapps/grebi/staging/solr && mkdir /nfs/public/rw/ontoapps/grebi/staging/solr + cp -f ${params.home}/06_prepare_db_import/solr_config_template/*.xml . + cp -f ${params.home}/06_prepare_db_import/solr_config_template/*.cfg . 
+ cp -LR * /nfs/public/rw/ontoapps/grebi/staging/solr/ + """ +} + +process copy_rocksdb_to_staging { + cache "lenient" + memory "4 GB" + time "8h" + queue "datamover" + + publishDir "/nfs/public/rw/ontoapps/grebi/staging/rocksdb", mode: 'copy', overwrite: true + + input: + path("rocksdb") + + script: + """ + #!/usr/bin/env bash + set -Eeuo pipefail + rm -rf /nfs/public/rw/ontoapps/grebi/staging/rocksdb + cp -LR rocksdb /nfs/public/rw/ontoapps/grebi/staging/rocksdb + """ +} + +def parseJson(json) { + return new JsonSlurper().parseText(json) +} + +def basename(filename) { + return new File(filename).name +} diff --git a/codon_nextflow.config b/nextflow/codon_nextflow.config similarity index 100% rename from codon_nextflow.config rename to nextflow/codon_nextflow.config diff --git a/scripts/copy_to_codon.sh b/scripts/copy_to_codon.sh index 5112e17..d111502 100755 --- a/scripts/copy_to_codon.sh +++ b/scripts/copy_to_codon.sh @@ -1,4 +1,4 @@ #!/bin/bash -rsync -arv --exclude 'target' --exclude '.git' --exclude 'grebi_ui' --exclude 'tmp' --exclude 'work' ./ ebi-codon-slurm-spotbot:/nfs/production/parkinso/spot/grebi/ +rsync -arv --exclude 'target' --exclude '.git' --exclude 'grebi_ui' --exclude 'tmp' --exclude 'work' ./ ebi-codon-slurm-spotbot:/nfs/production/parkinso/spot/grebi2/ diff --git a/scripts/dataload_00_prepare.py b/scripts/dataload_00_prepare.py index 7683b21..226e37a 100644 --- a/scripts/dataload_00_prepare.py +++ b/scripts/dataload_00_prepare.py @@ -10,9 +10,9 @@ def main(): - print("home is " + os.environ['GREBI_HOME']) + print("subgraph is " + os.environ['GREBI_SUBGRAPH']) - config_filename = os.path.abspath(os.path.join(os.environ['GREBI_HOME'], 'configs/pipeline_configs/', os.environ['GREBI_CONFIG'] + '.json')) + config_filename = os.path.abspath(os.path.join(os.environ['GREBI_HOME'], 'configs/subgraph_configs/', os.environ['GREBI_SUBGRAPH'] + '.json')) with open(config_filename, 'r') as f: config = json.load(f) diff --git a/scripts/dataload_codon.py b/scripts/dataload_codon.py new file mode 100644 index 0000000..518527d --- /dev/null +++ b/scripts/dataload_codon.py @@ -0,0 +1,17 @@ + +import json +import sys +import os + +GREBI_HOME = os.environ['GREBI_HOME'] +GREBI_CONFIG = os.environ['GREBI_CONFIG'] + +config = json.load(open(f'{GREBI_HOME}/configs/pipeline_configs/{GREBI_CONFIG}.json')) + +for subgraph in config['subgraphs']: + print(f"===== LOADING SUBGRAPH: {subgraph} =====") + os.environ['GREBI_SUBGRAPH'] = subgraph + os.system(f'nextflow {GREBI_HOME}/nextflow/01_create_subgraph.nf -c {GREBI_HOME}/nextflow/codon_nextflow.config') + print(f"===== FINISHED LOADING SUBGRAPH: {subgraph} =====") + +os.system(f'nextflow {GREBI_HOME}/nextflow/02_create_dbs.nf -c {GREBI_HOME}/nextflow/codon_nextflow.config') diff --git a/scripts/dataload_local.py b/scripts/dataload_local.py new file mode 100644 index 0000000..1f0351b --- /dev/null +++ b/scripts/dataload_local.py @@ -0,0 +1,17 @@ + +import json +import sys +import os + +GREBI_HOME = os.environ['GREBI_HOME'] +GREBI_CONFIG = os.environ['GREBI_CONFIG'] + +config = json.load(open(f'{GREBI_HOME}/configs/pipeline_configs/{GREBI_CONFIG}.json')) + +for subgraph in config['subgraphs']: + print(f"===== LOADING SUBGRAPH: {subgraph} =====") + os.environ['GREBI_SUBGRAPH'] = subgraph + os.system(f'nextflow {GREBI_HOME}/nextflow/01_create_subgraph.nf') + print(f"===== FINISHED LOADING SUBGRAPH: {subgraph} =====") + +os.system(f'nextflow {GREBI_HOME}/nextflow/02_create_dbs.nf')
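
Note: the new `scripts/dataload_codon.py` and `scripts/dataload_local.py` drive the split pipeline, running `nextflow/01_create_subgraph.nf` once per subgraph and then `nextflow/02_create_dbs.nf` once. A sketch of kicking off a local run, based on those scripts and the environment variables the Nextflow files read; the paths here are placeholders:

```python
# Example of driving the new two-stage load locally.
import os, subprocess

os.environ["GREBI_HOME"] = "/path/to/grebi"   # repo checkout (placeholder)
os.environ["GREBI_TMP"] = "/path/to/tmp"      # shared tmp dir, read as params.tmp
os.environ["GREBI_CONFIG"] = "hra_only"       # a pipeline config listing subgraphs

subprocess.run(
    ["python3", os.path.join(os.environ["GREBI_HOME"], "scripts/dataload_local.py")],
    check=True,
)
```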