Skip to content

Commit

Permalink
Fix test failures
Browse files Browse the repository at this point in the history
  • Loading branch information
Mridul Muralidharan committed Nov 8, 2023
1 parent d5ba0cc commit 585b493
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ class SparkContext(config: SparkConf) extends Logging {

// Initialize any plugins before the task scheduler is initialized.
_plugins = PluginContainer(this, _resources.asJava)
_env.initiailzeShuffleManager()

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
Expand Down
12 changes: 5 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark

import java.io.File
import java.util.Locale

import scala.collection.concurrent
import scala.collection.mutable
Expand Down Expand Up @@ -195,11 +194,9 @@ class SparkEnv (
}

private[spark] def initiailzeShuffleManager(): Unit = {
Preconditions.checkState(null == _shuffleManager, "Shuffle manager already initialized")
// Must not be driver
Preconditions.checkState(executorId != SparkContext.DRIVER_IDENTIFIER,
"Should not be called in driver")
_shuffleManager = ShuffleManager.create(conf, isDriver = false)
Preconditions.checkState(null == _shuffleManager,
"Shuffle manager already initialized to %s", _shuffleManager)
_shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER)
}
}

Expand Down Expand Up @@ -371,7 +368,8 @@ object SparkEnv extends Logging {
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

val shuffleManager: ShuffleManager = if (isDriver) ShuffleManager.create(conf, true) else null
// val shuffleManager: ShuffleManager = if (isDriver) ShuffleManager.create(conf, true) else null
val shuffleManager: ShuffleManager = null
val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)

val blockManagerPort = if (isDriver) {
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import org.apache.spark.resource.ResourceInformation
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler._
import org.apache.spark.serializer.SerializerHelper
import org.apache.spark.shuffle.{FetchFailedException, ShuffleBlockPusher, ShuffleManager}
import org.apache.spark.shuffle.{FetchFailedException, ShuffleBlockPusher}
import org.apache.spark.status.api.v1.ThreadStackTrace
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
Expand Down Expand Up @@ -337,8 +337,11 @@ private[spark] class Executor(
PluginContainer(env, resources.asJava)
}

Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
env.initiailzeShuffleManager()
// Already initialized in local mode
if (!isLocal) {
Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
env.initiailzeShuffleManager()
}
}

metricsPoller.start()
Expand Down

0 comments on commit 585b493

Please sign in to comment.