Skip to content

Commit

Permalink
KAFKA-17857 Move AbstractResetIntegrationTest and subclasses to tools (
Browse files Browse the repository at this point in the history
…apache#17594)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
brandboat authored Nov 4, 2024
1 parent ebb3202 commit e3f9534
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 19 deletions.
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2405,6 +2405,9 @@ project(':tools') {
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.main.output
testImplementation project(':storage').sourceSets.test.output
testImplementation project(':streams')
testImplementation project(':streams').sourceSets.test.output
testImplementation project(':streams:integration-tests').sourceSets.test.output
testImplementation project(':test-common')
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
Expand All @@ -2422,6 +2425,7 @@ project(':tools') {
testImplementation libs.apachedsLdifPartition

testRuntimeOnly libs.junitPlatformLanucher
testRuntimeOnly libs.hamcrest
testRuntimeOnly project(':test-common')
}

Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@
<allow pkg="org.apache.kafka.server.log.remote.metadata.storage" />
<allow pkg="org.apache.kafka.server.log.remote.storage" />
<allow pkg="org.apache.kafka.server.quota" />
<allow pkg="org.apache.kafka.streams" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.clients.producer" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
package org.apache.kafka.tools;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
Expand All @@ -41,7 +41,6 @@
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.StreamsResetter;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
Expand All @@ -61,8 +60,7 @@
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Timeout(600)
Expand Down Expand Up @@ -247,7 +245,7 @@ public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(fi
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();

assertThat(resultRerun, equalTo(result));
assertEquals(result, resultRerun);

waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null, appID);
Expand Down Expand Up @@ -307,17 +305,17 @@ private void testReprocessingFromScratchAfterResetWithIntermediateUserTopic(fina
final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
streams.close();

assertThat(resultRerun, equalTo(result));
assertThat(resultRerun2, equalTo(result2));
assertEquals(result, resultRerun);
assertEquals(result2, resultRerun2);

if (!useRepartitioned) {
final Properties props = TestUtils.consumerConfig(cluster.bootstrapServers(), appID + "-result-consumer", LongDeserializer.class, StringDeserializer.class, commonClientConfig);
final List<KeyValue<Long, String>> resultIntermediate = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(props, INTERMEDIATE_USER_TOPIC, 21);

for (int i = 0; i < 10; i++) {
assertThat(resultIntermediate.get(i), equalTo(resultIntermediate.get(i + 11)));
assertEquals(resultIntermediate.get(i + 11), resultIntermediate.get(i));
}
assertThat(resultIntermediate.get(10), equalTo(badMessage));
assertEquals(badMessage, resultIntermediate.get(10));
}

waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
package org.apache.kafka.tools;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.network.SocketServerConfigs;
Expand All @@ -24,7 +24,6 @@
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.StreamsResetter;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -48,10 +47,9 @@
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyConsumerGroup;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests local state store and global application cleanup.
Expand Down Expand Up @@ -210,7 +208,7 @@ public void testResetWhenLongSessionTimeoutConfiguredWithForceOption(final TestI

// Reset will success with --force, it will force delete active members on broker side
cleanGlobal(false, "--force", null, appID);
assertThat("Group is not empty after cleanGlobal", isEmptyConsumerGroup(adminClient, appID));
assertTrue(isEmptyConsumerGroup(adminClient, appID), "Group is not empty after cleanGlobal");

assertInternalTopicsGotDeleted(null);

Expand All @@ -219,7 +217,7 @@ public void testResetWhenLongSessionTimeoutConfiguredWithForceOption(final TestI
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();

assertThat(resultRerun, equalTo(result));
assertEquals(result, resultRerun);
cleanGlobal(false, "--force", null, appID);
}

Expand Down Expand Up @@ -259,7 +257,7 @@ public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic(final
streams.close();

result.remove(0);
assertThat(resultRerun, equalTo(result));
assertEquals(result, resultRerun);

waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null, appID);
Expand Down Expand Up @@ -306,7 +304,7 @@ public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic(f
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();

assertThat(resultRerun, equalTo(result));
assertEquals(result, resultRerun);

waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null, appID);
Expand Down Expand Up @@ -348,7 +346,7 @@ public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic(fin
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();

assertThat(resultRerun, equalTo(result));
assertEquals(result, resultRerun);

waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null, appID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
package org.apache.kafka.tools;

import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.network.SocketServerConfigs;
Expand Down

0 comments on commit e3f9534

Please sign in to comment.