Skip to content

Commit

Permalink
KAFKA-15853: Move OffsetConfig to group-coordinator module (apache#15161
Browse files Browse the repository at this point in the history
)


Reviewers: Mickael Maison <[email protected]>, David Jacot <[email protected]>, Nikolay <[email protected]>
  • Loading branch information
OmniaGM authored Jan 11, 2024
1 parent da175b5 commit dba789d
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 102 deletions.
23 changes: 12 additions & 11 deletions core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.VerificationGuard

Expand Down Expand Up @@ -1751,17 +1752,17 @@ object GroupCoordinator {
GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
}

private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
offsetsTopicNumPartitions = config.offsetsTopicPartitions,
offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
offsetsTopicCompressionType = config.offsetsTopicCompressionType,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig(
config.offsetMetadataMaxSize,
config.offsetsLoadBufferSize,
config.offsetsRetentionMinutes * 60L * 1000L,
config.offsetsRetentionCheckIntervalMs,
config.offsetsTopicPartitions,
config.offsetsTopicSegmentBytes,
config.offsetsTopicReplicationFactor,
config.offsetsTopicCompressionType,
config.offsetCommitTimeoutMs,
config.offsetCommitRequiredAcks
)

private[group] def apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicIdPartition, TopicPartition}
import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
Expand Down
66 changes: 0 additions & 66 deletions core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala

This file was deleted.

20 changes: 10 additions & 10 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.{lang, util}
import java.util.concurrent.TimeUnit
import java.util.{Collections, Locale, Properties}
import kafka.cluster.EndPoint
import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
Expand All @@ -42,6 +41,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.coordinator.group.assignor.{PartitionAssignor, RangeAssignor, UniformAssignor}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.ProcessRole
Expand Down Expand Up @@ -181,16 +181,16 @@ object Defaults {
val ConsumerGroupAssignors = List(classOf[UniformAssignor].getName, classOf[RangeAssignor].getName).asJava

/** ********* Offset management configuration ***********/
val OffsetMetadataMaxSize = OffsetConfig.DefaultMaxMetadataSize
val OffsetsLoadBufferSize = OffsetConfig.DefaultLoadBufferSize
val OffsetsTopicReplicationFactor = OffsetConfig.DefaultOffsetsTopicReplicationFactor
val OffsetsTopicPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions
val OffsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes
val OffsetsTopicCompressionCodec: Int = OffsetConfig.DefaultOffsetsTopicCompressionType.id
val OffsetMetadataMaxSize = OffsetConfig.DEFAULT_MAX_METADATA_SIZE
val OffsetsLoadBufferSize = OffsetConfig.DEFAULT_LOAD_BUFFER_SIZE
val OffsetsTopicReplicationFactor = OffsetConfig.DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR
val OffsetsTopicPartitions: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS
val OffsetsTopicSegmentBytes: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES
val OffsetsTopicCompressionCodec: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE.id
val OffsetsRetentionMinutes: Int = 7 * 24 * 60
val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs
val OffsetCommitTimeoutMs = OffsetConfig.DefaultOffsetCommitTimeoutMs
val OffsetCommitRequiredAcks = OffsetConfig.DefaultOffsetCommitRequiredAcks
val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS
val OffsetCommitTimeoutMs = OffsetConfig.DEFAULT_OFFSET_COMMIT_TIMEOUT_MS
val OffsetCommitRequiredAcks = OffsetConfig.DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS

/** ********* Transaction management configuration ***********/
val TransactionalIdExpirationMs = TransactionStateManager.DefaultTransactionalIdExpirationMs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ package kafka.server

import java.util.{Collections, Objects, Properties}
import java.util.concurrent.TimeUnit

import kafka.api.SaslSetup
import kafka.coordinator.group.OffsetConfig
import kafka.utils.JaasTestUtils.JaasSection
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.utils.Implicits._
Expand All @@ -31,6 +29,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.coordinator.group.OffsetConfig
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}

Expand Down Expand Up @@ -108,7 +107,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
s"Unexpected ${KafkaConfig.InterBrokerListenerNameProp} for broker ${config.brokerId}")
}

TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS,
replicationFactor = 2, servers, servers.head.groupCoordinator.groupMetadataTopicConfigs)

createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.VerificationGuard
Expand Down Expand Up @@ -3816,7 +3817,7 @@ class GroupCoordinatorTest {
val producerEpoch: Short = 3

val offsets = Map(
tip -> OffsetAndMetadata(offset, "s" * (OffsetConfig.DefaultMaxMetadataSize + 1), 0)
tip -> OffsetAndMetadata(offset, "s" * (OffsetConfig.DEFAULT_MAX_METADATA_SIZE + 1), 0)
)

val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, offsets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._
Expand Down Expand Up @@ -79,16 +80,16 @@ class GroupMetadataManagerTest {

private val offsetConfig = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
offsetsTopicNumPartitions = config.offsetsTopicPartitions,
offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
offsetsTopicCompressionType = config.offsetsTopicCompressionType,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
new OffsetConfig(config.offsetMetadataMaxSize,
config.offsetsLoadBufferSize,
config.offsetsRetentionMinutes * 60 * 1000L,
config.offsetsRetentionCheckIntervalMs,
config.offsetsTopicPartitions,
config.offsetsTopicSegmentBytes,
config.offsetsTopicReplicationFactor,
config.offsetsTopicCompressionType,
config.offsetCommitTimeoutMs,
config.offsetCommitRequiredAcks)
}

@BeforeEach
Expand Down Expand Up @@ -775,7 +776,7 @@ class GroupMetadataManagerTest {
)

// create a GroupMetadata record larger then offsets.load.buffer.size (here at least 16 bytes larger)
val assignmentSize = OffsetConfig.DefaultLoadBufferSize + 16
val assignmentSize = OffsetConfig.DEFAULT_LOAD_BUFFER_SIZE + 16
val memberId = "98098230493"

val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
Expand Down
Loading

0 comments on commit dba789d

Please sign in to comment.