diff --git a/config/output/config/dev-cqdg.conf b/config/output/config/dev-cqdg.conf
index 8989b28c..e2233318 100644
--- a/config/output/config/dev-cqdg.conf
+++ b/config/output/config/dev-cqdg.conf
@@ -1883,6 +1883,27 @@ datalake {
             "valid_to_column"="valid_to"
         }
     },
+    {
+        format=PARQUET
+        id="ncit_terms"
+        keys=[]
+        loadtype=OverWrite
+        partitionby=[]
+        path="/ncit_terms"
+        readoptions {}
+        storageid=storage
+        table {
+            database=normalized
+            name="ncit_terms"
+        }
+        writeoptions {
+            "created_on_column"="created_on"
+            "is_current_column"="is_current"
+            "updated_on_column"="updated_on"
+            "valid_from_column"="valid_from"
+            "valid_to_column"="valid_to"
+        }
+    },
     {
         format=JSON
         id="icd_terms"
diff --git a/config/output/config/prod-cqdg.conf b/config/output/config/prod-cqdg.conf
index 163a285c..6322169e 100644
--- a/config/output/config/prod-cqdg.conf
+++ b/config/output/config/prod-cqdg.conf
@@ -1883,6 +1883,27 @@ datalake {
             "valid_to_column"="valid_to"
         }
     },
+    {
+        format=PARQUET
+        id="ncit_terms"
+        keys=[]
+        loadtype=OverWrite
+        partitionby=[]
+        path="/ncit_terms"
+        readoptions {}
+        storageid=storage
+        table {
+            database="cqdg_portal_prod"
+            name="ncit_terms"
+        }
+        writeoptions {
+            "created_on_column"="created_on"
+            "is_current_column"="is_current"
+            "updated_on_column"="updated_on"
+            "valid_from_column"="valid_from"
+            "valid_to_column"="valid_to"
+        }
+    },
     {
         format=JSON
         id="icd_terms"
diff --git a/config/output/config/qa-cqdg.conf b/config/output/config/qa-cqdg.conf
index ac730e59..89f15d27 100644
--- a/config/output/config/qa-cqdg.conf
+++ b/config/output/config/qa-cqdg.conf
@@ -1883,6 +1883,27 @@ datalake {
             "valid_to_column"="valid_to"
         }
     },
+    {
+        format=PARQUET
+        id="ncit_terms"
+        keys=[]
+        loadtype=OverWrite
+        partitionby=[]
+        path="/ncit_terms"
+        readoptions {}
+        storageid=storage
+        table {
+            database="cqdg_portal_qa"
+            name="ncit_terms"
+        }
+        writeoptions {
+            "created_on_column"="created_on"
+            "is_current_column"="is_current"
+            "updated_on_column"="updated_on"
+            "valid_from_column"="valid_from"
+            "valid_to_column"="valid_to"
+        }
+    },
     {
         format=JSON
         id="icd_terms"
diff --git a/config/src/main/scala/bio/ferlab/fhir/etl/config/ConfigurationGenerator.scala b/config/src/main/scala/bio/ferlab/fhir/etl/config/ConfigurationGenerator.scala
index 64595b97..809d8882 100644
--- a/config/src/main/scala/bio/ferlab/fhir/etl/config/ConfigurationGenerator.scala
+++ b/config/src/main/scala/bio/ferlab/fhir/etl/config/ConfigurationGenerator.scala
@@ -68,6 +68,7 @@ object ConfigurationGenerator extends App {
     rawsAndNormalized ++ Seq(
       DatasetConf(id = "hpo_terms", storageid = storage, path = s"/hpo_terms", table = Some(TableConf("database", "hpo_terms")), format = PARQUET, loadtype = OverWrite),
       DatasetConf(id = "mondo_terms", storageid = storage, path = s"/mondo_terms", table = Some(TableConf("database", "mondo_terms")), format = PARQUET, loadtype = OverWrite),
+      DatasetConf(id = "ncit_terms", storageid = storage, path = s"/ncit_terms", table = Some(TableConf("database", "ncit_terms")), format = PARQUET, loadtype = OverWrite),
      DatasetConf(id = "icd_terms", storageid = storage, path = s"/icd_terms", table = Some(TableConf("database", "icd_terms")), format = JSON, loadtype = OverWrite)
     ) ++ Seq(
       DatasetConf(id = "simple_participant", storageid = storage, path = s"/es_index/fhir/simple_participant", format = PARQUET, loadtype = OverWrite, partitionby = partitionByStudyIdAndReleaseId)
diff --git a/prepare-index/src/main/scala/bio/ferlab/fhir/etl/SpecimenEnricher.scala b/prepare-index/src/main/scala/bio/ferlab/fhir/etl/SpecimenEnricher.scala
index 016b28be..9cc1f706 100644
--- a/prepare-index/src/main/scala/bio/ferlab/fhir/etl/SpecimenEnricher.scala
+++ b/prepare-index/src/main/scala/bio/ferlab/fhir/etl/SpecimenEnricher.scala
@@ -25,6 +25,7 @@ case class SpecimenEnricher(rc: RuntimeETLContext, studyIds: Seq[String]) extend
   private val study: DatasetConf = conf.getDataset("normalized_research_study")
   private val disease: DatasetConf = conf.getDataset("normalized_diagnosis")
   private val disease_status: DatasetConf = conf.getDataset("normalized_disease_status")
+  private val ncit_terms: DatasetConf = conf.getDataset("ncit_terms")
 
   override def extract(lastRunDateTime: LocalDateTime, currentRunDateTime: LocalDateTime): Map[String, DataFrame] = {
-    Seq(patient, specimen, group, family_relationship, sample_registration, study, disease, disease_status)
+    Seq(patient, specimen, group, family_relationship, sample_registration, study, disease, disease_status, ncit_terms)
@@ -64,8 +65,9 @@ case class SpecimenEnricher(rc: RuntimeETLContext, studyIds: Seq[String]) extend
       "submitter_participant_id", "family_id", "father_id", "mother_id")
 
     data(specimen.id)
+      .joinNcitTerms(data(ncit_terms.id), "biospecimen_tissue_source")
       .join(patientDf, Seq("subject"))
-      .addSamplesToBiospecimen(data(sample_registration.id))
+      .addSamplesToBiospecimen(data(sample_registration.id).joinNcitTerms(data(ncit_terms.id), "sample_type"))
       .withColumnRenamed("fhir_id", "biospecimen_id")
       .withColumnRenamed("sample_id", "fhir_sample_id")
       .withColumnRenamed("submitter_sample_id", "sample_id")
diff --git a/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/BiospecimenCentric.scala b/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/BiospecimenCentric.scala
index 2b11fff5..66301b71 100644
--- a/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/BiospecimenCentric.scala
+++ b/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/BiospecimenCentric.scala
@@ -19,6 +19,7 @@ class BiospecimenCentric(studyIds: List[String])(implicit configuration: Configu
   val simple_participant: DatasetConf = conf.getDataset("simple_participant")
   val es_index_study_centric: DatasetConf = conf.getDataset("es_index_study_centric")
   val es_index_file_centric: DatasetConf = conf.getDataset("es_index_file_centric")
+  val ncit_terms: DatasetConf = conf.getDataset("ncit_terms")
 
   override def extract(lastRunDateTime: LocalDateTime = minDateTime,
                        currentRunDateTime: LocalDateTime = LocalDateTime.now())(implicit spark: SparkSession): Map[String, DataFrame] = {
@@ -34,10 +35,11 @@ class BiospecimenCentric(studyIds: List[String])(implicit configuration: Configu
 
     val transformedBiospecimen = biospecimenDF
+      .joinNcitTerms(data(ncit_terms.id), "biospecimen_tissue_source")
       .addStudy(data(es_index_study_centric.id))
       .addParticipant(data(simple_participant.id))
       .addFiles(data(normalized_drs_document_reference.id), data(normalized_sequencing_experiment.id))
-      .addSamplesToBiospecimen(data(normalized_sample_registration.id))
+      .addSamplesToBiospecimen(data(normalized_sample_registration.id).joinNcitTerms(data(ncit_terms.id), "sample_type"))
       .withColumnRenamed("fhir_id", "biospecimen_id")
       .drop("subject")
       .withColumn("study_code", col("study.study_code"))
diff --git a/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/FileCentric.scala b/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/FileCentric.scala
index 03f205b2..4a17d9ff 100644
--- a/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/FileCentric.scala
+++ b/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/FileCentric.scala
@@ -18,6 +18,7 @@ class FileCentric(studyIds: List[String])(implicit configuration: Configuration)
   val normalized_sample_registration: DatasetConf = conf.getDataset("normalized_sample_registration")
   val simple_participant: DatasetConf = conf.getDataset("simple_participant")
   val es_index_study_centric: DatasetConf = conf.getDataset("es_index_study_centric")
+  val ncit_terms: DatasetConf = conf.getDataset("ncit_terms")
 
   override def extract(lastRunDateTime: LocalDateTime = minDateTime,
                        currentRunDateTime: LocalDateTime = LocalDateTime.now())(implicit spark: SparkSession): Map[String, DataFrame] = {
@@ -32,7 +33,11 @@ class FileCentric(studyIds: List[String])(implicit configuration: Configuration)
 
     val transformedFile = fileDF
-      .addParticipantWithBiospecimen(data(simple_participant.id), data(normalized_biospecimen.id), data(normalized_sample_registration.id))
+      .addParticipantWithBiospecimen(
+        data(simple_participant.id),
+        data(normalized_biospecimen.id).joinNcitTerms(data(ncit_terms.id), "biospecimen_tissue_source"),
+        data(normalized_sample_registration.id).joinNcitTerms(data(ncit_terms.id), "sample_type")
+      )
       .addStudy(data(es_index_study_centric.id))
       .addSequencingExperiment(data(normalized_sequencing_experiment.id))
       .addAssociatedDocumentRef()
diff --git a/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/ParticipantCentric.scala b/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/ParticipantCentric.scala
index 7c6c3f9e..28c28245 100644
--- a/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/ParticipantCentric.scala
+++ b/prepare-index/src/main/scala/bio/ferlab/fhir/etl/centricTypes/ParticipantCentric.scala
@@ -18,6 +18,7 @@ class ParticipantCentric(studyIds: List[String])(implicit configuration: Configu
   val normalized_biospecimen: DatasetConf = conf.getDataset("normalized_biospecimen")
   val normalized_sample_registration: DatasetConf = conf.getDataset("normalized_sample_registration")
   val es_index_study_centric: DatasetConf = conf.getDataset("es_index_study_centric")
+  val ncit_terms: DatasetConf = conf.getDataset("ncit_terms")
 
   override def extract(lastRunDateTime: LocalDateTime = minDateTime,
                        currentRunDateTime: LocalDateTime = LocalDateTime.now())(implicit spark: SparkSession): Map[String, DataFrame] = {
@@ -35,9 +36,9 @@ class ParticipantCentric(studyIds: List[String])(implicit configuration: Configu
       .addStudy(data(es_index_study_centric.id))
       .addFilesWithBiospecimen (
         data(normalized_drs_document_reference.id),
-        data(normalized_biospecimen.id),
+        data(normalized_biospecimen.id).joinNcitTerms(data(ncit_terms.id), "biospecimen_tissue_source"),
         data(normalized_sequencing_experiment.id),
-        data(normalized_sample_registration.id),
+        data(normalized_sample_registration.id).joinNcitTerms(data(ncit_terms.id), "sample_type"),
       )
       .withColumn("biospecimens", array_distinct(flatten(col("files.biospecimens"))))
       .withColumn("participant_2_id", col("participant_id")) //copy column/ front-end requirements
diff --git a/prepare-index/src/main/scala/bio/ferlab/fhir/etl/common/Utils.scala b/prepare-index/src/main/scala/bio/ferlab/fhir/etl/common/Utils.scala
index ceb3813e..f24a1dd6 100644
--- a/prepare-index/src/main/scala/bio/ferlab/fhir/etl/common/Utils.scala
+++ b/prepare-index/src/main/scala/bio/ferlab/fhir/etl/common/Utils.scala
@@ -45,21 +45,11 @@ object Utils {
 
   def addSamplesToBiospecimen(samplesDf: DataFrame): DataFrame = {
     val samplesGrouped = samplesDf
.withColumn("sample_type", - when(col("sample_type")("display").isNotNull, - concat_ws(" ", col("sample_type")("display"), concat(lit("("), col("sample_type")("code"), lit(")")))) - .otherwise(col("sample_type")("code")) - ) .withColumnRenamed("fhir_id", "sample_id") .withColumn("sample_2_id", col("sample_id")) //doubling sample_id portal use .withColumnRenamed("parent", "fhir_id") df.join(samplesGrouped, Seq("fhir_id", "subject", "study_id"), "left_outer") - .withColumn("biospecimen_tissue_source", - when(col("biospecimen_tissue_source")("display").isNotNull, - concat_ws(" ", col("biospecimen_tissue_source")("display"), concat(lit("("), col("biospecimen_tissue_source")("code"), lit(")")))) - .otherwise(col("biospecimen_tissue_source")("code")) - ) } def addBiospecimen(biospecimenDf: DataFrame): DataFrame = { @@ -86,6 +76,17 @@ object Utils { .drop(biospecimenRenamedDf.columns.filterNot(_.equals("biospecimen_id")): _*) } + def joinNcitTerms(ncitTerms: DataFrame, targetCol: String): DataFrame = { + val columns = df.columns + + df.join(ncitTerms, col(s"$targetCol.code") === col("id"), "left_outer") + .withColumn(targetCol, + when(col(targetCol)("display").isNotNull, + concat_ws(" ", col(targetCol)("display"), concat(lit("("), col(targetCol)("code"), lit(")")))) + .otherwise(col(targetCol)("code")) + ).select(columns.map(col): _*) + } + def addDiagnosisPhenotypes(phenotypeDF: DataFrame, diagnosesDF: DataFrame)(hpoTerms: DataFrame, mondoTerms: DataFrame, icdTerms: DataFrame): DataFrame = { val (observedPhenotypes, observedPhenotypesWithAncestors, phenotypes) = getTaggedPhenotypes(phenotypeDF, hpoTerms) diff --git a/prepare-index/src/test/resources/ncit_terms/_SUCCESS b/prepare-index/src/test/resources/ncit_terms/_SUCCESS new file mode 100644 index 00000000..e69de29b diff --git a/prepare-index/src/test/resources/ncit_terms/part-00000-16c1c2fe-3cf8-4572-8653-31060146e8f8-c000.snappy.parquet b/prepare-index/src/test/resources/ncit_terms/part-00000-16c1c2fe-3cf8-4572-8653-31060146e8f8-c000.snappy.parquet new file mode 100644 index 00000000..410602bb Binary files /dev/null and b/prepare-index/src/test/resources/ncit_terms/part-00000-16c1c2fe-3cf8-4572-8653-31060146e8f8-c000.snappy.parquet differ diff --git a/prepare-index/src/test/scala/BiospecimenCentricSpec.scala b/prepare-index/src/test/scala/BiospecimenCentricSpec.scala index 576a5fd2..bc4949c3 100644 --- a/prepare-index/src/test/scala/BiospecimenCentricSpec.scala +++ b/prepare-index/src/test/scala/BiospecimenCentricSpec.scala @@ -1,4 +1,5 @@ import bio.ferlab.datalake.commons.config.{Configuration, ConfigurationLoader, DatasetConf, SimpleConfiguration} +import bio.ferlab.datalake.spark3.loader.GenericLoader.read import bio.ferlab.fhir.etl.centricTypes.BiospecimenCentric import model._ import org.apache.spark.sql.DataFrame @@ -71,6 +72,7 @@ class BiospecimenCentricSpec extends AnyFlatSpec with Matchers with WithSparkSes SAMPLE_INPUT(`subject` = "P1", `parent` = "B1", `fhir_id` = "sam1"), SAMPLE_INPUT(`subject` = "P3", `parent` = "B4", `fhir_id` = "sam2"), ).toDF(), + "ncit_terms" -> read(getClass.getResource("/ncit_terms").toString, "Parquet", Map(), None, None), ) val output = new BiospecimenCentric(List("STU0000001"))(conf).transform(data) diff --git a/prepare-index/src/test/scala/FileCentricSpec.scala b/prepare-index/src/test/scala/FileCentricSpec.scala index d34be196..cdac4f59 100644 --- a/prepare-index/src/test/scala/FileCentricSpec.scala +++ b/prepare-index/src/test/scala/FileCentricSpec.scala @@ -1,4 +1,5 @@ import 
 import bio.ferlab.datalake.commons.config.{Configuration, ConfigurationLoader, SimpleConfiguration}
+import bio.ferlab.datalake.spark3.loader.GenericLoader
 import bio.ferlab.fhir.etl.centricTypes.FileCentric
 import model._
 import org.apache.spark.sql.DataFrame
@@ -42,6 +43,7 @@ class FileCentricSpec extends AnyFlatSpec with Matchers with WithSparkSession {
       SAMPLE_INPUT(`subject` = "P1", `parent` = "B1", `fhir_id` = "sam1"),
       SAMPLE_INPUT(`subject` = "P1", `parent` = "B2", `fhir_id` = "sam2"),
     ).toDF(),
+    "ncit_terms" -> GenericLoader.read(getClass.getResource("/ncit_terms").toString, "Parquet", Map(), None, None),
   )
 
   val output = new FileCentric(List("STU0000001"))(conf).transform(data)
diff --git a/prepare-index/src/test/scala/ParticipantCentricSpec.scala b/prepare-index/src/test/scala/ParticipantCentricSpec.scala
index f365ccb6..eb353d37 100644
--- a/prepare-index/src/test/scala/ParticipantCentricSpec.scala
+++ b/prepare-index/src/test/scala/ParticipantCentricSpec.scala
@@ -1,7 +1,9 @@
 import bio.ferlab.datalake.commons.config.{Configuration, ConfigurationLoader, SimpleConfiguration}
+import bio.ferlab.datalake.spark3.loader.GenericLoader.read
 import bio.ferlab.fhir.etl.centricTypes.ParticipantCentric
 import model._
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.functions.col
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.flatspec.AnyFlatSpec
 import pureconfig.generic.auto._
@@ -35,6 +37,7 @@ class ParticipantCentricSpec extends AnyFlatSpec with Matchers with WithSparkSes
       SAMPLE_INPUT(fhir_id = "S2", `subject` = "P2", `parent` = "B2")
     ).toDF(),
     "es_index_study_centric" -> Seq(STUDY_CENTRIC()).toDF(),
+    "ncit_terms" -> read(getClass.getResource("/ncit_terms").toString, "Parquet", Map(), None, None),
   )
 
   val output = new ParticipantCentric(List("SD_Z6MWD3H0"))(conf).transform(data)
diff --git a/prepare-index/src/test/scala/SpecimenEnricherSpec.scala b/prepare-index/src/test/scala/SpecimenEnricherSpec.scala
index 311a494b..a5384f22 100644
--- a/prepare-index/src/test/scala/SpecimenEnricherSpec.scala
+++ b/prepare-index/src/test/scala/SpecimenEnricherSpec.scala
@@ -1,3 +1,4 @@
+import bio.ferlab.datalake.spark3.loader.GenericLoader.read
 import bio.ferlab.datalake.testutils.TestETLContext
 import bio.ferlab.fhir.etl.SpecimenEnricher
 import model._
@@ -31,7 +32,7 @@ class SpecimenEnricherSpec extends AnyFlatSpec with Matchers with WithSparkSessi
     "normalized_biospecimen" -> Seq(
       BIOSPECIMEN_INPUT(fhir_id = "FHIR_BS_1", subject = "P1", `submitter_biospecimen_id` = "BS_1"),
       BIOSPECIMEN_INPUT(fhir_id = "FHIR_BS_2", subject = "P2", `submitter_biospecimen_id` = "BS_2"),
-      BIOSPECIMEN_INPUT(fhir_id = "FHIR_BS_3", subject = "P3", `submitter_biospecimen_id` = "BS_3")
+      BIOSPECIMEN_INPUT(fhir_id = "FHIR_BS_3", subject = "P3", `submitter_biospecimen_id` = "BS_3", biospecimen_tissue_source = CODE_SYSTEM(`code` = "Unknown", `display` = null))
     ).toDF(),
     "normalized_research_study" -> Seq(
       RESEARCHSTUDY(`study_code` = "study_code1"),
@@ -43,13 +44,16 @@ class SpecimenEnricherSpec extends AnyFlatSpec with Matchers with WithSparkSessi
       DISEASE_STATUS(`fhir_id` = "F1", `subject` = "P1", `disease_status` = "yes"),
       DISEASE_STATUS(`fhir_id` = "F2", `subject` = "P2", `disease_status` = null),
       DISEASE_STATUS(`fhir_id` = "F3", `subject` = "P3", `disease_status` = "no"),
-    ).toDF()
+    ).toDF(),
+    "ncit_terms" -> read(getClass.getResource("/ncit_terms").toString, "Parquet", Map(), None, None),
   )
 
   val output = SpecimenEnricher(TestETLContext(), Seq("SD_Z6MWD3H0")).transform(data)
Seq("SD_Z6MWD3H0")).transform(data) val resultDF = output("enriched_specimen") + resultDF.show(false) + val specimensEnriched = resultDF.as[SPECIMEN_ENRICHED].collect() specimensEnriched.find(_.`biospecimen_id` == "FHIR_BS_1") shouldBe Some( @@ -68,7 +72,7 @@ class SpecimenEnricherSpec extends AnyFlatSpec with Matchers with WithSparkSessi )) specimensEnriched.find(_.`biospecimen_id` == "FHIR_BS_3") shouldBe Some( SPECIMEN_ENRICHED(`biospecimen_id` = "FHIR_BS_3", `age_biospecimen_collection` = "Young", `submitter_biospecimen_id` = "BS_3", - `participant_id` = "P3",`participant_fhir_id` = "P3", `submitter_participant_id` = "P3_internal", `is_affected` = Some(false), + `biospecimen_tissue_source`= "Unknown", `participant_id` = "P3",`participant_fhir_id` = "P3", `submitter_participant_id` = "P3_internal", `is_affected` = Some(false), `sample_id` = null, `sample_type` = null, `fhir_sample_id` = null) )