spark agent #705 Spark 3.4 support (#739)

Closed · wants to merge 1 commit
RDDPlugin.scala
@@ -19,7 +19,7 @@ package za.co.absa.spline.harvester.plugin.embedded
import org.apache.spark.Partition
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.execution.datasources.FileScanRDD
+ import org.apache.spark.sql.execution.datasources.{FileScanRDD, PartitionedFile}
import za.co.absa.spline.commons.reflect.ReflectionUtils
import za.co.absa.spline.harvester.builder._
import za.co.absa.spline.harvester.plugin.Plugin.{Precedence, ReadNodeInfo}
@@ -39,14 +39,22 @@ class RDDPlugin(

override def rddReadNodeProcessor: PartialFunction[RDD[_], ReadNodeInfo] = {
case fsr: FileScanRDD =>
- val uris = fsr.filePartitions.flatMap(_.files.map(_.filePath))
+ val files = fsr.filePartitions.flatMap(_.files)
+ val uris = files.map(extractPath(_))
ReadNodeInfo(SourceIdentifier(None, uris: _*), Map.empty)
case hr: HadoopRDD[_, _] =>
val partitions = ReflectionUtils.extractValue[Array[Partition]](hr, "partitions_")
val uris = partitions.map(hadoopPartitionToUriString)
ReadNodeInfo(SourceIdentifier(None, uris: _*), Map.empty)
}

private def extractPath(file: PartitionedFile): String = {
val path = ReflectionUtils.extractValue[AnyRef](file, "filePath")
// For Spark 3.3 and lower, filePath is a String;
// for Spark 3.4 it is an org.apache.spark.paths.SparkPath.
// In both cases toString yields the path as a string.
path.toString
}

private def hadoopPartitionToUriString(hadoopPartition: Partition): String = {
val inputSplit = ReflectionUtils.extractValue[AnyRef](hadoopPartition, "inputSplit")
val fileSplitT = ReflectionUtils.extractValue[AnyRef](inputSplit, "t")
@@ -56,5 +64,4 @@

uri.toString
}

}
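The reflective extractPath above keeps a single code path that compiles against every supported Spark version. Purely as an illustration (not part of this PR), the same idea written as an explicit match over the two known filePath representations could look like the sketch below; ReflectionUtils.extractValue and PartitionedFile come from the surrounding file, while the method name is hypothetical.

  // Hypothetical sketch: explicit handling of the two known filePath representations.
  private def extractPathExplicit(file: PartitionedFile): String =
    ReflectionUtils.extractValue[AnyRef](file, "filePath") match {
      case s: String => s                    // Spark 3.3 and lower: filePath is a plain String
      case sparkPath => sparkPath.toString   // Spark 3.4+: org.apache.spark.paths.SparkPath
    }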
integration-tests/pom.xml (17 changes: 16 additions & 1 deletion)
@@ -175,7 +175,7 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-${elasticsearch.spark.sufix}_${scala.binary.version}</artifactId>
- <version>8.2.2</version>
+ <version>8.9.1</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -267,6 +267,21 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-3.4</id>
<properties>
<guava.version>16.0.1</guava.version>
<elasticsearch.spark.sufix>30</elasticsearch.spark.sufix>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.4_${scala.binary.version}</artifactId>
<version>1.3.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
</profiles>

<build>
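With this profile added here and in the root pom.xml, the integration tests can presumably be built against Spark 3.4 by activating the profile explicitly, e.g. mvn verify -Pspark-3.4 (profile activation with -P is standard Maven; the exact goals and any additional profiles follow the project's usual build instructions).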
DeltaSpec.scala
@@ -37,7 +37,7 @@ class DeltaSpec extends AsyncFlatSpec
private val deltaPath = TempDirectory(prefix = "delta", pathOnly = true).deleteOnExit().toURI.toString

it should "support Delta Lake as a source" taggedAs
- ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2") in
+ ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2" || ver"$SPARK_VERSION" >= ver"3.4.0") in
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
@@ -79,7 +79,7 @@ class DeltaSpec
}

it should "support insert into existing Delta Lake table" taggedAs
- ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2") in
+ ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2" || ver"$SPARK_VERSION" >= ver"3.4.0") in
withNewSparkSession { implicit spark =>
withLineageTracking { lineageCaptor =>
val testData: DataFrame = {
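Both Delta specs are now skipped on Spark 3.4 and above in addition to the old lower bound. The ver string interpolator used in ignoreIf comes from the project's test commons; as a minimal, hypothetical stand-in (not the actual implementation), the numeric-segment ordering such a comparison relies on could look like this:

  // Hypothetical stand-in for the ver"" interpolator: compare versions segment by segment.
  final case class SemVer(segments: Seq[Int]) extends Ordered[SemVer] {
    override def compare(that: SemVer): Int =
      segments.zipAll(that.segments, 0, 0)
        .map { case (a, b) => a.compare(b) }
        .find(_ != 0)
        .getOrElse(0)
  }
  object SemVer {
    def parse(s: String): SemVer =
      SemVer(s.split('.').takeWhile(_.forall(_.isDigit)).map(_.toInt).toSeq)
  }

  // Usage mirroring the condition above:
  // SemVer.parse(SPARK_VERSION) < SemVer.parse("2.4.2") || SemVer.parse(SPARK_VERSION) >= SemVer.parse("3.4.0")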
pom.xml (27 changes: 27 additions & 0 deletions)
@@ -91,6 +91,7 @@
<spark-31.version>3.1.3</spark-31.version>
<spark-32.version>3.2.3</spark-32.version>
<spark-33.version>3.3.1</spark-33.version>
<spark-34.version>3.4.1</spark-34.version>

<!-- Delta -->

@@ -100,6 +101,8 @@
<delta-10.version>1.0.0</delta-10.version>
<delta-20.version>2.0.0</delta-20.version>
<delta-21.version>2.1.0</delta-21.version>
<delta-24.version>2.4.0</delta-24.version>


<!-- Cassandra -->
<cassandra-connector.version>${cassandra-connector-24.version}</cassandra-connector.version>
@@ -108,6 +111,7 @@
<cassandra-connector-31.version>3.1.0</cassandra-connector-31.version>
<cassandra-connector-32.version>3.2.0</cassandra-connector-32.version>
<cassandra-connector-33.version>3.3.0</cassandra-connector-33.version>
<cassandra-connector-34.version>3.4.1</cassandra-connector-34.version>

<spark-excel.version>0.13.7</spark-excel.version>

@@ -815,6 +819,29 @@
</dependencyManagement>
</profile>

<profile>
<id>spark-3.4</id>
<properties>
<spark.version>${spark-34.version}</spark.version>
<delta.version>${delta-24.version}</delta.version>
<spark-excel.version>${spark-34.version}_0.19.0</spark-excel.version>
<cassandra-connector.version>${cassandra-connector-34.version}</cassandra-connector.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</profile>



<!-- Binary compatibility checking profile -->

<profile>