Skip to content

Commit

Permalink
add hra kg (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesamcl authored Jul 4, 2024
1 parent 32ec824 commit d1a8042
Show file tree
Hide file tree
Showing 25 changed files with 958 additions and 739 deletions.
17 changes: 13 additions & 4 deletions 01_ingest/grebi_ingest_ols/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ fn read_entities(json: &mut JsonStreamReader<BufReader<StdinLock<'_>>>, output_n
output_nodes.write_all(r#","#.as_bytes()).unwrap();
}
output_nodes.write_all(r#"""#.as_bytes()).unwrap();
output_nodes.write_all(name.as_bytes()).unwrap();
write_escaped_string(&name.as_bytes(), output_nodes);
output_nodes.write_all(r#"""#.as_bytes()).unwrap();
}
output_nodes.write_all(r#"]"#.as_bytes()).unwrap();
Expand Down Expand Up @@ -434,8 +434,17 @@ fn reprefix_predicate(pred:&str) -> String {
}
}




/// Write `str` to `writer` as the body of a JSON string literal, escaping
/// characters as required by RFC 8259.
///
/// Escapes `"` and `\`, uses the short forms `\n` `\r` `\t` `\b` `\f`, and
/// emits any other control byte (< 0x20) as a `\u00XX` escape — previously
/// such bytes were written raw, producing invalid JSON output.
///
/// The writer is generic over `io::Write` (backward compatible: the existing
/// callers pass `&mut BufWriter<StdoutLock>`, which implements `Write`).
/// Panics on I/O errors, matching the surrounding code's `.unwrap()` style.
fn write_escaped_string(str:&[u8], writer:&mut impl Write) {
    for c in str {
        match c {
            b'"' => { writer.write_all(b"\\\"").unwrap(); }
            b'\\' => { writer.write_all(b"\\\\").unwrap(); }
            b'\n' => { writer.write_all(b"\\n").unwrap(); }
            b'\r' => { writer.write_all(b"\\r").unwrap(); }
            b'\t' => { writer.write_all(b"\\t").unwrap(); }
            0x08 => { writer.write_all(b"\\b").unwrap(); }
            0x0c => { writer.write_all(b"\\f").unwrap(); }
            // Remaining control characters have no short escape form and
            // must be \u-escaped to keep the output valid JSON.
            c if *c < 0x20 => { write!(writer, "\\u{:04x}", c).unwrap(); }
            _ => { writer.write_all(std::slice::from_ref(c)).unwrap(); }
        }
    }
}


48 changes: 33 additions & 15 deletions 01_ingest/grebi_ingest_rdf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ struct Args {
nest_objects_of_predicate:Vec<String>,

#[arg(long)]
exclude_objects_of_predicate:Vec<String> // if an object is used with this predicate, ignore the object
exclude_objects_of_predicate:Vec<String>, // if an object is used with this predicate, ignore the object

#[arg(long, default_value_t = false)]
rdf_types_are_grebi_types:bool
}

fn main() -> std::io::Result<()> {
Expand All @@ -93,6 +96,7 @@ fn main() -> std::io::Result<()> {

let nest_preds:BTreeSet<String> = args.nest_objects_of_predicate.into_iter().collect();
let ignore_preds:BTreeSet<String> = args.exclude_objects_of_predicate.into_iter().collect();
let rdf_types_are_grebi_types = args.rdf_types_are_grebi_types;

let gr:CustomGraph = match args.rdf_type.as_str() {
"rdf_triples_xml" => {
Expand All @@ -102,14 +106,11 @@ fn main() -> std::io::Result<()> {
},
"rdf_quads_nq" => {

if args.rdf_graph.len() == 0 {
panic!("must specify at least one graph to load for nquads");
}

let parser = NQuadsParser {};

let quad_source = parser.parse(reader);
let mut filtered_quads = quad_source.filter_quads(|q| args.rdf_graph.contains(&q.g().unwrap().value().to_string()));
let mut filtered_quads = quad_source.filter_quads(|q|
args.rdf_graph.len() == 0 || args.rdf_graph.contains(&q.g().unwrap().value().to_string()));

let mut g:CustomGraph = CustomGraph::new();

Expand Down Expand Up @@ -160,7 +161,7 @@ fn main() -> std::io::Result<()> {

eprintln!("Building reification index took {} seconds", start_time.elapsed().as_secs());

write_subjects(ds, &mut output_nodes, &nest_preds, &exclude_subjects, &exclude_subjects_at_toplevel, reifs);
write_subjects(ds, &mut output_nodes, &nest_preds, &exclude_subjects, &exclude_subjects_at_toplevel, reifs, rdf_types_are_grebi_types);

eprintln!("Total time elapsed: {} seconds", start_time.elapsed().as_secs());

Expand Down Expand Up @@ -189,7 +190,7 @@ fn populate_reifs(
let annotated_predicate = ds.triples_matching(&s, &pred_prop, &ANY).next().unwrap().unwrap().o().clone();
let annotated_object = ds.triples_matching(&s, &obj_prop, &ANY).next().unwrap().unwrap().o().clone();

let obj_json = term_to_json(&annotated_object, ds, nest_preds, None).to_string();
let obj_json = term_to_json(&annotated_object, ds, nest_preds, None, false).to_string();

let lhs = ReifLhs {
s: annotated_subject.clone(),
Expand All @@ -212,7 +213,14 @@ fn populate_reifs(
}


fn write_subjects(ds:&CustomGraph, nodes_writer:&mut BufWriter<StdoutLock>, nest_preds:&BTreeSet<String>, exclude_subjects:&HashSet<Term<Rc<str>>>, exclude_subjects_at_toplevel:&HashSet<Term<Rc<str>>>, reifs:HashMap<ReifLhs, BTreeMap<String, Term<Rc<str>>>>) {
fn write_subjects(
ds:&CustomGraph,
nodes_writer:&mut BufWriter<StdoutLock>,
nest_preds:&BTreeSet<String>,
exclude_subjects:&HashSet<Term<Rc<str>>>,
exclude_subjects_at_toplevel:&HashSet<Term<Rc<str>>>,
reifs:HashMap<ReifLhs, BTreeMap<String, Term<Rc<str>>>>,
rdf_types_are_grebi_types:bool) {

let start_time2 = std::time::Instant::now();

Expand All @@ -229,7 +237,7 @@ fn write_subjects(ds:&CustomGraph, nodes_writer:&mut BufWriter<StdoutLock>, nest
continue;
}

let json = term_to_json(s, ds, nest_preds, Some(&reifs));
let json = term_to_json(s, ds, nest_preds, Some(&reifs), rdf_types_are_grebi_types);

let json_obj = json.as_object().unwrap();
let types = json_obj.get("http://www.w3.org/1999/02/22-rdf-syntax-ns#type");
Expand All @@ -252,7 +260,13 @@ fn write_subjects(ds:&CustomGraph, nodes_writer:&mut BufWriter<StdoutLock>, nest
eprintln!("Writing JSONL took {} seconds", start_time2.elapsed().as_secs());
}

fn term_to_json(term:&Term<Rc<str>>, ds:&CustomGraph, nest_preds:&BTreeSet<String>, reifs:Option<&HashMap<ReifLhs, BTreeMap<String, Term<Rc<str>>>>>) -> Value {
fn term_to_json(
term:&Term<Rc<str>>,
ds:&CustomGraph,
nest_preds:&BTreeSet<String>,
reifs:Option<&HashMap<ReifLhs, BTreeMap<String, Term<Rc<str>>>>>,
rdf_types_are_grebi_types:bool
) -> Value {

let triples = ds.triples_matching(term, &ANY, &ANY);

Expand Down Expand Up @@ -285,7 +299,7 @@ fn term_to_json(term:&Term<Rc<str>>, ds:&CustomGraph, nest_preds:&BTreeSet<Strin
let reifs_for_this_sp = reifs_u.get(&ReifLhs { s: tu.s().clone(), p: tu.p().clone() });
if reifs_for_this_sp.is_some() {
let reifs_for_this_sp_u = reifs_for_this_sp.unwrap();
let o_json = term_to_json(&o, ds, nest_preds, None).to_string();
let o_json = term_to_json(&o, ds, nest_preds, None, false).to_string();
let reif = reifs_for_this_sp_u.get(&o_json);
if reif.is_some() {
Some(reif.unwrap())
Expand All @@ -304,7 +318,7 @@ fn term_to_json(term:&Term<Rc<str>>, ds:&CustomGraph, nest_preds:&BTreeSet<Strin
if nest_preds.contains(p) {
match o.kind() {
Iri|Literal|BlankNode => {
let mut obj = term_to_json(o, ds, nest_preds, reifs);
let mut obj = term_to_json(o, ds, nest_preds, reifs, false);
let obj_o = obj.as_object_mut().unwrap();
obj_o.remove_entry("id");
obj
Expand All @@ -314,14 +328,14 @@ fn term_to_json(term:&Term<Rc<str>>, ds:&CustomGraph, nest_preds:&BTreeSet<Strin
} else {
match o.kind() {
Iri|Literal => Value::String( o.value().to_string() ),
BlankNode => term_to_json(o, ds, nest_preds, reifs),
BlankNode => term_to_json(o, ds, nest_preds, reifs, false),
Variable => todo!(),
}
}
};

if reif_subj.is_some() {
let mut reif_as_json = term_to_json(reif_subj.unwrap(), ds, nest_preds, None);
let mut reif_as_json = term_to_json(reif_subj.unwrap(), ds, nest_preds, None, false);
let reif_as_json_o = reif_as_json.as_object_mut().unwrap();
reif_as_json_o.remove_entry("http://www.w3.org/1999/02/22-rdf-syntax-ns#type");
reif_as_json_o.remove_entry("id");
Expand All @@ -340,6 +354,10 @@ fn term_to_json(term:&Term<Rc<str>>, ds:&CustomGraph, nest_preds:&BTreeSet<Strin
}
}

if rdf_types_are_grebi_types && json.contains_key("http://www.w3.org/1999/02/22-rdf-syntax-ns#type") {
json.insert("grebi:type".to_string(), json.get("http://www.w3.org/1999/02/22-rdf-syntax-ns#type").unwrap().clone());
}

return Value::Object(json);
}

Expand Down
17 changes: 14 additions & 3 deletions 03_merge/grebi_merge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ struct Args {
#[arg(long)]
exclude_props: String,

#[arg(long)]
annotate_subgraph_name: Option<String>,

#[arg(trailing_var_arg = true, allow_hyphen_values = true, required = true)]
_files: Vec<String>,
}
Expand All @@ -51,6 +54,8 @@ fn main() -> std::io::Result<()> {
input_filenames.sort();
input_filenames.dedup();

let subgraph_name:Option<String> = args.annotate_subgraph_name;

let mut inputs: Vec<Input> = input_filenames
.iter()
.map(|file| {
Expand Down Expand Up @@ -110,7 +115,7 @@ fn main() -> std::io::Result<()> {
if !id.eq(&cur_id) {
// this is a new subject; we have finished the old one (if present)
if cur_id.len() > 0 {
write_merged_entity(&lines_to_write, &mut writer, &inputs, &exclude_props);
write_merged_entity(&lines_to_write, &mut writer, &inputs, &exclude_props, &subgraph_name);
lines_to_write.clear();
}
cur_id = id.to_vec();
Expand Down Expand Up @@ -143,7 +148,7 @@ fn main() -> std::io::Result<()> {
}

if cur_id.len() > 0 {
write_merged_entity(&lines_to_write, &mut writer, &inputs, &exclude_props);
write_merged_entity(&lines_to_write, &mut writer, &inputs, &exclude_props, &subgraph_name);
lines_to_write.clear();
}

Expand All @@ -153,7 +158,7 @@ fn main() -> std::io::Result<()> {
}

#[inline(always)]
fn write_merged_entity(lines_to_write: &Vec<BufferedLine>, stdout: &mut BufWriter<std::io::StdoutLock>, inputs: &Vec<Input>, exclude_props:&BTreeSet<Vec<u8>>) {
fn write_merged_entity(lines_to_write: &Vec<BufferedLine>, stdout: &mut BufWriter<std::io::StdoutLock>, inputs: &Vec<Input>, exclude_props:&BTreeSet<Vec<u8>>, subgraph_name:&Option<String>) {

if lines_to_write.len() == 0 {
panic!();
Expand Down Expand Up @@ -220,6 +225,12 @@ fn write_merged_entity(lines_to_write: &Vec<BufferedLine>, stdout: &mut BufWrite
}
stdout.write_all(r#"]"#.as_bytes()).unwrap();

if subgraph_name.is_some() {
stdout.write_all(r#","grebi:subgraph":""#.as_bytes()).unwrap();
stdout.write_all(&subgraph_name.as_ref().unwrap().as_bytes());
stdout.write_all(r#"""#.as_bytes()).unwrap();
}

// sort by key, then value, then datasource
merged_props.sort_by(|a, b| {
match a.1.key.cmp(&b.1.key) {
Expand Down
4 changes: 4 additions & 0 deletions 04_index/grebi_index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use serde_json::json;
#[command(author, version, about, long_about = None)]
struct Args {

#[arg(long)]
subgraph_name: String,

#[arg(long)]
out_summary_json_path: String,

Expand Down Expand Up @@ -195,6 +198,7 @@ fn main() {

summary_writer.write_all(
serde_json::to_string_pretty(&json!({
"subgraph_name": args.subgraph_name,
"entity_props": entity_props_to_count.iter().map(|(k,v)| {
return (String::from_utf8(k.to_vec()).unwrap(), json!({
"count": v
Expand Down
24 changes: 18 additions & 6 deletions 06_prepare_db_import/grebi_make_csv/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::io::BufWriter;
use std::io::BufReader;
use std::io::Write;
use std::io::BufRead;
use std::collections::HashSet;
use clap::Parser;
use grebi_shared::json_lexer::JsonTokenType;
use grebi_shared::slice_materialised_edge::SlicedEdge;
Expand All @@ -28,7 +29,7 @@ struct Args {
in_edges_jsonl: String,

#[arg(long)]
in_summary_json: String,
in_summary_jsons: String,

#[arg(long)]
out_nodes_csv_path: String,
Expand All @@ -43,10 +44,21 @@ fn main() -> std::io::Result<()> {

let start_time = std::time::Instant::now();

let summary:Value = serde_json::from_reader(File::open(args.in_summary_json).unwrap()).unwrap();

let all_entity_props: Vec<String> = summary["entity_props"].as_object().unwrap().keys().cloned().collect();
let all_edge_props: Vec<String> = summary["edge_props"].as_object().unwrap().keys().cloned().collect();
let mut all_entity_props: HashSet<String> = HashSet::new();
let mut all_edge_props: HashSet<String> = HashSet::new();


for f in args.in_summary_jsons.split(",") {
let summary:Value = serde_json::from_reader(File::open(f).unwrap()).unwrap();
for prop in summary["edge_props"].as_object().unwrap().keys() {
all_edge_props.insert(prop.to_string());
}
for prop in summary["entity_props"].as_object().unwrap().keys() {
all_entity_props.insert(prop.to_string());
}
}



let mut nodes_reader = BufReader::new(File::open(args.in_nodes_jsonl).unwrap());
Expand Down Expand Up @@ -140,7 +152,7 @@ fn main() -> std::io::Result<()> {
Ok(())
}

fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&Vec<String>, nodes_writer:&mut BufWriter<&File>) {
fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&HashSet<String>, nodes_writer:&mut BufWriter<&File>) {

let refs:Map<String,Value> = serde_json::from_slice(entity._refs.unwrap()).unwrap();

Expand Down Expand Up @@ -214,7 +226,7 @@ fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&Vec<String>,
nodes_writer.write_all(b"\n").unwrap();
}

fn write_edge(src_line:&[u8], edge:SlicedEdge, all_edge_props:&Vec<String>, edges_writer: &mut BufWriter<&File>) {
fn write_edge(src_line:&[u8], edge:SlicedEdge, all_edge_props:&HashSet<String>, edges_writer: &mut BufWriter<&File>) {

let refs:Map<String,Value> = serde_json::from_slice(edge._refs.unwrap()).unwrap();

Expand Down
35 changes: 35 additions & 0 deletions 06_prepare_db_import/make_solr_autocomplete_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

import json
import os
import sys
import shlex
import time
import glob
import argparse
from pathlib import Path
from subprocess import Popen, PIPE, STDOUT


def main():
    """Generate a Solr autocomplete core configuration for one subgraph.

    Copies the ``grebi_autocomplete`` core template into the output directory
    under the name ``grebi_autocomplete_<subgraph>``, copies the shared Solr
    config files (solr.xml, solrconfig.xml, zoo.cfg) alongside it, and writes
    a ``core.properties`` naming the core.

    Raises on any failure (missing template, pre-existing output dir, copy
    error) — previously the ``os.system('cp ...')`` exit codes were ignored,
    so a failed copy silently produced a broken config.
    """
    import shutil

    parser = argparse.ArgumentParser(description='Create Solr autocomplete config')
    parser.add_argument('--subgraph-name', type=str, help='subgraph name', required=True)
    parser.add_argument('--in-template-config-dir', type=str, help='Path of config template', required=True)
    parser.add_argument('--out-config-dir', type=str, help='Path to write config', required=True)
    args = parser.parse_args()

    # Fail loudly if the output directory already exists rather than silently
    # mixing stale and fresh config files (same behavior as the original
    # os.makedirs call, which also errors on an existing directory).
    os.makedirs(args.out_config_dir)

    autocomplete_core_path = os.path.join(args.out_config_dir, f'grebi_autocomplete_{args.subgraph_name}')

    # shutil raises on failure, unlike os.system('cp -r ...') whose exit
    # status was previously discarded.
    shutil.copytree(os.path.join(args.in_template_config_dir, "grebi_autocomplete"), autocomplete_core_path)

    for shared_file in ("solr.xml", "solrconfig.xml", "zoo.cfg"):
        shutil.copy2(os.path.join(args.in_template_config_dir, shared_file), args.out_config_dir)

    Path(f'{autocomplete_core_path}/core.properties').write_text(f"name=grebi_autocomplete_{args.subgraph_name}\n")

if __name__=="__main__":
main()



19 changes: 16 additions & 3 deletions 06_prepare_db_import/make_solr_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,39 @@

def main():
parser = argparse.ArgumentParser(description='Create Solr config')
parser.add_argument('--subgraph-name', type=str, help='subgraph name', required=True)
parser.add_argument('--in-summary-json', type=str, help='summary.json', required=True)
parser.add_argument('--in-template-config-dir', type=str, help='Path of config template', required=True)
parser.add_argument('--out-config-dir', type=str, help='Path to write config', required=True)
args = parser.parse_args()

os.system('cp -r ' + shlex.quote(args.in_template_config_dir) + ' ' + shlex.quote(args.out_config_dir))
os.makedirs(args.out_config_dir)

nodes_core_path = os.path.join(args.out_config_dir, f'grebi_nodes_{args.subgraph_name}')
edges_core_path = os.path.join(args.out_config_dir, f'grebi_edges_{args.subgraph_name}')
os.system('cp -r ' + shlex.quote(os.path.join(args.in_template_config_dir, "grebi_nodes")) + ' ' + shlex.quote(nodes_core_path))
os.system('cp -r ' + shlex.quote(os.path.join(args.in_template_config_dir, "grebi_edges")) + ' ' + shlex.quote(edges_core_path))

os.system('cp ' + shlex.quote(os.path.join(args.in_template_config_dir, "solr.xml")) + ' ' + shlex.quote(args.out_config_dir))
os.system('cp ' + shlex.quote(os.path.join(args.in_template_config_dir, "solrconfig.xml")) + ' ' + shlex.quote(args.out_config_dir))
os.system('cp ' + shlex.quote(os.path.join(args.in_template_config_dir, "zoo.cfg")) + ' ' + shlex.quote(args.out_config_dir))

summary = json.load(open(args.in_summary_json))
node_props = map(lambda f: f.replace(':', '__').replace('&', '_'), summary['entity_props'].keys())
edge_props = map(lambda f: f.replace(':', '__').replace('&', '_'), summary['edge_props'].keys())

nodes_schema = Path(os.path.join(args.out_config_dir, 'grebi_nodes/conf/schema.xml'))
Path(f'{nodes_core_path}/core.properties').write_text(f"name=grebi_nodes_{args.subgraph_name}\n")
Path(f'{edges_core_path}/core.properties').write_text(f"name=grebi_edges_{args.subgraph_name}\n")

nodes_schema = Path(f'{nodes_core_path}/conf/schema.xml')
nodes_schema.write_text(nodes_schema.read_text().replace('[[GREBI_FIELDS]]', '\n'.join(list(map(
lambda f: '\n'.join([
f'<field name="{f}" type="string" indexed="true" stored="false" required="false" multiValued="true" />',
f'<copyField source="{f}" dest="str_{f}"/>',
f'<copyField source="{f}" dest="lowercase_{f}"/>'
]), node_props)))))

edges_schema = Path(os.path.join(args.out_config_dir, 'grebi_edges/conf/schema.xml'))
edges_schema = Path(f'{edges_core_path}/conf/schema.xml')
edges_schema.write_text(edges_schema.read_text().replace('[[GREBI_FIELDS]]', '\n'.join(list(map(
lambda f: '\n'.join([
f'<field name="{f}" type="string" indexed="true" stored="false" required="false" multiValued="true" />',
Expand Down
2 changes: 2 additions & 0 deletions 07_create_db/neo4j/create_indexes.cypher
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@

// Index nodes by their canonical identifier for fast lookup by grebi:nodeId.
CREATE INDEX node_id FOR (n:GraphNode) ON n.`grebi:nodeId`
;
// Index nodes by the subgraph they belong to (the grebi:subgraph property
// is written during the merge step when --annotate-subgraph-name is set).
CREATE INDEX subgraph FOR (n:GraphNode) ON n.`grebi:subgraph`
;
// Block until index population finishes; argument is the timeout in seconds.
CALL db.awaitIndexes(10800)
;
Loading

0 comments on commit d1a8042

Please sign in to comment.