Commit bc55b7b

debug log

Signed-off-by: Firestarman <[email protected]>
firestarman committed Dec 28, 2023
1 parent 4f25b3f commit bc55b7b
Showing 2 changed files with 82 additions and 4 deletions.
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.ThreadUtils

@@ -457,6 +458,54 @@ case class AdaptiveSparkPlanExec(
this.inputPlan == obj.asInstanceOf[AdaptiveSparkPlanExec].inputPlan
}

private val MaxLevel = 2

// Debug helper: appends a one-node-per-line dump of a Product tree to `sb`,
// printing each node's productPrefix and hash code (SparkPlan nodes are
// canonicalized first), so plans can be compared field by field in the logs.
private def p2s(pro: Product, sb: StringBuilder, numIndent: Int = 1, level: Int = 0): Unit = {
val (canonPro, isPlan, pLevel) = pro match {
case sp: SparkPlan => (sp.canonicalized, true, 0)
case product => (product, false, level)
}
sb.append("\n").append(" " * 4 * numIndent)
.append(canonPro.productPrefix).append(" hash: ").append(canonPro.##)
(0 until canonPro.productArity).foreach { idx =>
canonPro.productElement(idx) match {
case _: SparkPlan =>
// ignore
case Seq(_: SparkPlan, _: SparkPlan, _@_*) =>
// ignore
case st: StructType =>
p2s(st, sb, numIndent + 1, MaxLevel)
case p: Product if pLevel < MaxLevel =>
p2s(p, sb, numIndent + 1, pLevel + 1)
case o =>
if (pLevel < MaxLevel) {
sb.append("\n").append(" " * 4 * (numIndent + 1))
.append(o.getClass.getSimpleName).append(" hash: ").append(o.##)
}
}
}
if (isPlan) {
canonPro.asInstanceOf[SparkPlan].children.foreach(p2s(_, sb, numIndent + 1))
}
}
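
For intuition, here is a minimal standalone sketch of the same dumping technique on plain case classes; Tree, Node, and Leaf are hypothetical stand-ins for plan nodes, not Spark types.

// A minimal sketch of the p2s idea, independent of Spark: walk a Product
// (case class) tree and emit one "productPrefix hash: ..." line per node,
// indenting four spaces per level.
object P2sDemo {
  sealed trait Tree extends Product
  case class Leaf(value: Int) extends Tree
  case class Node(name: String, left: Tree, right: Tree) extends Tree

  def dump(p: Product, sb: StringBuilder, indent: Int = 1): Unit = {
    sb.append("\n").append(" " * 4 * indent)
      .append(p.productPrefix).append(" hash: ").append(p.##)
    (0 until p.productArity).foreach { idx =>
      p.productElement(idx) match {
        case child: Product => dump(child, sb, indent + 1)
        case other =>
          sb.append("\n").append(" " * 4 * (indent + 1))
            .append(other.getClass.getSimpleName).append(" hash: ").append(other.##)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val sb = new StringBuilder("Whole tree info:")
    dump(Node("root", Leaf(1), Leaf(2)), sb)
    println(sb.toString())
  }
}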

// Debug helper: renders an Exchange as "Prefix(id: ..., canonicalized hash: ...)",
// optionally followed by the full tree dump from p2s.
private def e2s(ex: Exchange, moreDetails: Boolean = false): String = {
val sb = new StringBuilder(
s"${ex.productPrefix}(id: ${ex.id}, canonicalized hash: ${ex.canonicalized.##})")
if (moreDetails) {
sb.append("\nWhole tree info:")
p2s(ex, sb)
}
sb.toString()
}

// Debug helper: renders a query stage as "Prefix(id: ..., plan: ...)".
private def q2s(qs: QueryStageExec): String =
  s"${qs.productPrefix}(id: ${qs.id}, plan: ${qs.plan.productPrefix})"

// Debug helper: renders the whole stage cache, one "{key -> stage}" per line.
private def m2s: String = context.stageCache.map { case (k, v) =>
  s"{${k.productPrefix}(hash: ${k.##}) -> ${q2s(v)}}"
}.mkString("\n ")
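
Put together, these helpers yield log lines of roughly the following shape; the operator names, ids, and hash values below are illustrative, not taken from a real run:

==> Got an exchange ShuffleExchangeExec(id: 7, canonicalized hash: 1402193033), current cache map:
 {ShuffleExchangeExec(hash: 1402193033) -> ShuffleQueryStageExec(id: 0, plan: ShuffleExchangeExec)}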

/**
* This method is called recursively to traverse the plan tree bottom-up and create a new query
* stage or try reusing an existing stage if the current node is an [[Exchange]] node and all of
@@ -469,9 +518,11 @@
*/
private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match {
case e: Exchange =>
logWarning(s"==> Got an exchange ${e2s(e)}, current cache map:\n $m2s")
// First have a quick check in the `stageCache` without having to traverse down the node.
context.stageCache.get(e.canonicalized) match {
case Some(existingStage) if conf.exchangeReuseEnabled =>
logWarning(s"==>it can reuse the ${q2s(existingStage)} by key ${e.canonicalized.##}")
val stage = reuseQueryStage(existingStage, e)
val isMaterialized = stage.isMaterialized
CreateStageResult(
@@ -480,6 +531,7 @@
newStages = if (isMaterialized) Seq.empty else Seq(stage))

case _ =>
logWarning("==>No cached stage can be reused, create a new one")
val result = createQueryStages(e.child)
val newPlan = e.withNewChildren(Seq(result.newPlan)).asInstanceOf[Exchange]
// Create a query stage only when all the child query stages are ready.
@@ -492,7 +544,13 @@
val queryStage = context.stageCache.getOrElseUpdate(
newStage.plan.canonicalized, newStage)
if (queryStage.ne(newStage)) {
logWarning(s"==>The converted stage ${q2s(newStage)} can reuse the cached " +
s"stage ${q2s(queryStage)} by hash ${newStage.plan.canonicalized.##}")
newStage = reuseQueryStage(queryStage, e)
} else {
logWarning(s"==>A new converted stage ${q2s(newStage)} with hash " +
s"${newStage.plan.canonicalized.##}, cache it. Stage details:\n" +
s"${e2s(newStage.plan.asInstanceOf[Exchange], true)}")
}
}
val isMaterialized = newStage.isMaterialized
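The getOrElseUpdate-plus-ne check above is the heart of stage reuse. Below is a minimal sketch of that idiom, assuming a TrieMap-backed cache and a hypothetical Stage type (in the real code the cache key is the canonicalized plan):

// A minimal sketch of the cache-or-reuse idiom, with hypothetical types:
// if getOrElseUpdate hands back a different instance than the candidate,
// an equivalent entry was already cached, so the candidate should be
// replaced by a reuse of the cached stage.
import scala.collection.concurrent.TrieMap

object StageCacheDemo {
  case class Stage(canonicalKey: String)

  private val cache = TrieMap.empty[String, Stage]

  def cacheOrReuse(candidate: Stage): (Stage, Boolean) = {
    val cached = cache.getOrElseUpdate(candidate.canonicalKey, candidate)
    // `ne` is reference inequality: getting back a different instance
    // means an equivalent stage was already cached.
    if (cached.ne(candidate)) (cached, true) else (candidate, false)
  }

  def main(args: Array[String]): Unit = {
    println(cacheOrReuse(Stage("exchange-A"))) // (Stage(exchange-A),false): newly cached
    println(cacheOrReuse(Stage("exchange-A"))) // (Stage(exchange-A),true): reuses the first
  }
}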
@@ -22,6 +22,7 @@ import java.util.Locale

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
@@ -190,7 +191,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
* `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
*/
case class RelationConversions(
sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] with Logging {
private def isConvertible(relation: HiveTableRelation): Boolean = {
isConvertible(relation.tableMeta)
}
@@ -203,14 +204,33 @@

private val metastoreCatalog = sessionCatalog.metastoreCatalog

private def canConvert(query: LogicalPlan, r: HiveTableRelation): Boolean = {
  val ret1 = query.resolved
  val ret2 = DDLUtils.isHiveTable(r.tableMeta)
  val ret3 = !r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)
  val ret4 = isConvertible(r)
  logWarning(s"==>query.resolved: $ret1, isHiveTable: $ret2, isPartitionedEnabled: $ret3, " +
    s"isConvertible: $ret4")
  if (!ret2) {
    logWarning(s"==>not a Hive table due to its provider: ${r.tableMeta.provider}")
  }
  if (!ret3) {
    logWarning(s"==>partitioned insert not enabled (isPartitioned: ${r.isPartitioned}, " +
      s"conf enabled: ${conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)})")
  }
  if (!ret4) {
    logWarning(s"==>not convertible because its serde is " +
      s"${r.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)}")
  }
  ret1 && ret2 && ret3 && ret4
}
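
The design choice here: the original single pattern guard is split into four named booleans so that, when a conversion is skipped, the log pinpoints exactly which condition blocked it. The same pattern in miniature, with hypothetical conditions:

// Evaluate each condition once, log the specific blocker, return the AND.
def canProceed(resolved: Boolean, isHive: Boolean): Boolean = {
  if (!resolved) println("==>blocked: query not resolved")
  if (!isHive) println("==>blocked: not a Hive table")
  resolved && isHive
}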

override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
// Write path
case InsertIntoStatement(
r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists)
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
(!r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE))
&& isConvertible(r) =>
if canConvert(query, r) =>
InsertIntoStatement(metastoreCatalog.convert(r), partition, cols,
query, overwrite, ifPartitionNotExists)

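With these hooks in place, a skipped conversion of a partitioned insert would log lines of roughly this shape (all values illustrative):

==>query.resolved: true, isHiveTable: true, isPartitionedEnabled: false, isConvertible: true
==>partitioned insert not enabled (isPartitioned: true, conf enabled: false)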
