From 8936cfb258be5f01962435f1b346c4bfa48df49a Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Mon, 18 Apr 2016 22:41:38 -0700
Subject: [PATCH 1/4] Expose clear only in 1.4+

---
 .../testing/LocalSparkContextObject.scala          | 18 ++++++++++++++++++
 .../testing/LocalSparkContextObject.scala          | 20 ++++++++++++++++++++
 .../scala/org/apache/spark/SparkUtils.scala        |  0
 3 files changed, 38 insertions(+)
 create mode 100644 src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
 create mode 100644 src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
 rename src/main/{1.3 => 1.4}/scala/org/apache/spark/SparkUtils.scala (100%)

diff --git a/src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala b/src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
new file mode 100644
index 00000000..574e2caa
--- /dev/null
+++ b/src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
@@ -0,0 +1,18 @@
+object LocalSparkContext {
+  def stop(sc: SparkContext) {
+    if (sc != null) {
+      sc.stop()
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port")
+  }
+
+  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
+  def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
+    try {
+      f(sc)
+    } finally {
+      stop(sc)
+    }
+  }
+}
diff --git a/src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala b/src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
new file mode 100644
index 00000000..ceddf589
--- /dev/null
+++ b/src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
@@ -0,0 +1,20 @@
+object LocalSparkContext {
+  def stop(sc: SparkContext) {
+    if (sc != null) {
+      sc.stop()
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port")
+  }
+
+  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
+  def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
+    try {
+      f(sc)
+    } finally {
+      stop(sc)
+    }
+  }
+
+  def clearLocalRootDirs(): Unit = SparkUtils.clearLocalRootDirs()
+}
diff --git a/src/main/1.3/scala/org/apache/spark/SparkUtils.scala b/src/main/1.4/scala/org/apache/spark/SparkUtils.scala
similarity index 100%
rename from src/main/1.3/scala/org/apache/spark/SparkUtils.scala
rename to src/main/1.4/scala/org/apache/spark/SparkUtils.scala

From e2846ab93e42d539624ed7ec1613306ff9f5ca32 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Mon, 18 Apr 2016 22:41:47 -0700
Subject: [PATCH 2/4] Expose clear only in 1.4+

---
 .../spark/testing/LocalSparkContext.scala          | 21 ---------------------
 1 file changed, 21 deletions(-)

diff --git a/src/main/1.3/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala b/src/main/1.3/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
index dd9acb91..bd0e71e5 100644
--- a/src/main/1.3/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
+++ b/src/main/1.3/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
@@ -40,24 +40,3 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
   }
 
 }
-
-object LocalSparkContext {
-  def stop(sc: SparkContext) {
-    if (sc != null) {
-      sc.stop()
-    }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-  }
-
-  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
-  def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
-    try {
-      f(sc)
-    } finally {
-      stop(sc)
-    }
-  }
-
-  def clearLocalRootDirs(): Unit = SparkUtils.clearLocalRootDirs()
-}

From ea10fe671f23a169296859ff182d0e8284cfbcac Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Mon, 18 Apr 2016 22:48:53 -0700
Subject: [PATCH 3/4] same obj

---
 .../spark/testing/LocalSparkContext.scala          | 61 ++++++++++++++++++
 .../testing/LocalSparkContextObject.scala          | 18 ------
 .../spark/testing/LocalSparkContext.scala          | 63 +++++++++++++++++++
 .../testing/LocalSparkContextObject.scala          | 20 ------
 4 files changed, 124 insertions(+), 38 deletions(-)
 create mode 100644 src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
 delete mode 100644 src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
 create mode 100644 src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
 delete mode 100644 src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala

diff --git a/src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala b/src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
new file mode 100644
index 00000000..ce06d550
--- /dev/null
+++ b/src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.holdenkarau.spark.testing
+
+import org.apache.spark._
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Suite
+
+/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
+trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
+
+  @transient var sc: SparkContext = _
+
+
+  override def afterEach() {
+    resetSparkContext()
+    super.afterEach()
+  }
+
+  def resetSparkContext() {
+    LocalSparkContext.stop(sc)
+    sc = null
+  }
+
+}
+
+object LocalSparkContext {
+  def stop(sc: SparkContext) {
+    if (sc != null) {
+      sc.stop()
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port")
+  }
+
+  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
+  def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
+    try {
+      f(sc)
+    } finally {
+      stop(sc)
+    }
+  }
+}
diff --git a/src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala b/src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
deleted file mode 100644
index 574e2caa..00000000
--- a/src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-object LocalSparkContext {
-  def stop(sc: SparkContext) {
-    if (sc != null) {
-      sc.stop()
-    }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-  }
-
-  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
-  def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
-    try {
-      f(sc)
-    } finally {
-      stop(sc)
-    }
-  }
-}
diff --git a/src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala b/src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
new file mode 100644
index 00000000..dd9acb91
--- /dev/null
+++ b/src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.holdenkarau.spark.testing
+
+import org.apache.spark._
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Suite
+
+/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
+trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
+
+  @transient var sc: SparkContext = _
+
+
+  override def afterEach() {
+    resetSparkContext()
+    super.afterEach()
+  }
+
+  def resetSparkContext() {
+    LocalSparkContext.stop(sc)
+    sc = null
+  }
+
+}
+
+object LocalSparkContext {
+  def stop(sc: SparkContext) {
+    if (sc != null) {
+      sc.stop()
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port")
+  }
+
+  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
+  def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
+    try {
+      f(sc)
+    } finally {
+      stop(sc)
+    }
+  }
+
+  def clearLocalRootDirs(): Unit = SparkUtils.clearLocalRootDirs()
+}
diff --git a/src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala b/src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
deleted file mode 100644
index ceddf589..00000000
--- a/src/main/1.4/scala/com/holdenkarau/spark/testing/LocalSparkContextObject.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-object LocalSparkContext {
-  def stop(sc: SparkContext) {
-    if (sc != null) {
-      sc.stop()
-    }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-  }
-
-  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
-  def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
-    try {
-      f(sc)
-    } finally {
-      stop(sc)
-    }
-  }
-
-  def clearLocalRootDirs(): Unit = SparkUtils.clearLocalRootDirs()
-}

From baba0954ce813a7d2690ba3212e1bc9f36077a70 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Mon, 18 Apr 2016 22:50:51 -0700
Subject: [PATCH 4/4] delete old general

---
 .../spark/testing/LocalSparkContext.scala          | 42 ------------------
 1 file changed, 42 deletions(-)
 delete mode 100644 src/main/1.3/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala

diff --git a/src/main/1.3/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala b/src/main/1.3/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
deleted file mode 100644
index bd0e71e5..00000000
--- a/src/main/1.3/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.holdenkarau.spark.testing
-
-import org.apache.spark._
-
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.Suite
-
-/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
-trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
-
-  @transient var sc: SparkContext = _
-
-
-  override def afterEach() {
-    resetSparkContext()
-    super.afterEach()
-  }
-
-  def resetSparkContext() {
-    LocalSparkContext.stop(sc)
-    sc = null
-  }
-
-}
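
A minimal usage sketch (illustrative only, not part of the patch series above): once these commits are applied, a ScalaTest suite can mix in the LocalSparkContext trait and reuse the companion object's withSpark helper. The suite name, master and app-name settings, and assertions below are assumptions made for this example, and clearLocalRootDirs() exists only in the Spark 1.4+ source tree, which is exactly what the series enforces.

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

import com.holdenkarau.spark.testing.LocalSparkContext

// Hypothetical example suite; class name, app names, and test bodies are placeholders.
class LocalSparkContextExampleSuite extends FunSuite with LocalSparkContext {

  test("withSpark runs the body and always stops the context") {
    val conf = new SparkConf().setMaster("local[2]").setAppName("withSpark-example")
    // stop(sc) runs in a finally block, so the context is cleaned up even on failure.
    val sum = LocalSparkContext.withSpark(new SparkContext(conf)) { sc =>
      sc.parallelize(1 to 10).reduce(_ + _)
    }
    assert(sum === 55)
  }

  test("the trait stops whatever context the test assigns to sc") {
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("trait-example"))
    assert(sc.parallelize(Seq("a", "b", "a")).countByValue()("a") === 2L)
    // afterEach() calls LocalSparkContext.stop(sc), which also clears spark.driver.port.
    // On a Spark 1.4+ build of this library the test could additionally call
    // LocalSparkContext.clearLocalRootDirs(), which delegates to SparkUtils.clearLocalRootDirs().
  }
}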