Update Kafka wire protocol to 3.4.0 and implement KIP-699 and KIP-709 (#1981)

(cherry picked from commit 71b77b2)

### Motivation

Update Kafka wire protocol to 3.4.0 and implement KIP-699 and KIP-709.
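
For context: KIP-699 lets one FindCoordinator request resolve coordinators for several keys, and KIP-709 lets one OffsetFetch request carry several consumer group ids. Below is a minimal client-side sketch of what this enables against a 3.4-level broker such as KoP; the bootstrap address and group names are made up, and the admin methods reflect my reading of the Kafka 3.x `Admin` API, so treat it as an illustration rather than the change itself.

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;

public class MultiGroupOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // One call for several groups: a 3.x admin client can resolve the coordinators
            // with a single FindCoordinator request (KIP-699) and fetch offsets for groups
            // sharing a coordinator with a single OffsetFetch request (KIP-709).
            Map<String, ListConsumerGroupOffsetsSpec> specs = Map.of(
                    "group-a", new ListConsumerGroupOffsetsSpec(),
                    "group-b", new ListConsumerGroupOffsetsSpec());
            admin.listConsumerGroupOffsets(specs).all().get()
                    .forEach((group, offsets) -> System.out.println(group + " -> " + offsets));
        }
    }
}
```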

---------

Co-authored-by: Enrico Olivelli <[email protected]>
gaoran10 and eolivelli committed Aug 7, 2023
1 parent b645003 commit 19801c1
Showing 27 changed files with 671 additions and 253 deletions.
@@ -24,7 +24,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.message.FetchRequestData;

@Slf4j
public class DelayedFetch extends DelayedOperation {
@@ -33,7 +33,7 @@ public class DelayedFetch extends DelayedOperation {
private final long bytesReadable;
private final int fetchMaxBytes;
private final boolean readCommitted;
private final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo;
private final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo;
private final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult;
private final MessageFetchContext context;
protected volatile Boolean hasError;
@@ -55,7 +55,7 @@ public DelayedFetch(final long delayMs,
final boolean readCommitted,
final MessageFetchContext context,
final ReplicaManager replicaManager,
final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo,
final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo,
final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult,
final CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> callback) {
super(delayMs, Optional.empty());
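
As an aside on the type swap above: `readPartitionInfo` now carries the generated `FetchRequestData.FetchPartition`, which is populated with fluent setters instead of the old `FetchRequest.PartitionData` constructor. A small sketch (the topic name and values are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchRequestData;

class FetchPartitionSketch {
    static Map<TopicPartition, FetchRequestData.FetchPartition> singlePartitionFetch() {
        Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo = new HashMap<>();
        readPartitionInfo.put(
                new TopicPartition("my-topic", 0),
                new FetchRequestData.FetchPartition()
                        .setPartition(0)                 // the partition index is carried on the message itself
                        .setFetchOffset(42L)             // was PartitionData.fetchOffset
                        .setPartitionMaxBytes(1 << 20)); // was PartitionData.maxBytes
        return readPartitionInfo;
    }
}
```
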
@@ -465,7 +465,14 @@ protected void writeAndFlushResponseToClient(Channel channel) {
request, response);
}

final ByteBuf result = responseToByteBuf(response, request, true);
final ByteBuf result;
try {
result = responseToByteBuf(response, request, true);
} catch (Throwable error) {
log.error("[{}] Failed to convert response {} to ByteBuf", channel, response, error);
sendErrorResponse(request, channel, error, true);
return;
}
final int resultSize = result.readableBytes();
channel.writeAndFlush(result).addListener(future -> {
if (response instanceof ResponseCallbackWrapper) {

Large diffs are not rendered by default.

@@ -0,0 +1,30 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.exceptions;

import java.io.Serial;

/**
* KoP topic load exception.
*/
public class KoPTopicInitializeException extends KoPBaseException {

@Serial
private static final long serialVersionUID = 0L;

public KoPTopicInitializeException(Throwable throwable) {
super(throwable);
}

}
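
Not part of the diff, only a sketch of how a caller of `PartitionLog#initialise()` might now tell topic-initialization failures apart; the unwrapping of `CompletionException` is an assumption about how the future is observed.

```java
partitionLog.initialise().whenComplete((initialised, error) -> {
    if (error == null) {
        return; // the partition log is ready to serve produce/fetch requests
    }
    Throwable cause = (error instanceof java.util.concurrent.CompletionException && error.getCause() != null)
            ? error.getCause()
            : error;
    if (cause instanceof KoPTopicInitializeException) {
        // loading topic properties or recovering producer state failed:
        // treat the partition as unavailable rather than as a generic server error
    }
});
```
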
@@ -13,6 +13,8 @@
*/
package io.streamnative.pulsar.handlers.kop.storage;

import static io.streamnative.pulsar.handlers.kop.utils.MessageMetadataUtils.isBrokerIndexMetadataInterceptorConfigured;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
@@ -26,6 +28,7 @@
import io.streamnative.pulsar.handlers.kop.MessagePublishContext;
import io.streamnative.pulsar.handlers.kop.PendingTopicFutures;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicInitializeException;
import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException;
import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
import io.streamnative.pulsar.handlers.kop.format.EncodeRequest;
@@ -76,13 +79,13 @@
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.service.Topic;
@@ -166,15 +169,15 @@ public PartitionLog(KafkaServiceConfiguration kafkaConfig,
public CompletableFuture<PartitionLog> initialise() {
loadTopicProperties().whenComplete((___, errorLoadTopic) -> {
if (errorLoadTopic != null) {
initFuture.completeExceptionally(errorLoadTopic);
initFuture.completeExceptionally(new KoPTopicInitializeException(errorLoadTopic));
return;
}
if (kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
producerStateManager
.recover(this, recoveryExecutor)
.thenRun(() -> initFuture.complete(this))
.exceptionally(error -> {
initFuture.completeExceptionally(error);
initFuture.completeExceptionally(new KoPTopicInitializeException(error));
return null;
});
} else {
@@ -292,7 +295,7 @@ protected ReadRecordsResult newObject(Handle<ReadRecordsResult> handle) {
private final Recycler.Handle<ReadRecordsResult> recyclerHandle;

private DecodeResult decodeResult;
private List<FetchResponse.AbortedTransaction> abortedTransactions;
private List<FetchResponseData.AbortedTransaction> abortedTransactions;
private long highWatermark;
private long lastStableOffset;
private Position lastPosition;
@@ -309,7 +312,7 @@ public Errors errors() {
}

public static ReadRecordsResult get(DecodeResult decodeResult,
List<FetchResponse.AbortedTransaction> abortedTransactions,
List<FetchResponseData.AbortedTransaction> abortedTransactions,
long highWatermark,
long lastStableOffset,
Position lastPosition,
@@ -325,7 +328,7 @@ public static ReadRecordsResult get(DecodeResult decodeResult,
}

public static ReadRecordsResult get(DecodeResult decodeResult,
List<FetchResponse.AbortedTransaction> abortedTransactions,
List<FetchResponseData.AbortedTransaction> abortedTransactions,
long highWatermark,
long lastStableOffset,
Position lastPosition,
@@ -369,7 +372,7 @@ public static ReadRecordsResult error(Position position, Errors errors, Partitio
partitionLog);
}

public FetchResponse.PartitionData<Records> toPartitionData() {
public FetchResponseData.PartitionData toPartitionData() {

// There are three cases:
//
@@ -380,21 +383,20 @@ public FetchResponse.PartitionData<Records> toPartitionData() {
// 3. errors == Others error :
// Get errors.
if (errors != null) {
return new FetchResponse.PartitionData<>(
errors,
FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET,
null,
MemoryRecords.EMPTY);
return new FetchResponseData.PartitionData()
.setErrorCode(errors.code())
.setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
.setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
.setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
.setRecords(MemoryRecords.EMPTY);
}
return new FetchResponse.PartitionData<>(
Errors.NONE,
highWatermark,
lastStableOffset,
highWatermark, // TODO: should it be changed to the logStartOffset?
abortedTransactions,
decodeResult.getRecords());
return new FetchResponseData.PartitionData()
.setErrorCode(Errors.NONE.code())
.setHighWatermark(highWatermark)
.setLastStableOffset(lastStableOffset)
.setLogStartOffset(highWatermark) // TODO: should it be changed to the logStartOffset?
.setAbortedTransactions(abortedTransactions)
.setRecords(decodeResult.getRecords());
}

public void recycle() {
@@ -461,7 +463,7 @@ public Optional<Long> firstUndecidedOffset() {
return producerStateManager.firstUndecidedOffset();
}

public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
public List<FetchResponseData.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
return producerStateManager.getAbortedIndexList(fetchOffset);
}

@@ -538,14 +540,14 @@ public Position getLastPosition() {
return persistentTopic.getLastPosition();
}

public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequest.PartitionData partitionData,
public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequestData.FetchPartition partitionData,
final boolean readCommitted,
final AtomicLong limitBytes,
final int maxReadEntriesNum,
final MessageFetchContext context) {
final long startPrepareMetadataNanos = MathUtils.nowInNano();
final CompletableFuture<ReadRecordsResult> future = new CompletableFuture<>();
final long offset = partitionData.fetchOffset;
final long offset = partitionData.fetchOffset();
KafkaTopicManager topicManager = context.getTopicManager();
// The future that is returned by getTopicConsumerManager is always completed normally
topicManager.getTopicConsumerManager(fullPartitionName).thenAccept(tcm -> {
@@ -593,7 +595,7 @@ public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequest.Parti

requestStats.getPrepareMetadataStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
long adjustedMaxBytes = Math.min(partitionData.maxBytes, limitBytes.get());
long adjustedMaxBytes = Math.min(partitionData.partitionMaxBytes(), limitBytes.get());
if (readCommitted) {
long firstUndecidedOffset = producerStateManager.firstUndecidedOffset().orElse(-1L);
if (firstUndecidedOffset >= 0 && firstUndecidedOffset <= offset) {
@@ -675,7 +677,7 @@ private void registerPrepareMetadataFailedEvent(long startPrepareMetadataNanos)

private void handleEntries(final CompletableFuture<ReadRecordsResult> future,
final List<Entry> entries,
final FetchRequest.PartitionData partitionData,
final FetchRequestData.FetchPartition partitionData,
final KafkaTopicConsumerManager tcm,
final ManagedCursor cursor,
final boolean readCommitted,
@@ -717,9 +719,9 @@ private void handleEntries(final CompletableFuture<ReadRecordsResult> future,

// collect consumer metrics
decodeResult.updateConsumerStats(topicPartition, committedEntries.size(), groupName, requestStats);
List<FetchResponse.AbortedTransaction> abortedTransactions = null;
List<FetchResponseData.AbortedTransaction> abortedTransactions = null;
if (readCommitted) {
abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset);
abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset());
}
if (log.isDebugEnabled()) {
log.debug("Partition {} read entry completed in {} ns",
@@ -1144,14 +1146,29 @@ public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
// look for the first entry with data
PositionImpl nextValidPosition = managedLedger.getNextValidPosition(firstPosition);

managedLedger.asyncReadEntry(nextValidPosition, new AsyncCallbacks.ReadEntryCallback() {
fetchOldestAvailableIndexFromTopicReadNext(future, managedLedger, nextValidPosition);

return future;

}

private void fetchOldestAvailableIndexFromTopicReadNext(CompletableFuture<Long> future,
ManagedLedgerImpl managedLedger, PositionImpl position) {
managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
long startOffset = MessageMetadataUtils.peekBaseOffsetFromEntry(entry);
log.info("First offset for topic {} is {} - position {}", fullPartitionName,
startOffset, entry.getPosition());
future.complete(startOffset);
} catch (MetadataCorruptedException.NoBrokerEntryMetadata noBrokerEntryMetadata) {
long currentOffset = MessageMetadataUtils.getCurrentOffset(managedLedger);
log.info("Legacy entry for topic {} - position {} - returning current offset {}",
fullPartitionName,
entry.getPosition(),
currentOffset);
future.complete(currentOffset);
} catch (Exception err) {
future.completeExceptionally(err);
} finally {
@@ -1164,9 +1181,6 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);

return future;

}

@VisibleForTesting
Expand Down Expand Up @@ -1200,7 +1214,18 @@ public CompletableFuture<Long> recoverTxEntries(
Executor executor) {
if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
// no need to scan the topic, because transactions are disabled
return CompletableFuture.completedFuture(Long.valueOf(0));
return CompletableFuture.completedFuture(0L);
}
if (!isBrokerIndexMetadataInterceptorConfigured(persistentTopic.getBrokerService())) {
// The `UpgradeTest` sets the interceptor to null, which would cause an NPE in
// `fetchOldestAvailableIndexFromTopic`. We cannot simply disable Kafka transactions here,
// because the transaction coordinator currently has to stay enabled (newer Kafka clients require it).
// TODO: if the AppendIndexMetadataInterceptor is not set, Kafka transactions cannot actually work;
// we should throw an exception instead, or perhaps add a dedicated configuration for the ProducerId.
log.error("The broker index metadata interceptor is not configured for topic {}, skipping txn entry recovery.",
fullPartitionName);
return CompletableFuture.completedFuture(0L);
}
return fetchOldestAvailableIndexFromTopic().thenCompose((minOffset -> {
log.info("start recoverTxEntries for {} at offset {} minOffset {}",
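
The interceptor guard above exists because KoP's offset lookup relies on broker entry metadata. Outside of tests, the usual prerequisite (to the best of my knowledge; double-check against the KoP documentation) is to enable the index interceptor in the Pulsar broker configuration:

```properties
# broker.conf / standalone.conf
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
```
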
@@ -27,8 +27,8 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.FetchResponse;

/**
* Producer state manager.
@@ -339,13 +339,15 @@ public long purgeAbortedTxns(long offset) {
return count.get();
}

public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
public List<FetchResponseData.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
synchronized (abortedIndexList) {
List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
for (AbortedTxn abortedTxn : abortedIndexList) {
if (abortedTxn.lastOffset() >= fetchOffset) {
abortedTransactions.add(
new FetchResponse.AbortedTransaction(abortedTxn.producerId(), abortedTxn.firstOffset()));
new FetchResponseData.AbortedTransaction()
.setProducerId(abortedTxn.producerId())
.setFirstOffset(abortedTxn.firstOffset()));
}
}
return abortedTransactions;
@@ -361,8 +363,7 @@ public void handleMissingDataBeforeRecovery(long minOffset, long snapshotOffset)
if (snapshotOffset < minOffset) {
log.info("{} handleMissingDataBeforeRecovery mapEndOffset {} snapshotOffset "
+ "{} minOffset {} RESETTING STATE",
topicPartition,
mapEndOffset, minOffset);
topicPartition, mapEndOffset, snapshotOffset, minOffset);
// topic was not empty (mapEndOffset has some value)
// but there is no more data on the topic (trimmed?)
ongoingTxns.clear();