From 256cc616a96a9b52f27f8eb28395d2e34717ac5d Mon Sep 17 00:00:00 2001
From: Michael Edgar
Date: Wed, 16 Oct 2024 15:54:26 -0400
Subject: [PATCH] Add error metadata for scenarios when key/value cannot be
 decoded

Signed-off-by: Michael Edgar
---
 .../streamshub/console/api/ClientFactory.java |   6 +-
 .../console/api/SchemasResource.java          |   2 +-
 .../console/api/service/RecordService.java    |  24 ++-
 .../api/support/serdes/AvroDatumProvider.java |   5 +
 .../serdes/MultiformatDeserializer.java       | 152 ++++++++++++------
 .../support/serdes/MultiformatSerializer.java |   3 +-
 .../api/support/serdes/RecordData.java        |   5 +
 api/src/main/resources/application.properties |   5 +
 ui/api/api.ts                                 |   2 +-
 ui/api/messages/actions.ts                    |   4 +-
 ui/api/messages/schema.ts                     |   4 +-
 ui/components/MessagesTable/MessagesTable.tsx |  28 +++-
 12 files changed, 172 insertions(+), 68 deletions(-)

diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
index e4b05240e..5ae5bd2f9 100644
--- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
+++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -424,8 +424,10 @@ boolean validConfigs(Optional<Kafka> kafkaResource, KafkaClusterConfig clusterCo
         }
 
         if (createContext) {
-            clientsMessage.insert(0, "Some configuration may be missing for connection to cluster %s, connection attempts may fail".formatted(clusterConfig.clusterKey()));
-            log.warn(clientsMessage.toString().trim());
+            if (clientsMessage.length() > 0) {
+                clientsMessage.insert(0, "Some configuration may be missing for connection to cluster %s, connection attempts may fail".formatted(clusterConfig.clusterKey()));
+                log.warn(clientsMessage.toString().trim());
+            }
         } else {
             clientsMessage.insert(0, "Missing configuration detected for connection to cluster %s, no connection will be setup".formatted(clusterConfig.clusterKey()));
             log.error(clientsMessage.toString().trim());
diff --git a/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java
index bd44fd98d..0bb4a9c3b 100644
--- a/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java
+++ b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java
@@ -48,7 +48,7 @@ public class SchemasResource {
     @APIResponse(responseCode = "404", ref = "NotFound")
     @APIResponse(responseCode = "500", ref = "ServerError")
     @APIResponse(responseCode = "504", ref = "ServerTimeout")
-    public Response describeConfigs(
+    public Response getSchemaContent(
             @Parameter(description = "Schema identifier")
             @PathParam("schemaId")
             String schemaId) {
diff --git a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java
index 0a2fcd670..09a2298f1 100644
--- a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java
+++ b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java
@@ -15,7 +15,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -201,8 +200,9 @@ public byte[] value() {
                             (map, hdr) -> map.put(hdr.key(), headerValue(hdr, null)),
                             HashMap::putAll));
             result.size(sizeOf(meta, request.headers()));
-            maybeRelate(key).ifPresent(result::keySchema);
-            maybeRelate(value).ifPresent(result::valueSchema);
+
+            schemaRelationship(key).ifPresent(result::keySchema);
+            schemaRelationship(value).ifPresent(result::valueSchema);
 
             return result;
         })
@@ -321,13 +321,13 @@ KafkaRecord getItems(ConsumerRecord<RecordData, RecordData> rec, String topicId,
         setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::headers);
         setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::size);
 
-        maybeRelate(rec.key()).ifPresent(item::keySchema);
-        maybeRelate(rec.value()).ifPresent(item::valueSchema);
+        schemaRelationship(rec.key()).ifPresent(item::keySchema);
+        schemaRelationship(rec.value()).ifPresent(item::valueSchema);
 
         return item;
     }
 
-    Optional<JsonApiRelationship> maybeRelate(RecordData data) {
+    Optional<JsonApiRelationship> schemaRelationship(RecordData data) {
         return Optional.ofNullable(data)
                 .map(d -> d.meta)
                 .filter(recordMeta -> recordMeta.containsKey("schema-id"))
@@ -343,9 +343,19 @@ Optional<JsonApiRelationship> maybeRelate(RecordData data) {
                     relationship.data(new Identifier("schemas", schemaId));
                     relationship.addLink("content", "/api/schemas/" + schemaId);
 
+                    schemaError(data).ifPresent(error -> relationship.addMeta("errors", List.of(error)));
+
                     return relationship;
                 })
-                .filter(Objects::nonNull);
+                .or(() -> schemaError(data).map(error -> {
+                    var relationship = new JsonApiRelationship();
+                    relationship.addMeta("errors", List.of(error));
+                    return relationship;
+                }));
+    }
+
+    Optional<Error> schemaError(RecordData data) {
+        return Optional.ofNullable(data).map(RecordData::error);
     }
 
     <T> void setProperty(String fieldName, List<String> include, Supplier<T> source, java.util.function.Consumer<T> target) {
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java
index f486ecb2d..b983ef7ab 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java
@@ -32,6 +32,11 @@ public void write(RecordData data, org.apache.avro.io.Encoder out) throws IOExce
                 final Object datum = reader.read(null, jsonDecoder);
                 writer.write(datum, out);
 
+                /*
+                 * Replace input data with the re-serialized record so the response contains
+                 * the data as it was sent to Kafka (but in JSON format). For example,
+                 * unknown fields will have been dropped.
+                 */
                 final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                 final Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer);
                 writer.write(datum, jsonEncoder);
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java
index 05266c516..902b8bee3 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java
@@ -3,11 +3,11 @@
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import org.apache.avro.Schema;
 import org.apache.kafka.common.header.Headers;
+import org.jboss.logging.Logger;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.DynamicMessage;
@@ -29,7 +29,11 @@ public class MultiformatDeserializer extends AbstractKafkaDeserializer<Object, RecordData>
         implements ForceCloseable {
 
-    private static final SchemaLookupResult<Object> EMPTY_RESULT = SchemaLookupResult.builder().build();
+    private static final Logger LOG = Logger.getLogger(MultiformatDeserializer.class);
+
+    private static final SchemaLookupResult<Object> NO_SCHEMA_ID = SchemaLookupResult.builder().build();
+    private static final SchemaLookupResult<Object> RESOLVER_MISSING = SchemaLookupResult.builder().build();
+    private static final SchemaLookupResult<Object> LOOKUP_FAILURE = SchemaLookupResult.builder().build();
 
     private final ObjectMapper objectMapper;
     AvroDeserializer avroDeserializer;
@@ -56,7 +60,8 @@ static SchemaResolver<Object, RecordData> newResolver(RegistryClient client) {
     @Override
     public void configure(Map<String, Object> configs, boolean isKey) {
         if (getSchemaResolver() == null) {
-            // Do not attempt to configure anything if we will not be making remote calls to registry
+            super.key = isKey;
+            // Do not attempt to configure anything more if we will not be making remote calls to the registry
             return;
         }
@@ -92,9 +97,6 @@ public SchemaParser<Object, RecordData> schemaParser() {
     }
 
     protected RecordData readData(Headers headers, SchemaLookupResult<Object> schemaResult, ByteBuffer buffer, int start, int length) {
-        ParsedSchema<Object> schema = Optional.ofNullable(schemaResult)
-                .map(s -> s.getParsedSchema())
-                .orElse(null);
         Object parsedSchema = null;
 
         if (schemaResult != null && schemaResult.getParsedSchema() != null) {
@@ -103,45 +105,84 @@ protected RecordData readData(Headers headers, SchemaLookupResult<Object> schema
         RecordData result;
 
-        if (parsedSchema instanceof Schema avroSchema) {
-            try {
-                if (headers != null) {
-                    result = avroDeserializer.readData(headers, cast(schema), buffer, start, length);
-                } else {
-                    result = avroDeserializer.readData(cast(schema), buffer, start, length);
-                }
-                result.schema = schema;
-                result.meta.put("schema-type", ArtifactType.AVRO);
-                result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
-                result.meta.put("schema-name", avroSchema.getFullName());
-            } catch (Exception e) {
-                result = new RecordData(null, schema);
-                result.meta.put("error", e.getMessage());
-            }
+        if (parsedSchema instanceof Schema) {
+            result = readAvroData(headers, schemaResult, buffer, start, length);
         } else if (parsedSchema instanceof ProtobufSchema) {
-            try {
-                Message msg;
-                if (headers != null) {
-                    msg = protobufDeserializer.readData(headers, cast(schema), buffer, start, length);
-                } else {
-                    msg = protobufDeserializer.readData(cast(schema), buffer, start, length);
-                }
-            byte[] data = com.google.protobuf.util.JsonFormat.printer()
-                    .omittingInsignificantWhitespace()
-                    .print(msg)
-                    .getBytes();
-            result = new RecordData(data, schema);
-            result.meta.put("schema-type", ArtifactType.PROTOBUF);
-            result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
-            result.meta.put("schema-name", msg.getDescriptorForType().getFullName());
-        } catch (Exception e) {
-            result = new RecordData(null, schema);
-            result.meta.put("error", e.getMessage());
-        }
+            result = readProtobufData(headers, schemaResult, buffer, start, length);
         } else {
-            byte[] bytes = new byte[length];
-            System.arraycopy(buffer.array(), start, bytes, 0, length);
-            result = new RecordData(bytes, null);
+            result = readRawData(schemaResult, buffer, start, length);
+        }
+
+        return result;
+    }
+
+    private RecordData readAvroData(Headers headers, SchemaLookupResult<Object> schemaResult, ByteBuffer buffer, int start, int length) {
+        ParsedSchema<Object> schema = schemaResult.getParsedSchema();
+        Schema avroSchema = (Schema) schema.getParsedSchema();
+        RecordData result;
+
+        try {
+            if (headers != null) {
+                result = avroDeserializer.readData(headers, cast(schema), buffer, start, length);
+            } else {
+                result = avroDeserializer.readData(cast(schema), buffer, start, length);
+            }
+            result.schema = schema;
+            result.meta.put("schema-type", ArtifactType.AVRO);
+            result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
+            result.meta.put("schema-name", avroSchema.getFullName());
+        } catch (Exception e) {
+            result = new RecordData(null, schema);
+            result.error = com.github.streamshub.console.api.model.Error.forThrowable(e, "Error deserializing Avro data");
+        }
+
+        return result;
+    }
+
+    private RecordData readProtobufData(Headers headers, SchemaLookupResult<Object> schemaResult, ByteBuffer buffer, int start, int length) {
+        ParsedSchema<Object> schema = schemaResult.getParsedSchema();
+        RecordData result;
+
+        try {
+            Message msg;
+            if (headers != null) {
+                msg = protobufDeserializer.readData(headers, cast(schema), buffer, start, length);
+            } else {
+                msg = protobufDeserializer.readData(cast(schema), buffer, start, length);
+            }
+            byte[] data = com.google.protobuf.util.JsonFormat.printer()
+                    .omittingInsignificantWhitespace()
+                    .print(msg)
+                    .getBytes();
+            result = new RecordData(data, schema);
+            result.meta.put("schema-type", ArtifactType.PROTOBUF);
+            result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
+            result.meta.put("schema-name", msg.getDescriptorForType().getFullName());
+        } catch (Exception e) {
+            result = new RecordData(null, schema);
+            result.error = com.github.streamshub.console.api.model.Error.forThrowable(e, "Error deserializing Protobuf data");
+        }
+
+        return result;
+    }
+
+    private RecordData readRawData(SchemaLookupResult<Object> schemaResult, ByteBuffer buffer, int start, int length) {
+        byte[] bytes = new byte[length];
+        System.arraycopy(buffer.array(), start, bytes, 0, length);
+        RecordData result = new RecordData(bytes, null);
+
+        if (schemaResult == RESOLVER_MISSING) {
+            result.error = new com.github.streamshub.console.api.model.Error(
+                    "Schema resolution error",
+                    "%s encoded, but no schema registry is configured"
+                        .formatted(isKey() ? "Key" : "Value"),
"Key" : "Value"), + null); + } else if (schemaResult == LOOKUP_FAILURE) { + result.error = new com.github.streamshub.console.api.model.Error( + "Schema resolution error", + "Schema could not be retrieved from registry to decode %s" + .formatted(isKey() ? "Key" : "Value"), + null); } return result; @@ -170,7 +211,7 @@ public RecordData deserialize(String topic, byte[] data) { } else { buffer = ByteBuffer.wrap(data); // Empty schema - schema = EMPTY_RESULT; + schema = NO_SCHEMA_ID; length = buffer.limit() - 1; } @@ -212,18 +253,33 @@ private RecordData readData(Headers headers, byte[] data, ArtifactReference arti } private SchemaLookupResult resolve(ArtifactReference artifactReference) { + if (!artifactReference.hasValue()) { + return NO_SCHEMA_ID; + } + var schemaResolver = getSchemaResolver(); if (schemaResolver == null) { - // Empty result - return EMPTY_RESULT; + return RESOLVER_MISSING; } try { return getSchemaResolver().resolveSchemaByArtifactReference(artifactReference); + } catch (io.apicurio.registry.rest.client.exception.NotFoundException e) { + LOG.infof("Schema could not be resolved: %s", artifactReference); + return LOOKUP_FAILURE; } catch (RuntimeException e) { - // Empty result - return EMPTY_RESULT; + if (LOG.isDebugEnabled()) { + /* + * Only log the stack trace at debug level. Schema resolution will be attempted + * for every message consumed and will lead to excessive logging in case of a + * problem. + */ + LOG.debugf(e, "Exception resolving schema reference: %s", artifactReference); + } else { + LOG.warnf("Exception resolving schema reference: %s ; %s", artifactReference, e.getMessage()); + } + return LOOKUP_FAILURE; } } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java index 4e29199bc..a44c651fc 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java @@ -67,7 +67,8 @@ static SchemaResolver newResolver(RegistryClient client) { @Override public void configure(Map configs, boolean isKey) { if (getSchemaResolver() == null) { - // Do not attempt to configure anything if we will not be making remote calls to registry + key = isKey; + // Do not attempt to configure anything more if we will not be making remote calls to registry return; } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java index 4f47f8c83..81f0e6ac8 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java @@ -18,6 +18,7 @@ public class RecordData { public final Map meta = new LinkedHashMap<>(1); byte[] data; ParsedSchema schema; + com.github.streamshub.console.api.model.Error error; public RecordData(byte[] data, ParsedSchema schema) { super(); @@ -39,6 +40,10 @@ public ParsedSchema schema() { return schema; } + public com.github.streamshub.console.api.model.Error error() { + return error; + } + public String dataString(Integer maxValueLength) { return bytesToString(data, maxValueLength); } diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties index ab851f035..dfc2213b5 100644 --- 
--- a/api/src/main/resources/application.properties
+++ b/api/src/main/resources/application.properties
@@ -41,6 +41,9 @@ quarkus.swagger-ui.always-include=true
 quarkus.swagger-ui.title=Console API
 
 quarkus.log.category."org.apache.kafka".level=ERROR
+# Apicurio cache logger is noisy for "not found" scenarios. We log the error
+# in our own handler instead.
+quarkus.log.category."io.apicurio.registry.resolver.ERCache".level=OFF
 
 %build.quarkus.container-image.labels."org.opencontainers.image.version"=${quarkus.application.version}
 %build.quarkus.container-image.labels."org.opencontainers.image.revision"=${git.revision}
@@ -54,6 +57,8 @@ quarkus.log.category."org.apache.kafka".level=ERROR
 # this configuration will prevent the removal of those classes and make them
 # eligible for access from a CDI `Instance`.
 quarkus.arc.unremovable-types=com.github.streamshub.console.api.**
+# Apicurio's Jackson customizer is distributed in a common jar by mistake
+quarkus.arc.exclude-types=io.apicurio.registry.rest.JacksonDateTimeCustomizer
 
 quarkus.index-dependency.strimzi-api.group-id=io.strimzi
 quarkus.index-dependency.strimzi-api.artifact-id=api
diff --git a/ui/api/api.ts b/ui/api/api.ts
index 52558aa81..52377c1eb 100644
--- a/ui/api/api.ts
+++ b/ui/api/api.ts
@@ -14,7 +14,7 @@ export async function getHeaders(): Promise<Record<string, string>> {
 }
 
 export const ApiError = z.object({
-  meta: z.object({ type: z.string() }), // z.map(z.string(), z.string()),
+  meta: z.object({ type: z.string() }).optional(), // z.map(z.string(), z.string()),
   id: z.string().optional(),
   status: z.string().optional(),
   code: z.string().optional(),
diff --git a/ui/api/messages/actions.ts b/ui/api/messages/actions.ts
index 3f866d924..8fdd78886 100644
--- a/ui/api/messages/actions.ts
+++ b/ui/api/messages/actions.ts
@@ -99,9 +99,9 @@ export async function getTopicMessages(
     } else {
       return { messages: messages, ts: new Date() };
     }
-  } catch {
+  } catch (e) {
     log.error(
-      { status: res.status, message: rawData, url },
+      { error: e, status: res.status, message: rawData, url },
       "Error fetching message",
     );
     if (res.status === 404) {
diff --git a/ui/api/messages/schema.ts b/ui/api/messages/schema.ts
index f003463e0..4c2170278 100644
--- a/ui/api/messages/schema.ts
+++ b/ui/api/messages/schema.ts
@@ -1,9 +1,11 @@
 import { z } from "zod";
+import { ApiError } from "@/api/api";
 
 const RelatedSchema = z.object({
   meta: z.object({
     artifactType: z.string().optional(),
     name: z.string().optional(),
+    errors: z.array(ApiError).optional(),
   }).nullable().optional(),
   links: z.object({
     content: z.string(),
@@ -11,7 +13,7 @@ const RelatedSchema = z.object({
   data: z.object({
     type: z.literal("schemas"),
     id: z.string(),
-  }),
+  }).optional(),
 });
 
 export const MessageSchema = z.object({
diff --git a/ui/components/MessagesTable/MessagesTable.tsx b/ui/components/MessagesTable/MessagesTable.tsx
index 706424f13..d47ccf101 100644
--- a/ui/components/MessagesTable/MessagesTable.tsx
+++ b/ui/components/MessagesTable/MessagesTable.tsx
@@ -12,7 +12,7 @@ import {
   TextContent,
   Tooltip,
 } from "@/libs/patternfly/react-core";
-import { HelpIcon } from "@/libs/patternfly/react-icons";
+import { ExclamationTriangleIcon, HelpIcon } from "@/libs/patternfly/react-icons";
 import {
   BaseCellProps,
   InnerScrollContainer,
@@ -272,10 +272,19 @@
                         onSelectMessage(row);
                       }}
                     />
-                    {row.relationships.keySchema?.meta?.artifactType && (
+                    {row.relationships.keySchema && (
                       <>
-                        {row.relationships.keySchema?.meta?.artifactType}
+                        {row.relationships.keySchema?.meta?.artifactType && (
+                          <>
+                            {row.relationships.keySchema?.meta?.artifactType}
+                          </>
+                        )}
+                        {row.relationships.keySchema?.meta?.errors && (
+                          <>
+                            <ExclamationTriangleIcon /> {row.relationships.keySchema?.meta?.errors[0].detail}
+                          </>
+                        )}
                       </>
                     )}
@@ -317,10 +326,19 @@
                         onSelectMessage(row);
                       }}
                     />
-                    {row.relationships.valueSchema?.meta?.artifactType && (
+                    {row.relationships.valueSchema && (
                       <>
-                        {row.relationships.valueSchema?.meta?.artifactType}
+                        {row.relationships.valueSchema?.meta?.artifactType && (
+                          <>
+                            {row.relationships.valueSchema?.meta?.artifactType}
+                          </>
+                        )}
+                        {row.relationships.valueSchema?.meta?.errors && (
+                          <>
+                            <ExclamationTriangleIcon /> {row.relationships.valueSchema?.meta?.errors[0].detail}
+                          </>
+                        )}
                       </>
                     )}
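
Note: with this patch, the keySchema/valueSchema relationship on a record can arrive in
two shapes: a resolved schema reference, or an error-only relationship whose meta.errors
explains why decoding failed. The following is a minimal TypeScript sketch of the two
payloads; the SchemaRelationship type and all concrete values ("com.example.Order",
"abc123") are illustrative stand-ins, not the console's exported types, while the
title/detail strings are the ones produced by readRawData above.

    // Shape of a keySchema/valueSchema relationship as validated by RelatedSchema.
    type SchemaRelationship = {
      meta?: {
        artifactType?: string;
        name?: string;
        errors?: Array<{ title: string; detail: string }>;
      } | null;
      links?: { content: string };
      data?: { type: "schemas"; id: string };
    };

    // Decoding succeeded: identifier, content link, and schema meta are present.
    const resolved: SchemaRelationship = {
      meta: { artifactType: "AVRO", name: "com.example.Order" }, // hypothetical values
      links: { content: "/api/schemas/abc123" },                 // "abc123" is illustrative
      data: { type: "schemas", id: "abc123" },
    };

    // Decoding failed (e.g., no registry configured): only the error meta is present,
    // which is why `data` became optional in ui/api/messages/schema.ts.
    const failed: SchemaRelationship = {
      meta: {
        errors: [
          {
            title: "Schema resolution error",
            detail: "Key encoded, but no schema registry is configured",
          },
        ],
      },
    };

    // The table cell renders a warning icon followed by errors[0].detail.
    console.log(failed.meta?.errors?.[0].detail);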
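On the API side, the deserializer now reports why raw bytes were returned by comparing
the lookup result against three distinct sentinel instances (NO_SCHEMA_ID,
RESOLVER_MISSING, LOOKUP_FAILURE) using object identity, rather than throwing or
returning a single empty result. A rough TypeScript restatement of that pattern follows;
the LookupResult type and describeFailure function are hypothetical names for this
sketch, whereas the Java original uses SchemaLookupResult sentinels and isKey(), and the
message strings are copied from readRawData.

    // Three empty but distinct objects; identity (===) tells the cases apart.
    type LookupResult = { schema?: unknown };

    const NO_SCHEMA_ID: LookupResult = {};
    const RESOLVER_MISSING: LookupResult = {};
    const LOOKUP_FAILURE: LookupResult = {};

    function describeFailure(result: LookupResult, kind: "Key" | "Value"): string | undefined {
      if (result === RESOLVER_MISSING) {
        return `${kind} encoded, but no schema registry is configured`;
      }
      if (result === LOOKUP_FAILURE) {
        return `Schema could not be retrieved from registry to decode ${kind}`;
      }
      return undefined; // NO_SCHEMA_ID: plain record, nothing to report
    }

    // e.g. describeFailure(RESOLVER_MISSING, "Key")
    // -> "Key encoded, but no schema registry is configured"

The benefit of the sentinel approach, as in the patch, is that a record with no schema id
at all stays error-free, while the two genuine failure modes surface distinct,
user-facing messages without logging a stack trace per consumed message.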