fix: make record operations blocking to run on worker threads (#1221)
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar authored Nov 21, 2024
1 parent 3bb396d commit 85cd4ee
Showing 2 changed files with 100 additions and 121 deletions.
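
Both endpoints previously returned CompletionStage&lt;Response&gt; and composed the result asynchronously; after this change they return Response and call RecordService synchronously. The sketch below is a hypothetical endpoint (not from this repository) illustrating the dispatch convention the commit title relies on, assuming Quarkus RESTEasy Reactive defaults: a reactive return type keeps the method on the IO (event-loop) thread, while a plain return type gives it a blocking signature and it is dispatched to a worker thread, where synchronous Kafka calls are safe.

// Hypothetical resource (not part of this PR), assuming Quarkus RESTEasy Reactive defaults.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

@Path("/dispatch-demo")
public class DispatchDemoResource {

    @GET
    @Path("/io-thread")
    public CompletionStage<Response> reactiveSignature() {
        // Reactive return type: the method itself runs on the IO thread, so any
        // blocking work has to be shifted elsewhere (the old code used thenApplyAsync
        // with a context executor for exactly that reason).
        return CompletableFuture.supplyAsync(() -> Response.ok("async done").build());
    }

    @GET
    @Path("/worker-thread")
    public Response blockingSignature() {
        // Blocking signature: dispatched to a worker thread by default, so a
        // synchronous consumer/producer/admin call is acceptable here.
        return Response.ok("blocking done").build();
    }
}
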
RecordsResource.java
@@ -1,7 +1,6 @@
package com.github.streamshub.console.api;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import jakarta.inject.Inject;
@@ -72,7 +71,7 @@ public class RecordsResource {
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
public CompletionStage<Response> consumeRecords(
public Response consumeRecords(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,
@@ -126,19 +125,18 @@ public CompletionStage<Response> consumeRecords(

requestedFields.accept(fields);
CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store");

return recordService.consumeRecords(
var records = recordService.consumeRecords(
topicId,
params.getPartition(),
params.getOffset(),
params.getTimestamp(),
params.getLimit(),
fields,
params.getMaxValueLength())
.thenApply(KafkaRecord.KafkaRecordDataList::new)
.thenApply(Response::ok)
.thenApply(response -> response.cacheControl(noStore))
.thenApply(Response.ResponseBuilder::build);
params.getMaxValueLength());

return Response.ok(new KafkaRecord.KafkaRecordDataList(records))
.cacheControl(noStore)
.build();
}

@POST
@@ -154,7 +152,7 @@ public CompletionStage<Response> consumeRecords(
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
public CompletionStage<Response> produceRecord(
public Response produceRecord(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,
@@ -169,15 +167,14 @@ public CompletionStage<Response> produceRecord(

final UriBuilder location = uriInfo.getRequestUriBuilder();
requestedFields.accept(KafkaRecord.Fields.ALL);

return recordService.produceRecord(topicId, message.getData())
.thenApply(KafkaRecord.KafkaRecordData::new)
.thenApply(entity -> Response.status(Status.CREATED)
.entity(entity)
.location(location
.queryParam("filter[partition]", entity.getData().partition())
.queryParam("filter[offset]", entity.getData().offset())
.build()))
.thenApply(Response.ResponseBuilder::build);
var entity = new KafkaRecord.KafkaRecordData(recordService.produceRecord(topicId, message.getData()));

return Response.status(Status.CREATED)
.entity(entity)
.location(location
.queryParam("filter[partition]", entity.getData().partition())
.queryParam("filter[offset]", entity.getData().offset())
.build())
.build();
}
}
RecordService.java
@@ -16,9 +16,7 @@
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -77,87 +75,76 @@ public class RecordService {
@Inject
ThreadContext threadContext;

public CompletionStage<List<KafkaRecord>> consumeRecords(String topicId,
public List<KafkaRecord> consumeRecords(String topicId,
Integer partition,
Long offset,
Instant timestamp,
Integer limit,
List<String> include,
Integer maxValueLength) {

return topicNameForId(topicId).thenApplyAsync(topicName -> {
List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
List<TopicPartition> assignments = partitions.stream()
.filter(p -> partition == null || partition.equals(p.partition()))
.map(p -> new TopicPartition(p.topic(), p.partition()))
.collect(Collectors.toCollection(ArrayList::new));
String topicName = topicNameForId(topicId);
List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
List<TopicPartition> assignments = partitions.stream()
.filter(p -> partition == null || partition.equals(p.partition()))
.map(p -> new TopicPartition(p.topic(), p.partition()))
.collect(Collectors.toCollection(ArrayList::new));

if (assignments.isEmpty()) {
return Collections.emptyList();
}
if (assignments.isEmpty()) {
return Collections.emptyList();
}

var endOffsets = consumer.endOffsets(assignments);
// End offset of zero means the partition has not been written to - don't bother reading them
assignments.removeIf(assignment -> endOffsets.get(assignment) == 0);
var endOffsets = consumer.endOffsets(assignments);
// End offset of zero means the partition has not been written to - don't bother reading them
assignments.removeIf(assignment -> endOffsets.get(assignment) == 0);

if (assignments.isEmpty()) {
return Collections.emptyList();
}
if (assignments.isEmpty()) {
return Collections.emptyList();
}

consumer.assign(assignments);
consumer.assign(assignments);

if (timestamp != null) {
seekToTimestamp(consumer, assignments, timestamp);
} else {
seekToOffset(consumer, assignments, endOffsets, offset, limit);
}
if (timestamp != null) {
seekToTimestamp(consumer, assignments, timestamp);
} else {
seekToOffset(consumer, assignments, endOffsets, offset, limit);
}

if (assignments.isEmpty()) {
return Collections.emptyList();
}
if (assignments.isEmpty()) {
return Collections.emptyList();
}

/*
* Re-assign, seek operations may have removed assignments for requests beyond
* the end of the partition.
*/
consumer.assign(assignments);
/*
* Re-assign, seek operations may have removed assignments for requests beyond
* the end of the partition.
*/
consumer.assign(assignments);

Iterable<ConsumerRecords<RecordData, RecordData>> poll =
() -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit, Instant.now().plus(pollTimeout));
var limitSet = new SizeLimitedSortedSet<ConsumerRecord<RecordData, RecordData>>(buildComparator(timestamp, offset), limit);

return StreamSupport.stream(poll.spliterator(), false)
.flatMap(records -> StreamSupport.stream(records.spliterator(), false))
.collect(Collectors.toCollection(() -> limitSet))
.stream()
.map(rec -> getItems(rec, topicId, include, maxValueLength))
.toList();
}, threadContext.currentContextExecutor());
}
Iterable<ConsumerRecords<RecordData, RecordData>> poll =
() -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit, Instant.now().plus(pollTimeout));
var limitSet = new SizeLimitedSortedSet<ConsumerRecord<RecordData, RecordData>>(buildComparator(timestamp, offset), limit);

public CompletionStage<KafkaRecord> produceRecord(String topicId, KafkaRecord input) {
CompletableFuture<KafkaRecord> promise = new CompletableFuture<>();
return StreamSupport.stream(poll.spliterator(), false)
.flatMap(records -> StreamSupport.stream(records.spliterator(), false))
.collect(Collectors.toCollection(() -> limitSet))
.stream()
.map(rec -> getItems(rec, topicId, include, maxValueLength))
                .toList();
    }
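
The poll loop above streams records from every assigned partition into a SizeLimitedSortedSet so that only the limit best-ordered records are retained, however many records the iterator yields. That class is project code that is not part of this diff, so the following is only a minimal sketch of the presumed idea: a TreeSet that evicts its last element once it grows past a fixed bound.

// Minimal sketch of a size-bounded sorted set; the eviction rule here is an
// assumption about what SizeLimitedSortedSet does: keep the comparator's
// "smallest" elements and drop the rest.
import java.util.Comparator;
import java.util.TreeSet;

class BoundedSortedSet<T> extends TreeSet<T> {
    private final int limit;

    BoundedSortedSet(Comparator<? super T> comparator, int limit) {
        super(comparator);
        this.limit = limit;
    }

    @Override
    public boolean add(T element) {
        boolean added = super.add(element);
        if (added && size() > limit) {
            // Evict whatever now sorts last; that may be the element just added.
            added = pollLast() != element;
        }
        return added;
    }
}

Collecting the record stream into such a set, as the new consumeRecords body does via Collectors.toCollection, bounds memory by limit regardless of how long the poll loop runs.
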

topicNameForId(topicId).thenAcceptAsync(topicName -> {
List<PartitionInfo> partitions = producer.partitionsFor(topicName);
Integer partition = input.partition();
public KafkaRecord produceRecord(String topicId, KafkaRecord input) {
String topicName = topicNameForId(topicId);

if (partition != null && partitions.stream().noneMatch(p -> partition.equals(p.partition()))) {
promise.completeExceptionally(invalidPartition(topicId, partition));
} else {
send(topicName, input, producer, promise);
}
List<PartitionInfo> partitions = producer.partitionsFor(topicName);
Integer partition = input.partition();

promise.whenComplete((kafkaRecord, error) -> producer.close());
}, threadContext.currentContextExecutor()).exceptionally(e -> {
promise.completeExceptionally(e);
return null;
});
if (partition != null && partitions.stream().noneMatch(p -> partition.equals(p.partition()))) {
throw invalidPartition(topicId, partition);
}

return promise;
return send(topicName, input, producer);
}

void send(String topicName, KafkaRecord input, Producer<RecordData, RecordData> producer, CompletableFuture<KafkaRecord> promise) {
KafkaRecord send(String topicName, KafkaRecord input, Producer<RecordData, RecordData> producer) {
List<Header> headers = Optional.ofNullable(input.headers())
.orElseGet(Collections::emptyMap)
.entrySet()
@@ -190,49 +177,42 @@ public byte[] value() {
value,
headers);

CompletableFuture.completedFuture(null)
.thenApplyAsync(nothing -> {
try {
return producer.send(request).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CompletionException("Error occurred while sending record to Kafka cluster", e);
} catch (Exception e) {
throw new CompletionException("Error occurred while sending record to Kafka cluster", e);
}
}, threadContext.currentContextExecutor())
.thenApply(meta -> {
KafkaRecord result = new KafkaRecord();
result.partition(meta.partition());
RecordMetadata meta;

if (meta.hasOffset()) {
result.offset(meta.offset());
}
try {
meta = producer.send(request).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CompletionException("Error occurred while sending record to Kafka cluster", e);
} catch (Exception e) {
throw new CompletionException("Error occurred while sending record to Kafka cluster", e);
}

if (meta.hasTimestamp()) {
result.timestamp(Instant.ofEpochMilli(meta.timestamp()));
}
KafkaRecord result = new KafkaRecord();
result.partition(meta.partition());

result.key(key.dataString(null));
result.value(value.dataString(null));
result.headers(Arrays.stream(request.headers().toArray())
.collect(
// Duplicate headers will be overwritten
LinkedHashMap::new,
(map, hdr) -> map.put(hdr.key(), headerValue(hdr, null)),
HashMap::putAll));
result.size(sizeOf(meta, request.headers()));
if (meta.hasOffset()) {
result.offset(meta.offset());
}

schemaRelationship(key).ifPresent(result::keySchema);
schemaRelationship(value).ifPresent(result::valueSchema);
if (meta.hasTimestamp()) {
result.timestamp(Instant.ofEpochMilli(meta.timestamp()));
}

return result;
})
.thenAccept(promise::complete)
.exceptionally(exception -> {
promise.completeExceptionally(exception);
return null;
});
result.key(key.dataString(null));
result.value(value.dataString(null));
result.headers(Arrays.stream(request.headers().toArray())
.collect(
// Duplicate headers will be overwritten
LinkedHashMap::new,
(map, hdr) -> map.put(hdr.key(), headerValue(hdr, null)),
HashMap::putAll));
result.size(sizeOf(meta, request.headers()));

schemaRelationship(key).ifPresent(result::keySchema);
schemaRelationship(value).ifPresent(result::valueSchema);

return result;
}
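
With the CompletableFuture plumbing removed, send() now calls producer.send(request).get() directly, which is acceptable because the whole request already runs on a worker thread. The stand-alone snippet below (assumed broker address, topic name, and serializers, none of which come from this PR) shows the same blocking-produce idiom in isolation: Producer.send() still returns a Future, and get() simply waits for the broker acknowledgement on the current thread.

// Stand-alone illustration of blocking produce with the stock Kafka client;
// broker address, topic, and serializers are assumptions for the example.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class BlockingSendExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("example-topic", "key", "value");
            // Blocks until the record is acknowledged (or the send fails).
            RecordMetadata meta = producer.send(record).get();
            System.out.printf("partition=%d offset=%d%n", meta.partition(), meta.offset());
        }
    }
}
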

void setSchemaMeta(JsonApiRelationship schemaRelationship, RecordData data) {
@@ -249,7 +229,7 @@ Optional<String> schemaMeta(JsonApiRelationship schemaRelationship, String key)
});
}

CompletionStage<String> topicNameForId(String topicId) {
String topicNameForId(String topicId) {
Uuid kafkaTopicId = Uuid.fromString(topicId);

return kafkaContext.admin()
@@ -261,7 +241,9 @@ CompletionStage<String> topicNameForId(String topicId) {
.filter(topic -> kafkaTopicId.equals(topic.topicId()))
.findFirst()
.map(TopicListing::name)
.orElseThrow(() -> noSuchTopic(topicId)));
.orElseThrow(() -> noSuchTopic(topicId)))
.toCompletableFuture()
.join();
}
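
topicNameForId() keeps the Admin client's asynchronous topic listing internally but now finishes with toCompletableFuture().join(), turning the lookup into a plain blocking call. The collapsed middle of the method is not visible in this view, so the sketch below is only a generic reconstruction of that pattern with the stock Kafka Admin API; the bootstrap address and the IllegalArgumentException standing in for noSuchTopic are assumptions.

// Generic async-to-blocking lookup with the Kafka Admin client (assumed
// configuration, not the project's wiring).
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.Uuid;

public class TopicNameLookupExample {
    static String topicNameForId(Admin admin, String topicId) {
        Uuid kafkaTopicId = Uuid.fromString(topicId);
        return admin.listTopics()
                .listings()
                .toCompletionStage()
                .thenApply(listings -> listings.stream()
                        .filter(topic -> kafkaTopicId.equals(topic.topicId()))
                        .findFirst()
                        .map(TopicListing::name)
                        // Placeholder for the project's noSuchTopic(...) error.
                        .orElseThrow(() -> new IllegalArgumentException("No such topic: " + topicId)))
                .toCompletableFuture()
                .join(); // blocks; fine on a worker thread
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            System.out.println(topicNameForId(admin, args[0]));
        }
    }
}
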

void seekToTimestamp(Consumer<RecordData, RecordData> consumer, List<TopicPartition> assignments, Instant timestamp) {