From 75e1fae6fe0e4c5f3e1a0afa8e937666bd706b50 Mon Sep 17 00:00:00 2001
From: David Moravek
Date: Tue, 8 Jun 2021 15:01:58 +0200
Subject: [PATCH 1/2] [core] Introduce StructureValueAccessor#getFieldAccessor.

---
 .../scheme/AttributeValueAccessor.java        |  4 +-
 .../scheme/AttributeValueAccessors.java       | 22 ++++++++++
 .../scheme/AttributeValueAccessorsTest.java   | 10 +++++
 .../proto/ProtoMessageValueAccessor.java      | 25 ++++++++++++
 .../proto/ProtoMessageValueAccessorTest.java  | 40 +++++++++++++++++++
 .../proto/src/test/resources/test-proto.conf  |  2 +-
 6 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessor.java b/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessor.java
index 0cdb14980..1ecc85a12 100644
--- a/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessor.java
+++ b/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessor.java
@@ -21,8 +21,8 @@
 /**
  * Interface for value accessors allowed create and get value of attribute
  *
- * @param <InputT> input type
- * @param <OutputT> output type
+ * @param <InputT> Type of the "raw" input element that we want to convert into normalized form.
+ * @param <OutputT> Normalized "output" type, e.g. `bytes -> string` for schema type `string`.
  */
 @Experimental
 public interface AttributeValueAccessor<InputT, OutputT> extends Serializable {
diff --git a/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessors.java b/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessors.java
index 511a9f831..bf52868d2 100644
--- a/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessors.java
+++ b/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessors.java
@@ -157,6 +157,28 @@ static StructureValue of(Map<String, Object> value) {
   public interface StructureValueAccessor<T> extends AttributeValueAccessor<T, StructureValue> {
 
+    /**
+     * Get accessor for a given field. Please note that accessors only work on "raw values". See
+     * {@link #getRawFieldValue(String, Object)} for more details.
+     *
+     * @param name Name of the field to get accessor for.
+     * @return Field accessor.
+     */
+    AttributeValueAccessor<?, ?> getFieldAccessor(String name);
+
+    /**
+     * Get the raw value of a given field. In this context, a raw value means a value before a
+     * field accessor is applied to it (for example, it can be a byte representation that would be
+     * converted to a string after "accessing"). This is intended for partial message parsing, to
+     * be used in combination with {@link #getFieldAccessor(String)}.
+     *
+     * @param name Name of the field.
+     * @param structure Structure to get a raw value from.
+     * @param <OutputT> Type of the raw value. This only serves to simplify casting of the
+     *     returned value.
+     * @return Raw value.
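+     *     <p>A minimal usage sketch (the type and field names here are illustrative, not part of
+     *     this API):
+     *     <pre>{@code
+     *     StructureValueAccessor<MyMessage> accessor = ...;
+     *     Object raw = accessor.getRawFieldValue("string_field", message);
+     *     AttributeValueAccessor<Object, String> fieldAccessor =
+     *         (AttributeValueAccessor<Object, String>) accessor.getFieldAccessor("string_field");
+     *     String value = fieldAccessor.valueOf(raw);
+     *     }</pre>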
+     */
+    <OutputT> OutputT getRawFieldValue(String name, T structure);
+
     @Override
     default Type getType() {
       return Type.STRUCTURE;
diff --git a/core/src/test/java/cz/o2/proxima/scheme/AttributeValueAccessorsTest.java b/core/src/test/java/cz/o2/proxima/scheme/AttributeValueAccessorsTest.java
index 45a39701b..49c5a3701 100644
--- a/core/src/test/java/cz/o2/proxima/scheme/AttributeValueAccessorsTest.java
+++ b/core/src/test/java/cz/o2/proxima/scheme/AttributeValueAccessorsTest.java
@@ -98,6 +98,16 @@ public void testArrayWithStructureValue() {
   private static class TestStructureAccessor
       implements StructureValueAccessor<Map<String, Object>> {
 
+    @Override
+    public AttributeValueAccessor<?, ?> getFieldAccessor(String name) {
+      throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public <OutputT> OutputT getRawFieldValue(String name, Map<String, Object> structure) {
+      throw new UnsupportedOperationException("Not implemented.");
+    }
+
     @Override
     public StructureValue valueOf(Map<String, Object> object) {
       return StructureValue.of(object);
diff --git a/scheme/proto/src/main/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessor.java b/scheme/proto/src/main/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessor.java
index 0fa961aa4..2e480b7ef 100644
--- a/scheme/proto/src/main/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessor.java
+++ b/scheme/proto/src/main/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessor.java
@@ -69,6 +69,31 @@ public ProtoMessageValueAccessor(Factory<T> defaultValueFactory) {
         }));
   }
 
+  @Override
+  public AttributeValueAccessor<?, ?> getFieldAccessor(String name) {
+    return fieldAccessors.get(name);
+  }
+
+  @Override
+  public <OutputT> OutputT getRawFieldValue(String name, T structure) {
+    for (FieldDescriptor field : structure.getDescriptorForType().getFields()) {
+      if (name.equals(field.getName())) {
+        final Object rawValue = structure.getField(field);
+        if (rawValue instanceof EnumValueDescriptor) {
+          final EnumValueDescriptor cast = (EnumValueDescriptor) rawValue;
+          @SuppressWarnings("unchecked")
+          final OutputT result = (OutputT) cast.getName();
+          return result;
+        }
+        @SuppressWarnings("unchecked")
+        final OutputT result = (OutputT) rawValue;
+        return result;
+      }
+    }
+    throw new IllegalStateException(
+        String.format("Field %s not found in %s.", name, structure.getClass()));
+  }
+
   @Override
   public StructureValue valueOf(T object) {
     return StructureValue.of(
diff --git a/scheme/proto/src/test/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessorTest.java b/scheme/proto/src/test/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessorTest.java
index b57c947dd..8c9a2eb03 100644
--- a/scheme/proto/src/test/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessorTest.java
+++ b/scheme/proto/src/test/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessorTest.java
@@ -17,9 +17,11 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 import com.google.protobuf.ByteString;
+import cz.o2.proxima.scheme.AttributeValueAccessors;
 import cz.o2.proxima.scheme.AttributeValueAccessors.StructureValue;
 import cz.o2.proxima.scheme.AttributeValueAccessors.StructureValueAccessor;
 import cz.o2.proxima.scheme.proto.test.Scheme.Event;
@@ -229,4 +231,42 @@ public void testCreateProtoWhereRepeatedFieldChangedToOptional() {
     assertArrayEquals(
         "second".getBytes(StandardCharsets.UTF_8), created.getPayload().toByteArray());
   }
+
+  @Test
+  public void testFieldAccessors() {
+    final StructureValueAccessor<ValueSchemeMessage> accessor =
+        new ProtoMessageValueAccessor<>(ValueSchemeMessage::getDefaultInstance);
+    final InnerMessage innerMessage = InnerMessage.newBuilder().setInnerDoubleType(1.2).build();
+    final ValueSchemeMessage message =
+        ValueSchemeMessage.newBuilder()
+            .setStringType("test string")
+            .setInnerMessage(innerMessage)
+            .build();
+
+    // Test getting a "raw field value". This means the inner message doesn't get converted into
+    // a map.
+    assertEquals("test string", accessor.getRawFieldValue("string_type", message));
+    assertEquals(innerMessage, accessor.getRawFieldValue("inner_message", message));
+
+    // Test inner message accessor.
+    @SuppressWarnings("unchecked")
+    final StructureValueAccessor<InnerMessage> innerMessageAccessor =
+        (StructureValueAccessor<InnerMessage>) accessor.getFieldAccessor("inner_message");
+    assertEquals(
+        1.2d, innerMessageAccessor.getRawFieldValue("inner_double_type", innerMessage), 0.0);
+
+    // Test string type accessor.
+    @SuppressWarnings("unchecked")
+    final AttributeValueAccessors.PrimitiveValueAccessor<String, String> stringTypeAccessor =
+        (AttributeValueAccessors.PrimitiveValueAccessor<String, String>)
+            accessor.getFieldAccessor("string_type");
+    assertEquals(
+        "test string",
+        stringTypeAccessor.valueOf(accessor.getRawFieldValue("string_type", message)));
+
+    // Test accessing an unknown field.
+    assertThrows(IllegalStateException.class, () -> accessor.getRawFieldValue("unknown", message));
+
+    // Test defaults.
+    assertEquals(0L, (long) accessor.getRawFieldValue("long_type", message));
+  }
 }
diff --git a/scheme/proto/src/test/resources/test-proto.conf b/scheme/proto/src/test/resources/test-proto.conf
index 27fd902f2..b8c221b7b 100644
--- a/scheme/proto/src/test/resources/test-proto.conf
+++ b/scheme/proto/src/test/resources/test-proto.conf
@@ -66,7 +66,7 @@
       attributes: [ "*" ]
       storage: "inmem:///data/proxima/gateway"
       type: replica
-      access: [ commit-log, random-access ]
+      access: [ random-access ]
     }
     dummy-storage: {
       entity: dummy

From cd2f3e5a1dd888805f7233effb19f1b4e166570e Mon Sep 17 00:00:00 2001
From: David Moravek
Date: Tue, 8 Jun 2021 15:52:56 +0200
Subject: [PATCH 2/2] [flink] Introduce FlinkDataOperator with DataStream and
 Table API sources.

---
 .../direct/commitlog/CommitLogReaderTest.java |   2 +-
 .../flink/core/AbstractLogSourceFunction.java |  23 +-
 .../flink/core/BatchLogSourceFunction.java    |   6 +-
 .../flink/core/CommitLogSourceFunction.java   |  11 +-
 .../proxima/flink/core/FlinkDataOperator.java | 308 ++++++++++++++++++
 .../flink/core/FlinkDataOperatorFactory.java  |  38 +++
 .../table/BatchLogDynamicTableSource.java     |  65 ++++
 .../table/CommitLogDynamicTableSource.java    |  68 ++++
 .../flink/core/table/LogCatalogTable.java     | 193 +++++++++++
 .../table/LogDynamicTableSourceFactory.java   |  63 ++++
 .../core/table/RowDataResultExtractor.java    | 147 +++++++++
 ....o2.proxima.repository.DataOperatorFactory |   1 +
 .../org.apache.flink.table.factories.Factory  |   1 +
 .../core/BatchLogSourceFunctionTest.java      |   2 +
 .../core/CommitLogSourceFunctionTest.java     |   3 +
 .../flink/core/FlinkDataOperatorTest.java     | 216 ++++++++++++
 .../table/RowDataResultExtractorTest.java     |  59 ++++
 17 files changed, 1191 insertions(+), 15 deletions(-)
 create mode 100644 flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java
 create mode 100644 flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperatorFactory.java
 create mode 100644 flink/core/src/main/java/cz/o2/proxima/flink/core/table/BatchLogDynamicTableSource.java
 create mode 100644 flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicTableSource.java
 create mode 100644 flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java
create mode 100644 flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogDynamicTableSourceFactory.java create mode 100644 flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java create mode 100644 flink/core/src/main/resources/META-INF/services/cz.o2.proxima.repository.DataOperatorFactory create mode 100644 flink/core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 flink/core/src/test/java/cz/o2/proxima/flink/core/FlinkDataOperatorTest.java create mode 100644 flink/core/src/test/java/cz/o2/proxima/flink/core/table/RowDataResultExtractorTest.java diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java index 0b1c8e950..196eb9b16 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java @@ -380,7 +380,7 @@ public boolean onError(Throwable error) { } @Test - public void testOrderedObserverLifycycle() { + public void testOrderedObserverLifecycle() { StreamElement update = StreamElement.upsert( entity, diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java index fd40d2e5b..c3366b66b 100644 --- a/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java @@ -59,6 +59,7 @@ *
  • Unified metrics for monitoring and alerting. * * + * @param Source options. * @param Reader to use, for reading data. * @param Observer implementation. * @param Offset used by the current reader. @@ -67,6 +68,7 @@ */ @Slf4j abstract class AbstractLogSourceFunction< + OptionsT extends FlinkDataOperator.LogOptions, ReaderT, ObserverT extends AbstractSourceLogObserver, OffsetT extends Serializable, @@ -110,6 +112,7 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks) @Getter private final RepositoryFactory repositoryFactory; private final List> attributeDescriptors; + @Getter private final OptionsT options; private final ResultExtractor resultExtractor; @Nullable private transient List restoredOffsets; @@ -133,9 +136,11 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks) AbstractLogSourceFunction( RepositoryFactory repositoryFactory, List> attributeDescriptors, + OptionsT options, ResultExtractor resultExtractor) { this.repositoryFactory = repositoryFactory; this.attributeDescriptors = attributeDescriptors; + this.options = options; this.resultExtractor = resultExtractor; } @@ -290,14 +295,16 @@ void finishAndMarkAsIdle(SourceContext sourceContext) { synchronized (sourceContext.getCheckpointLock()) { sourceContext.emitWatermark(new Watermark(Watermarks.MAX_WATERMARK)); } - sourceContext.markAsTemporarilyIdle(); - while (cancelled.getCount() > 0) { - try { - cancelled.await(); - } catch (InterruptedException e) { - if (cancelled.getCount() == 0) { - // Re-interrupt if cancelled. - Thread.currentThread().interrupt(); + if (!options.shutdownFinishedSources()) { + sourceContext.markAsTemporarilyIdle(); + while (cancelled.getCount() > 0) { + try { + cancelled.await(); + } catch (InterruptedException e) { + if (cancelled.getCount() == 0) { + // Re-interrupt if cancelled. 
+ Thread.currentThread().interrupt(); + } } } } diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java index b39fe221b..0179d8374 100644 --- a/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java @@ -33,6 +33,7 @@ @Slf4j public class BatchLogSourceFunction extends AbstractLogSourceFunction< + FlinkDataOperator.BatchLogOptions, BatchLogReader, BatchLogSourceFunction.LogObserver, Offset, @@ -61,8 +62,9 @@ void markOffsetAsConsumed(BatchLogObserver.OnNextContext context) { public BatchLogSourceFunction( RepositoryFactory repositoryFactory, List> attributeDescriptors, + FlinkDataOperator.BatchLogOptions options, ResultExtractor resultExtractor) { - super(repositoryFactory, attributeDescriptors, resultExtractor); + super(repositoryFactory, attributeDescriptors, options, resultExtractor); } @Override @@ -82,7 +84,7 @@ BatchLogReader createLogReader(List> attributeDescriptors @Override List getPartitions(BatchLogReader reader) { - return reader.getPartitions(); + return reader.getPartitions(getOptions().startTimestamp(), getOptions().endTimestamp()); } @Override diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java index 13c1057bc..698bfe931 100644 --- a/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java @@ -23,7 +23,6 @@ import cz.o2.proxima.repository.AttributeDescriptor; import cz.o2.proxima.repository.RepositoryFactory; import cz.o2.proxima.storage.Partition; -import cz.o2.proxima.storage.commitlog.Position; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -32,6 +31,7 @@ @Slf4j public class CommitLogSourceFunction extends AbstractLogSourceFunction< + FlinkDataOperator.CommitLogOptions, CommitLogReader, CommitLogSourceFunction.LogObserver, Offset, @@ -63,8 +63,9 @@ void markOffsetAsConsumed(CommitLogObserver.OnNextContext context) { public CommitLogSourceFunction( RepositoryFactory repositoryFactory, List> attributeDescriptors, + FlinkDataOperator.CommitLogOptions options, ResultExtractor resultExtractor) { - super(repositoryFactory, attributeDescriptors, resultExtractor); + super(repositoryFactory, attributeDescriptors, options, resultExtractor); } @Override @@ -109,8 +110,10 @@ UnifiedObserveHandle observePartitions( List partitions, List> attributeDescriptors, LogObserver observer) { + final FlinkDataOperator.CommitLogOptions options = getOptions(); final ObserveHandle commitLogHandle = - reader.observeBulkPartitions(partitions, Position.OLDEST, false, observer); + reader.observeBulkPartitions( + partitions, options.initialPosition(), options.stopAtCurrent(), observer); return new UnifiedObserveHandle() { @Override @@ -132,7 +135,7 @@ UnifiedObserveHandle observeRestoredOffsets( List> attributeDescriptors, LogObserver observer) { final cz.o2.proxima.direct.commitlog.ObserveHandle delegate = - reader.observeBulkOffsets(offsets, false, observer); + reader.observeBulkOffsets(offsets, getOptions().stopAtCurrent(), observer); return new UnifiedObserveHandle() { @Override diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java 
b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java
new file mode 100644
index 000000000..1d6b886b5
--- /dev/null
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java
@@ -0,0 +1,308 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.flink.core;
+
+import cz.o2.proxima.flink.core.table.LogCatalogTable;
+import cz.o2.proxima.repository.AttributeDescriptor;
+import cz.o2.proxima.repository.DataOperator;
+import cz.o2.proxima.repository.Repository;
+import cz.o2.proxima.storage.StreamElement;
+import cz.o2.proxima.storage.commitlog.Position;
+import cz.o2.proxima.time.Watermarks;
+import cz.o2.proxima.util.Optionals;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Value;
+import lombok.experimental.Accessors;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.connector.ChangelogMode;
+
+/** Operate {@link Repository} using Apache Flink. */
+public class FlinkDataOperator implements DataOperator {
+
+  /**
+   * Create a standardized name for a new source.
+   *
+   * @param type Type of the source (e.g. CommitLog, BatchLog).
+   * @param attributeDescriptors Attributes the source is reading.
+   * @return Source name.
+   */
+  private static String createSourceName(
+      String type, List<AttributeDescriptor<?>> attributeDescriptors) {
+    return String.format(
+        "%s: %s",
+        type,
+        attributeDescriptors
+            .stream()
+            .sorted()
+            .map(AttributeDescriptor::getName)
+            .collect(Collectors.joining(",")));
+  }
+
+  public static CommitLogOptions.CommitLogOptionsBuilder newCommitLogOptions() {
+    return CommitLogOptions.newBuilder();
+  }
+
+  public static BatchLogOptions.BatchLogOptionsBuilder newBatchLogOptions() {
+    return BatchLogOptions.newBuilder();
+  }
+
+  /** Common options for all log-based sources. */
+  public interface LogOptions extends Serializable {
+
+    /**
+     * Shut down sources when they have no more work to do.
+     *
+     * @return True to shut down when finished.
+     */
+    boolean shutdownFinishedSources();
+  }
+
+  /** {@link LogOptions} specific to {@link CommitLogSourceFunction}.
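+   *
+   * <p>Options are created through the Lombok-generated builder. A minimal sketch:
+   *
+   * <pre>{@code
+   * CommitLogOptions options =
+   *     CommitLogOptions.newBuilder()
+   *         .withInitialPosition(Position.CURRENT)
+   *         .withStopAtCurrent(true)
+   *         .build();
+   * }</pre>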
+   */
+  @Accessors(fluent = true)
+  @Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
+  @Value
+  public static class CommitLogOptions implements LogOptions {
+
+    private static final long serialVersionUID = 1L;
+
+    /** @see LogOptions#shutdownFinishedSources() */
+    @Builder.Default boolean shutdownFinishedSources = false;
+
+    /**
+     * An initial position to start reading from, when the source is starting from a clean state
+     * (no checkpoint to restore from).
+     */
+    @Builder.Default Position initialPosition = Position.OLDEST;
+
+    /** Signal to stop reading when event time aligns with processing time. */
+    @Builder.Default boolean stopAtCurrent = false;
+  }
+
+  /** {@link LogOptions} specific to {@link BatchLogSourceFunction}. */
+  @Accessors(fluent = true)
+  @Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
+  @Value
+  public static class BatchLogOptions implements LogOptions {
+
+    private static final long serialVersionUID = 1L;
+
+    /** @see LogOptions#shutdownFinishedSources() */
+    @Builder.Default boolean shutdownFinishedSources = false;
+
+    @Builder.Default long startTimestamp = Watermarks.MIN_WATERMARK;
+
+    @Builder.Default long endTimestamp = Watermarks.MAX_WATERMARK;
+  }
+
+  private final Repository repository;
+  @Getter private final StreamExecutionEnvironment executionEnvironment;
+  @Getter private final StreamTableEnvironment tableEnvironment;
+
+  public FlinkDataOperator(Repository repository) {
+    this.repository = repository;
+    this.executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+    this.tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
+  }
+
+  @Override
+  public void close() {
+    // Nothing to close.
+  }
+
+  @Override
+  public void reload() {
+    // Nothing to reload.
+  }
+
+  @Override
+  public Repository getRepository() {
+    return repository;
+  }
+
+  /**
+   * Create a new stream from historical batch data.
+   *
+   * @param attributeDescriptors Attributes we want to get data from.
+   * @return A new DataStream.
+   */
+  public DataStream<StreamElement> createBatchLogStream(
+      Collection<AttributeDescriptor<?>> attributeDescriptors) {
+    return createBatchLogStream(attributeDescriptors, BatchLogOptions.newBuilder().build());
+  }
+
+  /**
+   * Create a new stream from historical batch data.
+   *
+   * @param attributeDescriptors Attributes we want to get data from.
+   * @param options Options for {@link BatchLogSourceFunction}.
+   * @return A new DataStream.
+   */
+  public DataStream<StreamElement> createBatchLogStream(
+      Collection<AttributeDescriptor<?>> attributeDescriptors, BatchLogOptions options) {
+    final List<AttributeDescriptor<?>> attributeDescriptorsCopy =
+        new ArrayList<>(attributeDescriptors);
+    return executionEnvironment.addSource(
+        new BatchLogSourceFunction<>(
+            repository.asFactory(), attributeDescriptorsCopy, options, ResultExtractor.identity()),
+        createSourceName("BatchLog", attributeDescriptorsCopy));
+  }
+
+  /**
+   * Create a new stream from realtime streaming data.
+   *
+   * @param attributeDescriptors Attributes we want to get data from.
+   * @return A new DataStream.
+   */
+  public DataStream<StreamElement> createCommitLogStream(
+      Collection<AttributeDescriptor<?>> attributeDescriptors) {
+    return createCommitLogStream(attributeDescriptors, CommitLogOptions.newBuilder().build());
+  }
+
+  /**
+   * Create a new stream from realtime streaming data.
+   *
+   * @param attributeDescriptors Attributes we want to get data from.
+   * @param options Options for {@link CommitLogSourceFunction}.
+   * @return A new DataStream.
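+   *     <p>For example, a sketch with an illustrative attribute list:
+   *     <pre>{@code
+   *     FlinkDataOperator operator = repository.getOrCreateOperator(FlinkDataOperator.class);
+   *     DataStream<StreamElement> stream =
+   *         operator.createCommitLogStream(
+   *             Collections.singletonList(entity.getAttribute("data")),
+   *             newCommitLogOptions().withStopAtCurrent(true).build());
+   *     }</pre>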
+ */ + public DataStream createCommitLogStream( + Collection> attributeDescriptors, CommitLogOptions options) { + final List> attributeDescriptorsCopy = + new ArrayList<>(attributeDescriptors); + return executionEnvironment.addSource( + new CommitLogSourceFunction<>( + repository.asFactory(), attributeDescriptorsCopy, options, ResultExtractor.identity()), + createSourceName("CommitLog", attributeDescriptorsCopy)); + } + + /** + * Create a new table of realtime streaming data in current {@link Catalog catalog's} default + * database. + * + * @param tableName Table name. + * @param attributeDescriptors Attributes we want to get data from. + * @param changelogMode Changelog mode of the underlying source. + */ + public void registerCommitLogTable( + String tableName, + Collection> attributeDescriptors, + ChangelogMode changelogMode) { + registerCommitLogTable( + tableName, attributeDescriptors, changelogMode, newCommitLogOptions().build()); + } + + /** + * Create a new table of realtime streaming data in current {@link Catalog catalog's} default + * database. + * + * @param tableName Table name. + * @param attributeDescriptors Attributes we want to get data from. + * @param changelogMode Changelog mode of the underlying source. + * @param options Options for {@link CommitLogSourceFunction}. + */ + public void registerCommitLogTable( + String tableName, + Collection> attributeDescriptors, + ChangelogMode changelogMode, + CommitLogOptions options) { + final List> attributeDescriptorsCopy = + new ArrayList<>(attributeDescriptors); + final Catalog catalog = + Optionals.get(tableEnvironment.getCatalog(tableEnvironment.getCurrentCatalog())); + try { + catalog.createTable( + ObjectPath.fromString(String.format("%s.%s", catalog.getDefaultDatabase(), tableName)), + LogCatalogTable.ofCommitLog(repository, attributeDescriptorsCopy, options), + false); + } catch (Exception e) { + throw new IllegalStateException( + String.format("Unable to register a new table [%s].", tableName), e); + } + } + + /** + * Create a new table of historical batch data in current {@link Catalog catalog's} default + * database. + * + * @param tableName Table name. + * @param attributeDescriptors Attributes we want to get data from. + * @param changelogMode Changelog mode of the underlying source. + */ + public void registerBatchLogTable( + String tableName, + Collection> attributeDescriptors, + ChangelogMode changelogMode) { + registerBatchLogTable( + tableName, attributeDescriptors, changelogMode, newBatchLogOptions().build()); + } + + /** + * Create a new table of historical batch data in current {@link Catalog catalog's} default + * database. + * + * @param tableName Table name. + * @param attributeDescriptors Attributes we want to get data from. + * @param changelogMode Changelog mode of the underlying source. + * @param options Options for {@link BatchLogSourceFunction}. 
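+   *     <p>For example, a sketch with an illustrative table name:
+   *     <pre>{@code
+   *     operator.registerBatchLogTable(
+   *         "events",
+   *         attributeDescriptors,
+   *         ChangelogMode.insertOnly(),
+   *         newBatchLogOptions().withEndTimestamp(System.currentTimeMillis()).build());
+   *     Table events = operator.getTable("events");
+   *     }</pre>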
+ */ + public void registerBatchLogTable( + String tableName, + Collection> attributeDescriptors, + ChangelogMode changelogMode, + BatchLogOptions options) { + final List> attributeDescriptorsCopy = + new ArrayList<>(attributeDescriptors); + final Catalog catalog = + Optionals.get(tableEnvironment.getCatalog(tableEnvironment.getCurrentCatalog())); + try { + catalog.createTable( + ObjectPath.fromString(String.format("%s.%s", catalog.getDefaultDatabase(), tableName)), + LogCatalogTable.ofBatchLog(repository, attributeDescriptorsCopy, options), + false); + } catch (Exception e) { + throw new IllegalStateException( + String.format("Unable to register a new table [%s].", tableName), e); + } + } + + /** @see StreamTableEnvironment#from(String) for more details. */ + public Table getTable(String tableName) { + return tableEnvironment.from(tableName); + } + + /** @see StreamExecutionEnvironment#execute(String) for more details. */ + public JobExecutionResult execute(String jobName) throws Exception { + return executionEnvironment.execute(jobName); + } + + /** @see StreamExecutionEnvironment#executeAsync(String) for more details. */ + public JobClient executeAsync(String jobName) throws Exception { + return executionEnvironment.executeAsync(jobName); + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperatorFactory.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperatorFactory.java new file mode 100644 index 000000000..1fc4642f4 --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperatorFactory.java @@ -0,0 +1,38 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.flink.core; + +import cz.o2.proxima.repository.DataOperator; +import cz.o2.proxima.repository.DataOperatorFactory; +import cz.o2.proxima.repository.Repository; + +public class FlinkDataOperatorFactory implements DataOperatorFactory { + + @Override + public String getOperatorName() { + return "flink"; + } + + @Override + public boolean isOfType(Class cls) { + return cls.isAssignableFrom(FlinkDataOperator.class); + } + + @Override + public FlinkDataOperator create(Repository repository) { + return new FlinkDataOperator(repository); + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/BatchLogDynamicTableSource.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/BatchLogDynamicTableSource.java new file mode 100644 index 000000000..4089c0b39 --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/BatchLogDynamicTableSource.java @@ -0,0 +1,65 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.flink.core.table; + +import cz.o2.proxima.flink.core.BatchLogSourceFunction; +import cz.o2.proxima.repository.Repository; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.types.RowKind; + +public class BatchLogDynamicTableSource implements ScanTableSource { + + private final Repository repository; + private final LogCatalogTable.BatchLogCatalogTable catalogTable; + + public BatchLogDynamicTableSource( + Repository repository, LogCatalogTable.BatchLogCatalogTable catalogTable) { + this.repository = repository; + this.catalogTable = catalogTable; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.DELETE) + .build(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + return SourceFunctionProvider.of( + new BatchLogSourceFunction<>( + repository.asFactory(), + catalogTable.getAttributeDescriptors(), + catalogTable.getLogOptions(), + new RowDataResultExtractor(catalogTable.getAttributeDescriptors())), + true); + } + + @Override + public DynamicTableSource copy() { + throw new UnsupportedOperationException(); + } + + @Override + public String asSummaryString() { + return "BatchLog table source"; + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicTableSource.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicTableSource.java new file mode 100644 index 000000000..571c26830 --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicTableSource.java @@ -0,0 +1,68 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package cz.o2.proxima.flink.core.table; + +import cz.o2.proxima.flink.core.CommitLogSourceFunction; +import cz.o2.proxima.repository.Repository; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark; + +public class CommitLogDynamicTableSource implements ScanTableSource, SupportsSourceWatermark { + + private final Repository repository; + private final LogCatalogTable.CommitLogCatalogTable catalogTable; + + public CommitLogDynamicTableSource( + Repository repository, LogCatalogTable.CommitLogCatalogTable catalogTable) { + this.repository = repository; + this.catalogTable = catalogTable; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + final boolean bounded = catalogTable.getLogOptions().stopAtCurrent(); + return SourceFunctionProvider.of( + new CommitLogSourceFunction<>( + repository.asFactory(), + catalogTable.getAttributeDescriptors(), + catalogTable.getLogOptions(), + new RowDataResultExtractor(catalogTable.getAttributeDescriptors())), + bounded); + } + + @Override + public DynamicTableSource copy() { + return new CommitLogDynamicTableSource(repository, catalogTable); + } + + @Override + public void applySourceWatermark() { + // No-op. + } + + @Override + public String asSummaryString() { + return "CommitLog table source"; + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java new file mode 100644 index 000000000..84be23441 --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java @@ -0,0 +1,193 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package cz.o2.proxima.flink.core.table; + +import cz.o2.proxima.flink.core.FlinkDataOperator; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.scheme.SchemaDescriptors; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.Getter; +import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.VarCharType; + +public abstract class LogCatalogTable + implements org.apache.flink.table.catalog.CatalogTable { + + public static CommitLogCatalogTable ofCommitLog( + Repository repository, + List> attributeDescriptors, + FlinkDataOperator.CommitLogOptions logOptions) { + return new CommitLogCatalogTable(repository, attributeDescriptors, logOptions); + } + + public static BatchLogCatalogTable ofBatchLog( + Repository repository, + List> attributeDescriptors, + FlinkDataOperator.BatchLogOptions logOptions) { + return new BatchLogCatalogTable(repository, attributeDescriptors, logOptions); + } + /** + * Convert {@link SchemaDescriptors.SchemaTypeDescriptor} to Flink's {@link DataType}. + * + * @param schema Schema to convert. + * @return Data type. + */ + @VisibleForTesting + static DataType toDataType(SchemaDescriptors.SchemaTypeDescriptor schema) { + switch (schema.getType()) { + case INT: + return DataTypes.INT(); + case BYTE: + return DataTypes.BYTES(); + case ENUM: + return DataTypes.STRING(); + case LONG: + return DataTypes.BIGINT(); + case ARRAY: + return DataTypes.ARRAY(toDataType(schema.asArrayTypeDescriptor().getValueDescriptor())); + case FLOAT: + return DataTypes.FLOAT(); + case DOUBLE: + return DataTypes.DOUBLE(); + case STRING: + return DataTypes.VARCHAR(VarCharType.MAX_LENGTH); + case BOOLEAN: + return DataTypes.BOOLEAN(); + case STRUCTURE: + final DataTypes.Field[] fields = + schema + .asStructureTypeDescriptor() + .getFields() + .entrySet() + .stream() + .map(entry -> DataTypes.FIELD(entry.getKey(), toDataType(entry.getValue()))) + .toArray(DataTypes.Field[]::new); + return DataTypes.ROW(fields); + default: + throw new IllegalArgumentException( + String.format("Unknown data type %s.", schema.getType())); + } + } + + public static class CommitLogCatalogTable + extends LogCatalogTable { + + public CommitLogCatalogTable( + Repository repository, + List> attributeDescriptors, + FlinkDataOperator.CommitLogOptions logOptions) { + super(repository, attributeDescriptors, logOptions); + } + + @Override + public CatalogBaseTable copy() { + return new CommitLogCatalogTable(getRepository(), getAttributeDescriptors(), getLogOptions()); + } + } + + public static class BatchLogCatalogTable + extends LogCatalogTable { + + public BatchLogCatalogTable( + Repository repository, + List> attributeDescriptors, + FlinkDataOperator.BatchLogOptions logOptions) { + super(repository, attributeDescriptors, logOptions); + } + + @Override + public CatalogBaseTable copy() { + return new BatchLogCatalogTable(getRepository(), getAttributeDescriptors(), getLogOptions()); + } + } + + @Getter private final Repository repository; + @Getter private final List> attributeDescriptors; + @Getter private final OptionsT logOptions; + + public LogCatalogTable( + Repository repository, + List> attributeDescriptors, + OptionsT 
logOptions) { + this.repository = repository; + this.attributeDescriptors = attributeDescriptors; + this.logOptions = logOptions; + } + + @Override + public boolean isPartitioned() { + return false; + } + + @Override + public List getPartitionKeys() { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public org.apache.flink.table.catalog.CatalogTable copy(Map map) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public Map getOptions() { + final Map options = new HashMap<>(); + options.put("connector", LogDynamicTableSourceFactory.FACTORY_IDENTIFIER); + return options; + } + + @Override + public Schema getUnresolvedSchema() { + final Schema.Builder schemaBuilder = + Schema.newBuilder() + .column("key", DataTypes.VARCHAR(VarCharType.MAX_LENGTH).notNull()) + .column("uuid", DataTypes.VARCHAR(VarCharType.MAX_LENGTH).nullable()) + .column("attribute", DataTypes.VARCHAR(VarCharType.MAX_LENGTH).notNull()) + .column("attribute_prefix", DataTypes.VARCHAR(VarCharType.MAX_LENGTH).notNull()) + .column("timestamp", DataTypes.TIMESTAMP(3).notNull()) + .primaryKey("key") + .watermark("timestamp", "SOURCE_WATERMARK()"); + for (AttributeDescriptor attributeDescriptor : attributeDescriptors) { + schemaBuilder.column( + attributeDescriptor.toAttributePrefix(false), + toDataType(attributeDescriptor.getSchemaTypeDescriptor())); + } + return schemaBuilder.build(); + } + + @Override + public String getComment() { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public Optional getDescription() { + return Optional.empty(); + } + + @Override + public Optional getDetailedDescription() { + return Optional.empty(); + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogDynamicTableSourceFactory.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogDynamicTableSourceFactory.java new file mode 100644 index 000000000..716860eda --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogDynamicTableSourceFactory.java @@ -0,0 +1,63 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package cz.o2.proxima.flink.core.table; + +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; + +public class LogDynamicTableSourceFactory implements DynamicTableSourceFactory { + + static final String FACTORY_IDENTIFIER = "proxima"; + + public LogDynamicTableSourceFactory() {} + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final LogCatalogTable catalogTable = + (LogCatalogTable) context.getCatalogTable().getOrigin(); + if (catalogTable instanceof LogCatalogTable.CommitLogCatalogTable) { + final LogCatalogTable.CommitLogCatalogTable cast = + (LogCatalogTable.CommitLogCatalogTable) catalogTable; + return new CommitLogDynamicTableSource(catalogTable.getRepository(), cast); + } + if (catalogTable instanceof LogCatalogTable.BatchLogCatalogTable) { + final LogCatalogTable.BatchLogCatalogTable cast = + (LogCatalogTable.BatchLogCatalogTable) catalogTable; + return new BatchLogDynamicTableSource(catalogTable.getRepository(), cast); + } + throw new UnsupportedOperationException( + String.format( + "Unknown %s implementation %s.", LogCatalogTable.class, catalogTable.getClass())); + } + + @Override + public String factoryIdentifier() { + return FACTORY_IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java new file mode 100644 index 000000000..05c2b3546 --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java @@ -0,0 +1,147 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.flink.core.table; + +import cz.o2.proxima.flink.core.ResultExtractor; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.scheme.AttributeValueAccessor; +import cz.o2.proxima.scheme.AttributeValueAccessors; +import cz.o2.proxima.scheme.SchemaDescriptors; +import cz.o2.proxima.storage.StreamElement; +import cz.o2.proxima.util.Optionals; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; + +/** Convert {@link StreamElement} into {@link RowData}. 
*/ +public class RowDataResultExtractor implements ResultExtractor { + + private final List> attributeDescriptors; + + public RowDataResultExtractor(List> attributeDescriptors) { + this.attributeDescriptors = attributeDescriptors; + } + + @Override + public RowData toResult(StreamElement element) { + final AttributeDescriptor[] attributeDescriptors = + this.attributeDescriptors.toArray(new AttributeDescriptor[0]); + final GenericRowData data = new GenericRowData(5 + attributeDescriptors.length); + // key + data.setField(0, StringData.fromString(element.getKey())); + // uuid + data.setField(1, StringData.fromString(element.getUuid())); + // attribute + data.setField(2, StringData.fromString(element.getAttribute())); + // attribute prefix + data.setField( + 3, StringData.fromString(element.getAttributeDescriptor().toAttributePrefix(false))); + // timestamp + data.setField(4, TimestampData.fromEpochMillis(element.getStamp())); + // value + for (int i = 0; i < attributeDescriptors.length; i++) { + if (element.getAttributeDescriptor().equals(attributeDescriptors[i])) { + data.setField(5 + i, toFlinkObject(element)); + } + } + return data; + } + + @VisibleForTesting + static Object toFlinkObject(StreamElement element) { + @SuppressWarnings("unchecked") + final AttributeDescriptor attributeDescriptor = + (AttributeDescriptor) element.getAttributeDescriptor(); + final ValueT value = + Optionals.get(attributeDescriptor.getValueSerializer().deserialize(element.getValue())); + final SchemaDescriptors.SchemaTypeDescriptor schemaDescriptor = + attributeDescriptor.getSchemaTypeDescriptor(); + final AttributeValueAccessor valueAccessor = + attributeDescriptor.getValueSerializer().getValueAccessor(); + return toFlinkObject(schemaDescriptor, valueAccessor, value); + } + + private static Object toFlinkObject( + SchemaDescriptors.SchemaTypeDescriptor descriptor, + AttributeValueAccessor accessor, + ValueT value) { + switch (descriptor.getType()) { + case STRUCTURE: + final AttributeValueAccessors.StructureValueAccessor structureAccessor = + (AttributeValueAccessors.StructureValueAccessor) accessor; + return toFlinkStructure(descriptor.asStructureTypeDescriptor(), structureAccessor, value); + case STRING: + case ENUM: + return StringData.fromString((String) value); + case BOOLEAN: + case LONG: + case DOUBLE: + case FLOAT: + case INT: + case BYTE: + return value; + case ARRAY: + final AttributeValueAccessors.ArrayValueAccessor arrayAccessor = + (AttributeValueAccessors.ArrayValueAccessor) accessor; + @SuppressWarnings("unchecked") + final List values = (List) value; + return toFlinkArray(descriptor.asArrayTypeDescriptor(), arrayAccessor, values); + default: + throw new UnsupportedOperationException( + String.format("Type [%s] is not supported.", descriptor.getType())); + } + } + + private static RowData toFlinkStructure( + SchemaDescriptors.StructureTypeDescriptor descriptor, + AttributeValueAccessors.StructureValueAccessor accessor, + ValueT value) { + final GenericRowData rowData = new GenericRowData(descriptor.getFields().size()); + final AtomicInteger idx = new AtomicInteger(0); + descriptor + .getFields() + .forEach( + (fieldName, fieldType) -> { + @SuppressWarnings("unchecked") + final SchemaDescriptors.SchemaTypeDescriptor cast = + (SchemaDescriptors.SchemaTypeDescriptor) fieldType; + final Object fieldValue = accessor.getRawFieldValue(fieldName, value); + @SuppressWarnings("unchecked") + final AttributeValueAccessor fieldAccessor = + (AttributeValueAccessor) accessor.getFieldAccessor(fieldName); + 
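+              // Recurse, so that nested structures and arrays are also converted to Flink's
+              // internal data structures.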
rowData.setField( + idx.getAndIncrement(), toFlinkObject(cast, fieldAccessor, fieldValue)); + }); + return rowData; + } + + private static ArrayData toFlinkArray( + SchemaDescriptors.ArrayTypeDescriptor descriptor, + AttributeValueAccessors.ArrayValueAccessor accessor, + List values) { + final SchemaDescriptors.SchemaTypeDescriptor valueDescriptor = + descriptor.getValueDescriptor(); + final AttributeValueAccessor valueAccessor = accessor.getValueAccessor(); + return new GenericArrayData( + values.stream().map(item -> toFlinkObject(valueDescriptor, valueAccessor, item)).toArray()); + } +} diff --git a/flink/core/src/main/resources/META-INF/services/cz.o2.proxima.repository.DataOperatorFactory b/flink/core/src/main/resources/META-INF/services/cz.o2.proxima.repository.DataOperatorFactory new file mode 100644 index 000000000..70e70484f --- /dev/null +++ b/flink/core/src/main/resources/META-INF/services/cz.o2.proxima.repository.DataOperatorFactory @@ -0,0 +1 @@ +cz.o2.proxima.flink.core.FlinkDataOperatorFactory diff --git a/flink/core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink/core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 000000000..0055431e1 --- /dev/null +++ b/flink/core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1 @@ +cz.o2.proxima.flink.core.table.LogDynamicTableSourceFactory \ No newline at end of file diff --git a/flink/core/src/test/java/cz/o2/proxima/flink/core/BatchLogSourceFunctionTest.java b/flink/core/src/test/java/cz/o2/proxima/flink/core/BatchLogSourceFunctionTest.java index f922f3977..c7018f90e 100644 --- a/flink/core/src/test/java/cz/o2/proxima/flink/core/BatchLogSourceFunctionTest.java +++ b/flink/core/src/test/java/cz/o2/proxima/flink/core/BatchLogSourceFunctionTest.java @@ -106,6 +106,7 @@ void testRunAndClose() throws Exception { new BatchLogSourceFunction( repository.asFactory(), Collections.singletonList(attribute), + FlinkDataOperator.BatchLogOptions.newBuilder().build(), ResultExtractor.identity()) { @Override @@ -263,6 +264,7 @@ private OperatorSubtaskState runSubtask( new BatchLogSourceFunction( repository.asFactory(), Collections.singletonList(attributeDescriptor), + FlinkDataOperator.BatchLogOptions.newBuilder().build(), ResultExtractor.identity()) { @Override diff --git a/flink/core/src/test/java/cz/o2/proxima/flink/core/CommitLogSourceFunctionTest.java b/flink/core/src/test/java/cz/o2/proxima/flink/core/CommitLogSourceFunctionTest.java index 4e9c9df22..3be19cb98 100644 --- a/flink/core/src/test/java/cz/o2/proxima/flink/core/CommitLogSourceFunctionTest.java +++ b/flink/core/src/test/java/cz/o2/proxima/flink/core/CommitLogSourceFunctionTest.java @@ -100,6 +100,7 @@ void testRunAndClose() throws Exception { new CommitLogSourceFunction<>( repository.asFactory(), Collections.singletonList(attribute), + FlinkDataOperator.CommitLogOptions.newBuilder().build(), ResultExtractor.identity()); final AbstractStreamOperatorTestHarness testHarness = createTestHarness(sourceFunction, 1, 0); @@ -141,6 +142,7 @@ void testObserverErrorPropagatesToTheMainThread() throws Exception { new CommitLogSourceFunction<>( repository.asFactory(), Collections.singletonList(attributeDescriptor), + FlinkDataOperator.CommitLogOptions.newBuilder().build(), element -> { throw new IllegalStateException("Test failure."); }); @@ -284,6 +286,7 @@ private OperatorSubtaskState runSubtask( new CommitLogSourceFunction<>( repository.asFactory(), 
Collections.singletonList(attributeDescriptor), + FlinkDataOperator.CommitLogOptions.newBuilder().build(), ResultExtractor.identity()); final AbstractStreamOperatorTestHarness testHarness = createTestHarness(sourceFunction, numSubtasks, subtaskIndex); diff --git a/flink/core/src/test/java/cz/o2/proxima/flink/core/FlinkDataOperatorTest.java b/flink/core/src/test/java/cz/o2/proxima/flink/core/FlinkDataOperatorTest.java new file mode 100644 index 000000000..e07160464 --- /dev/null +++ b/flink/core/src/test/java/cz/o2/proxima/flink/core/FlinkDataOperatorTest.java @@ -0,0 +1,216 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.flink.core; + +import static org.apache.flink.table.api.Expressions.$; + +import com.typesafe.config.ConfigFactory; +import cz.o2.proxima.direct.core.CommitCallback; +import cz.o2.proxima.direct.core.DirectDataOperator; +import cz.o2.proxima.direct.core.OnlineAttributeWriter; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.EntityDescriptor; +import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.scheme.proto.test.Scheme; +import cz.o2.proxima.storage.StreamElement; +import cz.o2.proxima.time.Watermarks; +import cz.o2.proxima.util.Optionals; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import lombok.Data; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.connector.ChangelogMode; +import org.junit.jupiter.api.Test; + +class FlinkDataOperatorTest { + + private static final String MODEL = + "{\n" + + " entities: {\n" + + " test {\n" + + " attributes {\n" + + " data: { scheme: \"string\" }\n" + + " }\n" + + " }\n" + + " }\n" + + "\n" + + " attributeFamilies: {\n" + + " test_storage_stream {\n" + + " entity: test\n" + + " attributes: [ data ]\n" + + " storage: \"inmem:///test_inmem\"\n" + + " type: primary\n" + + " access: commit-log\n" + + " }\n" + + " }\n" + + "\n" + + "}\n"; + + @Data + public static class Result { + String attribute; + long latest; + } + + @Test + void testCreateOperator() { + final Repository repository = Repository.ofTest(ConfigFactory.parseString(MODEL)); + repository.getOrCreateOperator(FlinkDataOperator.class); + } + + @Test + void testComplexSchema() throws Exception { + final Repository repository = Repository.ofTest(ConfigFactory.load("test-proto.conf")); + final FlinkDataOperator operator = repository.getOrCreateOperator(FlinkDataOperator.class); + final StreamTableEnvironment tableEnvironment = operator.getTableEnvironment(); + operator.getExecutionEnvironment().setParallelism(3); + + final List> attributes = + Arrays.asList( + 
repository.getEntity("gateway").getAttribute("status"), + repository.getEntity("gateway").getAttribute("users")); + + operator.registerCommitLogTable( + "gateway", + attributes, + ChangelogMode.insertOnly(), + FlinkDataOperator.newCommitLogOptions().withShutdownFinishedSources(true).build()); + + final CheckedThread thread = + new CheckedThread() { + + @Override + public void go() { + final Random random = new Random(); + final TestWriter writer = new TestWriter(repository); + writer.writeStatus( + String.format("key_%s", random.nextInt(100)), + Instant.now(), + Scheme.Status.newBuilder() + .setConnected(true) + .setLastContact(1000L + random.nextInt(10000)) + .build()); + writer.writeStatus( + String.format("key_%s", random.nextInt(100)), + Instant.ofEpochMilli(Watermarks.MAX_WATERMARK), + Scheme.Status.newBuilder() + .setConnected(true) + .setLastContact(1000L + random.nextInt(10000)) + .build()); + writer.writeUsers( + String.format("key_%s", random.nextInt(100)), + Instant.MAX, + Scheme.Users.newBuilder() + .addUser(String.format("test_user_%s", random.nextInt(15))) + .addUser(String.format("test_user_%s", random.nextInt(15))) + .addUser(String.format("test_user_%s", random.nextInt(15))) + .addUser(String.format("test_user_%s", random.nextInt(15))) + .build()); + } + }; + + thread.start(); + + final Table gateway = operator.getTable("gateway"); + + System.out.printf("%n======= Print Schema =========%n%n"); + gateway.printSchema(); + + tableEnvironment + .from("gateway") + .addColumns($("status").flatten()) + .select($("status$lastContact")); + final Table result = + tableEnvironment.sqlQuery( + "SELECT attribute, MAX(status.lastContact) latest\n" + + "FROM gateway\n" + + "GROUP BY attribute"); + + System.out.printf("%n======= EXPLAIN =========%n%n"); + System.out.println(result.explain()); + + System.out.printf("%n======= Print Result Schema =========%n%n"); + result.printSchema(); + + tableEnvironment + .toRetractStream(result, Result.class) + .addSink( + new SinkFunction>() { + @Override + public void invoke(Tuple2 value, Context context) throws Exception { + System.out.println("======== " + value); + } + }); + + result.printSchema(); + + System.out.printf("%n======= Execute Query =========%n%n"); + + operator.execute("test"); + } + + public static class TestWriter { + + private final Repository repository; + private final DirectDataOperator direct; + + public TestWriter(Repository repository) { + this.repository = repository; + this.direct = repository.getOrCreateOperator(DirectDataOperator.class); + } + + public void writeStatus(String key, Instant timestamp, Scheme.Status status) { + final EntityDescriptor entity = repository.getEntity("gateway"); + final AttributeDescriptor attribute = entity.getAttribute("status"); + try (final OnlineAttributeWriter writer = Optionals.get(direct.getWriter(attribute))) { + writer.write( + StreamElement.upsert( + entity, + attribute, + UUID.randomUUID().toString(), + key, + attribute.getName(), + timestamp.toEpochMilli(), + attribute.getValueSerializer().serialize(status)), + CommitCallback.noop()); + } + } + + public void writeUsers(String key, Instant timestamp, Scheme.Users users) { + final EntityDescriptor entity = repository.getEntity("gateway"); + final AttributeDescriptor attribute = entity.getAttribute("users"); + try (final OnlineAttributeWriter writer = Optionals.get(direct.getWriter(attribute))) { + writer.write( + StreamElement.upsert( + entity, + attribute, + UUID.randomUUID().toString(), + key, + attribute.getName(), + 
timestamp.toEpochMilli(), + attribute.getValueSerializer().serialize(users)), + CommitCallback.noop()); + } + } + } +} diff --git a/flink/core/src/test/java/cz/o2/proxima/flink/core/table/RowDataResultExtractorTest.java b/flink/core/src/test/java/cz/o2/proxima/flink/core/table/RowDataResultExtractorTest.java new file mode 100644 index 000000000..ca722f339 --- /dev/null +++ b/flink/core/src/test/java/cz/o2/proxima/flink/core/table/RowDataResultExtractorTest.java @@ -0,0 +1,59 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.flink.core.table; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.typesafe.config.ConfigFactory; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.EntityDescriptor; +import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.scheme.proto.test.Scheme; +import cz.o2.proxima.storage.StreamElement; +import java.time.Instant; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.Test; + +class RowDataResultExtractorTest { + + @Test + void testToFlinkObjectCompatibilityWithDataType() { + final Repository repository = + Repository.ofTest(ConfigFactory.load("test-proto.conf").resolve()); + final EntityDescriptor entity = repository.getEntity("event"); + final AttributeDescriptor attribute = entity.getAttribute("complex"); + final Scheme.ValueSchemeMessage message = + Scheme.ValueSchemeMessage.newBuilder().setStringType("test string").setIntType(123).build(); + final StreamElement streamElement = + StreamElement.upsert( + entity, + attribute, + 1L, + "key", + attribute.getName(), + Instant.now().toEpochMilli(), + attribute.getValueSerializer().serialize(message)); + final RowData rowData = (RowData) RowDataResultExtractor.toFlinkObject(streamElement); + final DataType dataType = LogCatalogTable.toDataType(attribute.getSchemaTypeDescriptor()); + final RowType logicalType = (RowType) dataType.getLogicalType(); + assertEquals( + message.getStringType(), + rowData.getString(logicalType.getFieldIndex("string_type")).toString()); + assertEquals(message.getIntType(), rowData.getInt(logicalType.getFieldIndex("int_type"))); + } +}