Skip to content

Commit

Permalink
Merge pull request #84 from Ferlab-Ste-Justine/cqdg-643
Browse files Browse the repository at this point in the history
feat: CQDG-643 Update etl deps
  • Loading branch information
Jeremy Costanza authored Mar 18, 2024
2 parents bd63baf + 5303f16 commit 9742f8a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 26 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sbtassembly.AssemblyPlugin.autoImport.assembly

val sparkVersion = "3.5.1"
val datalakeSpark3Version = "13.0.0"
val datalakeSpark3Version = "14.0.0"
val deltaCoreVersion = "3.1.0"


Expand All @@ -18,9 +18,9 @@ val sparkDepsSetting = Seq(
"bio.ferlab" %% "datalake-test-utils" % datalakeSpark3Version % Test,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"org.apache.spark" %% "spark-hive" % sparkVersion % Provided,
"org.apache.hadoop" % "hadoop-aws" % "3.3.6" % Provided,
"org.apache.hadoop" % "hadoop-aws" % "3.3.4" % Provided,
"io.delta" %% "delta-spark" % deltaCoreVersion % Provided,
"org.scalatest" %% "scalatest" % "3.2.9" % Test,
"org.scalatest" %% "scalatest" % "3.2.17" % Test,
)
)
val commonSettings = Seq(
Expand Down
49 changes: 26 additions & 23 deletions variant-task/src/main/scala/bio/ferlab/etl/normalized/SNV.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import bio.ferlab.datalake.spark3.implicits.GenomicImplicits._
import bio.ferlab.datalake.spark3.implicits.GenomicImplicits.columns._
import bio.ferlab.etl.normalized.SNV._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.time.LocalDateTime
Expand Down Expand Up @@ -50,6 +51,7 @@ case class SNV(rc: RuntimeETLContext, studyId: String, studyCode: String, owner:

object SNV {
private final val GENES_SYMBOL = "genes_symbol"

implicit class DataFrameOps(df: DataFrame) {
def withSource(task: DataFrame)(implicit spark: SparkSession): DataFrame = {
import spark.implicits._
Expand All @@ -61,7 +63,7 @@ object SNV {
}
}

private def selectOccurrences(inputDF: DataFrame, releaseId: String, dataset: String, batch:String): DataFrame = {
private def selectOccurrences(inputDF: DataFrame, releaseId: String, dataset: String, batch: String): DataFrame = {
val inputDfExpGenotypes = inputDF
.filter(col("INFO_FILTERS").isNull || array_contains(col("INFO_FILTERS"), "PASS")) // Remove low quality variants
.withColumn("annotation", firstCsq)
Expand Down Expand Up @@ -108,37 +110,38 @@ object SNV {
col("genotype.conditionalQuality") as "gq",
col("genotype.calls") as "calls",
array_contains(col("genotype.calls"), 1) as "has_alt",
is_multi_allelic,
old_multi_allelic,
col("qual") as "quality",
col("INFO_FILTERS")(0) as "filter",
ac as "info_ac",
an as "info_an",
af as "info_af",
col("INFO_END") as "info_end",
optional_info(inputDF,"INFO_BaseQRankSum","info_baseq_rank_sum", "float"),
optional_info(inputDF,"INFO_ExcessHet","info_excess_het", "float"),
optional_info(inputDF,"INFO_FS","info_fs", "float"),
optional_info(inputDF,"INFO_DS","info_ds", "boolean"),
optional_info(inputDF,"INFO_FractionInformativeReads","info_fraction_informative_reads", "float"),
optional_info(inputDF,"INFO_InbreedingCoeff","info_inbreed_coeff", "float"),
optional_info(inputDF,"INFO_MLEAC","info_mleac", "array<int>"),
optional_info(inputDF,"INFO_MLEAF","info_mleaf", "array<float>"),
optional_info(inputDF,"INFO_MQ","info_mq", "float"),
optional_info(inputDF,"INFO_MQRankSum", "info_m_qrank_sum", "float"),
optional_info(inputDF,"INFO_QD", "info_qd", "float"),
optional_info(inputDF,"INFO_R2_5P_bias", "info_r2_5p_bias", "float"),
optional_info(inputDF,"INFO_ReadPosRankSum","info_read_pos_rank_sum", "float"),
optional_info(inputDF,"INFO_SOR","info_sor", "float"),
optional_info(inputDF,"INFO_VQSLOD","info_vqslod", "float"),
optional_info(inputDF,"INFO_culprit","info_culprit", "string"),
optional_info(inputDF,"INFO_DP","info_dp", "int"),
optional_info(inputDF, "INFO_HaplotypeScore", "info_haplotype_score", "float"),
optional_info(inputDF, "INFO_OLD_RECORD", "info_old_record", "string"),
optional_info(inputDF, "INFO_BaseQRankSum", "info_baseq_rank_sum", "double"),
optional_info(inputDF, "INFO_ExcessHet", "info_excess_het", "double"),
optional_info(inputDF, "INFO_FS", "info_fs", "double"),
optional_info(inputDF, "INFO_DS", "info_ds", "boolean"),
optional_info(inputDF, "INFO_FractionInformativeReads", "info_fraction_informative_reads", "double"),
optional_info(inputDF, "INFO_InbreedingCoeff", "info_inbreed_coeff", "double"),
optional_info(inputDF, "INFO_MLEAC", "info_mleac", "array<int>"),
optional_info(inputDF, "INFO_MLEAF", "info_mleaf", "array<double>"),
optional_info(inputDF, "INFO_MQ", "info_mq", "double"),
optional_info(inputDF, "INFO_MQRankSum", "info_m_qrank_sum", "double"),
optional_info(inputDF, "INFO_QD", "info_qd", "double"),
optional_info(inputDF, "INFO_R2_5P_bias", "info_r2_5p_bias", "double"),
optional_info(inputDF, "INFO_ReadPosRankSum", "info_read_pos_rank_sum", "double"),
optional_info(inputDF, "INFO_SOR", "info_sor", "double"),
optional_info(inputDF, "INFO_VQSLOD", "info_vqslod", "double"),
optional_info(inputDF, "INFO_culprit", "info_culprit", "string"),
optional_info(inputDF, "INFO_DP", "info_dp", "int"),
optional_info(inputDF, "INFO_HaplotypeScore", "info_haplotype_score", "double"),
lit(releaseId) as "release_id",
lit(dataset) as "dataset",
lit(batch) as "batch",
is_normalized
lit(batch) as "batch"
)
.withColumn("is_normalized",col("info_old_record").isNotNull)
.withColumn("is_multi_allelic", lit(false)) //we dont split multi allelics with glow anymore. In the future we should try to infer this field from info_old_record
.withColumn("old_multi_allelic", lit(null).cast(StringType)) //Should be deleted in the future, replace by info_old_record
.drop("annotation")
.withColumn("zygosity", zygosity(col("calls")))
occurrences
Expand Down

0 comments on commit 9742f8a

Please sign in to comment.