Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Clean up errors sent to the client during Topic Unload (#1282)
Browse files Browse the repository at this point in the history
### Motivation

While investigating on #1281 I have found a couple of enhancements:
- we should add more logging to troubleshoot unknown errors during topic unload
- we can improve some logging
- in case of CursorAlreadyClosedException  we return UNKNOWN_SERVER_ERROR and NOT_LEADER_FOR_PARTITION and this generates a "received an unknown error" log line on the client, that is pretty scary

### Modifications

- improve the toString representation of the callback passed to asyncFindPosition
- return NOT_LEADER_FOR_PARTITION in case of CursorAlreadyClosedException and ManagedLedgerFencedException
- reduce log level of some parts of the code
  • Loading branch information
eolivelli authored May 13, 2022
1 parent b6be650 commit cc49e7c
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,12 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
final ManagedLedger ledger = topic.getManagedLedger();

if (((ManagedLedgerImpl) ledger).getState() == ManagedLedgerImpl.State.Closed) {
log.error("[{}] Async get cursor for offset {} failed, because current managedLedger has been closed",
requestHandler.ctx.channel(), offset);
log.error("[{}] Async get cursor for offset {} for topic {} failed, "
+ "because current managedLedger has been closed",
requestHandler.ctx.channel(), offset, topic.getName());
CompletableFuture<Pair<ManagedCursor, Long>> future = new CompletableFuture<>();
future.completeExceptionally(new Exception("Current managedLedger has been closed."));
future.completeExceptionally(new Exception("Current managedLedger for "
+ topic.getName() + " has been closed."));
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,6 @@ private void handlePartitionData(final TopicPartition topicPartition,
} else {
cursorFuture.whenComplete((cursorLongPair, ex) -> {
if (ex != null) {
log.error("KafkaTopicConsumerManager.asyncGetCursorByOffset({}) failed for topic {}.",
offset, topicPartition, ex.getCause());
registerPrepareMetadataFailedEvent(startPrepareMetadataNanos);
requestHandler.getKafkaTopicManagerSharedState()
.getKafkaTopicConsumerManagerCache().removeAndCloseByTopic(fullTopicName);
Expand All @@ -375,7 +373,16 @@ private void handlePartitionData(final TopicPartition topicPartition,
if (throwable != null) {
tcm.deleteOneCursorAsync(cursorLongPair.getLeft(),
"cursor.readEntry fail. deleteCursor");
addErrorPartitionResponse(topicPartition, Errors.forException(throwable));
if (throwable instanceof ManagedLedgerException.CursorAlreadyClosedException
|| throwable
instanceof ManagedLedgerException.ManagedLedgerFencedException) {
addErrorPartitionResponse(topicPartition,
Errors.NOT_LEADER_FOR_PARTITION);
} else {
log.error("Read entry error on {}", partitionData, throwable);
addErrorPartitionResponse(topicPartition,
Errors.forException(throwable));
}
} else if (entries == null) {
addErrorPartitionResponse(topicPartition,
Errors.forException(new ApiException("Cursor is null")));
Expand Down Expand Up @@ -605,7 +612,7 @@ public void markDeleteComplete(Object ctx) {
// this is OK, since this is kind of cumulative ack, following commit will come.
@Override
public void markDeleteFailed(ManagedLedgerException e, Object ctx) {
log.warn("Mark delete success for position: {} with error:",
log.warn("Mark delete failed for position: {} with error:",
currentPosition, e);
}
}, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public EncodeResult encode(final EncodeRequest encodeRequest) {
sequenceId = Commands.initBatchMessageMetadata(msgMetadata, message.getMessageBuilder());
}
currentBatchSizeBytes += message.getDataBuffer().readableBytes();
if (log.isDebugEnabled()) {
log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ",
if (log.isTraceEnabled()) {
log.trace("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ",
sequenceId, numMessagesInBatch, currentBatchSizeBytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
*/
package io.streamnative.pulsar.handlers.kop.utils;

import com.google.common.base.Predicate;
import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.kop.exceptions.MetadataCorruptedException;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
Expand Down Expand Up @@ -171,7 +173,18 @@ public static long getMockOffset(long ledgerId, long entryId) {
public static CompletableFuture<Position> asyncFindPosition(final ManagedLedger managedLedger,
final long offset,
final boolean skipMessagesWithoutIndex) {
return managedLedger.asyncFindPosition(entry -> {
return managedLedger.asyncFindPosition(new FindEntryByOffset(managedLedger,
offset, skipMessagesWithoutIndex));
}

@AllArgsConstructor
private static class FindEntryByOffset implements Predicate<Entry> {
private final ManagedLedger managedLedger;
private final long offset;
private final boolean skipMessagesWithoutIndex;

@Override
public boolean apply(Entry entry) {
if (entry == null) {
// `entry` should not be null, add the null check here to fix the spotbugs check
return false;
Expand All @@ -191,6 +204,12 @@ public static CompletableFuture<Position> asyncFindPosition(final ManagedLedger
} finally {
entry.release();
}
});
}

@Override
public String toString() {
return "FindEntryByOffset{ " + offset + "}";
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,9 @@ public void testUnloadTopic() throws Exception {
topicConsumerManager.removeCursorFuture(totalMessages - 1).get();
fail("should have failed");
} catch (ExecutionException ex) {
assertTrue(ex.getCause().getMessage().contains("Current managedLedger has been closed."));
log.info("error", ex);
assertTrue(ex.getCause().getMessage().contains("Current managedLedger for "
+ fullTopicName + " has been closed."));
}

}
Expand Down

0 comments on commit cc49e7c

Please sign in to comment.