Skip to content

Commit

Permalink
[SPARK-47154][SS][TESTS] Fix kafka-0-10-sql to use `ResetSystemProp…
Browse files Browse the repository at this point in the history
…erties` if `KafkaTestUtils` is used

### What changes were proposed in this pull request?

This PR aims to fix `kafka-0-10-sql` module to use `ResetSystemProperties` if `KafkaTestUtils` is used. The following test suites are fixed.

- ConsumerStrategySuite
- KafkaDataConsumerSuite
- KafkaMissingOffsetsTest
  - KafkaDontFailOnDataLossSuite
  - KafkaSourceStressForDontFailOnDataLossSuite
- KafkaTest
  - KafkaDelegationTokenSuite
  - KafkaMicroBatchSourceSuite
    - KafkaMicroBatchV1SourceWithAdminSuite
    - KafkaMicroBatchV2SourceWithAdminSuite
    - KafkaMicroBatchV1SourceSuite
    - KafkaMicroBatchV2SourceSuite
    - KafkaSourceStressSuite
  - KafkaOffsetReaderSuite
  - KafkaRelationSuite
    - KafkaRelationSuiteWithAdminV1
    - KafkaRelationSuiteWithAdminV2
    - KafkaRelationSuiteV1
    - KafkaRelationSuiteV2
  - KafkaSinkSuite
    - KafkaSinkMicroBatchStreamingSuite
    - KafkaContinuousSinkSuite
    - KafkaSinkBatchSuiteV1
    - KafkaSinkBatchSuiteV2

### Why are the changes needed?

Apache Spark `master` branch has two `KafkaTestUtils` classes.

```
$ find . -name KafkaTestUtils.scala
./connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
./connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
```

`KafkaTestUtils` of `kafka-0-10-sql` uses `System.setProperty` and affects 8 files. We need to use `ResetSystemProperties` to isolate the test cases.

https://github.com/apache/spark/blob/ee312ecb40ea5b5303fc794a3d494b6f27cda923/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala#L290

```
$ git grep KafkaTestUtils connector/kafka-0-10-sql | awk -F: '{print $1}' | sort | uniq
connector/kafka-0-10-sql/src/test/resources/log4j2.properties
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/ConsumerStrategySuite.scala
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenSuite.scala
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
```

### Does this PR introduce _any_ user-facing change?

No. This is a test-only PR.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45239 from dongjoon-hyun/SPARK-47154.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dongjoon-hyun committed Feb 24, 2024
1 parent dd3f81c commit 18b8606
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import org.apache.kafka.common.TopicPartition
import org.mockito.Mockito.mock

import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.util.ResetSystemProperties

class ConsumerStrategySuite extends SparkFunSuite {
class ConsumerStrategySuite extends SparkFunSuite with ResetSystemProperties {
private var testUtils: KafkaTestUtils = _

private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
import org.apache.spark.util.ResetSystemProperties

/**
* This is a basic test trait which will set up a Kafka cluster that keeps only several records in
Expand All @@ -43,7 +44,7 @@ import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
* does see missing offsets, you can check the earliest offset in `eventually` and make sure it's
* not 0 rather than sleeping a hard-code duration.
*/
trait KafkaMissingOffsetsTest extends SharedSparkSession {
trait KafkaMissingOffsetsTest extends SharedSparkSession with ResetSystemProperties {

protected var testUtils: KafkaTestUtils = _

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool
import org.apache.spark.util.ResetSystemProperties

/** A trait to clean cached Kafka producers in `afterAll` */
trait KafkaTest extends BeforeAndAfterAll {
trait KafkaTest extends BeforeAndAfterAll with ResetSystemProperties {
self: SparkFunSuite =>

override def afterAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import org.apache.spark.kafka010.KafkaDelegationTokenTest
import org.apache.spark.sql.kafka010.{KafkaTestUtils, RecordBuilder}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.ResetSystemProperties

class KafkaDataConsumerSuite
extends SharedSparkSession
with PrivateMethodTester
with ResetSystemProperties
with KafkaDelegationTokenTest {

protected var testUtils: KafkaTestUtils = _
Expand Down

0 comments on commit 18b8606

Please sign in to comment.