This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Use a pre-installed Minikube instance for integration tests. #521

Open · wants to merge 11 commits into branch-2.2-kubernetes
6 changes: 4 additions & 2 deletions resource-managers/kubernetes/README.md
@@ -42,10 +42,12 @@ Below is a list of the submodules for this cluster manager and what they do.

# Running the Kubernetes Integration Tests

Note that the integration test framework is currently being heavily revised and is subject to change.

Note that the integration tests currently run only with Java 8.

Integration tests first require installing [Minikube](https://kubernetes.io/docs/getting-started-guides/minikube/) on
your machine and having the `minikube` binary on your `PATH`. Refer to the Minikube documentation for instructions
on how to install it. It is recommended to allocate at least 8 CPUs and 8GB of memory to the Minikube cluster.

Running any of the integration tests requires including the `kubernetes-integration-tests` profile in the build command. To
prepare the environment for running the integration tests, the `pre-integration-test` step must be run in Maven on the
`resource-managers/kubernetes/integration-tests` module:
@@ -496,6 +496,9 @@ package object config extends Logging {

private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

private[spark] val KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY =
"spark.kubernetes.test.imageDockerTag"

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
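For orientation (not part of this change set), the new constant is meant to be read as a JVM system property and combined with an image name by the test backends. A minimal sketch under that assumption, using a hypothetical helper name:

```scala
// Hypothetical helper, shown only to illustrate how the constant is used:
// both test backends in this PR call System.getProperty with it and fall
// back to "latest" when the property is unset.
def resolveTestImage(imageName: String): String = {
  val tag = System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY, "latest")
  s"$imageName:$tag" // e.g. "spark-driver:latest" or "spark-driver:<custom-tag>"
}
```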
31 changes: 0 additions & 31 deletions resource-managers/kubernetes/integration-tests/pom.xml
@@ -339,37 +339,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.googlecode.maven-download-plugin</groupId>
<artifactId>download-maven-plugin</artifactId>
<version>1.3.0</version>
<executions>
<execution>
<id>download-minikube-linux</id>
<phase>pre-integration-test</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
<outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
<outputFileName>minikube</outputFileName>
</configuration>
</execution>
<execution>
<id>download-minikube-darwin</id>
<phase>pre-integration-test</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
<outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
<outputFileName>minikube</outputFileName>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<!-- Triggers scalatest plugin in the integration-test phase instead of
the test phase, so that test jobs are copied over beforehand.
@@ -32,8 +32,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions}
import org.apache.spark.deploy.k8s.SSLUtils
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend}
import org.apache.spark.deploy.k8s.submit.{Client, ClientArguments, JavaMainAppResource, KeyAndCertPem, MainAppResource, PythonMainAppResource, RMainAppResource}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
@@ -51,9 +50,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
testBackend.initialize()
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
resourceStagingServerLauncher = new ResourceStagingServerLauncher(
kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
kubernetesTestComponents
.kubernetesClient
.inNamespace(kubernetesTestComponents.namespace), testBackend.dockerImageTag())
staticAssetServerLauncher = new StaticAssetServerLauncher(
kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
kubernetesTestComponents
.kubernetesClient
.inNamespace(kubernetesTestComponents.namespace), testBackend.dockerImageTag())
}

override def afterAll(): Unit = {
@@ -62,8 +65,9 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {

before {
sparkConf = kubernetesTestComponents.newSparkConf()
.set(INIT_CONTAINER_DOCKER_IMAGE, s"spark-init:latest")
.set(DRIVER_DOCKER_IMAGE, s"spark-driver:latest")
.set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init"))
.set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver"))
.set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor"))
.set(s"${KUBERNETES_DRIVER_LABEL_PREFIX}spark-app-locator", APP_LOCATOR_LABEL)
kubernetesTestComponents.createNamespace()
}
@@ -73,14 +77,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Run PySpark Job on file from SUBMITTER with --py-files") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

launchStagingServer(SSLOptions(), None)
sparkConf
.set(DRIVER_DOCKER_IMAGE,
System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest"))
.set(EXECUTOR_DOCKER_IMAGE,
System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))
.set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver-py"))
.set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor-py"))
runPySparkPiAndVerifyCompletion(
PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION,
@@ -89,20 +92,18 @@
}

test("Run PySpark Job on file from CONTAINER with spark.jar defined") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
sparkConf
.set(DRIVER_DOCKER_IMAGE,
System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest"))
.set(EXECUTOR_DOCKER_IMAGE,
System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))
.set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver-py"))
.set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor-py"))

runPySparkPiAndVerifyCompletion(PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION, Seq.empty[String])
}

test("Run SparkR Job on file locally") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

launchStagingServer(SSLOptions(), None)
sparkConf
@@ -115,7 +116,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Run SparkR Job on file from SUBMITTER") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
sparkConf
@@ -128,14 +129,14 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Simple submission test with the resource staging server.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

launchStagingServer(SSLOptions(), None)
runSparkPiAndVerifyCompletion(SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
}

test("Enable SSL on the resource staging server") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

val keyStoreAndTrustStore = SSLUtils.generateKeyStoreTrustStorePair(
ipAddress = Minikube.getMinikubeIp,
@@ -162,14 +163,14 @@
}

test("Use container-local resources without the resource staging server") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
}

test("Dynamic executor scaling basic test") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

launchStagingServer(SSLOptions(), None)
createShuffleServiceDaemonSet()
@@ -190,7 +191,7 @@
}

test("Use remote resources without the resource staging server.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer()
sparkConf.setJars(Seq(
s"$assetServerUri/${EXAMPLES_JAR_FILE.getName}",
@@ -200,7 +201,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Mix remote resources with submitted ones.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
launchStagingServer(SSLOptions(), None)
val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer()
sparkConf.setJars(Seq(
@@ -210,7 +211,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Use key and certificate PEM files for TLS.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
val keyAndCertificate = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
launchStagingServer(
SSLOptions(enabled = true),
@@ -222,7 +223,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Use client key and client cert file when requesting executors") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
sparkConf.setJars(Seq(
CONTAINER_LOCAL_MAIN_APP_RESOURCE,
CONTAINER_LOCAL_HELPER_JAR_PATH))
@@ -239,7 +240,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Added files should be placed in the driver's working directory.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
launchStagingServer(SSLOptions(), None)
val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir")
val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
@@ -257,7 +258,7 @@
}

test("Setting JVM options on the driver and executors with spaces.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
launchStagingServer(SSLOptions(), None)
val driverJvmOptionsFile = storeJvmOptionsInTempFile(
Map("simpleDriverConf" -> "simpleDriverConfValue",
@@ -287,7 +288,7 @@
}

test("Submit small local files without the resource staging server.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)
sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir")
val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
@@ -305,15 +306,15 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Use a very long application name.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)).setAppName("long" * 40)
runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
}

private def launchStagingServer(
resourceStagingServerSslOptions: SSLOptions, keyAndCertPem: Option[KeyAndCertPem]): Unit = {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
assume(testBackend == MinikubeTestBackend)

val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer(
resourceStagingServerSslOptions, keyAndCertPem)
@@ -405,7 +406,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.endVolume()
.addNewContainer()
.withName("shuffle")
.withImage("spark-shuffle:latest")
.withImage(s"spark-shuffle:${testBackend.dockerImageTag()}")
.withImagePullPolicy("IfNotPresent")
.addNewVolumeMount()
.withName("shuffle-dir")
@@ -441,6 +442,8 @@
}
propertiesFile
}

private def tagImage(image: String): String = s"$image:${testBackend.dockerImageTag()}"
}

private[spark] object KubernetesSuite {
@@ -32,7 +32,8 @@ import org.apache.spark.util.Utils
/**
* Launches a pod that runs the resource staging server, exposing it over a NodePort.
*/
private[spark] class ResourceStagingServerLauncher(kubernetesClient: KubernetesClient) {
private[spark] class ResourceStagingServerLauncher(
kubernetesClient: KubernetesClient, dockerImageTag: String) {

private val SECRETS_ROOT_DIR = "/mnt/secrets/spark-staging"
private val KEYSTORE_SECRET_KEY = "keyStore"
@@ -123,7 +124,7 @@ private[spark] class ResourceStagingServerLauncher(
.endVolume()
.addNewContainer()
.withName("staging-server-container")
.withImage("spark-resource-staging-server:latest")
.withImage(s"spark-resource-staging-server:$dockerImageTag")
.withImagePullPolicy("IfNotPresent")
.withNewReadinessProbe()
.withHttpGet(probePingHttpGet)
@@ -25,7 +25,8 @@ import org.apache.spark.util.Utils
* Launches a simple HTTP server which provides jars that can be downloaded by Spark applications
* in integration tests.
*/
private[spark] class StaticAssetServerLauncher(kubernetesClient: KubernetesClient) {
private[spark] class StaticAssetServerLauncher(
kubernetesClient: KubernetesClient, dockerImageTag: String) {

// Returns the HTTP Base URI of the server.
def launchStaticAssetServer(): String = {
@@ -46,7 +47,7 @@ private[spark] class StaticAssetServerLauncher(
.withNewSpec()
.addNewContainer()
.withName("static-asset-server-container")
.withImage("spark-integration-test-asset-server:latest")
.withImage(s"spark-integration-test-asset-server:$dockerImageTag")
.withImagePullPolicy("IfNotPresent")
.withNewReadinessProbe()
.withHttpGet(probePingHttpGet)
@@ -18,9 +18,8 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.GCE

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}

import org.apache.spark.deploy.k8s.config.resolveK8sMaster
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND

private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
private var defaultClient: DefaultKubernetesClient = _
@@ -36,5 +35,7 @@ private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
defaultClient
}

override def name(): String = GCE_TEST_BACKEND
override def dockerImageTag(): String = {
System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY, "latest")
}
}
@@ -21,19 +21,19 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend}
import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager

private[spark] trait IntegrationTestBackend {
def name(): String
def initialize(): Unit
def getKubernetesClient(): DefaultKubernetesClient
def dockerImageTag(): String
def cleanUp(): Unit = {}
}

private[spark] object IntegrationTestBackendFactory {
def getTestBackend(): IntegrationTestBackend = {
Option(System.getProperty("spark.kubernetes.test.master"))
.map(new GCETestBackend(_))
.getOrElse(new MinikubeTestBackend())
.getOrElse(MinikubeTestBackend)
}
}
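
For context (the object is referenced above but its definition is not part of this excerpt), here is a minimal sketch of what a `MinikubeTestBackend` object could look like under the revised trait. The helper calls on the `Minikube` object are assumptions made for illustration, not the PR's actual implementation:

```scala
package org.apache.spark.deploy.k8s.integrationtest.backend.minikube

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend

private[spark] object MinikubeTestBackend extends IntegrationTestBackend {

  private var defaultClient: DefaultKubernetesClient = _

  // Resolved once; defaults to "latest", mirroring the GCE backend above.
  private val resolvedDockerImageTag =
    System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY, "latest")

  override def initialize(): Unit = {
    // Assumes a Minikube instance is already installed and running; this PR
    // removes the build step that downloaded a Minikube binary.
    // `Minikube.getKubernetesClient` is a hypothetical accessor on the
    // existing Minikube helper object.
    defaultClient = Minikube.getKubernetesClient
  }

  override def getKubernetesClient(): DefaultKubernetesClient = defaultClient

  override def dockerImageTag(): String = resolvedDockerImageTag
}
```

With this shape, `IntegrationTestBackendFactory.getTestBackend()` can return the singleton directly, which is why the diff drops `new MinikubeTestBackend()` in favour of `MinikubeTestBackend`.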