Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 220 #308

Merged
merged 7 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import scala.math.abs

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive._
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

/**
* :: Experimental ::
Expand Down Expand Up @@ -151,6 +155,45 @@ trait DataFrameSuiteBaseLike extends SparkContextProvider
}
}

/**
 * Asserts that two [[DataFrame]]s have the same schema and contain the same
 * rows, regardless of the order in which the rows appear.
 *
 * The schemas are compared first; the row contents are then compared by
 * delegating to `assertDataFrameDataEquals`.
 *
 * @param expected the DataFrame holding the expected schema and rows
 * @param result   the DataFrame under test
 */
def assertDataFrameNoOrderEquals(expected: DataFrame, result: DataFrame): Unit = {
  assert(expected.schema, result.schema)
  assertDataFrameDataEquals(expected, result)
}


/**
 * Asserts that two [[DataFrame]]s contain the same rows, regardless of the
 * order in which the rows appear. Does not compare the schema.
 *
 * Both DataFrames are grouped by the columns of `expected` and the number of
 * occurrences of every distinct row is counted. The two count tables are then
 * full-outer-joined on those columns, and every row whose counts differ
 * (including rows that exist on one side only) is reported as a difference.
 * The assertion passes when no such row exists.
 *
 * @param expected the DataFrame holding the expected rows; its column names
 *                 drive the grouping and the join
 * @param result   the DataFrame under test
 */
def assertDataFrameDataEquals(expected: DataFrame, result: DataFrame): Unit = {
  val expectedCol = "assertDataFrameNoOrderEquals_expected"
  val actualCol = "assertDataFrameNoOrderEquals_actual"
  val expectedAlias = "assertDataFrameNoOrderEquals_expectedSide"
  val resultAlias = "assertDataFrameNoOrderEquals_resultSide"

  // Only cache RDDs that the caller has not persisted already: calling cache on
  // an RDD persisted at another storage level throws, and un-persisting a
  // caller-owned cache would be a surprising side effect.
  val cachedHere = Seq(expected.rdd, result.rdd).distinct
    .filter(_.getStorageLevel == org.apache.spark.storage.StorageLevel.NONE)
  cachedHere.foreach(_.cache())

  try {
    assert("Length not Equal", expected.rdd.count(), result.rdd.count())

    val columns = expected.columns.map(s => col(s))
    val expectedElementsCount = expected
      .groupBy(columns: _*)
      .agg(count(lit(1)).as(expectedCol))
      .as(expectedAlias)
    val resultElementsCount = result
      .groupBy(columns: _*)
      .agg(count(lit(1)).as(actualCol))
      .as(resultAlias)

    // Null-safe equality (<=>) on every key column, so that rows containing
    // NULL values still pair up with their counterpart on the other side.
    // The aliases keep the condition unambiguous even when both arguments are
    // the very same DataFrame.
    val joinCondition = expected.columns
      .map(name => col(s"$expectedAlias.$name") <=> col(s"$resultAlias.$name"))
      .reduceOption(_ && _)
      .getOrElse(lit(true))

    // A row missing from one side has a NULL count after the full outer join.
    // `=!=` would evaluate to NULL there and the filter would silently drop the
    // row, so the comparison has to be null-safe as well.
    val diff = expectedElementsCount
      .join(resultElementsCount, joinCondition, "full_outer")
      .filter(not(col(expectedCol) <=> col(actualCol)))
    assertEmpty(diff.take(maxUnequalRowsToShow))
  } finally {
    // Release whatever this method cached, even when an assertion failed.
    cachedHere.foreach(_.unpersist())
  }
}

/**
* Zip RDD's with precise indexes. This is used so we can join two DataFrame's
* Rows together regardless of if the source is different but still compare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,40 @@ class SampleDataFrameTest extends FunSuite with DataFrameSuiteBase {
assertDataFrameEquals(input, input)
}

test("dataframe should be equal with different order of rows") {
  import sqlContext.implicits._
  // Repeat the first element so that duplicated rows are part of the comparison.
  val rowsWithRepeat = inputList ++ List(inputList.head)
  val forward = sc.parallelize(rowsWithRepeat).toDF
  // Same rows, opposite order.
  val backward = sc.parallelize(rowsWithRepeat.reverse).toDF
  assertDataFrameNoOrderEquals(forward, backward)
}

test("empty dataframes should be equal") {
  import sqlContext.implicits._
  // Two independently created, empty DataFrames built from the same element type.
  val firstEmpty = spark.emptyDataset[Magic].toDF()
  val secondEmpty = spark.emptyDataset[Magic].toDF()
  // They must compare equal with both the ordered and the order-insensitive check.
  assertDataFrameEquals(firstEmpty, secondEmpty)
  assertDataFrameNoOrderEquals(firstEmpty, secondEmpty)
}

test("empty dataframes should be not be equal to nonempty ones") {
  import sqlContext.implicits._
  val emptyFrame = spark.emptyDataset[Magic].toDF()
  val populatedFrame = sc.parallelize(inputList).toDF

  // Empty frame passed as the expected value.
  assertThrows[org.scalatest.exceptions.TestFailedException] {
    assertDataFrameEquals(emptyFrame, populatedFrame)
  }
  assertThrows[org.scalatest.exceptions.TestFailedException] {
    assertDataFrameNoOrderEquals(emptyFrame, populatedFrame)
  }

  // Empty frame passed as the actual value.
  assertThrows[org.scalatest.exceptions.TestFailedException] {
    assertDataFrameEquals(populatedFrame, emptyFrame)
  }
  assertThrows[org.scalatest.exceptions.TestFailedException] {
    assertDataFrameNoOrderEquals(populatedFrame, emptyFrame)
  }
}

test("unequal dataframes should not be equal") {
import sqlContext.implicits._
val input = sc.parallelize(inputList).toDF
Expand All @@ -47,6 +81,19 @@ class SampleDataFrameTest extends FunSuite with DataFrameSuiteBase {
}
}

test("unequal dataframe with different order should not equal") {
  import sqlContext.implicits._
  // One frame carries an extra copy of the first element; the other does not.
  val rowsWithRepeat = inputList ++ List(inputList.head)
  val withRepeat = sc.parallelize(rowsWithRepeat).toDF
  val withoutRepeat = sc.parallelize(inputList).toDF

  // The comparison has to fail whichever frame is passed as the expected one.
  assertThrows[org.scalatest.exceptions.TestFailedException] {
    assertDataFrameNoOrderEquals(withRepeat, withoutRepeat)
  }
  assertThrows[org.scalatest.exceptions.TestFailedException] {
    assertDataFrameNoOrderEquals(withoutRepeat, withRepeat)
  }
}

test("dataframe approx expected") {
import sqlContext.implicits._
val input = sc.parallelize(inputList).toDF
Expand Down