From 29f28fe203176ddb00f6b8bd64ec1dc5cd874fc9 Mon Sep 17 00:00:00 2001 From: James McLaughlin Date: Fri, 22 Nov 2024 15:41:29 +0000 Subject: [PATCH] separate subgraphs into different neo4js --- dataload/nextflow/02_create_dbs.nf | 231 ------------------ ...01_create_subgraph.nf => load_subgraph.nf} | 0 dataload/scripts/dataload.py | 6 +- .../main/java/uk/ac/ebi/grebi/GrebiApi.java | 12 +- .../java/uk/ac/ebi/grebi/db/Neo4jClient.java | 20 +- .../uk/ac/ebi/grebi/repo/GrebiNeoRepo.java | 57 +++-- 6 files changed, 58 insertions(+), 268 deletions(-) delete mode 100644 dataload/nextflow/02_create_dbs.nf rename dataload/nextflow/{01_create_subgraph.nf => load_subgraph.nf} (100%) diff --git a/dataload/nextflow/02_create_dbs.nf b/dataload/nextflow/02_create_dbs.nf deleted file mode 100644 index 1673ed4..0000000 --- a/dataload/nextflow/02_create_dbs.nf +++ /dev/null @@ -1,231 +0,0 @@ - -nextflow.enable.dsl=2 - -import groovy.json.JsonSlurper -jsonSlurper = new JsonSlurper() - -params.tmp = "$GREBI_TMP" -params.home = "$GREBI_DATALOAD_HOME" -params.config = "$GREBI_CONFIG" -params.timestamp = "$GREBI_TIMESTAMP" -params.is_ebi = "$GREBI_IS_EBI" - -workflow { - - subgraph_dirs = Channel.fromPath("${params.tmp}/*", type: 'dir') - - neo_nodes_files = Channel.fromPath("${params.tmp}/${params.config}/*/neo4j_csv/neo_nodes_*.csv").collect() - neo_edges_files = Channel.fromPath("${params.tmp}/${params.config}/*/neo4j_csv/neo_edges_*.csv").collect() - id_txts = Channel.fromPath("${params.tmp}/${params.config}/*/ids_*.txt").collect() - ids_csv = create_combined_neo_ids_csv(id_txts).collect() - - neo_db = create_neo(neo_nodes_files.collect() + neo_edges_files.collect() + ids_csv) - - solr_tgz = package_solr( Channel.fromPath("${params.tmp}/${params.config}/*/solr_cores/*", type: 'dir').collect()) - rocks_tgz = package_rocks( Channel.fromPath("${params.tmp}/${params.config}/*/*_rocksdb", type: 'dir').collect()) - - neo_tgz = package_neo(neo_db) - - if(params.is_ebi == "true") { - 
copy_solr_to_ftp(solr_tgz) - copy_neo_to_ftp(neo_tgz) - copy_rocks_to_ftp(rocks_tgz) - - if(params.config == "ebi") { - copy_neo_to_staging(neo_db) - } - } -} - -process create_combined_neo_ids_csv { - cache "lenient" - memory "4 GB" - time "8h" - cpus "8" - - input: - path(ids_txts) - - output: - path("neo_nodes_ids_combined.csv") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - cat ${ids_txts} > combined_ids.txt - LC_ALL=C sort -u combined_ids.txt -o combined_ids_uniq.txt - cat combined_ids_uniq.txt | ${params.home}/target/release/grebi_make_neo_ids_csv > neo_nodes_ids_combined.csv - """ -} - -process create_neo { - cache "lenient" - memory "4 GB" - time "8h" - - publishDir "${params.tmp}/${params.config}", overwrite: true - - input: - path(neo_inputs) - - output: - path("combined_neo4j") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - PYTHONUNBUFFERED=true python3 ${params.home}/07_create_db/neo4j/neo4j_import.slurm.py \ - --in-csv-path . \ - --out-db-path combined_neo4j - """ -} - -process package_neo { - cache "lenient" - memory "4 GB" - time "8h" - cpus "8" - - publishDir "${params.tmp}/${params.config}", overwrite: true - - input: - path("combined_neo4j") - - output: - path("combined_neo4j.tgz") - - script: - """ - tar -chf combined_neo4j.tgz --use-compress-program="pigz --fast" combined_neo4j - """ -} - -process package_rocks { - cache "lenient" - memory "4 GB" - time "8h" - cpus "8" - - publishDir "${params.tmp}/${params.config}", overwrite: true - - input: - path(rocks_dbs) - - output: - path("combined_rocksdb.tgz") - - script: - """ - tar -chf combined_rocksdb.tgz --use-compress-program="pigz --fast" ${rocks_dbs} - """ -} - -process package_solr { - cache "lenient" - memory "4 GB" - time "8h" - cpus "8" - - publishDir "${params.tmp}/${params.config}", overwrite: true - - input: - path(cores) - - output: - path("combined_solr.tgz") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - cp -f 
${params.home}/06_prepare_db_import/solr_config_template/*.xml . - cp -f ${params.home}/06_prepare_db_import/solr_config_template/*.cfg . - tar -chf combined_solr.tgz --transform 's,^,solr/,' --use-compress-program="pigz --fast" \ - *.xml *.cfg ${cores} - """ -} - -process copy_neo_to_ftp { - - cache "lenient" - memory "4 GB" - time "8h" - queue "datamover" - - input: - path("combined_neo4j.tgz") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - mkdir -p /nfs/ftp/public/databases/spot/kg/${params.config}/${params.timestamp.trim()} - cp -f combined_neo4j.tgz /nfs/ftp/public/databases/spot/kg/${params.config}/${params.timestamp.trim()}/ - """ -} - -process copy_solr_to_ftp { - - cache "lenient" - memory "4 GB" - time "8h" - queue "datamover" - - input: - path("combined_solr.tgz") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - mkdir -p /nfs/ftp/public/databases/spot/kg/${params.config}/${params.timestamp.trim()} - cp -f combined_solr.tgz /nfs/ftp/public/databases/spot/kg/${params.config}/${params.timestamp.trim()}/ - """ -} - -process copy_rocks_to_ftp { - - cache "lenient" - memory "4 GB" - time "8h" - queue "datamover" - - input: - path("combined_rocksdb.tgz") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - mkdir -p /nfs/ftp/public/databases/spot/kg/${params.config}/${params.timestamp.trim()} - cp -f combined_rocksdb.tgz /nfs/ftp/public/databases/spot/kg/${params.config}/${params.timestamp.trim()}/ - """ -} - -process copy_neo_to_staging { - cache "lenient" - memory "4 GB" - time "8h" - queue "datamover" - - input: - path("neo4j") - - script: - """ - #!/usr/bin/env bash - set -Eeuo pipefail - rm -rf /nfs/public/rw/ontoapps/grebi/staging/neo4j - cp -LR neo4j /nfs/public/rw/ontoapps/grebi/staging/neo4j - """ -} - -def parseJson(json) { - return new JsonSlurper().parseText(json) -} - -def basename(filename) { - return new File(filename).name -} diff --git a/dataload/nextflow/01_create_subgraph.nf 
b/dataload/nextflow/load_subgraph.nf similarity index 100% rename from dataload/nextflow/01_create_subgraph.nf rename to dataload/nextflow/load_subgraph.nf diff --git a/dataload/scripts/dataload.py b/dataload/scripts/dataload.py index 36457dc..726e033 100644 --- a/dataload/scripts/dataload.py +++ b/dataload/scripts/dataload.py @@ -13,12 +13,8 @@ for subgraph in config['subgraphs']: print(f"===== LOADING SUBGRAPH: {subgraph} =====") os.environ['GREBI_SUBGRAPH'] = subgraph - res = os.system(f'NXF_WORK=work_{subgraph} nextflow {GREBI_DATALOAD_HOME}/nextflow/01_create_subgraph.nf -c {GREBI_NEXTFLOW_CONFIG} -resume') + res = os.system(f'NXF_WORK=work_{subgraph} nextflow {GREBI_DATALOAD_HOME}/nextflow/load_subgraph.nf -c {GREBI_NEXTFLOW_CONFIG} -resume') if res != 0: exit(res) print(f"===== FINISHED LOADING SUBGRAPH: {subgraph} =====") -res = os.system(f'NXF_WORK=work_combined nextflow {GREBI_DATALOAD_HOME}/nextflow/02_create_dbs.nf -c {GREBI_NEXTFLOW_CONFIG} -resume') -if res != 0: - exit(res) - diff --git a/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/GrebiApi.java b/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/GrebiApi.java index f937631..0f22ade 100644 --- a/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/GrebiApi.java +++ b/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/GrebiApi.java @@ -34,17 +34,24 @@ public static void main(String[] args) throws ParseException, org.apache.commons Set rocksDbSubgraphs = null; Set solrSubgraphs = null; Set summarySubgraphs = null; + Set neoSubgraphs = null; while(true) { try { neo = new GrebiNeoRepo(); solr = new GrebiSolrRepo(); summary = new GrebiSummaryRepo(); rocksDbSubgraphs = (new ResolverClient()).getSubgraphs(); solrSubgraphs = solr.getSubgraphs(); summarySubgraphs = summary.getSubgraphs(); - if(new HashSet<>(List.of(rocksDbSubgraphs, solrSubgraphs, summarySubgraphs)).size() != 1) { - throw new RuntimeException("RocksDB/Solr/the summary jsons do not seem to contain the same subgraphs. 
Found: " + String.join(",", rocksDbSubgraphs) + " for RocksDB (from resolver service) and " + String.join(",", solrSubgraphs) + " for Solr (from list of solr cores) and " + String.join(",", summarySubgraphs) + " for the summary jsons (from summary server)"); + neoSubgraphs = neo.getSubgraphs(); + if(new HashSet<>(List.of(rocksDbSubgraphs, solrSubgraphs, summarySubgraphs, neoSubgraphs)).size() != 1) { + throw new RuntimeException("RocksDB/Solr/the summary jsons/Neo4j do not seem to contain the same subgraphs. Found: " + + String.join(",", rocksDbSubgraphs) + " for RocksDB (from resolver service) and " + + String.join(",", solrSubgraphs) + " for Solr (from list of solr cores) and " + + String.join(",", summarySubgraphs) + " for the summary jsons (from summary server) and " + + String.join(",", neoSubgraphs) + " for Neo4j (from all neo4j hosts)" + ); } break; } catch(Exception e) { diff --git a/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/db/Neo4jClient.java b/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/db/Neo4jClient.java index e55d024..ead3c12 100644 --- a/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/db/Neo4jClient.java +++ b/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/db/Neo4jClient.java @@ -18,26 +18,15 @@ public class Neo4jClient { - static final String NEO4J_HOST = System.getenv("GREBI_NEO4J_HOST"); - - public static String getNeo4jHost() { - if(NEO4J_HOST != null) - return NEO4J_HOST; - return "bolt://localhost:7687/"; - } - + private final Driver driver; private Gson gson = new Gson(); - private Driver driver; + public Neo4jClient(String host) { + this.driver = GraphDatabase.driver(host); + } public Driver getDriver() { - - if(driver == null) { - driver = GraphDatabase.driver(getNeo4jHost()); - } - return driver; - } public Session getSession() { @@ -52,7 +41,6 @@ public List> rawQuery(String query) { Session session = getSession(); Result result = session.run(query); - List> list = result.stream().map(r -> r.asMap()).collect(Collectors.toList()); 
session.close(); diff --git a/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/repo/GrebiNeoRepo.java b/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/repo/GrebiNeoRepo.java index 841e010..e5bedc4 100644 --- a/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/repo/GrebiNeoRepo.java +++ b/webapp/grebi_api/src/main/java/uk/ac/ebi/grebi/repo/GrebiNeoRepo.java @@ -12,31 +12,60 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.StreamSupport; public class GrebiNeoRepo { - Neo4jClient neo4jClient = new Neo4jClient(); + static final String NEO4J_HOSTS = + System.getenv("GREBI_NEO4J_HOSTS"); + + public static String[] getNeo4jHosts() { + if(NEO4J_HOSTS != null) + return NEO4J_HOSTS.split(";"); + return List.of("bolt://localhost:7687/").toArray(new String[0]); + } + + Map subgraphToClient = new HashMap<>(); + ResolverClient resolver = new ResolverClient(); Gson gson = new Gson(); - public GrebiNeoRepo() throws IOException {} + public GrebiNeoRepo() throws IOException { + + for(String host : getNeo4jHosts()) { + Neo4jClient client = new Neo4jClient(host); + + String subgraph = (String) + client.rawQuery("MATCH (n:GraphNode) RETURN n.`grebi:subgraph` AS subgraph LIMIT 1") + .get(0).get("subgraph"); + + subgraphToClient.put(subgraph, client); + } + } + + private Neo4jClient getClient(String subgraph) { + var client = subgraphToClient.get(subgraph); + if(client != null) + return client; + throw new IllegalArgumentException("subgraph " + subgraph + " not found"); + } + + public Set getSubgraphs() { + return subgraphToClient.keySet(); + } final String STATS_QUERY = new String(GrebiApi.class.getResourceAsStream("/cypher/stats.cypher").readAllBytes(), StandardCharsets.UTF_8); final String INCOMING_EDGES_QUERY = new 
String(GrebiApi.class.getResourceAsStream("/cypher/incoming_edges.cypher").readAllBytes(), StandardCharsets.UTF_8); - public Map getStats() { - EagerResult props_res = neo4jClient.getDriver().executableQuery(STATS_QUERY).withConfig(QueryConfig.builder().withDatabase("neo4j").build()).execute(); - return props_res.records().get(0).values().get(0).asMap(); - } - public List cypher(String query, String resVar) { - EagerResult res = neo4jClient.getDriver().executableQuery(query).withConfig(QueryConfig.builder().withDatabase("neo4j").build()).execute(); - return List.of(); + public Map> getStats() { + Map> subgraphToStats = new HashMap<>(); + for(var subgraph : subgraphToClient.keySet()) { + EagerResult props_res = getClient(subgraph).getDriver().executableQuery(STATS_QUERY).withConfig(QueryConfig.builder().withDatabase("neo4j").build()).execute(); + subgraphToStats.put(subgraph, props_res.records().get(0).values().get(0).asMap()); + } + return subgraphToStats; } public class EdgeAndNode { @@ -48,7 +77,7 @@ public EdgeAndNode(Map edge, Map node) { } public List getIncomingEdges(String subgraph, String nodeId, Pageable pageable) { - EagerResult res = neo4jClient.getDriver().executableQuery(INCOMING_EDGES_QUERY) + EagerResult res = getClient(subgraph).getDriver().executableQuery(INCOMING_EDGES_QUERY) .withParameters(Map.of( "nodeId", subgraph + ":" + nodeId, "offset", pageable.getOffset(),