diff --git a/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessor.java b/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessor.java index 0cdb14980d..1ecc85a124 100644 --- a/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessor.java +++ b/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessor.java @@ -21,8 +21,8 @@ /** * Interface for value accessors allowed create and get value of attribute * - * @param <T> input type - * @param <V> output type + * @param <T> Type of "raw" input element that we want to convert into normalized form. + * @param <V> Normalized "output" type, e.g. `bytes -> string` for schema type `string`. */ @Experimental public interface AttributeValueAccessor<T, V> extends Serializable { diff --git a/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessors.java b/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessors.java index 511a9f8315..7509536867 100644 --- a/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessors.java +++ b/core/src/main/java/cz/o2/proxima/scheme/AttributeValueAccessors.java @@ -157,6 +157,10 @@ static StructureValue of(Map<String, Object> value) { public interface StructureValueAccessor<T> extends AttributeValueAccessor<T, StructureValue> { + AttributeValueAccessor<?, ?> getFieldAccessor(String name); + + Object getRawValue(String name, T object); + @Override default Type getType() { return Type.STRUCTURE; diff --git a/core/src/test/java/cz/o2/proxima/scheme/AttributeValueAccessorsTest.java b/core/src/test/java/cz/o2/proxima/scheme/AttributeValueAccessorsTest.java index 45a39701bc..3ca3a74740 100644 --- a/core/src/test/java/cz/o2/proxima/scheme/AttributeValueAccessorsTest.java +++ b/core/src/test/java/cz/o2/proxima/scheme/AttributeValueAccessorsTest.java @@ -98,6 +98,16 @@ public void testArrayWithStructureValue() { private static class TestStructureAccessor implements StructureValueAccessor<Map<String, Object>> { + @Override + public AttributeValueAccessor<?, ?> getFieldAccessor(String name) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public Object getRawValue(String name, Map<String, Object> object) { + throw new UnsupportedOperationException("Not implemented."); + } + @Override public StructureValue valueOf(Map<String, Object> object) { return StructureValue.of(object); diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectDataOperator.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectDataOperator.java index af88ad508b..8b2ac78c1c 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectDataOperator.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectDataOperator.java @@ -420,7 +420,7 @@ public final Optional<RandomAccessReader> getRandomAccess(AttributeDescriptor<?>... attrs) { return getRandomAccess(Arrays.asList(attrs)); } - private Optional<DirectAttributeFamilyDescriptor> getFamilyForAttributes( + public Optional<DirectAttributeFamilyDescriptor> getFamilyForAttributes( Collection<AttributeDescriptor<?>> attrs, UnaryFunction<DirectAttributeFamilyDescriptor, Boolean> mask) { diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java index 0b1c8e9508..196eb9b164 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java @@ -380,7 +380,7 @@ public boolean onError(Throwable error) { } @Test - public void testOrderedObserverLifycycle() { + public void testOrderedObserverLifecycle() { StreamElement update = StreamElement.upsert( entity, diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java
b/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java index 2b9e48c2f1..ea28a3319a 100644 --- a/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java @@ -291,16 +291,16 @@ void finishAndMarkAsIdle(SourceContext sourceContext) { sourceContext.emitWatermark(new Watermark(Watermarks.MAX_WATERMARK)); } sourceContext.markAsTemporarilyIdle(); - while (cancelled.getCount() > 0) { - try { - cancelled.await(); - } catch (InterruptedException e) { - if (cancelled.getCount() == 0) { - // Re-interrupt if cancelled. - Thread.currentThread().interrupt(); - } - } - } + // while (cancelled.getCount() > 0) { + // try { + // cancelled.await(); + // } catch (InterruptedException e) { + // if (cancelled.getCount() == 0) { + // // Re-interrupt if cancelled. + // Thread.currentThread().interrupt(); + // } + // } + // } } @Override diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java new file mode 100644 index 0000000000..3307caefa2 --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java @@ -0,0 +1,129 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.flink.core; + +import cz.o2.proxima.flink.core.table.LogCatalogTable; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.DataOperator; +import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.storage.StreamElement; +import cz.o2.proxima.storage.commitlog.Position; +import cz.o2.proxima.util.Optionals; +import java.util.List; +import java.util.stream.Collectors; +import lombok.Builder; +import lombok.Getter; +import lombok.Value; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.ObjectPath; + +public class FlinkDataOperator implements DataOperator { + + private final Repository repository; + @Getter private final StreamExecutionEnvironment executionEnvironment; + @Getter private final StreamTableEnvironment tableEnvironment; + + public FlinkDataOperator(Repository repository) { + this.repository = repository; + this.executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + this.tableEnvironment = StreamTableEnvironment.create(executionEnvironment); + } + + @Override + public void close() { + // Nothing to close. + } + + @Override + public void reload() { + // Nothing to reload. 
+ } + + @Override + public Repository getRepository() { + return repository; + } + + @Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true) + @Value + public static class CommitLogOptions { + + @Builder.Default Position initialPosition = Position.OLDEST; + + @Builder.Default boolean stopAtCurrent = false; + } + + public DataStream<StreamElement> createBatchLogStream( + List<AttributeDescriptor<?>> attributeDescriptors) { + return createBatchLogStream(attributeDescriptors, CommitLogOptions.newBuilder().build()); + } + + public DataStream<StreamElement> createBatchLogStream( + List<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) { + return executionEnvironment.addSource( + new BatchLogSourceFunction<>( + repository.asFactory(), attributeDescriptors, ResultExtractor.identity()), + sourceName("BatchLog", attributeDescriptors)); + } + + private static String sourceName(String type, List<AttributeDescriptor<?>> attributeDescriptors) { + return String.format( + "%s: %s", + type, + attributeDescriptors + .stream() + .map(AttributeDescriptor::getName) + .sorted() + .collect(Collectors.joining(","))); + } + + public DataStream<StreamElement> createCommitLogStream( + List<AttributeDescriptor<?>> attributeDescriptors) { + return createCommitLogStream(attributeDescriptors, CommitLogOptions.newBuilder().build()); + } + + public DataStream<StreamElement> createCommitLogStream( + List<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) { + return executionEnvironment.addSource( + new CommitLogSourceFunction<>( + repository.asFactory(), attributeDescriptors, ResultExtractor.identity()), + sourceName("CommitLog", attributeDescriptors)); + } + + public void registerCommitLogTable( + String table, List<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) { + final Catalog catalog = + Optionals.get(tableEnvironment.getCatalog(tableEnvironment.getCurrentCatalog())); + try { + catalog.createTable( + ObjectPath.fromString(String.format("%s.%s", catalog.getDefaultDatabase(), table)), + new LogCatalogTable(repository, attributeDescriptors), + true); + } catch (Exception e) { + throw new IllegalStateException( + String.format("Unable to register a new table [%s].", table), e); + } + } + + /** @see StreamExecutionEnvironment#execute(String) for more details. */ + public JobExecutionResult execute(String jobName) throws Exception { + return executionEnvironment.execute(jobName); + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperatorFactory.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperatorFactory.java new file mode 100644 index 0000000000..1fc4642f4d --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperatorFactory.java @@ -0,0 +1,38 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package cz.o2.proxima.flink.core; + +import cz.o2.proxima.repository.DataOperator; +import cz.o2.proxima.repository.DataOperatorFactory; +import cz.o2.proxima.repository.Repository; + +public class FlinkDataOperatorFactory implements DataOperatorFactory<FlinkDataOperator> { + + @Override + public String getOperatorName() { + return "flink"; + } + + @Override + public boolean isOfType(Class<? extends DataOperator> cls) { + return cls.isAssignableFrom(FlinkDataOperator.class); + } + + @Override + public FlinkDataOperator create(Repository repository) { + return new FlinkDataOperator(repository); + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicSource.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicSource.java new file mode 100644 index 0000000000..287202ccf9 --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicSource.java @@ -0,0 +1,63 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.flink.core.table; + +import cz.o2.proxima.flink.core.CommitLogSourceFunction; +import cz.o2.proxima.repository.Repository; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.types.RowKind; + +public class CommitLogDynamicSource implements ScanTableSource { + + private final Repository repository; + private final LogCatalogTable catalogTable; + + public CommitLogDynamicSource(Repository repository, LogCatalogTable catalogTable) { + this.repository = repository; + this.catalogTable = catalogTable; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.DELETE) + .build(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + return SourceFunctionProvider.of( + new CommitLogSourceFunction<>( + repository.asFactory(), + catalogTable.getAttributeDescriptors(), + new RowDataResultExtractor(catalogTable.getAttributeDescriptors())), + false); + } + + @Override + public DynamicTableSource copy() { + throw new UnsupportedOperationException(); + } + + @Override + public String asSummaryString() { + return "CommitLog table source"; + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogSourceFactory.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogSourceFactory.java new file mode 100644 index 0000000000..34c48df418 --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogSourceFactory.java @@ -0,0 +1,50 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.flink.core.table; + +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; + +public class CommitLogSourceFactory implements DynamicTableSourceFactory { + + static final String FACTORY_IDENTIFIER = "proxima"; + + public CommitLogSourceFactory() {} + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final LogCatalogTable catalogTable = (LogCatalogTable) context.getCatalogTable().getOrigin(); + return new CommitLogDynamicSource(catalogTable.getRepository(), catalogTable); + } + + @Override + public String factoryIdentifier() { + return FACTORY_IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java new file mode 100644 index 0000000000..6c2c2c105a --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java @@ -0,0 +1,141 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package cz.o2.proxima.flink.core.table; + +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.scheme.SchemaDescriptors; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.Getter; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.VarCharType; + +public class LogCatalogTable implements org.apache.flink.table.catalog.CatalogTable { + + @Getter private final Repository repository; + @Getter private final List<AttributeDescriptor<?>> attributeDescriptors; + + public LogCatalogTable(Repository repository, List<AttributeDescriptor<?>> attributeDescriptors) { + this.repository = repository; + this.attributeDescriptors = attributeDescriptors; + } + + @Override + public boolean isPartitioned() { + return false; + } + + @Override + public List<String> getPartitionKeys() { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public org.apache.flink.table.catalog.CatalogTable copy(Map<String, String> map) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public Map<String, String> getOptions() { + final Map<String, String> options = new HashMap<>(); + options.put("connector", CommitLogSourceFactory.FACTORY_IDENTIFIER); + return options; + } + + @Override + public TableSchema getSchema() { + final TableSchema.Builder schemaBuilder = + TableSchema.builder() + .add(TableColumn.physical("key", DataTypes.VARCHAR(VarCharType.MAX_LENGTH))) + .add(TableColumn.physical("uuid", DataTypes.VARCHAR(VarCharType.MAX_LENGTH))) + .add(TableColumn.physical("attribute", DataTypes.VARCHAR(VarCharType.MAX_LENGTH))) + .add( + TableColumn.physical("attribute_prefix", DataTypes.VARCHAR(VarCharType.MAX_LENGTH))) + .add(TableColumn.physical("timestamp", DataTypes.TIMESTAMP())); + for (AttributeDescriptor<?> attributeDescriptor : attributeDescriptors) { + schemaBuilder.add( + TableColumn.physical( + attributeDescriptor.toAttributePrefix(false), + convert(attributeDescriptor.getSchemaTypeDescriptor()))); + } + return schemaBuilder.build(); + } + + private DataType convert(SchemaDescriptors.SchemaTypeDescriptor<?> schema) { + switch (schema.getType()) { + case INT: + return DataTypes.INT(); + case BYTE: + return DataTypes.BYTES(); + case ENUM: + throw new UnsupportedOperationException("Not implemented."); + case LONG: + return DataTypes.BIGINT(); + case ARRAY: + return DataTypes.ARRAY(convert(schema.asArrayTypeDescriptor().getValueDescriptor())); + case FLOAT: + return DataTypes.FLOAT(); + case DOUBLE: + return DataTypes.DOUBLE(); + case STRING: + return DataTypes.VARCHAR(VarCharType.MAX_LENGTH); + case BOOLEAN: + return DataTypes.BOOLEAN(); + case STRUCTURE: + final DataTypes.Field[] fields = + schema + .asStructureTypeDescriptor() + .getFields() + .entrySet() + .stream() + .map(entry -> DataTypes.FIELD(entry.getKey(), convert(entry.getValue()))) + .toArray(DataTypes.Field[]::new); + return DataTypes.ROW(fields); + default: + throw new IllegalArgumentException( + String.format("Unknown data type %s.", schema.getType())); + } + } + + @Override + public String getComment() { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public CatalogBaseTable copy() { + return new LogCatalogTable(repository,
attributeDescriptors); + } + + @Override + public Optional<String> getDescription() { + return Optional.empty(); + } + + @Override + public Optional<String> getDetailedDescription() { + return Optional.empty(); + } +} diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java new file mode 100644 index 0000000000..27d8691a87 --- /dev/null +++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java @@ -0,0 +1,140 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.flink.core.table; + +import cz.o2.proxima.flink.core.ResultExtractor; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.scheme.AttributeValueAccessor; +import cz.o2.proxima.scheme.AttributeValueAccessors; +import cz.o2.proxima.scheme.SchemaDescriptors; +import cz.o2.proxima.storage.StreamElement; +import cz.o2.proxima.util.Optionals; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; + +public class RowDataResultExtractor implements ResultExtractor<RowData> { + + private final List<AttributeDescriptor<?>> attributeDescriptors; + + public RowDataResultExtractor(List<AttributeDescriptor<?>> attributeDescriptors) { + this.attributeDescriptors = attributeDescriptors; + } + + @Override + public RowData toResult(StreamElement element) { + final AttributeDescriptor<?>[] attributeDescriptors = + this.attributeDescriptors.toArray(new AttributeDescriptor<?>[0]); + final GenericRowData data = new GenericRowData(5 + attributeDescriptors.length); + // key + data.setField(0, StringData.fromString(element.getKey())); + // uuid + data.setField(1, StringData.fromString(element.getUuid())); + // attribute + data.setField(2, StringData.fromString(element.getAttribute())); + // attribute prefix + data.setField( + 3, StringData.fromString(element.getAttributeDescriptor().toAttributePrefix(false))); + // timestamp + data.setField(4, TimestampData.fromEpochMillis(element.getStamp())); + // value + for (int i = 0; i < attributeDescriptors.length; i++) { + if (element.getAttributeDescriptor().equals(attributeDescriptors[i])) { + data.setField(5 + i, toFlinkValue(element, element.getAttributeDescriptor())); + } + } + return data; + } + + private static <ValueT> Object toFlinkValue( + StreamElement element, AttributeDescriptor<ValueT> attributeDescriptor) { + final ValueT value = + Optionals.get(attributeDescriptor.getValueSerializer().deserialize(element.getValue())); + final SchemaDescriptors.SchemaTypeDescriptor<ValueT> schemaDescriptor = + attributeDescriptor.getSchemaTypeDescriptor(); + final AttributeValueAccessor<ValueT, ?> valueAccessor = + attributeDescriptor.getValueSerializer().getValueAccessor(); +
return toFlinkData(schemaDescriptor, valueAccessor, value); + } + + public static <ValueT> Object toFlinkData( + SchemaDescriptors.SchemaTypeDescriptor<ValueT> descriptor, + AttributeValueAccessor<ValueT, ?> accessor, + ValueT value) { + switch (descriptor.getType()) { + case STRUCTURE: + final AttributeValueAccessors.StructureValueAccessor<ValueT> structureAccessor = + (AttributeValueAccessors.StructureValueAccessor<ValueT>) accessor; + return toFlinkStructure(descriptor.asStructureTypeDescriptor(), structureAccessor, value); + case STRING: + return StringData.fromString((String) value); + case BOOLEAN: + case LONG: + case DOUBLE: + case FLOAT: + case INT: + case BYTE: + return value; + case ARRAY: + final AttributeValueAccessors.ArrayValueAccessor arrayAccessor = + (AttributeValueAccessors.ArrayValueAccessor) accessor; + @SuppressWarnings("unchecked") + final List values = (List) value; + return toFlinkArray(descriptor.asArrayTypeDescriptor(), arrayAccessor, values); + default: + throw new UnsupportedOperationException( + String.format("Type [%s] is not supported.", descriptor.getType())); + } + } + + public static <ValueT> RowData toFlinkStructure( + SchemaDescriptors.StructureTypeDescriptor<ValueT> descriptor, + AttributeValueAccessors.StructureValueAccessor<ValueT> accessor, + ValueT value) { + final GenericRowData rowData = new GenericRowData(descriptor.getFields().size()); + final AtomicInteger idx = new AtomicInteger(0); + descriptor + .getFields() + .forEach( + (fieldName, fieldType) -> { + @SuppressWarnings("unchecked") + final SchemaDescriptors.SchemaTypeDescriptor<Object> cast = + (SchemaDescriptors.SchemaTypeDescriptor<Object>) fieldType; + final Object fieldValue = accessor.getRawValue(fieldName, value); + @SuppressWarnings("unchecked") + final AttributeValueAccessor<Object, ?> fieldAccessor = + (AttributeValueAccessor<Object, ?>) accessor.getFieldAccessor(fieldName); + rowData.setField(idx.getAndIncrement(), toFlinkData(cast, fieldAccessor, fieldValue)); + }); + return rowData; + } + + public static <ValueT> ArrayData toFlinkArray( + SchemaDescriptors.ArrayTypeDescriptor<ValueT> descriptor, + AttributeValueAccessors.ArrayValueAccessor<ValueT> accessor, + List<ValueT> values) { + final SchemaDescriptors.SchemaTypeDescriptor<ValueT> valueDescriptor = + descriptor.getValueDescriptor(); + final AttributeValueAccessor<ValueT, ?> valueAccessor = accessor.getValueAccessor(); + return new GenericArrayData( + values.stream().map(item -> toFlinkData(valueDescriptor, valueAccessor, item)).toArray()); + } +} diff --git a/flink/core/src/main/resources/META-INF/services/cz.o2.proxima.repository.DataOperatorFactory b/flink/core/src/main/resources/META-INF/services/cz.o2.proxima.repository.DataOperatorFactory new file mode 100644 index 0000000000..70e70484f6 --- /dev/null +++ b/flink/core/src/main/resources/META-INF/services/cz.o2.proxima.repository.DataOperatorFactory @@ -0,0 +1 @@ +cz.o2.proxima.flink.core.FlinkDataOperatorFactory diff --git a/flink/core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink/core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..369306815f --- /dev/null +++ b/flink/core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1 @@ +cz.o2.proxima.flink.core.table.CommitLogSourceFactory \ No newline at end of file diff --git a/flink/core/src/test/java/cz/o2/proxima/flink/core/FlinkDataOperatorTest.java b/flink/core/src/test/java/cz/o2/proxima/flink/core/FlinkDataOperatorTest.java new file mode 100644 index 0000000000..8b9c8839d6 --- /dev/null +++
b/flink/core/src/test/java/cz/o2/proxima/flink/core/FlinkDataOperatorTest.java @@ -0,0 +1,208 @@ +/** + * Copyright 2017-2021 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.flink.core; + +import static org.apache.flink.table.api.Expressions.$; + +import com.typesafe.config.ConfigFactory; +import cz.o2.proxima.direct.core.CommitCallback; +import cz.o2.proxima.direct.core.DirectDataOperator; +import cz.o2.proxima.direct.core.OnlineAttributeWriter; +import cz.o2.proxima.repository.AttributeDescriptor; +import cz.o2.proxima.repository.EntityDescriptor; +import cz.o2.proxima.repository.Repository; +import cz.o2.proxima.scheme.proto.test.Scheme; +import cz.o2.proxima.storage.StreamElement; +import cz.o2.proxima.storage.commitlog.Position; +import cz.o2.proxima.time.Watermarks; +import cz.o2.proxima.util.Optionals; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +class FlinkDataOperatorTest { + + private static final String MODEL = + "{\n" + + " entities: {\n" + + " test {\n" + + " attributes {\n" + + " data: { scheme: \"string\" }\n" + + " }\n" + + " }\n" + + " }\n" + + "\n" + + " attributeFamilies: {\n" + + " test_storage_stream {\n" + + " entity: test\n" + + " attributes: [ data ]\n" + + " storage: \"inmem:///test_inmem\"\n" + + " type: primary\n" + + " access: commit-log\n" + + " }\n" + + " }\n" + + "\n" + + "}\n"; + + @Test + void testCreateOperator() { + final Repository repository = Repository.ofTest(ConfigFactory.parseString(MODEL)); + repository.getOrCreateOperator(FlinkDataOperator.class); + } + + @Test + @Disabled(value = "Needs termination.") + void testComplexSchema() throws Exception { + final Repository repository = Repository.ofTest(ConfigFactory.load("test-proto.conf")); + final FlinkDataOperator operator = repository.getOrCreateOperator(FlinkDataOperator.class); + final StreamTableEnvironment tableEnvironment = operator.getTableEnvironment(); + operator.getExecutionEnvironment().setParallelism(3); + + final List<AttributeDescriptor<?>> attributes = + Arrays.asList( + repository.getEntity("gateway").getAttribute("status"), + repository.getEntity("gateway").getAttribute("users")); + operator.registerCommitLogTable( + "gateway", + attributes, + FlinkDataOperator.CommitLogOptions.newBuilder() + .withInitialPosition(Position.OLDEST) + .withStopAtCurrent(false) + .build()); + + final CheckedThread thread = + new CheckedThread() { + + @Override + public void go() throws Exception { + final Random random = new Random(); + final TestWriter writer = new TestWriter(repository); + writer.writeStatus( + String.format("key_%s",
random.nextInt(100)), + Instant.now(), + Scheme.Status.newBuilder() + .setConnected(true) + .setLastContact(1000L + random.nextInt(10000)) + .build()); + writer.writeStatus( + String.format("key_%s", random.nextInt(100)), + Instant.ofEpochMilli(Watermarks.MAX_WATERMARK), + Scheme.Status.newBuilder() + .setConnected(true) + .setLastContact(1000L + random.nextInt(10000)) + .build()); + writer.writeUsers( + String.format("key_%s", random.nextInt(100)), + Instant.ofEpochMilli(Watermarks.MAX_WATERMARK), + Scheme.Users.newBuilder() + .addUser(String.format("test_user_%s", random.nextInt(15))) + .addUser(String.format("test_user_%s", random.nextInt(15))) + .addUser(String.format("test_user_%s", random.nextInt(15))) + .addUser(String.format("test_user_%s", random.nextInt(15))) + .build()); + } + }; + + thread.start(); + + System.out.printf("%n======= List Tables =========%n%n"); + System.out.println(Arrays.toString(tableEnvironment.listTables())); + + System.out.printf("%n======= Print Schema =========%n%n"); + tableEnvironment.from("gateway").printSchema(); + + System.out.printf("%n======= Print Result Schema =========%n%n"); + + final Table table = + tableEnvironment + .from("gateway") + .addColumns($("status").flatten()) + .select($("key"), $("attribute"), $("status$connected"), $("status$lastContact")) + .groupBy($("attribute")) + .select($("status$lastContact").max().as("max")); + + tableEnvironment + .toChangelogStream(table) + .addSink( + new SinkFunction<Row>() { + + @Override + public void invoke(Row value, Context context) throws Exception { + System.out.println(value); + } + }); + + table.printSchema(); + + System.out.printf("%n======= Execute Query =========%n%n"); + + operator.execute("test"); + } + + public static class TestWriter { + + private final Repository repository; + private final DirectDataOperator direct; + + public TestWriter(Repository repository) { + this.repository = repository; + this.direct = repository.getOrCreateOperator(DirectDataOperator.class); + } + + public void writeStatus(String key, Instant timestamp, Scheme.Status status) { + final EntityDescriptor entity = repository.getEntity("gateway"); + final AttributeDescriptor<Object> attribute = entity.getAttribute("status"); + try (final OnlineAttributeWriter writer = Optionals.get(direct.getWriter(attribute))) { + writer.write( + StreamElement.upsert( + entity, + attribute, + UUID.randomUUID().toString(), + key, + attribute.getName(), + timestamp.toEpochMilli(), + attribute.getValueSerializer().serialize(status)), + CommitCallback.noop()); + } + } + + public void writeUsers(String key, Instant timestamp, Scheme.Users users) { + final EntityDescriptor entity = repository.getEntity("gateway"); + final AttributeDescriptor<Object> attribute = entity.getAttribute("users"); + try (final OnlineAttributeWriter writer = Optionals.get(direct.getWriter(attribute))) { + writer.write( + StreamElement.upsert( + entity, + attribute, + UUID.randomUUID().toString(), + key, + attribute.getName(), + timestamp.toEpochMilli(), + attribute.getValueSerializer().serialize(users)), + CommitCallback.noop()); + } + } + } +} diff --git a/scheme/proto/src/main/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessor.java b/scheme/proto/src/main/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessor.java index 0fa961aa40..0c01d30fbe 100644 --- a/scheme/proto/src/main/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessor.java +++ b/scheme/proto/src/main/java/cz/o2/proxima/scheme/proto/ProtoMessageValueAccessor.java @@ -30,6 +30,7 @@ import
cz.o2.proxima.scheme.AttributeValueAccessors.PrimitiveValueAccessor; import cz.o2.proxima.scheme.AttributeValueAccessors.StructureValue; import cz.o2.proxima.scheme.AttributeValueAccessors.StructureValueAccessor; +import cz.o2.proxima.util.Optionals; import java.util.Collections; import java.util.List; import java.util.Map; @@ -69,6 +70,23 @@ public ProtoMessageValueAccessor(Factory<T> defaultValueFactory) { })); } + @Override + public AttributeValueAccessor<?, ?> getFieldAccessor(String name) { + return fieldAccessors.get(name); + } + + @Override + public Object getRawValue(String name, T object) { + return Optionals.get( + object + .getAllFields() + .entrySet() + .stream() + .filter(entry -> name.equals(entry.getKey().getName())) + .map(Map.Entry::getValue) + .findFirst()); + } + @Override public StructureValue valueOf(T object) { return StructureValue.of( diff --git a/scheme/proto/src/test/resources/test-proto.conf b/scheme/proto/src/test/resources/test-proto.conf index 27fd902f29..b8c221b7bb 100644 --- a/scheme/proto/src/test/resources/test-proto.conf +++ b/scheme/proto/src/test/resources/test-proto.conf @@ -66,7 +66,7 @@ attributes: [ "*" ] storage: "inmem:///data/proxima/gateway" type: replica - access: [ commit-log, random-access ] + access: [ random-access ] } dummy-storage: { entity: dummy
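
Usage sketch (not part of this diff): a minimal, hypothetical driver for the new FlinkDataOperator API added above. Entity and attribute names ("test", "data") are illustrative, borrowed from the test model in FlinkDataOperatorTest, and only methods introduced in this diff are used.

    import com.typesafe.config.ConfigFactory;
    import cz.o2.proxima.repository.AttributeDescriptor;
    import cz.o2.proxima.repository.Repository;
    import cz.o2.proxima.storage.StreamElement;
    import cz.o2.proxima.storage.commitlog.Position;
    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class FlinkDataOperatorExample {

      public static void main(String[] args) throws Exception {
        final Repository repository = Repository.of(ConfigFactory.load().resolve());
        final FlinkDataOperator operator =
            repository.getOrCreateOperator(FlinkDataOperator.class);
        final List<AttributeDescriptor<?>> attributes =
            Collections.singletonList(repository.getEntity("test").getAttribute("data"));

        // DataStream API: unbounded stream of raw StreamElements from the commit log.
        final DataStream<StreamElement> elements =
            operator.createCommitLogStream(
                attributes,
                FlinkDataOperator.CommitLogOptions.newBuilder()
                    .withInitialPosition(Position.OLDEST)
                    .withStopAtCurrent(false)
                    .build());
        elements.print();

        // Table API: the same commit log registered as a table in the current catalog.
        operator.registerCommitLogTable(
            "test", attributes, FlinkDataOperator.CommitLogOptions.newBuilder().build());
        operator.getTableEnvironment().from("test").printSchema();

        operator.execute("example");
      }
    }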
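
A note on the new accessor methods: the sketch below illustrates how StructureValueAccessor#getRawValue and #getFieldAccessor are meant to compose, using the Scheme.Status proto from test-proto.conf. The unchecked cast to StructureValueAccessor is an assumption made for brevity; RowDataResultExtractor#toFlinkStructure performs the same kind of cast internally.

    // Hypothetical snippet; "gateway.status" comes from test-proto.conf.
    final AttributeDescriptor<Object> status =
        repository.getEntity("gateway").getAttribute("status");
    @SuppressWarnings("unchecked")
    final StructureValueAccessor<Scheme.Status> accessor =
        (StructureValueAccessor<Scheme.Status>) status.getValueSerializer().getValueAccessor();
    final Scheme.Status value =
        Scheme.Status.newBuilder().setConnected(true).setLastContact(1234L).build();
    // Raw field value exactly as stored in the proto message, keyed by proto field name.
    final Object lastContact = accessor.getRawValue("lastContact", value); // -> 1234L
    // Nested accessor that can normalize the field further (e.g. bytes -> string).
    final AttributeValueAccessor<?, ?> fieldAccessor = accessor.getFieldAccessor("lastContact");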