Skip to content

Commit

Permalink
KAFKA-18769: Improve leadership changes handling in ShareConsumeReque…
Browse files Browse the repository at this point in the history
…stManager. (apache#18851)

Reviewers: Andrew Schofield <[email protected]>
  • Loading branch information
ShivsundarR authored Feb 12, 2025
1 parent b0e5cdf commit 0e40b80
Show file tree
Hide file tree
Showing 2 changed files with 475 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ public PollResult poll(long currentTimeMs) {


// Iterate over the session handlers to see if there are acknowledgements to be sent for partitions
// which are no longer part of the current subscription, or whose records were fetched from a
// previous leader.
// which are no longer part of the current subscription.
// We fail acknowledgements for records fetched from a previous leader.
Cluster cluster = metadata.fetch();
sessionHandlers.forEach((nodeId, sessionHandler) -> {
Node node = cluster.nodeById(nodeId);
Expand All @@ -203,14 +203,20 @@ public PollResult poll(long currentTimeMs) {
Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
if (nodeAcksFromFetchMap != null) {
nodeAcksFromFetchMap.forEach((tip, acks) -> {
metricsManager.recordAcknowledgementSent(acks.size());
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acks);
if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
metricsManager.recordAcknowledgementSent(acks.size());
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acks);

sessionHandler.addPartitionToAcknowledgeOnly(tip, acks);
handlerMap.put(node, sessionHandler);
sessionHandler.addPartitionToAcknowledgeOnly(tip, acks);
handlerMap.put(node, sessionHandler);

topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId);
topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId);
} else {
log.debug("Leader for the partition is down or has changed, failing Acknowledgements for partition {}", tip);
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, acks));
}
});

nodeAcksFromFetchMap.clear();
Expand Down Expand Up @@ -475,11 +481,16 @@ public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitSync(
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip);
if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) {
acknowledgementsMapForNode.put(tip, nodeAcknowledgements.acknowledgements());
if (!isLeaderKnownToHaveChanged(node.id(), tip)) {
acknowledgementsMapForNode.put(tip, nodeAcknowledgements.acknowledgements());

metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size());
log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size());
log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
} else {
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, nodeAcknowledgements.acknowledgements()));
}
}
}

Expand Down Expand Up @@ -523,29 +534,34 @@ public void commitAsync(final Map<TopicIdPartition, NodeAcknowledgements> acknow
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip);
if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) {
Acknowledgements acknowledgements = nodeAcknowledgements.acknowledgements();
acknowledgementsMapForNode.put(tip, acknowledgements);

metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
AcknowledgeRequestState asyncRequestState = acknowledgeRequestStates.get(nodeId).getAsyncRequest();
if (asyncRequestState == null) {
acknowledgeRequestStates.get(nodeId).setAsyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":2",
Long.MAX_VALUE,
retryBackoffMs,
retryBackoffMaxMs,
sessionHandler,
nodeId,
acknowledgementsMapForNode,
resultHandler,
AcknowledgeRequestType.COMMIT_ASYNC
));
} else {
Acknowledgements prevAcks = asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
if (prevAcks != null) {
asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
if (!isLeaderKnownToHaveChanged(node.id(), tip)) {
Acknowledgements acknowledgements = nodeAcknowledgements.acknowledgements();
acknowledgementsMapForNode.put(tip, acknowledgements);

metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
AcknowledgeRequestState asyncRequestState = acknowledgeRequestStates.get(nodeId).getAsyncRequest();
if (asyncRequestState == null) {
acknowledgeRequestStates.get(nodeId).setAsyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":2",
Long.MAX_VALUE,
retryBackoffMs,
retryBackoffMaxMs,
sessionHandler,
nodeId,
acknowledgementsMapForNode,
resultHandler,
AcknowledgeRequestType.COMMIT_ASYNC
));
} else {
Acknowledgements prevAcks = asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
if (prevAcks != null) {
asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
}
}
} else {
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, nodeAcknowledgements.acknowledgements()));
}
}
}
Expand All @@ -572,40 +588,57 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, No
final ResultHandler resultHandler = new ResultHandler(resultCount, Optional.empty());

closing = true;
Map<Integer, Map<TopicIdPartition, Acknowledgements>> acknowledgementsMapAllNodes = new HashMap<>();

acknowledgementsMap.forEach((tip, nodeAcks) -> {
if (!isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
Map<TopicIdPartition, Acknowledgements> acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new HashMap<>());
Acknowledgements prevAcks = acksMap.putIfAbsent(tip, nodeAcks.acknowledgements());
if (prevAcks != null) {
acksMap.get(tip).merge(nodeAcks.acknowledgements());
}
} else {
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, nodeAcks.acknowledgements()));
}
});

sessionHandlers.forEach((nodeId, sessionHandler) -> {
Node node = cluster.nodeById(nodeId);
if (node != null) {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();

acknowledgementsMap.forEach((tip, nodeAcks) -> {
Acknowledgements acknowledgements = Acknowledgements.empty();
Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
if (nodeAcksFromFetchMap != null) {
Acknowledgements acksFromFetchMap = nodeAcksFromFetchMap.remove(tip);
if (acksFromFetchMap != null) {
acknowledgements.merge(acksFromFetchMap);
//Add any waiting piggyback acknowledgements for the node.
Map<TopicIdPartition, Acknowledgements> fetchAcks = fetchAcknowledgementsToSend.remove(nodeId);
if (fetchAcks != null) {
fetchAcks.forEach((tip, acks) -> {
if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
Map<TopicIdPartition, Acknowledgements> acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>());
Acknowledgements prevAcks = acksMap.putIfAbsent(tip, acks);
if (prevAcks != null) {
acksMap.get(tip).merge(acks);
}
} else {
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, acks));
}
}

if (nodeAcks.nodeId() == node.id()) {
acknowledgements.merge(nodeAcks.acknowledgements());
}

if (!acknowledgements.isEmpty()) {
acknowledgementsMapForNode.put(tip, acknowledgements);
});
}

Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = acknowledgementsMapAllNodes.get(nodeId);
if (acknowledgementsMapForNode != null) {
acknowledgementsMapForNode.forEach((tip, acknowledgements) -> {
metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
});
});
} else {
acknowledgementsMapForNode = new HashMap<>();
}

acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

// Ensure there is no close() request already present as they are blocking calls
// and only one request can be active at a time.
if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && !acknowledgeRequestStates.get(nodeId).getCloseRequest().isEmpty()) {
if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest())) {
log.error("Attempt to call close() when there is an existing close request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
closeFuture.completeExceptionally(
new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id()));
Expand All @@ -630,6 +663,28 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, No
return closeFuture;
}

/**
* The method checks whether the leader for a topicIdPartition has changed.
* @param nodeId The previous leader for the partition.
* @param topicIdPartition The TopicIdPartition to check.
* @return Returns true if leader information is available and leader has changed.
* If the leader information is not available or if the leader has not changed, it returns false.
*/
private boolean isLeaderKnownToHaveChanged(int nodeId, TopicIdPartition topicIdPartition) {
Optional<Node> leaderNode = metadata.currentLeader(topicIdPartition.topicPartition()).leader;
if (leaderNode.isPresent()) {
if (leaderNode.get().id() != nodeId) {
log.debug("Node {} is no longer the leader for partition {}, failing acknowledgements", nodeId, topicIdPartition);
return true;
}
} else {
log.debug("No leader found for partition {}", topicIdPartition);
metadata.requestUpdate(false);
return false;
}
return false;
}

private void handleShareFetchSuccess(Node fetchTarget,
@SuppressWarnings("unused") ShareFetchRequestData requestData,
ClientResponse resp) {
Expand Down
Loading

0 comments on commit 0e40b80

Please sign in to comment.