diff --git a/beam/core/build.gradle b/beam/core/build.gradle index cc1ce98f8..4b6afae76 100644 --- a/beam/core/build.gradle +++ b/beam/core/build.gradle @@ -16,6 +16,7 @@ plugins { id 'cz.o2.proxima.java-conventions' + alias libs.plugins.protobuf } dependencies { @@ -30,8 +31,10 @@ dependencies { testImplementation project(path: ':proxima-direct-core') testImplementation project(path: ':proxima-direct-io-kafka') testImplementation project(path: ':proxima-direct-io-kafka', configuration: 'testsJar') + testImplementation project(path: ':proxima-scheme-proto-testing') testImplementation libraries.beam_runners_direct testImplementation libraries.beam_runners_flink + testImplementation libraries.beam_sql testImplementation libraries.junit4 testImplementation libraries.hamcrest testImplementation libraries.mockito_core @@ -44,4 +47,8 @@ dependencies { description = 'cz.o2.proxima:proxima-beam-core' +protobuf { + protoc { artifact = libraries.protoc } +} + publishArtifacts(project, "default") diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/AbstractDirectBoundedSource.java b/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/AbstractDirectBoundedSource.java index 2cbaa5fb9..4124a870c 100644 --- a/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/AbstractDirectBoundedSource.java +++ b/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/AbstractDirectBoundedSource.java @@ -34,8 +34,8 @@ abstract class AbstractDirectBoundedSource extends BoundedSource } @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - return -1L; + public long getEstimatedSizeBytes(PipelineOptions options) { + return 0L; } @Override diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/BeamCommitLogReader.java b/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/BeamCommitLogReader.java index 56ffb0941..ecb75f9cd 100644 --- a/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/BeamCommitLogReader.java +++ b/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/BeamCommitLogReader.java @@ -157,7 +157,7 @@ static BoundedReader bounded( BeamCommitLogReader r = new BeamCommitLogReader(name, reader, position, true, partition, null, limit, true); - return new BoundedReader() { + return new BoundedReader<>() { @Override public BoundedSource getCurrentSource() { diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/BlockingQueueLogObserver.java b/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/BlockingQueueLogObserver.java index 9e3bd4851..fd29dd567 100644 --- a/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/BlockingQueueLogObserver.java +++ b/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/BlockingQueueLogObserver.java @@ -132,17 +132,17 @@ public CommitLogObserver(String name, long limit, long startingWatermark, int ca @Override public boolean onNext( - StreamElement ingest, + StreamElement element, cz.o2.proxima.direct.core.commitlog.CommitLogObserver.OnNextContext context) { if (log.isDebugEnabled()) { log.debug( "{}: Received next element {} at watermark {} offset {}", getName(), - ingest, + element, context.getWatermark(), context.getOffset()); } - return enqueue(ingest, new LogObserverUnifiedContext(context)); + return enqueue(element, new LogObserverUnifiedContext(context)); } @Override diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/DirectBoundedSource.java b/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/DirectBoundedSource.java index d53c3ea63..39b1eca03 100644 --- a/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/DirectBoundedSource.java +++ b/beam/core/src/main/java/cz/o2/proxima/beam/core/direct/io/DirectBoundedSource.java @@ -15,6 +15,7 @@ */ package cz.o2.proxima.beam.core.direct.io; +import cz.o2.proxima.beam.core.io.StreamElementCoder; import cz.o2.proxima.core.repository.RepositoryFactory; import cz.o2.proxima.core.storage.Partition; import cz.o2.proxima.core.storage.StreamElement; @@ -26,6 +27,7 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; @@ -69,6 +71,11 @@ static DirectBoundedSource of( this.reader = reader; } + @Override + public Coder getOutputCoder() { + return StreamElementCoder.of(factory); + } + @Override public List> split( long desiredBundleSizeBytes, PipelineOptions opts) { diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/core/io/SchemaStreamElementCoder.java b/beam/core/src/main/java/cz/o2/proxima/beam/core/io/SchemaStreamElementCoder.java new file mode 100644 index 000000000..d9cba93a2 --- /dev/null +++ b/beam/core/src/main/java/cz/o2/proxima/beam/core/io/SchemaStreamElementCoder.java @@ -0,0 +1,300 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.core.io; + +import cz.o2.proxima.core.repository.AttributeDescriptor; +import cz.o2.proxima.core.repository.EntityAwareAttributeDescriptor; +import cz.o2.proxima.core.repository.EntityAwareAttributeDescriptor.Regular; +import cz.o2.proxima.core.repository.EntityAwareAttributeDescriptor.Wildcard; +import cz.o2.proxima.core.repository.EntityDescriptor; +import cz.o2.proxima.core.repository.Repository; +import cz.o2.proxima.core.scheme.AttributeValueAccessor; +import cz.o2.proxima.core.scheme.AttributeValueAccessors.StructureValue; +import cz.o2.proxima.core.scheme.AttributeValueType; +import cz.o2.proxima.core.scheme.SchemaDescriptors.ArrayTypeDescriptor; +import cz.o2.proxima.core.scheme.SchemaDescriptors.SchemaTypeDescriptor; +import cz.o2.proxima.core.scheme.SchemaDescriptors.StructureTypeDescriptor; +import cz.o2.proxima.core.scheme.ValueSerializer; +import cz.o2.proxima.core.storage.StreamElement; +import cz.o2.proxima.core.util.Optionals; +import cz.o2.proxima.core.util.Pair; +import cz.o2.proxima.internal.com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Builder; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.Row.FieldValueBuilder; +import org.apache.beam.sdk.values.TypeDescriptor; + +public class SchemaStreamElementCoder extends SchemaCoder { + + public static final Map FIELD_TYPES = + ImmutableMap.builder() + .put(AttributeValueType.BOOLEAN, FieldType.BOOLEAN) + .put(AttributeValueType.BYTE, FieldType.BYTE) + .put(AttributeValueType.DOUBLE, FieldType.DOUBLE) + .put(AttributeValueType.FLOAT, FieldType.FLOAT) + .put(AttributeValueType.INT, FieldType.INT32) + .put(AttributeValueType.LONG, FieldType.INT64) + .put(AttributeValueType.STRING, FieldType.STRING) + .build(); + private static final Map, EntityAwareAttributeDescriptor> + ATTR_CACHE = new HashMap<>(); + + private static final Map> ATTR_NAME_CACHE = + new HashMap<>(); + + public static SchemaStreamElementCoder of(Repository repo) { + if (ATTR_CACHE.isEmpty()) { + repo.getAllEntities() + .flatMap(e -> e.getAllAttributes().stream().map(a -> Pair.of(e, a))) + .forEach( + p -> { + EntityAwareAttributeDescriptor aware = asAware(p.getFirst(), p.getSecond()); + ATTR_CACHE.put(p.getSecond(), aware); + ATTR_NAME_CACHE.put(asAttributeFieldName(p.getFirst(), p.getSecond()), aware); + }); + } + return new SchemaStreamElementCoder(asSchema(repo)); + } + + private static String asAttributeFieldName(String entity, String attribute) { + return entity + "_" + attribute; + } + + private static String asAttributeFieldName(EntityDescriptor entity, AttributeDescriptor attr) { + return asAttributeFieldName(entity.getName(), attr.toAttributePrefix(false)); + } + + private static String asAttributeFieldName(StreamElement element) { + return asAttributeFieldName(element.getEntityDescriptor(), element.getAttributeDescriptor()); + } + + @SuppressWarnings("unchecked") + private static EntityAwareAttributeDescriptor asAware( + EntityDescriptor entity, AttributeDescriptor attr) { + if (attr.isWildcard()) { + return Wildcard.of(entity, (AttributeDescriptor) attr); + } + return Regular.of(entity, (AttributeDescriptor) attr); + } + + private static Schema asSchema(Repository repo) { + Builder builder = + Schema.builder() + .addStringField("key") + .addInt64Field("stamp") + // entity.attribute format + .addStringField("attributeName") + .addBooleanField("delete") + .addNullableStringField("attribute") + .addNullableStringField("uuid") + .addNullableInt64Field("seqId"); + repo.getAllEntities() + .flatMap(e -> e.getAllAttributes().stream()) + .forEach(attr -> addToBuilder(attr, builder)); + return builder.build(); + } + + private static void addToBuilder(AttributeDescriptor attr, Builder builder) { + String entity = attr.getEntity(); + String name = attr.toAttributePrefix(false); + addFieldToBuilder( + asAttributeFieldName(entity, name), + attr.getValueSerializer().getValueSchemaDescriptor(), + builder); + } + + private static void addFieldToBuilder( + String name, SchemaTypeDescriptor valueSchemaDescriptor, Builder builder) { + + builder.addNullableField(name, getRequiredField(valueSchemaDescriptor)); + } + + private static @Nonnull FieldType getRequiredField(SchemaTypeDescriptor schema) { + AttributeValueType type = schema.getType(); + if (type.equals(AttributeValueType.STRUCTURE)) { + StructureTypeDescriptor structure = schema.asStructureTypeDescriptor(); + Schema.Builder builder = Schema.builder(); + structure.getFields().forEach((n, f) -> addFieldToBuilder(n, f, builder)); + return FieldType.row(builder.build()); + } else if (type.equals(AttributeValueType.ARRAY)) { + ArrayTypeDescriptor arrayDesc = schema.asArrayTypeDescriptor(); + if (arrayDesc.getValueType().equals(AttributeValueType.BYTE)) { + return FieldType.BYTES; + } + return FieldType.array(getRequiredField(arrayDesc.getValueDescriptor())); + } else { + return Objects.requireNonNull( + FIELD_TYPES.get(type), () -> String.format("Unknown type %s", type)); + } + } + + private static StreamElement fromRow(Row row) { + String key = row.getString("key"); + long stamp = Objects.requireNonNull(row.getInt64("stamp")); + @Nullable String attribute = row.getString("attribute"); + String attributeName = row.getString("attributeName"); + boolean delete = Objects.requireNonNull(row.getBoolean("delete")); + String uuid = row.getString("uuid"); + Long seqId = row.getInt64("seqId"); + EntityAwareAttributeDescriptor entityAware = + Objects.requireNonNull( + ATTR_NAME_CACHE.get(attributeName), + () -> String.format("Missing attribute %s", attributeName)); + if (entityAware.isWildcard()) { + Preconditions.checkArgumentNotNull(attribute); + Wildcard wildcard = (Wildcard) entityAware; + if (delete) { + if (attribute.endsWith(".*")) { + if (seqId == null) { + return wildcard.deleteWildcard(uuid, key, stamp); + } + return wildcard.deleteWildcard(seqId, key, stamp); + } + if (seqId == null) { + return wildcard.delete(uuid, key, attribute, stamp); + } + return wildcard.delete(seqId, key, attribute, stamp); + } + Object value = + entityAware + .getValueSerializer() + .getValueAccessor() + .createFrom(row.getBaseValue(attributeName)); + if (seqId == null) { + return wildcard.upsert(uuid, key, attribute, stamp, value); + } + return wildcard.upsert(seqId, key, attribute, stamp, value); + } + Regular regular = (Regular) entityAware; + if (delete) { + if (seqId == null) { + return regular.delete(uuid, key, stamp); + } + return regular.delete(seqId, key, stamp); + } + Object value = + entityAware + .getValueSerializer() + .getValueAccessor() + .createFrom( + fromFieldType( + row.getSchema().getField(attributeName).getType(), + row.getValue(attributeName))); + if (seqId == null) { + return regular.upsert(uuid, key, stamp, value); + } + return regular.upsert(seqId, key, stamp, value); + } + + private static Row toRow(Schema schema, StreamElement element) { + Objects.requireNonNull( + ATTR_CACHE.get(element.getAttributeDescriptor()), + () -> String.format("Missing attribute %s", element.getAttributeDescriptor())); + String attributeName = asAttributeFieldName(element); + FieldValueBuilder builder = + Row.withSchema(schema) + .withFieldValue("key", element.getKey()) + .withFieldValue("stamp", element.getStamp()) + .withFieldValue("attributeName", attributeName) + .withFieldValue("delete", element.isDelete()); + if (element.getAttributeDescriptor().isWildcard()) { + builder = + builder.withFieldValue( + "attribute", + element + .getAttribute() + .substring(element.getAttributeDescriptor().toAttributePrefix().length())); + } + if (element.hasSequentialId()) { + builder = builder.withFieldValue("seqId", element.getSequentialId()); + } else { + builder = builder.withFieldValue("uuid", element.getUuid()); + } + if (!element.isDelete()) { + Object parsed = Optionals.get(element.getParsed()); + @SuppressWarnings("unchecked") + ValueSerializer valueSerializer = + (ValueSerializer) element.getAttributeDescriptor().getValueSerializer(); + Object mapped = + intoFieldType( + valueSerializer.getValueAccessor().valueOf(parsed), + schema.getField(attributeName).getType()); + builder = builder.withFieldValue(attributeName, mapped); + } + return builder.build(); + } + + /** + * Convert the given accessor to object that is compatible with Beam Schema. + * + * @param accessor the object returned by Proxima {@link AttributeValueAccessor} + * @param field the type to convert the object to + * @return object compatible with equal Beam {@link FieldType} + */ + private static Object intoFieldType(Object accessor, FieldType field) { + if (field.getRowSchema() != null) { + @SuppressWarnings("unchecked") + Map map = (Map) accessor; + return asRow(map, field); + } + return accessor; + } + + private static Object fromFieldType(FieldType type, Object value) { + if (type.getRowSchema() != null) { + return StructureValue.of(asStructureValue((Row) value)); + } + return value; + } + + private static Row asRow(Map map, FieldType field) { + Schema schema = Objects.requireNonNull(field.getRowSchema()); + return Row.withSchema(schema).withFieldValues(map).build(); + } + + private static Map asStructureValue(Row value) { + Map res = new HashMap<>(); + for (Field f : value.getSchema().getFields()) { + Object fieldValue = value.getValue(f.getName()); + if (fieldValue != null) { + if (f.getType().getRowSchema() != null) { + res.put(f.getName(), asStructureValue((Row) fieldValue)); + } else { + res.put(f.getName(), fieldValue); + } + } + } + return res; + } + + private SchemaStreamElementCoder(Schema schema) { + super( + schema, + TypeDescriptor.of(StreamElement.class), + elem -> toRow(schema, elem), + SchemaStreamElementCoder::fromRow); + } +} diff --git a/beam/core/src/test/java/cz/o2/proxima/beam/core/direct/io/BlockingQueueLogObserverTest.java b/beam/core/src/test/java/cz/o2/proxima/beam/core/direct/io/BlockingQueueLogObserverTest.java index ca6b2684c..48121bddd 100644 --- a/beam/core/src/test/java/cz/o2/proxima/beam/core/direct/io/BlockingQueueLogObserverTest.java +++ b/beam/core/src/test/java/cz/o2/proxima/beam/core/direct/io/BlockingQueueLogObserverTest.java @@ -113,11 +113,11 @@ public void testCapacityFullWithCancel() throws InterruptedException { "name", Long.MAX_VALUE, Long.MIN_VALUE, capacity) { @Override public boolean onNext( - StreamElement ingest, + StreamElement element, cz.o2.proxima.direct.core.batch.BatchLogObserver.OnNextContext context) { elements.countDown(); - return super.onNext(ingest, context); + return super.onNext(element, context); } }; long now = System.currentTimeMillis(); diff --git a/beam/core/src/test/java/cz/o2/proxima/beam/core/io/SchemaStreamElementCoderTest.java b/beam/core/src/test/java/cz/o2/proxima/beam/core/io/SchemaStreamElementCoderTest.java new file mode 100644 index 000000000..6525bd0c3 --- /dev/null +++ b/beam/core/src/test/java/cz/o2/proxima/beam/core/io/SchemaStreamElementCoderTest.java @@ -0,0 +1,183 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.core.io; + +import static org.junit.Assert.*; + +import cz.o2.proxima.beam.core.BeamDataOperator; +import cz.o2.proxima.beam.core.testing.Messages; +import cz.o2.proxima.beam.core.testing.Messages.Device; +import cz.o2.proxima.beam.core.testing.Messages.Status; +import cz.o2.proxima.beam.core.testing.Messages.User; +import cz.o2.proxima.core.repository.EntityAwareAttributeDescriptor.Regular; +import cz.o2.proxima.core.repository.EntityAwareAttributeDescriptor.Wildcard; +import cz.o2.proxima.core.repository.EntityDescriptor; +import cz.o2.proxima.core.repository.Repository; +import cz.o2.proxima.core.storage.StreamElement; +import cz.o2.proxima.core.util.Optionals; +import cz.o2.proxima.direct.core.DirectDataOperator; +import cz.o2.proxima.typesafe.config.ConfigFactory; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.extensions.sql.SqlTransform; +import org.apache.beam.sdk.schemas.transforms.Convert; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.junit.Test; + +public class SchemaStreamElementCoderTest { + + private final Repository repo = + Repository.ofTest(ConfigFactory.load("test-schema-coder.conf").resolve()); + + private final DirectDataOperator direct = repo.getOrCreateOperator(DirectDataOperator.class); + private final BeamDataOperator beam = repo.getOrCreateOperator(BeamDataOperator.class); + + private final SchemaStreamElementCoder coder = SchemaStreamElementCoder.of(repo); + + private final EntityDescriptor gateway = repo.getEntity("gateway"); + private final Regular gatewayStatus = Regular.of(gateway, gateway.getAttribute("status")); + private final Wildcard gatewayUser = Wildcard.of(gateway, gateway.getAttribute("user.*")); + private final Wildcard device = Wildcard.of(gateway, gateway.getAttribute("device.*")); + + private final EntityDescriptor user = repo.getEntity("user"); + private final Regular userDetails = Regular.of(user, user.getAttribute("details")); + + @Test + public void testParsing() throws CoderException { + StreamElement element = + gatewayStatus.upsert( + "key", System.currentTimeMillis(), Messages.Status.newBuilder().setAlive(true).build()); + byte[] encoded = CoderUtils.encodeToByteArray(coder, element); + assertNotNull(encoded); + StreamElement decoded = CoderUtils.decodeFromByteArray(coder, encoded); + assertEquals(element, decoded); + } + + @Test + public void testParsingWildcard() throws CoderException { + StreamElement element = gatewayUser.upsert("key", "user", System.currentTimeMillis(), ""); + byte[] encoded = CoderUtils.encodeToByteArray(coder, element); + assertNotNull(encoded); + StreamElement decoded = CoderUtils.decodeFromByteArray(coder, encoded); + assertEquals(element, decoded); + } + + @Test + public void testQuery1() { + Pipeline p = Pipeline.create(); + PCollection input = + beam.getBatchSnapshot(p, gatewayStatus).setCoder(SchemaStreamElementCoder.of(repo)); + PCollection result = + input + .apply(SqlTransform.query("SELECT COUNT(*) FROM PCOLLECTION WHERE key = 'a'")) + .apply(Convert.to(TypeDescriptors.longs())); + PAssert.that(result).containsInAnyOrder(1L); + + long now = System.currentTimeMillis(); + write(gatewayStatus.upsert("a", now, Status.newBuilder().setAlive(true).build())); + write(gatewayStatus.upsert("b", now, Status.newBuilder().setAlive(false).build())); + + p.run().waitUntilFinish(); + } + + @Test + public void testQuery2() { + Pipeline p = Pipeline.create(); + PCollection input = + beam.getBatchUpdates(p, gatewayStatus).setCoder(SchemaStreamElementCoder.of(repo)); + PCollection result = + input + .apply( + SqlTransform.query( + "SELECT key FROM PCOLLECTION WHERE PCOLLECTION.gateway_status.alive = false")) + .apply(Convert.to(TypeDescriptors.strings())); + PAssert.that(result).containsInAnyOrder("b"); + + long now = System.currentTimeMillis(); + write(gatewayStatus.upsert("a", now, Status.newBuilder().setAlive(true).build())); + write(gatewayStatus.upsert("b", now, Status.newBuilder().setAlive(false).build())); + + p.run().waitUntilFinish(); + } + + @Test + public void testQuery3() { + long now = System.currentTimeMillis(); + write(gatewayUser.upsert("gw1", "u1", now, "")); + write(gatewayUser.upsert("gw2", "u1", now, "")); + write(gatewayUser.upsert("gw2", "u2", now, "")); + + Pipeline p = Pipeline.create(); + PCollection gatewayUsers = + beam.getBatchUpdates(p, gatewayUser).setCoder(SchemaStreamElementCoder.of(repo)); + PCollection result = + gatewayUsers + .apply(SqlTransform.query("SELECT key, COUNT(*) c FROM PCOLLECTION GROUP BY key")) + .apply( + MapElements.into(TypeDescriptors.strings()) + .via(row -> String.format("%s:%d", row.getString(0), row.getInt64(1)))); + PAssert.that(result).containsInAnyOrder("gw1:1", "gw2:2"); + p.run().waitUntilFinish(); + } + + @Test + public void testQuery4() { + long now = System.currentTimeMillis(); + write( + userDetails.upsert( + "u1", + now, + User.newBuilder().addPhone("phone1").addPhone("phone2").setName("alice").build())); + write( + userDetails.upsert( + "u2", + now, + User.newBuilder().addPhone("phone3").addPhone("phone4").setName("bob").build())); + + write(gatewayUser.upsert("gw1", "u1", now, "")); + write(gatewayUser.upsert("gw2", "u1", now, "")); + write(gatewayUser.upsert("gw2", "u2", now, "")); + + Pipeline p = Pipeline.create(); + PCollection gatewayUsers = + beam.getBatchUpdates(p, gatewayUser).setCoder(SchemaStreamElementCoder.of(repo)); + PCollection users = + beam.getBatchUpdates(p, userDetails).setCoder(SchemaStreamElementCoder.of(repo)); + + PCollection result = + PCollectionTuple.of("gateway", gatewayUsers) + .and("user", users) + .apply( + SqlTransform.query( + "SELECT u.user_details.name, COUNT(*) FROM `gateway` g " + + "JOIN `user` u ON u.key = g.attribute GROUP BY u.user_details.name")) + .apply( + MapElements.into(TypeDescriptors.strings()) + .via(row -> String.format("%s:%d", row.getString(0), row.getInt64(1)))); + PAssert.that(result).containsInAnyOrder("alice:2", "bob:1"); + + p.run().waitUntilFinish(); + } + + private void write(StreamElement elem) { + Optionals.get(direct.getWriter(elem.getAttributeDescriptor())).write(elem, (succ, exc) -> {}); + } +} diff --git a/beam/core/src/test/proto/messages.proto b/beam/core/src/test/proto/messages.proto new file mode 100644 index 000000000..fdb3d5588 --- /dev/null +++ b/beam/core/src/test/proto/messages.proto @@ -0,0 +1,19 @@ + +syntax = "proto3"; + +package cz.o2.proxima.beam.core.testing; + +message User { + string name = 1; + repeated string phone = 2; +} + +message Status { + bool alive = 1; + repeated string user = 2; +} + +message Device { + string name = 1; + bytes raw = 2; +} diff --git a/beam/core/src/test/resources/test-schema-coder.conf b/beam/core/src/test/resources/test-schema-coder.conf new file mode 100644 index 000000000..04b64f72a --- /dev/null +++ b/beam/core/src/test/resources/test-schema-coder.conf @@ -0,0 +1,38 @@ +{ + entities: { + user { + attributes { + details: { scheme: "proto:cz.o2.proxima.beam.core.testing.Messages.User" } + } + } + + gateway: { + attributes: { + status: { scheme: "proto:cz.o2.proxima.beam.core.testing.Messages.Status" } + "user.*": { scheme: string } + "device.*": { scheme: "proto:cz.o2.proxima.beam.core.testing.Messages.Device" } + bytes: { scheme: bytes } + metric: { scheme: float } + } + } + } + + attributeFamilies: { + gateway-storage-stream: { + entity: gateway + attributes: [ "*" ] + storage: "inmem:///proxima_gateway" + type: primary + access: [ commit-log, cached-view, random-access, batch-updates ] + } + + user-storage-stream { + entity: user + attributes: [ "*" ] + storage: "inmem:///proxima_user" + type: primary + access: [ commit-log, random-access, batch-updates ] + } + } + +} diff --git a/buildSrc/src/main/groovy/cz.o2.proxima.java-conventions.gradle b/buildSrc/src/main/groovy/cz.o2.proxima.java-conventions.gradle index 67fc01ea0..4100a4827 100644 --- a/buildSrc/src/main/groovy/cz.o2.proxima.java-conventions.gradle +++ b/buildSrc/src/main/groovy/cz.o2.proxima.java-conventions.gradle @@ -53,6 +53,7 @@ ext.libraries = [ beam_runners_direct: "org.apache.beam:beam-runners-direct-java:${beam_version}", beam_runners_flink: "org.apache.beam:beam-runners-flink-1.16:${beam_version}", beam_runners_spark: "org.apache.beam:beam-runners-spark-3:${beam_version}", + beam_sql: "org.apache.beam:beam-sdks-java-extensions-sql:${beam_version}", flink_clients: "org.apache.flink:flink-clients:${flink_version}", flink_runtime: "org.apache.flink:flink-runtime:${flink_version}", flink_streaming: "org.apache.flink:flink-streaming-java:${flink_version}", diff --git a/core/src/main/java/cz/o2/proxima/core/scheme/BytesSerializer.java b/core/src/main/java/cz/o2/proxima/core/scheme/BytesSerializer.java index c2a3612bf..44a876541 100644 --- a/core/src/main/java/cz/o2/proxima/core/scheme/BytesSerializer.java +++ b/core/src/main/java/cz/o2/proxima/core/scheme/BytesSerializer.java @@ -43,7 +43,7 @@ public String getAcceptableScheme() { @Override public ValueSerializer getValueSerializer(URI scheme) { return (ValueSerializer) - new ValueSerializer() { + new PrimitiveValueSerializer() { private static final long serialVersionUID = 1L; diff --git a/core/src/main/java/cz/o2/proxima/core/scheme/DoubleSerializer.java b/core/src/main/java/cz/o2/proxima/core/scheme/DoubleSerializer.java index 73294757d..8b1c739f9 100644 --- a/core/src/main/java/cz/o2/proxima/core/scheme/DoubleSerializer.java +++ b/core/src/main/java/cz/o2/proxima/core/scheme/DoubleSerializer.java @@ -36,7 +36,7 @@ public String getAcceptableScheme() { @Override public ValueSerializer getValueSerializer(URI specifier) { return (ValueSerializer) - new ValueSerializer() { + new PrimitiveValueSerializer() { @Override public Optional deserialize(byte[] input) { try { diff --git a/core/src/main/java/cz/o2/proxima/core/scheme/FloatSerializer.java b/core/src/main/java/cz/o2/proxima/core/scheme/FloatSerializer.java index 86846d657..23591b3a6 100644 --- a/core/src/main/java/cz/o2/proxima/core/scheme/FloatSerializer.java +++ b/core/src/main/java/cz/o2/proxima/core/scheme/FloatSerializer.java @@ -41,7 +41,7 @@ public String getAcceptableScheme() { @Override public ValueSerializer getValueSerializer(URI specifier) { return (ValueSerializer) - new ValueSerializer() { + new PrimitiveValueSerializer() { private static final long serialVersionUID = 1L; diff --git a/core/src/main/java/cz/o2/proxima/core/scheme/IntSerializer.java b/core/src/main/java/cz/o2/proxima/core/scheme/IntSerializer.java index 6e3cf346e..5965653c2 100644 --- a/core/src/main/java/cz/o2/proxima/core/scheme/IntSerializer.java +++ b/core/src/main/java/cz/o2/proxima/core/scheme/IntSerializer.java @@ -41,7 +41,7 @@ public String getAcceptableScheme() { @Override public ValueSerializer getValueSerializer(URI specifier) { return (ValueSerializer) - new ValueSerializer() { + new PrimitiveValueSerializer() { private static final long serialVersionUID = 1L; diff --git a/core/src/main/java/cz/o2/proxima/core/scheme/LongSerializer.java b/core/src/main/java/cz/o2/proxima/core/scheme/LongSerializer.java index ef1ade06d..7e540ba94 100644 --- a/core/src/main/java/cz/o2/proxima/core/scheme/LongSerializer.java +++ b/core/src/main/java/cz/o2/proxima/core/scheme/LongSerializer.java @@ -41,7 +41,7 @@ public String getAcceptableScheme() { @Override public ValueSerializer getValueSerializer(URI specifier) { return (ValueSerializer) - new ValueSerializer() { + new PrimitiveValueSerializer() { private static final long serialVersionUID = 1L; diff --git a/core/src/main/java/cz/o2/proxima/core/scheme/PrimitiveValueSerializer.java b/core/src/main/java/cz/o2/proxima/core/scheme/PrimitiveValueSerializer.java new file mode 100644 index 000000000..d134711a2 --- /dev/null +++ b/core/src/main/java/cz/o2/proxima/core/scheme/PrimitiveValueSerializer.java @@ -0,0 +1,41 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.core.scheme; + +abstract class PrimitiveValueSerializer implements ValueSerializer { + + @Override + public AttributeValueAccessor getValueAccessor() { + return new AttributeValueAccessor() { + @Override + public Type getType() { + return Type.PRIMITIVE; + } + + @SuppressWarnings("unchecked") + @Override + public V valueOf(T object) { + return (V) object; + } + + @SuppressWarnings("unchecked") + @Override + public T createFrom(V object) { + return (T) object; + } + }; + } +} diff --git a/core/src/main/java/cz/o2/proxima/core/scheme/StringUtf8Serializer.java b/core/src/main/java/cz/o2/proxima/core/scheme/StringUtf8Serializer.java index 8d9c6a480..016812b0e 100644 --- a/core/src/main/java/cz/o2/proxima/core/scheme/StringUtf8Serializer.java +++ b/core/src/main/java/cz/o2/proxima/core/scheme/StringUtf8Serializer.java @@ -27,7 +27,7 @@ public class StringUtf8Serializer implements ValueSerializerFactory { private static final long serialVersionUID = 1L; - static class StringValueSerializer implements ValueSerializer { + static class StringValueSerializer extends PrimitiveValueSerializer { private static final long serialVersionUID = 1L; diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java index 28e0b988f..39d659504 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java @@ -593,6 +593,35 @@ private void handleFlushDataBaseOnPosition( CountDownLatch latch, CommitLogObserver observer) { + try { + doHandleFlushDataBaseOnPosition( + position, + subscribedPartitions, + consumerId, + stopAtCurrent, + killSwitch, + consumedOffsets, + watermarkEstimator, + latch, + observer); + } catch (Throwable err) { + log.error("Error running observer", err); + observer.onError(err); + latch.countDown(); + } + } + + private void doHandleFlushDataBaseOnPosition( + Position position, + Set subscribedPartitions, + int consumerId, + boolean stopAtCurrent, + AtomicBoolean killSwitch, + Set consumedOffsets, + PartitionedWatermarkEstimator watermarkEstimator, + CountDownLatch latch, + CommitLogObserver observer) { + AtomicReference> onIdleRef = new AtomicReference<>(); Runnable onIdle = @@ -687,14 +716,13 @@ private void handleFlushDataBaseOnPosition( if (!stopAtCurrent) { uriObservers.put( consumerId, - (partition, data) -> { - consumer.accept( - partition, - data, - (success, error) -> { - // Noop. - }); - }); + (partition, data) -> + consumer.accept( + partition, + data, + (success, error) -> { + // Noop. + })); } else { observer.onCompleted(); onIdleFuture.cancel(true); @@ -705,14 +733,13 @@ private void handleFlushDataBaseOnPosition( if (!stopAtCurrent) { uriObservers.put( consumerId, - (partition, data) -> { - consumer.accept( - partition, - data, - (success, error) -> { - // Noop. - }); - }); + (partition, data) -> + consumer.accept( + partition, + data, + (success, error) -> { + // Noop. + })); } else { observer.onCompleted(); onIdleFuture.cancel(true); @@ -1180,7 +1207,8 @@ public InMemStorage() { } private DataHolder holder() { - return DataHolders.get(this); + return Objects.requireNonNull( + DataHolders.get(this), () -> String.format("Missing holder for %s", this)); } public NavigableMap getData() { @@ -1188,8 +1216,9 @@ public NavigableMap getData() { } NavigableMap getObservers(URI uri) { - return Objects.requireNonNull( - holder().observers.get(uri), () -> String.format("Missing observer for [%s]", uri)); + return holder() + .observers + .computeIfAbsent(uri, tmp -> Collections.synchronizedNavigableMap(new TreeMap<>())); } @Override @@ -1205,9 +1234,11 @@ public DataAccessor createAccessor( final Map cfg = familyDescriptor.getCfg(); log.info("Creating accessor {} for URI {}", getClass(), uri); + /* holder() .observers .computeIfAbsent(uri, k -> Collections.synchronizedNavigableMap(new TreeMap<>())); + */ final int numPartitions = Optional.ofNullable(cfg.get(NUM_PARTITIONS)) diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorageTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorageTest.java index b4a69abfe..373a06542 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorageTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorageTest.java @@ -113,7 +113,6 @@ public void onRepartition(OnRepartitionContext context) { @Override public boolean onNext(StreamElement element, OnNextContext context) { - assertEquals(0, context.getPartition().getId()); assertEquals("key", element.getKey()); context.confirm(); @@ -387,7 +386,8 @@ public void onRepartition(CommitLogObserver.OnRepartitionContext context) { } @Override - public boolean onNext(StreamElement element, CommitLogObserver.OnNextContext context) { + public boolean onNext( + StreamElement element, CommitLogObserver.OnNextContext context) { assertEquals(0, context.getPartition().getId()); assertEquals("key", element.getKey()); @@ -549,6 +549,44 @@ public boolean onError(Throwable error) { } } + @Test(timeout = 10000) + public void testObserveBulkPartitionsEmpty() throws InterruptedException { + InMemStorage storage = new InMemStorage(); + DataAccessor accessor = + storage.createAccessor( + direct, createFamilyDescriptor(URI.create("inmem:///inmemstoragetest"))); + CommitLogReader reader = Optionals.get(accessor.getCommitLogReader(direct.getContext())); + CountDownLatch latch = new CountDownLatch(1); + CommitLogObserver observer = + new CommitLogObserver() { + + @Override + public void onRepartition(CommitLogObserver.OnRepartitionContext context) { + assertEquals(1, context.partitions().size()); + } + + @Override + public boolean onNext(StreamElement element, CommitLogObserver.OnNextContext context) { + return true; + } + + @Override + public void onCompleted() { + latch.countDown(); + } + + @Override + public boolean onError(Throwable error) { + throw new RuntimeException(error); + } + }; + + try (ObserveHandle handle = + reader.observeBulkPartitions(reader.getPartitions(), Position.OLDEST, true, observer)) { + latch.await(); + } + } + @Test(timeout = 10000) public void testFetchOffsetsSinglePartition() throws InterruptedException { testFetchOffsets(1);