@@ -3,19 +3,23 @@ package spark.jobserver
 import java.io.IOException
 import java.nio.file.{Files, Paths}
 import java.nio.charset.Charset
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import akka.actor._
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent.{MemberUp, MemberEvent, InitialStateAsEvents}
 import akka.util.Timeout
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
 import ooyala.common.akka.InstrumentedActor
 import spark.jobserver.util.SparkJobUtils
+
 import scala.collection.mutable
 import scala.util.{Try, Success, Failure}
 import scala.sys.process._
 
+import scala.collection.concurrent.TrieMap
+
 /**
  * The AkkaClusterSupervisorActor launches Spark Contexts as external processes
  * that connect back with the master node via Akka Cluster.
@@ -50,8 +54,10 @@ class AkkaClusterSupervisorActor(daoActor: ActorRef) extends InstrumentedActor {
   // TODO: try to pass this state to the jobManager at start instead of having to track
   // extra state. What happens if the WebApi process dies before the forked process
   // starts up? Then it never gets initialized, and this state disappears.
-  private val contextInitInfos = mutable.HashMap.empty[String, (Boolean, ActorRef => Unit, Throwable => Unit)]
-
+  private val contextInitInfos = TrieMap.empty[String, (Boolean, ActorRef => Unit, Throwable => Unit)]
+  private val contextInitExecutorService = Executors.newCachedThreadPool(
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("job-server-context-init-thread-%d").build
+  )
   // actor name -> (JobManagerActor ref, ResultActor ref)
   private val contexts = mutable.HashMap.empty[String, (ActorRef, ActorRef)]
 
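The two fields added here carry the substance of the change: `contextInitInfos` switches from `mutable.HashMap` to `TrieMap`, Scala's lock-free concurrent map, because entries will now be written by the actor thread and removed by executor pool threads; `contextInitExecutorService` is a cached pool of daemon threads (named via Guava's `ThreadFactoryBuilder`) so pending context launches cannot keep the JVM alive on shutdown. A minimal standalone sketch of the same setup, with illustrative names (`LaunchPool`, `pending`) that are not part of the patch:

```scala
import java.util.concurrent.{ExecutorService, Executors}

import com.google.common.util.concurrent.ThreadFactoryBuilder

import scala.collection.concurrent.TrieMap

object LaunchPool {
  // Thread-safe map: the caller registers entries while pool threads
  // may concurrently remove them on launch failure.
  val pending = TrieMap.empty[String, Throwable => Unit]

  // Cached pool: grows on demand and reclaims idle threads. Daemon threads
  // ensure an in-flight launch never blocks JVM exit.
  val pool: ExecutorService = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat("launch-thread-%d") // yields launch-thread-0, launch-thread-1, ...
      .build())
}
```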
@@ -212,26 +218,31 @@ class AkkaClusterSupervisorActor(daoActor: ActorRef) extends InstrumentedActor {
       cmdString = cmdString + s" ${contextConfig.getString("spark.proxy.user")}"
     }
 
-    val pb = Process(cmdString)
-    val pio = new ProcessIO(_ => (),
-      stdout => scala.io.Source.fromInputStream(stdout)
-        .getLines.foreach(println),
-      stderr => scala.io.Source.fromInputStream(stderr).getLines().foreach(println))
-    logger.info("Starting to execute sub process {}", pb)
-    val processStart = Try {
-      val process = pb.run(pio)
-      val exitVal = process.exitValue()
-      if (exitVal != 0) {
-        throw new IOException("Failed to launch context process, got exit code " + exitVal)
-      }
-    }
-
-    if (processStart.isSuccess) {
-      contextInitInfos(contextActorName) = (isAdHoc, successFunc, failureFunc)
-    } else {
-      failureFunc(processStart.failed.get)
-    }
+    contextInitInfos(contextActorName) = (isAdHoc, successFunc, failureFunc)
+
+    contextInitExecutorService.submit(new Runnable {
+      override def run(): Unit = {
+        val pb = Process(cmdString)
+        val pio = new ProcessIO(_ => (),
+          stdout => scala.io.Source.fromInputStream(stdout)
+            .getLines.foreach(println),
+          stderr => scala.io.Source.fromInputStream(stderr).getLines().foreach(println))
+
+        logger.info("Starting to execute sub process {}", pb)
+        val processStart = Try {
+          val process = pb.run(pio)
+          val exitVal = process.exitValue()
+          if (exitVal != 0) {
+            throw new IOException("Failed to launch context process, got exit code " + exitVal)
+          }
+        }
 
+        if (processStart.isFailure) {
+          failureFunc(processStart.failed.get)
+          contextInitInfos.remove(contextActorName)
+        }
+      }
+    })
   }
 
   private def createContextDir(name: String,
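The rewritten launch path follows a register-then-submit pattern: the `(isAdHoc, successFunc, failureFunc)` tuple goes into `contextInitInfos` *before* the blocking work is handed to the pool, so the callbacks are already in place whenever the forked JobManager connects back, and on a failed launch the entry is both reported through `failureFunc` and removed so the map does not leak. The payoff is that `process.exitValue()`, which blocks until the subprocess terminates, no longer stalls the supervisor actor. A stripped-down sketch of the same pattern, reusing the hypothetical `LaunchPool` from the previous sketch:

```scala
import java.io.IOException

import scala.sys.process._
import scala.util.Try

object LaunchSketch {
  def launchAsync(name: String, cmd: String, onFailure: Throwable => Unit): Unit = {
    // Register first: the child process may connect back before run() returns.
    LaunchPool.pending(name) = onFailure

    LaunchPool.pool.submit(new Runnable {
      override def run(): Unit = {
        val start = Try {
          // exitValue() blocks until the subprocess exits -- exactly why this
          // now runs on a pool thread instead of the actor's dispatcher thread.
          val exit = Process(cmd).run().exitValue()
          if (exit != 0) throw new IOException(s"Failed to launch, exit code $exit")
        }
        if (start.isFailure) {
          onFailure(start.failed.get)     // report the error to the caller
          LaunchPool.pending.remove(name) // and unregister the stale entry
        }
      }
    })
  }
}
```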