KAFKA-18302; Update CoordinatorRecord (apache#18512)
This patch does two things:
1) Replaces ApiMessageAndVersion with ApiMessage for the key in CoordinatorRecord.
2) Leverages the fact that ApiMessage exposes its apiKey, so the record type no longer needs to be passed separately.
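As a rough before/after sketch of what the change means for callers (OffsetCommitKey, OffsetCommitValue and CoordinatorRecordType are the generated coordinator classes already used elsewhere in this patch; the value version is illustrative):

ApiMessage key = new OffsetCommitKey()
    .setGroup("my-group")
    .setTopic("my-topic")
    .setPartition(0);
ApiMessageAndVersion value = new ApiMessageAndVersion(new OffsetCommitValue(), (short) 0);

// Before this patch: the key was wrapped in ApiMessageAndVersion and the record
// type was carried as the wrapper's "version".
CoordinatorRecord before = new CoordinatorRecord(
    new ApiMessageAndVersion(key, CoordinatorRecordType.OFFSET_COMMIT.id()),
    value
);

// With this patch: the key is the ApiMessage itself and key.apiKey() identifies
// the record type, so no extra type argument is needed.
CoordinatorRecord after = CoordinatorRecord.record(key, value);

// Tombstones (null value) get a dedicated factory.
CoordinatorRecord tombstone = CoordinatorRecord.tombstone(key);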

Reviewers: Andrew Schofield <[email protected]>
dajac authored Jan 21, 2025
1 parent 247c0f0 commit b368c38
Showing 22 changed files with 570 additions and 870 deletions.
@@ -233,22 +233,25 @@ public static byte[] toVersionPrefixedBytes(final short version, final Message m
else return Utils.toArray(buffer);
}

public static ByteBuffer toCoordinatorTypePrefixedByteBuffer(final short type, final Message message) {
public static ByteBuffer toCoordinatorTypePrefixedByteBuffer(final ApiMessage message) {
if (message.apiKey() < 0) {
throw new IllegalArgumentException("Cannot serialize a message without an api key.");
}
if (message.highestSupportedVersion() != 0 || message.lowestSupportedVersion() != 0) {
throw new IllegalArgumentException("Cannot serialize a message with a different version than 0.");
}

ObjectSerializationCache cache = new ObjectSerializationCache();
int messageSize = message.size(cache, (short) 0);
ByteBufferAccessor bytes = new ByteBufferAccessor(ByteBuffer.allocate(messageSize + 2));
bytes.writeShort(type);
bytes.writeShort(message.apiKey());
message.write(bytes, cache, (short) 0);
bytes.flip();
return bytes.buffer();
}

public static byte[] toCoordinatorTypePrefixedBytes(final short type, final Message message) {
ByteBuffer buffer = toCoordinatorTypePrefixedByteBuffer(type, message);
public static byte[] toCoordinatorTypePrefixedBytes(final ApiMessage message) {
ByteBuffer buffer = toCoordinatorTypePrefixedByteBuffer(message);
// take the inner array directly if it is full of data.
if (buffer.hasArray() &&
buffer.arrayOffset() == 0 &&
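As a sketch of the resulting layout (inferred from the method above, not code added by the patch), the buffer is simply the key's apiKey written as a two-byte short, followed by the message serialized at version 0, which is why messages supporting any other version are rejected:

ByteBuffer buffer = MessageUtil.toCoordinatorTypePrefixedByteBuffer(
    new OffsetCommitKey()
        .setGroup("my-group")
        .setTopic("my-topic")
        .setPartition(0));

short recordType = buffer.getShort(); // the first two bytes are message.apiKey()
// The remaining bytes are the OffsetCommitKey fields serialized at version 0.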
@@ -16,22 +16,37 @@
*/
package org.apache.kafka.coordinator.common.runtime;

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import java.util.Objects;

/**
* A Record which contains an {{@link ApiMessageAndVersion}} as key and
* A Record which contains an {{@link ApiMessage}} as key and
* an {{@link ApiMessageAndVersion}} as value. The value could be null to
* represent a tombstone.
*
* This class is immutable.
*/
public class CoordinatorRecord {

public static CoordinatorRecord record(
ApiMessage key,
ApiMessageAndVersion value
) {
return new CoordinatorRecord(key, value);
}

public static CoordinatorRecord tombstone(
ApiMessage key
) {
return new CoordinatorRecord(key, null);
}

/**
* The key of the record.
*/
private final ApiMessageAndVersion key;
private final ApiMessage key;

/**
* The value of the record or null if the record is
@@ -45,18 +60,30 @@ public class CoordinatorRecord {
* @param key A non-null key.
* @param value A value or null.
*/
public CoordinatorRecord(
ApiMessageAndVersion key,
private CoordinatorRecord(
ApiMessage key,
ApiMessageAndVersion value
) {
this.key = Objects.requireNonNull(key);
if (key.apiKey() < 0) {
throw new IllegalArgumentException("The key must have a type.");
}

this.value = value;
if (value != null) {
if (value.message().apiKey() < 0) {
throw new IllegalArgumentException("The value must have a type.");
}
if (value.message().apiKey() != key.apiKey()) {
throw new IllegalArgumentException("The key and the value must have the same type.");
}
}
}

/**
* @return The key.
*/
public ApiMessageAndVersion key() {
public ApiMessage key() {
return this.key;
}

@@ -71,18 +98,13 @@ public ApiMessageAndVersion value() {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

CoordinatorRecord record = (CoordinatorRecord) o;

if (!Objects.equals(key, record.key)) return false;
return Objects.equals(value, record.value);
CoordinatorRecord that = (CoordinatorRecord) o;
return Objects.equals(key, that.key) && Objects.equals(value, that.value);
}

@Override
public int hashCode() {
int result = key.hashCode();
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
return Objects.hash(key, value);
}

@Override
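A short sketch of the invariants the now-private constructor enforces (the generated OffsetCommitKey, OffsetCommitValue and GroupMetadataValue classes stand in for any two coordinator record types with non-matching apiKeys):

ApiMessage key = new OffsetCommitKey().setGroup("g").setTopic("t").setPartition(0);

// Accepted: the value's message reports the same apiKey as the key.
CoordinatorRecord.record(key, new ApiMessageAndVersion(new OffsetCommitValue(), (short) 0));

// Accepted: a tombstone carries no value at all.
CoordinatorRecord.tombstone(key);

// Rejected with IllegalArgumentException: key and value report different apiKeys.
CoordinatorRecord.record(key, new ApiMessageAndVersion(new GroupMetadataValue(), (short) 0));

// Rejected with NullPointerException: the key may never be null.
CoordinatorRecord.tombstone(null);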
@@ -44,8 +44,7 @@ public abstract class CoordinatorRecordSerde implements Serializer<CoordinatorRe
public byte[] serializeKey(CoordinatorRecord record) {
// Record does not accept a null key.
return MessageUtil.toCoordinatorTypePrefixedBytes(
record.key().version(),
record.key().message()
record.key()
);
}

@@ -72,15 +71,15 @@ public CoordinatorRecord deserialize(
readMessage(keyMessage, keyBuffer, recordType, "key");

if (valueBuffer == null) {
return new CoordinatorRecord(new ApiMessageAndVersion(keyMessage, recordType), null);
return CoordinatorRecord.tombstone(keyMessage);
}

final ApiMessage valueMessage = apiMessageValueFor(recordType);
final short valueVersion = readVersion(valueBuffer, "value");
readMessage(valueMessage, valueBuffer, valueVersion, "value");

return new CoordinatorRecord(
new ApiMessageAndVersion(keyMessage, recordType),
return CoordinatorRecord.record(
keyMessage,
new ApiMessageAndVersion(valueMessage, valueVersion)
);
}
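Putting the pieces together, a hypothetical round trip through a concrete serde (for example the group coordinator's) would look roughly like this; the concrete class name and the serializeValue call are assumptions, not shown in this hunk:

CoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde(); // assumed concrete subclass

CoordinatorRecord record = CoordinatorRecord.record(
    new OffsetCommitKey().setGroup("g").setTopic("t").setPartition(0),
    new ApiMessageAndVersion(new OffsetCommitValue(), (short) 0));

byte[] keyBytes = serde.serializeKey(record);     // record type prefix + key fields at version 0
byte[] valueBytes = serde.serializeValue(record); // value version prefix + value fields

// A null value buffer deserializes into a tombstone; with both buffers the
// original record is reconstructed.
CoordinatorRecord tombstone = serde.deserialize(ByteBuffer.wrap(keyBytes), null);
CoordinatorRecord roundTrip = serde.deserialize(ByteBuffer.wrap(keyBytes), ByteBuffer.wrap(valueBytes));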
@@ -29,32 +29,32 @@
public class CoordinatorRecordTest {
@Test
public void testAttributes() {
ApiMessageAndVersion key = new ApiMessageAndVersion(mock(ApiMessage.class), (short) 0);
ApiMessage key = mock(ApiMessage.class);
ApiMessageAndVersion value = new ApiMessageAndVersion(mock(ApiMessage.class), (short) 0);
CoordinatorRecord record = new CoordinatorRecord(key, value);
CoordinatorRecord record = CoordinatorRecord.record(key, value);
assertEquals(key, record.key());
assertEquals(value, record.value());
}

@Test
public void testKeyCannotBeNull() {
assertThrows(NullPointerException.class, () -> new CoordinatorRecord(null, null));
assertThrows(NullPointerException.class, () -> CoordinatorRecord.record(null, null));
}

@Test
public void testValueCanBeNull() {
ApiMessageAndVersion key = new ApiMessageAndVersion(mock(ApiMessage.class), (short) 0);
CoordinatorRecord record = new CoordinatorRecord(key, null);
ApiMessage key = mock(ApiMessage.class);
CoordinatorRecord record = CoordinatorRecord.record(key, null);
assertEquals(key, record.key());
assertNull(record.value());
}

@Test
public void testEquals() {
ApiMessageAndVersion key = new ApiMessageAndVersion(mock(ApiMessage.class), (short) 0);
ApiMessage key = mock(ApiMessage.class);
ApiMessageAndVersion value = new ApiMessageAndVersion(mock(ApiMessage.class), (short) 0);
CoordinatorRecord record1 = new CoordinatorRecord(key, value);
CoordinatorRecord record2 = new CoordinatorRecord(key, value);
CoordinatorRecord record1 = CoordinatorRecord.record(key, value);
CoordinatorRecord record2 = CoordinatorRecord.record(key, value);
assertEquals(record1, record2);
}
}
@@ -1060,11 +1060,10 @@ object GroupMetadataManager {
* @return key for offset commit message
*/
def offsetCommitKey(groupId: String, topicPartition: TopicPartition): Array[Byte] = {
MessageUtil.toCoordinatorTypePrefixedBytes(CoordinatorRecordType.OFFSET_COMMIT.id(),
new OffsetCommitKey()
.setGroup(groupId)
.setTopic(topicPartition.topic)
.setPartition(topicPartition.partition))
MessageUtil.toCoordinatorTypePrefixedBytes(new OffsetCommitKey()
.setGroup(groupId)
.setTopic(topicPartition.topic)
.setPartition(topicPartition.partition))
}

/**
@@ -1074,9 +1073,8 @@
* @return key bytes for group metadata message
*/
def groupMetadataKey(groupId: String): Array[Byte] = {
MessageUtil.toCoordinatorTypePrefixedBytes(CoordinatorRecordType.GROUP_METADATA.id(),
new GroupMetadataKeyData()
.setGroup(groupId))
MessageUtil.toCoordinatorTypePrefixedBytes(new GroupMetadataKeyData()
.setGroup(groupId))
}

/**
@@ -51,8 +51,7 @@ object TransactionLog {
* @return key bytes
*/
private[transaction] def keyToBytes(transactionalId: String): Array[Byte] = {
MessageUtil.toCoordinatorTypePrefixedBytes(CoordinatorRecordType.TRANSACTION_LOG.id,
new TransactionLogKey().setTransactionalId(transactionalId))
MessageUtil.toCoordinatorTypePrefixedBytes(new TransactionLogKey().setTransactionalId(transactionalId))
}

/**
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -504,7 +504,7 @@ object DumpLogSegments {
try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key.message)),
Some(prepareKey(r.key)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
@@ -549,7 +549,7 @@
try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key.message)),
Some(prepareKey(r.key)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
@@ -635,7 +635,7 @@
try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key.message)),
Some(prepareKey(r.key)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
