
Commit bd4eb9c

Marcelo Vanzin authored and gatorsmile committed
[SPARK-19558][SQL] Add config key to register QueryExecutionListeners automatically.
This change adds a new SQL config key that is equivalent to SparkContext's "spark.extraListeners", allowing users to register QueryExecutionListener instances through the Spark configuration system instead of having to do it explicitly in code. The code SparkContext uses to implement the feature was refactored into a helper method in the Utils class, and SQL's ExecutionListenerManager was modified to use it to initialize listeners declared in the configuration. Unit tests were added to verify all of the new functionality.

Author: Marcelo Vanzin <[email protected]>

Closes apache#19309 from vanzin/SPARK-19558.
1 parent bfc7e1f commit bd4eb9c
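
For context, a minimal sketch of how the new key is meant to be used alongside the existing core key. The listener class names under com.example are hypothetical placeholders; the two config keys are the ones this commit touches:

import org.apache.spark.sql.SparkSession

// Hypothetical listener implementations: they must be on the classpath, be
// top-level classes, and expose either a no-arg constructor or a constructor
// taking a SparkConf.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("listener-config-sketch")
  // Core: SparkListener implementations, registered by SparkContext at startup.
  .config("spark.extraListeners", "com.example.MySparkListener")
  // SQL: QueryExecutionListener implementations, registered on each new session.
  .config("spark.sql.queryExecutionListeners", "com.example.MyQueryExecutionListener")
  .getOrCreate()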

File tree: 9 files changed, +216 −40 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

+5 −33

@@ -2344,41 +2344,13 @@ class SparkContext(config: SparkConf) extends Logging {
    * (e.g. after the web UI and event logging listeners have been registered).
    */
   private def setupAndStartListenerBus(): Unit = {
-    // Use reflection to instantiate listeners specified via `spark.extraListeners`
     try {
-      val listenerClassNames: Seq[String] =
-        conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
-      for (className <- listenerClassNames) {
-        // Use reflection to find the right constructor
-        val constructors = {
-          val listenerClass = Utils.classForName(className)
-          listenerClass
-            .getConstructors
-            .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]
+      conf.get(EXTRA_LISTENERS).foreach { classNames =>
+        val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
+        listeners.foreach { listener =>
+          listenerBus.addToSharedQueue(listener)
+          logInfo(s"Registered listener ${listener.getClass().getName()}")
         }
-        val constructorTakingSparkConf = constructors.find { c =>
-          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
-        }
-        lazy val zeroArgumentConstructor = constructors.find { c =>
-          c.getParameterTypes.isEmpty
-        }
-        val listener: SparkListenerInterface = {
-          if (constructorTakingSparkConf.isDefined) {
-            constructorTakingSparkConf.get.newInstance(conf)
-          } else if (zeroArgumentConstructor.isDefined) {
-            zeroArgumentConstructor.get.newInstance()
-          } else {
-            throw new SparkException(
-              s"$className did not have a zero-argument constructor or a" +
-                " single-argument constructor that accepts SparkConf. Note: if the class is" +
-                " defined inside of another Scala class, then its constructors may accept an" +
-                " implicit parameter that references the enclosing class; in this case, you must" +
-                " define the listener as a top-level class in order to prevent this extra" +
-                " parameter from breaking Spark's ability to find a valid constructor.")
-          }
-        }
-        listenerBus.addToSharedQueue(listener)
-        logInfo(s"Registered listener $className")
       }
     } catch {
       case e: Exception =>

core/src/main/scala/org/apache/spark/internal/config/package.scala

+7
@@ -419,4 +419,11 @@ package object config {
     .stringConf
     .toSequence
     .createWithDefault(Nil)
+
+  private[spark] val EXTRA_LISTENERS = ConfigBuilder("spark.extraListeners")
+    .doc("Class names of listeners to add to SparkContext during initialization.")
+    .stringConf
+    .toSequence
+    .createOptional
+
 }

core/src/main/scala/org/apache/spark/util/Utils.scala

+56 −1

@@ -19,6 +19,7 @@ package org.apache.spark.util

 import java.io._
 import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInfo}
+import java.lang.reflect.InvocationTargetException
 import java.math.{MathContext, RoundingMode}
 import java.net._
 import java.nio.ByteBuffer

@@ -37,7 +38,7 @@ import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 import scala.util.control.{ControlThrowable, NonFatal}
 import scala.util.matching.Regex

@@ -2687,6 +2688,60 @@ private[spark] object Utils extends Logging {
   def stringToSeq(str: String): Seq[String] = {
     str.split(",").map(_.trim()).filter(_.nonEmpty)
   }
+
+  /**
+   * Create instances of extension classes.
+   *
+   * The classes in the given list must:
+   * - Be sub-classes of the given base class.
+   * - Provide either a no-arg constructor, or a 1-arg constructor that takes a SparkConf.
+   *
+   * The constructors are allowed to throw "UnsupportedOperationException" if the extension
+   * does not want to be registered; this allows the implementations to check the Spark
+   * configuration (or other state) and decide they do not need to be added. A log message
+   * is printed in that case. Other exceptions are bubbled up.
+   */
+  def loadExtensions[T](extClass: Class[T], classes: Seq[String], conf: SparkConf): Seq[T] = {
+    classes.flatMap { name =>
+      try {
+        val klass = classForName(name)
+        require(extClass.isAssignableFrom(klass),
+          s"$name is not a subclass of ${extClass.getName()}.")
+
+        val ext = Try(klass.getConstructor(classOf[SparkConf])) match {
+          case Success(ctor) =>
+            ctor.newInstance(conf)
+
+          case Failure(_) =>
+            klass.getConstructor().newInstance()
+        }
+
+        Some(ext.asInstanceOf[T])
+      } catch {
+        case _: NoSuchMethodException =>
+          throw new SparkException(
+            s"$name did not have a zero-argument constructor or a" +
+              " single-argument constructor that accepts SparkConf. Note: if the class is" +
+              " defined inside of another Scala class, then its constructors may accept an" +
+              " implicit parameter that references the enclosing class; in this case, you must" +
+              " define the class as a top-level class in order to prevent this extra" +
+              " parameter from breaking Spark's ability to find a valid constructor.")
+
+        case e: InvocationTargetException =>
+          e.getCause() match {
+            case uoe: UnsupportedOperationException =>
+              logDebug(s"Extension $name not being initialized.", uoe)
+              logInfo(s"Extension $name not being initialized.")
+              None
+
+            case null => throw e
+
+            case cause => throw cause
+          }
+      }
+    }
+  }

 }

 private[util] object CallerContext extends Logging {
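
To make the lookup rule concrete, here is a standalone sketch (not part of the patch, since Utils is private[spark]) of the constructor resolution that loadExtensions performs, assuming only what the diff above shows:

import scala.util.{Failure, Success, Try}
import org.apache.spark.SparkConf

// Prefer a single-argument SparkConf constructor; fall back to a no-arg one.
// A NoSuchMethodException from the fallback is what loadExtensions turns into
// the descriptive SparkException shown above.
def instantiate[T](klass: Class[T], conf: SparkConf): T =
  Try(klass.getConstructor(classOf[SparkConf])) match {
    case Success(ctor) => ctor.newInstance(conf)                // conf-aware constructor
    case Failure(_)    => klass.getConstructor().newInstance()  // no-arg fallback
  }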

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

+3 −3

@@ -27,7 +27,7 @@ import org.scalatest.Matchers

 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.config.LISTENER_BUS_EVENT_QUEUE_CAPACITY
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}

@@ -446,13 +446,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers {
       classOf[FirehoseListenerThatAcceptsSparkConf],
       classOf[BasicJobCounter])
     val conf = new SparkConf().setMaster("local").setAppName("test")
-      .set("spark.extraListeners", listeners.map(_.getName).mkString(","))
+      .set(EXTRA_LISTENERS, listeners.map(_.getName))
     sc = new SparkContext(conf)
     sc.listenerBus.listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) should be (1)
     sc.listenerBus.listeners.asScala
       .count(_.isInstanceOf[ListenerThatAcceptsSparkConf]) should be (1)
     sc.listenerBus.listeners.asScala
-     .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1)
+      .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1)
   }

   test("add and remove listeners to/from LiveListenerBus queues") {

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

+55 −1

@@ -38,9 +38,10 @@ import org.apache.commons.math3.stat.inference.ChiSquareTest
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

-import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.scheduler.SparkListener

 class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

@@ -1110,4 +1111,57 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     Utils.tryWithSafeFinallyAndFailureCallbacks {}(catchBlock = {}, finallyBlock = {})
     TaskContext.unset
   }
+
+  test("load extensions") {
+    val extensions = Seq(
+      classOf[SimpleExtension],
+      classOf[ExtensionWithConf],
+      classOf[UnregisterableExtension]).map(_.getName())
+
+    val conf = new SparkConf(false)
+    val instances = Utils.loadExtensions(classOf[Object], extensions, conf)
+    assert(instances.size === 2)
+    assert(instances.count(_.isInstanceOf[SimpleExtension]) === 1)
+
+    val extWithConf = instances.find(_.isInstanceOf[ExtensionWithConf])
+      .map(_.asInstanceOf[ExtensionWithConf])
+      .get
+    assert(extWithConf.conf eq conf)
+
+    class NestedExtension { }
+
+    val invalid = Seq(classOf[NestedExtension].getName())
+    intercept[SparkException] {
+      Utils.loadExtensions(classOf[Object], invalid, conf)
+    }
+
+    val error = Seq(classOf[ExtensionWithError].getName())
+    intercept[IllegalArgumentException] {
+      Utils.loadExtensions(classOf[Object], error, conf)
+    }
+
+    val wrongType = Seq(classOf[ListenerImpl].getName())
+    intercept[IllegalArgumentException] {
+      Utils.loadExtensions(classOf[Seq[_]], wrongType, conf)
+    }
+  }
+
+}
+
+private class SimpleExtension
+
+private class ExtensionWithConf(val conf: SparkConf)
+
+private class UnregisterableExtension {
+
+  throw new UnsupportedOperationException()
+
+}
+
+private class ExtensionWithError {
+
+  throw new IllegalArgumentException()
+
 }
+
+private class ListenerImpl extends SparkListener

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala

+8
@@ -87,4 +87,12 @@ object StaticSQLConf {
       "implement Function1[SparkSessionExtension, Unit], and must have a no-args constructor.")
     .stringConf
     .createOptional
+
+  val QUERY_EXECUTION_LISTENERS = buildStaticConf("spark.sql.queryExecutionListeners")
+    .doc("List of class names implementing QueryExecutionListener that will be automatically " +
+      "added to newly created sessions. The classes should have either a no-arg constructor, " +
+      "or a constructor that expects a SparkConf argument.")
+    .stringConf
+    .toSequence
+    .createOptional
 }

sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

+2 −1

@@ -266,7 +266,8 @@ abstract class BaseSessionStateBuilder(
    * This gets cloned from the parent if available, otherwise a new instance is created.
    */
   protected def listenerManager: ExecutionListenerManager = {
-    parentState.map(_.listenerManager.clone()).getOrElse(new ExecutionListenerManager)
+    parentState.map(_.listenerManager.clone()).getOrElse(
+      new ExecutionListenerManager(session.sparkContext.conf))
   }

   /**

sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala

+11 −1

@@ -22,9 +22,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import scala.collection.mutable.ListBuffer
 import scala.util.control.NonFatal

+import org.apache.spark.SparkConf
 import org.apache.spark.annotation.{DeveloperApi, Experimental, InterfaceStability}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.util.Utils

 /**
  * :: Experimental ::

@@ -72,7 +75,14 @@ trait QueryExecutionListener {
  */
 @Experimental
 @InterfaceStability.Evolving
-class ExecutionListenerManager private[sql] () extends Logging {
+class ExecutionListenerManager private extends Logging {
+
+  private[sql] def this(conf: SparkConf) = {
+    this()
+    conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[QueryExecutionListener], classNames, conf).foreach(register)
+    }
+  }

   /**
    * Registers the specified [[QueryExecutionListener]].
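
For illustration, a minimal sketch of a listener that the new spark.sql.queryExecutionListeners key could name. The class and package are hypothetical; the callback signatures match QueryExecutionListener as exercised by the test suite below:

package com.example

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Instantiated via Utils.loadExtensions: the SparkConf constructor is preferred
// over a no-arg one, and throwing UnsupportedOperationException from it would
// opt the listener out of registration.
class MyQueryExecutionListener(conf: SparkConf) extends QueryExecutionListener {

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // e.g. record the query name and duration somewhere
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    // e.g. log the failing query and exception
  }
}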
sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala

+69

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark._
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.StaticSQLConf._
+
+class ExecutionListenerManagerSuite extends SparkFunSuite {
+
+  import CountingQueryExecutionListener._
+
+  test("register query execution listeners using configuration") {
+    val conf = new SparkConf(false)
+      .set(QUERY_EXECUTION_LISTENERS, Seq(classOf[CountingQueryExecutionListener].getName()))
+
+    val mgr = new ExecutionListenerManager(conf)
+    assert(INSTANCE_COUNT.get() === 1)
+    mgr.onSuccess(null, null, 42L)
+    assert(CALLBACK_COUNT.get() === 1)
+
+    val clone = mgr.clone()
+    assert(INSTANCE_COUNT.get() === 1)
+
+    clone.onSuccess(null, null, 42L)
+    assert(CALLBACK_COUNT.get() === 2)
+  }
+
+}
+
+private class CountingQueryExecutionListener extends QueryExecutionListener {
+
+  import CountingQueryExecutionListener._
+
+  INSTANCE_COUNT.incrementAndGet()
+
+  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+    CALLBACK_COUNT.incrementAndGet()
+  }
+
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+    CALLBACK_COUNT.incrementAndGet()
+  }
+
+}
+
+private object CountingQueryExecutionListener {
+
+  val CALLBACK_COUNT = new AtomicInteger()
+  val INSTANCE_COUNT = new AtomicInteger()
+
+}
