
Commit

Add error metadata for scenarios when key/value cannot be decoded
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Oct 17, 2024
1 parent 5b0a9ce commit 256cc61
Showing 12 changed files with 172 additions and 68 deletions.

@@ -424,8 +424,10 @@ boolean validConfigs(Optional<Kafka> kafkaResource, KafkaClusterConfig clusterCo
     }
 
     if (createContext) {
-        clientsMessage.insert(0, "Some configuration may be missing for connection to cluster %s, connection attempts may fail".formatted(clusterConfig.clusterKey()));
-        log.warn(clientsMessage.toString().trim());
+        if (clientsMessage.length() > 0) {
+            clientsMessage.insert(0, "Some configuration may be missing for connection to cluster %s, connection attempts may fail".formatted(clusterConfig.clusterKey()));
+            log.warn(clientsMessage.toString().trim());
+        }
     } else {
         clientsMessage.insert(0, "Missing configuration detected for connection to cluster %s, no connection will be setup".formatted(clusterConfig.clusterKey()));
         log.error(clientsMessage.toString().trim());

@@ -48,7 +48,7 @@ public class SchemasResource {
 @APIResponse(responseCode = "404", ref = "NotFound")
 @APIResponse(responseCode = "500", ref = "ServerError")
 @APIResponse(responseCode = "504", ref = "ServerTimeout")
-public Response describeConfigs(
+public Response getSchemaContent(
         @Parameter(description = "Schema identifier")
         @PathParam("schemaId")
         String schemaId) {

@@ -15,7 +15,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;

@@ -201,8 +200,9 @@ public byte[] value() {
             (map, hdr) -> map.put(hdr.key(), headerValue(hdr, null)),
             HashMap::putAll));
     result.size(sizeOf(meta, request.headers()));
-    maybeRelate(key).ifPresent(result::keySchema);
-    maybeRelate(value).ifPresent(result::valueSchema);
+
+    schemaRelationship(key).ifPresent(result::keySchema);
+    schemaRelationship(value).ifPresent(result::valueSchema);
 
     return result;
 })

@@ -321,13 +321,13 @@ KafkaRecord getItems(ConsumerRecord<RecordData, RecordData> rec, String topicId,
     setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::headers);
     setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::size);
 
-    maybeRelate(rec.key()).ifPresent(item::keySchema);
-    maybeRelate(rec.value()).ifPresent(item::valueSchema);
+    schemaRelationship(rec.key()).ifPresent(item::keySchema);
+    schemaRelationship(rec.value()).ifPresent(item::valueSchema);
 
     return item;
 }
 
-Optional<JsonApiRelationship> maybeRelate(RecordData data) {
+Optional<JsonApiRelationship> schemaRelationship(RecordData data) {
     return Optional.ofNullable(data)
             .map(d -> d.meta)
             .filter(recordMeta -> recordMeta.containsKey("schema-id"))

@@ -343,9 +343,19 @@ Optional<JsonApiRelationship> maybeRelate(RecordData data) {
         relationship.data(new Identifier("schemas", schemaId));
         relationship.addLink("content", "/api/schemas/" + schemaId);
 
+        schemaError(data).ifPresent(error -> relationship.addMeta("errors", List.of(error)));
+
         return relationship;
     })
-    .filter(Objects::nonNull);
+    .or(() -> schemaError(data).map(error -> {
+        var relationship = new JsonApiRelationship();
+        relationship.addMeta("errors", List.of(error));
+        return relationship;
+    }));
 }
+
+Optional<com.github.streamshub.console.api.model.Error> schemaError(RecordData data) {
+    return Optional.ofNullable(data).map(RecordData::error);
+}
 
 <T> void setProperty(String fieldName, List<String> include, Supplier<T> source, java.util.function.Consumer<T> target) {
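The reworked schemaRelationship above hinges on Optional.or(Supplier), available since Java 9: when the record meta carries a schema-id, the full relationship (identifier plus content link) is built and any error is attached to its meta; when there is no schema-id at all, the supplier falls back to a relationship whose meta carries nothing but the error. A minimal standalone sketch of the pattern, using simplified hypothetical types (Relationship, bySchemaId, errorOnly) rather than the console's JsonApiRelationship:

import java.util.List;
import java.util.Map;
import java.util.Optional;

public class RelationshipFallback {

    record Relationship(Map<String, Object> meta) { }

    // Primary lookup: present only when the record carries a schema-id
    static Optional<Relationship> bySchemaId(Map<String, String> recordMeta) {
        return Optional.ofNullable(recordMeta.get("schema-id"))
                .map(id -> new Relationship(Map.of("schema-id", id)));
    }

    // Fallback: a relationship carrying only the decode error, so the client
    // still learns why no schema is linked
    static Optional<Relationship> errorOnly(String error) {
        return Optional.ofNullable(error)
                .map(e -> new Relationship(Map.of("errors", List.of(e))));
    }

    public static void main(String[] args) {
        // No schema-id, but a decode error: Optional.or() supplies the fallback
        Optional<Relationship> rel = bySchemaId(Map.of())
                .or(() -> errorOnly("Schema resolution error"));
        System.out.println(rel.get().meta()); // {errors=[Schema resolution error]}
    }
}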

@@ -32,6 +32,11 @@ public void write(RecordData data, org.apache.avro.io.Encoder out) throws IOExce
     final Object datum = reader.read(null, jsonDecoder);
     writer.write(datum, out);
 
+    /*
+     * Replace input data with the re-serialized record so the response contains
+     * the data as it was sent to Kafka (but in JSON format). For example,
+     * unknown fields will have been dropped.
+     */
     final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
     final Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer);
     writer.write(datum, jsonEncoder);
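For context, the round trip that the new comment describes can be reproduced standalone (the schema and field names here are illustrative; the console's actual reader/writer setup lives elsewhere in this class). Per the comment above, input fields unknown to the schema are dropped on re-encode:

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class AvroJsonRoundTrip {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse("""
                {"type": "record", "name": "Example",
                 "fields": [{"name": "id", "type": "string"}]}
                """);

        // Input contains a field the schema does not declare
        String json = "{\"id\": \"a1\", \"unknown\": true}";

        GenericDatumReader<Object> reader = new GenericDatumReader<>(schema);
        Object datum = reader.read(null, DecoderFactory.get().jsonDecoder(schema, json));

        // Re-encode: the output reflects what Avro would actually send to Kafka
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer);
        new GenericDatumWriter<Object>(schema).write(datum, jsonEncoder);
        jsonEncoder.flush();

        System.out.println(buffer); // {"id":"a1"} -- "unknown" is gone
    }
}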

@@ -3,11 +3,11 @@
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import org.apache.avro.Schema;
 import org.apache.kafka.common.header.Headers;
+import org.jboss.logging.Logger;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.DynamicMessage;

@@ -29,7 +29,11 @@

 public class MultiformatDeserializer extends AbstractKafkaDeserializer<Object, RecordData> implements ForceCloseable {
 
-    private static final SchemaLookupResult<Object> EMPTY_RESULT = SchemaLookupResult.builder().build();
+    private static final Logger LOG = Logger.getLogger(MultiformatDeserializer.class);
+
+    private static final SchemaLookupResult<Object> NO_SCHEMA_ID = SchemaLookupResult.builder().build();
+    private static final SchemaLookupResult<Object> RESOLVER_MISSING = SchemaLookupResult.builder().build();
+    private static final SchemaLookupResult<Object> LOOKUP_FAILURE = SchemaLookupResult.builder().build();
 
     private final ObjectMapper objectMapper;
     AvroDeserializer avroDeserializer;
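All three sentinels are built from the same empty builder, so they are indistinguishable by content; what sets them apart is reference identity, which readRawData below tests with ==. A self-contained sketch of the sentinel pattern, with a hypothetical LookupResult type:

public class SentinelExample {

    record LookupResult(String schema) { }

    // Three distinct instances with identical content: only identity matters
    static final LookupResult NO_SCHEMA_ID = new LookupResult(null);
    static final LookupResult RESOLVER_MISSING = new LookupResult(null);
    static final LookupResult LOOKUP_FAILURE = new LookupResult(null);

    static String describe(LookupResult result) {
        // Must compare with ==; equals() would treat all three as equal
        if (result == RESOLVER_MISSING) {
            return "no schema registry is configured";
        } else if (result == LOOKUP_FAILURE) {
            return "schema could not be retrieved from the registry";
        }
        return "no schema reference in the message";
    }

    public static void main(String[] args) {
        System.out.println(describe(RESOLVER_MISSING)); // no schema registry is configured
    }
}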

@@ -56,7 +60,8 @@ static <S, D> SchemaResolver<S, D> newResolver(RegistryClient client) {
 @Override
 public void configure(Map<String, ?> configs, boolean isKey) {
     if (getSchemaResolver() == null) {
-        // Do not attempt to configure anything if we will not be making remote calls to registry
+        super.key = isKey;
+        // Do not attempt to configure anything more if we will not be making remote calls to registry
         return;
     }
 

@@ -92,9 +97,6 @@ public SchemaParser<Object, RecordData> schemaParser() {
 }
 
 protected RecordData readData(Headers headers, SchemaLookupResult<Object> schemaResult, ByteBuffer buffer, int start, int length) {
-    ParsedSchema<Object> schema = Optional.ofNullable(schemaResult)
-            .map(s -> s.getParsedSchema())
-            .orElse(null);
     Object parsedSchema = null;
 
     if (schemaResult != null && schemaResult.getParsedSchema() != null) {

@@ -103,45 +105,84 @@ protected RecordData readData(Headers headers, SchemaLookupResult<Object> schema

RecordData result;

if (parsedSchema instanceof Schema avroSchema) {
try {
if (headers != null) {
result = avroDeserializer.readData(headers, cast(schema), buffer, start, length);
} else {
result = avroDeserializer.readData(cast(schema), buffer, start, length);
}
result.schema = schema;
result.meta.put("schema-type", ArtifactType.AVRO);
result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
result.meta.put("schema-name", avroSchema.getFullName());
} catch (Exception e) {
result = new RecordData(null, schema);
result.meta.put("error", e.getMessage());
}
if (parsedSchema instanceof Schema) {
result = readAvroData(headers, schemaResult, buffer, start, length);
} else if (parsedSchema instanceof ProtobufSchema) {
try {
Message msg;
if (headers != null) {
msg = protobufDeserializer.readData(headers, cast(schema), buffer, start, length);
} else {
msg = protobufDeserializer.readData(cast(schema), buffer, start, length);
}
byte[] data = com.google.protobuf.util.JsonFormat.printer()
.omittingInsignificantWhitespace()
.print(msg)
.getBytes();
result = new RecordData(data, schema);
result.meta.put("schema-type", ArtifactType.PROTOBUF);
result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
result.meta.put("schema-name", msg.getDescriptorForType().getFullName());
} catch (Exception e) {
result = new RecordData(null, schema);
result.meta.put("error", e.getMessage());
}
result = readProtobufData(headers, schemaResult, buffer, start, length);
} else {
byte[] bytes = new byte[length];
System.arraycopy(buffer.array(), start, bytes, 0, length);
result = new RecordData(bytes, null);
result = readRawData(schemaResult, buffer, start, length);
}

return result;
}

private RecordData readAvroData(Headers headers, SchemaLookupResult<Object> schemaResult, ByteBuffer buffer, int start, int length) {
ParsedSchema<Object> schema = schemaResult.getParsedSchema();
Schema avroSchema = (Schema) schema.getParsedSchema();
RecordData result;

try {
if (headers != null) {
result = avroDeserializer.readData(headers, cast(schema), buffer, start, length);
} else {
result = avroDeserializer.readData(cast(schema), buffer, start, length);
}
result.schema = schema;
result.meta.put("schema-type", ArtifactType.AVRO);
result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
result.meta.put("schema-name", avroSchema.getFullName());
} catch (Exception e) {
result = new RecordData(null, schema);
result.error = com.github.streamshub.console.api.model.Error.forThrowable(e, "Error deserializing Avro data");
}

return result;
}

private RecordData readProtobufData(Headers headers, SchemaLookupResult<Object> schemaResult, ByteBuffer buffer, int start, int length) {
ParsedSchema<Object> schema = schemaResult.getParsedSchema();
RecordData result;

try {
Message msg;
if (headers != null) {
msg = protobufDeserializer.readData(headers, cast(schema), buffer, start, length);
} else {
msg = protobufDeserializer.readData(cast(schema), buffer, start, length);
}
byte[] data = com.google.protobuf.util.JsonFormat.printer()
.omittingInsignificantWhitespace()
.print(msg)
.getBytes();
result = new RecordData(data, schema);
result.meta.put("schema-type", ArtifactType.PROTOBUF);
result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
result.meta.put("schema-name", msg.getDescriptorForType().getFullName());
} catch (Exception e) {
result = new RecordData(null, schema);
result.error = com.github.streamshub.console.api.model.Error.forThrowable(e, "Error deserializing Protobuf data");
}

return result;
}

private RecordData readRawData(SchemaLookupResult<Object> schemaResult, ByteBuffer buffer, int start, int length) {
byte[] bytes = new byte[length];
System.arraycopy(buffer.array(), start, bytes, 0, length);
RecordData result = new RecordData(bytes, null);

if (schemaResult == RESOLVER_MISSING) {
result.error = new com.github.streamshub.console.api.model.Error(
"Schema resolution error",
"%s encoded, but no schema registry is configured"
.formatted(isKey() ? "Key" : "Value"),
null);
} else if (schemaResult == LOOKUP_FAILURE) {
result.error = new com.github.streamshub.console.api.model.Error(
"Schema resolution error",
"Schema could not be retrieved from registry to decode %s"
.formatted(isKey() ? "Key" : "Value"),
null);
}

return result;

@@ -170,7 +211,7 @@ public RecordData deserialize(String topic, byte[] data) {
     } else {
         buffer = ByteBuffer.wrap(data);
         // Empty schema
-        schema = EMPTY_RESULT;
+        schema = NO_SCHEMA_ID;
         length = buffer.limit() - 1;
     }
 

@@ -212,18 +253,33 @@ private RecordData readData(Headers headers, byte[] data, ArtifactReference arti
 }
 
 private SchemaLookupResult<Object> resolve(ArtifactReference artifactReference) {
+    if (!artifactReference.hasValue()) {
+        return NO_SCHEMA_ID;
+    }
+
     var schemaResolver = getSchemaResolver();
 
     if (schemaResolver == null) {
-        // Empty result
-        return EMPTY_RESULT;
+        return RESOLVER_MISSING;
     }
 
     try {
         return getSchemaResolver().resolveSchemaByArtifactReference(artifactReference);
+    } catch (io.apicurio.registry.rest.client.exception.NotFoundException e) {
+        LOG.infof("Schema could not be resolved: %s", artifactReference);
+        return LOOKUP_FAILURE;
     } catch (RuntimeException e) {
-        // Empty result
-        return EMPTY_RESULT;
+        if (LOG.isDebugEnabled()) {
+            /*
+             * Only log the stack trace at debug level. Schema resolution will be attempted
+             * for every message consumed and will lead to excessive logging in case of a
+             * problem.
+             */
+            LOG.debugf(e, "Exception resolving schema reference: %s", artifactReference);
+        } else {
+            LOG.warnf("Exception resolving schema reference: %s ; %s", artifactReference, e.getMessage());
+        }
+        return LOOKUP_FAILURE;
     }
 }
 
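Taken together, the deserializer changes implement a never-throw policy: every failure path still yields a RecordData, with the cause recorded in the new error field rather than aborting the fetch. A compressed sketch of that flow under assumed names (DecodeError stands in for the console's com.github.streamshub.console.api.model.Error, and the magic-byte check stands in for Apicurio's artifact-reference parsing):

import java.nio.charset.StandardCharsets;

public class LenientDecodeSketch {

    record DecodeError(String title, String detail) { }

    record Decoded(byte[] data, DecodeError error) { }

    static Decoded decode(byte[] payload, boolean registryConfigured) {
        // Illustrative stand-in for detecting a schema-encoded payload
        boolean schemaEncoded = payload.length > 0 && payload[0] == 0;

        if (schemaEncoded && !registryConfigured) {
            // Keep the raw bytes, but attach the reason they were not decoded
            return new Decoded(payload, new DecodeError(
                    "Schema resolution error",
                    "Value encoded, but no schema registry is configured"));
        }
        return new Decoded(payload, null);
    }

    public static void main(String[] args) {
        byte[] payload = "\0...serialized...".getBytes(StandardCharsets.UTF_8);
        Decoded result = decode(payload, false);
        System.out.println(result.error().detail());
        // Value encoded, but no schema registry is configured
    }
}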

@@ -67,7 +67,8 @@ static <S, D> SchemaResolver<S, D> newResolver(RegistryClient client) {
 @Override
 public void configure(Map<String, ?> configs, boolean isKey) {
     if (getSchemaResolver() == null) {
-        // Do not attempt to configure anything if we will not be making remote calls to registry
+        key = isKey;
+        // Do not attempt to configure anything more if we will not be making remote calls to registry
         return;
     }
 

@@ -18,6 +18,7 @@ public class RecordData {
     public final Map<String, String> meta = new LinkedHashMap<>(1);
     byte[] data;
     ParsedSchema<?> schema;
+    com.github.streamshub.console.api.model.Error error;
 
     public RecordData(byte[] data, ParsedSchema<?> schema) {
         super();

@@ -39,6 +40,10 @@ public ParsedSchema<?> schema() {
         return schema;
     }
 
+    public com.github.streamshub.console.api.model.Error error() {
+        return error;
+    }
+
     public String dataString(Integer maxValueLength) {
         return bytesToString(data, maxValueLength);
     }

5 changes: 5 additions & 0 deletions api/src/main/resources/application.properties

@@ -41,6 +41,9 @@ quarkus.swagger-ui.always-include=true
 quarkus.swagger-ui.title=Console API
 
 quarkus.log.category."org.apache.kafka".level=ERROR
+# Apicurio cache logger is noisy for "not found" scenarios. We log the error
+# in our own handler instead.
+quarkus.log.category."io.apicurio.registry.resolver.ERCache".level=OFF
 
 %build.quarkus.container-image.labels."org.opencontainers.image.version"=${quarkus.application.version}
 %build.quarkus.container-image.labels."org.opencontainers.image.revision"=${git.revision}

@@ -54,6 +57,8 @@ quarkus.log.category."org.apache.kafka".level=ERROR
 # this configuration will prevent the removal of those classes and make them
 # eligible for access from a CDI `Instance`.
 quarkus.arc.unremovable-types=com.github.streamshub.console.api.**
+# Apicurio's Jackson customizer is distributed in a common jar by mistake
+quarkus.arc.exclude-types=io.apicurio.registry.rest.JacksonDateTimeCustomizer
 
 quarkus.index-dependency.strimzi-api.group-id=io.strimzi
 quarkus.index-dependency.strimzi-api.artifact-id=api

2 changes: 1 addition & 1 deletion ui/api/api.ts

@@ -14,7 +14,7 @@ export async function getHeaders(): Promise<Record<string, string>> {
 }
 
 export const ApiError = z.object({
-  meta: z.object({ type: z.string() }), // z.map(z.string(), z.string()),
+  meta: z.object({ type: z.string() }).optional(), // z.map(z.string(), z.string()),
   id: z.string().optional(),
   status: z.string().optional(),
   code: z.string().optional(),

4 changes: 2 additions & 2 deletions ui/api/messages/actions.ts

@@ -99,9 +99,9 @@ export async function getTopicMessages(
     } else {
       return { messages: messages, ts: new Date() };
     }
-  } catch {
+  } catch (e) {
     log.error(
-      { status: res.status, message: rawData, url },
+      { error: e, status: res.status, message: rawData, url },
       "Error fetching message",
     );
     if (res.status === 404) {

4 changes: 3 additions & 1 deletion ui/api/messages/schema.ts
@@ -1,17 +1,19 @@
 import { z } from "zod";
+import { ApiError } from "@/api/api";
 
 const RelatedSchema = z.object({
   meta: z.object({
     artifactType: z.string().optional(),
     name: z.string().optional(),
+    errors: z.array(ApiError).optional(),
   }).nullable().optional(),
   links: z.object({
     content: z.string(),
   }).nullable().optional(),
   data: z.object({
     type: z.literal("schemas"),
     id: z.string(),
-  }),
+  }).optional(),
 });

export const MessageSchema = z.object({