Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpoint for specific schema version #37

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
*/
package com.bakdata.schemaregistrymock;

import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

import com.github.tomakehurst.wiremock.WireMockServer;
Expand All @@ -42,6 +43,7 @@
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
Expand All @@ -54,11 +56,14 @@

/**
* <p>The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.</p>
* In particular,
* In particular, you can
* <ul>
* <li>you can register a schema</li>
* <li>retrieve a schema by id.</li>
* <li> register a schema</li>
* <li>retrieve a schema by id</li>
* <li>list and get schema versions of a subject</li>
* <li>list all subjects</li>
* <li>delete a schema</li>
* <li>retrieve the version of a schema</li>
* </ul>
*
* <p>If you use the TestTopology of the fluent Kafka Streams test, you don't have to interact with this class at
Expand Down Expand Up @@ -96,17 +101,19 @@ public class SchemaRegistryMock {
private static final String ALL_SUBJECT_PATTERN = "/subjects";
private static final String SCHEMA_PATH_PATTERN = "/subjects/[^/]+/versions";
private static final String SCHEMA_BY_ID_PATTERN = "/schemas/ids/";
private static final String SCHEMA_VERSION_PATTERN = "/subjects/[^/]+\\?deleted=true";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is deleted=true part of the pattern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deleted parameter is set to true when calling getVersion(subject, schema) and to false when calling getId(subject, schema).
Originally, I did not plan to include support for id requests and I wanted to make sure that there are no unexpected responses. Especially, since the parameter is not documented anywhere.
I looked into the behavior of a real schema registry and it responds as expected: When set to false, querying a deleted schema (not subject!) returns `{"error_code": 40403, "message": "Schema not found"}. Otherwise, it returns the complete schema information.
I'll remove the parameter from the pattern and add the logic for both cases in the transformer.

private static final int IDENTITY_MAP_CAPACITY = 1000;

private final ListVersionsHandler listVersionsHandler = new ListVersionsHandler();
private final GetVersionHandler getVersionHandler = new GetVersionHandler();
private final AutoRegistrationHandler autoRegistrationHandler = new AutoRegistrationHandler();
private final DeleteSubjectHandler deleteSubjectHandler = new DeleteSubjectHandler();
private final AllSubjectsHandler allSubjectsHandler = new AllSubjectsHandler();
private final SchemaVersionHandler schemaVersionHandler = new SchemaVersionHandler();
private final WireMockServer mockSchemaRegistry = new WireMockServer(
WireMockConfiguration.wireMockConfig().dynamicPort()
.extensions(this.autoRegistrationHandler, this.listVersionsHandler, this.getVersionHandler,
this.deleteSubjectHandler, this.allSubjectsHandler));
this.deleteSubjectHandler, this.allSubjectsHandler, this.schemaVersionHandler));
private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();

private static UrlPattern getSchemaPattern(final Integer id) {
Expand Down Expand Up @@ -139,6 +146,8 @@ public void start() {
.willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN))
.willReturn(WireMock.aResponse().withTransformers(this.allSubjectsHandler.getName())));
this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlMatching(SCHEMA_VERSION_PATTERN))
.willReturn(WireMock.aResponse().withTransformers(this.schemaVersionHandler.getName())));
}

public void stop() {
Expand Down Expand Up @@ -336,4 +345,42 @@ public String getName() {
return AllSubjectsHandler.class.getSimpleName();
}
}

private class SchemaVersionHandler extends SubjectsHandler {
@Override
public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
final FileSource files, final Parameters parameters) {
try {
final Schema schema = new Schema.Parser()
.parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema());
final String subject = this.getSubject(request);
final int schemaVersion = SchemaRegistryMock.this.schemaRegistryClient.getVersion(subject, schema);
final int schemaId = SchemaRegistryMock.this.schemaRegistryClient.getId(subject, schema);

return ResponseDefinitionBuilder
.jsonResponse(new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(
subject, schemaVersion, schemaId, schema.toString()
));
} catch (final IOException | RestClientException e) {
final ErrorMessage error = new ErrorMessage(HTTP_BAD_REQUEST, "Cannot fetch schema version");
return ResponseDefinitionBuilder.jsonResponse(error, HTTP_BAD_REQUEST);
}
}

@Override
public String getName() {
return SchemaVersionHandler.class.getSimpleName();
}

@Override
protected String getSubject(final Request request) {
String subject = super.getSubject(request);
// remove request parameters
if (subject.contains("?")) {
subject = subject.split("\\?")[0];
}
return subject;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
Expand Down Expand Up @@ -97,7 +98,8 @@ void shouldHaveSchemaVersions() throws IOException, RestClientException {
final List<Integer> versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value");
assertThat(versions.size()).isOne();

final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0));
final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient()
.getSchemaMetadata(topic + "-value", versions.get(0));
assertThat(metadata.getId()).isEqualTo(id);
final String schemaString = metadata.getSchema();
final Schema retrievedSchema = new Schema.Parser().parse(schemaString);
Expand Down Expand Up @@ -211,32 +213,58 @@ void shouldNotDeleteUnknownSubject() {
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
}


@Test
void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException {
final Schema valueSchema = createSchema("value_schema");
final String topic = "test-topic";
final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema);

final List<Integer> versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value");
final SchemaRegistryClient schemaRegistryClient = this.schemaRegistry.getSchemaRegistryClient();
final List<Integer> versions = schemaRegistryClient.getAllVersions(topic + "-value");
assertThat(versions.size()).isOne();

final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0));
final SchemaMetadata metadata = schemaRegistryClient.getSchemaMetadata(topic + "-value", versions.get(0));
assertThat(metadata.getId()).isEqualTo(id);
assertThat(this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"))
assertThat(schemaRegistryClient.getLatestSchemaMetadata(topic + "-value"))
.isNotNull();
this.schemaRegistry.deleteValueSchema(topic);
assertThatExceptionOfType(RestClientException.class)
.isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"))
.isThrownBy(() -> schemaRegistryClient.getAllVersions(topic + "-value"))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
assertThatExceptionOfType(RestClientException.class)
.isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)))
.isThrownBy(() -> schemaRegistryClient.getSchemaMetadata(topic + "-value", versions.get(0)))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
assertThatExceptionOfType(RestClientException.class)
.isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"))
.isThrownBy(() -> schemaRegistryClient.getLatestSchemaMetadata(topic + "-value"))
.satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND));
}

@Test
void shouldReturnValueVersion() throws IOException, RestClientException {
final Schema valueSchema = createSchema("value_schema");
this.schemaRegistry.registerValueSchema("test-topic", valueSchema);

final Integer version = this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", valueSchema);
assertThat(version).isEqualTo(1);
}

@Test
void shouldReturnKeyVersion() throws IOException, RestClientException {
final Schema valueSchema = createSchema("value_schema");
this.schemaRegistry.registerKeySchema("test-topic", valueSchema);

final Integer version = this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", valueSchema);
assertThat(version).isEqualTo(1);
}

@Test
void shouldNotReturnVersionForNonExistingSchema() {
final Schema test = createSchema("test");
assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic", test))
.isInstanceOf(RestClientException.class)
.hasMessage("Cannot fetch schema version; error code: 400");
}

private static Schema createSchema(final String name) {
return Schema.createRecord(name, "no doc", "", false, Collections.emptyList());
}
Expand Down