diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index bf810f3f65877..65581e05089e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ThreadUtils @@ -457,6 +458,54 @@ case class AdaptiveSparkPlanExec( this.inputPlan == obj.asInstanceOf[AdaptiveSparkPlanExec].inputPlan } + private val MaxLevel = 2 + + private def p2s(pro: Product, sb: StringBuilder, numIndent: Int = 1, level: Int = 0): Unit = { + val (canonPro, isPlan, pLevel) = pro match { + case sp: SparkPlan => (sp.canonicalized, true, 0) + case product => (product, false, level) + } + sb.append("\n").append(" " * 4 * numIndent) + .append(canonPro.productPrefix).append(" hash: ").append(canonPro.##) + (0 until canonPro.productArity).foreach { idx => + canonPro.productElement(idx) match { + case _: SparkPlan => + // ignore + case Seq(_: SparkPlan, _: SparkPlan, _@_*) => + // ignore + case st: StructType => + p2s(st, sb, numIndent + 1, MaxLevel) + case p: Product if pLevel < MaxLevel => + p2s(p, sb, numIndent + 1, pLevel + 1) + case o => + if (pLevel < MaxLevel) { + sb.append("\n").append(" " * 4 * (numIndent + 1)) + .append(o.getClass.getSimpleName).append(" hash: ").append(o.##) + } + } + } + if (isPlan) { + canonPro.asInstanceOf[SparkPlan].children.foreach(p2s(_, sb, numIndent + 1)) + } + } + + private def e2s(ex: Exchange, moreDetails: Boolean = false): String = { + val sb = new StringBuilder( + s"${ex.productPrefix}(id: ${ex.id}, canonicalized hash: ${ex.canonicalized.##})") + if (moreDetails) { + sb.append("\nWhole tree info:") + p2s(ex, sb) + } + sb.toString() + } + + private def q2s(qs: QueryStageExec): String = + s"${qs.productPrefix}(id: ${qs.id}, plan: ${qs.plan.productPrefix})" + + private def m2s: String = context.stageCache.map { case (k, v) => + s"{${k.productPrefix}(hash: ${k.##}) -> ${q2s(v)}" + }.mkString("\n ") + /** * This method is called recursively to traverse the plan tree bottom-up and create a new query * stage or try reusing an existing stage if the current node is an [[Exchange]] node and all of @@ -469,9 +518,11 @@ case class AdaptiveSparkPlanExec( */ private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match { case e: Exchange => + logWarning(s"==> Got an exchange ${e2s(e)}, current cache map:\n $m2s") // First have a quick check in the `stageCache` without having to traverse down the node. context.stageCache.get(e.canonicalized) match { case Some(existingStage) if conf.exchangeReuseEnabled => + logWarning(s"==>it can reuse the ${q2s(existingStage)} by key ${e.canonicalized.##}") val stage = reuseQueryStage(existingStage, e) val isMaterialized = stage.isMaterialized CreateStageResult( @@ -480,6 +531,7 @@ case class AdaptiveSparkPlanExec( newStages = if (isMaterialized) Seq.empty else Seq(stage)) case _ => + logWarning("==>No cached stage can be reused, create a new one") val result = createQueryStages(e.child) val newPlan = e.withNewChildren(Seq(result.newPlan)).asInstanceOf[Exchange] // Create a query stage only when all the child query stages are ready. @@ -492,7 +544,13 @@ case class AdaptiveSparkPlanExec( val queryStage = context.stageCache.getOrElseUpdate( newStage.plan.canonicalized, newStage) if (queryStage.ne(newStage)) { + logWarning(s"==>The converted stage ${q2s(newStage)} can reuse the cached " + + s"stage ${q2s(queryStage)} by hash ${newStage.plan.canonicalized.##}") newStage = reuseQueryStage(queryStage, e) + } else { + logWarning(s"==>A new converted stage ${q2s(newStage)} with hash " + + s"${newStage.plan.canonicalized.##}, cache it. Stage details:\n" + + s"${e2s(newStage.plan.asInstanceOf[Exchange], true)}") } } val isMaterialized = newStage.isMaterialized diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index c8a5c03bdd2d6..6168aa9741ba3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -22,6 +22,7 @@ import java.util.Locale import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ @@ -190,7 +191,7 @@ object HiveAnalysis extends Rule[LogicalPlan] { * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`. */ case class RelationConversions( - sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] { + sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] with Logging { private def isConvertible(relation: HiveTableRelation): Boolean = { isConvertible(relation.tableMeta) } @@ -203,14 +204,33 @@ case class RelationConversions( private val metastoreCatalog = sessionCatalog.metastoreCatalog + private def canConvert(query: LogicalPlan, r: HiveTableRelation): Boolean = { + val ret1 = query.resolved + val ret2 = DDLUtils.isHiveTable(r.tableMeta) + val ret3 = (!r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)) + val ret4 = isConvertible (r) + logWarning(s"==>query.resolved: $ret1, isHiveTable: $ret2, isPartitionedEnabled: $ret3," + + s"isConvertible $ret4") + if (!ret2) { + logWarning(s"==>not hive table due to its provider: ${r.tableMeta.provider}") + } + if (!ret3) { + logWarning(s"==>partitioned not enabled due to (isPartitioned: ${r.isPartitioned}," + + s" conf enabled: ${conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)})") + } + if (!ret4) { + logWarning(s"==>not convertible due to its serde is " + + s"${r.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)}") + } + ret1 && ret2 && ret3 && ret4 + } + override def apply(plan: LogicalPlan): LogicalPlan = { plan resolveOperators { // Write path case InsertIntoStatement( r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists) - if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && - (!r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)) - && isConvertible(r) => + if canConvert(query, r) => InsertIntoStatement(metastoreCatalog.convert(r), partition, cols, query, overwrite, ifPartitionNotExists)