diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 537522326fc78..fe90895cacb53 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -474,6 +474,27 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } driverConf.set(EXECUTOR_ID, arguments.executorId) + // Set executor memory related config here according to resource profile + if (cfg.resourceProfile.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) { + cfg.resourceProfile + .executorResources + .foreach { + case (ResourceProfile.OFFHEAP_MEM, request) => + driverConf.set(MEMORY_OFFHEAP_SIZE.key, request.amount.toString + "m") + logInfo(s"Set executor off-heap memory to $request") + case (ResourceProfile.MEMORY, request) => + driverConf.set(EXECUTOR_MEMORY.key, request.amount.toString + "m") + logInfo(s"Set executor memory to $request") + case (ResourceProfile.OVERHEAD_MEM, request) => + // Maybe don't need to set this since it's nearly used by tasks. + driverConf.set(EXECUTOR_MEMORY_OVERHEAD.key, request.amount.toString + "m") + logInfo(s"Set executor memory_overhead to $request") + case (ResourceProfile.CORES, request) => + driverConf.set(EXECUTOR_CORES.key, request.amount.toString) + logInfo(s"Set executor cores to $request") + case _ => + } + } val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false) // Set the application attemptId in the BlockStoreClient if available.