Skip to content

Commit

Permalink
Merge pull request #79 from Ferlab-Ste-Justine/fix/cqdg-527_fix_fam_count
Browse files Browse the repository at this point in the history

fix: CQDG-527 count family fix
  • Loading branch information
adipaul1981 authored Jan 3, 2024
2 parents f790cfd + ff0ec89 commit 40fbe5d
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import bio.ferlab.datalake.commons.config.{Configuration, DatasetConf}
import bio.ferlab.datalake.spark3.etl.v2.ETL
import bio.ferlab.datalake.spark3.implicits.DatasetConfImplicits._
import bio.ferlab.fhir.etl.common.Utils._
import org.apache.spark.sql.functions.{col, first, struct}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.time.LocalDateTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,18 @@ class StudyCentric(releaseId: String, studyIds: List[String])(implicit configura
.groupBy("study_id")
.agg(size(collect_list("sample_id")) as "sample_count")

val filesExplodedDF = data(normalized_drs_document_reference.id)
.withColumn("files_exp", explode(col("files")))
.drop("study_id")

val participantsWithFilesDF = data(normalized_patient.id)
.withColumnRenamed("fhir_id", "participant_id")
.join(filesExplodedDF, Seq("participant_id"), "left_outer")

val dataTypesCount = participantsWithFilesDF
.na.drop(Seq("data_type"))
.groupBy("study_id", "data_type")
.agg(size(collect_set(col("participant_id"))) as "participant_count")
.groupBy("study_id")
.agg(collect_list(struct(col("data_type"), col("participant_count"))) as "data_types")
val participantsWithFiles = data(normalized_patient.id)
.withColumnRenamed("fhir_id", "subject")
.addFiles(data(normalized_drs_document_reference.id), data(normalized_sequencing_experiment.id))

val dataCategoryCount = participantsWithFilesDF
.na.drop(Seq("data_category"))
.groupBy("study_id", "data_category")
.agg(size(collect_set(col("participant_id"))) as "participant_count")
.groupBy("study_id")
.agg(collect_list(struct(col("data_category"), col("participant_count"))) as "data_categories")
val dataTypesCount = participantsWithFiles
.fieldCount("data_type", "data_types")

val dataCategoryCount = participantsWithFiles
.fieldCount("data_category", "data_categories")

val participantCount =
data(normalized_patient.id)
.withColumnRenamed("fhir_id", "subject")
.addFiles(data(normalized_drs_document_reference.id), data(normalized_sequencing_experiment.id))
participantsWithFiles
.groupBy("study_id")
.agg(size(collect_set("subject")) as "participant_count")

Expand All @@ -105,6 +91,10 @@ class StudyCentric(releaseId: String, studyIds: List[String])(implicit configura
)

val familyCount = data(normalized_group.id)
.withColumn("subject", explode(col("family_members")))
.join(participantsWithFiles, Seq("study_id", "subject"), "inner")
.groupBy("study_id", "internal_family_id", "submitter_family_id")
.agg(collect_set(col("subject")) as "family_members")
.where(size(col("family_members")).gt(1))
.groupBy("study_id")
.agg(size(collect_set(col("internal_family_id"))) as "family_count")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ object Utils {
df
.join(diagnosis, col("fhir_id") === col("cqdg_participant_id"), "left_outer")
.join(mondoWithAncestors, Seq("cqdg_participant_id"), "left_outer")
.drop("cqdg_participant_id")
.join(phenotypes, col("fhir_id") === col("cqdg_participant_id"), "left_outer")
.join(observedPhenotypes, Seq("cqdg_participant_id"), "left_outer")
.join(nonObservedPhenotypes, Seq("cqdg_participant_id"), "left_outer")
.join(phenotypes, Seq("cqdg_participant_id"), "left_outer")
.join(observedPhenotypesWithAncestors, Seq("cqdg_participant_id"), "left_outer")
.withColumnRenamed("fhir_id", "participant_id")
.drop("cqdg_participant_id")
.withColumnRenamed("fhir_id", "participant_id")
}

def addCauseOfDeath(causeOfDeath: DataFrame): DataFrame = {
Expand Down Expand Up @@ -139,15 +140,12 @@ object Utils {
.drop("subject")
}

/** Attaches family-group membership to each row of `df`.
  *
  * Each entry of the group's `family_members` array is exploded into one
  * row, then left-joined to `df` on `family_member == submitter_participant_id`,
  * so participants without a group still survive the join.
  *
  * @param group normalized group DataFrame (one row per family, with a
  *              `family_members` array column)
  * @return rows restricted to the family-relationship columns listed in the
  *         final select
  */
def addGroup(group: DataFrame): DataFrame = {
  // One row per family member; study_id/release_id are dropped up front so
  // they cannot collide with the same-named columns already present on df.
  val membersExploded = group
    .drop("study_id", "release_id")
    .withColumn("family_member", explode(col("family_members")))

  val joined = df.join(
    membersExploded,
    col("family_member") === col("submitter_participant_id"),
    "left_outer"
  )

  joined.select(
    "internal_family_id",
    "submitter_family_id",
    "submitter_participant_id",
    "focus_participant_id",
    "relationship_to_proband",
    "family_type"
  )
}
/** Counts distinct participants per value of a file-level field, grouped by study.
  *
  * Explodes the `files` array of `df`, drops rows where the requested field
  * is null, counts distinct `subject` values per (study_id, field value),
  * then collects the per-value counts into one array column per study.
  *
  * @param field      name of the field inside each `files` element (e.g. "data_type")
  * @param countField name of the output array column (e.g. "data_types")
  * @return one row per study_id with `countField` holding (field, participant_count) structs
  */
def fieldCount(field: String, countField: String): DataFrame = {
  val explodedFieldPath = s"files_exp.$field"

  // One row per file entry; rows missing the field of interest are discarded.
  val nonNullFiles = df
    .withColumn("files_exp", explode(col("files")))
    .na.drop(Seq(explodedFieldPath))

  // Distinct-participant count for each (study, field value) pair.
  val countPerValue = nonNullFiles
    .groupBy("study_id", explodedFieldPath)
    .agg(size(collect_set(col("subject"))) as "participant_count")

  // Fold the per-value counts into a single array column per study.
  countPerValue
    .groupBy("study_id")
    .agg(collect_list(struct(col(field), col("participant_count"))) as countField)
}

def addFilesWithBiospecimen(filesDf: DataFrame, biospecimensDf: DataFrame, seqExperiment: DataFrame, sampleRegistrationDF: DataFrame): DataFrame = {
val biospecimenGrouped = biospecimensDf.addSamplesGroupedToBiospecimen(sampleRegistrationDF)
Expand Down
15 changes: 12 additions & 3 deletions prepare-index/src/test/scala/StudyCentricSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ class StudyCentricSpec extends AnyFlatSpec with Matchers with WithSparkSession {
val patient1: PATIENT_INPUT = PATIENT_INPUT(`fhir_id` = "PRT0000001")
val patient2: PATIENT_INPUT = PATIENT_INPUT( `fhir_id` = "PRT0000002", `age_at_recruitment` = "Young", `submitter_participant_id` = "35849419216")
val patient3: PATIENT_INPUT = PATIENT_INPUT( `fhir_id` = "PRT0000003", `age_at_recruitment` = "Young", `ethnicity` = "aboriginal" ,`submitter_participant_id` = "35849430470")
val patient4: PATIENT_INPUT = PATIENT_INPUT(`fhir_id` = "PRT0000004", `submitter_participant_id` = "Excluded1")
val patient5: PATIENT_INPUT = PATIENT_INPUT( `fhir_id` = "PRT0000005", `submitter_participant_id` = "Excluded2")
val patient6: PATIENT_INPUT = PATIENT_INPUT( `fhir_id` = "PRT0000006", `submitter_participant_id` = "Excluded3")

val family1: FAMILY_RELATIONSHIP_NEW = FAMILY_RELATIONSHIP_NEW()
val family2: FAMILY_RELATIONSHIP_NEW = FAMILY_RELATIONSHIP_NEW(`internal_family_relationship_id` = "FAM0000002FR", `submitter_participant_id` = "PRT0000002", `relationship_to_proband` = "Father")
val family3: FAMILY_RELATIONSHIP_NEW = FAMILY_RELATIONSHIP_NEW(`internal_family_relationship_id` = "FAM0000003FR", `submitter_participant_id` = "PRT0000003", `relationship_to_proband` = "Is the proband")

val family3: FAMILY_RELATIONSHIP_NEW = FAMILY_RELATIONSHIP_NEW(`internal_family_relationship_id` = "FAM0000003FR", `submitter_participant_id` = "PRT0000003", `relationship_to_proband` = "Proband")
val family4: FAMILY_RELATIONSHIP_NEW = FAMILY_RELATIONSHIP_NEW(`internal_family_relationship_id` = "Excluded1", `submitter_participant_id` = "PRT0000004", `relationship_to_proband` = "Mother")
val family5: FAMILY_RELATIONSHIP_NEW = FAMILY_RELATIONSHIP_NEW(`internal_family_relationship_id` = "Excluded2", `submitter_participant_id` = "PRT0000005", `relationship_to_proband` = "Father")
val family6: FAMILY_RELATIONSHIP_NEW = FAMILY_RELATIONSHIP_NEW(`internal_family_relationship_id` = "Excluded3", `submitter_participant_id` = "PRT0000006", `relationship_to_proband` = "Proband")

val document1: DOCUMENTREFERENCE = DOCUMENTREFERENCE(`fhir_id` = "1", `data_type` = "ALIR", `biospecimen_reference` = "BIO1" , `files` = Seq(FILE(`file_name` = "file1.cram", `file_format` = "CRAM")), `dataset` = Some("dataset1"))
val document7: DOCUMENTREFERENCE = DOCUMENTREFERENCE(`fhir_id` = "7", `data_type` = "ALIR", `biospecimen_reference` = "BIO1", `relates_to` = Some("1") , `files` = Seq(FILE(`file_name` = "file1.crai", `file_format` = "CRAI")), `dataset` = Some("dataset1"))
Expand All @@ -47,6 +52,9 @@ class StudyCentricSpec extends AnyFlatSpec with Matchers with WithSparkSession {
val sample3: SAMPLE_INPUT = SAMPLE_INPUT(`fhir_id` = "SAMPLE3", `parent` = "BIO1", `subject` = "PRT0000001")
val sample4: SAMPLE_INPUT = SAMPLE_INPUT(`fhir_id` = "SAMPLE4", `parent` = "BIO1", `subject` = "PRT0000001")

val group1: GROUP = GROUP(`internal_family_id` = "12345STU0000001", `family_members` = Seq("PRT0000001", "PRT0000002", "PRT0000003"), `submitter_family_id` = "12345", `study_id` = "STU0000001")
val group2: GROUP = GROUP(`internal_family_id` = "Group1Excluded", `family_members` = Seq("PRT0000004", "PRT0000005", "PRT0000006"), `submitter_family_id` = "Excluded1")

val studyDataset: Seq[DATASET_INPUT] = Seq(DATASET_INPUT(`name` = "dataset1", `description` = None), DATASET_INPUT(`name` = "dataset2", `description` = Some("desc")))


Expand All @@ -56,7 +64,7 @@ class StudyCentricSpec extends AnyFlatSpec with Matchers with WithSparkSession {
"normalized_patient" -> Seq(patient1, patient2, patient3).toDF(),
"normalized_document_reference" -> Seq(document1, document2, document3, document4, document5, document6, document7).toDF(),
"normalized_family_relationship" -> Seq(family1, family2, family3).toDF(),
"normalized_group" -> Seq(GROUP_NEW()).toDF(),
"normalized_group" -> Seq(group1, group2).toDF(),
"normalized_diagnosis" -> Seq(diagnosis1, diagnosis2, diagnosis3).toDF(),
"normalized_task" -> Seq(TASK()).toDF(),
"normalized_phenotype" -> Seq(phenotype1, phenotype2, phenotype3).toDF(),
Expand All @@ -76,6 +84,7 @@ class StudyCentricSpec extends AnyFlatSpec with Matchers with WithSparkSession {
`data_types` = Seq(("SSUP","1"), ("SNV","1"), ("GCNV","2"), ("ALIR","1"), ("GSV","1")),
`sample_count` = 4,
`file_count` = 6,
`family_count` = 1,
`data_categories` = Seq(("Genomics","2")),
`datasets` = Seq(
// Dataset file_count should be 5 as CRAI files should be excluded
Expand Down
12 changes: 0 additions & 12 deletions prepare-index/src/test/scala/model/PATIENT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,6 @@ case class FAMILY_RELATIONSHIP_NEW (
`focus_participant_id`: String = "PRT0000003",
`relationship_to_proband`: String = "Mother",
)


//TODO rename
case class GROUP_NEW(
`internal_family_id`: String = "12345STU0000001",
`study_id`: String = "STU0000001",
`release_id`: String = "5",
`family_type`: String = "Case-parent trio",
`family_members`: Seq[String] = Seq("PRT0000001", "PRT0000002", "PRT0000003"),
`submitter_family_id`: String = "12345",
)

case class PATIENT_OUPUT(
`submitter_participant_ids`: String = "PRT0000001",
`release_id`: String = "5",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ class RunEnrichGenomicSpec extends SparkSpec with WithTestConfig {
val updatedData = data.updated(normalized_snv.id, onlyWxsDf)

val result = variantsETL.transformSingle(updatedData)
result.show(false)

result.isEmpty shouldBe false

Expand Down

0 comments on commit 40fbe5d

Please sign in to comment.