fix: Improve slow consume records operation (#1212)
- Skip consumer assignments for topic partitions with no data
- Skip consumer assignments that would seek beyond the end of the assigned
  topic partition
- Limit each call to poll the consumer to 100ms
- Make the total time spent polling per endpoint request configurable,
  default 5s (see the config sketch below)
- Fix NPE in the multi-format deserializer

Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar authored Nov 15, 2024
1 parent 97c1d87 commit 11fe54e
Showing 2 changed files with 73 additions and 22 deletions.
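
The new total poll window is wired through MicroProfile Config (the @ConfigProperty injection in the first file below), so any standard config source can override it. A minimal sketch, assuming the usual application.properties mechanism; the property name and the PT5S default come straight from the diff, while PT10S is only an illustrative override:

    # application.properties
    # Total time budget for a single consume-records request (ISO-8601 duration)
    console.topics.records.poll-timeout=PT10S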
RecordService.java
@@ -42,6 +42,7 @@
 import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.eclipse.microprofile.context.ThreadContext;
 import org.jboss.logging.Logger;

@@ -60,6 +61,10 @@ public class RecordService {
     @Inject
     Logger logger;
 
+    @Inject
+    @ConfigProperty(name = "console.topics.records.poll-timeout", defaultValue = "PT5S")
+    Duration pollTimeout;
+
     @Inject
     KafkaContext kafkaContext;

@@ -85,22 +90,40 @@ public CompletionStage<List<KafkaRecord>> consumeRecords(String topicId,
         List<TopicPartition> assignments = partitions.stream()
                 .filter(p -> partition == null || partition.equals(p.partition()))
                 .map(p -> new TopicPartition(p.topic(), p.partition()))
-                .toList();
+                .collect(Collectors.toCollection(ArrayList::new));
 
         if (assignments.isEmpty()) {
             return Collections.emptyList();
         }
 
-        consumer.assign(assignments);
         var endOffsets = consumer.endOffsets(assignments);
+        // End offset of zero means the partition has not been written to - don't bother reading them
+        assignments.removeIf(assignment -> endOffsets.get(assignment) == 0);
+
+        if (assignments.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        consumer.assign(assignments);
 
         if (timestamp != null) {
             seekToTimestamp(consumer, assignments, timestamp);
         } else {
             seekToOffset(consumer, assignments, endOffsets, offset, limit);
         }
 
-        Iterable<ConsumerRecords<RecordData, RecordData>> poll = () -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit);
+        if (assignments.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        /*
+         * Re-assign, seek operations may have removed assignments for requests beyond
+         * the end of the partition.
+         */
+        consumer.assign(assignments);
+
+        Iterable<ConsumerRecords<RecordData, RecordData>> poll =
+                () -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit, Instant.now().plus(pollTimeout));
         var limitSet = new SizeLimitedSortedSet<ConsumerRecord<RecordData, RecordData>>(buildComparator(timestamp, offset), limit);
 
         return StreamSupport.stream(poll.spliterator(), false)
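
A note on the .toList() change at the top of this hunk: Stream.toList() returns an unmodifiable list, so the new assignments.removeIf(...) call and the later in-place pruning during seeks would fail on it; collecting into an ArrayList keeps the list mutable. A minimal standalone sketch of the difference, using made-up values rather than real topic partitions:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    class MutableListSketch {
        public static void main(String[] args) {
            // Stream.toList() yields an unmodifiable list
            List<Integer> frozen = Stream.of(0, 1, 2).toList();
            // frozen.removeIf(n -> n == 0); // throws UnsupportedOperationException

            // Collectors.toCollection(ArrayList::new) yields a mutable list, as the diff now uses
            List<Integer> mutable = Stream.of(0, 1, 2)
                    .collect(Collectors.toCollection(ArrayList::new));
            mutable.removeIf(n -> n == 0); // fine
            System.out.println(mutable);   // prints [1, 2]
        }
    }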
@@ -256,42 +279,45 @@ void seekToTimestamp(Consumer<RecordData, RecordData> consumer, List<TopicPartit
                 consumer.seek(p, tsOffset.offset());
             } else {
                 /*
-                 * No offset for the time-stamp (future date?), seek to
-                 * end and return nothing for this partition.
+                 * No offset for the time-stamp (future date?), remove the assignment
+                 * and return nothing for this partition.
                  */
                 if (logger.isDebugEnabled()) {
-                    logger.debugf("No offset found for search timestamp %d, seeking to end of topic %s/partition %d",
+                    logger.debugf("No offset found for search timestamp %d, removing assignment for topic %s/partition %d",
                             (Object) tsMillis, p.topic(), p.partition());
                 }
-                consumer.seekToEnd(List.of(p));
+
+                assignments.remove(p);
             }
         });
     }
 
     void seekToOffset(Consumer<RecordData, RecordData> consumer, List<TopicPartition> assignments, Map<TopicPartition, Long> endOffsets, Long offset, int limit) {
         var beginningOffsets = consumer.beginningOffsets(assignments);
+        Iterator<TopicPartition> cursor = assignments.iterator();
 
-        assignments.forEach(p -> {
+        while (cursor.hasNext()) {
+            TopicPartition p = cursor.next();
             long partitionBegin = beginningOffsets.get(p);
             long partitionEnd = endOffsets.get(p);
             long seekTarget;
 
             if (offset == null) {
                 // Fetch the latest records, no earlier than the beginning of the partition
                 seekTarget = Math.max(partitionBegin, partitionEnd - limit);
-            } else if (offset <= partitionEnd) {
+                consumer.seek(p, seekTarget);
+            } else if (offset < partitionEnd) {
                 // Seek to the requested offset, no earlier than the beginning of the partition
                 seekTarget = Math.max(partitionBegin, offset);
+                consumer.seek(p, seekTarget);
             } else {
                 /*
                  * Requested offset is beyond the end of the partition,
-                 * seek to end and return nothing for this partition.
+                 * remove the assignment and return nothing for this partition.
                  */
-                seekTarget = partitionEnd;
+                cursor.remove();
             }
-
-            consumer.seek(p, seekTarget);
-        });
+        }
     }
 
     Comparator<ConsumerRecord<RecordData, RecordData>> buildComparator(Instant timestamp, Long offset) {
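
The explicit Iterator introduced in seekToOffset is what makes the in-loop cursor.remove() legal: structurally modifying an ArrayList while forEach iterates it fails fast with ConcurrentModificationException. (seekToTimestamp can keep a plain assignments.remove(p) because its lambda apparently iterates the offsets-for-times result, not the assignments list itself.) A minimal sketch of the pattern, using strings in place of TopicPartition:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    class CursorRemoveSketch {
        public static void main(String[] args) {
            List<String> assignments = new ArrayList<>(List.of("t-0", "t-1", "t-2"));

            // assignments.forEach(p -> assignments.remove(p));
            // -> ConcurrentModificationException: structural change during forEach

            // Iterator.remove() is the supported way to drop the current element
            Iterator<String> cursor = assignments.iterator();
            while (cursor.hasNext()) {
                if (cursor.next().endsWith("-1")) {
                    cursor.remove(); // safely drops "t-1" mid-iteration
                }
            }
            System.out.println(assignments); // prints [t-0, t-2]
        }
    }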
@@ -415,20 +441,22 @@ static InvalidPartitionsException invalidPartition(String topicId, int partition
 
     static class ConsumerRecordsIterator<K, V> implements Iterator<ConsumerRecords<K, V>> {
         private static final Logger LOGGER = Logger.getLogger(ConsumerRecordsIterator.class);
+        private static final Duration MAX_POLL_TIME = Duration.ofMillis(100);
 
-        private Instant timeout = Instant.now().plusSeconds(2);
+        private final Instant timeout;
         private int recordsConsumed = 0;
         private Map<TopicPartition, Integer> partitionConsumed = new HashMap<>();
         private final Consumer<K, V> consumer;
         private final Set<TopicPartition> assignments;
         private final Map<TopicPartition, Long> endOffsets;
         private final int limit;
 
-        public ConsumerRecordsIterator(Consumer<K, V> consumer, Map<TopicPartition, Long> endOffsets, int limit) {
+        public ConsumerRecordsIterator(Consumer<K, V> consumer, Map<TopicPartition, Long> endOffsets, int limit, Instant timeout) {
            this.consumer = consumer;
            this.assignments = new HashSet<>(consumer.assignment());
            this.endOffsets = endOffsets;
            this.limit = limit;
+           this.timeout = timeout;
         }
 
        @Override
@@ -448,20 +476,30 @@ public ConsumerRecords<K, V> next() {
                 throw new NoSuchElementException();
             }
 
-            var pollTimeout = Duration.between(Instant.now(), timeout);
-            var records = consumer.poll(pollTimeout.isNegative() ? Duration.ZERO : pollTimeout);
+            ConsumerRecords<K, V> records = ConsumerRecords.empty();
+
+            while (records.isEmpty() && Instant.now().isBefore(timeout)) {
+                records = poll();
+            }
 
             int pollSize = 0;
 
             for (var partition : records.partitions()) {
                 var partitionRecords = records.records(partition);
                 int consumed = partitionRecords.size();
                 pollSize += consumed;
                 int total = partitionConsumed.compute(partition, (k, v) -> requireNonNullElse(v, 0) + consumed);
-                long maxOffset = partitionRecords.stream().mapToLong(ConsumerRecord::offset).max().orElse(-1);
 
-                if (total >= limit || maxOffset >= endOffsets.get(partition)) {
-                    // Consumed `limit` records for this partition or reached the end of the partition
+                if (total >= limit) {
+                    // Consumed `limit` records for this partition
                     assignments.remove(partition);
+                } else if (consumed > 0) {
+                    long maxOffset = partitionRecords.stream().mapToLong(ConsumerRecord::offset).max().getAsLong() + 1;
+
+                    if (maxOffset >= endOffsets.get(partition)) {
+                        // Reached the end of the partition
+                        assignments.remove(partition);
+                    }
                 }
             }

@@ -479,5 +517,18 @@ public ConsumerRecords<K, V> next() {
 
             return records;
         }
+
+        ConsumerRecords<K, V> poll() {
+            var timeRemaining = Duration.between(Instant.now(), timeout);
+            Duration pollTimeout;
+
+            if (timeRemaining.isNegative()) {
+                pollTimeout = Duration.ZERO;
+            } else {
+                pollTimeout = MAX_POLL_TIME.compareTo(timeRemaining) < 0 ? MAX_POLL_TIME : timeRemaining;
+            }
+
+            return consumer.poll(pollTimeout);
+        }
     }
 }
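
Taken together, next() and poll() implement a time-budgeted polling loop: each consumer.poll(...) blocks for at most the smaller of MAX_POLL_TIME and the time left until the deadline, so the iterator re-checks its deadline at least every 100ms instead of sitting in one long poll. A compact sketch of the same timeout arithmetic in isolation (class and method names are illustrative, not from the codebase):

    import java.time.Duration;
    import java.time.Instant;

    class BoundedPollSketch {
        static final Duration MAX_POLL_TIME = Duration.ofMillis(100);

        // Per-attempt timeout: never negative, never longer than MAX_POLL_TIME
        static Duration nextPollTimeout(Instant deadline) {
            Duration remaining = Duration.between(Instant.now(), deadline);
            if (remaining.isNegative()) {
                return Duration.ZERO;
            }
            return MAX_POLL_TIME.compareTo(remaining) < 0 ? MAX_POLL_TIME : remaining;
        }

        public static void main(String[] args) {
            Instant deadline = Instant.now().plus(Duration.ofSeconds(5)); // mirrors the PT5S default
            System.out.println("first poll capped at " + nextPollTimeout(deadline)); // ~PT0.1S
        }
    }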
Multi-format deserializer
@@ -266,7 +266,7 @@ private RecordData readData(Headers headers, byte[] data, ArtifactReference arti
     }
 
     private SchemaLookupResult<Object> resolve(ArtifactReference artifactReference) {
-        if (!artifactReference.hasValue()) {
+        if (artifactReference == null || !artifactReference.hasValue()) {
             return NO_SCHEMA_ID;
         }
 
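
This null guard is the "Fix NPE in multi-format de-serializer" item from the commit message: when a record carries no schema artifact reference, resolve(...) can evidently be handed null, and the old !artifactReference.hasValue() dereference threw a NullPointerException; such records now fall back to NO_SCHEMA_ID and are handled as schemaless.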
