[Spark] Update sbt testing task of Delta Connect Client to automatically use the Spark latest jars #3670
base: master
Changes from 13 commits
.github/workflows/spark_master_test.yaml
@@ -51,7 +51,7 @@ jobs:
       - name: Run Spark Master tests
         # when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_test.yaml
         run: |
-          TEST_PARALLELISM_COUNT=4 SHARD_ID=${{matrix.shard}} build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
-          TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/test
+          TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/assembly connectClient/test
+          TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/test
+          TEST_PARALLELISM_COUNT=4 SHARD_ID=${{matrix.shard}} build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
         if: steps.git-diff.outputs.diff

Review comment: Temporarily changed for now to test connectClient, because Delta integration with Spark Master is broken at the moment, so we cannot test connectServer and spark; will put things back the way they were before merging.
build.sbt
@@ -17,6 +17,7 @@
// scalastyle:off line.size.limit

import java.io.BufferedInputStream
import java.net.URL
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission
import java.util

@@ -26,8 +27,10 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.utils.IOUtils

import scala.collection.mutable
import scala.io.Source
import scala.sys.process._
import scala.util.Using
import scala.xml.XML

import sbt.internal.inc.Analysis
import sbtprotoc.ProtocPlugin.autoImport._

@@ -273,6 +276,53 @@ lazy val connectCommon = (project in file("spark-connect/common"))
  ),
)
/**
 * Downloads the latest nightly release JAR of a given Spark component and saves it to
 * the specified directory.
 *
 * @param sparkComponentName The name of the Spark component.
 * @param destDir The directory where the JAR should be saved.
 */
def downloadLatestSparkReleaseJar(
    sparkComponentName: String,
    destDir: File): Unit = {
  val sparkMasterSnapshotsURL = "https://repository.apache.org/content/groups/snapshots/" +
    "org/apache/spark/"

  // Construct the URL of the maven-metadata.xml file. It records which nightly
  // release of the JAR is the latest, since the JAR directory also retains
  // slightly older versions.
  //
  // An example folder is:
  // https://repository.apache.org/content/groups/snapshots/org/apache/spark/
  //   spark-catalyst_2.13/4.0.0-SNAPSHOT/
  val latestSparkComponentJarDir = sparkMasterSnapshotsURL +
    s"$sparkComponentName/$SPARK_MASTER_VERSION/"
  val metadataUrl = new URL(latestSparkComponentJarDir + "maven-metadata.xml")

  // Fetch and parse the maven-metadata.xml file.
  val metadataXml = XML.load(metadataUrl)
Review comment: For some reason, the XML that we download is […], so it's missing the […]. I'm not sure why this is the case; I tried using other […]. To get the latest jar name, we can either […]. I think way 1 works just as well as way 2, so I decided not to spend more time trying to fix the […].
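For reference, a minimal, self-contained sketch of the metadata-parsing approach the code here takes. The XML payload below is invented for illustration, but its element layout follows the standard Maven snapshot-metadata format, and the queries mirror the ones in downloadLatestSparkReleaseJar:

  import scala.xml.XML

  // Illustrative maven-metadata.xml for a SNAPSHOT version (values are made up).
  val metadataXml = XML.loadString(
    """<metadata>
      |  <groupId>org.apache.spark</groupId>
      |  <artifactId>spark-catalyst_2.13</artifactId>
      |  <version>4.0.0-SNAPSHOT</version>
      |  <versioning>
      |    <snapshot>
      |      <timestamp>20240905.120505</timestamp>
      |      <buildNumber>123</buildNumber>
      |    </snapshot>
      |  </versioning>
      |</metadata>""".stripMargin)

  val timestamp = (metadataXml \\ "snapshot" \ "timestamp").text     // "20240905.120505"
  val buildNumber = (metadataXml \\ "snapshot" \ "buildNumber").text // "123"
  // The latest JAR name then resolves to:
  // spark-catalyst_2.13-4.0.0-20240905.120505-123.jar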
  // Extract the metadata about the latest nightly release JAR.
  val sparkVersion = SPARK_MASTER_VERSION.replace("-SNAPSHOT", "")
  val timestamp = (metadataXml \\ "snapshot" \ "timestamp").text
  val buildNumber = (metadataXml \\ "snapshot" \ "buildNumber").text

  // Ensure the metadata was properly extracted.
  if (sparkVersion.isEmpty || timestamp.isEmpty || buildNumber.isEmpty) {
    throw new RuntimeException("Could not extract the required metadata " +
      "from maven-metadata.xml")
  }

  // Construct the URL for the latest nightly release JAR.
  val latestSparkJarName = s"$sparkComponentName-$sparkVersion-$timestamp-$buildNumber.jar"
  val latestSparkJarUrl = latestSparkComponentJarDir + latestSparkJarName
  val latestSparkJarPath = destDir / s"$sparkComponentName.jar"

  // Download the latest nightly release JAR.
  new URL(latestSparkJarUrl) #> latestSparkJarPath !
}
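A note on the download line above: the #> and ! operators come from scala.sys.process (imported at the top of build.sbt), which can pipe a java.net.URL into a java.io.File; ! runs the pipeline and returns its exit code. A standalone sketch, with a hypothetical URL and path:

  import java.io.File
  import java.net.URL
  import scala.sys.process._

  // Pipe the bytes behind a URL into a local file; `!` runs the pipeline
  // and returns its exit code (0 on success).
  val exitCode = (new URL("https://example.org/artifact.jar") #> new File("/tmp/artifact.jar")).!
  if (exitCode != 0) sys.error("Download failed")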
lazy val connectClient = (project in file("spark-connect/client"))
  .disablePlugins(JavaFormatterPlugin)
  .dependsOn(connectCommon % "compile->compile;test->test;provided->provided")

@@ -343,20 +393,38 @@ lazy val connectClient = (project in file("spark-connect/client"))
          }
        }
      }

      val sparkJarsDir = destDir / "spark-4.0.0-preview1-bin-hadoop3" / "jars"
      if (!sparkJarsDir.exists()) {
        throw new RuntimeException(s"Jars directory $sparkJarsDir does not exist after extraction.")
      }

      // The Spark JARs have the format "sparkComponentName-4.0.0-preview1.jar", for example:
      // spark-catalyst_2.13-4.0.0-preview1.jar, spark-core_2.13-4.0.0-preview1.jar, etc.
      val outdatedJars = sparkJarsDir.listFiles().filter(_.getName.endsWith("preview1.jar"))

      // Replace the outdated Spark 4.0 First Preview JARs with the latest nightly
      // release Spark Master JARs.
      outdatedJars.foreach { outdatedJar =>
        val sparkComponentName = outdatedJar.getName.stripSuffix("-4.0.0-preview1.jar")
        downloadLatestSparkReleaseJar(sparkComponentName, sparkJarsDir)
        outdatedJar.delete()
      }
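To make the replacement step concrete, here is a toy, offline run of the same filter-and-strip logic on a scratch directory. The file names are invented; the real task would call downloadLatestSparkReleaseJar where the comment indicates:

  import java.io.File
  import java.nio.file.Files

  val dir = Files.createTempDirectory("jars").toFile
  Seq("spark-core_2.13-4.0.0-preview1.jar", "some-other-dep-1.2.3.jar")
    .foreach(name => new File(dir, name).createNewFile())

  val outdatedJars = dir.listFiles().filter(_.getName.endsWith("preview1.jar"))
  outdatedJars.foreach { jar =>
    // "spark-core_2.13-4.0.0-preview1.jar" -> "spark-core_2.13"
    val componentName = jar.getName.stripSuffix("-4.0.0-preview1.jar")
    // downloadLatestSparkReleaseJar(componentName, dir) would fetch the snapshot here.
    jar.delete()
  }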
      files
    } else {
      destDir.get()
    }
  }.taskValue,
  (Test / resourceGenerators) += Def.task {
-   val src = url("https://repository.apache.org/content/groups/public/org/apache/spark/spark-connect_2.13/4.0.0-preview1/spark-connect_2.13-4.0.0-preview1.jar")
-   val dest = (Test / resourceManaged).value / "spark-connect.jar"
+   val sparkConnectComponentName = "spark-connect_2.13"
+   val dest = (Test / resourceManaged).value / s"$sparkConnectComponentName.jar"

Review comment: We define sparkConnectComponentName on line 409 to have the suffix "_2.13" […]

    if (!dest.exists()) {
-     src #> dest !;
-     Seq(dest)
-   } else {
-     dest.get()
+     downloadLatestSparkReleaseJar(sparkConnectComponentName, (Test / resourceManaged).value)
    }

+   Seq(dest)
  }.taskValue
)