[Spark] Update sbt testing task of Delta Connect Client to automatically use the latest Spark jars #3670

Open · wants to merge 20 commits into base: master · showing changes from 19 commits
6 changes: 3 additions & 3 deletions .github/workflows/spark_master_test.yaml
@@ -51,7 +51,7 @@ jobs:
- name: Run Spark Master tests
# when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_test.yaml
run: |
TEST_PARALLELISM_COUNT=4 SHARD_ID=${{matrix.shard}} build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/test
TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/assembly connectClient/test
@longvu-db (Contributor, Author) commented on Sep 13, 2024:

This is temporary so we can test connectClient: Delta's integration with Spark Master is currently broken, so we cannot test connectServer and spark. We will put things back the way they were before merging.

TEST_PARALLELISM_COUNT=1 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/assembly connectClient/test

TEST_PARALLELISM_COUNT=1 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/test
TEST_PARALLELISM_COUNT=1 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
if: steps.git-diff.outputs.diff
89 changes: 78 additions & 11 deletions build.sbt
@@ -17,6 +17,7 @@
// scalastyle:off line.size.limit

import java.io.BufferedInputStream
import java.net.URL
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission
import java.util
@@ -26,8 +27,10 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.utils.IOUtils

import scala.collection.mutable
import scala.io.Source
import scala.sys.process._
import scala.util.Using
import scala.xml.XML

import sbt.internal.inc.Analysis
import sbtprotoc.ProtocPlugin.autoImport._
@@ -273,6 +276,53 @@ lazy val connectCommon = (project in file("spark-connect/common"))
),
)

/**
* Downloads the latest nightly release JAR of a given Spark component and saves it to
* the specified directory.
*
* @param sparkComponentName The name of the Spark component.
* @param destDir The directory where the JAR should be saved.
*/
def downloadLatestSparkReleaseJar(
sparkComponentName: String,
destDir: File): Unit = {
val sparkMasterSnapshotsURL = "https://repository.apache.org/content/groups/snapshots/" +
"org/apache/spark/"

// Construct the URL to the maven-metadata.xml file. The maven-metadata.xml file holds
// metadata about the latest nightly release of the jar, which we need because the
// jar directory also retains slightly older jar versions.
//
// An example folder is:
// https://repository.apache.org/content/groups/snapshots/org/apache/spark/
// spark-catalyst_2.13/4.0.0-SNAPSHOT/
val latestSparkComponentJarDir = sparkMasterSnapshotsURL +
s"$sparkComponentName/$SPARK_MASTER_VERSION/"
val metadataUrl = new URL(latestSparkComponentJarDir + "maven-metadata.xml")

// Fetch and parse the maven-metadata.xml file.
val metadataXml = XML.load(metadataUrl)
@longvu-db (Contributor, Author) commented on Sep 12, 2024:

For some reason, the XML that we download is

[screenshot of the downloaded maven-metadata.xml]

so it is missing the snapshotVersions section that appears in https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-connect_2.13/4.0.0-SNAPSHOT/maven-metadata.xml.

I'm not sure why this is the case. I tried using other load APIs from https://javadoc.io/doc/org.scala-lang.modules/scala-xml_2.13/latest/scala/xml/XML$.html, and I'm not sure about changing the version of the scala-xml library in https://github.com/delta-io/delta/blob/master/project/plugins.sbt#L46.

To get the latest jar name, we can either:

  1. Extract the timestamp and buildNumber, and combine them together, or
  2. Extract the value nested in snapshotVersion (a sketch of this approach is shown below).

I think way 1 works just as well as way 2, so I decided not to spend more time trying to fix the metadata URL fetching to retrieve the full file.
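
For reference, a minimal sketch of way 2, assuming the fetched maven-metadata.xml actually contained the snapshotVersions section; the element names follow the standard Maven snapshot metadata layout, and latestJarValue is a hypothetical local name:

// Way 2 (sketch): read the latest jar's version string from the <snapshotVersion> entries.
val latestJarValue = (metadataXml \\ "snapshotVersion")
  .find(node => (node \ "extension").text == "jar" && (node \ "classifier").text.isEmpty)
  .map(node => (node \ "value").text)
  .getOrElse(throw new RuntimeException("No jar entry found under snapshotVersions"))
val latestSparkJarName = s"$sparkComponentName-$latestJarValue.jar"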


// Extract the metadata information about the latest nightly release JAR.
val sparkVersion = SPARK_MASTER_VERSION.replace("-SNAPSHOT", "")
val timestamp = (metadataXml \\ "snapshot" \ "timestamp").text
val buildNumber = (metadataXml \\ "snapshot" \ "buildNumber").text

// Ensure the metadata information is properly extracted.
if (sparkVersion.isEmpty || timestamp.isEmpty || buildNumber.isEmpty) {
throw new RuntimeException("Could not extract the required metadata " +
"from maven-metadata.xml")
}

// Construct the URL for the latest nightly release JAR.
val latestSparkJarName = s"$sparkComponentName-$sparkVersion-$timestamp-$buildNumber.jar"
val latestSparkJarUrl = latestSparkComponentJarDir + latestSparkJarName
val latestSparkJarPath = destDir / s"$sparkComponentName.jar"

// Download the latest nightly release JAR.
new URL(latestSparkJarUrl) #> latestSparkJarPath !
}
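
As a usage illustration only (not part of this diff), here is a sketch of how the helper could be invoked from a test resource generator; the component name and output location below are made up for the example:

(Test / resourceGenerators) += Def.task {
  // Hypothetical example: fetch the latest nightly spark-catalyst jar into a scratch directory.
  val scratchDir = (Test / resourceManaged).value / "spark-nightly-jars"
  IO.createDirectory(scratchDir)
  downloadLatestSparkReleaseJar("spark-catalyst_2.13", scratchDir)
  // The helper saves the jar as <sparkComponentName>.jar inside the destination directory.
  Seq(scratchDir / "spark-catalyst_2.13.jar")
}.taskValue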

lazy val connectClient = (project in file("spark-connect/client"))
.disablePlugins(JavaFormatterPlugin)
.dependsOn(connectCommon % "compile->compile;test->test;provided->provided")
@@ -317,7 +367,6 @@ lazy val connectClient = (project in file("spark-connect/client"))
val destDir = (Test / resourceManaged).value / "spark"
@longvu-db (Contributor, Author) commented:

Perhaps add a TODO.

if (!destDir.exists()) {
IO.createDirectory(destDir)
val files = mutable.Buffer.empty[File]
@longvu-db (Contributor, Author) commented:

No need to maintain a list of files; we write to the directory directly.

Using(new BufferedInputStream(location.openStream())) { bi =>
Using(new GzipCompressorInputStream(bi)) { gi =>
Using(new TarArchiveInputStream(gi)) { ti =>
@@ -327,7 +376,9 @@ lazy val connectClient = (project in file("spark-connect/client"))
if (entry.isDirectory) {
dest.mkdirs()
} else {
files += dest
// Ensure the parent directories exist.
dest.getParentFile.mkdirs()

Using(Files.newOutputStream(dest.toPath)) { os =>
IOUtils.copy(ti, os)
}
@@ -343,20 +394,36 @@
}
}
}
files
} else {
destDir.get()

val sparkJarsDir = destDir / "spark-4.0.0-preview1-bin-hadoop3" / "jars"
if (!sparkJarsDir.exists()) {
throw new RuntimeException(s"Jars directory $sparkJarsDir does not exist after extraction.")
}

// The Spark Jars have the format "sparkComponentName-4.0.0-preview1.jar", for example:
// spark-catalyst_2.13-4.0.0-preview1.jar, spark-core_2.13-4.0.0-preview1.jar, etc.
val outdatedJars = sparkJarsDir.listFiles().filter(_.getName.endsWith("preview1.jar"))

// Replace outdated Spark 4.0 First Preview jars with latest nightly release Spark
// Master jars.
outdatedJars.foreach { outdatedJar =>
val sparkComponentName = outdatedJar.getName.stripSuffix("-4.0.0-preview1.jar")
downloadLatestSparkReleaseJar(sparkComponentName, sparkJarsDir)
}
outdatedJars.foreach(_.delete())
}

Seq(destDir)
}.taskValue,
(Test / resourceGenerators) += Def.task {
val src = url("https://repository.apache.org/content/groups/public/org/apache/spark/spark-connect_2.13/4.0.0-preview1/spark-connect_2.13-4.0.0-preview1.jar")
val dest = (Test / resourceManaged).value / "spark-connect.jar"
val sparkConnectComponentName = "spark-connect_2.13"
@longvu-db (Contributor, Author) commented on Sep 12, 2024:

We define sparkComponentName on line 409 with the suffix _2.13, so this is also changed to be consistent.

val dest = (Test / resourceManaged).value / s"$sparkConnectComponentName.jar"

if (!dest.exists()) {
src #> dest !;
Seq(dest)
} else {
dest.get()
downloadLatestSparkReleaseJar(sparkConnectComponentName, (Test / resourceManaged).value)
}

Seq(dest)
}.taskValue
)

@@ -72,7 +72,7 @@ trait RemoteSparkSession extends BeforeAndAfterAll { self: Suite =>
"spark-connect/server/target/scala-2.13/delta-connect-server-assembly-3.3.0-SNAPSHOT.jar"

private val resources = s"$buildLocation/spark-connect/client/target/scala-2.13/resource_managed/test"
private val sparkConnectJar = s"$resources/spark-connect.jar"
private val sparkConnectJar = s"$resources/spark-connect_2.13.jar"
private val sparkSubmit = s"$resources/spark/spark-4.0.0-preview1-bin-hadoop3/sbin/start-connect-server.sh"

private lazy val server = {