diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java index 4c88e801ce..edd7f516b0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java @@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.message.FetchRequestData; @Slf4j public class DelayedFetch extends DelayedOperation { @@ -33,7 +33,7 @@ public class DelayedFetch extends DelayedOperation { private final long bytesReadable; private final int fetchMaxBytes; private final boolean readCommitted; - private final Map readPartitionInfo; + private final Map readPartitionInfo; private final Map readRecordsResult; private final MessageFetchContext context; protected volatile Boolean hasError; @@ -55,7 +55,7 @@ public DelayedFetch(final long delayMs, final boolean readCommitted, final MessageFetchContext context, final ReplicaManager replicaManager, - final Map readPartitionInfo, + final Map readPartitionInfo, final Map readRecordsResult, final CompletableFuture> callback) { super(delayMs, Optional.empty()); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index bb76f6af7f..4def7adecb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -465,7 +465,14 @@ protected void writeAndFlushResponseToClient(Channel channel) { request, response); } - final ByteBuf result = responseToByteBuf(response, request, true); + final ByteBuf result; + try { + result = responseToByteBuf(response, request, true); + } catch (Throwable error) { + log.error("[{}] Failed to convert response {} to ByteBuf", channel, response, error); + sendErrorResponse(request, channel, error, true); + return; + } final int resultSize = result.readableBytes(); channel.writeAndFlush(result).addListener(future -> { if (response instanceof ResponseCallbackWrapper) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 756a530818..6f33a9d80e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -17,6 +17,8 @@ import static com.google.common.base.Preconditions.checkState; import static io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration.TENANT_ALLNAMESPACES_PLACEHOLDER; import static io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration.TENANT_PLACEHOLDER; +import static io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils.buildOffsetFetchResponse; +import static io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils.newCoordinator; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.annotations.VisibleForTesting; @@ -58,7 +60,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import 
java.util.List; import java.util.Map; import java.util.Optional; @@ -116,6 +117,9 @@ import org.apache.kafka.common.message.DescribeConfigsResponseData; import org.apache.kafka.common.message.EndTxnRequestData; import org.apache.kafka.common.message.EndTxnResponseData; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; @@ -134,7 +138,6 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.AddOffsetsToTxnRequest; @@ -960,18 +963,50 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato checkArgument(findCoordinator.getRequest() instanceof FindCoordinatorRequest); FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest(); + List coordinatorKeys = request.version() < FindCoordinatorRequest.MIN_BATCHED_VERSION + ? Collections.singletonList(request.data().key()) : request.data().coordinatorKeys(); + + List> futures = + new ArrayList<>(coordinatorKeys.size()); + for (String coordinatorKey : coordinatorKeys) { + CompletableFuture future = + findSingleCoordinator(coordinatorKey, findCoordinator); + futures.add(future); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .whenComplete((ignore, ex) -> { + if (ex != null) { + resultFuture.completeExceptionally(ex); + return; + } + List coordinators = new ArrayList<>(futures.size()); + for (CompletableFuture future : futures) { + coordinators.add(future.join()); + } + resultFuture.complete(KafkaResponseUtils.newFindCoordinator(coordinators, request.version())); + }); + + } + + private CompletableFuture findSingleCoordinator( + String coordinatorKey, KafkaHeaderAndRequest findCoordinator) { + + FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest(); + CompletableFuture findSingleCoordinatorResult = + new CompletableFuture<>(); + if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) { TransactionCoordinator transactionCoordinator = getTransactionCoordinator(); - int partition = transactionCoordinator.partitionFor(request.data().key()); + int partition = transactionCoordinator.partitionFor(coordinatorKey); String pulsarTopicName = transactionCoordinator.getTopicPartitionName(partition); findBroker(TopicName.get(pulsarTopicName)) .whenComplete((KafkaResponseUtils.BrokerLookupResult result, Throwable throwable) -> { if (result.error != Errors.NONE || throwable != null) { log.error("[{}] Request {}: Error while find coordinator.", ctx.channel(), findCoordinator.getHeader(), throwable); - - resultFuture.complete(KafkaResponseUtils - .newFindCoordinator(Errors.LEADER_NOT_AVAILABLE)); + findSingleCoordinatorResult.complete( + newCoordinator(Errors.LEADER_NOT_AVAILABLE, null, coordinatorKey)); return; } @@ -979,35 +1014,34 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato log.debug("[{}] Found node {} as coordinator for key {} partition {}.", 
ctx.channel(), result.node, request.data().key(), partition); } - resultFuture.complete(KafkaResponseUtils.newFindCoordinator(result.node)); + findSingleCoordinatorResult.complete( + newCoordinator(result.error, result.node, coordinatorKey)); }); } else if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.GROUP.id()) { - authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.GROUP, request.data().key())) + authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.GROUP, coordinatorKey)) .whenComplete((isAuthorized, ex) -> { if (ex != null) { log.error("Describe group authorize failed, group - {}. {}", request.data().key(), ex.getMessage()); - resultFuture.complete(KafkaResponseUtils - .newFindCoordinator(Errors.GROUP_AUTHORIZATION_FAILED)); + findSingleCoordinatorResult.complete( + newCoordinator(Errors.GROUP_AUTHORIZATION_FAILED, null, coordinatorKey)); return; } if (!isAuthorized) { - resultFuture.complete( - KafkaResponseUtils - .newFindCoordinator(Errors.GROUP_AUTHORIZATION_FAILED)); + findSingleCoordinatorResult.complete( + newCoordinator(Errors.GROUP_AUTHORIZATION_FAILED, null, coordinatorKey)); return; } CompletableFuture storeGroupIdFuture; - int partition = getGroupCoordinator().partitionFor(request.data().key()); + int partition = getGroupCoordinator().partitionFor(coordinatorKey); String pulsarTopicName = getGroupCoordinator().getTopicPartitionName(partition); if (kafkaConfig.isKopEnableGroupLevelConsumerMetrics()) { - String groupId = request.data().key(); String groupIdPath = GroupIdUtils.groupIdPathFormat(findCoordinator.getClientHost(), findCoordinator.getHeader().clientId()); currentConnectedClientId.add(findCoordinator.getHeader().clientId()); // Store group name to metadata store for current client, use to collect consumer metrics. 
- storeGroupIdFuture = storeGroupId(groupId, groupIdPath); + storeGroupIdFuture = storeGroupId(coordinatorKey, groupIdPath); } else { storeGroupIdFuture = CompletableFuture.completedFuture(null); } @@ -1023,8 +1057,8 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato log.error("[{}] Request {}: Error while find coordinator.", ctx.channel(), findCoordinator.getHeader(), throwable); - resultFuture.complete(KafkaResponseUtils - .newFindCoordinator(Errors.LEADER_NOT_AVAILABLE)); + findSingleCoordinatorResult.complete( + newCoordinator(Errors.LEADER_NOT_AVAILABLE, null, coordinatorKey)); return; } @@ -1032,14 +1066,17 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato log.debug("[{}] Found node {} as coordinator for key {} partition {}.", ctx.channel(), result.node, request.data().key(), partition); } - resultFuture.complete(KafkaResponseUtils.newFindCoordinator(result.node)); + findSingleCoordinatorResult.complete( + newCoordinator(result.error, result.node, coordinatorKey)); }); }); }); } else { - throw new NotImplementedException("FindCoordinatorRequest not support unknown type " - + request.data().keyType()); + findSingleCoordinatorResult.completeExceptionally( + new NotImplementedException("FindCoordinatorRequest not support unknown type " + + request.data().keyType())); } + return findSingleCoordinatorResult; } @VisibleForTesting @@ -1083,6 +1120,57 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, checkState(getGroupCoordinator() != null, "Group Coordinator not started"); + List> futures = new ArrayList<>(); + if (request.version() >= 8) { + request.data().groups().forEach(group -> { + String groupId = group.groupId(); + List partitions = new ArrayList<>(); + // null topics means no partitions specified, so we should fetch all partitions + if (group.topics() != null) { + group + .topics() + .forEach(topic -> { + topic.partitionIndexes() + .forEach(partition -> partitions.add(new TopicPartition(topic.name(), partition))); + }); + } + futures.add(getOffsetFetchForGroup(groupId, partitions)); + }); + + } else { + // old clients + String groupId = request.data().groupId(); + List partitions = new ArrayList<>(); + request.data().topics().forEach(topic -> { + topic + .partitionIndexes() + .forEach(partition -> partitions.add(new TopicPartition(topic.name(), partition))); + }); + futures.add(getOffsetFetchForGroup(groupId, partitions)); + } + + FutureUtil.waitForAll(futures).whenComplete((___, error) -> { + if (error != null) { + resultFuture.complete(request.getErrorResponse(error)); + return; + } + List partitionsResponses = new ArrayList<>(); + futures.forEach(f -> { + partitionsResponses.add(f.join()); + }); + + resultFuture.complete(buildOffsetFetchResponse(partitionsResponses, request.version())); + }); + + } + + protected CompletableFuture getOffsetFetchForGroup( + String groupId, + List partitions + ) { + + CompletableFuture resultFuture = new CompletableFuture<>(); + CompletableFuture> authorizeFuture = new CompletableFuture<>(); // replace @@ -1094,10 +1182,11 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, Map unknownPartitionData = Maps.newConcurrentMap(); - if (request.partitions() == null || request.partitions().isEmpty()) { + if (partitions == null || partitions.isEmpty()) { + // fetch all partitions authorizeFuture.complete(null); } else { - AtomicInteger partitionCount = new AtomicInteger(request.partitions().size()); + AtomicInteger partitionCount = new 
AtomicInteger(partitions.size()); Runnable completeOneAuthorization = () -> { if (partitionCount.decrementAndGet() == 0) { @@ -1105,7 +1194,7 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, } }; final String namespacePrefix = currentNamespacePrefix(); - request.partitions().forEach(tp -> { + partitions.forEach(tp -> { try { String fullName = new KopTopic(tp.topic(), namespacePrefix).getFullName(); authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullName)) @@ -1138,7 +1227,7 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, authorizeFuture.whenComplete((partitionList, ex) -> { KeyValue> keyValue = getGroupCoordinator().handleFetchOffsets( - request.groupId(), + groupId, Optional.ofNullable(partitionList) ); if (log.isDebugEnabled()) { @@ -1154,14 +1243,21 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, } // recover to original topic name - replaceTopicPartition(keyValue.getValue(), replacingIndex); - keyValue.getValue().putAll(unauthorizedPartitionData); - keyValue.getValue().putAll(unknownPartitionData); - - resultFuture.complete(new OffsetFetchResponse(keyValue.getKey(), keyValue.getValue())); + Map partitionsResponses = keyValue.getValue(); + replaceTopicPartition(partitionsResponses, replacingIndex); + partitionsResponses.putAll(unauthorizedPartitionData); + partitionsResponses.putAll(unknownPartitionData); + + Errors errors = keyValue.getKey(); + resultFuture.complete(new KafkaResponseUtils.OffsetFetchResponseGroupData(groupId, errors, + partitionsResponses)); }); + + return resultFuture; } + + private CompletableFuture> fetchOffset(String topicName, long timestamp) { CompletableFuture> partitionData = new CompletableFuture<>(); @@ -1632,30 +1728,31 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, checkArgument(fetch.getRequest() instanceof FetchRequest); FetchRequest request = (FetchRequest) fetch.getRequest(); + FetchRequestData data = request.data(); if (log.isDebugEnabled()) { log.debug("[{}] Request {} Fetch request. Size: {}. 
Each item: ", - ctx.channel(), fetch.getHeader(), request.fetchData().size()); + ctx.channel(), fetch.getHeader(), data.topics().size()); - request.fetchData().forEach((topic, data) -> { - log.debug("Fetch request topic:{} data:{}.", topic, data.toString()); + data.topics().forEach((topicData) -> { + log.debug("Fetch request topic: data:{}.", topicData.toString()); }); } - if (request.fetchData().isEmpty()) { - resultFuture.complete(new FetchResponse<>( - Errors.NONE, - new LinkedHashMap<>(), - THROTTLE_TIME_MS, - request.metadata().sessionId())); + int numPartitions = data.topics().stream().mapToInt(topic -> topic.partitions().size()).sum(); + if (numPartitions == 0) { + resultFuture.complete(new FetchResponse(new FetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setSessionId(request.metadata().sessionId()) + .setResponses(new ArrayList<>()))); return; } - ConcurrentHashMap> erroneous = + ConcurrentHashMap erroneous = new ConcurrentHashMap<>(); - ConcurrentHashMap interesting = + ConcurrentHashMap interesting = new ConcurrentHashMap<>(); - AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(request.fetchData().size()); + AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(numPartitions); Runnable completeOne = () -> { if (unfinishedAuthorizationCount.decrementAndGet() == 0) { TransactionCoordinator transactionCoordinator = null; @@ -1668,13 +1765,12 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, int fetchMinBytes = Math.min(request.minBytes(), fetchMaxBytes); if (interesting.isEmpty()) { if (log.isDebugEnabled()) { - log.debug("Fetch interesting is empty. Partitions: [{}]", request.fetchData()); + log.debug("Fetch interesting is empty. Partitions: [{}]", data.topics()); } - resultFuture.complete(new FetchResponse<>( - Errors.NONE, - new LinkedHashMap<>(erroneous), - THROTTLE_TIME_MS, - request.metadata().sessionId())); + resultFuture.complete(new FetchResponse(new FetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setSessionId(request.metadata().sessionId()) + .setResponses(buildFetchResponses(erroneous)))); } else { MessageFetchContext context = MessageFetchContext .get(this, transactionCoordinator, maxReadEntriesNum, namespacePrefix, @@ -1687,18 +1783,17 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, request.isolationLevel(), context ).thenAccept(resultMap -> { - LinkedHashMap> partitions = - new LinkedHashMap<>(); - resultMap.forEach((tp, data) -> { - partitions.put(tp, data.toPartitionData()); + Map all = new HashMap<>(); + resultMap.forEach((tp, results) -> { + all.put(tp, results.toPartitionData()); }); - partitions.putAll(erroneous); + all.putAll(erroneous); boolean triggeredCompletion = resultFuture.complete(new ResponseCallbackWrapper( - new FetchResponse<>( - Errors.NONE, - partitions, - 0, - request.metadata().sessionId()), + new FetchResponse(new FetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setThrottleTimeMs(0) + .setSessionId(request.metadata().sessionId()) + .setResponses(buildFetchResponses(all))), () -> resultMap.forEach((__, readRecordsResult) -> { readRecordsResult.recycle(); }) @@ -1715,36 +1810,72 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, }; // Regular Kafka consumers need READ permission on each partition they are fetching. 
- request.fetchData().forEach((topicPartition, partitionData) -> { - final String fullTopicName = KopTopic.toString(topicPartition, this.currentNamespacePrefix()); - authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName)) - .whenComplete((isAuthorized, ex) -> { - if (ex != null) { - log.error("Read topic authorize failed, topic - {}. {}", - fullTopicName, ex.getMessage()); - erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); - completeOne.run(); - return; - } - if (!isAuthorized) { - erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); + data.topics().forEach(topicData -> { + topicData.partitions().forEach((partitionData) -> { + TopicPartition topicPartition = new TopicPartition(topicData.topic(), partitionData.partition()); + final String fullTopicName = KopTopic.toString(topicPartition, this.currentNamespacePrefix()); + authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName)) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("Read topic authorize failed, topic - {}. {}", + fullTopicName, ex.getMessage()); + erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); + completeOne.run(); + return; + } + if (!isAuthorized) { + erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); + completeOne.run(); + return; + } + interesting.put(topicPartition, partitionData); completeOne.run(); - return; - } - interesting.put(topicPartition, partitionData); - completeOne.run(); - }); + }); + }); }); } - private static FetchResponse.PartitionData errorResponse(Errors error) { - return new FetchResponse.PartitionData<>(error, - FetchResponse.INVALID_HIGHWATERMARK, - FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY); + public static List buildFetchResponses( + Map partitionData) { + List result = new ArrayList<>(); + partitionData.keySet() + .stream() + .map(topicPartition -> topicPartition.topic()) + .distinct() + .forEach(topic -> { + FetchResponseData.FetchableTopicResponse fetchableTopicResponse = + new FetchResponseData.FetchableTopicResponse() + .setTopic(topic) + .setPartitions(new ArrayList<>()); + result.add(fetchableTopicResponse); + + partitionData.forEach((tp, data) -> { + if (tp.topic().equals(topic)) { + fetchableTopicResponse.partitions().add(new FetchResponseData.PartitionData() + .setPartitionIndex(tp.partition()) + .setErrorCode(data.errorCode()) + .setHighWatermark(data.highWatermark()) + .setLastStableOffset(data.lastStableOffset()) + .setLogStartOffset(data.logStartOffset()) + .setAbortedTransactions(data.abortedTransactions()) + .setPreferredReadReplica(data.preferredReadReplica()) + .setRecords(data.records())); + } + }); + }); + return result; + } + + private static FetchResponseData.PartitionData errorResponse(Errors error) { + return new FetchResponseData.PartitionData() + .setErrorCode(error.code()) + .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) + .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET) + .setRecords(MemoryRecords.EMPTY); } @Override @@ -1780,7 +1911,8 @@ protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup, joinGroupResult.getProtocolType(), joinGroupResult.getMemberId(), joinGroupResult.getLeaderId(), - members + members, + request.version() ); if (log.isTraceEnabled()) { log.trace("Sending join group response {} for correlation id {} to 
client {}.", diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/exceptions/KoPTopicInitializeException.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/exceptions/KoPTopicInitializeException.java new file mode 100644 index 0000000000..b586b3f71c --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/exceptions/KoPTopicInitializeException.java @@ -0,0 +1,30 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.exceptions; + +import java.io.Serial; + +/** + * KoP topic load exception. + */ +public class KoPTopicInitializeException extends KoPBaseException { + + @Serial + private static final long serialVersionUID = 0L; + + public KoPTopicInitializeException(Throwable throwable) { + super(throwable); + } + +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java index 9993b55fdb..06eb600df0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.kop.storage; +import static io.streamnative.pulsar.handlers.kop.utils.MessageMetadataUtils.isBrokerIndexMetadataInterceptorConfigured; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import io.netty.buffer.ByteBuf; @@ -26,6 +28,7 @@ import io.streamnative.pulsar.handlers.kop.MessagePublishContext; import io.streamnative.pulsar.handlers.kop.PendingTopicFutures; import io.streamnative.pulsar.handlers.kop.RequestStats; +import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicInitializeException; import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException; import io.streamnative.pulsar.handlers.kop.format.DecodeResult; import io.streamnative.pulsar.handlers.kop.format.EncodeRequest; @@ -76,13 +79,13 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.Records; -import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.utils.Time; import org.apache.pulsar.broker.service.Topic; @@ -166,7 +169,7 @@ public PartitionLog(KafkaServiceConfiguration kafkaConfig, public CompletableFuture initialise() { 
loadTopicProperties().whenComplete((___, errorLoadTopic) -> { if (errorLoadTopic != null) { - initFuture.completeExceptionally(errorLoadTopic); + initFuture.completeExceptionally(new KoPTopicInitializeException(errorLoadTopic)); return; } if (kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { @@ -174,7 +177,7 @@ public CompletableFuture initialise() { .recover(this, recoveryExecutor) .thenRun(() -> initFuture.complete(this)) .exceptionally(error -> { - initFuture.completeExceptionally(error); + initFuture.completeExceptionally(new KoPTopicInitializeException(error)); return null; }); } else { @@ -292,7 +295,7 @@ protected ReadRecordsResult newObject(Handle handle) { private final Recycler.Handle recyclerHandle; private DecodeResult decodeResult; - private List abortedTransactions; + private List abortedTransactions; private long highWatermark; private long lastStableOffset; private Position lastPosition; @@ -309,7 +312,7 @@ public Errors errors() { } public static ReadRecordsResult get(DecodeResult decodeResult, - List abortedTransactions, + List abortedTransactions, long highWatermark, long lastStableOffset, Position lastPosition, @@ -325,7 +328,7 @@ public static ReadRecordsResult get(DecodeResult decodeResult, } public static ReadRecordsResult get(DecodeResult decodeResult, - List abortedTransactions, + List abortedTransactions, long highWatermark, long lastStableOffset, Position lastPosition, @@ -369,7 +372,7 @@ public static ReadRecordsResult error(Position position, Errors errors, Partitio partitionLog); } - public FetchResponse.PartitionData toPartitionData() { + public FetchResponseData.PartitionData toPartitionData() { // There are three cases: // @@ -380,21 +383,20 @@ public FetchResponse.PartitionData toPartitionData() { // 3. errors == Others error : // Get errors. if (errors != null) { - return new FetchResponse.PartitionData<>( - errors, - FetchResponse.INVALID_HIGHWATERMARK, - FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, - null, - MemoryRecords.EMPTY); + return new FetchResponseData.PartitionData() + .setErrorCode(errors.code()) + .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) + .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET) + .setRecords(MemoryRecords.EMPTY); } - return new FetchResponse.PartitionData<>( - Errors.NONE, - highWatermark, - lastStableOffset, - highWatermark, // TODO: should it be changed to the logStartOffset? - abortedTransactions, - decodeResult.getRecords()); + return new FetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code()) + .setHighWatermark(highWatermark) + .setLastStableOffset(lastStableOffset) + .setHighWatermark(highWatermark) // TODO: should it be changed to the logStartOffset? 
+ .setAbortedTransactions(abortedTransactions) + .setRecords(decodeResult.getRecords()); } public void recycle() { @@ -461,7 +463,7 @@ public Optional firstUndecidedOffset() { return producerStateManager.firstUndecidedOffset(); } - public List getAbortedIndexList(long fetchOffset) { + public List getAbortedIndexList(long fetchOffset) { return producerStateManager.getAbortedIndexList(fetchOffset); } @@ -538,14 +540,14 @@ public Position getLastPosition() { return persistentTopic.getLastPosition(); } - public CompletableFuture readRecords(final FetchRequest.PartitionData partitionData, + public CompletableFuture readRecords(final FetchRequestData.FetchPartition partitionData, final boolean readCommitted, final AtomicLong limitBytes, final int maxReadEntriesNum, final MessageFetchContext context) { final long startPrepareMetadataNanos = MathUtils.nowInNano(); final CompletableFuture future = new CompletableFuture<>(); - final long offset = partitionData.fetchOffset; + final long offset = partitionData.fetchOffset(); KafkaTopicManager topicManager = context.getTopicManager(); // The future that is returned by getTopicConsumerManager is always completed normally topicManager.getTopicConsumerManager(fullPartitionName).thenAccept(tcm -> { @@ -593,7 +595,7 @@ public CompletableFuture readRecords(final FetchRequest.Parti requestStats.getPrepareMetadataStats().registerSuccessfulEvent( MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS); - long adjustedMaxBytes = Math.min(partitionData.maxBytes, limitBytes.get()); + long adjustedMaxBytes = Math.min(partitionData.partitionMaxBytes(), limitBytes.get()); if (readCommitted) { long firstUndecidedOffset = producerStateManager.firstUndecidedOffset().orElse(-1L); if (firstUndecidedOffset >= 0 && firstUndecidedOffset <= offset) { @@ -675,7 +677,7 @@ private void registerPrepareMetadataFailedEvent(long startPrepareMetadataNanos) private void handleEntries(final CompletableFuture future, final List entries, - final FetchRequest.PartitionData partitionData, + final FetchRequestData.FetchPartition partitionData, final KafkaTopicConsumerManager tcm, final ManagedCursor cursor, final boolean readCommitted, @@ -717,9 +719,9 @@ private void handleEntries(final CompletableFuture future, // collect consumer metrics decodeResult.updateConsumerStats(topicPartition, committedEntries.size(), groupName, requestStats); - List abortedTransactions = null; + List abortedTransactions = null; if (readCommitted) { - abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset); + abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset()); } if (log.isDebugEnabled()) { log.debug("Partition {} read entry completed in {} ns", @@ -1144,7 +1146,15 @@ public CompletableFuture fetchOldestAvailableIndexFromTopic() { // look for the first entry with data PositionImpl nextValidPosition = managedLedger.getNextValidPosition(firstPosition); - managedLedger.asyncReadEntry(nextValidPosition, new AsyncCallbacks.ReadEntryCallback() { + fetchOldestAvailableIndexFromTopicReadNext(future, managedLedger, nextValidPosition); + + return future; + + } + + private void fetchOldestAvailableIndexFromTopicReadNext(CompletableFuture future, + ManagedLedgerImpl managedLedger, PositionImpl position) { + managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { try { @@ -1152,6 +1162,13 @@ public void readEntryComplete(Entry entry, Object ctx) { log.info("First offset for 
topic {} is {} - position {}", fullPartitionName, startOffset, entry.getPosition()); future.complete(startOffset); + } catch (MetadataCorruptedException.NoBrokerEntryMetadata noBrokerEntryMetadata) { + long currentOffset = MessageMetadataUtils.getCurrentOffset(managedLedger); + log.info("Legacy entry for topic {} - position {} - returning current offset {}", + fullPartitionName, + entry.getPosition(), + currentOffset); + future.complete(currentOffset); } catch (Exception err) { future.completeExceptionally(err); } finally { @@ -1164,9 +1181,6 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { future.completeExceptionally(exception); } }, null); - - return future; - } @VisibleForTesting @@ -1200,7 +1214,18 @@ public CompletableFuture recoverTxEntries( Executor executor) { if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { // no need to scan the topic, because transactions are disabled - return CompletableFuture.completedFuture(Long.valueOf(0)); + return CompletableFuture.completedFuture(0L); + } + if (!isBrokerIndexMetadataInterceptorConfigured(persistentTopic.getBrokerService())) { + // The `UpgradeTest` will set the interceptor to null, + // this will cause NPE problem while `fetchOldestAvailableIndexFromTopic`, + // but we can't disable kafka transaction, + // currently transaction coordinator must set to true (Newly Kafka client requirement). + // TODO Actually, if the AppendIndexMetadataInterceptor is not set, the kafka transaction can't work, + // we need to throw an exception, maybe we need add a new configuration for ProducerId. + log.error("The broker index metadata interceptor is not configured for topic {}, skip recover txn entries.", + fullPartitionName); + return CompletableFuture.completedFuture(0L); } return fetchOldestAvailableIndexFromTopic().thenCompose((minOffset -> { log.info("start recoverTxEntries for {} at offset {} minOffset {}", diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java index 1bd81bc289..eb95387033 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java @@ -27,8 +27,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.requests.FetchResponse; /** * Producer state manager. 
@@ -339,13 +339,15 @@ public long purgeAbortedTxns(long offset) { return count.get(); } - public List getAbortedIndexList(long fetchOffset) { + public List getAbortedIndexList(long fetchOffset) { synchronized (abortedIndexList) { - List abortedTransactions = new ArrayList<>(); + List abortedTransactions = new ArrayList<>(); for (AbortedTxn abortedTxn : abortedIndexList) { if (abortedTxn.lastOffset() >= fetchOffset) { abortedTransactions.add( - new FetchResponse.AbortedTransaction(abortedTxn.producerId(), abortedTxn.firstOffset())); + new FetchResponseData.AbortedTransaction() + .setProducerId(abortedTxn.producerId()) + .setFirstOffset(abortedTxn.firstOffset())); } } return abortedTransactions; @@ -361,8 +363,7 @@ public void handleMissingDataBeforeRecovery(long minOffset, long snapshotOffset) if (snapshotOffset < minOffset) { log.info("{} handleMissingDataBeforeRecovery mapEndOffset {} snapshotOffset " + "{} minOffset {} RESETTING STATE", - topicPartition, - mapEndOffset, minOffset); + topicPartition, mapEndOffset, snapshotOffset, minOffset); // topic was not empty (mapEndOffset has some value) // but there is no more data on the topic (trimmed?) ongoingTxns.clear(); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index 3671017a88..e8b39eab3d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -20,6 +20,7 @@ import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService; import io.streamnative.pulsar.handlers.kop.MessageFetchContext; import io.streamnative.pulsar.handlers.kop.RequestStats; +import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicInitializeException; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey; @@ -42,15 +43,16 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.util.FutureUtil; /** * Used to append records. Mapping to Kafka ReplicaManager.scala. 
@@ -179,13 +181,12 @@ public CompletableFuture> .thenAccept(offset -> addPartitionResponse.accept(topicPartition, new ProduceResponse.PartitionResponse(Errors.NONE, offset, -1L, -1L))) .exceptionally(ex -> { - if (isCannotLoadTopicError(ex)) { - log.error("Cannot load topic error while handling append for {}", + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof BrokerServiceException.PersistenceException + || cause instanceof BrokerServiceException.ServiceUnitNotReadyException + || cause instanceof KoPTopicInitializeException) { + log.error("Encounter NotLeaderOrFollower error while handling append for {}", fullPartitionName, ex); - addPartitionResponse.accept(topicPartition, - new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)); - } else if (ex.getCause() instanceof BrokerServiceException.PersistenceException) { - log.error("Persistence error while handling append for {}", fullPartitionName, ex); // BrokerServiceException$PersistenceException: // org.apache.bookkeeper.mledger.ManagedLedgerException: // org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: @@ -193,19 +194,19 @@ public CompletableFuture> addPartitionResponse.accept(topicPartition, new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)); - } else if (ex.getCause() instanceof PulsarClientException) { + } else if (cause instanceof PulsarClientException) { log.error("Error on Pulsar Client while handling append for {}", fullPartitionName, ex); addPartitionResponse.accept(topicPartition, new ProduceResponse.PartitionResponse(Errors.BROKER_NOT_AVAILABLE)); - } else if (ex.getCause() instanceof NotLeaderOrFollowerException) { + } else if (cause instanceof NotLeaderOrFollowerException) { addPartitionResponse.accept(topicPartition, - new ProduceResponse.PartitionResponse(Errors.forException(ex.getCause()))); + new ProduceResponse.PartitionResponse(Errors.forException(cause))); } else { log.error("System error while handling append for {}", fullPartitionName, ex); addPartitionResponse.accept(topicPartition, - new ProduceResponse.PartitionResponse(Errors.forException(ex.getCause()))); + new ProduceResponse.PartitionResponse(Errors.forException(cause))); } return null; }); @@ -231,16 +232,11 @@ public CompletableFuture> return completableFuture; } - private static boolean isCannotLoadTopicError(Throwable error) { - String asString = error + ""; - return asString.contains("Failed to load topic within timeout"); - } - public CompletableFuture> fetchMessage( final long timeout, final int fetchMinBytes, final int fetchMaxBytes, - final ConcurrentHashMap fetchInfos, + final ConcurrentHashMap fetchInfos, final IsolationLevel isolationLevel, final MessageFetchContext context) { CompletableFuture> future = @@ -294,7 +290,7 @@ public CompletableFuture> re final boolean readCommitted, final int fetchMaxBytes, final int maxReadEntriesNum, - final Map readPartitionInfo, + final Map readPartitionInfo, final MessageFetchContext context) { AtomicLong limitBytes = new AtomicLong(fetchMaxBytes); CompletableFuture> resultFuture = new CompletableFuture<>(); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java index fa386b26f7..3a654dc2b3 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java +++ 
b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -41,6 +42,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListOffsetsResponseData; import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.SaslAuthenticateResponseData; import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -54,6 +56,7 @@ import org.apache.kafka.common.requests.DeleteRecordsResponse; import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupResponse; @@ -68,6 +71,7 @@ import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.pulsar.common.schema.KeyValue; +@Slf4j public class KafkaResponseUtils { public static ApiVersionsResponse newApiVersions(List versionList) { @@ -184,22 +188,43 @@ public static DescribeGroupsResponse newDescribeGroups( return new DescribeGroupsResponse(data); } - public static FindCoordinatorResponse newFindCoordinator(Node node) { - FindCoordinatorResponseData data = new FindCoordinatorResponseData(); - data.setNodeId(node.id()); - data.setHost(node.host()); - data.setPort(node.port()); - data.setErrorCode(Errors.NONE.code()); - return new FindCoordinatorResponse(data); + public static FindCoordinatorResponseData.Coordinator newCoordinator(Errors errors, + Node node, + String coordinatorKey) { + if (errors != Errors.NONE) { + return new FindCoordinatorResponseData.Coordinator() + .setErrorCode(errors.code()) + .setErrorMessage(errors.message()) + .setKey(coordinatorKey); + } + return new FindCoordinatorResponseData.Coordinator() + .setNodeId(node.id()) + .setHost(node.host()) + .setPort(node.port()) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message()) + .setKey(coordinatorKey); } - public static FindCoordinatorResponse newFindCoordinator(Errors errors) { + public static FindCoordinatorResponse newFindCoordinator(List coordinators, + int version) { FindCoordinatorResponseData data = new FindCoordinatorResponseData(); - data.setErrorCode(errors.code()); - data.setErrorMessage(errors.message()); + if (version < FindCoordinatorRequest.MIN_BATCHED_VERSION) { + FindCoordinatorResponseData.Coordinator coordinator = coordinators.get(0); + data.setErrorMessage(coordinator.errorMessage()) + .setErrorCode(coordinator.errorCode()) + .setPort(coordinator.port()) + .setHost(coordinator.host()) + .setNodeId(coordinator.nodeId()); + } else { + // for new clients + data.setCoordinators(coordinators); + } + return new FindCoordinatorResponse(data); } + public static HeartbeatResponse newHeartbeat(Errors errors) { HeartbeatResponseData data = new HeartbeatResponseData(); data.setErrorCode(errors.code()); @@ -212,7 +237,8 @@ public static JoinGroupResponse newJoinGroup(Errors errors, String groupProtocolType, String memberId, 
String leaderId, - Map groupMembers) { + Map groupMembers, + short requestVersion) { JoinGroupResponseData data = new JoinGroupResponseData() .setErrorCode(errors.code()) .setLeader(leaderId) @@ -234,7 +260,7 @@ public static JoinGroupResponse newJoinGroup(Errors errors, data.setThrottleTimeMs(1000); } - return new JoinGroupResponse(data); + return new JoinGroupResponse(data, requestVersion); } public static LeaveGroupResponse newLeaveGroup(Errors errors) { @@ -347,6 +373,7 @@ public static MetadataResponse newMetadata(List nodes, return new MetadataResponse(data, apiVersion); } + @Getter @AllArgsConstructor public static class BrokerLookupResult { @@ -435,4 +462,59 @@ public static SyncGroupResponse newSyncGroup(Errors errors, data.setAssignment(assignment); return new SyncGroupResponse(data); } + + @AllArgsConstructor + public static class OffsetFetchResponseGroupData { + String groupId; + Errors errors; + Map partitionsResponses; + } + + public static OffsetFetchResponse buildOffsetFetchResponse( + List groups, + int version) { + + if (version < 8) { + // old clients + OffsetFetchResponseGroupData offsetFetchResponseGroupData = groups.get(0); + return new OffsetFetchResponse(offsetFetchResponseGroupData.errors, + offsetFetchResponseGroupData.partitionsResponses); + } else { + // new clients + OffsetFetchResponseData data = new OffsetFetchResponseData(); + for (OffsetFetchResponseGroupData groupData : groups) { + OffsetFetchResponseData.OffsetFetchResponseGroup offsetFetchResponseGroup = + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setErrorCode(groupData.errors.code()) + .setGroupId(groupData.groupId) + .setTopics(new ArrayList<>()); + data.groups().add(offsetFetchResponseGroup); + Set topics = groupData.partitionsResponses.keySet().stream().map(TopicPartition::topic) + .collect(Collectors.toSet()); + topics.forEach(topic -> { + offsetFetchResponseGroup.topics().add(new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName(topic) + .setPartitions(groupData.partitionsResponses.entrySet() + .stream() + .filter(e -> e.getKey().topic().equals(topic)) + .map(entry -> { + OffsetFetchResponse.PartitionData value = entry.getValue(); + if (log.isDebugEnabled()) { + log.debug("Add resp for group {} topic {}: {}", + groupData.groupId, topic, value); + } + return new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setErrorCode(value.error.code()) + .setMetadata(value.metadata) + .setPartitionIndex(entry.getKey().partition()) + .setCommittedOffset(value.offset) + .setCommittedLeaderEpoch(value.leaderEpoch.orElse(-1)); + }) + .collect(Collectors.toList()))); + }); + } + return new OffsetFetchResponse(data); + } + + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java index 7940f063ad..e2a843b739 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java @@ -14,6 +14,7 @@ package io.streamnative.pulsar.handlers.kop.utils; import com.google.common.base.Predicate; +import io.jsonwebtoken.lang.Collections; import io.netty.buffer.ByteBuf; import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException; import java.util.concurrent.CompletableFuture; @@ -28,8 +29,11 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import 
org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; +import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor; +import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import org.apache.pulsar.common.protocol.Commands; /** @@ -38,6 +42,18 @@ @Slf4j public class MessageMetadataUtils { + public static boolean isBrokerIndexMetadataInterceptorConfigured(BrokerService brokerService) { + if (Collections.isEmpty(brokerService.getBrokerEntryMetadataInterceptors())) { + return false; + } + for (BrokerEntryMetadataInterceptor interceptor : brokerService.getBrokerEntryMetadataInterceptors()) { + if (interceptor instanceof AppendIndexMetadataInterceptor) { + return true; + } + } + return false; + } + public static long getCurrentOffset(ManagedLedger managedLedger) { return ((ManagedLedgerInterceptorImpl) managedLedger.getManagedLedgerInterceptor()).getIndex(); } diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java index 21cb50eb95..038db82591 100644 --- a/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java +++ b/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java @@ -56,4 +56,9 @@ public int throttleTimeMs() { public ApiMessage data() { return abstractResponse.data(); } + + @Override + public void maybeSetThrottleTimeMs(int i) { + abstractResponse.maybeSetThrottleTimeMs(i); + } } diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java index 948d58e1b4..18cbcad50d 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java @@ -339,15 +339,15 @@ public MineMemoryRecordsBuilder(ByteBufferOutputStream bufferStream, } @Override - public Long appendWithOffset(long offset, SimpleRecord record) { - return appendWithOffset(offset, + public void appendWithOffset(long offset, SimpleRecord record) { + appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers()); } - public Long appendWithOffset(long offset, + public void appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, @@ -359,9 +359,9 @@ public Long appendWithOffset(long offset, if (magic > RecordBatch.MAGIC_VALUE_V1) { appendDefaultRecord(offset, timestamp, key, value, headers); - return null; + } else { - return appendLegacyRecord(offset, timestamp, key, value); + appendLegacyRecord(offset, timestamp, key, value); } } catch (IOException e) { throw new KafkaException("I/O exception when writing to the append stream, closing", e); diff --git a/pom.xml b/pom.xml index da7d67e171..9582ab3361 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ 2.14.2 2.14.2 - 2.8.0 + 3.4.1 1.18.24 4.11.0 3.0.0-beta-2 diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java index 35efa41895..165726969a 100644 --- 
a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java @@ -92,7 +92,7 @@ public void testPublishTime() throws Exception { // time before first message ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, startTime)); KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java index 056a7cc0ce..d97f726cea 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java @@ -73,10 +73,12 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.ListOffsetsResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.ProduceRequestData; +import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; @@ -108,6 +110,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.jetbrains.annotations.NotNull; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -285,7 +288,7 @@ public void testReadUncommittedConsumerListOffsetEarliestOffsetEquals() throws E // 2. real test, for ListOffset request verify Earliest get earliest ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, EARLIEST_TIMESTAMP)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -338,7 +341,7 @@ public void testConsumerListOffsetLatest() throws Exception { // 2. 
real test, for ListOffset request verify Earliest get earliest ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, ListOffsetsRequest.LATEST_TIMESTAMP)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -572,7 +575,7 @@ public void testConsumerListOffset() throws Exception { private ListOffsetsResponse listOffset(long timestamp, TopicPartition tp) throws Exception { ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, timestamp)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -586,21 +589,24 @@ private ListOffsetsResponse listOffset(long timestamp, TopicPartition tp) throws /// Add test for FetchRequest private void checkFetchResponse(List expectedPartitions, - FetchResponse fetchResponse, + FetchResponse fetchResponse, int maxPartitionBytes, int maxResponseBytes, int numMessagesPerPartition) { - assertEquals(expectedPartitions.size(), fetchResponse.responseData().size()); - expectedPartitions.forEach(tp -> assertTrue(fetchResponse.responseData().get(tp) != null)); + assertEquals(expectedPartitions.size(), fetchResponse + .data().responses().stream().mapToInt(r->r.partitions().size())); + expectedPartitions.forEach(tp + -> assertTrue(getPartitionDataFromFetchResponse(fetchResponse, tp) != null)); final AtomicBoolean emptyResponseSeen = new AtomicBoolean(false); AtomicInteger responseSize = new AtomicInteger(0); AtomicInteger responseBufferSize = new AtomicInteger(0); expectedPartitions.forEach(tp -> { - FetchResponse.PartitionData partitionData = fetchResponse.responseData().get(tp); - assertEquals(Errors.NONE, partitionData.error()); + FetchResponseData.PartitionData partitionData = getPartitionDataFromFetchResponse(fetchResponse, tp); + + assertEquals(Errors.NONE.code(), partitionData.errorCode()); assertTrue(partitionData.highWatermark() > 0); MemoryRecords records = (MemoryRecords) partitionData.records(); @@ -631,6 +637,18 @@ private void checkFetchResponse(List expectedPartitions, // In Kop implementation, fetch at least 1 item for each topicPartition in the request. 
} + @NotNull + private static FetchResponseData.PartitionData getPartitionDataFromFetchResponse(FetchResponse fetchResponse, + TopicPartition tp) { + FetchResponseData.PartitionData partitionData = fetchResponse + .data() + .responses().stream().filter(t->t.topic().equals(tp.topic())) + .flatMap(r-> r.partitions().stream()) + .filter(p -> p.partitionIndex() == tp.partition()) + .findFirst().orElse(null); + return partitionData; + } + private Map createPartitionMap(int maxPartitionBytes, List topicPartitions, Map offsetMap) { @@ -649,7 +667,8 @@ private KafkaHeaderAndRequest createFetchRequest(int maxResponseBytes, Map offsetMap) { AbstractRequest.Builder builder = FetchRequest.Builder - .forConsumer(Integer.MAX_VALUE, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap)) + .forConsumer(ApiKeys.FETCH.latestVersion(), + Integer.MAX_VALUE, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap)) .setMaxBytes(maxResponseBytes); return buildRequest(builder); @@ -795,8 +814,8 @@ public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { Collections.EMPTY_MAP); CompletableFuture responseFuture1 = new CompletableFuture<>(); kafkaRequestHandler.handleFetchRequest(fetchRequest1, responseFuture1); - FetchResponse fetchResponse1 = - (FetchResponse) responseFuture1.get(); + FetchResponse fetchResponse1 = + (FetchResponse) responseFuture1.get(); checkFetchResponse(shuffledTopicPartitions1, fetchResponse1, maxPartitionBytes, maxResponseBytes, messagesPerPartition); @@ -814,8 +833,8 @@ public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { Collections.EMPTY_MAP); CompletableFuture responseFuture2 = new CompletableFuture<>(); kafkaRequestHandler.handleFetchRequest(fetchRequest2, responseFuture2); - FetchResponse fetchResponse2 = - (FetchResponse) responseFuture2.get(); + FetchResponse fetchResponse2 = + (FetchResponse) responseFuture2.get(); checkFetchResponse(shuffledTopicPartitions2, fetchResponse2, maxPartitionBytes, maxResponseBytes, messagesPerPartition); @@ -836,8 +855,8 @@ public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { offsetMaps); CompletableFuture responseFuture3 = new CompletableFuture<>(); kafkaRequestHandler.handleFetchRequest(fetchRequest3, responseFuture3); - FetchResponse fetchResponse3 = - (FetchResponse) responseFuture3.get(); + FetchResponse fetchResponse3 = + (FetchResponse) responseFuture3.get(); checkFetchResponse(shuffledTopicPartitions3, fetchResponse3, maxPartitionBytes, maxResponseBytes, messagesPerPartition); @@ -895,7 +914,7 @@ public void testGetOffsetsForUnknownTopic() throws Exception { TopicPartition tp = new TopicPartition(topicName, 0); ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, ListOffsetsRequest.LATEST_TIMESTAMP)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -989,11 +1008,17 @@ private void verifySendMessageToPartition(final TopicPartition topicPartition, ApiKeys.PRODUCE.latestVersion(), produceRequestData)); final CompletableFuture future = new CompletableFuture<>(); kafkaRequestHandler.handleProduceRequest(request, future); - final ProduceResponse.PartitionResponse response = - ((ProduceResponse) future.get()).responses().get(topicPartition); + final ProduceResponseData.PartitionProduceResponse response = + ((ProduceResponse) 
future.get()).data().responses() + .stream() + .filter(r->r.name().equals(topicPartition.topic())) + .flatMap(r->r.partitionResponses().stream()) + .filter(p->p.index() == topicPartition.partition()) + .findFirst() + .get(); assertNotNull(response); - assertEquals(response.error, expectedError); - assertEquals(response.baseOffset, expectedOffset); + assertEquals(response.errorCode(), expectedError.code()); + assertEquals(response.baseOffset(), expectedOffset); } private static MemoryRecords newIdempotentRecords( @@ -1142,10 +1167,13 @@ public void testFetchMinBytesSingleConsumer() throws Exception { final int minBytes = 1; @Cleanup - final KafkaHeaderAndRequest request = buildRequest(FetchRequest.Builder.forConsumer(maxWaitMs, minBytes, - Collections.singletonMap(topicPartition, new FetchRequest.PartitionData( + final KafkaHeaderAndRequest request = buildRequest(FetchRequest.Builder + .forConsumer(ApiKeys.FETCH.oldestVersion(), + maxWaitMs, minBytes, + Collections.singletonMap(topicPartition, new FetchRequest.PartitionData(null, 0L, -1L, 1024 * 1024, Optional.empty() )))); + final CompletableFuture future = new CompletableFuture<>(); final long startTime = System.currentTimeMillis(); kafkaRequestHandler.handleFetchRequest(request, future); @@ -1157,11 +1185,10 @@ public void testFetchMinBytesSingleConsumer() throws Exception { AbstractResponse abstractResponse = ((ResponseCallbackWrapper) future.get(maxWaitMs + 1000, TimeUnit.MILLISECONDS)).getResponse(); assertTrue(abstractResponse instanceof FetchResponse); - final FetchResponse response = (FetchResponse) abstractResponse; + final FetchResponse response = (FetchResponse) abstractResponse; assertEquals(response.error(), Errors.NONE); final long endTime = System.currentTimeMillis(); - log.info("Take {} ms to process FETCH request, record count: {}", - endTime - startTime, response.responseData().size()); + log.info("Take {} ms to process FETCH request", endTime - startTime); assertTrue(endTime - startTime <= maxWaitMs); Long waitingFetchesTriggered = kafkaRequestHandler.getRequestStats().getWaitingFetchesTriggered().get(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java index 129a5928d4..71b8a9fb6c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.kop; +import static org.testng.Assert.assertEquals; + import io.netty.buffer.ByteBuf; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -48,7 +50,7 @@ public static List newListOffsetTargetT public static FetchRequest.PartitionData newFetchRequestPartitionData(long fetchOffset, long logStartOffset, int maxBytes) { - return new FetchRequest.PartitionData(fetchOffset, + return new FetchRequest.PartitionData(null, fetchOffset, logStartOffset, maxBytes, Optional.empty() @@ -110,7 +112,12 @@ public static ListOffsetsResponseData.ListOffsetsPartitionResponse getListOffset public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, SocketAddress serviceAddress) { - AbstractRequest request = builder.build(builder.apiKey().latestVersion()); + return buildRequest(builder, serviceAddress, builder.latestAllowedVersion()); + } + public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, + 
SocketAddress serviceAddress, short version) { + AbstractRequest request = builder.build(version); + assertEquals(version, request.version()); RequestHeader mockHeader = new RequestHeader(builder.apiKey(), request.version(), "dummy", 1233); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 6ded463c1c..847cdb7283 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -639,7 +639,7 @@ public void testListOffsetsForNotExistedTopic() throws Exception { final RequestHeader header = new RequestHeader(ApiKeys.LIST_OFFSETS, ApiKeys.LIST_OFFSETS.latestVersion(), "client", 0); final ListOffsetsRequest request = - ListOffsetsRequest.Builder.forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + ListOffsetsRequest.Builder.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils .newListOffsetTargetTimes(topicPartition, ListOffsetsRequest.EARLIEST_TIMESTAMP)) .build(ApiKeys.LIST_OFFSETS.latestVersion()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java index c10026058a..9e4f0ed880 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java @@ -341,9 +341,26 @@ public void testHandleProduceRequest() throws ExecutionException, InterruptedExc final ProduceResponse response = (ProduceResponse) responseFuture.get(); //Topic: "topic2" authorize success. Error is not TOPIC_AUTHORIZATION_FAILED - assertEquals(response.responses().get(topicPartition2).error, Errors.NOT_LEADER_OR_FOLLOWER); + assertEquals(response + .data() + .responses() + .stream() + .filter(t -> t.name().equals(topicPartition2.topic())) + .flatMap(t->t.partitionResponses().stream()) + .filter(p -> p.index() == topicPartition2.partition()) + .findFirst() + .get().errorCode(), Errors.NOT_LEADER_OR_FOLLOWER.code()); + //Topic: `TOPIC` authorize failed. 
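Pinning the request version in the test helper keeps assertions stable when the client's latest supported version moves. A small sketch of the pattern behind the overloaded buildRequest above (the helper name buildAtVersion is hypothetical):

    import static org.testng.Assert.assertEquals;

    import org.apache.kafka.common.requests.AbstractRequest;

    private static AbstractRequest buildAtVersion(AbstractRequest.Builder<?> builder, short version) {
        // Build the body at the requested wire version and verify the builder honoured it,
        // instead of silently clamping to its latest allowed version.
        AbstractRequest request = builder.build(version);
        assertEquals(request.version(), version);
        return request;
    }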
- assertEquals(response.responses().get(topicPartition1).error, Errors.TOPIC_AUTHORIZATION_FAILED); + assertEquals(response + .data() + .responses() + .stream() + .filter(t -> t.name().equals(topicPartition1.topic())) + .flatMap(t->t.partitionResponses().stream()) + .filter(p -> p.index() == topicPartition1.partition()) + .findFirst() + .get().errorCode(), Errors.TOPIC_AUTHORIZATION_FAILED.code()); } @Test(timeOut = 20000) @@ -384,7 +401,7 @@ public void testHandleListOffsetRequestAuthorizationSuccess() throws Exception { // Test for ListOffset request verify Earliest get earliest ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils .newListOffsetTargetTimes(tp, ListOffsetsRequest.EARLIEST_TIMESTAMP)); @@ -412,7 +429,7 @@ public void testHandleListOffsetRequestAuthorizationFailed() throws Exception { TopicPartition tp = new TopicPartition(topicName, 0); ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils .newListOffsetTargetTimes(tp, ListOffsetsRequest.EARLIEST_TIMESTAMP)); @@ -428,12 +445,19 @@ public void testHandleListOffsetRequestAuthorizationFailed() throws Exception { } - @Test(timeOut = 20000) - public void testHandleOffsetFetchRequestAuthorizationSuccess() + @DataProvider(name = "offsetFetchVersions") + public static Object[][] offsetFetchVersions() { + return new Object[][]{ + { (short) 7 }, + { (short) ApiKeys.OFFSET_FETCH.latestVersion() } }; + } + + @Test(timeOut = 20000, dataProvider = "offsetFetchVersions") + public void testHandleOffsetFetchRequestAuthorizationSuccess(short version) throws PulsarAdminException, ExecutionException, InterruptedException { KafkaRequestHandler spyHandler = spy(handler); String topicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" - + "testHandleOffsetFetchRequestAuthorizationSuccess"; + + "testHandleOffsetFetchRequestAuthorizationSuccess_" + version; String groupId = "DemoKafkaOnPulsarConsumer"; // create partitioned topic. 
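ProduceResponse.responses() keyed by TopicPartition is gone in the 3.x client, so the assertions above walk ProduceResponseData instead. A reusable sketch of that lookup (the helper name is hypothetical); the two assertions above could then read the errorCode() from the returned Optional:

    import java.util.Optional;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.message.ProduceResponseData;
    import org.apache.kafka.common.requests.ProduceResponse;

    private static Optional<ProduceResponseData.PartitionProduceResponse> findPartitionProduceResponse(
            ProduceResponse response, TopicPartition tp) {
        // Topic responses are keyed by name() and partition responses by index().
        return response.data().responses().stream()
                .filter(topicResponse -> topicResponse.name().equals(tp.topic()))
                .flatMap(topicResponse -> topicResponse.partitionResponses().stream())
                .filter(partitionResponse -> partitionResponse.index() == tp.partition())
                .findFirst();
    }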
@@ -450,7 +474,8 @@ public void testHandleOffsetFetchRequestAuthorizationSuccess() new OffsetFetchRequest.Builder(groupId, false, Collections.singletonList(tp), false); - KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); + KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder, version); + assertEquals(version, request.getRequest().version()); CompletableFuture responseFuture = new CompletableFuture<>(); spyHandler.handleOffsetFetchRequest(request, responseFuture); @@ -459,18 +484,35 @@ public void testHandleOffsetFetchRequestAuthorizationSuccess() assertTrue(response instanceof OffsetFetchResponse); OffsetFetchResponse offsetFetchResponse = (OffsetFetchResponse) response; - assertEquals(offsetFetchResponse.responseData().size(), 1); - assertEquals(offsetFetchResponse.error(), Errors.NONE); - offsetFetchResponse.responseData() - .forEach((topicPartition, partitionData) -> assertEquals(partitionData.error, Errors.NONE)); + + if (request.getRequest().version() >= 8) { + assertEquals(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .count(), 1); + assertTrue(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .allMatch(d->d.errorCode() == Errors.NONE.code())); + } else { + assertEquals(offsetFetchResponse.data() + .topics() + .stream().flatMap(t->t.partitions().stream()) + .count(), 1); + assertEquals(offsetFetchResponse.error(), Errors.NONE); + offsetFetchResponse.data().topics().stream().flatMap(t->t.partitions().stream()) + .forEach((partitionData) -> assertEquals(partitionData.errorCode(), Errors.NONE.code())); + } } - @Test(timeOut = 20000) - public void testHandleOffsetFetchRequestAuthorizationFailed() + @Test(timeOut = 20000, dataProvider = "offsetFetchVersions") + public void testHandleOffsetFetchRequestAuthorizationFailed(short version) throws PulsarAdminException, ExecutionException, InterruptedException { KafkaRequestHandler spyHandler = spy(handler); String topicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" - + "testHandleOffsetFetchRequestAuthorizationFailed"; + + "testHandleOffsetFetchRequestAuthorizationFailed_" + version; String groupId = "DemoKafkaOnPulsarConsumer"; // create partitioned topic. 
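OFFSET_FETCH responses are nested per group from version 8 onwards, while older versions keep the flat topics() layout, which is why the assertions above branch on the request version. A compact sketch of collecting the per-partition error codes under either layout (offsetFetchResponse and version are assumed to be in scope; the generated data-class names are left implicit):

    import java.util.List;
    import java.util.stream.Collectors;

    // v8+: groups() -> topics() -> partitions(); older versions: topics() -> partitions().
    List<Short> errorCodes = version >= 8
            ? offsetFetchResponse.data().groups().stream()
                    .flatMap(group -> group.topics().stream())
                    .flatMap(topic -> topic.partitions().stream())
                    .map(partition -> partition.errorCode())
                    .collect(Collectors.toList())
            : offsetFetchResponse.data().topics().stream()
                    .flatMap(topic -> topic.partitions().stream())
                    .map(partition -> partition.errorCode())
                    .collect(Collectors.toList());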
@@ -480,7 +522,8 @@ public void testHandleOffsetFetchRequestAuthorizationFailed() OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(groupId, false, Collections.singletonList(tp), false); - KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); + KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder, version); + assertEquals(request.getRequest().version(), version); CompletableFuture responseFuture = new CompletableFuture<>(); spyHandler.handleOffsetFetchRequest(request, responseFuture); @@ -489,10 +532,27 @@ public void testHandleOffsetFetchRequestAuthorizationFailed() assertTrue(response instanceof OffsetFetchResponse); OffsetFetchResponse offsetFetchResponse = (OffsetFetchResponse) response; - assertEquals(offsetFetchResponse.responseData().size(), 1); - assertEquals(offsetFetchResponse.error(), Errors.NONE); - offsetFetchResponse.responseData().forEach((topicPartition, partitionData) -> assertEquals(partitionData.error, - Errors.TOPIC_AUTHORIZATION_FAILED)); + + if (request.getRequest().version() >= 8) { + assertEquals(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .count(), 1); + assertTrue(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .allMatch(d->d.errorCode() == Errors.TOPIC_AUTHORIZATION_FAILED.code())); + } else { + assertTrue(offsetFetchResponse.data() + .topics() + .stream().flatMap(t -> t.partitions().stream()) + .allMatch(d->d.errorCode() == Errors.TOPIC_AUTHORIZATION_FAILED.code())); + + assertEquals(offsetFetchResponse.error(), Errors.NONE); + } + } @Test(timeOut = 20000) @@ -586,7 +646,7 @@ public void testHandleTxnOffsetCommitAuthorizationFailed() throws ExecutionExcep offsetData.put(topicPartition, KafkaCommonTestUtils.newTxnOffsetCommitRequestCommittedOffset(1L, "")); TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder( - "1", group, 1, (short) 1, offsetData, false); + "1", group, 1L, (short) 1, offsetData); KafkaCommandDecoder.KafkaHeaderAndRequest headerAndRequest = buildRequest(builder); // Handle request @@ -618,7 +678,7 @@ public void testHandleTxnOffsetCommitPartAuthorizationFailed() throws ExecutionE TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder( - "1", group, 1, (short) 1, offsetData, false); + "1", group, 1L, (short) 1, offsetData); KafkaCommandDecoder.KafkaHeaderAndRequest headerAndRequest = buildRequest(builder); // Topic: `test1` authorize success. 
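The TxnOffsetCommitRequest.Builder constructor used in these tests now takes the producer id as a long and drops the trailing flag. A hedged sketch of building such a request in a test (group and id values are placeholders, and the map's value type is assumed to be TxnOffsetCommitRequest.CommittedOffset as returned by the test helper):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.TxnOffsetCommitRequest;

    Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsetData = new HashMap<>();
    offsetData.put(new TopicPartition("test1", 0),
            KafkaCommonTestUtils.newTxnOffsetCommitRequestCommittedOffset(1L, ""));

    // transactionalId, groupId, producerId (now a long), producerEpoch, offsets
    TxnOffsetCommitRequest.Builder builder =
            new TxnOffsetCommitRequest.Builder("1", "test-group", 1L, (short) 1, offsetData);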
@@ -815,6 +875,10 @@ private KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.B return KafkaCommonTestUtils.buildRequest(builder, serviceAddress); } + private KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, short version) { + return KafkaCommonTestUtils.buildRequest(builder, serviceAddress, version); + } + private void handleGroupImmigration() { GroupCoordinator groupCoordinator = handler.getGroupCoordinator(); for (int i = 0; i < conf.getOffsetsTopicNumPartitions(); i++) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index 5df75b9bb2..fe939f3689 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -192,7 +192,8 @@ protected void resetConfig() { // kafka related settings. kafkaConfig.setOffsetsTopicNumPartitions(1); - kafkaConfig.setKafkaTransactionCoordinatorEnabled(false); + // kafka 3.1.x clients init the producerId by default, so we need to enable it. + kafkaConfig.setKafkaTransactionCoordinatorEnabled(true); kafkaConfig.setKafkaTxnLogTopicNumPartitions(1); kafkaConfig.setKafkaListeners( diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/admin/KafkaAdminTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/admin/KafkaAdminTest.java index 897af7490c..ca36a20ab5 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/admin/KafkaAdminTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/admin/KafkaAdminTest.java @@ -76,6 +76,7 @@ import org.apache.pulsar.common.util.Murmur3_32Hash; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.Ignore; import org.testng.annotations.Test; @@ -552,6 +553,8 @@ public void testDeleteConsumerGroupOffsets() throws Exception { admin.topics().deletePartitionedTopic(topic, true); } + @Ignore("\"org.apache.kafka.common.errors.UnsupportedVersionException: " + + "The version of API is not supported.\" in testAlterClientQuotas") @Test(timeOut = 30000) public void testAlterClientQuotas() throws ExecutionException, InterruptedException { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java index 8feaeb5d52..cfb3ee5ce2 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java @@ -107,6 +107,8 @@ private static TransactionMetadata transactionMetadata(String transactionalId, @BeforeClass @Override protected void setup() throws Exception { + // we need to disable the kafka transaction coordinator to avoid the conflict + this.conf.setKafkaTransactionCoordinatorEnabled(false); this.conf.setKafkaTxnLogTopicNumPartitions(numPartitions); internalSetup(); MetadataUtils.createTxnMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java 
b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index a8e8559c35..ea6ef0e23b 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -63,8 +63,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -770,12 +770,12 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except partitionLog.awaitInitialisation().get(); assertEquals(0, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); - List abortedIndexList = + List abortedIndexList = partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE); abortedIndexList.forEach(tx -> { log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); producer.beginTransaction(); String lastMessage = "msg1b"; @@ -810,7 +810,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except abortedIndexList.forEach(tx -> { log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); assertEquals(1, abortedIndexList.size()); waitForTransactionsToBeInStableState(transactionalId); @@ -848,7 +848,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except }); assertEquals(1, abortedIndexList.size()); - assertEquals(0, abortedIndexList.get(0).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); producer.beginTransaction(); producer.send(new ProducerRecord<>(topicName, 0, "msg4")).get(); // OFFSET 8 @@ -875,8 +875,8 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); - assertEquals(11, abortedIndexList.get(1).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); + assertEquals(11, abortedIndexList.get(1).firstOffset()); assertEquals(2, abortedIndexList.size()); producer.beginTransaction(); @@ -900,8 +900,8 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); - assertEquals(11, abortedIndexList.get(1).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); + assertEquals(11, abortedIndexList.get(1).firstOffset()); assertEquals(2, abortedIndexList.size()); @@ -916,7 +916,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except log.info("TX {}", tx); }); assertEquals(1, abortedIndexList.size()); - assertEquals(11, abortedIndexList.get(0).firstOffset); + assertEquals(11, abortedIndexList.get(0).firstOffset()); // use a new consumer group, it will read from the beginning of the topic assertEquals( @@ -1428,12 +1428,12 @@ public void testAbortedTxEventuallyPurged() throws Exception { .getPartitionLog(topicPartition, 
namespacePrefix); partitionLog.awaitInitialisation().get(); - List abortedIndexList = + List abortedIndexList = partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE); assertEquals(2, abortedIndexList.size()); assertEquals(2, abortedIndexList.size()); - assertEquals(0, abortedIndexList.get(0).firstOffset); - assertEquals(3, abortedIndexList.get(1).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); + assertEquals(3, abortedIndexList.get(1).firstOffset()); takeSnapshot(topicName); @@ -1458,8 +1458,8 @@ public void testAbortedTxEventuallyPurged() throws Exception { abortedIndexList = partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE); assertEquals(2, abortedIndexList.size()); - assertEquals(0, abortedIndexList.get(0).firstOffset); - assertEquals(3, abortedIndexList.get(1).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); + assertEquals(3, abortedIndexList.get(1).firstOffset()); // force reading the minimum valid offset // the timer is not started by the PH because @@ -1478,7 +1478,7 @@ public void testAbortedTxEventuallyPurged() throws Exception { assertEquals(1, abortedIndexList.size()); // the second TX cannot be purged because the lastOffset is 5, that is the boundary of the // trimmed portion of the topic - assertEquals(3, abortedIndexList.get(0).firstOffset); + assertEquals(3, abortedIndexList.get(0).firstOffset()); producer1.close(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/e2e/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/e2e/DistributedClusterTest.java index c4acdece51..efdbc0965b 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/e2e/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/e2e/DistributedClusterTest.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -86,6 +87,7 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kConfig.setOffsetsTopicNumPartitions(offsetsTopicNumPartitions); kConfig.setAdvertisedAddress("localhost"); + kConfig.setKafkaTransactionCoordinatorEnabled(true); kConfig.setClusterName(configClusterName); kConfig.setManagedLedgerCacheSizeMB(8); kConfig.setActiveConsumerFailoverDelayTimeMillis(0); @@ -392,7 +394,7 @@ public void testMutiBrokerAndCoordinator() throws Exception { @Test(timeOut = 30000) public void testMultiBrokerUnloadReload() throws Exception { int partitionNumber = 10; - String kafkaTopicName = "kopMultiBrokerUnloadReload" + partitionNumber; + String kafkaTopicName = "kopMultiBrokerUnloadReload-" + partitionNumber + "-" + UUID.randomUUID(); String pulsarTopicName = "persistent://public/default/" + kafkaTopicName; String kopNamespace = "public/default"; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/metrics/MetricsProviderTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/metrics/MetricsProviderTest.java index 17109602ae..e76f85591c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/metrics/MetricsProviderTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/metrics/MetricsProviderTest.java @@ -127,7 +127,7 @@ public void testMetricsProvider() throws Exception { } Assert.assertEquals(getApiKeysSet(), new TreeSet<>( - 
Arrays.asList(ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE))); + Arrays.asList(ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE, ApiKeys.INIT_PRODUCER_ID))); // 2. consume messages with Kafka consumer @Cleanup @@ -154,14 +154,14 @@ public void testMetricsProvider() throws Exception { Assert.assertEquals(getApiKeysSet(), new TreeSet<>(Arrays.asList( ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE, ApiKeys.FIND_COORDINATOR, ApiKeys.LIST_OFFSETS, - ApiKeys.OFFSET_FETCH, ApiKeys.FETCH + ApiKeys.OFFSET_FETCH, ApiKeys.FETCH, ApiKeys.INIT_PRODUCER_ID ))); // commit offsets kConsumer.getConsumer().commitSync(Duration.ofSeconds(5)); Assert.assertEquals(getApiKeysSet(), new TreeSet<>(Arrays.asList( ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE, ApiKeys.FIND_COORDINATOR, ApiKeys.LIST_OFFSETS, - ApiKeys.OFFSET_FETCH, ApiKeys.FETCH, ApiKeys.OFFSET_COMMIT + ApiKeys.OFFSET_FETCH, ApiKeys.FETCH, ApiKeys.OFFSET_COMMIT, ApiKeys.INIT_PRODUCER_ID ))); try { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java index af5869529f..e8ac8ee348 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; @@ -109,7 +110,8 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { produceGlobalTableValues(); final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> "J".equals(replicatedStore.get(5L)), 30000, "waiting for data in replicated store"); @@ -143,7 +145,9 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { produceGlobalTableValues(); final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters + .fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> "J".equals(replicatedStore.get(5L)), 30000, "waiting for data in replicated store"); @@ -173,13 +177,15 @@ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception { Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started ReadOnlyKeyValueStore store = - kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); assertEquals(store.approximateNumEntries(), 4L); kafkaStreams.close(); startStreams(); Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started - store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + store = kafkaStreams.store( + StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); assertEquals(store.approximateNumEntries(), 4L); } diff --git 
a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java index b156fb7106..1794bb2676 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java @@ -48,9 +48,11 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; @@ -58,7 +60,6 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; -import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; @@ -115,7 +116,7 @@ protected void extraSetup() throws Exception { groupedStream = stream .groupBy( mapper, - Serialized.with(Serdes.String(), Serdes.String())); + Grouped.with(Serdes.String(), Serdes.String())); reducer = (value1, value2) -> value1 + ":" + value2; initializer = () -> 0; @@ -174,7 +175,7 @@ public void shouldReduceWindowed() throws Exception { final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); groupedStream - .windowedBy(TimeWindows.of(500L)) + .windowedBy(TimeWindows.of(Duration.ofMillis(500L))) .reduce(reducer) .toStream() .to(outputTopic, Produced.with(windowedSerde, Serdes.String())); @@ -279,7 +280,7 @@ public void shouldAggregateWindowed() throws Exception { produceMessages(secondTimestamp); final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); - groupedStream.windowedBy(TimeWindows.of(500L)) + groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(500L))) .aggregate( initializer, aggregator, @@ -415,8 +416,8 @@ public void shouldGroupByKey() throws Exception { produceMessages(timestamp); produceMessages(timestamp); - stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String())) - .windowedBy(TimeWindows.of(500L)) + stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String())) + .windowedBy(TimeWindows.of(Duration.ofMillis(500L))) .count() .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); @@ -509,8 +510,9 @@ public void shouldCountSessionWindows() throws Exception { final CountDownLatch latch = new CountDownLatch(11); builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis)) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(sessionGap), + Duration.ofMillis(maintainMillis))) .count() .toStream() .transform(() -> new Transformer, Long, KeyValue>() { @@ -608,8 +610,9 @@ 
public void shouldReduceSessionWindows() throws Exception { final CountDownLatch latch = new CountDownLatch(11); final String userSessionsStore = "UserSessionsStore"; builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis)) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(sessionGap), + Duration.ofMillis(maintainMillis))) .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore)) .toStream() .foreach((key, value) -> { @@ -620,7 +623,8 @@ public void shouldReduceSessionWindows() throws Exception { startStreams(); latch.await(30, TimeUnit.SECONDS); final ReadOnlySessionStore sessionStore = - kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(userSessionsStore, QueryableStoreTypes.sessionStore())); // verify correct data received assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start")); @@ -690,6 +694,9 @@ private List> receiveMessages(final Deserializer keyDes consumerProperties.setProperty(StreamsConfig.WINDOW_SIZE_MS_CONFIG, Long.MAX_VALUE + ""); if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) { + consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, + Serdes.serdeFrom(innerClass).getClass().getName()); + consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName()); } @@ -740,6 +747,8 @@ private String readWindowedKeyedMessagesViaConsoleConsumer(final Deserial final Map configs = new HashMap<>(); Serde serde = Serdes.serdeFrom(innerClass); configs.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, serde.getClass().getName()); + configs.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, serde.getClass().getName()); + serde.close(); // https://issues.apache.org/jira/browse/KAFKA-10366 configs.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, Long.toString(Long.MAX_VALUE)); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java index b27b455ee8..73197e75f1 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Consumed; @@ -115,14 +116,16 @@ public void shouldRestoreInMemoryKTableOnRestart() throws Exception { startStreams(); Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started final ReadOnlyKeyValueStore store = - kafkaStreams.store(this.store, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(this.store, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> store.approximateNumEntries() == 4L, 30000L, "waiting for values"); kafkaStreams.close(); 
startStreams(); Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started final ReadOnlyKeyValueStore recoveredStore = - kafkaStreams.store(this.store, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(this.store, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> { try { return recoveredStore.approximateNumEntries() == 4L; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java index 88a0ad7db7..41c676eb28 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java @@ -15,8 +15,8 @@ import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; import io.streamnative.pulsar.handlers.kop.utils.timer.MockTime; +import java.time.Duration; import java.util.Properties; -import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.NonNull; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -83,7 +83,7 @@ protected void setupTestCase() throws Exception { @AfterMethod protected void cleanupTestCase() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(3, TimeUnit.SECONDS); + kafkaStreams.close(Duration.ofSeconds(3)); TestUtils.purgeLocalStreamsState(streamsConfiguration); } }
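Several of the streams tests above move from the removed KafkaStreams.store(name, type) overload to StoreQueryParameters, and from close(long, TimeUnit) to close(Duration). A minimal sketch of both patterns (store name, key and value types are placeholders):

    import java.time.Duration;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    static long countStoreEntries(KafkaStreams streams, String storeName) {
        // Store lookups now wrap the store name and type in StoreQueryParameters.
        ReadOnlyKeyValueStore<Long, String> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        storeName, QueryableStoreTypes.<Long, String>keyValueStore()));
        return store.approximateNumEntries();
    }

    static void shutdown(KafkaStreams streams) {
        // The (long, TimeUnit) overload of close() is gone; pass a Duration instead.
        streams.close(Duration.ofSeconds(3));
    }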