From 0b2634bf7e53a744fa99228219ac9a7ba2d1e1b5 Mon Sep 17 00:00:00 2001 From: Andrew Robertson Date: Wed, 23 Jun 2021 13:49:37 -0700 Subject: [PATCH 01/11] auto-detect whether schema is encoded in pravega event Signed-off-by: Andrew Robertson --- build.gradle | 1 + .../presto/PravegaRecordSetProvider.java | 16 +- .../PravegaTableDescriptionSupplier.java | 40 +---- .../presto/decoder/AvroSerializer.java | 39 ++--- .../presto/decoder/CsvSerializer.java | 9 ++ .../presto/decoder/JsonSerializer.java | 11 +- .../presto/decoder/KVSerializer.java | 59 ++++++- .../presto/decoder/ProtobufSerializer.java | 35 ++--- .../presto/util/PravegaSchemaUtils.java | 7 - .../presto/decoder/KVSerializerTest.java | 145 ++++++++++++++++++ .../integration/EmbeddedSchemaRegistry.java | 100 ++++++++++++ 11 files changed, 352 insertions(+), 110 deletions(-) create mode 100644 src/test/java/io/pravega/connectors/presto/decoder/KVSerializerTest.java create mode 100644 src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java diff --git a/build.gradle b/build.gradle index bd33990..fb15f9f 100644 --- a/build.gradle +++ b/build.gradle @@ -67,6 +67,7 @@ dependencies { testImplementation "com.facebook.airlift:testing:${airliftTestingVersion}" testCompile (group: 'io.pravega', name: 'pravega-standalone', version: pravegaVersion) + testCompile "io.pravega:schemaregistry-server:${pravegaSchemaRegistryVersion}" compileOnly "io.airlift:slice:${airliftSliceVersion}" compileOnly "io.airlift:units:${airliftUnitsVersion}" diff --git a/src/main/java/io/pravega/connectors/presto/PravegaRecordSetProvider.java b/src/main/java/io/pravega/connectors/presto/PravegaRecordSetProvider.java index 8714cca..91a282a 100644 --- a/src/main/java/io/pravega/connectors/presto/PravegaRecordSetProvider.java +++ b/src/main/java/io/pravega/connectors/presto/PravegaRecordSetProvider.java @@ -47,12 +47,9 @@ import static io.pravega.connectors.presto.PravegaHandleResolver.convertSplit; import static io.pravega.connectors.presto.util.PravegaSchemaUtils.AVRO; -import static io.pravega.connectors.presto.util.PravegaSchemaUtils.AVRO_INLINE; import static io.pravega.connectors.presto.util.PravegaSchemaUtils.CSV; import static io.pravega.connectors.presto.util.PravegaSchemaUtils.JSON; -import static io.pravega.connectors.presto.util.PravegaSchemaUtils.JSON_INLINE; import static io.pravega.connectors.presto.util.PravegaSchemaUtils.PROTOBUF; -import static io.pravega.connectors.presto.util.PravegaSchemaUtils.PROTOBUF_INLINE; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static java.util.Objects.requireNonNull; @@ -166,18 +163,12 @@ private KVSerializer serializer(PravegaObjectSchema schema, SerializerConfig { switch (schema.getFormat()) { case AVRO: - return new AvroSerializer(schema.getSchemaLocation().get()); - case AVRO_INLINE: - return new AvroSerializer(serializerConfig); + return new AvroSerializer(serializerConfig, schema.getSchemaLocation().get()); case PROTOBUF: - return new ProtobufSerializer(schema.getSchemaLocation().get()); - case PROTOBUF_INLINE: - return new ProtobufSerializer(serializerConfig); + return new ProtobufSerializer(serializerConfig, schema.getSchemaLocation().get()); case JSON: - return new JsonSerializer(); - case JSON_INLINE: return new JsonSerializer(serializerConfig); case CSV: @@ -192,15 +183,12 @@ private EventDecoder eventDecoder(PravegaObjectSchema schema, Set streamDescriptionCodec; - // "inline" means that event was written using schema registry wrapped client and schema 
encoding id - // is within the raw event data in pravega @Inject PravegaTableDescriptionSupplier(PravegaConnectorConfig pravegaConnectorConfig, JsonCodec streamDescriptionCodec) @@ -370,7 +363,7 @@ private Optional> fieldGroupsFromSchemaRegistry(fi SerializationFormat format = schemas.get(i).getSchemaInfo().getSerializationFormat(); fieldGroups.add(new PravegaStreamFieldGroup( - dataFormat(properties.getProperties(), format, kv, i), + normalizeDataFormat(format), Optional.of(colPrefix), dataSchema(format, schemas.get(i)), Optional.of(mapFieldsFromSchema(colPrefix, format, schemas.get(i))))); @@ -442,37 +435,12 @@ private static List listFiles(File dir) return ImmutableList.of(); } - private static String dataFormat(ImmutableMap groupProperties, - SerializationFormat format, - boolean kvTable, - int kvIdx) + private static String normalizeDataFormat(SerializationFormat format) { - /* - TODO: auto-detect https://github.com/pravega/pravega-sql/issues/58 - - (1) no schema registry. - (2) Register and evolve schemas in registry but do not use registry client while writing data - (3) Register schemas in the registry and use registry client to encode schema Id with payload - "inline" is for #3. for e.g. "avro" -> "avro-inline". PravegaRecordSetProvider is interested in this - - hopefully this can all go away (see linked issue 58 above) - - but for now the following is our convention - if "inline" exists in our properties, all data uses SR - else if it is a kv table key+value may be different. both, neither, or either may use SR - look for "inlinekey" / "inlinevalue" - */ - - String key = GROUP_PROPERTIES_INLINE_KEY; - - if (kvTable && !groupProperties.containsKey(key)) { - key = kvIdx == 0 ? GROUP_PROPERTIES_INLINE_KV_KEY : GROUP_PROPERTIES_INLINE_KV_VALUE; - } - - String finalFormat = format == SerializationFormat.Custom + // (CSV is custom) + return format == SerializationFormat.Custom ? format.getFullTypeName().toLowerCase(Locale.ENGLISH) : format.name().toLowerCase(Locale.ENGLISH); - return finalFormat + (groupProperties.containsKey(key) ? 
INLINE_SUFFIX : ""); } private static Optional dataSchema(SerializationFormat format, SchemaWithVersion schemaWithVersion) diff --git a/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java b/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java index 83f5346..a6a0578 100644 --- a/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java +++ b/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java @@ -16,11 +16,11 @@ package io.pravega.connectors.presto.decoder; +import com.facebook.airlift.log.Logger; import io.pravega.connectors.presto.util.ByteBufferInputStream; import com.google.protobuf.DynamicMessage; import io.pravega.client.stream.Serializer; import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; -import io.pravega.schemaregistry.serializers.SerializerFactory; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; @@ -33,32 +33,29 @@ // deserialize using externally provided schema or using SR+SerializerConfig public class AvroSerializer - extends KVSerializer -{ + extends KVSerializer { + private static final Logger log = Logger.get(AvroSerializer.class); + private static class GenericRecordSerializer - implements Serializer - { + implements Serializer { private final DatumReader datumReader; private final Schema schema; - GenericRecordSerializer(Schema schema) - { + GenericRecordSerializer(Schema schema) { this.datumReader = new GenericDatumReader(schema); this.schema = schema; } @Override - public ByteBuffer serialize(Object value) - { + public ByteBuffer serialize(Object value) { return ByteBuffer.wrap(((DynamicMessage) value).toByteArray()); } @Override - public GenericRecord deserialize(ByteBuffer serializedValue) - { + public GenericRecord deserialize(ByteBuffer serializedValue) { try (DataFileStream dataFileReader = - new DataFileStream<>(new ByteBufferInputStream(serializedValue), datumReader)) { + new DataFileStream<>(new ByteBufferInputStream(serializedValue), datumReader)) { // TODO: need to figure out how to auto-detect format of avro data // for e.g, is schema provided for every row? 
(this is how the normal presto avro decoder takes it) // i would think more typically case would be that schema defined once and thus schema not provided @@ -66,24 +63,20 @@ public GenericRecord deserialize(ByteBuffer serializedValue) // // for now we will do it the "presto way" return dataFileReader.next(); - } - catch (IOException e) { + } catch (IOException e) { throw new UncheckedIOException(e); } } } - private final Serializer delegate; - - public AvroSerializer(SerializerConfig config) - { - this.delegate = SerializerFactory.genericDeserializer(config); + public AvroSerializer(SerializerConfig config, String schema) { + super(config, schema); } - public AvroSerializer(String encodedSchema) + @Override + public Serializer serializerForSchema(String schema) { - Schema schema = (new Schema.Parser()).parse(encodedSchema); - this.delegate = new GenericRecordSerializer(schema); + return new GenericRecordSerializer((new Schema.Parser()).parse(schema)); } @Override @@ -95,7 +88,7 @@ public ByteBuffer serialize(GenericRecord value) @Override public GenericRecord deserialize(ByteBuffer serializedValue) { - return (GenericRecord) delegate.deserialize(serializedValue); + return super.deserialize(serializedValue); } @Override diff --git a/src/main/java/io/pravega/connectors/presto/decoder/CsvSerializer.java b/src/main/java/io/pravega/connectors/presto/decoder/CsvSerializer.java index 43cfab2..f6085cd 100644 --- a/src/main/java/io/pravega/connectors/presto/decoder/CsvSerializer.java +++ b/src/main/java/io/pravega/connectors/presto/decoder/CsvSerializer.java @@ -15,6 +15,8 @@ */ package io.pravega.connectors.presto.decoder; +import io.pravega.client.stream.Serializer; + import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -23,6 +25,7 @@ public class CsvSerializer { public CsvSerializer() { + super(null, null); } @Override @@ -39,6 +42,12 @@ public String deserialize(ByteBuffer serializedValue) serializedValue.remaining()); } + @Override + public Serializer serializerForSchema(String schema) + { + throw new UnsupportedOperationException(); + } + @Override public DecodableEvent toEvent(Object obj) { diff --git a/src/main/java/io/pravega/connectors/presto/decoder/JsonSerializer.java b/src/main/java/io/pravega/connectors/presto/decoder/JsonSerializer.java index d16d1c8..4bf1ae6 100644 --- a/src/main/java/io/pravega/connectors/presto/decoder/JsonSerializer.java +++ b/src/main/java/io/pravega/connectors/presto/decoder/JsonSerializer.java @@ -59,16 +59,15 @@ public JsonNode deserialize(ByteBuffer serializedValue) } } - private final Serializer delegate; - public JsonSerializer(SerializerConfig config) { - this.delegate = SerializerFactory.genericDeserializer(config); + super(config, null); } - public JsonSerializer() + @Override + public Serializer serializerForSchema(String schema /* null for json */) { - this.delegate = new JsonTreeSerializer(); + return new JsonTreeSerializer(); } @Override @@ -80,7 +79,7 @@ public ByteBuffer serialize(JsonNode value) @Override public JsonNode deserialize(ByteBuffer serializedValue) { - return (JsonNode) delegate.deserialize(serializedValue); + return super.deserialize(serializedValue); } @Override diff --git a/src/main/java/io/pravega/connectors/presto/decoder/KVSerializer.java b/src/main/java/io/pravega/connectors/presto/decoder/KVSerializer.java index 6b7f161..a454c31 100644 --- a/src/main/java/io/pravega/connectors/presto/decoder/KVSerializer.java +++ b/src/main/java/io/pravega/connectors/presto/decoder/KVSerializer.java @@ -16,12 +16,67 @@ 
package io.pravega.connectors.presto.decoder; +import com.facebook.airlift.log.Logger; import io.pravega.client.stream.Serializer; +import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; +import io.pravega.schemaregistry.serializers.SerializerFactory; + +import java.nio.ByteBuffer; // deserialize using externally provided schema or using SR+SerializerConfig public abstract class KVSerializer - implements Serializer -{ + implements Serializer { + private static final Logger log = Logger.get(KVSerializer.class); + + protected Serializer delegate = null; + + private boolean schemaRegistryDeserializer; + + private final SerializerConfig serializerConfig; + + private final String schema; + + protected KVSerializer(SerializerConfig serializerConfig, String schema) { + this.serializerConfig = serializerConfig; + this.schema = schema; + } + + public boolean schemaRegistryDeserializer() + { + return schemaRegistryDeserializer; + } + + // format of data is unknown, whether schema is encoded inline by pravega schema registry or not + // try to deserialize without, and if it fails, use serializerConfig + protected void chooseDeserializer(ByteBuffer serializedValue) + { + Serializer serializer = serializerForSchema(schema); + serializedValue.mark(); + try { + if (serializer.deserialize(serializedValue) != null) { + delegate = serializer; + } + } + catch (RuntimeException e) { + log.info("could not deserialize, try SR deserializer"); + delegate = SerializerFactory.genericDeserializer(serializerConfig); + schemaRegistryDeserializer = true; + } + finally { + serializedValue.reset(); + } + } + + public T deserialize(ByteBuffer serializedValue) + { + if (delegate == null) { + chooseDeserializer(serializedValue); + } + return (T) delegate.deserialize(serializedValue); + } + + public abstract Serializer serializerForSchema(String schema); + // create an event that can be passed down to decoders public abstract DecodableEvent toEvent(Object obj); } diff --git a/src/main/java/io/pravega/connectors/presto/decoder/ProtobufSerializer.java b/src/main/java/io/pravega/connectors/presto/decoder/ProtobufSerializer.java index 17256ab..45ebe9c 100644 --- a/src/main/java/io/pravega/connectors/presto/decoder/ProtobufSerializer.java +++ b/src/main/java/io/pravega/connectors/presto/decoder/ProtobufSerializer.java @@ -21,7 +21,6 @@ import com.google.protobuf.DynamicMessage; import io.pravega.client.stream.Serializer; import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; -import io.pravega.schemaregistry.serializers.SerializerFactory; import org.apache.commons.lang3.tuple.Pair; import java.io.IOException; @@ -33,48 +32,40 @@ // deserialize using externally provided schema or using SR+SerializerConfig public class ProtobufSerializer - extends KVSerializer -{ + extends KVSerializer { private static class DynamicMessageSerializer - implements Serializer - { + implements Serializer { private final Descriptors.Descriptor descriptor; - DynamicMessageSerializer(Descriptors.Descriptor descriptor) - { + DynamicMessageSerializer(Descriptors.Descriptor descriptor) { this.descriptor = descriptor; } @Override - public ByteBuffer serialize(Object value) - { + public ByteBuffer serialize(Object value) { return ByteBuffer.wrap(((DynamicMessage) value).toByteArray()); } @Override - public DynamicMessage deserialize(ByteBuffer serializedValue) - { + public DynamicMessage deserialize(ByteBuffer serializedValue) { try { return DynamicMessage.parseFrom(descriptor, new ByteBufferInputStream(serializedValue)); 
- } - catch (IOException e) { + } catch (IOException e) { throw new UncheckedIOException(e); } } } - private final Serializer delegate; - - public ProtobufSerializer(SerializerConfig config) - { - this.delegate = SerializerFactory.genericDeserializer(config); + public ProtobufSerializer(SerializerConfig config, String schema) { + super(config, schema); } - public ProtobufSerializer(String encodedSchema) + @Override + public Serializer serializerForSchema(String schema) { - Pair pair = decodeSchema(encodedSchema); - this.delegate = new DynamicMessageSerializer(descriptorFor(pair.getLeft(), pair.getRight())); + Pair pair = decodeSchema(schema); + return new DynamicMessageSerializer(descriptorFor(pair.getLeft(), pair.getRight())); } @Override @@ -86,7 +77,7 @@ public ByteBuffer serialize(DynamicMessage value) @Override public DynamicMessage deserialize(ByteBuffer serializedValue) { - return (DynamicMessage) delegate.deserialize(serializedValue); + return super.deserialize(serializedValue); } @Override diff --git a/src/main/java/io/pravega/connectors/presto/util/PravegaSchemaUtils.java b/src/main/java/io/pravega/connectors/presto/util/PravegaSchemaUtils.java index 1938c6b..612b39f 100644 --- a/src/main/java/io/pravega/connectors/presto/util/PravegaSchemaUtils.java +++ b/src/main/java/io/pravega/connectors/presto/util/PravegaSchemaUtils.java @@ -40,13 +40,6 @@ private PravegaSchemaUtils() private static final Logger log = Logger.get(PravegaSchemaUtils.class); - public static final String AVRO_INLINE = "avro-inline"; - public static final String PROTOBUF_INLINE = "protobuf-inline"; - public static final String JSON_INLINE = "json-inline"; - public static final String INLINE_SUFFIX = "-inline"; - public static final String GROUP_PROPERTIES_INLINE_KEY = "inline"; - public static final String GROUP_PROPERTIES_INLINE_KV_KEY = "inlinekey"; - public static final String GROUP_PROPERTIES_INLINE_KV_VALUE = "inlinevalue"; public static final String AVRO = "avro"; public static final String PROTOBUF = "protobuf"; public static final String JSON = "json"; diff --git a/src/test/java/io/pravega/connectors/presto/decoder/KVSerializerTest.java b/src/test/java/io/pravega/connectors/presto/decoder/KVSerializerTest.java new file mode 100644 index 0000000..6d0fe6f --- /dev/null +++ b/src/test/java/io/pravega/connectors/presto/decoder/KVSerializerTest.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.pravega.connectors.presto.decoder; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.pravega.client.stream.Serializer; +import io.pravega.connectors.presto.integration.EmbeddedSchemaRegistry; +import io.pravega.schemaregistry.client.SchemaRegistryClientConfig; +import io.pravega.schemaregistry.contract.data.Compatibility; +import io.pravega.schemaregistry.contract.data.GroupProperties; +import io.pravega.schemaregistry.contract.data.SerializationFormat; +import io.pravega.schemaregistry.serializer.json.schemas.JSONSchema; +import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; +import io.pravega.schemaregistry.serializers.SerializerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.testng.Assert.*; + +@Test +public class KVSerializerTest { + private final EmbeddedSchemaRegistry schemaRegistry; + + public KVSerializerTest() { + this.schemaRegistry = new EmbeddedSchemaRegistry(); + this.schemaRegistry.start(); + } + + private static class Employee { + public int id; + public String first; + public String last; + + @JsonCreator + public Employee(@JsonProperty("id") int id, + @JsonProperty("first") String first, + @JsonProperty("last") String last) { + this.id = id; + this.first = first; + this.last = last; + } + + @JsonProperty + public int getId() { + return id; + } + + @JsonProperty + public String getFirst() { + return first; + } + + @JsonProperty + public String getLast() { + return last; + } + } + + @Test + public void testJson() throws IOException + { + Employee expected = new Employee(1, "John", "Smith"); + ByteBuffer serialized = ByteBuffer.wrap(new ObjectMapper().writeValueAsBytes(expected)); + + JsonSerializer jsonSerializer = new JsonSerializer(null /* no need to pass config - won't fall back */); + + JsonNode actual = jsonSerializer.deserialize(serialized); + + assertEquals(actual.get("id").asInt(), expected.id); + assertEquals(actual.get("first").asText(), expected.first); + assertEquals(actual.get("last").asText(), expected.last); + + assertFalse(jsonSerializer.schemaRegistryDeserializer()); + } + + @Test + public void testJsonInline() { + SerializerConfig serializerConfig = jsonGroup("inline"); + + Serializer serializer = + SerializerFactory.jsonSerializer(serializerConfig, JSONSchema.of(Employee.class)); + + Employee expected = new Employee(1, "John", "Smith"); + ByteBuffer serialized = serializer.serialize(expected); + + JsonSerializer jsonSerializer = new JsonSerializer(serializerConfig); + + JsonNode actual = jsonSerializer.deserialize(serialized); + + assertEquals(actual.get("id").asInt(), expected.id); + assertEquals(actual.get("first").asText(), expected.first); + assertEquals(actual.get("last").asText(), expected.last); + + assertTrue(jsonSerializer.schemaRegistryDeserializer()); + } + + private SerializerConfig jsonGroup(String stream) { + String groupId = "json." 
+ stream; + addGroup(groupId, SerializationFormat.Json); + return serializerConfig(groupId); + } + + private void addGroup(String groupId, SerializationFormat format) + { + schemaRegistry.client().addGroup(groupId, + new GroupProperties( + format, + Compatibility.allowAny(), + true)); + } + + private SerializerConfig serializerConfig(String group) { + return SerializerConfig.builder() + .groupId(group).registryConfig(SchemaRegistryClientConfig.builder() + .schemaRegistryUri(schemaRegistry.getURI()) + .build()) + .registerSchema(true) + .build(); + } + + @AfterClass(alwaysRun = true) + public void destroy() + { + schemaRegistry.close(); + } +} diff --git a/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java b/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java new file mode 100644 index 0000000..f951a21 --- /dev/null +++ b/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Note: this class is based on SetupUtils from pravega/flink-connectors + * (rev 9332ad67e520c03c7122de1d3b90c6cafbf97634) + * https://github.com/pravega/flink-connectors/blob/v0.9.0/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java + */ +package io.pravega.connectors.presto.integration; + +import java.io.Closeable; +import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import io.pravega.schemaregistry.client.SchemaRegistryClient; +import io.pravega.schemaregistry.client.SchemaRegistryClientConfig; +import io.pravega.schemaregistry.client.SchemaRegistryClientFactory; +import io.pravega.schemaregistry.server.rest.RestServer; +import io.pravega.schemaregistry.server.rest.ServiceConfig; +import io.pravega.schemaregistry.service.SchemaRegistryService; +import io.pravega.schemaregistry.storage.SchemaStore; +import io.pravega.schemaregistry.storage.SchemaStoreFactory; + +public class EmbeddedSchemaRegistry + implements Closeable { + private final static AtomicInteger servers = new AtomicInteger(); + + private final int port; + + private final AtomicBoolean started = new AtomicBoolean(); + + private final ScheduledExecutorService executor; + + private RestServer restServer; + + private SchemaRegistryClient client; + + public EmbeddedSchemaRegistry() { + port = 9100 + servers.getAndIncrement(); + executor = Executors.newScheduledThreadPool(10); + + SchemaStore schemaStore = SchemaStoreFactory.createInMemoryStore(executor); + + restServer = new RestServer( + new SchemaRegistryService(schemaStore, executor), + ServiceConfig.builder().port(port).build() + ); + } + + public void start() { + if (started.compareAndSet(false, true)) { + restServer.startAsync(); + restServer.awaitRunning(); + } + } + + public void stop() { + if (started.compareAndSet(true, false)) { + restServer.stopAsync(); + restServer.awaitTerminated(); + executor.shutdownNow(); + } + } + + public int port() { + return port; + } + + public URI getURI() { + return URI.create("http://localhost:" + port); + } + + public SchemaRegistryClient client() + { + if (client == 
null) { + SchemaRegistryClientConfig config = SchemaRegistryClientConfig.builder() + .schemaRegistryUri(getURI()) + .build(); + client = SchemaRegistryClientFactory.withDefaultNamespace(config); + } + return client; + } + + @Override + public void close() + { + try { + stop(); + } + catch (Exception quiet) {} + } +} From 665542b87f49f9de24b171c0b6da3288bd37dd40 Mon Sep 17 00:00:00 2001 From: Andrew Robertson Date: Thu, 24 Jun 2021 07:34:57 -0700 Subject: [PATCH 02/11] add avro, protobuf tests Signed-off-by: Andrew Robertson --- .gitignore | 3 +- build.gradle | 9 + .../presto/decoder/AvroSerializer.java | 9 +- .../util/PravegaSerializationUtils.java | 22 +++ src/test/avro/sensor.avsc | 10 ++ .../presto/decoder/KVSerializerTest.java | 162 ++++++++++++++++-- .../integration/PravegaKeyValueLoader.java | 18 +- src/test/proto/Product.proto | 7 + 8 files changed, 202 insertions(+), 38 deletions(-) create mode 100644 src/test/avro/sensor.avsc create mode 100644 src/test/proto/Product.proto diff --git a/.gitignore b/.gitignore index 8d65bda..afdb7d6 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,5 @@ benchmark_outputs .checkstyle .editorconfig node_modules - +schema_registry.log +schema_registry.*gz diff --git a/build.gradle b/build.gradle index fb15f9f..42bb95b 100644 --- a/build.gradle +++ b/build.gradle @@ -6,10 +6,19 @@ plugins { id 'java' id 'distribution' id 'maven' + id 'com.commercehub.gradle.plugin.avro' version "0.99.99" + id 'com.google.protobuf' version "0.8.16" } +apply plugin: 'java' apply from: "$rootDir/gradle/checkstyle.gradle" +apply plugin: 'com.google.protobuf' +sourceSets.main.java.srcDirs += 'build/generated/source/proto/test/java/' + +apply plugin: "com.commercehub.gradle.plugin.avro" +sourceSets.main.java.srcDirs += 'build/generated-test-avro-java/' + repositories { mavenLocal() diff --git a/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java b/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java index a6a0578..d664b5d 100644 --- a/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java +++ b/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java @@ -16,10 +16,9 @@ package io.pravega.connectors.presto.decoder; -import com.facebook.airlift.log.Logger; import io.pravega.connectors.presto.util.ByteBufferInputStream; -import com.google.protobuf.DynamicMessage; import io.pravega.client.stream.Serializer; +import io.pravega.connectors.presto.util.PravegaSerializationUtils; import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; @@ -34,7 +33,6 @@ // deserialize using externally provided schema or using SR+SerializerConfig public class AvroSerializer extends KVSerializer { - private static final Logger log = Logger.get(AvroSerializer.class); private static class GenericRecordSerializer implements Serializer { @@ -48,8 +46,9 @@ private static class GenericRecordSerializer } @Override - public ByteBuffer serialize(Object value) { - return ByteBuffer.wrap(((DynamicMessage) value).toByteArray()); + public ByteBuffer serialize(Object object) + { + return PravegaSerializationUtils.serialize((GenericRecord) object); } @Override diff --git a/src/main/java/io/pravega/connectors/presto/util/PravegaSerializationUtils.java b/src/main/java/io/pravega/connectors/presto/util/PravegaSerializationUtils.java index db2bcad..5db79f5 100644 --- a/src/main/java/io/pravega/connectors/presto/util/PravegaSerializationUtils.java +++ 
b/src/main/java/io/pravega/connectors/presto/util/PravegaSerializationUtils.java @@ -16,6 +16,10 @@ package io.pravega.connectors.presto.util; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -23,6 +27,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; public class PravegaSerializationUtils { @@ -57,4 +62,21 @@ public static T deserialize(byte[] bytes, Class clazz) throw new IllegalArgumentException(e); } } + + public static ByteBuffer serialize(GenericRecord record) + { + try { + GenericDatumWriter writer = new GenericDatumWriter<>(record.getSchema()); + DataFileWriter dataFileWriter = new DataFileWriter<>(writer); + + ByteArrayOutputStream os = new ByteArrayOutputStream(); + dataFileWriter.create(record.getSchema(), os); + dataFileWriter.append(record); + dataFileWriter.close(); + return ByteBuffer.wrap(os.toByteArray()); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/src/test/avro/sensor.avsc b/src/test/avro/sensor.avsc new file mode 100644 index 0000000..5a6db7f --- /dev/null +++ b/src/test/avro/sensor.avsc @@ -0,0 +1,10 @@ +{ + "namespace": "io.pravega.avro", + "type": "record", + "name": "Sensor", + "fields": [ + {"name": "sensorId", "type": "int"}, + {"name": "timestamp", "type": "long"}, + {"name": "rate", "type": "double"} + ] +} \ No newline at end of file diff --git a/src/test/java/io/pravega/connectors/presto/decoder/KVSerializerTest.java b/src/test/java/io/pravega/connectors/presto/decoder/KVSerializerTest.java index 6d0fe6f..bf72832 100644 --- a/src/test/java/io/pravega/connectors/presto/decoder/KVSerializerTest.java +++ b/src/test/java/io/pravega/connectors/presto/decoder/KVSerializerTest.java @@ -19,27 +19,51 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import io.pravega.avro.Sensor; import io.pravega.client.stream.Serializer; import io.pravega.connectors.presto.integration.EmbeddedSchemaRegistry; +import io.pravega.connectors.presto.util.PravegaSerializationUtils; +import io.pravega.protobuf.ProductOuterClass; import io.pravega.schemaregistry.client.SchemaRegistryClientConfig; +import io.pravega.schemaregistry.client.exceptions.RegistryExceptions; import io.pravega.schemaregistry.contract.data.Compatibility; import io.pravega.schemaregistry.contract.data.GroupProperties; import io.pravega.schemaregistry.contract.data.SerializationFormat; +import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema; import io.pravega.schemaregistry.serializer.json.schemas.JSONSchema; +import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema; import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; import io.pravega.schemaregistry.serializers.SerializerFactory; +import org.apache.avro.generic.GenericRecord; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import static io.pravega.connectors.presto.ProtobufCommon.encodeSchema; import static org.testng.Assert.*; +/** + * when 
an object is written to pravega using serializer from SchemaRegistry, encoding info + * will be added to the raw event data so the event can be deserialized automatically later + * this is referred to in the connector code as "inline" + * + * when reading data in the connector, we don't know how event was serialized + * so first try with the actual avro, protobuf, or json schema + * if that fails fallback to trying with SchemaRegistry SerializerConfig + */ @Test public class KVSerializerTest { private final EmbeddedSchemaRegistry schemaRegistry; + private final Random random = new Random(); + public KVSerializerTest() { this.schemaRegistry = new EmbeddedSchemaRegistry(); this.schemaRegistry.start(); @@ -76,12 +100,13 @@ public String getLast() { } @Test - public void testJson() throws IOException - { + public void testJson() throws IOException { Employee expected = new Employee(1, "John", "Smith"); + ByteBuffer serialized = ByteBuffer.wrap(new ObjectMapper().writeValueAsBytes(expected)); - JsonSerializer jsonSerializer = new JsonSerializer(null /* no need to pass config - won't fall back */); + // use null SerializerConfig as we are not expecting initial deserialize to fail + JsonSerializer jsonSerializer = new JsonSerializer(null); JsonNode actual = jsonSerializer.deserialize(serialized); @@ -93,18 +118,19 @@ public void testJson() throws IOException } @Test - public void testJsonInline() { - SerializerConfig serializerConfig = jsonGroup("inline"); + public void testJsonInline() + { + Employee expected = new Employee(1, "John", "Smith"); - Serializer serializer = + SerializerConfig serializerConfig = jsonGroup("inline"); + Serializer schemaRegistrySerializer = SerializerFactory.jsonSerializer(serializerConfig, JSONSchema.of(Employee.class)); - - Employee expected = new Employee(1, "John", "Smith"); - ByteBuffer serialized = serializer.serialize(expected); + // using Serializer provided by SchemaRegistry + ByteBuffer serializedValue = schemaRegistrySerializer.serialize(expected); JsonSerializer jsonSerializer = new JsonSerializer(serializerConfig); - JsonNode actual = jsonSerializer.deserialize(serialized); + JsonNode actual = jsonSerializer.deserialize(serializedValue); assertEquals(actual.get("id").asInt(), expected.id); assertEquals(actual.get("first").asText(), expected.first); @@ -113,7 +139,106 @@ public void testJsonInline() { assertTrue(jsonSerializer.schemaRegistryDeserializer()); } - private SerializerConfig jsonGroup(String stream) { + @Test + public void testAvro() + { + Sensor sensor = new Sensor(); + sensor.setSensorId(random.nextInt()); + sensor.setTimestamp(System.currentTimeMillis()); + sensor.setRate(random.nextDouble()); + + ByteBuffer serializedValue = PravegaSerializationUtils.serialize((GenericRecord) sensor); + + // use null SerializerConfig as we are expecting parse with given schema to succeed + AvroSerializer avroSerializer = new AvroSerializer(null, sensor.getSchema().toString()); + GenericRecord actual = avroSerializer.deserialize(serializedValue); + + assertEquals(actual.get("sensorId"), sensor.getSensorId()); + assertEquals(actual.get("timestamp"), sensor.getTimestamp()); + assertEquals(actual.get("rate"), sensor.getRate()); + + assertFalse(avroSerializer.schemaRegistryDeserializer()); + } + + @Test + public void testAvroInline() + { + Sensor sensor = new Sensor(); + sensor.setSensorId(random.nextInt()); + sensor.setTimestamp(System.currentTimeMillis()); + sensor.setRate(random.nextDouble()); + + SerializerConfig serializerConfig = avroGroup("inline"); + 
Serializer schemaRegistrySerializer = + SerializerFactory.avroSerializer(serializerConfig, AvroSchema.of(Sensor.class)); + // using Serializer provided by SchemaRegistry + ByteBuffer serializedValue = schemaRegistrySerializer.serialize(sensor); + + AvroSerializer avroSerializer = new AvroSerializer(serializerConfig, sensor.getSchema().toString()); + GenericRecord actual = avroSerializer.deserialize(serializedValue); + + assertEquals(actual.get("sensorId"), sensor.getSensorId()); + assertEquals(actual.get("timestamp"), sensor.getTimestamp()); + assertEquals(actual.get("rate"), sensor.getRate()); + + assertTrue(avroSerializer.schemaRegistryDeserializer()); + } + + @Test + public void testProtobufInline() + { + ProductOuterClass.Product product = ProductOuterClass.Product.newBuilder() + .setId(random.nextInt()) + .setName(UUID.randomUUID().toString()) + .build(); + + SerializerConfig serializerConfig = protobufGroup("inline"); + Serializer schemaRegistrySerializer = + SerializerFactory.protobufSerializer(serializerConfig, ProtobufSchema.of(ProductOuterClass.Product.class)); + // using Serializer provided by SchemaRegistry + ByteBuffer serializedValue = schemaRegistrySerializer.serialize(product); + + // get file descriptor, which is needed for initial DynamicMessage#parseFrom + String schema = encodeSchema(schemaRegistry.client().getSchemas("protobuf.inline").get(0)); + + ProtobufSerializer protobufSerializer = new ProtobufSerializer(serializerConfig, schema); + DynamicMessage dynamicMessage = protobufSerializer.deserialize(serializedValue); + + int id = -1; + String name = null; + for (Map.Entry entry : dynamicMessage.getAllFields().entrySet()) { + if (entry.getKey().getJsonName().equals("id")) { + id = (int) entry.getValue(); + } + else if (entry.getKey().getJsonName().equals("name")) { + name = (String) entry.getValue(); + } + } + assertNotEquals(id, -1); + assertNotNull(name); + + assertEquals(id, product.getId()); + assertEquals(name, product.getName()); + + assertTrue(protobufSerializer.schemaRegistryDeserializer()); + } + + private SerializerConfig avroGroup(String stream) + { + String groupId = "avro." + stream; + addGroup(groupId, SerializationFormat.Avro); + return serializerConfig(groupId); + } + + private SerializerConfig protobufGroup(String stream) + { + String groupId = "protobuf." + stream; + addGroup(groupId, SerializationFormat.Protobuf); + return serializerConfig(groupId); + } + + private SerializerConfig jsonGroup(String stream) + { String groupId = "json." 
+ stream; addGroup(groupId, SerializationFormat.Json); return serializerConfig(groupId); @@ -121,11 +246,16 @@ private SerializerConfig jsonGroup(String stream) { private void addGroup(String groupId, SerializationFormat format) { - schemaRegistry.client().addGroup(groupId, - new GroupProperties( - format, - Compatibility.allowAny(), - true)); + try { + schemaRegistry.client().getGroupProperties(groupId); + } + catch (RegistryExceptions.ResourceNotFoundException e) { + schemaRegistry.client().addGroup(groupId, + new GroupProperties( + format, + Compatibility.allowAny(), + true)); + } } private SerializerConfig serializerConfig(String group) { diff --git a/src/test/java/io/pravega/connectors/presto/integration/PravegaKeyValueLoader.java b/src/test/java/io/pravega/connectors/presto/integration/PravegaKeyValueLoader.java index 53b2f75..d7d0821 100644 --- a/src/test/java/io/pravega/connectors/presto/integration/PravegaKeyValueLoader.java +++ b/src/test/java/io/pravega/connectors/presto/integration/PravegaKeyValueLoader.java @@ -24,15 +24,13 @@ import io.pravega.client.tables.KeyValueTableClientConfiguration; import io.pravega.client.tables.KeyValueTableConfiguration; import io.pravega.connectors.presto.util.ByteBufferInputStream; +import io.pravega.connectors.presto.util.PravegaSerializationUtils; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; -import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; @@ -54,19 +52,7 @@ public AvroSerializer(Schema schema) @Override public ByteBuffer serialize(GenericRecord record) { - try { - GenericDatumWriter writer = new GenericDatumWriter<>(record.getSchema()); - DataFileWriter dataFileWriter = new DataFileWriter<>(writer); - - ByteArrayOutputStream os = new ByteArrayOutputStream(); - dataFileWriter.create(record.getSchema(), os); - dataFileWriter.append(record); - dataFileWriter.close(); - return ByteBuffer.wrap(os.toByteArray()); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } + return PravegaSerializationUtils.serialize(record); } @Override diff --git a/src/test/proto/Product.proto b/src/test/proto/Product.proto new file mode 100644 index 0000000..26ea289 --- /dev/null +++ b/src/test/proto/Product.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +package io.pravega.protobuf; + +message Product { + int32 id = 1; + string name = 2; +} \ No newline at end of file From 60759a9db574dadd391f798a2205b408c09efe52 Mon Sep 17 00:00:00 2001 From: Andrew Robertson Date: Thu, 24 Jun 2021 07:35:52 -0700 Subject: [PATCH 03/11] fix gitignore Signed-off-by: Andrew Robertson --- .gitignore | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index afdb7d6..36e78c4 100644 --- a/.gitignore +++ b/.gitignore @@ -25,5 +25,5 @@ benchmark_outputs .checkstyle .editorconfig node_modules -schema_registry.log -schema_registry.*gz +schema-registry.log +schema_registry_*.gz From 419386152080fbe6bf3921987c2fcefc910d7f94 Mon Sep 17 00:00:00 2001 From: Andrew Robertson Date: Thu, 24 Jun 2021 07:50:43 -0700 Subject: [PATCH 04/11] install protoc in github actions Signed-off-by: Andrew Robertson --- .github/workflows/pravega-build.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git 
a/.github/workflows/pravega-build.yml b/.github/workflows/pravega-build.yml index 654703b..74599e5 100644 --- a/.github/workflows/pravega-build.yml +++ b/.github/workflows/pravega-build.yml @@ -29,6 +29,8 @@ jobs: uses: actions/setup-java@v1 with: java-version: 11 + - name: Install Protoc + uses: arduino/setup-protoc@v1 - name: Build Output Cache uses: actions/cache@v2.1.0 with: From b944cadc151d7339788934dc86a13580643bb1ce Mon Sep 17 00:00:00 2001 From: Andrew Robertson Date: Thu, 24 Jun 2021 08:05:18 -0700 Subject: [PATCH 05/11] fix code attribution Signed-off-by: Andrew Robertson --- .../connectors/presto/integration/EmbeddedSchemaRegistry.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java b/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java index f951a21..28ccf8c 100644 --- a/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java +++ b/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java @@ -9,7 +9,7 @@ * * Note: this class is based on SetupUtils from pravega/flink-connectors * (rev 9332ad67e520c03c7122de1d3b90c6cafbf97634) - * https://github.com/pravega/flink-connectors/blob/v0.9.0/src/test/java/io/pravega/connectors/flink/utils/SetupUtils.java + * https://github.com/pravega/flink-connectors/blob/v0.9.0/src/test/java/io/pravega/connectors/flink/utils/SchemaRegistryUtils.java */ package io.pravega.connectors.presto.integration; From 837a914c64d6b05331a7e006f211d2da71550373 Mon Sep 17 00:00:00 2001 From: Andrew Robertson Date: Thu, 24 Jun 2021 08:06:03 -0700 Subject: [PATCH 06/11] fix code attribution Signed-off-by: Andrew Robertson --- .../connectors/presto/integration/EmbeddedSchemaRegistry.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java b/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java index 28ccf8c..134216f 100644 --- a/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java +++ b/src/test/java/io/pravega/connectors/presto/integration/EmbeddedSchemaRegistry.java @@ -7,7 +7,7 @@ * * http://www.apache.org/licenses/LICENSE-2.0 * - * Note: this class is based on SetupUtils from pravega/flink-connectors + * Note: this class is based on SchemaRegistryUtils from pravega/flink-connectors * (rev 9332ad67e520c03c7122de1d3b90c6cafbf97634) * https://github.com/pravega/flink-connectors/blob/v0.9.0/src/test/java/io/pravega/connectors/flink/utils/SchemaRegistryUtils.java */ From 8a3128843571bdd9cc25fb93f6de9eb28c222e18 Mon Sep 17 00:00:00 2001 From: Andrew Robertson Date: Tue, 29 Jun 2021 14:01:32 -0700 Subject: [PATCH 07/11] compile protobuf used for unit test. 
avoid using gpl3 licensed github action arduino/setup-protoc that was used for github action Signed-off-by: Andrew Robertson --- .github/workflows/pravega-build.yml | 4 +- build.gradle | 7 +- .../pravega/protobuf/ProductOuterClass.java | 669 ++++++++++++++++++ src/test/proto/Product.proto | 2 + 4 files changed, 676 insertions(+), 6 deletions(-) create mode 100644 src/test/java/io/pravega/protobuf/ProductOuterClass.java diff --git a/.github/workflows/pravega-build.yml b/.github/workflows/pravega-build.yml index 74599e5..205eb8f 100644 --- a/.github/workflows/pravega-build.yml +++ b/.github/workflows/pravega-build.yml @@ -29,8 +29,6 @@ jobs: uses: actions/setup-java@v1 with: java-version: 11 - - name: Install Protoc - uses: arduino/setup-protoc@v1 - name: Build Output Cache uses: actions/cache@v2.1.0 with: @@ -58,4 +56,4 @@ jobs: runs-on: ubuntu-latest steps: - name: Check Build Status - run: echo build, unit and integration tests successful. \ No newline at end of file + run: echo build, unit and integration tests successful. diff --git a/build.gradle b/build.gradle index 42bb95b..cc35aff 100644 --- a/build.gradle +++ b/build.gradle @@ -7,14 +7,15 @@ plugins { id 'distribution' id 'maven' id 'com.commercehub.gradle.plugin.avro' version "0.99.99" - id 'com.google.protobuf' version "0.8.16" + //id 'com.google.protobuf' version "0.8.16" } apply plugin: 'java' apply from: "$rootDir/gradle/checkstyle.gradle" -apply plugin: 'com.google.protobuf' -sourceSets.main.java.srcDirs += 'build/generated/source/proto/test/java/' +// protobufs needed for test are already compiled +//apply plugin: 'com.google.protobuf' +//sourceSets.main.java.srcDirs += 'build/generated/source/proto/test/java/' apply plugin: "com.commercehub.gradle.plugin.avro" sourceSets.main.java.srcDirs += 'build/generated-test-avro-java/' diff --git a/src/test/java/io/pravega/protobuf/ProductOuterClass.java b/src/test/java/io/pravega/protobuf/ProductOuterClass.java new file mode 100644 index 0000000..fea2ce6 --- /dev/null +++ b/src/test/java/io/pravega/protobuf/ProductOuterClass.java @@ -0,0 +1,669 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: Product.proto + +package io.pravega.protobuf; + +public final class ProductOuterClass { + private ProductOuterClass() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistryLite registry) { + } + + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + registerAllExtensions( + (com.google.protobuf.ExtensionRegistryLite) registry); + } + public interface ProductOrBuilder extends + // @@protoc_insertion_point(interface_extends:io.pravega.protobuf.Product) + com.google.protobuf.MessageOrBuilder { + + /** + * int32 id = 1; + */ + int getId(); + + /** + * string name = 2; + */ + java.lang.String getName(); + /** + * string name = 2; + */ + com.google.protobuf.ByteString + getNameBytes(); + } + /** + * Protobuf type {@code io.pravega.protobuf.Product} + */ + public static final class Product extends + com.google.protobuf.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:io.pravega.protobuf.Product) + ProductOrBuilder { + private static final long serialVersionUID = 0L; + // Use Product.newBuilder() to construct. 
+ private Product(com.google.protobuf.GeneratedMessageV3.Builder builder) { + super(builder); + } + private Product() { + id_ = 0; + name_ = ""; + } + + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private Product( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 8: { + + id_ = input.readInt32(); + break; + } + case 18: { + java.lang.String s = input.readStringRequireUtf8(); + + name_ = s; + break; + } + default: { + if (!parseUnknownFieldProto3( + input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return io.pravega.protobuf.ProductOuterClass.internal_static_io_pravega_protobuf_Product_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return io.pravega.protobuf.ProductOuterClass.internal_static_io_pravega_protobuf_Product_fieldAccessorTable + .ensureFieldAccessorsInitialized( + io.pravega.protobuf.ProductOuterClass.Product.class, io.pravega.protobuf.ProductOuterClass.Product.Builder.class); + } + + public static final int ID_FIELD_NUMBER = 1; + private int id_; + /** + * int32 id = 1; + */ + public int getId() { + return id_; + } + + public static final int NAME_FIELD_NUMBER = 2; + private volatile java.lang.Object name_; + /** + * string name = 2; + */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + name_ = s; + return s; + } + } + /** + * string name = 2; + */ + public com.google.protobuf.ByteString + getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + name_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (id_ != 0) { + output.writeInt32(1, id_); + } + if (!getNameBytes().isEmpty()) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 2, name_); + } 
+ unknownFields.writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + if (id_ != 0) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(1, id_); + } + if (!getNameBytes().isEmpty()) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, name_); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof io.pravega.protobuf.ProductOuterClass.Product)) { + return super.equals(obj); + } + io.pravega.protobuf.ProductOuterClass.Product other = (io.pravega.protobuf.ProductOuterClass.Product) obj; + + boolean result = true; + result = result && (getId() + == other.getId()); + result = result && getName() + .equals(other.getName()); + result = result && unknownFields.equals(other.unknownFields); + return result; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + hash = (37 * hash) + ID_FIELD_NUMBER; + hash = (53 * hash) + getId(); + hash = (37 * hash) + NAME_FIELD_NUMBER; + hash = (53 * hash) + getName().hashCode(); + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static io.pravega.protobuf.ProductOuterClass.Product parseFrom( + java.nio.ByteBuffer data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseFrom( + java.nio.ByteBuffer data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseDelimitedFrom(java.io.InputStream input) + throws 
java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static io.pravega.protobuf.ProductOuterClass.Product parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(io.pravega.protobuf.ProductOuterClass.Product prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code io.pravega.protobuf.Product} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessageV3.Builder implements + // @@protoc_insertion_point(builder_implements:io.pravega.protobuf.Product) + io.pravega.protobuf.ProductOuterClass.ProductOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return io.pravega.protobuf.ProductOuterClass.internal_static_io_pravega_protobuf_Product_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return io.pravega.protobuf.ProductOuterClass.internal_static_io_pravega_protobuf_Product_fieldAccessorTable + .ensureFieldAccessorsInitialized( + io.pravega.protobuf.ProductOuterClass.Product.class, io.pravega.protobuf.ProductOuterClass.Product.Builder.class); + } + + // Construct using io.pravega.protobuf.ProductOuterClass.Product.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessageV3 + .alwaysUseFieldBuilders) { + } + } + @java.lang.Override + public Builder clear() { + super.clear(); + id_ = 0; + + name_ = ""; + + return this; + } + + @java.lang.Override + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return io.pravega.protobuf.ProductOuterClass.internal_static_io_pravega_protobuf_Product_descriptor; + } + + @java.lang.Override + public io.pravega.protobuf.ProductOuterClass.Product getDefaultInstanceForType() { + return io.pravega.protobuf.ProductOuterClass.Product.getDefaultInstance(); + } + + @java.lang.Override + public 
io.pravega.protobuf.ProductOuterClass.Product build() { + io.pravega.protobuf.ProductOuterClass.Product result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public io.pravega.protobuf.ProductOuterClass.Product buildPartial() { + io.pravega.protobuf.ProductOuterClass.Product result = new io.pravega.protobuf.ProductOuterClass.Product(this); + result.id_ = id_; + result.name_ = name_; + onBuilt(); + return result; + } + + @java.lang.Override + public Builder clone() { + return (Builder) super.clone(); + } + @java.lang.Override + public Builder setField( + com.google.protobuf.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return (Builder) super.setField(field, value); + } + @java.lang.Override + public Builder clearField( + com.google.protobuf.Descriptors.FieldDescriptor field) { + return (Builder) super.clearField(field); + } + @java.lang.Override + public Builder clearOneof( + com.google.protobuf.Descriptors.OneofDescriptor oneof) { + return (Builder) super.clearOneof(oneof); + } + @java.lang.Override + public Builder setRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + int index, java.lang.Object value) { + return (Builder) super.setRepeatedField(field, index, value); + } + @java.lang.Override + public Builder addRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return (Builder) super.addRepeatedField(field, value); + } + @java.lang.Override + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof io.pravega.protobuf.ProductOuterClass.Product) { + return mergeFrom((io.pravega.protobuf.ProductOuterClass.Product)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(io.pravega.protobuf.ProductOuterClass.Product other) { + if (other == io.pravega.protobuf.ProductOuterClass.Product.getDefaultInstance()) return this; + if (other.getId() != 0) { + setId(other.getId()); + } + if (!other.getName().isEmpty()) { + name_ = other.name_; + onChanged(); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + return true; + } + + @java.lang.Override + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + io.pravega.protobuf.ProductOuterClass.Product parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (io.pravega.protobuf.ProductOuterClass.Product) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + + private int id_ ; + /** + * int32 id = 1; + */ + public int getId() { + return id_; + } + /** + * int32 id = 1; + */ + public Builder setId(int value) { + + id_ = value; + onChanged(); + return this; + } + /** + * int32 id = 1; + */ + public Builder clearId() { + + id_ = 0; + onChanged(); + return this; + } + + private java.lang.Object name_ = ""; + /** + * string name = 2; + */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = 
bs.toStringUtf8(); + name_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * string name = 2; + */ + public com.google.protobuf.ByteString + getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + name_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * string name = 2; + */ + public Builder setName( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + + name_ = value; + onChanged(); + return this; + } + /** + * string name = 2; + */ + public Builder clearName() { + + name_ = getDefaultInstance().getName(); + onChanged(); + return this; + } + /** + * string name = 2; + */ + public Builder setNameBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + checkByteStringIsUtf8(value); + + name_ = value; + onChanged(); + return this; + } + @java.lang.Override + public final Builder setUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.setUnknownFieldsProto3(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:io.pravega.protobuf.Product) + } + + // @@protoc_insertion_point(class_scope:io.pravega.protobuf.Product) + private static final io.pravega.protobuf.ProductOuterClass.Product DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new io.pravega.protobuf.ProductOuterClass.Product(); + } + + public static io.pravega.protobuf.ProductOuterClass.Product getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final com.google.protobuf.Parser + PARSER = new com.google.protobuf.AbstractParser() { + @java.lang.Override + public Product parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new Product(input, extensionRegistry); + } + }; + + public static com.google.protobuf.Parser parser() { + return PARSER; + } + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + @java.lang.Override + public io.pravega.protobuf.ProductOuterClass.Product getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + + } + + private static final com.google.protobuf.Descriptors.Descriptor + internal_static_io_pravega_protobuf_Product_descriptor; + private static final + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internal_static_io_pravega_protobuf_Product_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\rProduct.proto\022\023io.pravega.protobuf\"#\n\007" + + "Product\022\n\n\002id\030\001 \001(\005\022\014\n\004name\030\002 \001(\tb\006proto" + + "3" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor. 
InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + internal_static_io_pravega_protobuf_Product_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_io_pravega_protobuf_Product_fieldAccessorTable = new + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( + internal_static_io_pravega_protobuf_Product_descriptor, + new java.lang.String[] { "Id", "Name", }); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/src/test/proto/Product.proto b/src/test/proto/Product.proto index 26ea289..65ded65 100644 --- a/src/test/proto/Product.proto +++ b/src/test/proto/Product.proto @@ -1,3 +1,5 @@ +// compiled and committed to repo, ./src/test/java/io/pravega/protobuf/ProductOuterClass.java + syntax = "proto3"; package io.pravega.protobuf; From 9bdda62012e41f8d1dd32e10004d4d2427b9e8f8 Mon Sep 17 00:00:00 2001 From: Andrew Robertson Date: Mon, 27 Sep 2021 08:11:35 -0700 Subject: [PATCH 08/11] 1. later version of avro plugin 2. minor: restore formatting Signed-off-by: Andrew Robertson --- prestodb/build.gradle | 10 +++------ .../presto/decoder/AvroSerializer.java | 12 +++++++---- .../presto/decoder/ProtobufSerializer.java | 21 ++++++++++++------- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/prestodb/build.gradle b/prestodb/build.gradle index cc35aff..76d5056 100644 --- a/prestodb/build.gradle +++ b/prestodb/build.gradle @@ -6,7 +6,7 @@ plugins { id 'java' id 'distribution' id 'maven' - id 'com.commercehub.gradle.plugin.avro' version "0.99.99" + id 'com.github.davidmc24.gradle.plugin.avro-base' version "1.0.0" //id 'com.google.protobuf' version "0.8.16" } @@ -17,16 +17,12 @@ apply from: "$rootDir/gradle/checkstyle.gradle" //apply plugin: 'com.google.protobuf' //sourceSets.main.java.srcDirs += 'build/generated/source/proto/test/java/' -apply plugin: "com.commercehub.gradle.plugin.avro" +apply plugin: "com.github.davidmc24.gradle.plugin.avro" sourceSets.main.java.srcDirs += 'build/generated-test-avro-java/' repositories { mavenLocal() - maven { - url = uri('https://oss.jfrog.org/jfrog-dependencies') - } - maven { url = uri('https://jitpack.io') } @@ -77,7 +73,7 @@ dependencies { testImplementation "com.facebook.airlift:testing:${airliftTestingVersion}" testCompile (group: 'io.pravega', name: 'pravega-standalone', version: pravegaVersion) - testCompile "io.pravega:schemaregistry-server:${pravegaSchemaRegistryVersion}" + testImplementation "io.pravega:schemaregistry-server:${pravegaSchemaRegistryVersion}" compileOnly "io.airlift:slice:${airliftSliceVersion}" compileOnly "io.airlift:units:${airliftUnitsVersion}" diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java b/prestodb/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java index d664b5d..ebbe65f 100644 --- a/prestodb/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java +++ b/prestodb/src/main/java/io/pravega/connectors/presto/decoder/AvroSerializer.java @@ -32,7 +32,8 @@ // deserialize using externally provided schema or using SR+SerializerConfig public class AvroSerializer - extends KVSerializer { + extends KVSerializer +{ private static class GenericRecordSerializer implements Serializer { @@ -40,7 +41,8 
@@ private static class GenericRecordSerializer private final Schema schema; - GenericRecordSerializer(Schema schema) { + GenericRecordSerializer(Schema schema) + { this.datumReader = new GenericDatumReader(schema); this.schema = schema; } @@ -52,7 +54,8 @@ public ByteBuffer serialize(Object object) } @Override - public GenericRecord deserialize(ByteBuffer serializedValue) { + public GenericRecord deserialize(ByteBuffer serializedValue) + { try (DataFileStream dataFileReader = new DataFileStream<>(new ByteBufferInputStream(serializedValue), datumReader)) { // TODO: need to figure out how to auto-detect format of avro data @@ -62,7 +65,8 @@ public GenericRecord deserialize(ByteBuffer serializedValue) { // // for now we will do it the "presto way" return dataFileReader.next(); - } catch (IOException e) { + } + catch (IOException e) { throw new UncheckedIOException(e); } } diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/decoder/ProtobufSerializer.java b/prestodb/src/main/java/io/pravega/connectors/presto/decoder/ProtobufSerializer.java index 45ebe9c..3554dc1 100644 --- a/prestodb/src/main/java/io/pravega/connectors/presto/decoder/ProtobufSerializer.java +++ b/prestodb/src/main/java/io/pravega/connectors/presto/decoder/ProtobufSerializer.java @@ -32,32 +32,39 @@ // deserialize using externally provided schema or using SR+SerializerConfig public class ProtobufSerializer - extends KVSerializer { + extends KVSerializer +{ private static class DynamicMessageSerializer - implements Serializer { + implements Serializer + { private final Descriptors.Descriptor descriptor; - DynamicMessageSerializer(Descriptors.Descriptor descriptor) { + DynamicMessageSerializer(Descriptors.Descriptor descriptor) + { this.descriptor = descriptor; } @Override - public ByteBuffer serialize(Object value) { + public ByteBuffer serialize(Object value) + { return ByteBuffer.wrap(((DynamicMessage) value).toByteArray()); } @Override - public DynamicMessage deserialize(ByteBuffer serializedValue) { + public DynamicMessage deserialize(ByteBuffer serializedValue) + { try { return DynamicMessage.parseFrom(descriptor, new ByteBufferInputStream(serializedValue)); - } catch (IOException e) { + } + catch (IOException e) { throw new UncheckedIOException(e); } } } - public ProtobufSerializer(SerializerConfig config, String schema) { + public ProtobufSerializer(SerializerConfig config, String schema) + { super(config, schema); } From 4f41971e15f0559a20f9ca37cf3244c4d72033fd Mon Sep 17 00:00:00 2001 From: Andrew Robertson Date: Mon, 27 Sep 2021 10:27:07 -0700 Subject: [PATCH 09/11] changes to trino Signed-off-by: Andrew Robertson --- .gitignore | 1 + prestodb/build.gradle | 5 - trino/build.gradle | 6 + .../pravega/PravegaRecordSetProvider.java | 16 +- .../PravegaTableDescriptionSupplier.java | 35 +- .../pravega/decoder/AvroSerializer.java | 21 +- .../plugin/pravega/decoder/CsvSerializer.java | 9 + .../pravega/decoder/JsonSerializer.java | 12 +- .../plugin/pravega/decoder/KVSerializer.java | 56 ++ .../pravega/decoder/ProtobufSerializer.java | 35 +- .../pravega/util/PravegaSchemaUtils.java | 7 - .../util/PravegaSerializationUtils.java | 22 + trino/src/test/avro/sensor.avsc | 10 + .../pravega/decoder/KVSerializerTest.java | 276 ++++++++ .../integration/EmbeddedSchemaRegistry.java | 100 +++ .../integration/PravegaKeyValueLoader.java | 18 +- .../pravega/protobuf/ProductOuterClass.java | 669 ++++++++++++++++++ trino/src/test/proto/Product.proto | 9 + 18 files changed, 1193 insertions(+), 114 deletions(-) create mode 100644 
trino/src/test/avro/sensor.avsc create mode 100644 trino/src/test/java/io/trino/plugin/pravega/decoder/KVSerializerTest.java create mode 100644 trino/src/test/java/io/trino/plugin/pravega/integration/EmbeddedSchemaRegistry.java create mode 100644 trino/src/test/java/io/trino/plugin/pravega/protobuf/ProductOuterClass.java create mode 100644 trino/src/test/proto/Product.proto diff --git a/.gitignore b/.gitignore index 489eef6..0dc7f00 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,5 @@ benchmark_outputs .checkstyle .editorconfig node_modules + /prestodb/controller-server.log diff --git a/prestodb/build.gradle b/prestodb/build.gradle index 76d5056..fc47995 100644 --- a/prestodb/build.gradle +++ b/prestodb/build.gradle @@ -7,16 +7,11 @@ plugins { id 'distribution' id 'maven' id 'com.github.davidmc24.gradle.plugin.avro-base' version "1.0.0" - //id 'com.google.protobuf' version "0.8.16" } apply plugin: 'java' apply from: "$rootDir/gradle/checkstyle.gradle" -// protobufs needed for test are already compiled -//apply plugin: 'com.google.protobuf' -//sourceSets.main.java.srcDirs += 'build/generated/source/proto/test/java/' - apply plugin: "com.github.davidmc24.gradle.plugin.avro" sourceSets.main.java.srcDirs += 'build/generated-test-avro-java/' diff --git a/trino/build.gradle b/trino/build.gradle index 5b74c70..d349d29 100644 --- a/trino/build.gradle +++ b/trino/build.gradle @@ -6,9 +6,14 @@ plugins { id 'java' id 'distribution' id 'maven' + id 'com.github.davidmc24.gradle.plugin.avro-base' version "1.0.0" } + apply from: "$rootDir/gradle/checkstyle.gradle" +apply plugin: "com.github.davidmc24.gradle.plugin.avro" +sourceSets.main.java.srcDirs += 'build/generated-test-avro-java/' + repositories { mavenLocal() maven { @@ -79,6 +84,7 @@ dependencies { testImplementation 'org.openjdk.jmh:jmh-core:1.20' testImplementation 'org.openjdk.jmh:jmh-generator-annprocess:1.20' testImplementation 'org.testng:testng:6.10' + testImplementation "io.pravega:schemaregistry-server:0.2.0" } group = 'io.trino' diff --git a/trino/src/main/java/io/trino/plugin/pravega/PravegaRecordSetProvider.java b/trino/src/main/java/io/trino/plugin/pravega/PravegaRecordSetProvider.java index a164f96..3408fe9 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/PravegaRecordSetProvider.java +++ b/trino/src/main/java/io/trino/plugin/pravega/PravegaRecordSetProvider.java @@ -48,12 +48,9 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.plugin.pravega.PravegaHandleResolver.convertSplit; import static io.trino.plugin.pravega.util.PravegaSchemaUtils.AVRO; -import static io.trino.plugin.pravega.util.PravegaSchemaUtils.AVRO_INLINE; import static io.trino.plugin.pravega.util.PravegaSchemaUtils.CSV; import static io.trino.plugin.pravega.util.PravegaSchemaUtils.JSON; -import static io.trino.plugin.pravega.util.PravegaSchemaUtils.JSON_INLINE; import static io.trino.plugin.pravega.util.PravegaSchemaUtils.PROTOBUF; -import static io.trino.plugin.pravega.util.PravegaSchemaUtils.PROTOBUF_INLINE; import static java.util.Objects.requireNonNull; /** @@ -166,18 +163,12 @@ private KVSerializer serializer(PravegaObjectSchema schema, SerializerConfig { switch (schema.getFormat()) { case AVRO: - return new AvroSerializer(schema.getSchemaLocation().get()); - case AVRO_INLINE: - return new AvroSerializer(serializerConfig); + return new AvroSerializer(serializerConfig, schema.getSchemaLocation().get()); case PROTOBUF: - return new ProtobufSerializer(schema.getSchemaLocation().get()); - case PROTOBUF_INLINE: 
- return new ProtobufSerializer(serializerConfig); + return new ProtobufSerializer(serializerConfig, schema.getSchemaLocation().get()); case JSON: - return new JsonSerializer(); - case JSON_INLINE: return new JsonSerializer(serializerConfig); case CSV: @@ -192,15 +183,12 @@ private EventDecoder eventDecoder(PravegaObjectSchema schema, Set> fieldGroupsFromSchemaRegistry(fi SerializationFormat format = schemas.get(i).getSchemaInfo().getSerializationFormat(); fieldGroups.add(new PravegaStreamFieldGroup( - dataFormat(properties.getProperties(), format, kv, i), + normalizeDataFormat(format), Optional.of(colPrefix), dataSchema(format, schemas.get(i)), Optional.of(mapFieldsFromSchema(colPrefix, format, schemas.get(i))))); @@ -443,34 +438,12 @@ private static List listFiles(File dir) return ImmutableList.of(); } - private static String dataFormat(ImmutableMap groupProperties, - SerializationFormat format, - boolean kvTable, - int kvIdx) + private static String normalizeDataFormat(SerializationFormat format) { - /* - TODO: auto-detect https://github.com/pravega/presto-connector/issues/20 - (1) no schema registry. - (2) Register and evolve schemas in registry but do not use registry client while writing data - (3) Register schemas in the registry and use registry client to encode schema Id with payload - "inline" is for #3. for e.g. "avro" -> "avro-inline". PravegaRecordSetProvider is interested in this - hopefully this can all go away (see linked issue 58 above) - but for now the following is our convention - if "inline" exists in our properties, all data uses SR - else if it is a kv table key+value may be different. both, neither, or either may use SR - look for "inlinekey" / "inlinevalue" - */ - - String key = GROUP_PROPERTIES_INLINE_KEY; - - if (kvTable && !groupProperties.containsKey(key)) { - key = kvIdx == 0 ? GROUP_PROPERTIES_INLINE_KV_KEY : GROUP_PROPERTIES_INLINE_KV_VALUE; - } - - String finalFormat = format == SerializationFormat.Custom + // (CSV is custom) + return format == SerializationFormat.Custom ? format.getFullTypeName().toLowerCase(Locale.ENGLISH) : format.name().toLowerCase(Locale.ENGLISH); - return finalFormat + (groupProperties.containsKey(key) ? 
INLINE_SUFFIX : ""); } private static Optional dataSchema(SerializationFormat format, SchemaWithVersion schemaWithVersion) diff --git a/trino/src/main/java/io/trino/plugin/pravega/decoder/AvroSerializer.java b/trino/src/main/java/io/trino/plugin/pravega/decoder/AvroSerializer.java index 3616adb..28de56a 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/decoder/AvroSerializer.java +++ b/trino/src/main/java/io/trino/plugin/pravega/decoder/AvroSerializer.java @@ -16,11 +16,10 @@ package io.trino.plugin.pravega.decoder; -import com.google.protobuf.DynamicMessage; import io.pravega.client.stream.Serializer; import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; -import io.pravega.schemaregistry.serializers.SerializerFactory; import io.trino.plugin.pravega.util.ByteBufferInputStream; +import io.trino.plugin.pravega.util.PravegaSerializationUtils; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; @@ -49,9 +48,9 @@ private static class GenericRecordSerializer } @Override - public ByteBuffer serialize(Object value) + public ByteBuffer serialize(Object object) { - return ByteBuffer.wrap(((DynamicMessage) value).toByteArray()); + return PravegaSerializationUtils.serialize((GenericRecord) object); } @Override @@ -73,17 +72,15 @@ public GenericRecord deserialize(ByteBuffer serializedValue) } } - private final Serializer delegate; - - public AvroSerializer(SerializerConfig config) + public AvroSerializer(SerializerConfig config, String schema) { - this.delegate = SerializerFactory.genericDeserializer(config); + super(config, schema); } - public AvroSerializer(String encodedSchema) + @Override + public Serializer serializerForSchema(String schema) { - Schema schema = (new Schema.Parser()).parse(encodedSchema); - this.delegate = new GenericRecordSerializer(schema); + return new GenericRecordSerializer((new Schema.Parser()).parse(schema)); } @Override @@ -95,7 +92,7 @@ public ByteBuffer serialize(GenericRecord value) @Override public GenericRecord deserialize(ByteBuffer serializedValue) { - return (GenericRecord) delegate.deserialize(serializedValue); + return super.deserialize(serializedValue); } @Override diff --git a/trino/src/main/java/io/trino/plugin/pravega/decoder/CsvSerializer.java b/trino/src/main/java/io/trino/plugin/pravega/decoder/CsvSerializer.java index 45c2ace..873181c 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/decoder/CsvSerializer.java +++ b/trino/src/main/java/io/trino/plugin/pravega/decoder/CsvSerializer.java @@ -15,6 +15,8 @@ */ package io.trino.plugin.pravega.decoder; +import io.pravega.client.stream.Serializer; + import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -23,6 +25,7 @@ public class CsvSerializer { public CsvSerializer() { + super(null, null); } @Override @@ -39,6 +42,12 @@ public String deserialize(ByteBuffer serializedValue) serializedValue.remaining()); } + @Override + public Serializer serializerForSchema(String schema) + { + throw new UnsupportedOperationException(); + } + @Override public DecodableEvent toEvent(Object obj) { diff --git a/trino/src/main/java/io/trino/plugin/pravega/decoder/JsonSerializer.java b/trino/src/main/java/io/trino/plugin/pravega/decoder/JsonSerializer.java index 5ef5908..8c988b1 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/decoder/JsonSerializer.java +++ b/trino/src/main/java/io/trino/plugin/pravega/decoder/JsonSerializer.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; 
import io.pravega.client.stream.Serializer; import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; -import io.pravega.schemaregistry.serializers.SerializerFactory; import io.trino.plugin.pravega.util.ByteBufferInputStream; import java.io.IOException; @@ -59,16 +58,15 @@ public JsonNode deserialize(ByteBuffer serializedValue) } } - private final Serializer delegate; - public JsonSerializer(SerializerConfig config) { - this.delegate = SerializerFactory.genericDeserializer(config); + super(config, null); } - public JsonSerializer() + @Override + public Serializer serializerForSchema(String schema /* null for json */) { - this.delegate = new JsonTreeSerializer(); + return new JsonTreeSerializer(); } @Override @@ -80,7 +78,7 @@ public ByteBuffer serialize(JsonNode value) @Override public JsonNode deserialize(ByteBuffer serializedValue) { - return (JsonNode) delegate.deserialize(serializedValue); + return super.deserialize(serializedValue); } @Override diff --git a/trino/src/main/java/io/trino/plugin/pravega/decoder/KVSerializer.java b/trino/src/main/java/io/trino/plugin/pravega/decoder/KVSerializer.java index 5c2ec79..ac9cccd 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/decoder/KVSerializer.java +++ b/trino/src/main/java/io/trino/plugin/pravega/decoder/KVSerializer.java @@ -16,12 +16,68 @@ package io.trino.plugin.pravega.decoder; +import io.airlift.log.Logger; import io.pravega.client.stream.Serializer; +import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; +import io.pravega.schemaregistry.serializers.SerializerFactory; + +import java.nio.ByteBuffer; // deserialize using externally provided schema or using SR+SerializerConfig public abstract class KVSerializer implements Serializer { + private static final Logger log = Logger.get(KVSerializer.class); + + protected Serializer delegate = null; + + private boolean schemaRegistryDeserializer; + + private final SerializerConfig serializerConfig; + + private final String schema; + + protected KVSerializer(SerializerConfig serializerConfig, String schema) { + this.serializerConfig = serializerConfig; + this.schema = schema; + } + + public boolean schemaRegistryDeserializer() + { + return schemaRegistryDeserializer; + } + + // format of data is unknown, whether schema is encoded inline by pravega schema registry or not + // try to deserialize without, and if it fails, use serializerConfig + protected void chooseDeserializer(ByteBuffer serializedValue) + { + Serializer serializer = serializerForSchema(schema); + serializedValue.mark(); + try { + if (serializer.deserialize(serializedValue) != null) { + delegate = serializer; + } + } + catch (RuntimeException e) { + log.info("could not deserialize, try SR deserializer"); + delegate = SerializerFactory.genericDeserializer(serializerConfig); + schemaRegistryDeserializer = true; + } + finally { + serializedValue.reset(); + } + } + + public T deserialize(ByteBuffer serializedValue) + { + if (delegate == null) { + chooseDeserializer(serializedValue); + } + return (T) delegate.deserialize(serializedValue); + } + + public abstract Serializer serializerForSchema(String schema); + // create an event that can be passed down to decoders public abstract DecodableEvent toEvent(Object obj); } diff --git a/trino/src/main/java/io/trino/plugin/pravega/decoder/ProtobufSerializer.java b/trino/src/main/java/io/trino/plugin/pravega/decoder/ProtobufSerializer.java index 55c4356..0833774 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/decoder/ProtobufSerializer.java 
+++ b/trino/src/main/java/io/trino/plugin/pravega/decoder/ProtobufSerializer.java @@ -20,7 +20,6 @@ import com.google.protobuf.DynamicMessage; import io.pravega.client.stream.Serializer; import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; -import io.pravega.schemaregistry.serializers.SerializerFactory; import io.trino.plugin.pravega.util.ByteBufferInputStream; import org.apache.commons.lang3.tuple.Pair; @@ -33,48 +32,40 @@ // deserialize using externally provided schema or using SR+SerializerConfig public class ProtobufSerializer - extends KVSerializer -{ + extends KVSerializer { private static class DynamicMessageSerializer - implements Serializer - { + implements Serializer { private final Descriptors.Descriptor descriptor; - DynamicMessageSerializer(Descriptors.Descriptor descriptor) - { + DynamicMessageSerializer(Descriptors.Descriptor descriptor) { this.descriptor = descriptor; } @Override - public ByteBuffer serialize(Object value) - { + public ByteBuffer serialize(Object value) { return ByteBuffer.wrap(((DynamicMessage) value).toByteArray()); } @Override - public DynamicMessage deserialize(ByteBuffer serializedValue) - { + public DynamicMessage deserialize(ByteBuffer serializedValue) { try { return DynamicMessage.parseFrom(descriptor, new ByteBufferInputStream(serializedValue)); - } - catch (IOException e) { + } catch (IOException e) { throw new UncheckedIOException(e); } } } - private final Serializer delegate; - - public ProtobufSerializer(SerializerConfig config) - { - this.delegate = SerializerFactory.genericDeserializer(config); + public ProtobufSerializer(SerializerConfig config, String schema) { + super(config, schema); } - public ProtobufSerializer(String encodedSchema) + @Override + public Serializer serializerForSchema(String schema) { - Pair pair = decodeSchema(encodedSchema); - this.delegate = new DynamicMessageSerializer(descriptorFor(pair.getLeft(), pair.getRight())); + Pair pair = decodeSchema(schema); + return new DynamicMessageSerializer(descriptorFor(pair.getLeft(), pair.getRight())); } @Override @@ -86,7 +77,7 @@ public ByteBuffer serialize(DynamicMessage value) @Override public DynamicMessage deserialize(ByteBuffer serializedValue) { - return (DynamicMessage) delegate.deserialize(serializedValue); + return super.deserialize(serializedValue); } @Override diff --git a/trino/src/main/java/io/trino/plugin/pravega/util/PravegaSchemaUtils.java b/trino/src/main/java/io/trino/plugin/pravega/util/PravegaSchemaUtils.java index 85d214f..96a68c4 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/util/PravegaSchemaUtils.java +++ b/trino/src/main/java/io/trino/plugin/pravega/util/PravegaSchemaUtils.java @@ -40,13 +40,6 @@ private PravegaSchemaUtils() private static final Logger log = Logger.get(PravegaSchemaUtils.class); - public static final String AVRO_INLINE = "avro-inline"; - public static final String PROTOBUF_INLINE = "protobuf-inline"; - public static final String JSON_INLINE = "json-inline"; - public static final String INLINE_SUFFIX = "-inline"; - public static final String GROUP_PROPERTIES_INLINE_KEY = "inline"; - public static final String GROUP_PROPERTIES_INLINE_KV_KEY = "inlinekey"; - public static final String GROUP_PROPERTIES_INLINE_KV_VALUE = "inlinevalue"; public static final String AVRO = "avro"; public static final String PROTOBUF = "protobuf"; public static final String JSON = "json"; diff --git a/trino/src/main/java/io/trino/plugin/pravega/util/PravegaSerializationUtils.java 
b/trino/src/main/java/io/trino/plugin/pravega/util/PravegaSerializationUtils.java index a2c0e7f..8887b00 100644 --- a/trino/src/main/java/io/trino/plugin/pravega/util/PravegaSerializationUtils.java +++ b/trino/src/main/java/io/trino/plugin/pravega/util/PravegaSerializationUtils.java @@ -16,6 +16,10 @@ package io.trino.plugin.pravega.util; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -23,6 +27,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; public class PravegaSerializationUtils { @@ -57,4 +62,21 @@ public static T deserialize(byte[] bytes, Class clazz) throw new IllegalArgumentException(e); } } + + public static ByteBuffer serialize(GenericRecord record) + { + try { + GenericDatumWriter writer = new GenericDatumWriter<>(record.getSchema()); + DataFileWriter dataFileWriter = new DataFileWriter<>(writer); + + ByteArrayOutputStream os = new ByteArrayOutputStream(); + dataFileWriter.create(record.getSchema(), os); + dataFileWriter.append(record); + dataFileWriter.close(); + return ByteBuffer.wrap(os.toByteArray()); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/trino/src/test/avro/sensor.avsc b/trino/src/test/avro/sensor.avsc new file mode 100644 index 0000000..5a6db7f --- /dev/null +++ b/trino/src/test/avro/sensor.avsc @@ -0,0 +1,10 @@ +{ + "namespace": "io.pravega.avro", + "type": "record", + "name": "Sensor", + "fields": [ + {"name": "sensorId", "type": "int"}, + {"name": "timestamp", "type": "long"}, + {"name": "rate", "type": "double"} + ] +} \ No newline at end of file diff --git a/trino/src/test/java/io/trino/plugin/pravega/decoder/KVSerializerTest.java b/trino/src/test/java/io/trino/plugin/pravega/decoder/KVSerializerTest.java new file mode 100644 index 0000000..d877b29 --- /dev/null +++ b/trino/src/test/java/io/trino/plugin/pravega/decoder/KVSerializerTest.java @@ -0,0 +1,276 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.pravega.decoder; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import io.pravega.avro.Sensor; +import io.pravega.client.stream.Serializer; + +import io.pravega.schemaregistry.client.SchemaRegistryClientConfig; +import io.pravega.schemaregistry.client.exceptions.RegistryExceptions; +import io.pravega.schemaregistry.contract.data.Compatibility; +import io.pravega.schemaregistry.contract.data.GroupProperties; +import io.pravega.schemaregistry.contract.data.SerializationFormat; +import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema; +import io.pravega.schemaregistry.serializer.json.schemas.JSONSchema; +import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema; +import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; +import io.pravega.schemaregistry.serializers.SerializerFactory; +import io.trino.plugin.pravega.integration.EmbeddedSchemaRegistry; +import io.trino.plugin.pravega.protobuf.ProductOuterClass; +import io.trino.plugin.pravega.util.PravegaSerializationUtils; +import org.apache.avro.generic.GenericRecord; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import static io.trino.plugin.pravega.ProtobufCommon.encodeSchema; +import static org.testng.Assert.*; + +/** + * when an object is written to pravega using serializer from SchemaRegistry, encoding info + * will be added to the raw event data so the event can be deserialized automatically later + * this is referred to in the connector code as "inline" + * + * when reading data in the connector, we don't know how event was serialized + * so first try with the actual avro, protobuf, or json schema + * if that fails fallback to trying with SchemaRegistry SerializerConfig + */ +@Test +public class KVSerializerTest { + private final EmbeddedSchemaRegistry schemaRegistry; + + private final Random random = new Random(); + + public KVSerializerTest() { + this.schemaRegistry = new EmbeddedSchemaRegistry(); + this.schemaRegistry.start(); + } + + private static class Employee { + public int id; + public String first; + public String last; + + @JsonCreator + public Employee(@JsonProperty("id") int id, + @JsonProperty("first") String first, + @JsonProperty("last") String last) { + this.id = id; + this.first = first; + this.last = last; + } + + @JsonProperty + public int getId() { + return id; + } + + @JsonProperty + public String getFirst() { + return first; + } + + @JsonProperty + public String getLast() { + return last; + } + } + + @Test + public void testJson() throws IOException { + Employee expected = new Employee(1, "John", "Smith"); + + ByteBuffer serialized = ByteBuffer.wrap(new ObjectMapper().writeValueAsBytes(expected)); + + // use null SerializerConfig as we are not expecting initial deserialize to fail + JsonSerializer jsonSerializer = new JsonSerializer(null); + + JsonNode actual = jsonSerializer.deserialize(serialized); + + assertEquals(actual.get("id").asInt(), expected.id); + assertEquals(actual.get("first").asText(), expected.first); + assertEquals(actual.get("last").asText(), expected.last); + +
assertFalse(jsonSerializer.schemaRegistryDeserializer()); + } + + @Test + public void testJsonInline() + { + Employee expected = new Employee(1, "John", "Smith"); + + SerializerConfig serializerConfig = jsonGroup("inline"); + Serializer schemaRegistrySerializer = + SerializerFactory.jsonSerializer(serializerConfig, JSONSchema.of(Employee.class)); + // using Serializer provided by SchemaRegistry + ByteBuffer serializedValue = schemaRegistrySerializer.serialize(expected); + + JsonSerializer jsonSerializer = new JsonSerializer(serializerConfig); + + JsonNode actual = jsonSerializer.deserialize(serializedValue); + + assertEquals(actual.get("id").asInt(), expected.id); + assertEquals(actual.get("first").asText(), expected.first); + assertEquals(actual.get("last").asText(), expected.last); + + assertTrue(jsonSerializer.schemaRegistryDeserializer()); + } + + @Test + public void testAvro() + { + Sensor sensor = new Sensor(); + sensor.setSensorId(random.nextInt()); + sensor.setTimestamp(System.currentTimeMillis()); + sensor.setRate(random.nextDouble()); + + ByteBuffer serializedValue = PravegaSerializationUtils.serialize((GenericRecord) sensor); + + // use null SerializerConfig as we are expecting parse with given schema to succeed + AvroSerializer avroSerializer = new AvroSerializer(null, sensor.getSchema().toString()); + GenericRecord actual = avroSerializer.deserialize(serializedValue); + + assertEquals(actual.get("sensorId"), sensor.getSensorId()); + assertEquals(actual.get("timestamp"), sensor.getTimestamp()); + assertEquals(actual.get("rate"), sensor.getRate()); + + assertFalse(avroSerializer.schemaRegistryDeserializer()); + } + + @Test + public void testAvroInline() + { + Sensor sensor = new Sensor(); + sensor.setSensorId(random.nextInt()); + sensor.setTimestamp(System.currentTimeMillis()); + sensor.setRate(random.nextDouble()); + + SerializerConfig serializerConfig = avroGroup("inline"); + Serializer schemaRegistrySerializer = + SerializerFactory.avroSerializer(serializerConfig, AvroSchema.of(Sensor.class)); + // using Serializer provided by SchemaRegistry + ByteBuffer serializedValue = schemaRegistrySerializer.serialize(sensor); + + AvroSerializer avroSerializer = new AvroSerializer(serializerConfig, sensor.getSchema().toString()); + GenericRecord actual = avroSerializer.deserialize(serializedValue); + + assertEquals(actual.get("sensorId"), sensor.getSensorId()); + assertEquals(actual.get("timestamp"), sensor.getTimestamp()); + assertEquals(actual.get("rate"), sensor.getRate()); + + assertTrue(avroSerializer.schemaRegistryDeserializer()); + } + + @Test + public void testProtobufInline() + { + ProductOuterClass.Product product = ProductOuterClass.Product.newBuilder() + .setId(random.nextInt()) + .setName(UUID.randomUUID().toString()) + .build(); + + SerializerConfig serializerConfig = protobufGroup("inline"); + Serializer schemaRegistrySerializer = + SerializerFactory.protobufSerializer(serializerConfig, ProtobufSchema.of(ProductOuterClass.Product.class)); + // using Serializer provided by SchemaRegistry + ByteBuffer serializedValue = schemaRegistrySerializer.serialize(product); + + // get file descriptor, which is needed for initial DynamicMessage#parseFrom + String schema = encodeSchema(schemaRegistry.client().getSchemas("protobuf.inline").get(0)); + + ProtobufSerializer protobufSerializer = new ProtobufSerializer(serializerConfig, schema); + DynamicMessage dynamicMessage = protobufSerializer.deserialize(serializedValue); + + int id = -1; + String name = null; + for (Map.Entry entry : 
dynamicMessage.getAllFields().entrySet()) { + if (entry.getKey().getJsonName().equals("id")) { + id = (int) entry.getValue(); + } + else if (entry.getKey().getJsonName().equals("name")) { + name = (String) entry.getValue(); + } + } + assertNotEquals(id, -1); + assertNotNull(name); + + assertEquals(id, product.getId()); + assertEquals(name, product.getName()); + + assertTrue(protobufSerializer.schemaRegistryDeserializer()); + } + + private SerializerConfig avroGroup(String stream) + { + String groupId = "avro." + stream; + addGroup(groupId, SerializationFormat.Avro); + return serializerConfig(groupId); + } + + private SerializerConfig protobufGroup(String stream) + { + String groupId = "protobuf." + stream; + addGroup(groupId, SerializationFormat.Protobuf); + return serializerConfig(groupId); + } + + private SerializerConfig jsonGroup(String stream) + { + String groupId = "json." + stream; + addGroup(groupId, SerializationFormat.Json); + return serializerConfig(groupId); + } + + private void addGroup(String groupId, SerializationFormat format) + { + try { + schemaRegistry.client().getGroupProperties(groupId); + } + catch (RegistryExceptions.ResourceNotFoundException e) { + schemaRegistry.client().addGroup(groupId, + new GroupProperties( + format, + Compatibility.allowAny(), + true)); + } + } + + private SerializerConfig serializerConfig(String group) { + return SerializerConfig.builder() + .groupId(group).registryConfig(SchemaRegistryClientConfig.builder() + .schemaRegistryUri(schemaRegistry.getURI()) + .build()) + .registerSchema(true) + .build(); + } + + @AfterClass(alwaysRun = true) + public void destroy() + { + schemaRegistry.close(); + } +} diff --git a/trino/src/test/java/io/trino/plugin/pravega/integration/EmbeddedSchemaRegistry.java b/trino/src/test/java/io/trino/plugin/pravega/integration/EmbeddedSchemaRegistry.java new file mode 100644 index 0000000..b5e6c4e --- /dev/null +++ b/trino/src/test/java/io/trino/plugin/pravega/integration/EmbeddedSchemaRegistry.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Note: this class is based on SchemaRegistryUtils from pravega/flink-connectors + * (rev 9332ad67e520c03c7122de1d3b90c6cafbf97634) + * https://github.com/pravega/flink-connectors/blob/v0.9.0/src/test/java/io/pravega/connectors/flink/utils/SchemaRegistryUtils.java + */ +package io.trino.plugin.pravega.integration; + +import io.pravega.schemaregistry.client.SchemaRegistryClient; +import io.pravega.schemaregistry.client.SchemaRegistryClientConfig; +import io.pravega.schemaregistry.client.SchemaRegistryClientFactory; +import io.pravega.schemaregistry.server.rest.RestServer; +import io.pravega.schemaregistry.server.rest.ServiceConfig; +import io.pravega.schemaregistry.service.SchemaRegistryService; +import io.pravega.schemaregistry.storage.SchemaStore; +import io.pravega.schemaregistry.storage.SchemaStoreFactory; + +import java.io.Closeable; +import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class EmbeddedSchemaRegistry + implements Closeable { + private final static AtomicInteger servers = new AtomicInteger(); + + private final int port; + + private final AtomicBoolean started = new AtomicBoolean(); + + private final ScheduledExecutorService executor; + + private RestServer restServer; + + private SchemaRegistryClient client; + + public EmbeddedSchemaRegistry() { + port = 9100 + servers.getAndIncrement(); + executor = Executors.newScheduledThreadPool(10); + + SchemaStore schemaStore = SchemaStoreFactory.createInMemoryStore(executor); + + restServer = new RestServer( + new SchemaRegistryService(schemaStore, executor), + ServiceConfig.builder().port(port).build() + ); + } + + public void start() { + if (started.compareAndSet(false, true)) { + restServer.startAsync(); + restServer.awaitRunning(); + } + } + + public void stop() { + if (started.compareAndSet(true, false)) { + restServer.stopAsync(); + restServer.awaitTerminated(); + executor.shutdownNow(); + } + } + + public int port() { + return port; + } + + public URI getURI() { + return URI.create("http://localhost:" + port); + } + + public SchemaRegistryClient client() + { + if (client == null) { + SchemaRegistryClientConfig config = SchemaRegistryClientConfig.builder() + .schemaRegistryUri(getURI()) + .build(); + client = SchemaRegistryClientFactory.withDefaultNamespace(config); + } + return client; + } + + @Override + public void close() + { + try { + stop(); + } + catch (Exception quiet) {} + } +} diff --git a/trino/src/test/java/io/trino/plugin/pravega/integration/PravegaKeyValueLoader.java b/trino/src/test/java/io/trino/plugin/pravega/integration/PravegaKeyValueLoader.java index c06c3cb..e031227 100644 --- a/trino/src/test/java/io/trino/plugin/pravega/integration/PravegaKeyValueLoader.java +++ b/trino/src/test/java/io/trino/plugin/pravega/integration/PravegaKeyValueLoader.java @@ -24,15 +24,13 @@ import io.pravega.client.tables.KeyValueTableClientConfiguration; import io.pravega.client.tables.KeyValueTableConfiguration; import io.trino.plugin.pravega.util.ByteBufferInputStream; +import io.trino.plugin.pravega.util.PravegaSerializationUtils; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; -import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; 
import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; @@ -54,19 +52,7 @@ public AvroSerializer(Schema schema) @Override public ByteBuffer serialize(GenericRecord record) { - try { - GenericDatumWriter writer = new GenericDatumWriter<>(record.getSchema()); - DataFileWriter dataFileWriter = new DataFileWriter<>(writer); - - ByteArrayOutputStream os = new ByteArrayOutputStream(); - dataFileWriter.create(record.getSchema(), os); - dataFileWriter.append(record); - dataFileWriter.close(); - return ByteBuffer.wrap(os.toByteArray()); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } + return PravegaSerializationUtils.serialize(record); } @Override diff --git a/trino/src/test/java/io/trino/plugin/pravega/protobuf/ProductOuterClass.java b/trino/src/test/java/io/trino/plugin/pravega/protobuf/ProductOuterClass.java new file mode 100644 index 0000000..1e2662f --- /dev/null +++ b/trino/src/test/java/io/trino/plugin/pravega/protobuf/ProductOuterClass.java @@ -0,0 +1,669 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: Product.proto + +package io.trino.plugin.pravega.protobuf; + +public final class ProductOuterClass { + private ProductOuterClass() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistryLite registry) { + } + + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + registerAllExtensions( + (com.google.protobuf.ExtensionRegistryLite) registry); + } + public interface ProductOrBuilder extends + // @@protoc_insertion_point(interface_extends:io.pravega.protobuf.Product) + com.google.protobuf.MessageOrBuilder { + + /** + * int32 id = 1; + */ + int getId(); + + /** + * string name = 2; + */ + String getName(); + /** + * string name = 2; + */ + com.google.protobuf.ByteString + getNameBytes(); + } + /** + * Protobuf type {@code io.pravega.protobuf.Product} + */ + public static final class Product extends + com.google.protobuf.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:io.pravega.protobuf.Product) + ProductOrBuilder { + private static final long serialVersionUID = 0L; + // Use Product.newBuilder() to construct. 
+ private Product(com.google.protobuf.GeneratedMessageV3.Builder builder) { + super(builder); + } + private Product() { + id_ = 0; + name_ = ""; + } + + @Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private Product( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new NullPointerException(); + } + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 8: { + + id_ = input.readInt32(); + break; + } + case 18: { + String s = input.readStringRequireUtf8(); + + name_ = s; + break; + } + default: { + if (!parseUnknownFieldProto3( + input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return ProductOuterClass.internal_static_io_pravega_protobuf_Product_descriptor; + } + + @Override + protected FieldAccessorTable + internalGetFieldAccessorTable() { + return ProductOuterClass.internal_static_io_pravega_protobuf_Product_fieldAccessorTable + .ensureFieldAccessorsInitialized( + Product.class, Builder.class); + } + + public static final int ID_FIELD_NUMBER = 1; + private int id_; + /** + * int32 id = 1; + */ + public int getId() { + return id_; + } + + public static final int NAME_FIELD_NUMBER = 2; + private volatile Object name_; + /** + * string name = 2; + */ + public String getName() { + Object ref = name_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + name_ = s; + return s; + } + } + /** + * string name = 2; + */ + public com.google.protobuf.ByteString + getNameBytes() { + Object ref = name_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (String) ref); + name_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private byte memoizedIsInitialized = -1; + @Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + memoizedIsInitialized = 1; + return true; + } + + @Override + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (id_ != 0) { + output.writeInt32(1, id_); + } + if (!getNameBytes().isEmpty()) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 2, name_); + } + unknownFields.writeTo(output); + } + + @Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + if (id_ != 0) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(1, id_); + } + if (!getNameBytes().isEmpty()) { + size += 
com.google.protobuf.GeneratedMessageV3.computeStringSize(2, name_);
+    }
+    size += unknownFields.getSerializedSize();
+    memoizedSize = size;
+    return size;
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (!(obj instanceof Product)) {
+      return super.equals(obj);
+    }
+    Product other = (Product) obj;
+
+    boolean result = true;
+    result = result && (getId()
+        == other.getId());
+    result = result && getName()
+        .equals(other.getName());
+    result = result && unknownFields.equals(other.unknownFields);
+    return result;
+  }
+
+  @Override
+  public int hashCode() {
+    if (memoizedHashCode != 0) {
+      return memoizedHashCode;
+    }
+    int hash = 41;
+    hash = (19 * hash) + getDescriptor().hashCode();
+    hash = (37 * hash) + ID_FIELD_NUMBER;
+    hash = (53 * hash) + getId();
+    hash = (37 * hash) + NAME_FIELD_NUMBER;
+    hash = (53 * hash) + getName().hashCode();
+    hash = (29 * hash) + unknownFields.hashCode();
+    memoizedHashCode = hash;
+    return hash;
+  }
+
+  public static Product parseFrom(
+      java.nio.ByteBuffer data)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+  public static Product parseFrom(
+      java.nio.ByteBuffer data,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+  public static Product parseFrom(
+      com.google.protobuf.ByteString data)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+  public static Product parseFrom(
+      com.google.protobuf.ByteString data,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+  public static Product parseFrom(byte[] data)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+  public static Product parseFrom(
+      byte[] data,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+  public static Product parseFrom(java.io.InputStream input)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input);
+  }
+  public static Product parseFrom(
+      java.io.InputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input, extensionRegistry);
+  }
+  public static Product parseDelimitedFrom(java.io.InputStream input)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseDelimitedWithIOException(PARSER, input);
+  }
+  public static Product parseDelimitedFrom(
+      java.io.InputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
+  }
+  public static Product parseFrom(
+      com.google.protobuf.CodedInputStream input)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input);
+  }
+  public static Product parseFrom(
+      com.google.protobuf.CodedInputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input, extensionRegistry);
+  }
+
+  @Override
+  public Builder newBuilderForType() { return newBuilder(); }
+  public static Builder newBuilder() {
+    return DEFAULT_INSTANCE.toBuilder();
+  }
+  public static Builder newBuilder(Product prototype) {
+    return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+  }
+  @Override
+  public Builder toBuilder() {
+    return this == DEFAULT_INSTANCE
+        ? new Builder() : new Builder().mergeFrom(this);
+  }
+
+  @Override
+  protected Builder newBuilderForType(
+      BuilderParent parent) {
+    Builder builder = new Builder(parent);
+    return builder;
+  }
+  /**
+   * Protobuf type {@code io.pravega.protobuf.Product}
+   */
+  public static final class Builder extends
+      com.google.protobuf.GeneratedMessageV3.Builder implements
+      // @@protoc_insertion_point(builder_implements:io.pravega.protobuf.Product)
+      ProductOrBuilder {
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return ProductOuterClass.internal_static_io_pravega_protobuf_Product_descriptor;
+    }
+
+    @Override
+    protected FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return ProductOuterClass.internal_static_io_pravega_protobuf_Product_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              Product.class, Builder.class);
+    }
+
+    // Construct using io.pravega.protobuf.ProductOuterClass.Product.newBuilder()
+    private Builder() {
+      maybeForceBuilderInitialization();
+    }
+
+    private Builder(
+        BuilderParent parent) {
+      super(parent);
+      maybeForceBuilderInitialization();
+    }
+    private void maybeForceBuilderInitialization() {
+      if (com.google.protobuf.GeneratedMessageV3
+          .alwaysUseFieldBuilders) {
+      }
+    }
+    @Override
+    public Builder clear() {
+      super.clear();
+      id_ = 0;
+
+      name_ = "";
+
+      return this;
+    }
+
+    @Override
+    public com.google.protobuf.Descriptors.Descriptor
+        getDescriptorForType() {
+      return ProductOuterClass.internal_static_io_pravega_protobuf_Product_descriptor;
+    }
+
+    @Override
+    public Product getDefaultInstanceForType() {
+      return Product.getDefaultInstance();
+    }
+
+    @Override
+    public Product build() {
+      Product result = buildPartial();
+      if (!result.isInitialized()) {
+        throw newUninitializedMessageException(result);
+      }
+      return result;
+    }
+
+    @Override
+    public Product buildPartial() {
+      Product result = new Product(this);
+      result.id_ = id_;
+      result.name_ = name_;
+      onBuilt();
+      return result;
+    }
+
+    @Override
+    public Builder clone() {
+      return (Builder) super.clone();
+    }
+    @Override
+    public Builder setField(
+        com.google.protobuf.Descriptors.FieldDescriptor field,
+        Object value) {
+      return (Builder) super.setField(field, value);
+    }
+    @Override
+    public Builder clearField(
+        com.google.protobuf.Descriptors.FieldDescriptor field) {
+      return (Builder) super.clearField(field);
+    }
+    @Override
+    public Builder clearOneof(
+        com.google.protobuf.Descriptors.OneofDescriptor oneof) {
+      return (Builder) super.clearOneof(oneof);
+    }
+    @Override
+    public Builder setRepeatedField(
+        com.google.protobuf.Descriptors.FieldDescriptor field,
+        int index, Object value) {
+      return (Builder) super.setRepeatedField(field, index, value);
+    }
+    @Override
+    public Builder addRepeatedField(
+        com.google.protobuf.Descriptors.FieldDescriptor field,
+        Object value) {
+      return (Builder) super.addRepeatedField(field, value);
+    }
+    @Override
+    public Builder mergeFrom(com.google.protobuf.Message other) {
+      if (other instanceof Product) {
+        return mergeFrom((Product)other);
+      } else {
+        super.mergeFrom(other);
+        return this;
+      }
+    }
+
+    public Builder mergeFrom(Product other) {
+      if (other == Product.getDefaultInstance()) return this;
+      if (other.getId() != 0) {
+        setId(other.getId());
+      }
+      if (!other.getName().isEmpty()) {
+        name_ = other.name_;
+        onChanged();
+      }
+      this.mergeUnknownFields(other.unknownFields);
+      onChanged();
+      return this;
+    }
+
+    @Override
+    public final boolean isInitialized() {
+      return true;
+    }
+
+    @Override
+    public Builder mergeFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Product parsedMessage = null;
+      try {
+        parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        parsedMessage = (Product) e.getUnfinishedMessage();
+        throw e.unwrapIOException();
+      } finally {
+        if (parsedMessage != null) {
+          mergeFrom(parsedMessage);
+        }
+      }
+      return this;
+    }
+
+    private int id_ ;
+    /**
+     * int32 id = 1;
+     */
+    public int getId() {
+      return id_;
+    }
+    /**
+     * int32 id = 1;
+     */
+    public Builder setId(int value) {
+
+      id_ = value;
+      onChanged();
+      return this;
+    }
+    /**
+     * int32 id = 1;
+     */
+    public Builder clearId() {
+
+      id_ = 0;
+      onChanged();
+      return this;
+    }
+
+    private Object name_ = "";
+    /**
+     * string name = 2;
+     */
+    public String getName() {
+      Object ref = name_;
+      if (!(ref instanceof String)) {
+        com.google.protobuf.ByteString bs =
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        name_ = s;
+        return s;
+      } else {
+        return (String) ref;
+      }
+    }
+    /**
+     * string name = 2;
+     */
+    public com.google.protobuf.ByteString
+        getNameBytes() {
+      Object ref = name_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b =
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (String) ref);
+        name_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    /**
+     * string name = 2;
+     */
+    public Builder setName(
+        String value) {
+      if (value == null) {
+        throw new NullPointerException();
+      }
+
+      name_ = value;
+      onChanged();
+      return this;
+    }
+    /**
+     * string name = 2;
+     */
+    public Builder clearName() {
+
+      name_ = getDefaultInstance().getName();
+      onChanged();
+      return this;
+    }
+    /**
+     * string name = 2;
+     */
+    public Builder setNameBytes(
+        com.google.protobuf.ByteString value) {
+      if (value == null) {
+        throw new NullPointerException();
+      }
+      checkByteStringIsUtf8(value);
+
+      name_ = value;
+      onChanged();
+      return this;
+    }
+    @Override
+    public final Builder setUnknownFields(
+        final com.google.protobuf.UnknownFieldSet unknownFields) {
+      return super.setUnknownFieldsProto3(unknownFields);
+    }
+
+    @Override
+    public final Builder mergeUnknownFields(
+        final com.google.protobuf.UnknownFieldSet unknownFields) {
+      return super.mergeUnknownFields(unknownFields);
+    }
+
+
+    // @@protoc_insertion_point(builder_scope:io.pravega.protobuf.Product)
+  }
+
+  // @@protoc_insertion_point(class_scope:io.pravega.protobuf.Product)
+  private static final Product DEFAULT_INSTANCE;
+  static {
+    DEFAULT_INSTANCE = new Product();
+  }
+
+  public static Product getDefaultInstance() {
+    return DEFAULT_INSTANCE;
+  }
+
+  private static final com.google.protobuf.Parser
+      PARSER = new com.google.protobuf.AbstractParser() {
+    @Override
+    public Product parsePartialFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return new Product(input, extensionRegistry);
+    }
+  };
+
+  public static com.google.protobuf.Parser parser() {
+    return PARSER;
+  }
+
+  @Override
+  public com.google.protobuf.Parser getParserForType() {
+    return PARSER;
+  }
+
+  @Override
+  public Product getDefaultInstanceForType() {
+    return DEFAULT_INSTANCE;
+  }
+
+  }
+
+  private static final com.google.protobuf.Descriptors.Descriptor
+      internal_static_io_pravega_protobuf_Product_descriptor;
+  private static final
+      com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+      internal_static_io_pravega_protobuf_Product_fieldAccessorTable;
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    String[] descriptorData = {
+      "\n\rProduct.proto\022\023io.pravega.protobuf\"#\n\007" +
+      "Product\022\n\n\002id\030\001 \001(\005\022\014\n\004name\030\002 \001(\tb\006proto" +
+      "3"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+        new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+          public com.google.protobuf.ExtensionRegistry assignDescriptors(
+              com.google.protobuf.Descriptors.FileDescriptor root) {
+            descriptor = root;
+            return null;
+          }
+        };
+    com.google.protobuf.Descriptors.FileDescriptor
+        .internalBuildGeneratedFileFrom(descriptorData,
+            new com.google.protobuf.Descriptors.FileDescriptor[] {
+            }, assigner);
+    internal_static_io_pravega_protobuf_Product_descriptor =
+        getDescriptor().getMessageTypes().get(0);
+    internal_static_io_pravega_protobuf_Product_fieldAccessorTable = new
+        com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+            internal_static_io_pravega_protobuf_Product_descriptor,
+            new String[] { "Id", "Name", });
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/trino/src/test/proto/Product.proto b/trino/src/test/proto/Product.proto
new file mode 100644
index 0000000..65ded65
--- /dev/null
+++ b/trino/src/test/proto/Product.proto
@@ -0,0 +1,9 @@
+// compiled and committed to repo, ./src/test/java/io/pravega/protobuf/ProductOuterClass.java
+
+syntax = "proto3";
+package io.pravega.protobuf;
+
+message Product {
+    int32 id = 1;
+    string name = 2;
+}
\ No newline at end of file
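For readers unfamiliar with protoc output, the committed fixture behaves like any other protobuf message. A minimal round trip through the generated API shown above (not part of the patch; the class name and values here are made up):

    import io.pravega.protobuf.ProductOuterClass.Product;

    public class ProductRoundTrip
    {
        public static void main(String[] args) throws Exception
        {
            // build, serialize, and re-parse using only the generated API
            Product original = Product.newBuilder()
                    .setId(42)
                    .setName("widget")
                    .build();

            byte[] wire = original.toByteArray();       // plain protobuf bytes, no SR header
            Product copy = Product.parseFrom(wire);     // PARSER-backed parseFrom(byte[]) from above

            System.out.println(copy.getId() + " " + copy.getName()); // 42 widget
        }
    }

If the fixture ever needs regenerating, something like `protoc --java_out=./src/test/java Product.proto` run next to the .proto should reproduce it (exact paths are a guess); the generated file is committed precisely so the build does not need protoc.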

From c046358ab21c7183f45c1a05f5d74d202bc4d225 Mon Sep 17 00:00:00 2001
From: Andrew Robertson
Date: Mon, 27 Sep 2021 10:31:07 -0700
Subject: [PATCH 10/11] trino naming scheme

Signed-off-by: Andrew Robertson
---
 .../{KVSerializerTest.java => TestKVSerializer.java}         | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
 rename trino/src/test/java/io/trino/plugin/pravega/decoder/{KVSerializerTest.java => TestKVSerializer.java} (98%)

diff --git a/trino/src/test/java/io/trino/plugin/pravega/decoder/KVSerializerTest.java b/trino/src/test/java/io/trino/plugin/pravega/decoder/TestKVSerializer.java
similarity index 98%
rename from trino/src/test/java/io/trino/plugin/pravega/decoder/KVSerializerTest.java
rename to trino/src/test/java/io/trino/plugin/pravega/decoder/TestKVSerializer.java
index d877b29..c7fff68 100644
--- a/trino/src/test/java/io/trino/plugin/pravega/decoder/KVSerializerTest.java
+++ b/trino/src/test/java/io/trino/plugin/pravega/decoder/TestKVSerializer.java
@@ -60,12 +60,12 @@
  * if that fails fallback to trying with SchemaRegistry SerializerConfig
  */
 @Test
-public class KVSerializerTest {
+public class TestKVSerializer {
     private final EmbeddedSchemaRegistry schemaRegistry;
 
     private final Random random = new Random();
 
-    public KVSerializerTest() {
+    public TestKVSerializer() {
         this.schemaRegistry = new EmbeddedSchemaRegistry();
         this.schemaRegistry.start();
     }
@@ -112,7 +112,7 @@ public void testJson() throws IOException {
         JsonNode actual = jsonSerializer.deserialize(serialized);
 
         assertEquals(actual.get("id").asInt(), expected.id);
-        assertEquals(actual.get("firs").asText(), expected.first);
+        assertEquals(actual.get("first").asText(), expected.first);
         assertEquals(actual.get("last").asText(), expected.last);
 
         assertFalse(jsonSerializer.schemaRegistryDeserializer());
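The testJson hunk above exercises the non-registry path: the event bytes are plain Jackson output, so there is no schema-registry encoding header to detect and schemaRegistryDeserializer() stays false. A standalone sketch of that idea (not part of the patch; the Person POJO is a hypothetical stand-in for the test's expected object with its id/first/last fields):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class PlainJsonEventDemo
    {
        // hypothetical stand-in for the test POJO; field names match the assertions above
        public static class Person
        {
            public int id = 1;
            public String first = "jane";
            public String last = "doe";
        }

        public static void main(String[] args) throws Exception
        {
            ObjectMapper mapper = new ObjectMapper();

            // a plain Jackson writer produces bare JSON bytes -- nothing prepends a
            // schema-registry encoding id, so the connector's first (non-SR) decode
            // attempt succeeds and no fallback to the SR deserializer is needed
            byte[] event = mapper.writeValueAsBytes(new Person());

            JsonNode node = mapper.readTree(event);
            System.out.println(node.get("first").asText()); // jane
        }
    }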

From ed5c13c3458c2db8e8506789417e45b3878f99d6 Mon Sep 17 00:00:00 2001
From: Andrew Robertson
Date: Tue, 28 Sep 2021 14:49:20 -0700
Subject: [PATCH 11/11] work around issue seen, presumably from compiling with java11 and running with java8

Signed-off-by: Andrew Robertson
---
 .../presto/decoder/JsonSerializer.java        |  1 -
 .../presto/decoder/KVSerializer.java          | 28 ++++++++++++++-----
 .../plugin/pravega/decoder/KVSerializer.java  | 28 ++++++++++++++-----
 3 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/decoder/JsonSerializer.java b/prestodb/src/main/java/io/pravega/connectors/presto/decoder/JsonSerializer.java
index 4bf1ae6..e28a673 100644
--- a/prestodb/src/main/java/io/pravega/connectors/presto/decoder/JsonSerializer.java
+++ b/prestodb/src/main/java/io/pravega/connectors/presto/decoder/JsonSerializer.java
@@ -21,7 +21,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.pravega.client.stream.Serializer;
 import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
-import io.pravega.schemaregistry.serializers.SerializerFactory;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
diff --git a/prestodb/src/main/java/io/pravega/connectors/presto/decoder/KVSerializer.java b/prestodb/src/main/java/io/pravega/connectors/presto/decoder/KVSerializer.java
index a454c31..5c1bec8 100644
--- a/prestodb/src/main/java/io/pravega/connectors/presto/decoder/KVSerializer.java
+++ b/prestodb/src/main/java/io/pravega/connectors/presto/decoder/KVSerializer.java
@@ -18,9 +18,12 @@
 
 import com.facebook.airlift.log.Logger;
 import io.pravega.client.stream.Serializer;
+import io.pravega.schemaregistry.client.exceptions.RegistryExceptions;
 import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
 import io.pravega.schemaregistry.serializers.SerializerFactory;
 
+import javax.ws.rs.ProcessingException;
+import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
 // deserialize using externally provided schema or using SR+SerializerConfig
@@ -30,14 +33,24 @@ public abstract class KVSerializer
 
     protected Serializer delegate = null;
 
-    private boolean schemaRegistryDeserializer;
+    private final Serializer schemaRegistrySerializer;
 
-    private final SerializerConfig serializerConfig;
+    private boolean schemaRegistryDeserializer;
 
     private final String schema;
 
-    protected KVSerializer(SerializerConfig serializerConfig, String schema) {
-        this.serializerConfig = serializerConfig;
+    protected KVSerializer(SerializerConfig config, String schema) {
+        // construct serializer up front to avoid classpath issues later
+        Serializer schemaRegistrySerializer1;
+        try {
+            schemaRegistrySerializer1 = config == null
+                    ? null
+                    : SerializerFactory.genericDeserializer(config);
+        } catch (ProcessingException | RegistryExceptions.ResourceNotFoundException e) {
+            // will not be found if schema.table doesn't use SR
+            schemaRegistrySerializer1 = null;
+        }
+        this.schemaRegistrySerializer = schemaRegistrySerializer1;
         this.schema = schema;
     }
 
@@ -51,7 +64,8 @@ public boolean schemaRegistryDeserializer()
     protected void chooseDeserializer(ByteBuffer serializedValue)
     {
         Serializer serializer = serializerForSchema(schema);
-        serializedValue.mark();
+        // cast to Buffer avoids any compile with java11 run in java8 weirdness (such as NoSuchMethodError)
+        ((Buffer) serializedValue).mark();
         try {
             if (serializer.deserialize(serializedValue) != null) {
                 delegate = serializer;
@@ -59,11 +73,11 @@ protected void chooseDeserializer(ByteBuffer serializedValue)
         }
         catch (RuntimeException e) {
             log.info("could not deserialize, try SR deserializer");
-            delegate = SerializerFactory.genericDeserializer(serializerConfig);
+            delegate = schemaRegistrySerializer;
             schemaRegistryDeserializer = true;
         }
         finally {
-            serializedValue.reset();
+            ((Buffer) serializedValue).reset();
         }
     }
 
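Background on the commit subject: JDK 9 gave ByteBuffer covariant overrides of mark() and reset() that return ByteBuffer rather than Buffer. A call site compiled against JDK 11 (without --release 8) records the ByteBuffer-returning method descriptor and fails with NoSuchMethodError on a JDK 8 runtime; routing the call through the Buffer supertype, as the hunk above does, keeps the JDK 8-compatible descriptor. A self-contained demonstration (not part of the patch):

    import java.nio.Buffer;
    import java.nio.ByteBuffer;

    public class BufferCastDemo
    {
        public static void main(String[] args)
        {
            ByteBuffer buf = ByteBuffer.wrap(new byte[] {7, 8, 9});

            // Compiled on JDK 11, a plain buf.mark() references ByteBuffer.mark()Ljava/nio/ByteBuffer;
            // which does not exist on JDK 8; the cast makes javac emit Buffer.mark()Ljava/nio/Buffer;
            ((Buffer) buf).mark();
            buf.get();                      // consume one byte
            ((Buffer) buf).reset();         // rewind to the mark

            System.out.println(buf.get()); // prints 7 again
        }
    }

Building with javac's --release 8 option would avoid the mismatch at compile time; the cast is the source-level equivalent and works regardless of how the jar is built.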
diff --git a/trino/src/main/java/io/trino/plugin/pravega/decoder/KVSerializer.java b/trino/src/main/java/io/trino/plugin/pravega/decoder/KVSerializer.java
index ac9cccd..b9adb30 100644
--- a/trino/src/main/java/io/trino/plugin/pravega/decoder/KVSerializer.java
+++ b/trino/src/main/java/io/trino/plugin/pravega/decoder/KVSerializer.java
@@ -18,9 +18,12 @@
 
 import io.airlift.log.Logger;
 import io.pravega.client.stream.Serializer;
+import io.pravega.schemaregistry.client.exceptions.RegistryExceptions;
 import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
 import io.pravega.schemaregistry.serializers.SerializerFactory;
 
+import javax.ws.rs.ProcessingException;
+import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
 // deserialize using externally provided schema or using SR+SerializerConfig
@@ -31,14 +34,24 @@ public abstract class KVSerializer
 
     protected Serializer delegate = null;
 
-    private boolean schemaRegistryDeserializer;
+    private final Serializer schemaRegistrySerializer;
 
-    private final SerializerConfig serializerConfig;
+    private boolean schemaRegistryDeserializer;
 
     private final String schema;
 
-    protected KVSerializer(SerializerConfig serializerConfig, String schema) {
-        this.serializerConfig = serializerConfig;
+    protected KVSerializer(SerializerConfig config, String schema) {
+        // construct serializer up front to avoid classpath issues later
+        Serializer schemaRegistrySerializer1;
+        try {
+            schemaRegistrySerializer1 = config == null
+                    ? null
+                    : SerializerFactory.genericDeserializer(config);
+        } catch (ProcessingException | RegistryExceptions.ResourceNotFoundException e) {
+            // will not be found if schema.table doesn't use SR
+            schemaRegistrySerializer1 = null;
+        }
+        this.schemaRegistrySerializer = schemaRegistrySerializer1;
         this.schema = schema;
     }
 
@@ -52,7 +65,8 @@ public boolean schemaRegistryDeserializer()
     protected void chooseDeserializer(ByteBuffer serializedValue)
     {
         Serializer serializer = serializerForSchema(schema);
-        serializedValue.mark();
+        // cast to Buffer avoids any compile with java11 run in java8 weirdness (such as NoSuchMethodError)
+        ((Buffer) serializedValue).mark();
         try {
             if (serializer.deserialize(serializedValue) != null) {
                 delegate = serializer;
@@ -60,11 +74,11 @@ protected void chooseDeserializer(ByteBuffer serializedValue)
         }
         catch (RuntimeException e) {
             log.info("could not deserialize, try SR deserializer");
-            delegate = SerializerFactory.genericDeserializer(serializerConfig);
+            delegate = schemaRegistrySerializer;
             schemaRegistryDeserializer = true;
         }
         finally {
-            serializedValue.reset();
+            ((Buffer) serializedValue).reset();
         }
     }
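Taken together, the two KVSerializer hunks implement the same auto-detection in both connectors: probe the externally provided schema first, and only fall back to the Schema Registry generic deserializer when the raw bytes refuse to decode (typically because they carry an SR encoding header). A distilled sketch of that probe-then-fallback idea (illustrative only; the connector's real method sets delegate and schemaRegistryDeserializer rather than returning a value, and the names here are invented):

    import io.pravega.client.stream.Serializer;

    import java.nio.Buffer;
    import java.nio.ByteBuffer;

    final class DeserializerProbe
    {
        private DeserializerProbe() {}

        // Try the plain deserializer against one sample event; if it fails, assume the
        // event was written through the Schema Registry client and use the SR-backed one.
        static <T> Serializer<T> choose(ByteBuffer event, Serializer<T> plain, Serializer<T> registryBacked)
        {
            ((Buffer) event).mark();            // Buffer cast keeps the JDK 8-compatible descriptor
            try {
                if (plain.deserialize(event) != null) {
                    return plain;
                }
                return registryBacked;
            }
            catch (RuntimeException e) {
                return registryBacked;          // raw bytes likely start with an SR encoding id
            }
            finally {
                ((Buffer) event).reset();       // leave the event untouched for the real decode
            }
        }
    }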