diff --git a/README.md b/README.md
index c8df557..ce8999e 100644
--- a/README.md
+++ b/README.md
@@ -46,6 +46,11 @@ $ ./e2e/runner.sh -m https://xyz -i test -r https://github.com/my-spark/spark -d
 
 ## Running the tests using maven
 
+The integration tests require [Minikube](https://kubernetes.io/docs/getting-started-guides/minikube/) to be installed
+on your machine, with the `minikube` binary available on your `PATH`. Refer to the Minikube documentation for
+installation instructions. It is recommended to allocate at least 8 CPUs and 8GB of memory to the Minikube cluster,
+for example by starting it with `minikube start --cpus 8 --memory 8192`.
+
 Running the integration tests requires a Spark distribution package tarball that
 contains Spark jars, submission clients, etc. You can download a tarball from
 http://spark.apache.org/downloads.html. Or, you can create a distribution from
@@ -82,23 +86,8 @@ In order to run against any cluster, use the following:
 ```sh
 $ mvn clean integration-test \
  -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
- -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// -Dspark.docker.test.driverImage= -Dspark.docker.test.executorImage="
-```
-
-## Preserve the Minikube VM
-
-The integration tests make use of
-[Minikube](https://github.com/kubernetes/minikube), which fires up a virtual
-machine and setup a single-node kubernetes cluster within it. By default the vm
-is destroyed after the tests are finished. If you want to preserve the vm, e.g.
-to reduce the running time of tests during development, you can pass the
-property `spark.docker.test.persistMinikube` to the test process:
-
-```
-$ mvn clean integration-test \
- -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
- -DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true
-```
+ -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://"
+```
 
 ## Reuse the previous Docker images
 
@@ -106,13 +94,13 @@ The integration tests build a number of Docker images, which takes some time.
 By default, the images are built every time the tests run. You may want to
 skip re-building those images during development, if the distribution package
 did not change since the last run. You can pass the property
-`spark.docker.test.skipBuildImages` to the test process. This will work only if
-you have been setting the property `spark.docker.test.persistMinikube`, in the
-previous run since the docker daemon run inside the minikube environment. Here
-is an example:
+`spark.kubernetes.test.imageDockerTag` to the test process and specify the
+appropriate Docker image tag.
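+If the property is not set, the images are built with a randomly generated tag and removed again after the tests finish.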
+Here is an example: ``` $ mvn clean integration-test \ -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ - "-DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true -Dspark.docker.test.skipBuildImages=true" + -Dspark.kubernetes.test.imageDockerTag=latest ``` diff --git a/integration-test/pom.xml b/integration-test/pom.xml index bf48318..9375d91 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -40,7 +40,6 @@ 1.7.24 kubernetes-integration-tests YOUR-SPARK-DISTRO-TARBALL-HERE - YOUR-DOCKERFILES-DIR-HERE jar @@ -141,37 +140,6 @@ - - com.googlecode.maven-download-plugin - download-maven-plugin - ${download-maven-plugin.version} - - - download-minikube-linux - pre-integration-test - - wget - - - https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64 - ${project.build.directory}/minikube-bin/linux-amd64 - minikube - - - - download-minikube-darwin - pre-integration-test - - wget - - - https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64 - ${project.build.directory}/minikube-bin/darwin-amd64 - minikube - - - - diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 0abf98c..3b60af8 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -30,7 +30,9 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory -import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH +import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend +import org.apache.spark.deploy.k8s.integrationtest.constants._ +import org.apache.spark.deploy.k8s.integrationtest.config._ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter { @@ -59,6 +61,9 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit .set("spark.kubernetes.driver.container.image", driverImage) .set("spark.kubernetes.executor.container.image", executorImage) .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL) + .set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver")) + .set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor")) + .set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init")) .set("spark.kubernetes.executor.label.spark-app-locator", APP_LOCATOR_LABEL) kubernetesTestComponents.createNamespace() } @@ -97,6 +102,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } test("Run SparkPi with custom driver pod name, labels, annotations, and environment variables.") { + doMinikubeCheck sparkAppConf .set("spark.kubernetes.driver.pod.name", "spark-integration-spark-pi") .set("spark.kubernetes.driver.label.label1", "label1-value") @@ -217,6 +223,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } } } + private def tagImage(image: String): String = s"$image:${testBackend.dockerImageTag()}" private def doBasicDriverPodCheck(driverPod: Pod): Unit = { assert(driverPod.getSpec.getContainers.get(0).getImage === driverImage) diff --git 
a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala index 911b3a9..c300ca4 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -26,6 +26,32 @@ object Utils extends Logging { try f.apply(resource) finally resource.close() } + def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = { + var originalThrowable: Throwable = null + try { + block + } catch { + case t: Throwable => + // Purposefully not using NonFatal, because even fatal exceptions + // we don't want to have our finallyBlock suppress + originalThrowable = t + throw originalThrowable + } finally { + try { + finallyBlock + } catch { + case t: Throwable => + if (originalThrowable != null) { + originalThrowable.addSuppressed(t) + logWarning(s"Suppressing exception in finally: " + t.getMessage, t) + throw originalThrowable + } else { + throw t + } + } + } + } + def checkAndGetK8sMasterUrl(rawMasterURL: String): String = { require(rawMasterURL.startsWith("k8s://"), "Kubernetes master URL must start with k8s://.") diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala index cbb98fa..345ccc8 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala @@ -20,7 +20,7 @@ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} import org.apache.spark.deploy.k8s.integrationtest.Utils import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend -import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND +import org.apache.spark.deploy.k8s.integrationtest.config._ private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend { private var defaultClient: DefaultKubernetesClient = _ @@ -37,5 +37,7 @@ private[spark] class GCETestBackend(val master: String) extends IntegrationTestB defaultClient } - override def name(): String = GCE_TEST_BACKEND + override def dockerImageTag(): String = { + return System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY, "latest") + } } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala index e1aaf13..9c64c64 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala @@ -23,9 +23,9 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend private[spark] trait IntegrationTestBackend { - def name(): String def initialize(): Unit def getKubernetesClient: DefaultKubernetesClient + def dockerImageTag(): String def cleanUp(): Unit = {} } @@ -33,6 +33,6 @@ private[spark] object IntegrationTestBackendFactory { def 
getTestBackend(): IntegrationTestBackend = { Option(System.getProperty("spark.kubernetes.test.master")) .map(new GCETestBackend(_)) - .getOrElse(new MinikubeTestBackend()) + .getOrElse(MinikubeTestBackend) } } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index c04bd75..8204852 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -20,57 +20,30 @@ import java.nio.file.Paths import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} -import org.apache.commons.lang3.SystemUtils import org.apache.spark.deploy.k8s.integrationtest.{Logging, ProcessUtils} // TODO support windows private[spark] object Minikube extends Logging { - private val MINIKUBE_EXECUTABLE_DEST = if (SystemUtils.IS_OS_MAC_OSX) { - Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile - } else if (SystemUtils.IS_OS_WINDOWS) { - throw new IllegalStateException("Executing Minikube based integration tests not yet " + - " available on Windows.") - } else { - Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile - } - - private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " + - s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}" - private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60 - // NOTE: This and the following methods are synchronized to prevent deleteMinikube from - // destroying the minikube VM while other methods try to use the VM. - // Such a race condition can corrupt the VM or some VM provisioning tools like VirtualBox. 
- def startMinikube(): Unit = synchronized { - assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) - if (getMinikubeStatus != MinikubeStatus.RUNNING) { - executeMinikube("start", "--memory", "6000", "--cpus", "8") - } else { - logInfo("Minikube is already started.") - } - } - - def getMinikubeIp: String = synchronized { - assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + def getMinikubeIp: String = { val outputs = executeMinikube("ip") .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$")) assert(outputs.size == 1, "Unexpected amount of output from minikube ip") outputs.head } - def getMinikubeStatus: MinikubeStatus.Value = synchronized { - assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + def getMinikubeStatus: MinikubeStatus.Value = { val statusString = executeMinikube("status") - .filter(_.contains("minikube: ")) + .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:")) .head + .replaceFirst("minikubeVM: ", "") .replaceFirst("minikube: ", "") MinikubeStatus.unapply(statusString) .getOrElse(throw new IllegalStateException(s"Unknown status $statusString")) } - def getDockerEnv: Map[String, String] = synchronized { - assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + def getDockerEnv: Map[String, String] = { executeMinikube("docker-env", "--shell", "bash") .filter(_.startsWith("export")) .map(_.replaceFirst("export ", "").split('=')) @@ -78,16 +51,7 @@ private[spark] object Minikube extends Logging { .toMap } - def deleteMinikube(): Unit = synchronized { - assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) - if (getMinikubeStatus != MinikubeStatus.NONE) { - executeMinikube("delete") - } else { - logInfo("Minikube was already not running.") - } - } - - def getKubernetesClient: DefaultKubernetesClient = synchronized { + def getKubernetesClient: DefaultKubernetesClient = { val kubernetesMaster = s"https://${getMinikubeIp}:8443" val userHome = System.getProperty("user.home") val kubernetesConf = new ConfigBuilder() @@ -105,13 +69,8 @@ private[spark] object Minikube extends Logging { } private def executeMinikube(action: String, args: String*): Seq[String] = { - if (!MINIKUBE_EXECUTABLE_DEST.canExecute) { - if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) { - throw new IllegalStateException("Failed to make the Minikube binary executable.") - } - } - ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args, - MINIKUBE_STARTUP_TIMEOUT_SECONDS) + ProcessUtils.executeProcess( + Array("minikube", action) ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS) } } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala index 7a1433e..89db42f 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -19,29 +19,33 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.minikube import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend -import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND 
-import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder +import org.apache.spark.deploy.k8s.integrationtest.config._ +import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager -private[spark] class MinikubeTestBackend extends IntegrationTestBackend { +private[spark] object MinikubeTestBackend extends IntegrationTestBackend { private var defaultClient: DefaultKubernetesClient = _ + private val userProvidedDockerImageTag = Option( + System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY)) + private val dockerManager = new KubernetesSuiteDockerManager( + Minikube.getDockerEnv, userProvidedDockerImageTag) override def initialize(): Unit = { - Minikube.startMinikube() - if (!System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean) { - new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() - } + val minikubeStatus = Minikube.getMinikubeStatus + require(minikubeStatus == MinikubeStatus.RUNNING, + s"Minikube must be running before integration tests can execute. Current status" + + s" is: $minikubeStatus") + dockerManager.buildSparkDockerImages() defaultClient = Minikube.getKubernetesClient } - override def getKubernetesClient(): DefaultKubernetesClient = { - defaultClient + override def cleanUp(): Unit = { + super.cleanUp() + dockerManager.deleteImages() } - override def cleanUp(): Unit = { - if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { - Minikube.deleteMinikube() - } + override def getKubernetesClient(): DefaultKubernetesClient = { + defaultClient } - override def name(): String = MINIKUBE_TEST_BACKEND + override def dockerImageTag(): String = dockerManager.dockerImageTag() } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala new file mode 100644 index 0000000..d82a1de --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +package object config { + val KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY = "spark.kubernetes.test.imageDockerTag" + val DRIVER_DOCKER_IMAGE = "spark.kubernetes.driver.container.image" + val EXECUTOR_DOCKER_IMAGE = "spark.kubernetes.executor.container.image" + val INIT_CONTAINER_DOCKER_IMAGE = "spark.kubernetes.initcontainer.container.image" +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala new file mode 100644 index 0000000..0163d33 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest.docker + +import java.io.{File, PrintWriter} +import java.net.URI +import java.nio.file.Paths +import java.util.UUID + +import com.google.common.base.Charsets +import com.google.common.io.Files +import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler} +import com.spotify.docker.client.DockerClient.{ListContainersParam, ListImagesParam, RemoveContainerParam} +import com.spotify.docker.client.messages.Container +import org.apache.http.client.utils.URIBuilder +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.integrationtest.constants._ +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite +import org.apache.spark.deploy.k8s.integrationtest.Logging +import org.apache.spark.deploy.k8s.integrationtest.Utils.tryWithResource + +private[spark] class KubernetesSuiteDockerManager( + dockerEnv: Map[String, String], userProvidedDockerImageTag: Option[String]) extends Logging { + + private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH + // Dockerfile paths must be relative to the build path. 
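+  // The Docker build context is the unpacked Spark distribution, which ships these
+  // Dockerfiles under kubernetes/dockerfiles/.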
+ private val DOCKERFILES_DIR = "kubernetes/dockerfiles/" + private val BASE_DOCKER_FILE = DOCKERFILES_DIR + "spark-base/Dockerfile" + private val DRIVER_DOCKER_FILE = DOCKERFILES_DIR + "driver/Dockerfile" + private val EXECUTOR_DOCKER_FILE = DOCKERFILES_DIR + "executor/Dockerfile" + private val INIT_CONTAINER_DOCKER_FILE = DOCKERFILES_DIR + "init-container/Dockerfile" + private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + + private val resolvedDockerImageTag = + userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", "")) + private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", + throw new IllegalStateException("DOCKER_HOST env not found.")) + private val originalDockerUri = URI.create(dockerHost) + private val httpsDockerUri = new URIBuilder() + .setHost(originalDockerUri.getHost) + .setPort(originalDockerUri.getPort) + .setScheme("https") + .build() + + private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH", + throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) + + private val dockerClient = new DefaultDockerClient.Builder() + .uri(httpsDockerUri) + .dockerCertificates(DockerCertificates + .builder() + .dockerCertPath(Paths.get(dockerCerts)) + .build().get()) + .build() + + def buildSparkDockerImages(): Unit = { + if (userProvidedDockerImageTag.isEmpty) { + Eventually.eventually(TIMEOUT, INTERVAL) { + dockerClient.ping() + } + buildImage("spark-base", BASE_DOCKER_FILE) + buildImage("spark-driver", DRIVER_DOCKER_FILE) + buildImage("spark-executor", EXECUTOR_DOCKER_FILE) + buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) + } + } + + def deleteImages(): Unit = { + if (userProvidedDockerImageTag.isEmpty) { + removeRunningContainers() + deleteImage("spark-base") + deleteImage("spark-driver") + deleteImage("spark-executor") + deleteImage("spark-init") + } + } + + def dockerImageTag(): String = resolvedDockerImageTag + + private def buildImage(name: String, dockerFile: String): Unit = { + logInfo(s"Building Docker image - $name:$resolvedDockerImageTag") + val dockerFileWithBaseTag = new File(DOCKER_BUILD_PATH.resolve( + s"$dockerFile-$resolvedDockerImageTag").toAbsolutePath.toString) + dockerFileWithBaseTag.deleteOnExit() + try { + val originalDockerFileText = Files.readLines( + DOCKER_BUILD_PATH.resolve(dockerFile).toFile, Charsets.UTF_8).asScala + val dockerFileTextWithProperBaseImage = originalDockerFileText.map( + _.replace("FROM spark-base", s"FROM spark-base:$resolvedDockerImageTag")) + tryWithResource(Files.newWriter(dockerFileWithBaseTag, Charsets.UTF_8)) { fileWriter => + tryWithResource(new PrintWriter(fileWriter)) { printWriter => + for (line <- dockerFileTextWithProperBaseImage) { + // scalastyle:off println + printWriter.println(line) + // scalastyle:on println + } + } + } + dockerClient.build( + DOCKER_BUILD_PATH, + s"$name:$resolvedDockerImageTag", + s"$dockerFile-$resolvedDockerImageTag", + new LoggingBuildHandler()) + } finally { + dockerFileWithBaseTag.delete() + } + } + + /** + * Forces all containers running an image with the configured tag to halt and be removed. 
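+   * Containers are stopped first, then force-removed, and the method waits until no
+   * containers created from the tagged images remain before returning.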
+ */ + private def removeRunningContainers(): Unit = { + val imageIds = dockerClient.listImages(ListImagesParam.allImages()) + .asScala + .filter(image => image.repoTags().asScala.exists(_.endsWith(s":$resolvedDockerImageTag"))) + .map(_.id()) + .toSet + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val runningContainersWithImageTag = stopRunningContainers(imageIds) + require( + runningContainersWithImageTag.isEmpty, + s"${runningContainersWithImageTag.size} containers found still running" + + s" with the image tag $resolvedDockerImageTag") + } + dockerClient.listContainers(ListContainersParam.allContainers()) + .asScala + .filter(container => imageIds.contains(container.imageId())) + .foreach(container => dockerClient.removeContainer( + container.id(), RemoveContainerParam.forceKill(true))) + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val containersWithImageTag = dockerClient.listContainers(ListContainersParam.allContainers()) + .asScala + .filter(container => imageIds.contains(container.imageId())) + require(containersWithImageTag.isEmpty, s"${containersWithImageTag.size} containers still" + + s" found with image tag $resolvedDockerImageTag.") + } + + } + + private def stopRunningContainers(imageIds: Set[String]): Iterable[Container] = { + val runningContainersWithImageTag = getRunningContainersWithImageIds(imageIds) + if (runningContainersWithImageTag.nonEmpty) { + logInfo(s"Found ${runningContainersWithImageTag.size} containers running with" + + s" an image with the tag $resolvedDockerImageTag. Attempting to remove these containers," + + s" and then will stall for 2 seconds.") + runningContainersWithImageTag.foreach { container => + dockerClient.stopContainer(container.id(), 5) + } + } + runningContainersWithImageTag + } + + private def getRunningContainersWithImageIds(imageIds: Set[String]): Iterable[Container] = { + dockerClient + .listContainers( + ListContainersParam.allContainers(), + ListContainersParam.withStatusRunning()) + .asScala + .filter(container => imageIds.contains(container.imageId())) + } + + private def deleteImage(name: String): Unit = { + try { + dockerClient.removeImage(s"$name:$resolvedDockerImageTag") + } catch { + case e: RuntimeException => + logWarning(s"Failed to delete image $name:$resolvedDockerImageTag. There may be images leaking in the" + + s" docker environment which are now stale and unused.", e) + } + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala deleted file mode 100644 index 9cce325..0000000 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.integrationtest.docker - -import java.net.URI -import java.net.URLEncoder -import java.nio.file.Paths - -import com.spotify.docker.client.{DockerClient, DefaultDockerClient, DockerCertificates, LoggingBuildHandler} -import org.apache.http.client.utils.URIBuilder -import org.scalatest.concurrent.{Eventually, PatienceConfiguration} -import org.scalatest.time.{Minutes, Seconds, Span} - -import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH -import org.apache.spark.deploy.k8s.integrationtest.Logging - -private[spark] class SparkDockerImageBuilder - (private val dockerEnv: Map[String, String]) extends Logging { - - private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH - // Dockerfile paths must be relative to the build path. - private val DOCKERFILES_DIR = "kubernetes/dockerfiles/" - private val BASE_DOCKER_FILE = DOCKERFILES_DIR + "spark-base/Dockerfile" - private val DRIVER_DOCKER_FILE = DOCKERFILES_DIR + "driver/Dockerfile" - private val EXECUTOR_DOCKER_FILE = DOCKERFILES_DIR + "executor/Dockerfile" - private val INIT_CONTAINER_DOCKER_FILE = DOCKERFILES_DIR + "init-container/Dockerfile" - private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) - private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) - private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", - throw new IllegalStateException("DOCKER_HOST env not found.")) - - private val originalDockerUri = URI.create(dockerHost) - private val httpsDockerUri = new URIBuilder() - .setHost(originalDockerUri.getHost) - .setPort(originalDockerUri.getPort) - .setScheme("https") - .build() - - private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH", - throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) - - private val dockerClient = new DefaultDockerClient.Builder() - .uri(httpsDockerUri) - .dockerCertificates(DockerCertificates.builder() - .dockerCertPath(Paths.get(dockerCerts)) - .build() - .get()) - .build() - - def buildSparkDockerImages(): Unit = { - Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } - buildImage("spark-base", BASE_DOCKER_FILE, - Some("{\"spark_jars\":\"jars\",\"img_path\":\"kubernetes/dockerfiles\"}")) - buildImage("spark-driver", DRIVER_DOCKER_FILE) - buildImage("spark-executor", EXECUTOR_DOCKER_FILE) - buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) - } - - private def buildImage( - name: String, - dockerFile: String, - buildArgs: Option[String] = None): Unit = { - if (buildArgs.nonEmpty) { - dockerClient.build( - DOCKER_BUILD_PATH, - name, - dockerFile, - new LoggingBuildHandler(), - DockerClient.BuildParam.create("buildargs", URLEncoder.encode(buildArgs.get, "UTF-8"))) - } else { - dockerClient.build( - DOCKER_BUILD_PATH, - name, - dockerFile, - new LoggingBuildHandler()) - } - - logInfo(s"Built $name docker image") - } -}