diff --git a/tests/rptest/scale_tests/many_partitions_test.py b/tests/rptest/scale_tests/many_partitions_test.py index 3be34b6eaa52..f0d88454bfbe 100644 --- a/tests/rptest/scale_tests/many_partitions_test.py +++ b/tests/rptest/scale_tests/many_partitions_test.py @@ -29,7 +29,7 @@ from rptest.scale_tests.topic_scale_profiles import TopicScaleProfileManager from rptest.services.rpk_consumer import RpkConsumer from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, LoggingConfig, MetricsEndpoint -from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer, KgoVerifierRandomConsumer +from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierConsumerGroupConsumer, KgoVerifierRandomConsumer from rptest.services.kgo_repeater_service import KgoRepeaterService, repeater_traffic from rptest.services.openmessaging_benchmark import OpenMessagingBenchmark from rptest.services.openmessaging_benchmark_configs import OMBSampleConfigurations @@ -612,20 +612,21 @@ def _write_and_random_read(self, scale: ScaleParameters, topic_names): # minutes during these events. expect_transmit_time += 600 - seq_consumer = KgoVerifierSeqConsumer( + verifier = KgoVerifierConsumerGroupConsumer( self.test_context, self.redpanda, target_topic, 0, + readers=math.ceil(scale.partition_limit / 5000), max_msgs=max_msgs, nodes=[self.preallocated_nodes[2]]) - seq_consumer.start(clean=False) + verifier.start(clean=False) - seq_consumer.wait(timeout_sec=expect_transmit_time) - assert seq_consumer.consumer_status.validator.invalid_reads == 0 + verifier.wait(timeout_sec=expect_transmit_time) + assert verifier.consumer_status.validator.invalid_reads == 0 if not scale.tiered_storage_enabled: - assert seq_consumer.consumer_status.validator.valid_reads >= fast_producer.produce_status.acked + msg_count_per_topic, \ - f"{seq_consumer.consumer_status.validator.valid_reads} >= {fast_producer.produce_status.acked} + {msg_count_per_topic}" + assert verifier.consumer_status.validator.valid_reads >= fast_producer.produce_status.acked + msg_count_per_topic, \ + f"{verifier.consumer_status.validator.valid_reads} >= {fast_producer.produce_status.acked} + {msg_count_per_topic}" self.free_preallocated_nodes()