diff --git a/00_fetch_data/chembl/export.py b/00_fetch_data/chembl/export.py
deleted file mode 100644
index 24008b3..0000000
--- a/00_fetch_data/chembl/export.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import sqlite3
-import json
-import sys
-
-def export_tables_to_jsonl(sqlite_file):
-    conn = sqlite3.connect(sqlite_file)
-    cursor = conn.cursor()
-
-    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
-    tables = cursor.fetchall()
-
-    for table in tables:
-        table_name = table[0]
-
-        cursor.execute(f"PRAGMA table_info({table_name});")
-        columns_info = cursor.fetchall()
-        primary_key = None
-        for column_info in columns_info:
-            if column_info[5] == 1: # PK column
-                primary_key = column_info[1]
-                break
-
-        cursor.execute(f"SELECT * FROM {table_name};")
-        rows = cursor.fetchall()
-        column_names = [description[0] for description in cursor.description]
-
-        for row in rows:
-            row_dict = dict(zip(column_names, row))
-            row_dict = {f"chembl:{key}": value for key, value in row_dict.items()}
-            if primary_key:
-                row_dict["id"] = row_dict["chembl:"+primary_key]
-            row_dict["grebi:type"] = f"chembl:{table_name}"
-            print(json.dumps(row_dict))
-
-    conn.close()
-
-export_tables_to_jsonl('chembl_34/chembl_34_sqlite/chembl_34.db')
-
-
diff --git a/01_ingest/grebi_ingest_gwas/src/main.rs b/01_ingest/grebi_ingest_gwas/src/main.rs
index 32d3707..0bac8a4 100644
--- a/01_ingest/grebi_ingest_gwas/src/main.rs
+++ b/01_ingest/grebi_ingest_gwas/src/main.rs
@@ -38,10 +38,10 @@ fn main() {
         .trim(csv::Trim::All)
         .from_reader(reader);

-    if args.filename.starts_with("gwas-catalog-associations") {
+    if args.filename.contains("gwas-catalog-associations") {
         eprintln!("GWAS ingest: writing associations");
         write_associations(&mut csv_reader, &mut output_nodes, &args.datasource_name);
-    } else if args.filename.starts_with("gwas-catalog-studies") {
+    } else if args.filename.contains("gwas-catalog-studies") {
         eprintln!("GWAS ingest: writing studies");
         write_studies(&mut csv_reader, &mut output_nodes, &args.datasource_name);
     } else {
diff --git a/01_ingest/grebi_ingest_sqlite/Cargo.toml b/01_ingest/grebi_ingest_sqlite/Cargo.toml
new file mode 100644
index 0000000..eed8874
--- /dev/null
+++ b/01_ingest/grebi_ingest_sqlite/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "grebi_ingest_sqlite"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+Inflector = "0.11.4"
+clap = { version = "4.4.11", features = ["derive"] }
+hex = "0.4.3"
+rusqlite = "0.31.0"
+serde_json = { version = "1.0.108", features=["preserve_order"] }
+jemallocator = "0.5.4"
+
diff --git a/01_ingest/grebi_ingest_sqlite/src/main.rs b/01_ingest/grebi_ingest_sqlite/src/main.rs
new file mode 100644
index 0000000..483776e
--- /dev/null
+++ b/01_ingest/grebi_ingest_sqlite/src/main.rs
@@ -0,0 +1,152 @@
+use rusqlite::{params, Connection, Result};
+use serde_json::{json, Value};
+use inflector::Inflector;
+use std::collections::HashMap;
+use clap::Parser;
+
+#[global_allocator]
+static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
+
+#[derive(clap::Parser, Debug)]
+#[command(author, version, about, long_about = None)]
+struct Args {
+
+    #[arg(long)]
+    datasource_name: String,
+
+    #[arg(long)]
+    filename: String,
+
+}
+
+fn main() -> Result<()> {
+
+    let args = Args::parse();
+    let conn = Connection::open(&args.filename)?;
+    let prefix = args.datasource_name.to_lowercase();
+
+    let schema_info = get_schema_info(&conn)?;
+    let foreign_keys = get_foreign_keys(&conn)?;
+
+    for (table, columns) in &schema_info {
+
+        if table.starts_with("sqlite_") {
+            continue;
+        }
+
+        let grebi_type = format!("{}:{}", prefix, table.to_singular());
+
+        eprintln!("--- Reading table: {} => {}", table, grebi_type);
+
+        let primary_keys = get_primary_keys(&conn, table).unwrap();
+        eprintln!("\tcolumns: {:?}", columns);
+        eprintln!("\tprimary keys: {:?}", primary_keys);
+
+        let mut stmt = conn.prepare(&format!("SELECT * FROM {}", table)).unwrap();
+        let mut rows = stmt.query([])?;
+        while let Some(row) = rows.next()? {
+
+            let mut json_obj = json!({});
+            let mut ids = Vec::new();
+
+            for (idx, column) in columns.iter().enumerate() {
+
+                let value: Option<String> = match row.get(idx)? {
+                    rusqlite::types::Value::Null => None,
+                    rusqlite::types::Value::Integer(i) => Some(i.to_string()),
+                    rusqlite::types::Value::Real(r) => Some(r.to_string()),
+                    rusqlite::types::Value::Text(t) => Some(t.to_string()),
+                    rusqlite::types::Value::Blob(b) => Some(hex::encode(b))
+                };
+
+                if value.is_none() {
+                    continue;
+                }
+
+                let v = value.unwrap();
+
+                let col_name = format!("{}:{}", prefix, column);
+
+                if primary_keys.contains(column) {
+                    ids.push(format!("{}:{}:{}", prefix, table.to_singular(), v.clone()));
+                }
+
+                let fk_info = foreign_keys.get(&(table.clone(), column.clone()));
+
+                if fk_info.is_some() {
+                    json_obj[&col_name] = json!(format!("{}:{}:{}", prefix, fk_info.unwrap().0.to_singular(), v));
+                    ids.push(format!("{}:{}:{}", fk_info.unwrap().0.to_singular(), prefix, v));
+                } else {
+                    json_obj[&col_name] = json!(v);
+                }
+            }
+
+            json_obj["grebi:type"] = json!(grebi_type);
+            json_obj["id"] = json!(ids);
+
+            println!("{}", serde_json::to_string(&json_obj).unwrap());
+        }
+    }
+    Ok(())
+}
+
+fn get_schema_info(conn: &Connection) -> Result<HashMap<String, Vec<String>>> {
+    let mut schema_info = HashMap::new();
+    let mut stmt = conn.prepare("SELECT name FROM sqlite_master WHERE type='table'")?;
+    let tables = stmt.query_map(params![], |row| row.get(0))?;
+
+    for table in tables {
+        let table: String = table?;
+        let mut columns = Vec::new();
+        let mut col_stmt = conn.prepare(&format!("PRAGMA table_info({})", table))?;
+        let col_info = col_stmt.query_map(params![], |row| row.get(1))?;
+
+        for col in col_info {
+            columns.push(col?);
+        }
+        schema_info.insert(table, columns);
+    }
+    Ok(schema_info)
+}
+
+fn get_primary_keys(conn: &Connection, table: &str) -> Result<Vec<String>> {
+    let mut primary_keys = Vec::new();
+    let mut stmt = conn.prepare(&format!("PRAGMA table_info({})", table))?;
+    let col_info = stmt.query_map(params![], |row| {
+        let name: String = row.get(1)?;
+        let is_pk: bool = row.get(5)?;
+        Ok((name, is_pk))
+    })?;
+
+    for col in col_info {
+        let (name, is_pk) = col?;
+        if is_pk {
+            primary_keys.push(name);
+        }
+    }
+    Ok(primary_keys)
+}
+
+fn get_foreign_keys(conn: &Connection) -> Result<HashMap<(String, String), (String, String)>> {
+    let mut foreign_keys = HashMap::new();
+    let mut stmt = conn.prepare("SELECT name FROM sqlite_master WHERE type='table'")?;
+    let tables = stmt.query_map(params![], |row| row.get(0))?;
+
+    for table in tables {
+        let table: String = table?;
+        let mut fk_stmt = conn.prepare(&format!("PRAGMA foreign_key_list({})", table))?;
+        let fk_info = fk_stmt.query_map(params![], |row| {
+            let from: String = row.get(3)?;
+            let to_table: String = row.get(2)?;
+            let to: String = row.get(4)?;
+            Ok((from, to_table, to))
+        })?;
+
+        for fk in fk_info {
+            let (from, to_table, to) = fk?;
+            foreign_keys.insert((table.clone(), from), (to_table, to));
+        }
+    }
+    Ok(foreign_keys)
+}
+
diff --git a/02_assign_ids/grebi_assign_ids/Cargo.toml b/02_assign_ids/grebi_assign_ids/Cargo.toml
index 88bddd0..af59944 100644
--- a/02_assign_ids/grebi_assign_ids/Cargo.toml
+++ b/02_assign_ids/grebi_assign_ids/Cargo.toml
@@ -8,7 +8,6 @@ serde_json = { version = "1.0.108", features=["preserve_order"] }
 grebi_shared = { path = "../../grebi_shared" }
 csv = "1.3.0"
 fasthash = "0.4.0"
-rusqlite = "0.30.0"
 lmdb-zero = "0.4.4"
 bloomfilter = "1.0.13"
 jemallocator = "0.5.4"
diff --git a/02_assign_ids/grebi_identifiers2groups/Cargo.toml b/02_assign_ids/grebi_identifiers2groups/Cargo.toml
index ec77587..f2b322f 100644
--- a/02_assign_ids/grebi_identifiers2groups/Cargo.toml
+++ b/02_assign_ids/grebi_identifiers2groups/Cargo.toml
@@ -8,7 +8,6 @@ serde_json = { version = "1.0.108", features=["preserve_order"] }
 grebi_shared = { path = "../../grebi_shared" }
 csv = "1.3.0"
 fasthash = "0.4.0"
-rusqlite = "0.30.0"
 lmdb-zero = "0.4.4"
 bloomfilter = "1.0.13"
 clap = { version = "4.4.11", features = ["derive"] }
diff --git a/02_assign_ids/grebi_identifiers2groups/src/main.rs b/02_assign_ids/grebi_identifiers2groups/src/main.rs
index c991bdb..c83bcc2 100644
--- a/02_assign_ids/grebi_identifiers2groups/src/main.rs
+++ b/02_assign_ids/grebi_identifiers2groups/src/main.rs
@@ -2,7 +2,6 @@ use std::collections::{HashSet, HashMap, BTreeMap};
 use std::{env, io};

 use csv;
-use rusqlite::Connection;
 use bloomfilter::Bloom;
 use clap::Parser;
 use std::io::{BufRead, BufReader };
diff --git a/Cargo.lock b/Cargo.lock
index befd238..db0422f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2,6 +2,16 @@
 # It is not intended for manual editing.
 version = 3

+[[package]]
+name = "Inflector"
+version = "0.11.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
+dependencies = [
+ "lazy_static",
+ "regex",
+]
+
 [[package]]
 name = "adler"
 version = "1.0.2"
@@ -29,12 +39,6 @@ dependencies = [
  "memchr",
 ]

-[[package]]
-name = "allocator-api2"
-version = "0.2.16"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
-
 [[package]]
 name = "anstream"
 version = "0.6.5"
@@ -95,7 +99,7 @@ version = "0.69.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0"
 dependencies = [
- "bitflags 2.4.1",
+ "bitflags 2.6.0",
  "cexpr",
  "clang-sys",
  "itertools",
@@ -123,9 +127,9 @@ checksum = "4efd02e230a02e18f92fc2735f44597385ed02ad8f831e7c1c1156ee5e1ab3a5"

 [[package]]
 name = "bitflags"
-version = "2.4.1"
+version = "2.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07"
+checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"

 [[package]]
 name = "block-buffer"
@@ -458,7 +462,6 @@ dependencies = [
  "grebi_shared",
  "jemallocator",
  "lmdb-zero",
- "rusqlite",
  "serde_json",
 ]

@@ -485,7 +488,6 @@ dependencies = [
  "fasthash",
  "grebi_shared",
  "lmdb-zero",
- "rusqlite",
  "serde_json",
 ]

@@ -567,6 +569,18 @@ dependencies = [
  "serde_json",
 ]

+[[package]]
+name = "grebi_ingest_sqlite"
+version = "0.1.0"
+dependencies = [
+ "Inflector",
+ "clap",
+ "hex",
+ "jemallocator",
+ "rusqlite",
+ "serde_json",
+]
+
 [[package]]
 name = "grebi_ingest_sssom"
 version = "0.1.0"
@@ -667,14 +681,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
 dependencies = [
  "ahash",
- "allocator-api2",
 ]

 [[package]]
 name = "hashlink"
-version = "0.8.4"
+version = "0.9.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" dependencies = [ "hashbrown", ] @@ -819,9 +832,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.27.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" +checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" dependencies = [ "pkg-config", "vcpkg", @@ -1115,11 +1128,11 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a78046161564f5e7cd9008aff3b2990b3850dc8e0349119b98e8f251e099f24d" +checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.6.0", "fallible-iterator", "fallible-streaming-iterator", "hashlink", diff --git a/Cargo.toml b/Cargo.toml index f4a0ca1..4f782bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "01_ingest/grebi_ingest_json", "01_ingest/grebi_ingest_reactome", "01_ingest/grebi_ingest_kgx_edges", + "01_ingest/grebi_ingest_sqlite", "01_ingest/grebi_normalise_prefixes", "02_assign_ids/grebi_extract_identifiers", "02_assign_ids/grebi_identifiers2groups", diff --git a/configs/datasource_configs/chembl.json b/configs/datasource_configs/chembl.json new file mode 100644 index 0000000..344b106 --- /dev/null +++ b/configs/datasource_configs/chembl.json @@ -0,0 +1,13 @@ +{ + "name": "ChEMBL", + "enabled": true, + "ingests": [ + { + "ingest_files": ["./00_fetch_data/chembl/chembl_34/chembl_34_sqlite/chembl_34.db"], + "ingest_script": "./target/release/grebi_ingest_sqlite", + "stdin": false, + "ingest_args": [ + ] + } + ] +} \ No newline at end of file diff --git a/configs/subgraph_configs/ebi_full_monarch.json b/configs/subgraph_configs/ebi_full_monarch.json index 013b10d..b724b22 100644 --- a/configs/subgraph_configs/ebi_full_monarch.json +++ b/configs/subgraph_configs/ebi_full_monarch.json @@ -49,6 +49,7 @@ "./configs/datasource_configs/mondo_efo.json", "./configs/datasource_configs/hett_pesticides_appril.json", "./configs/datasource_configs/hett_pesticides_eu.json", - "./configs/datasource_configs/hett_pesticides_gb.json" + "./configs/datasource_configs/hett_pesticides_gb.json", + "./configs/datasource_configs/chembl.json" ] } diff --git a/configs/subgraph_configs/hett.json b/configs/subgraph_configs/hett.json index 7e24e2d..af52bd1 100644 --- a/configs/subgraph_configs/hett.json +++ b/configs/subgraph_configs/hett.json @@ -38,6 +38,7 @@ "datasource_configs": [ "./configs/datasource_configs/hett_pesticides_appril.json", "./configs/datasource_configs/hett_pesticides_eu.json", - "./configs/datasource_configs/hett_pesticides_gb.json" + "./configs/datasource_configs/hett_pesticides_gb.json", + "./configs/datasource_configs/chembl.json" ] } diff --git a/nextflow/01_create_subgraph.nf b/nextflow/01_create_subgraph.nf index 400e461..20167ba 100644 --- a/nextflow/01_create_subgraph.nf +++ b/nextflow/01_create_subgraph.nf @@ -8,6 +8,7 @@ params.tmp = "$GREBI_TMP" params.home = "$GREBI_HOME" params.config = "$GREBI_CONFIG" params.subgraph = "$GREBI_SUBGRAPH" +params.timestamp = "$GREBI_TIMESTAMP" params.is_ebi = "$GREBI_IS_EBI" workflow { @@ -95,10 +96,10 @@ process ingest { """ #!/usr/bin/env bash set -Eeuo 
-    ${getDecompressionCommand(file_listing.filename)} \
-        | ${getIngestCommand(file_listing.ingest.ingest_script)} \
+    ${getStdinCommand(file_listing.ingest, file_listing.filename)} \
+        ${getIngestCommand(file_listing.ingest.ingest_script)} \
         --datasource-name ${file_listing.datasource.name} \
-        --filename "${basename(file_listing.filename)}" \
+        --filename "${file_listing.filename}" \
         ${buildIngestArgs(file_listing.ingest.ingest_args)} \
         | ${params.home}/target/release/grebi_normalise_prefixes ${params.home}/prefix_maps/prefix_map_normalise.json \
         | tee >(${params.home}/target/release/grebi_extract_identifiers \
@@ -201,7 +202,7 @@ process index {
     path("metadata.jsonl"), emit: metadata_jsonl
     path("summary.json"), emit: summary_json
     path("names.txt"), emit: names_txt
-    path("ids.txt"), emit: ids_txt
+    path("ids_${params.subgraph}.txt"), emit: ids_txt

     script:
     """
@@ -209,11 +210,11 @@
     set -Eeuo pipefail
     cat ${merged_filenames.iterator().join(" ")} \
         | ${params.home}/target/release/grebi_index \
-            --subgraph-name {params.subgraph} \
+            --subgraph-name ${params.subgraph} \
             --out-metadata-jsonl-path metadata.jsonl \
             --out-summary-json-path summary.json \
             --out-names-txt names.txt \
-            --out-ids-txt ids.txt
+            --out-ids-txt ids_${params.subgraph}.txt
     """
 }
@@ -639,17 +640,20 @@ def parseJson(json) {
     return new JsonSlurper().parseText(json)
 }

-def getDecompressionCommand(filename) {
+def getStdinCommand(ingest, filename) {
+    if (ingest.stdin == false) {
+        return ""
+    }
     def f = filename
     if (filename.startsWith(".")) {
         f = new File(params.home, filename).toString()
     }
     if (f.endsWith(".gz")) {
-        return "zcat ${f}"
+        return "zcat ${f} |"
     } else if (f.endsWith(".xz")) {
-        return "xzcat ${f}"
+        return "xzcat ${f} |"
     } else {
-        return "cat ${f}"
+        return "cat ${f} |"
     }
 }
diff --git a/nextflow/02_create_dbs.nf b/nextflow/02_create_dbs.nf
index 7aa6d0d..bdfc12d 100644
--- a/nextflow/02_create_dbs.nf
+++ b/nextflow/02_create_dbs.nf
@@ -16,7 +16,7 @@ workflow {
     neo_nodes_files = Channel.fromPath("${params.tmp}/${params.config}/*/neo4j_csv/neo_nodes_*.csv").collect()
     neo_edges_files = Channel.fromPath("${params.tmp}/${params.config}/*/neo4j_csv/neo_edges_*.csv").collect()
-    id_txts = Channel.fromPath("${params.tmp}/${params.config}/*/ids.txt").collect()
+    id_txts = Channel.fromPath("${params.tmp}/${params.config}/*/ids_*.txt").collect()
     ids_csv = create_combined_neo_ids_csv(id_txts).collect()

     neo_db = create_neo(neo_nodes_files.collect() + neo_edges_files.collect() + ids_csv)
diff --git a/prefix_maps/extra_prefixes.json b/prefix_maps/extra_prefixes.json
index 97bcbe4..cc4f1ef 100644
--- a/prefix_maps/extra_prefixes.json
+++ b/prefix_maps/extra_prefixes.json
@@ -186,5 +186,8 @@
 "rouge:": "http://purl.uniprot.org/rouge/",
 "source:": "http://purl.uniprot.org/source/",
 "swiss-model-workspace:": "http://purl.uniprot.org/swiss-model-workspace/",
-"unipathway:": "http://purl.uniprot.org/unipathway/"
+"unipathway:": "http://purl.uniprot.org/unipathway/",
+"EFO_": "efo:",
+"GO_": "go:",
+"BAO_": "bao:"
 }
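
Usage note (a sketch, not part of the patch): with "stdin": false in configs/datasource_configs/chembl.json, the pipeline passes the SQLite path to the new ingester via --filename instead of piping it on stdin. The ingester then emits one JSON object per row, namespacing each column with the lower-cased datasource name, rewriting foreign-key columns to point at the singularised target table, and collecting primary/foreign-key identifiers into the id array. The invocation below mirrors the ChEMBL config above; the output line is illustrative only (hypothetical row and column values, not taken from the real database):

    ./target/release/grebi_ingest_sqlite \
        --datasource-name ChEMBL \
        --filename ./00_fetch_data/chembl/chembl_34/chembl_34_sqlite/chembl_34.db

    {"chembl:molregno":"42","chembl:pref_name":"EXAMPLE COMPOUND","grebi:type":"chembl:molecule_dictionary","id":["chembl:molecule_dictionary:42"]}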