fix: CQDG-476 Partition by batch
jecos committed Nov 9, 2023
1 parent c163146 commit 455aa94
Showing 11 changed files with 41 additions and 31 deletions.
5 changes: 3 additions & 2 deletions config/output/config/dev-cqdg.conf
@@ -2056,11 +2056,11 @@ datalake {
alternate,
"ensembl_transcript_id"
]
- loadtype=OverWrite
+ loadtype=Read
partitionby=[
chromosome
]
path="/{{OWNER}}/{{STUDY_CODE}}/{{DATASET}}/variants.*.vep.vcf.gz"
path="/{{OWNER}}/{{STUDY_CODE}}/{{DATASET}}/{{BATCH}}/variants.*.vep.vcf.gz"
readoptions {}
repartition {
column-names=[
@@ -2091,6 +2091,7 @@ datalake {
partitionby=[
"study_id",
dataset,
+ batch,
"has_alt",
chromosome
]
5 changes: 3 additions & 2 deletions config/output/config/prod-cqdg.conf
@@ -2056,11 +2056,11 @@ datalake {
alternate,
"ensembl_transcript_id"
]
- loadtype=OverWrite
+ loadtype=Read
partitionby=[
chromosome
]
path="/{{OWNER}}/{{STUDY_CODE}}/{{DATASET}}/variants.*.vep.vcf.gz"
path="/{{OWNER}}/{{STUDY_CODE}}/{{DATASET}}/{{BATCH}}/variants.*.vep.vcf.gz"
readoptions {}
repartition {
column-names=[
@@ -2091,6 +2091,7 @@ datalake {
partitionby=[
"study_id",
dataset,
+ batch,
"has_alt",
chromosome
]
5 changes: 3 additions & 2 deletions config/output/config/qa-cqdg.conf
@@ -2056,11 +2056,11 @@ datalake {
alternate,
"ensembl_transcript_id"
]
- loadtype=OverWrite
+ loadtype=Read
partitionby=[
chromosome
]
path="/{{OWNER}}/{{STUDY_CODE}}/{{DATASET}}/variants.*.vep.vcf.gz"
path="/{{OWNER}}/{{STUDY_CODE}}/{{DATASET}}/{{BATCH}}/variants.*.vep.vcf.gz"
readoptions {}
repartition {
column-names=[
@@ -2091,6 +2091,7 @@ datalake {
partitionby=[
"study_id",
dataset,
+ batch,
"has_alt",
chromosome
]
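The same two edits land in all three environment configs (dev, qa, prod). As a minimal sketch of what the new {{BATCH}} placeholder does, here is how the raw_vcf path template resolves via the same .replace chain the ETL classes below use; the concrete values are borrowed from this commit's test fixtures, not from production:

// Sketch only: resolving the raw_vcf path template now that {{BATCH}} is part of it.
// Values come from SNVSpec/ConsequencesSpec in this commit.
val template = "/{{OWNER}}/{{STUDY_CODE}}/{{DATASET}}/{{BATCH}}/variants.*.vep.vcf.gz"
val resolved = template
  .replace("{{OWNER}}", "owner")
  .replace("{{STUDY_CODE}}", "STU0000001")
  .replace("{{DATASET}}", "dataset_default")
  .replace("{{BATCH}}", "annotated_vcf")
// resolved == "/owner/STU0000001/dataset_default/annotated_vcf/variants.*.vep.vcf.gz"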
@@ -1,7 +1,7 @@
package bio.ferlab.fhir.etl.config

import bio.ferlab.datalake.commons.config.Format.{AVRO, DELTA, JSON, PARQUET, VCF}
- import bio.ferlab.datalake.commons.config.LoadType.{OverWrite, OverWritePartition, Scd1}
+ import bio.ferlab.datalake.commons.config.LoadType.{OverWrite, OverWritePartition, Read, Scd1}
import bio.ferlab.datalake.commons.config._
import bio.ferlab.datalake.commons.file.FileSystemType.S3
import bio.ferlab.datalake.spark3.genomics.GenomicDatasets
@@ -92,9 +92,9 @@ object ConfigurationGenerator extends App {
DatasetConf(
id = "raw_vcf",
storageid = storage_vcf,
path = "/{{OWNER}}/{{STUDY_CODE}}/{{DATASET}}/variants.*.vep.vcf.gz",
path = "/{{OWNER}}/{{STUDY_CODE}}/{{DATASET}}/{{BATCH}}/variants.*.vep.vcf.gz",
format = VCF,
- loadtype = OverWrite,
+ loadtype = Read,
partitionby = List("chromosome"),
table = Some(TableConf("database", "raw_vcf")),
keys = List("chromosome", "start", "reference", "alternate", "ensembl_transcript_id"),
@@ -107,7 +107,7 @@ object ConfigurationGenerator extends App {
format = DELTA,
loadtype = OverWritePartition,
table = Some(TableConf("database", "normalized_snv")),
- partitionby = List("study_id", "dataset", "has_alt", "chromosome"),
+ partitionby = List("study_id", "dataset", "batch", "has_alt", "chromosome"),
writeoptions = WriteOptions.DEFAULT_OPTIONS ++ Map("overwriteSchema" -> "true"),
repartition = Some(RepartitionByRange(Seq("chromosome", "start"), Some(100)))
),
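With batch added to partitionby, the normalized_snv table gains one more partition directory level. A hedged sketch of the Spark write this DatasetConf implies; the datalake library's OverWritePartition load type performs the real write, so the function name and path below are illustrative assumptions:

import org.apache.spark.sql.DataFrame

// Approximate Spark equivalent of the normalized_snv configuration above.
def writeNormalizedSnv(df: DataFrame, path: String): Unit =
  df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true") // mirrors writeoptions above
    .partitionBy("study_id", "dataset", "batch", "has_alt", "chromosome")
    .save(path)

// Resulting layout (hypothetical values):
//   <path>/study_id=STU0000001/dataset=dataset_default/batch=annotated_vcf/has_alt=true/chromosome=1/part-*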
@@ -9,7 +9,7 @@ import org.apache.spark.sql.functions.{array_contains, col, lit}

import java.time.LocalDateTime

- case class Consequences(rc: RuntimeETLContext, studyId: String, studyCode: String, owner: String, dataset: String, referenceGenomePath: Option[String]) extends BaseConsequences(rc: RuntimeETLContext, annotationsColumn = csq, groupByLocus = true) {
+ case class Consequences(rc: RuntimeETLContext, studyId: String, studyCode: String, owner: String, dataset: String, batch: String, referenceGenomePath: Option[String]) extends BaseConsequences(rc: RuntimeETLContext, annotationsColumn = csq, groupByLocus = true) {
private val raw_variant_calling: DatasetConf = conf.getDataset("raw_vcf")
override val mainDestination: DatasetConf = conf.getDataset("normalized_consequences")

@@ -19,6 +19,7 @@ case class Consequences(rc: RuntimeETLContext, studyId: String, studyCode: Strin
raw_vcf -> vcf(raw_variant_calling.location
.replace("{{STUDY_CODE}}", s"$studyCode")
.replace("{{DATASET}}", s"$dataset")
.replace("{{BATCH}}", s"$batch")
.replace("{{OWNER}}", s"$owner"), referenceGenomePath = None)
.where(col("contigName").isin(validContigNames: _*))
)
@@ -10,8 +10,9 @@ object RunNormalizedGenomic {
@arg(name = "study-code", short = 'c', doc = "Study Code") studyCode: String,
@arg(name = "owner", short = 'o', doc = "Owner") owner: String,
@arg(name = "dataset", short = 'd', doc = "Dataset") dataset: String,
@arg(name = "batch", short = 'b', doc = "Batch") batch: String,
@arg(name = "release-id", short = 'r', doc = "Release Id") releaseId: String,
@arg(name = "reference-genome-path", short = 'g', doc = "Reference Genome Path") referenceGenomePath: Option[String]): Unit = SNV(rc, studyId, studyCode,owner, dataset, releaseId, referenceGenomePath).run()
@arg(name = "reference-genome-path", short = 'g', doc = "Reference Genome Path") referenceGenomePath: Option[String]): Unit = SNV(rc, studyId, studyCode, owner, dataset, batch, releaseId, referenceGenomePath).run()


@main
@@ -20,7 +21,8 @@
@arg(name = "study-code", short = 'c', doc = "Study Code") studyCode: String,
@arg(name = "owner", short = 'o', doc = "Owner") owner: String,
@arg(name = "dataset", short = 'd', doc = "Dataset") dataset: String,
@arg(name = "reference-genome-path", short = 'g', doc = "Reference Genome Path") referenceGenomePath: Option[String]): Unit = Consequences(rc, studyId, studyCode, owner, dataset, referenceGenomePath).run()
@arg(name = "batch", short = 'b', doc = "Batch") batch: String,
@arg(name = "reference-genome-path", short = 'g', doc = "Reference Genome Path") referenceGenomePath: Option[String]): Unit = Consequences(rc, studyId, studyCode, owner, dataset, batch, referenceGenomePath).run()

def main(args: Array[String]): Unit = ParserForMethods(this).runOrThrow(args, allowPositional = true)
}
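Both entry points now require the batch argument (short flag -b). A self-contained sketch of the mainargs wiring; the @main method names are not visible in this diff, so snv below is a hypothetical name, and the argument list is reduced to two parameters:

import mainargs.{arg, main, ParserForMethods}

object BatchArgDemo {
  // Hypothetical mirror of the entry point, reduced to two arguments.
  @main
  def snv(@arg(name = "study-id", short = 's', doc = "Study Id") studyId: String,
          @arg(name = "batch", short = 'b', doc = "Batch") batch: String): Unit =
    println(s"running SNV for study $studyId, batch $batch")

  def main(args: Array[String]): Unit = ParserForMethods(this).runOrThrow(args, allowPositional = true)
}

// e.g. BatchArgDemo.main(Array("snv", "--study-id", "STU0000001", "--batch", "annotated_vcf"))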
22 changes: 12 additions & 10 deletions variant-task/src/main/scala/bio/ferlab/etl/normalized/SNV.scala
@@ -12,7 +12,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}

import java.time.LocalDateTime

- case class SNV(rc: RuntimeETLContext, studyId: String, studyCode: String, owner: String, dataset: String, releaseId: String, referenceGenomePath: Option[String]) extends SimpleSingleETL(rc) {
+ case class SNV(rc: RuntimeETLContext, studyId: String, studyCode: String, owner: String, dataset: String, batch: String, releaseId: String, referenceGenomePath: Option[String]) extends SimpleSingleETL(rc) {
private val enriched_specimen: DatasetConf = conf.getDataset("enriched_specimen")
private val raw_variant_calling: DatasetConf = conf.getDataset("raw_vcf")
private val normalized_task: DatasetConf = conf.getDataset("normalized_task")
@@ -25,6 +25,7 @@ case class SNV(rc: RuntimeETLContext, studyId: String, studyCode: String, owner:
"raw_vcf" -> vcf(raw_variant_calling.location
.replace("{{STUDY_CODE}}", s"$studyCode")
.replace("{{DATASET}}", s"$dataset")
.replace("{{BATCH}}", s"$batch")
.replace("{{OWNER}}", s"$owner"), referenceGenomePath = None)
.where(col("contigName").isin(validContigNames: _*)),
enriched_specimen.id -> enriched_specimen.read.where(col("study_id") === studyId),
@@ -39,23 +39,23 @@ case class SNV(rc: RuntimeETLContext, studyId: String, studyCode: String, owner:
val enrichedSpecimenDF = data(enriched_specimen.id).select("sample_id", "is_affected", "participant_id", "family_id", "gender", "mother_id", "father_id", "study_id", "study_code")
.withColumnRenamed("is_affected", "affected_status")

- val occurrences = selectOccurrences(vcf, releaseId, dataset)
+ val occurrences = selectOccurrences(vcf, releaseId, dataset, batch)

occurrences.join(broadcast(enrichedSpecimenDF), Seq("sample_id"))
.withAlleleDepths()
// Parental origin + transmission computation were removed for performance

- // .withRelativesGenotype(Seq("gq", "dp", "info_qd", "filter", "ad_ref", "ad_alt", "ad_total", "ad_ratio", "calls", "affected_status", "zygosity"),
- //   participantIdColumn = col("participant_id"),
- //   familyIdColumn = col("family_id")
- // )
- // .withParentalOrigin("parental_origin", col("calls"), col("father_calls"), col("mother_calls"))
- // .withGenotypeTransmission(TRANSMISSION_MODE, `gender_name` = "gender")
+ // .withRelativesGenotype(Seq("gq", "dp", "info_qd", "filter", "ad_ref", "ad_alt", "ad_total", "ad_ratio", "calls", "affected_status", "zygosity"),
+ //   participantIdColumn = col("participant_id"),
+ //   familyIdColumn = col("family_id")
+ // )
+ // .withParentalOrigin("parental_origin", col("calls"), col("father_calls"), col("mother_calls"))
+ // .withGenotypeTransmission(TRANSMISSION_MODE, `gender_name` = "gender")
.withSource(data(normalized_task.id))
// .withCompoundHeterozygous(patientIdColumnName = "participant.participant_id") //TODO
}

- override def replaceWhere: Option[String] = Some(s"study_id = '$studyId' and dataset='$dataset'")
+ override def replaceWhere: Option[String] = Some(s"study_id = '$studyId' and dataset='$dataset' and batch='$batch' ")

}

@@ -71,7 +71,7 @@ object SNV {
}
}

- private def selectOccurrences(inputDF: DataFrame, releaseId: String, dataset: String): DataFrame = {
+ private def selectOccurrences(inputDF: DataFrame, releaseId: String, dataset: String, batch:String): DataFrame = {
val occurrences = inputDF
.select(
chromosome,
@@ -109,6 +110,7 @@
// col("file_name"),
lit(releaseId) as "release_id",
lit(dataset) as "dataset",
+ lit(batch) as "batch",
is_normalized
)
.drop("annotation")
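The widened replaceWhere predicate is the point of the change: an overwrite now replaces only the rows of the study, dataset, and batch being loaded, leaving other batches untouched. A sketch of the Delta mechanism underneath, with a hypothetical destination path (the framework assembles the real call from replaceWhere above):

import org.apache.spark.sql.DataFrame

// Sketch: Delta's replaceWhere limits an overwrite to matching partitions,
// so loading one batch rewrites only that batch's files.
def overwriteBatch(df: DataFrame, studyId: String, dataset: String, batch: String): Unit =
  df.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", s"study_id = '$studyId' and dataset='$dataset' and batch='$batch'")
    .save("s3a://datalake/normalized_snv") // hypothetical location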
@@ -7,6 +7,7 @@ package bio.ferlab.etl.model
case class NORMALIZED_SNV(`sample_id`: String = "S1",
`chromosome`: String = "1",
`dataset`: String = "dataset_default",
+ `batch`: String = "annotated_vcf",
`start`: Long = 14464,
`end`: Long = 14465,
`reference`: String = "A",
@@ -7,6 +7,7 @@ package bio.ferlab.etl.model
case class NORMALIZED_SNV_WITHOUT_PARENTAL_ORIGIN(`sample_id`: String = "S1",
`chromosome`: String = "1",
`dataset`: String = "dataset_default",
+ `batch`: String = "annotated_vcf",
`start`: Long = 14464,
`end`: Long = 14465,
`reference`: String = "A",
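Both test models default the new field to "annotated_vcf", the same batch value the specs below pass in, so existing fixtures compare equal without spelling out batch; for instance:

// Existing fixtures pick up the new column through its default value:
val expected = NORMALIZED_SNV_WITHOUT_PARENTAL_ORIGIN(`sample_id` = "S2", `calls` = Seq(0, 0))
// expected.`batch` == "annotated_vcf"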
@@ -26,7 +26,7 @@ class ConsequencesSpec extends SparkSpec with WithTestConfig {


it should "generate normalized consequences from input VCF" in {
- val results = Consequences(TestETLContext(), "STU0000001", "STU0000001", "owner", "dataset_default", None).transform(data)
+ val results = Consequences(TestETLContext(), "STU0000001", "STU0000001", "owner", "dataset_default", "annotated_vcf", None).transform(data)

val result = results("normalized_consequences").as[NormalizedConsequences].collect()

@@ -26,10 +26,10 @@ class SNVSpec extends AnyFlatSpec with Matchers with WithSparkSession with WithT
val dataFomVCFFile: Map[String, DataFrame] = Map(
raw_variant_calling.id -> Seq(
VCF_SNV_INPUT(`contigName` = "chr1", `INFO_FILTERS` = Seq("PASS"),
- `genotypes` = List(
-   GENOTYPES(`sampleId` = "S1"),
-   GENOTYPES(`sampleId` = "S2", `calls` = List(0, 0)),
-   GENOTYPES(`sampleId` = "S3"))),
+ `genotypes` = List(
+   GENOTYPES(`sampleId` = "S1"),
+   GENOTYPES(`sampleId` = "S2", `calls` = List(0, 0)),
+   GENOTYPES(`sampleId` = "S3"))),
VCF_SNV_INPUT(`contigName` = "chr2", `INFO_FILTERS` = Seq("DRAGENSnpHardQUAL"),
`genotypes` = List(GENOTYPES(`sampleId` = "S4"))) // Should be filtered out
).toDF(),
@@ -41,7 +41,7 @@ class SNVSpec extends AnyFlatSpec with Matchers with WithSparkSession with WithT
).toDF()
)

- val results = SNV(TestETLContext(), "STU0000001", "STU0000001", "owner", "dataset_default", releaseId = "1", None).transform(dataFomVCFFile)
+ val results = SNV(TestETLContext(), "STU0000001", "STU0000001", "owner", "dataset_default", "annotated_vcf", releaseId = "1", None).transform(dataFomVCFFile)

val result = results("normalized_snv").as[NORMALIZED_SNV_WITHOUT_PARENTAL_ORIGIN].collect()

@@ -54,8 +54,8 @@ class SNVSpec extends AnyFlatSpec with Matchers with WithSparkSession with WithT
`calls` = Seq(0, 0),
`has_alt` = false,
`zygosity` = "WT",
- // `parental_origin` = null,
- // `transmission_mode` = "non_carrier_proband"
+ // `parental_origin` = null,
+ // `transmission_mode` = "non_carrier_proband"
)
result.filter(e => e.`sample_id` === "S3").head shouldBe
NORMALIZED_SNV_WITHOUT_PARENTAL_ORIGIN(
