Skip to content

Commit

Permalink
Spark 3.4 regression & compatibility fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
rycowhi committed Mar 8, 2024
1 parent 72309b1 commit 56749e0
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class LineageHarvester(
def harvest(result: Either[Throwable, Duration]): HarvestResult = {
logDebug(s"Harvesting lineage from ${ctx.logicalPlan.getClass}")

println("MINE: ")
println(ctx.logicalPlan.toJSON)

val (readMetrics: Metrics, writeMetrics: Metrics) = ctx.executedPlanOpt.
map(getExecutedReadWriteMetrics).
getOrElse((Map.empty, Map.empty))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.spline

import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode._
Expand All @@ -24,9 +25,13 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.spline.commons.io.TempDirectory
import za.co.absa.spline.commons.version.Version.VersionStringInterpolator
import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}
import za.co.absa.spline.test.fixture.SparkFixture
import za.co.absa.spline.test.fixture.spline.SplineFixture

import scala.concurrent.Future

class BasicIntegrationTests extends AsyncFlatSpec
with Matchers
with SparkFixture
Expand Down Expand Up @@ -117,15 +122,41 @@ class BasicIntegrationTests extends AsyncFlatSpec
.write.mode(Append).saveAsTable(tableName)
}

(plan2, _) <- captor.lineageOf {
// Spark 3.4+ creates 2 commands for both writes here, so we need to ignore one.
// We only want the one that comes from CreateDataSourceTableAsSelectCommand;
// the one we ignore is an extra InsertIntoHadoopFsRelationCommand.
// They can arrive out of order, so we filter by the write command name below.
(plan2, _) <- if (ver"$SPARK_VERSION" >= ver"3.4.0") {
captor.lineageOf {
Thread.sleep(5000)
}
} else Future[(ExecutionPlan, Seq[ExecutionEvent])](null, null)

(plan3, _) <- captor.lineageOf {
spark
.read.table(tableName)
.write.mode(Overwrite).saveAsTable("somewhere")
}

(plan4, _) <- if (ver"$SPARK_VERSION" >= ver"3.4.0") {
captor.lineageOf {
Thread.sleep(5000)
}
} else Future[(ExecutionPlan, Seq[ExecutionEvent])](null, null)
} yield {
println("yield")
val writeUri = plan1.operations.write.outputSource
val readUri = plan2.operations.reads.head.inputSources.head

val writePlan = Seq(plan1, plan2)
.filter(null.!=)
.find(_.operations.write.name == "CreateDataSourceTableAsSelectCommand")
.get
val readPlan = Seq(plan3, plan4)
.filter(null.!=)
.find(_.operations.write.name == "CreateDataSourceTableAsSelectCommand")
.get

val writeUri = writePlan.operations.write.outputSource
val readUri = readPlan.operations.reads.head.inputSources.head

writeUri shouldEqual readUri
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class KafkaSinkSpec

(plan2, _) <- captor.lineageOf(
reader
.option("subscribe", s"$topicName,anotherTopic")
.option("subscribe", s"$topicName")
.load()
.write.mode(Overwrite).save(TempFile(pathOnly = true).deleteOnExit().path.toString))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import za.co.absa.spline.test.fixture.spline.SplineFixture
import za.co.absa.spline.test.fixture.{SparkDatabaseFixture, SparkFixture}

import java.util.UUID
import scala.concurrent.Future
import scala.language.reflectiveCalls
import scala.util.Try

Expand Down Expand Up @@ -376,11 +377,25 @@ class LineageHarvesterSpec extends AsyncFlatSpec
val df = spark.createDataset(Seq(TestRow(1, 2.3, "text")))

for {
(plan, _) <- captor.lineageOf {
(plan1, _) <- captor.lineageOf {
df.createOrReplaceTempView("tempView")
spark.sql("CREATE TABLE users_sales AS SELECT i, d, s FROM tempView ")
}
// Spark 3.4+ creates 2 commands for this CTAS here, so we need to ignore one.
// We only want the one that comes from CreateHiveTableAsSelectCommand;
// the one we ignore is an extra InsertIntoHiveTableCommand.
// They can arrive out of order, so we filter by the write command name below.
(plan2, _) <- if (ver"$SPARK_VERSION" >= ver"3.4.0") {
captor.lineageOf {
Thread.sleep(5000)
}
} else Future[(ExecutionPlan, Seq[ExecutionEvent])](null, null)
} yield {
val plan = Seq(plan1, plan2)
.filter(null.!=)
.find(_.operations.write.name == "CreateHiveTableAsSelectCommand")
.get

val writeOperation = plan.operations.write
writeOperation.id shouldEqual "op-0"
writeOperation.append shouldEqual false
Expand Down Expand Up @@ -500,7 +515,7 @@ class LineageHarvesterSpec extends AsyncFlatSpec
plan should not be null
event.durationNs should be(empty)
event.error should not be empty
event.error.get.toString should include(s"path ${tmpLocal.toURI.toString.stripSuffix("/")} already exists")
event.error.get.toString.toLowerCase should include(s"path ${tmpLocal.toURI.toString.stripSuffix("/")} already exists")
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@
<groupId>com.github.cerveada</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<stdout>I</stdout>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
Expand Down

0 comments on commit 56749e0

Please sign in to comment.