diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index fc3751e..af93446 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -13,6 +13,7 @@ import ( func Test_Should_Produce_Successfully(t *testing.T) { // Given + t.Parallel() topic := "produce-topic" brokerAddress := "localhost:9092" @@ -39,6 +40,7 @@ func Test_Should_Produce_Successfully(t *testing.T) { func Test_Should_Batch_Produce_Successfully(t *testing.T) { // Given + t.Parallel() topic := "batch-produce-topic" brokerAddress := "localhost:9092" @@ -68,6 +70,7 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { func Test_Should_Consume_Message_Successfully(t *testing.T) { // Given + t.Parallel() topic := "topic" consumerGroup := "topic-cg" brokerAddress := "localhost:9092" @@ -107,6 +110,7 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) { func Test_Should_Pause_And_Resume_Successfully(t *testing.T) { // Given + t.Parallel() topic := "pause-topic" consumerGroup := "pause-topic-cg" brokerAddress := "localhost:9092" @@ -168,6 +172,7 @@ func Test_Should_Pause_And_Resume_Successfully(t *testing.T) { func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) { // Given + t.Parallel() topic := "batch-topic" consumerGroup := "batch-topic-cg" brokerAddress := "localhost:9092" @@ -217,11 +222,12 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) { func Test_Should_Batch_Retry_Only_Failed_Messages_When_Transactional_Retry_Is_Disabled(t *testing.T) { // Given + t.Parallel() topic := "nontransactional-cronsumer-topic" consumerGroup := "nontransactional-cronsumer-cg" brokerAddress := "localhost:9092" - retryTopic := "retry-topic" + retryTopic := "nontransactional-retry-topic" _, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{ {Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)}, @@ -274,6 +280,7 @@ func Test_Should_Batch_Retry_Only_Failed_Messages_When_Transactional_Retry_Is_Di func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) { // Given + t.Parallel() topic := "cronsumer-topic" consumerGroup := "cronsumer-cg" brokerAddress := "localhost:9092" @@ -320,6 +327,7 @@ func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) { func Test_Should_Propagate_Custom_Headers_With_Kafka_Cronsumer_Successfully(t *testing.T) { // Given + t.Parallel() topic := "cronsumer-header-topic" consumerGroup := "cronsumer-header-cg" brokerAddress := "localhost:9092" @@ -389,6 +397,7 @@ func Test_Should_Propagate_Custom_Headers_With_Kafka_Cronsumer_Successfully(t *t func Test_Should_Batch_Consume_With_PreBatch_Enabled(t *testing.T) { // Given + t.Parallel() topic := "batch-topic-prebatch-enabled" consumerGroup := "batch-topic-prebatch-cg" brokerAddress := "localhost:9092" @@ -442,6 +451,7 @@ func Test_Should_Batch_Consume_With_PreBatch_Enabled(t *testing.T) { func Test_Should_Skip_Message_When_Header_Filter_Given(t *testing.T) { // Given + t.Parallel() topic := "header-filter-topic" consumerGroup := "header-filter-cg" brokerAddress := "localhost:9092"