Skip to content

Commit

Permalink
Increase limit
Browse files Browse the repository at this point in the history
  • Loading branch information
longvu-db committed Sep 17, 2024
1 parent 34babe5 commit ab81500
Showing 1 changed file with 29 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
package io.delta.tables

import org.apache.spark.sql.Row
import org.apache.spark.sql.connect.SparkConnectServerTest
import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.test.DeltaQueryTest

class DeltaMergeBuilderSuite extends DeltaQueryTest with RemoteSparkSession {
class DeltaMergeBuilderSuite extends DeltaQueryTest
with RemoteSparkSession
with SparkConnectServerTest {
private def writeTargetTable(path: String): Unit = {
val session = spark
import session.implicits._
Expand Down Expand Up @@ -382,30 +386,32 @@ class DeltaMergeBuilderSuite extends DeltaQueryTest with RemoteSparkSession {
}

test("merge dataframe with many columns") {
  // A 100-column merge produces a deeply nested Connect proto plan; raise the
  // gRPC marshaller recursion limit (default is lower) so the server can
  // deserialize it instead of failing with a recursion-depth error.
  withSparkEnvConfs((CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT.key, "4096")) {
    withTempPath { dir =>
      val path = dir.getAbsolutePath

      // Target table: a single row (id = 0) widened with col0..col99, each
      // holding the value of `id`.
      var df1 = spark.range(1).toDF
      val numColumns = 100
      for (i <- 0 until numColumns) {
        df1 = df1.withColumn(s"col$i", col("id"))
      }
      df1.write.mode("overwrite").format("delta").save(path)
      val deltaTable = io.delta.tables.DeltaTable.forPath(spark, path)

      // Source dataframe: same id, but every colN is id + 1, so the merge
      // must update every non-key column of the matched row.
      var df2 = spark.range(1).toDF
      for (i <- 0 until numColumns) {
        df2 = df2.withColumn(s"col$i", col("id") + 1)
      }

      // MERGE source into target on id; the single row matches, so
      // updateAll() overwrites all 100 columns.
      deltaTable
        .as("t")
        .merge(df2.as("s"), "s.id = t.id")
        .whenMatched().updateAll()
        .execute()

      // After the merge the target's only row must equal the source row.
      checkAnswer(
        deltaTable.toDF,
        Seq(df2.collectAsList().get(0)))
    }
  }
}

0 comments on commit ab81500

Please sign in to comment.