turn off UnionRDD parallel listing
yaooqinn committed Dec 20, 2018
1 parent 4b7f2c5 commit 40ac044
Showing 3 changed files with 15 additions and 9 deletions.

@@ -26,6 +26,8 @@ import scala.language.implicitConversions
 
 import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
 
+import yaooqinn.kyuubi.service.ServiceException
+
 /**
  * Kyuubi server level configuration which will be set at the very beginning of server start.
  *
@@ -44,6 +46,9 @@ object KyuubiConf {
     def apply(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)
   }
 
+  val KYUUBI_HOME: String =
+    sys.env.getOrElse("KYUUBI_HOME", throw new ServiceException("KYUUBI_HOME is not set!"))
+
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // High Availability by ZooKeeper //
   /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -288,9 +293,7 @@ object KyuubiConf {
     KyuubiConfigBuilder("spark.kyuubi.backend.session.local.dir")
       .doc("Default value to set spark.local.dir")
      .stringConf
-      .createWithDefault(
-        s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}"
-          + File.separator + "local")
+      .createWithDefault(KYUUBI_HOME + File.separator + "local")
 
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // Authentication //
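
Note: this hunk switches KYUUBI_HOME resolution from a silent java.io.tmpdir fallback to fail-fast at object initialization. A minimal standalone sketch of the same pattern, assuming a hypothetical EnvDemo object and a stand-in ServiceException (illustrative names, not Kyuubi's actual API):

import java.io.File

// Hypothetical stand-in for yaooqinn.kyuubi.service.ServiceException.
class ServiceException(msg: String) extends RuntimeException(msg)

object EnvDemo {
  // Fail fast: resolve the required variable once, at object initialization.
  // getOrElse's default is by-name, so the throw only fires when the env var is absent.
  val KYUUBI_HOME: String =
    sys.env.getOrElse("KYUUBI_HOME", throw new ServiceException("KYUUBI_HOME is not set!"))

  // Derived default, mirroring the new createWithDefault above.
  val sessionLocalDir: String = KYUUBI_HOME + File.separator + "local"

  def main(args: Array[String]): Unit =
    println(s"spark.kyuubi.backend.session.local.dir defaults to $sessionLocalDir")
}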

@@ -105,6 +105,7 @@ object KyuubiSparkUtil extends Logging {
   val HDFS_CLIENT_CACHE_DEFAULT = "true"
   val FILE_CLIENT_CACHE: String = SPARK_HADOOP_PREFIX + "fs.file.impl.disable.cache"
   val FILE_CLIENT_CACHE_DEFAULT = "true"
+  val RDD_PAR_LISTING: String = SPARK_PREFIX + "rdd.parallelListingThreshold"
 
   // Runtime Spark Version
   val SPARK_VERSION: String = org.apache.spark.SPARK_VERSION
@@ -282,6 +283,14 @@ object KyuubiSparkUtil extends Logging {
     if (UserGroupInformation.isSecurityEnabled) {
       conf.setIfMissing(HDFS_CLIENT_CACHE, HDFS_CLIENT_CACHE_DEFAULT)
       conf.setIfMissing(FILE_CLIENT_CACHE, FILE_CLIENT_CACHE_DEFAULT)
+      // When running Kyuubi against kerberized HDFS, some particular SQL queries hit
+      // HDFS_DELEGATION_TOKEN expiration. The exception is usually thrown in
+      // HadoopRDD.getPartitions, where the JobConf carries no Credentials because it is
+      // built from a plain Configuration, and UGI.getCurrentUser holds only the oldest
+      // tokens, which are destined to expire. The cause appears to be UnionRDD listing its
+      // sub-RDDs in parallel on a ForkJoinPool, which runs in a different calling context.
+      // Turning off parallel listing seems to resolve the issue.
+      conf.setIfMissing(RDD_PAR_LISTING, Int.MaxValue.toString)
     }
   }
 
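
For context on what this knob gates: Spark lists a UnionRDD's sub-RDD partitions on a ForkJoinPool once the number of RDDs exceeds spark.rdd.parallelListingThreshold, so pinning the threshold to Int.MaxValue keeps listing on the calling thread. A simplified sketch of that gate, not Spark's actual code, assuming Scala 2.12 where .par is built in:

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object ListingGateDemo {
  // Stand-in for one sub-RDD's partition listing.
  def listPartitions(rddId: Int): Seq[String] =
    Seq(s"rdd-$rddId-part-0", s"rdd-$rddId-part-1")

  def main(args: Array[String]): Unit = {
    val rddIds = 1 to 20
    val threshold = Int.MaxValue // what RDD_PAR_LISTING is pinned to above

    val partitions =
      if (rddIds.length > threshold) {
        // Above the threshold: list on a ForkJoinPool, i.e. on worker threads with a
        // different calling context (and thus a stale UGI/credentials view under Kerberos).
        val par = rddIds.par
        par.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
        par.flatMap(listPartitions).seq
      } else {
        // At or below the threshold: list serially on the calling thread. With the
        // threshold at Int.MaxValue, this branch is always taken.
        rddIds.flatMap(listPartitions)
      }

    println(s"listed ${partitions.length} partitions")
  }
}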

@@ -28,7 +28,6 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
 import org.apache.hadoop.hive.ql.session.OperationLog
-import org.apache.hadoop.mapred.JobConf
 import org.apache.hive.service.cli.thrift.TProtocolVersion
 import org.apache.spark.KyuubiConf._
 import org.apache.spark.KyuubiSparkUtil
@@ -41,7 +40,6 @@ import yaooqinn.kyuubi.cli.FetchOrientation
 import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
 import yaooqinn.kyuubi.session.KyuubiSession
 import yaooqinn.kyuubi.ui.KyuubiServerMonitor
-import yaooqinn.kyuubi.utils.ReflectUtils
 
 class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging {
 
@@ -315,10 +313,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
         statementId,
         session.getUserName)
     }
-    val hadoopConf = session.sparkSession.sparkContext.hadoopConfiguration
-    val jobConf = new JobConf(hadoopConf)
-    jobConf.setCredentials(session.ugi.getCredentials)
-    ReflectUtils.setFieldValue(session.sparkSession.sparkContext, "_hadoopConfiguration", jobConf)
     session.sparkSession.sparkContext.setJobGroup(statementId, statement)
     result = session.sparkSession.sql(statement)
     KyuubiServerMonitor.getListener(session.getUserName).foreach {
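
The deleted block was a per-statement workaround: it wrapped the SparkContext's Hadoop configuration in a JobConf carrying the session UGI's fresh credentials and swapped it in via reflection. With parallel listing disabled, partition listing stays on the calling thread, whose UGI already holds current tokens, so the reflection hack can go. A small diagnostic sketch, not part of this commit, for checking which delegation tokens the current UGI holds (the TokenDump name is hypothetical):

import org.apache.hadoop.security.UserGroupInformation
import scala.collection.JavaConverters._

object TokenDump {
  def main(args: Array[String]): Unit = {
    // Print the delegation tokens visible to the current UGI, e.g. to confirm
    // that fresh tokens are present on the calling thread.
    val ugi = UserGroupInformation.getCurrentUser
    ugi.getCredentials.getAllTokens.asScala.foreach { token =>
      println(s"kind=${token.getKind}, service=${token.getService}")
    }
  }
}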
