Merge pull request #95 from Ferlab-Ste-Justine/feat/cqdg-842_ncit_from_obo

fix: CQDG-842 join NCIT terms from obo file
adipaul1981 authored Sep 18, 2024
2 parents fe36204 + 68ccce4 commit 2be640b
Showing 15 changed files with 105 additions and 19 deletions.
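In short: the commit registers a new normalized ncit_terms PARQUET dataset (in the three generated environment configs and in ConfigurationGenerator), joins those NCIT terms onto the coded biospecimen_tissue_source and sample_type columns in SpecimenEnricher, BiospecimenCentric, FileCentric and ParticipantCentric, moves the display-formatting logic previously inlined in addSamplesToBiospecimen into a reusable joinNcitTerms helper in Utils, and points the affected test suites at a Parquet ncit_terms fixture.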
21 changes: 21 additions & 0 deletions config/output/config/dev-cqdg.conf
@@ -1883,6 +1883,27 @@ datalake {
"valid_to_column"="valid_to"
}
},
{
format=PARQUET
id="ncit_terms"
keys=[]
loadtype=OverWrite
partitionby=[]
path="/ncit_terms"
readoptions {}
storageid=storage
table {
database=normalized
name="ncit_terms"
}
writeoptions {
"created_on_column"="created_on"
"is_current_column"="is_current"
"updated_on_column"="updated_on"
"valid_from_column"="valid_from"
"valid_to_column"="valid_to"
}
},
{
format=JSON
id="icd_terms"
21 changes: 21 additions & 0 deletions config/output/config/prod-cqdg.conf
@@ -1883,6 +1883,27 @@ datalake {
"valid_to_column"="valid_to"
}
},
{
format=PARQUET
id="ncit_terms"
keys=[]
loadtype=OverWrite
partitionby=[]
path="/ncit_terms"
readoptions {}
storageid=storage
table {
database="cqdg_portal_prod"
name="ncit_terms"
}
writeoptions {
"created_on_column"="created_on"
"is_current_column"="is_current"
"updated_on_column"="updated_on"
"valid_from_column"="valid_from"
"valid_to_column"="valid_to"
}
},
{
format=JSON
id="icd_terms"
21 changes: 21 additions & 0 deletions config/output/config/qa-cqdg.conf
@@ -1883,6 +1883,27 @@ datalake {
"valid_to_column"="valid_to"
}
},
{
format=PARQUET
id="ncit_terms"
keys=[]
loadtype=OverWrite
partitionby=[]
path="/ncit_terms"
readoptions {}
storageid=storage
table {
database="cqdg_portal_qa"
name="ncit_terms"
}
writeoptions {
"created_on_column"="created_on"
"is_current_column"="is_current"
"updated_on_column"="updated_on"
"valid_from_column"="valid_from"
"valid_to_column"="valid_to"
}
},
{
format=JSON
id="icd_terms"
@@ -68,6 +68,7 @@ object ConfigurationGenerator extends App {
rawsAndNormalized ++ Seq(
DatasetConf(id = "hpo_terms", storageid = storage, path = s"/hpo_terms", table = Some(TableConf("database", "hpo_terms")), format = PARQUET, loadtype = OverWrite),
DatasetConf(id = "mondo_terms", storageid = storage, path = s"/mondo_terms", table = Some(TableConf("database", "mondo_terms")), format = PARQUET, loadtype = OverWrite),
DatasetConf(id = "ncit_terms", storageid = storage, path = s"/ncit_terms", table = Some(TableConf("database", "ncit_terms")), format = PARQUET, loadtype = OverWrite),
DatasetConf(id = "icd_terms", storageid = storage, path = s"/icd_terms", table = Some(TableConf("database", "icd_terms")), format = JSON, loadtype = OverWrite)
) ++ Seq(
DatasetConf(id = "simple_participant", storageid = storage, path = s"/es_index/fhir/simple_participant", format = PARQUET, loadtype = OverWrite, partitionby = partitionByStudyIdAndReleaseId)
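Note the TableConf("database", "ncit_terms") placeholder above: comparing the three generated configs earlier in the diff, "database" resolves to a different value per environment. A hypothetical sketch of that mapping (the substitution logic itself is not part of this diff):

    // Assumed mapping, inferred from the generated dev/prod/qa outputs above;
    // the actual resolution happens elsewhere in ConfigurationGenerator.
    val tableDatabase: Map[String, String] = Map(
      "dev-cqdg"  -> "normalized",
      "prod-cqdg" -> "cqdg_portal_prod",
      "qa-cqdg"   -> "cqdg_portal_qa"
    )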
@@ -25,6 +25,7 @@ case class SpecimenEnricher(rc: RuntimeETLContext, studyIds: Seq[String]) extend
private val study: DatasetConf = conf.getDataset("normalized_research_study")
private val disease: DatasetConf = conf.getDataset("normalized_diagnosis")
private val disease_status: DatasetConf = conf.getDataset("normalized_disease_status")
private val ncit_terms: DatasetConf = conf.getDataset("ncit_terms")

override def extract(lastRunDateTime: LocalDateTime, currentRunDateTime: LocalDateTime): Map[String, DataFrame] = {
Seq(patient, specimen, group, family_relationship, sample_registration, study, disease, disease_status)
@@ -64,8 +65,9 @@ case class SpecimenEnricher(rc: RuntimeETLContext, studyIds: Seq[String]) extend
"submitter_participant_id", "family_id", "father_id", "mother_id")

data(specimen.id)
.joinNcitTerms(data(ncit_terms.id), "biospecimen_tissue_source")
.join(patientDf, Seq("subject"))
.addSamplesToBiospecimen(data(sample_registration.id))
.addSamplesToBiospecimen(data(sample_registration.id).joinNcitTerms(data(ncit_terms.id), "sample_type"))
.withColumnRenamed("fhir_id", "biospecimen_id")
.withColumnRenamed("sample_id", "fhir_sample_id")
.withColumnRenamed("submitter_sample_id", "sample_id")
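Both coded columns are resolved here: the specimen rows via biospecimen_tissue_source directly, and the sample rows via sample_type before addSamplesToBiospecimen nests them under each biospecimen. The same two-column pattern repeats in the centric classes below.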
@@ -19,6 +19,7 @@ class BiospecimenCentric(studyIds: List[String])(implicit configuration: Configu
val simple_participant: DatasetConf = conf.getDataset("simple_participant")
val es_index_study_centric: DatasetConf = conf.getDataset("es_index_study_centric")
val es_index_file_centric: DatasetConf = conf.getDataset("es_index_file_centric")
val ncit_terms: DatasetConf = conf.getDataset("ncit_terms")

override def extract(lastRunDateTime: LocalDateTime = minDateTime,
currentRunDateTime: LocalDateTime = LocalDateTime.now())(implicit spark: SparkSession): Map[String, DataFrame] = {
@@ -34,10 +35,11 @@

val transformedBiospecimen =
biospecimenDF
.joinNcitTerms(data(ncit_terms.id), "biospecimen_tissue_source")
.addStudy(data(es_index_study_centric.id))
.addParticipant(data(simple_participant.id))
.addFiles(data(normalized_drs_document_reference.id), data(normalized_sequencing_experiment.id))
.addSamplesToBiospecimen(data(normalized_sample_registration.id))
.addSamplesToBiospecimen(data(normalized_sample_registration.id).joinNcitTerms(data(ncit_terms.id), "sample_type"))
.withColumnRenamed("fhir_id", "biospecimen_id")
.drop("subject")
.withColumn("study_code", col("study.study_code"))
@@ -18,6 +18,7 @@ class FileCentric(studyIds: List[String])(implicit configuration: Configuration)
val normalized_sample_registration: DatasetConf = conf.getDataset("normalized_sample_registration")
val simple_participant: DatasetConf = conf.getDataset("simple_participant")
val es_index_study_centric: DatasetConf = conf.getDataset("es_index_study_centric")
val ncit_terms: DatasetConf = conf.getDataset("ncit_terms")

override def extract(lastRunDateTime: LocalDateTime = minDateTime,
currentRunDateTime: LocalDateTime = LocalDateTime.now())(implicit spark: SparkSession): Map[String, DataFrame] = {
@@ -32,7 +33,11 @@

val transformedFile =
fileDF
.addParticipantWithBiospecimen(data(simple_participant.id), data(normalized_biospecimen.id), data(normalized_sample_registration.id))
.addParticipantWithBiospecimen(
data(simple_participant.id),
data(normalized_biospecimen.id).joinNcitTerms(data(ncit_terms.id), "biospecimen_tissue_source"),
data(normalized_sample_registration.id).joinNcitTerms(data(ncit_terms.id), "sample_type")
)
.addStudy(data(es_index_study_centric.id))
.addSequencingExperiment(data(normalized_sequencing_experiment.id))
.addAssociatedDocumentRef()
@@ -18,6 +18,7 @@ class ParticipantCentric(studyIds: List[String])(implicit configuration: Configu
val normalized_biospecimen: DatasetConf = conf.getDataset("normalized_biospecimen")
val normalized_sample_registration: DatasetConf = conf.getDataset("normalized_sample_registration")
val es_index_study_centric: DatasetConf = conf.getDataset("es_index_study_centric")
val ncit_terms: DatasetConf = conf.getDataset("ncit_terms")

override def extract(lastRunDateTime: LocalDateTime = minDateTime,
currentRunDateTime: LocalDateTime = LocalDateTime.now())(implicit spark: SparkSession): Map[String, DataFrame] = {
@@ -35,9 +36,9 @@
.addStudy(data(es_index_study_centric.id))
.addFilesWithBiospecimen (
data(normalized_drs_document_reference.id),
data(normalized_biospecimen.id),
data(normalized_biospecimen.id).joinNcitTerms(data(ncit_terms.id), "biospecimen_tissue_source"),
data(normalized_sequencing_experiment.id),
data(normalized_sample_registration.id),
data(normalized_sample_registration.id).joinNcitTerms(data(ncit_terms.id), "sample_type"),
)
.withColumn("biospecimens", array_distinct(flatten(col("files.biospecimens"))))
.withColumn("participant_2_id", col("participant_id")) //copy column/ front-end requirements
21 changes: 11 additions & 10 deletions prepare-index/src/main/scala/bio/ferlab/fhir/etl/common/Utils.scala
@@ -45,21 +45,11 @@ object Utils {

def addSamplesToBiospecimen(samplesDf: DataFrame): DataFrame = {
val samplesGrouped = samplesDf
.withColumn("sample_type",
when(col("sample_type")("display").isNotNull,
concat_ws(" ", col("sample_type")("display"), concat(lit("("), col("sample_type")("code"), lit(")"))))
.otherwise(col("sample_type")("code"))
)
.withColumnRenamed("fhir_id", "sample_id")
.withColumn("sample_2_id", col("sample_id")) //doubling sample_id portal use
.withColumnRenamed("parent", "fhir_id")

df.join(samplesGrouped, Seq("fhir_id", "subject", "study_id"), "left_outer")
.withColumn("biospecimen_tissue_source",
when(col("biospecimen_tissue_source")("display").isNotNull,
concat_ws(" ", col("biospecimen_tissue_source")("display"), concat(lit("("), col("biospecimen_tissue_source")("code"), lit(")"))))
.otherwise(col("biospecimen_tissue_source")("code"))
)
}

def addBiospecimen(biospecimenDf: DataFrame): DataFrame = {
@@ -86,6 +76,17 @@
.drop(biospecimenRenamedDf.columns.filterNot(_.equals("biospecimen_id")): _*)
}

def joinNcitTerms(ncitTerms: DataFrame, targetCol: String): DataFrame = {
val columns = df.columns

df.join(ncitTerms, col(s"$targetCol.code") === col("id"), "left_outer")
.withColumn(targetCol,
when(col(targetCol)("display").isNotNull,
concat_ws(" ", col(targetCol)("display"), concat(lit("("), col(targetCol)("code"), lit(")"))))
.otherwise(col(targetCol)("code"))
).select(columns.map(col): _*)
}

def addDiagnosisPhenotypes(phenotypeDF: DataFrame, diagnosesDF: DataFrame)(hpoTerms: DataFrame, mondoTerms: DataFrame, icdTerms: DataFrame): DataFrame = {
val (observedPhenotypes, observedPhenotypesWithAncestors, phenotypes) = getTaggedPhenotypes(phenotypeDF, hpoTerms)

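To make the new helper concrete, here is a minimal, self-contained sketch of what joinNcitTerms does to a coded struct column. The ncit_terms schema beyond its id column is assumed (the name field below is hypothetical), as are the fixture rows; the formatting mirrors the diff, producing "Display (CODE)" when display is present and the bare code otherwise.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object JoinNcitTermsSketch extends App {
      val spark = SparkSession.builder().master("local[*]").appName("ncit-terms-sketch").getOrCreate()
      import spark.implicits._

      // Hypothetical NCIT lookup table: only the id column is attested in the diff.
      val ncitTerms = Seq(("NCIT:C12434", "Blood")).toDF("id", "name")

      // Specimens carry a (code, display) struct, as in the CODE_SYSTEM test fixtures.
      val specimens = Seq(
        ("B1", "NCIT:C12434", "Blood"),
        ("B2", "Unknown", null)
      ).toDF("fhir_id", "code", "display")
        .withColumn("biospecimen_tissue_source", struct(col("code"), col("display")))
        .drop("code", "display")

      val columns = specimens.columns

      // Same shape as joinNcitTerms: left join on the struct's code, then flatten
      // the struct to "Display (CODE)", falling back to the bare code.
      val formatted = specimens
        .join(ncitTerms, col("biospecimen_tissue_source.code") === col("id"), "left_outer")
        .withColumn("biospecimen_tissue_source",
          when(col("biospecimen_tissue_source")("display").isNotNull,
            concat_ws(" ", col("biospecimen_tissue_source")("display"),
              concat(lit("("), col("biospecimen_tissue_source")("code"), lit(")"))))
            .otherwise(col("biospecimen_tissue_source")("code")))
        .select(columns.map(col): _*)

      formatted.show(false)
      // B1 -> "Blood (NCIT:C12434)"; B2 -> "Unknown" (null display falls back to the code)
    }

The trailing select(columns.map(col): _*) drops the joined lookup columns so the frame keeps its original shape; the SpecimenEnricherSpec expectation for FHIR_BS_3 below (display = null, expected value "Unknown") exercises exactly this fallback branch.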
Empty file.
Binary file not shown.
2 changes: 2 additions & 0 deletions prepare-index/src/test/scala/BiospecimenCentricSpec.scala
@@ -1,4 +1,5 @@
import bio.ferlab.datalake.commons.config.{Configuration, ConfigurationLoader, DatasetConf, SimpleConfiguration}
import bio.ferlab.datalake.spark3.loader.GenericLoader.read
import bio.ferlab.fhir.etl.centricTypes.BiospecimenCentric
import model._
import org.apache.spark.sql.DataFrame
@@ -71,6 +72,7 @@ class BiospecimenCentricSpec extends AnyFlatSpec with Matchers with WithSparkSes
SAMPLE_INPUT(`subject` = "P1", `parent` = "B1", `fhir_id` = "sam1"),
SAMPLE_INPUT(`subject` = "P3", `parent` = "B4", `fhir_id` = "sam2"),
).toDF(),
"ncit_terms" -> read(getClass.getResource("/ncit_terms").toString, "Parquet", Map(), None, None),
)

val output = new BiospecimenCentric(List("STU0000001"))(conf).transform(data)
2 changes: 2 additions & 0 deletions prepare-index/src/test/scala/FileCentricSpec.scala
@@ -1,4 +1,5 @@
import bio.ferlab.datalake.commons.config.{Configuration, ConfigurationLoader, SimpleConfiguration}
import bio.ferlab.datalake.spark3.loader.GenericLoader
import bio.ferlab.fhir.etl.centricTypes.FileCentric
import model._
import org.apache.spark.sql.DataFrame
@@ -42,6 +43,7 @@ class FileCentricSpec extends AnyFlatSpec with Matchers with WithSparkSession {
SAMPLE_INPUT(`subject` = "P1", `parent` = "B1", `fhir_id` = "sam1"),
SAMPLE_INPUT(`subject` = "P1", `parent` = "B2", `fhir_id` = "sam2"),
).toDF(),
"ncit_terms" -> GenericLoader.read(getClass.getResource("/ncit_terms").toString, "Parquet", Map(), None, None),
)

val output = new FileCentric(List("STU0000001"))(conf).transform(data)
5 changes: 4 additions & 1 deletion prepare-index/src/test/scala/ParticipantCentricSpec.scala
@@ -1,7 +1,9 @@
import bio.ferlab.datalake.commons.config.{Configuration, ConfigurationLoader, SimpleConfiguration}
import bio.ferlab.datalake.spark3.loader.GenericLoader.read
import bio.ferlab.fhir.etl.centricTypes.ParticipantCentric
import model._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.col
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec
import pureconfig.generic.auto._
@@ -35,6 +37,7 @@ class ParticipantCentricSpec extends AnyFlatSpec with Matchers with WithSparkSes
SAMPLE_INPUT(fhir_id = "S2", `subject` = "P2", `parent` = "B2")
).toDF(),
"es_index_study_centric" -> Seq(STUDY_CENTRIC()).toDF(),
"ncit_terms" -> read(getClass.getResource("/ncit_terms").toString, "Parquet", Map(), None, None),
)

val output = new ParticipantCentric(List("SD_Z6MWD3H0"))(conf).transform(data)
10 changes: 7 additions & 3 deletions prepare-index/src/test/scala/SpecimenEnricherSpec.scala
@@ -1,3 +1,4 @@
import bio.ferlab.datalake.spark3.loader.GenericLoader.read
import bio.ferlab.datalake.testutils.TestETLContext
import bio.ferlab.fhir.etl.SpecimenEnricher
import model._
@@ -31,7 +32,7 @@ class SpecimenEnricherSpec extends AnyFlatSpec with Matchers with WithSparkSessi
"normalized_biospecimen" -> Seq(
BIOSPECIMEN_INPUT(fhir_id = "FHIR_BS_1", subject = "P1", `submitter_biospecimen_id` = "BS_1"),
BIOSPECIMEN_INPUT(fhir_id = "FHIR_BS_2", subject = "P2", `submitter_biospecimen_id` = "BS_2"),
BIOSPECIMEN_INPUT(fhir_id = "FHIR_BS_3", subject = "P3", `submitter_biospecimen_id` = "BS_3")
BIOSPECIMEN_INPUT(fhir_id = "FHIR_BS_3", subject = "P3", `submitter_biospecimen_id` = "BS_3", biospecimen_tissue_source = CODE_SYSTEM(`code` = "Unknown", `display` = null))
).toDF(),
"normalized_research_study" -> Seq(
RESEARCHSTUDY(`study_code` = "study_code1"),
@@ -43,13 +44,16 @@
DISEASE_STATUS(`fhir_id` = "F1", `subject` = "P1", `disease_status` = "yes"),
DISEASE_STATUS(`fhir_id` = "F2", `subject` = "P2", `disease_status` = null),
DISEASE_STATUS(`fhir_id` = "F3", `subject` = "P3", `disease_status` = "no"),
).toDF()
).toDF(),
"ncit_terms" -> read(getClass.getResource("/ncit_terms").toString, "Parquet", Map(), None, None),
)

val output = SpecimenEnricher(TestETLContext(), Seq("SD_Z6MWD3H0")).transform(data)

val resultDF = output("enriched_specimen")

resultDF.show(false)

val specimensEnriched = resultDF.as[SPECIMEN_ENRICHED].collect()

specimensEnriched.find(_.`biospecimen_id` == "FHIR_BS_1") shouldBe Some(
@@ -68,7 +72,7 @@ class SpecimenEnricherSpec extends AnyFlatSpec with Matchers with WithSparkSessi
))
specimensEnriched.find(_.`biospecimen_id` == "FHIR_BS_3") shouldBe Some(
SPECIMEN_ENRICHED(`biospecimen_id` = "FHIR_BS_3", `age_biospecimen_collection` = "Young", `submitter_biospecimen_id` = "BS_3",
`participant_id` = "P3",`participant_fhir_id` = "P3", `submitter_participant_id` = "P3_internal", `is_affected` = Some(false),
`biospecimen_tissue_source`= "Unknown", `participant_id` = "P3",`participant_fhir_id` = "P3", `submitter_participant_id` = "P3_internal", `is_affected` = Some(false),
`sample_id` = null, `sample_type` = null, `fhir_sample_id` = null)
)
