diff --git a/Makefile b/Makefile
index c2b7268..d7f6c16 100644
--- a/Makefile
+++ b/Makefile
@@ -14,4 +14,5 @@ kg-microbe-biomedical:
 
 kg-microbe-biomedical-function-merge:
 	poetry run kg merge -m duckdb -n 1000000 -e 100000 -s "bacdive, mediadive, madin_etal, rhea_mappings, bactotraits, chebi, ec, envo, go, ncbitaxon, upa, hp, mondo, ctd, wallen_etal, uniprot_human, uniprot_functional_microbes" --merge-label $@
-include kg-microbe-merge.Makefile
\ No newline at end of file
+include kg-microbe-merge.Makefile
+
diff --git a/download.yaml b/download.yaml
index 4c37e62..7e1a876 100644
--- a/download.yaml
+++ b/download.yaml
@@ -37,7 +37,7 @@
 
   #
   - url: git://Knowledge-Graph-Hub/kg-microbe/BactoTraits.tar.gz
-    local_name: BactoTraits.tar.gz
+    local_name: bactotraits.tar.gz
 
   #
   # KG-Microbe [CTD]
diff --git a/hpc/run_parallel_merge.sl b/hpc/run_parallel_merge.sl
new file mode 100644
index 0000000..d69addc
--- /dev/null
+++ b/hpc/run_parallel_merge.sl
@@ -0,0 +1,32 @@
+#!/bin/bash
+#SBATCH --account=m4689
+#SBATCH --qos=regular
+#SBATCH --constraint=cpu
+#SBATCH --time=360
+#SBATCH --ntasks=1
+#SBATCH --mem=425GB
+#SBATCH --job-name=parallel_merge
+#SBATCH --output=parallel_merge_%A_%a.out
+#SBATCH --error=parallel_merge_%A_%a.err
+#SBATCH --array=0-3
+#SBATCH -N 1
+
+module load python/3.10
+cd kg-microbe-merge
+python -m venv venv-merge
+source venv-merge/bin/activate
+pip install poetry
+poetry install
+
+# Array of merged graph names
+merges=(
+    kg-microbe-core
+    kg-microbe-biomedical
+)
+
+# Get the merge for this job array task
+merge=${merges[$SLURM_ARRAY_TASK_ID]}
+
+echo "Starting $merge"
+time poetry run make $merge
+echo "Finished $merge"
diff --git a/hpc/run_parallel_merge_biomedical_function.sl b/hpc/run_parallel_merge_biomedical_function.sl
new file mode 100644
index 0000000..83d1ad4
--- /dev/null
+++ b/hpc/run_parallel_merge_biomedical_function.sl
@@ -0,0 +1,33 @@
+#!/bin/bash
+#SBATCH --account=m4689
+#SBATCH --qos=regular
+#SBATCH --constraint=cpu
+#SBATCH --time=360
+#SBATCH --ntasks=1
+#SBATCH --mem=425GB
+#SBATCH --job-name=parallel_merge
+#SBATCH --output=parallel_merge_%A_%a.out
+#SBATCH --error=parallel_merge_%A_%a.err
+#SBATCH --array=0
+#SBATCH -N 1
+#SBATCH --mail-type=BEGIN,END
+#SBATCH --mail-user=MJoachimiak@lbl.gov
+
+module load python/3.10
+cd kg-microbe-merge
+python -m venv venv-merge
+source venv-merge/bin/activate
+pip install poetry
+poetry install
+
+# Array of merged graph names
+merges=(
+    kg-microbe-biomedical-function
+)
+
+# Get the merge for this job array task
+merge=${merges[$SLURM_ARRAY_TASK_ID]}
+
+echo "Starting $merge"
+time poetry run make $merge
+echo "Finished $merge"
diff --git a/hpc/run_parallel_merge_function.sl b/hpc/run_parallel_merge_function.sl
new file mode 100644
index 0000000..673dd56
--- /dev/null
+++ b/hpc/run_parallel_merge_function.sl
@@ -0,0 +1,33 @@
+#!/bin/bash
+#SBATCH --account=m4689
+#SBATCH --qos=regular
+#SBATCH --constraint=cpu
+#SBATCH --time=360
+#SBATCH --ntasks=1
+#SBATCH --mem=425GB
+#SBATCH --job-name=parallel_merge
+#SBATCH --output=parallel_merge_%A_%a.out
+#SBATCH --error=parallel_merge_%A_%a.err
+#SBATCH --array=0
+#SBATCH -N 1
+#SBATCH --mail-type=BEGIN,END
+#SBATCH --mail-user=MJoachimiak@lbl.gov
+
+module load python/3.10
+cd kg-microbe-merge
+python -m venv venv-merge
+source venv-merge/bin/activate
+pip install poetry
+poetry install
+
+# Array of merged graph names
+merges=(
+    kg-microbe-function
+)
+
+# Get the merge for this job array task
+merge=${merges[$SLURM_ARRAY_TASK_ID]}
+
+echo "Starting $merge"
+time poetry run make $merge
+echo "Finished $merge"
diff --git a/kg_microbe_merge/utils/duckdb_utils.py b/kg_microbe_merge/utils/duckdb_utils.py
index f1f3e56..1fe8cdc 100644
--- a/kg_microbe_merge/utils/duckdb_utils.py
+++ b/kg_microbe_merge/utils/duckdb_utils.py
@@ -1,6 +1,7 @@
 """Utility functions for working with DuckDB in the KG Microbe Merge project."""
 
 import os
+from pathlib import Path
 from typing import List
 
 import duckdb
@@ -304,7 +305,9 @@ def duckdb_nodes_merge(nodes_file_list, output_file, priority_sources, batch_siz
     :param priority_sources: List of source names to prioritize.
     """
     # Create a DuckDB connection
-    conn = duckdb.connect("nodes.db")
+    merge_label = Path(output_file).parent.name
+    nodes_db_file = f"{merge_label}_nodes.db"
+    conn = duckdb.connect(nodes_db_file)
 
     # Load the files into DuckDB
     load_into_duckdb(conn, nodes_file_list, "combined_nodes")
@@ -379,7 +382,7 @@ def duckdb_nodes_merge(nodes_file_list, output_file, priority_sources, batch_siz
     finally:
         # Close the connection
         conn.close()
-        os.remove("nodes.db")
+        os.remove(nodes_db_file)
 
 
 def duckdb_edges_merge(edges_file_list, output_file, batch_size=1000000):
@@ -416,7 +419,8 @@
     memory usage and allows for processing of very large datasets that exceed available RAM.
     """
     os.makedirs(TMP_DIR, exist_ok=True)
-    db_file = "edges_persistent.db"
+    merge_label = Path(output_file).parent.name
+    db_file = f"{merge_label}_edges_persistent.db"
     conn = duckdb.connect(db_file)
 
     try: