diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java
index 0b1c8e950..196eb9b16 100644
--- a/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java
+++ b/direct/core/src/test/java/cz/o2/proxima/direct/commitlog/CommitLogReaderTest.java
@@ -380,7 +380,7 @@ public boolean onError(Throwable error) {
}
@Test
- public void testOrderedObserverLifycycle() {
+ public void testOrderedObserverLifecycle() {
StreamElement update =
StreamElement.upsert(
entity,
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java
index fd40d2e5b..c3366b66b 100644
--- a/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java
@@ -59,6 +59,7 @@
*
* <li>Unified metrics for monitoring and alerting.
*
*
+ * @param <OptionsT> Source options.
* @param <ReaderT> Reader to use, for reading data.
* @param <ObserverT> Observer implementation.
* @param <OffsetT> Offset used by the current reader.
@@ -67,6 +68,7 @@
*/
@Slf4j
abstract class AbstractLogSourceFunction<
+ OptionsT extends FlinkDataOperator.LogOptions,
ReaderT,
ObserverT extends AbstractSourceLogObserver,
OffsetT extends Serializable,
@@ -110,6 +112,7 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)
@Getter private final RepositoryFactory repositoryFactory;
private final List<AttributeDescriptor<?>> attributeDescriptors;
+ @Getter private final OptionsT options;
private final ResultExtractor<OutputT> resultExtractor;
@Nullable private transient List<OffsetT> restoredOffsets;
@@ -133,9 +136,11 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)
AbstractLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
+ OptionsT options,
ResultExtractor<OutputT> resultExtractor) {
this.repositoryFactory = repositoryFactory;
this.attributeDescriptors = attributeDescriptors;
+ this.options = options;
this.resultExtractor = resultExtractor;
}
@@ -290,14 +295,16 @@ void finishAndMarkAsIdle(SourceContext<?> sourceContext) {
synchronized (sourceContext.getCheckpointLock()) {
sourceContext.emitWatermark(new Watermark(Watermarks.MAX_WATERMARK));
}
- sourceContext.markAsTemporarilyIdle();
- while (cancelled.getCount() > 0) {
- try {
- cancelled.await();
- } catch (InterruptedException e) {
- if (cancelled.getCount() == 0) {
- // Re-interrupt if cancelled.
- Thread.currentThread().interrupt();
+ if (!options.shutdownFinishedSources()) {
+ sourceContext.markAsTemporarilyIdle();
+ while (cancelled.getCount() > 0) {
+ try {
+ cancelled.await();
+ } catch (InterruptedException e) {
+ if (cancelled.getCount() == 0) {
+ // Re-interrupt if cancelled.
+ Thread.currentThread().interrupt();
+ }
}
}
}
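Note on the change above: with the default `shutdownFinishedSources() == false`, a finished source still emits `MAX_WATERMARK`, marks itself temporarily idle and parks on the `cancelled` latch so checkpoints keep succeeding; enabling the flag lets the task return and finish. A minimal sketch, using the Lombok builders introduced in `FlinkDataOperator` below:

```java
// Default: the source stays alive (idle) after the last element, keeping
// checkpointing functional for the rest of the pipeline.
FlinkDataOperator.CommitLogOptions keepAlive =
    FlinkDataOperator.newCommitLogOptions().build();

// Opt-in: let sources shut down once they run out of work, e.g. for
// bounded back-fill jobs that should terminate on their own.
FlinkDataOperator.CommitLogOptions finite =
    FlinkDataOperator.newCommitLogOptions()
        .withShutdownFinishedSources(true)
        .withStopAtCurrent(true)
        .build();
```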
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java
index b39fe221b..0179d8374 100644
--- a/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java
@@ -33,6 +33,7 @@
@Slf4j
public class BatchLogSourceFunction<OutputT>
extends AbstractLogSourceFunction<
+ FlinkDataOperator.BatchLogOptions,
BatchLogReader,
BatchLogSourceFunction.LogObserver,
Offset,
@@ -61,8 +62,9 @@ void markOffsetAsConsumed(BatchLogObserver.OnNextContext context) {
public BatchLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
+ FlinkDataOperator.BatchLogOptions options,
ResultExtractor<OutputT> resultExtractor) {
- super(repositoryFactory, attributeDescriptors, resultExtractor);
+ super(repositoryFactory, attributeDescriptors, options, resultExtractor);
}
@Override
@@ -82,7 +84,7 @@ BatchLogReader createLogReader(List<AttributeDescriptor<?>> attributeDescriptors
@Override
List<Partition> getPartitions(BatchLogReader reader) {
- return reader.getPartitions();
+ return reader.getPartitions(getOptions().startTimestamp(), getOptions().endTimestamp());
}
@Override
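Illustrative sketch of the new time-range plumbing: `BatchLogOptions` carries epoch-millis bounds (defaulting to the full `MIN_WATERMARK`/`MAX_WATERMARK` range) that `getPartitions` now forwards to the reader. The concrete bounds below are illustrative only:

```java
// Restrict the batch source to partitions overlapping January 2021.
FlinkDataOperator.BatchLogOptions options =
    FlinkDataOperator.newBatchLogOptions()
        .withStartTimestamp(1609459200000L) // 2021-01-01T00:00:00Z
        .withEndTimestamp(1612137600000L)   // 2021-02-01T00:00:00Z
        .build();
// getPartitions(reader) above then delegates to
// reader.getPartitions(options.startTimestamp(), options.endTimestamp()).
```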
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java
index 13c1057bc..698bfe931 100644
--- a/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java
@@ -23,7 +23,6 @@
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.Partition;
-import cz.o2.proxima.storage.commitlog.Position;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -32,6 +31,7 @@
@Slf4j
public class CommitLogSourceFunction<OutputT>
extends AbstractLogSourceFunction<
+ FlinkDataOperator.CommitLogOptions,
CommitLogReader,
CommitLogSourceFunction.LogObserver,
Offset,
@@ -63,8 +63,9 @@ void markOffsetAsConsumed(CommitLogObserver.OnNextContext context) {
public CommitLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
+ FlinkDataOperator.CommitLogOptions options,
ResultExtractor<OutputT> resultExtractor) {
- super(repositoryFactory, attributeDescriptors, resultExtractor);
+ super(repositoryFactory, attributeDescriptors, options, resultExtractor);
}
@Override
@@ -109,8 +110,10 @@ UnifiedObserveHandle<Offset> observePartitions(
List<Partition> partitions,
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver observer) {
+ final FlinkDataOperator.CommitLogOptions options = getOptions();
final ObserveHandle commitLogHandle =
- reader.observeBulkPartitions(partitions, Position.OLDEST, false, observer);
+ reader.observeBulkPartitions(
+ partitions, options.initialPosition(), options.stopAtCurrent(), observer);
return new UnifiedObserveHandle<Offset>() {
@Override
@@ -132,7 +135,7 @@ UnifiedObserveHandle<Offset> observeRestoredOffsets(
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver observer) {
final cz.o2.proxima.direct.commitlog.ObserveHandle delegate =
- reader.observeBulkOffsets(offsets, false, observer);
+ reader.observeBulkOffsets(offsets, getOptions().stopAtCurrent(), observer);
return new UnifiedObserveHandle<Offset>() {
@Override
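The previously hard-coded `Position.OLDEST` and `stopAtCurrent == false` now come from `CommitLogOptions`. A hedged sketch of a bounded catch-up configuration (`Position` is `cz.o2.proxima.storage.commitlog.Position`):

```java
// Observe from the oldest retained data and stop once the reader catches
// up with current data; paired with withShutdownFinishedSources(true)
// this yields a job that terminates by itself.
FlinkDataOperator.CommitLogOptions options =
    FlinkDataOperator.newCommitLogOptions()
        .withInitialPosition(Position.OLDEST)
        .withStopAtCurrent(true)
        .withShutdownFinishedSources(true)
        .build();
```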
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java
new file mode 100644
index 000000000..1d6b886b5
--- /dev/null
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java
@@ -0,0 +1,308 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.flink.core;
+
+import cz.o2.proxima.flink.core.table.LogCatalogTable;
+import cz.o2.proxima.repository.AttributeDescriptor;
+import cz.o2.proxima.repository.DataOperator;
+import cz.o2.proxima.repository.Repository;
+import cz.o2.proxima.storage.StreamElement;
+import cz.o2.proxima.storage.commitlog.Position;
+import cz.o2.proxima.time.Watermarks;
+import cz.o2.proxima.util.Optionals;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Value;
+import lombok.experimental.Accessors;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.connector.ChangelogMode;
+
+/** Operate {@link Repository} using Apache Flink. */
+public class FlinkDataOperator implements DataOperator {
+
+ /**
+ * Create a standardized name for a new source.
+ *
+ * @param type Type of the source (e.g. CommitLog, BatchLog).
+ * @param attributeDescriptors Attributes the source is reading.
+ * @return Source name.
+ */
+ private static String createSourceName(
+ String type, List<AttributeDescriptor<?>> attributeDescriptors) {
+ return String.format(
+ "%s: %s",
+ type,
+ attributeDescriptors
+ .stream()
+ .sorted()
+ .map(AttributeDescriptor::getName)
+ .collect(Collectors.joining(",")));
+ }
+
+ public static CommitLogOptions.CommitLogOptionsBuilder newCommitLogOptions() {
+ return CommitLogOptions.newBuilder();
+ }
+
+ public static BatchLogOptions.BatchLogOptionsBuilder newBatchLogOptions() {
+ return BatchLogOptions.newBuilder();
+ }
+
+ /** Common options for all log-based sources. */
+ public interface LogOptions extends Serializable {
+
+ /**
+ * Shut down sources when they have no more work to do.
+ *
+ * @return True to shut down sources when they finish.
+ */
+ boolean shutdownFinishedSources();
+ }
+
+ /** {@link LogOptions} specific to {@link CommitLogSourceFunction}. */
+ @Accessors(fluent = true)
+ @Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
+ @Value
+ public static class CommitLogOptions implements LogOptions {
+
+ private static final long serialVersionUID = 1L;
+
+ /** @see LogOptions#shutdownFinishedSources() */
+ @Builder.Default boolean shutdownFinishedSources = false;
+
+ /**
+ * An initial position to start reading from when the source starts from a clean state (no
+ * checkpoint to restore from).
+ */
+ @Builder.Default Position initialPosition = Position.OLDEST;
+
+ /** Signal to stop reading when event time aligns with processing time. */
+ @Builder.Default boolean stopAtCurrent = false;
+ }
+
+ /** {@link LogOptions} specific to {@link BatchLogSourceFunction}. */
+ @Accessors(fluent = true)
+ @Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
+ @Value
+ public static class BatchLogOptions implements LogOptions {
+
+ private static final long serialVersionUID = 1L;
+
+ /** @see LogOptions#shutdownFinishedSources() */
+ @Builder.Default boolean shutdownFinishedSources = false;
+
+ @Builder.Default long startTimestamp = Watermarks.MIN_WATERMARK;
+
+ @Builder.Default long endTimestamp = Watermarks.MAX_WATERMARK;
+ }
+
+ private final Repository repository;
+ @Getter private final StreamExecutionEnvironment executionEnvironment;
+ @Getter private final StreamTableEnvironment tableEnvironment;
+
+ public FlinkDataOperator(Repository repository) {
+ this.repository = repository;
+ this.executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+ this.tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
+ }
+
+ @Override
+ public void close() {
+ // Nothing to close.
+ }
+
+ @Override
+ public void reload() {
+ // Nothing to reload.
+ }
+
+ @Override
+ public Repository getRepository() {
+ return repository;
+ }
+
+ /**
+ * Create a new stream from historical batch data.
+ *
+ * @param attributeDescriptors Attributes we want to get data from.
+ * @return A new DataStream.
+ */
+ public DataStream<StreamElement> createBatchLogStream(
+ Collection<AttributeDescriptor<?>> attributeDescriptors) {
+ return createBatchLogStream(attributeDescriptors, BatchLogOptions.newBuilder().build());
+ }
+
+ /**
+ * Create a new stream from historical batch data.
+ *
+ * @param attributeDescriptors Attributes we want to get data from.
+ * @param options Options for {@link BatchLogSourceFunction}.
+ * @return A new DataStream.
+ */
+ public DataStream<StreamElement> createBatchLogStream(
+ Collection<AttributeDescriptor<?>> attributeDescriptors, BatchLogOptions options) {
+ final List<AttributeDescriptor<?>> attributeDescriptorsCopy =
+ new ArrayList<>(attributeDescriptors);
+ return executionEnvironment.addSource(
+ new BatchLogSourceFunction<>(
+ repository.asFactory(), attributeDescriptorsCopy, options, ResultExtractor.identity()),
+ createSourceName("BatchLog", attributeDescriptorsCopy));
+ }
+
+ /**
+ * Create a new stream from real-time streaming data.
+ *
+ * @param attributeDescriptors Attributes we want to get data from.
+ * @return A new DataStream.
+ */
+ public DataStream<StreamElement> createCommitLogStream(
+ Collection<AttributeDescriptor<?>> attributeDescriptors) {
+ return createCommitLogStream(attributeDescriptors, CommitLogOptions.newBuilder().build());
+ }
+
+ /**
+ * Create a new stream from real-time streaming data.
+ *
+ * @param attributeDescriptors Attributes we want to get data from.
+ * @param options Options for {@link CommitLogSourceFunction}.
+ * @return A new DataStream.
+ */
+ public DataStream<StreamElement> createCommitLogStream(
+ Collection<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) {
+ final List<AttributeDescriptor<?>> attributeDescriptorsCopy =
+ new ArrayList<>(attributeDescriptors);
+ return executionEnvironment.addSource(
+ new CommitLogSourceFunction<>(
+ repository.asFactory(), attributeDescriptorsCopy, options, ResultExtractor.identity()),
+ createSourceName("CommitLog", attributeDescriptorsCopy));
+ }
+
+ /**
+ * Create a new table of real-time streaming data in the current {@link Catalog catalog's} default
+ * database.
+ *
+ * @param tableName Table name.
+ * @param attributeDescriptors Attributes we want to get data from.
+ * @param changelogMode Changelog mode of the underlying source.
+ */
+ public void registerCommitLogTable(
+ String tableName,
+ Collection<AttributeDescriptor<?>> attributeDescriptors,
+ ChangelogMode changelogMode) {
+ registerCommitLogTable(
+ tableName, attributeDescriptors, changelogMode, newCommitLogOptions().build());
+ }
+
+ /**
+ * Create a new table of real-time streaming data in the current {@link Catalog catalog's} default
+ * database.
+ *
+ * @param tableName Table name.
+ * @param attributeDescriptors Attributes we want to get data from.
+ * @param changelogMode Changelog mode of the underlying source.
+ * @param options Options for {@link CommitLogSourceFunction}.
+ */
+ public void registerCommitLogTable(
+ String tableName,
+ Collection<AttributeDescriptor<?>> attributeDescriptors,
+ ChangelogMode changelogMode,
+ CommitLogOptions options) {
+ final List<AttributeDescriptor<?>> attributeDescriptorsCopy =
+ new ArrayList<>(attributeDescriptors);
+ final Catalog catalog =
+ Optionals.get(tableEnvironment.getCatalog(tableEnvironment.getCurrentCatalog()));
+ try {
+ catalog.createTable(
+ ObjectPath.fromString(String.format("%s.%s", catalog.getDefaultDatabase(), tableName)),
+ LogCatalogTable.ofCommitLog(repository, attributeDescriptorsCopy, options),
+ false);
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ String.format("Unable to register a new table [%s].", tableName), e);
+ }
+ }
+
+ /**
+ * Create a new table of historical batch data in the current {@link Catalog catalog's} default
+ * database.
+ *
+ * @param tableName Table name.
+ * @param attributeDescriptors Attributes we want to get data from.
+ * @param changelogMode Changelog mode of the underlying source.
+ */
+ public void registerBatchLogTable(
+ String tableName,
+ Collection<AttributeDescriptor<?>> attributeDescriptors,
+ ChangelogMode changelogMode) {
+ registerBatchLogTable(
+ tableName, attributeDescriptors, changelogMode, newBatchLogOptions().build());
+ }
+
+ /**
+ * Create a new table of historical batch data in the current {@link Catalog catalog's} default
+ * database.
+ *
+ * @param tableName Table name.
+ * @param attributeDescriptors Attributes we want to get data from.
+ * @param changelogMode Changelog mode of the underlying source.
+ * @param options Options for {@link BatchLogSourceFunction}.
+ */
+ public void registerBatchLogTable(
+ String tableName,
+ Collection<AttributeDescriptor<?>> attributeDescriptors,
+ ChangelogMode changelogMode,
+ BatchLogOptions options) {
+ final List<AttributeDescriptor<?>> attributeDescriptorsCopy =
+ new ArrayList<>(attributeDescriptors);
+ final Catalog catalog =
+ Optionals.get(tableEnvironment.getCatalog(tableEnvironment.getCurrentCatalog()));
+ try {
+ catalog.createTable(
+ ObjectPath.fromString(String.format("%s.%s", catalog.getDefaultDatabase(), tableName)),
+ LogCatalogTable.ofBatchLog(repository, attributeDescriptorsCopy, options),
+ false);
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ String.format("Unable to register a new table [%s].", tableName), e);
+ }
+ }
+
+ /** @see StreamTableEnvironment#from(String) for more details. */
+ public Table getTable(String tableName) {
+ return tableEnvironment.from(tableName);
+ }
+
+ /** @see StreamExecutionEnvironment#execute(String) for more details. */
+ public JobExecutionResult execute(String jobName) throws Exception {
+ return executionEnvironment.execute(jobName);
+ }
+
+ /** @see StreamExecutionEnvironment#executeAsync(String) for more details. */
+ public JobClient executeAsync(String jobName) throws Exception {
+ return executionEnvironment.executeAsync(jobName);
+ }
+}
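A hedged end-to-end sketch of the new operator API. The entity `user`, its attribute `status`, and the classpath config are hypothetical; only the `FlinkDataOperator` calls come from this patch:

```java
import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.flink.core.FlinkDataOperator;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Optionals;
import java.util.Collections;
import org.apache.flink.streaming.api.datastream.DataStream;

public class Example {
  public static void main(String[] args) throws Exception {
    // Load the (hypothetical) Proxima config and construct the operator.
    Repository repository = Repository.of(ConfigFactory.load().resolve());
    FlinkDataOperator operator = new FlinkDataOperator(repository);
    // Hypothetical entity/attribute names, for illustration only.
    AttributeDescriptor<?> status =
        Optionals.get(repository.getEntity("user").findAttribute("status"));
    // Bounded commit-log stream of raw StreamElements.
    DataStream<StreamElement> stream =
        operator.createCommitLogStream(
            Collections.singletonList(status),
            FlinkDataOperator.newCommitLogOptions().withStopAtCurrent(true).build());
    stream.map(StreamElement::getKey).print();
    operator.execute("flink-data-operator-example");
  }
}
```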
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperatorFactory.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperatorFactory.java
new file mode 100644
index 000000000..1fc4642f4
--- /dev/null
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperatorFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.flink.core;
+
+import cz.o2.proxima.repository.DataOperator;
+import cz.o2.proxima.repository.DataOperatorFactory;
+import cz.o2.proxima.repository.Repository;
+
+public class FlinkDataOperatorFactory implements DataOperatorFactory<FlinkDataOperator> {
+
+ @Override
+ public String getOperatorName() {
+ return "flink";
+ }
+
+ @Override
+ public boolean isOfType(Class<? extends DataOperator> cls) {
+ return cls.isAssignableFrom(FlinkDataOperator.class);
+ }
+
+ @Override
+ public FlinkDataOperator create(Repository repository) {
+ return new FlinkDataOperator(repository);
+ }
+}
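Assuming the module also registers this factory for `ServiceLoader` lookup (a `META-INF/services/cz.o2.proxima.repository.DataOperatorFactory` entry, which is not part of this patch), the operator can be obtained through Proxima's standard `Repository` entry point instead of direct construction:

```java
// isOfType(FlinkDataOperator.class) above selects this factory and
// create(repository) hands back the operator instance.
FlinkDataOperator operator = repository.getOrCreateOperator(FlinkDataOperator.class);
```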
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/BatchLogDynamicTableSource.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/BatchLogDynamicTableSource.java
new file mode 100644
index 000000000..4089c0b39
--- /dev/null
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/BatchLogDynamicTableSource.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.flink.core.table;
+
+import cz.o2.proxima.flink.core.BatchLogSourceFunction;
+import cz.o2.proxima.repository.Repository;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.types.RowKind;
+
+public class BatchLogDynamicTableSource implements ScanTableSource {
+
+ private final Repository repository;
+ private final LogCatalogTable.BatchLogCatalogTable catalogTable;
+
+ public BatchLogDynamicTableSource(
+ Repository repository, LogCatalogTable.BatchLogCatalogTable catalogTable) {
+ this.repository = repository;
+ this.catalogTable = catalogTable;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ return SourceFunctionProvider.of(
+ new BatchLogSourceFunction<>(
+ repository.asFactory(),
+ catalogTable.getAttributeDescriptors(),
+ catalogTable.getLogOptions(),
+ new RowDataResultExtractor(catalogTable.getAttributeDescriptors())),
+ true);
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "BatchLog table source";
+ }
+}
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicTableSource.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicTableSource.java
new file mode 100644
index 000000000..571c26830
--- /dev/null
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicTableSource.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.flink.core.table;
+
+import cz.o2.proxima.flink.core.CommitLogSourceFunction;
+import cz.o2.proxima.repository.Repository;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
+
+public class CommitLogDynamicTableSource implements ScanTableSource, SupportsSourceWatermark {
+
+ private final Repository repository;
+ private final LogCatalogTable.CommitLogCatalogTable catalogTable;
+
+ public CommitLogDynamicTableSource(
+ Repository repository, LogCatalogTable.CommitLogCatalogTable catalogTable) {
+ this.repository = repository;
+ this.catalogTable = catalogTable;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ final boolean bounded = catalogTable.getLogOptions().stopAtCurrent();
+ return SourceFunctionProvider.of(
+ new CommitLogSourceFunction<>(
+ repository.asFactory(),
+ catalogTable.getAttributeDescriptors(),
+ catalogTable.getLogOptions(),
+ new RowDataResultExtractor(catalogTable.getAttributeDescriptors())),
+ bounded);
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new CommitLogDynamicTableSource(repository, catalogTable);
+ }
+
+ @Override
+ public void applySourceWatermark() {
+ // No-op.
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "CommitLog table source";
+ }
+}
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java
new file mode 100644
index 000000000..84be23441
--- /dev/null
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java
@@ -0,0 +1,193 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.flink.core.table;
+
+import cz.o2.proxima.flink.core.FlinkDataOperator;
+import cz.o2.proxima.repository.AttributeDescriptor;
+import cz.o2.proxima.repository.Repository;
+import cz.o2.proxima.scheme.SchemaDescriptors;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Getter;
+import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+public abstract class LogCatalogTable<OptionsT extends FlinkDataOperator.LogOptions>
+ implements org.apache.flink.table.catalog.CatalogTable {
+
+ public static CommitLogCatalogTable ofCommitLog(
+ Repository repository,
+ List> attributeDescriptors,
+ FlinkDataOperator.CommitLogOptions logOptions) {
+ return new CommitLogCatalogTable(repository, attributeDescriptors, logOptions);
+ }
+
+ public static BatchLogCatalogTable ofBatchLog(
+ Repository repository,
+ List> attributeDescriptors,
+ FlinkDataOperator.BatchLogOptions logOptions) {
+ return new BatchLogCatalogTable(repository, attributeDescriptors, logOptions);
+ }
+ /**
+ * Convert {@link SchemaDescriptors.SchemaTypeDescriptor} to Flink's {@link DataType}.
+ *
+ * @param schema Schema to convert.
+ * @return Data type.
+ */
+ @VisibleForTesting
+ static DataType toDataType(SchemaDescriptors.SchemaTypeDescriptor<?> schema) {
+ switch (schema.getType()) {
+ case INT:
+ return DataTypes.INT();
+ case BYTE:
+ return DataTypes.BYTES();
+ case ENUM:
+ return DataTypes.STRING();
+ case LONG:
+ return DataTypes.BIGINT();
+ case ARRAY:
+ return DataTypes.ARRAY(toDataType(schema.asArrayTypeDescriptor().getValueDescriptor()));
+ case FLOAT:
+ return DataTypes.FLOAT();
+ case DOUBLE:
+ return DataTypes.DOUBLE();
+ case STRING:
+ return DataTypes.VARCHAR(VarCharType.MAX_LENGTH);
+ case BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case STRUCTURE:
+ final DataTypes.Field[] fields =
+ schema
+ .asStructureTypeDescriptor()
+ .getFields()
+ .entrySet()
+ .stream()
+ .map(entry -> DataTypes.FIELD(entry.getKey(), toDataType(entry.getValue())))
+ .toArray(DataTypes.Field[]::new);
+ return DataTypes.ROW(fields);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown data type %s.", schema.getType()));
+ }
+ }
+
+ public static class CommitLogCatalogTable
+ extends LogCatalogTable<FlinkDataOperator.CommitLogOptions> {
+
+ public CommitLogCatalogTable(
+ Repository repository,
+ List> attributeDescriptors,
+ FlinkDataOperator.CommitLogOptions logOptions) {
+ super(repository, attributeDescriptors, logOptions);
+ }
+
+ @Override
+ public CatalogBaseTable copy() {
+ return new CommitLogCatalogTable(getRepository(), getAttributeDescriptors(), getLogOptions());
+ }
+ }
+
+ public static class BatchLogCatalogTable
+ extends LogCatalogTable<FlinkDataOperator.BatchLogOptions> {
+
+ public BatchLogCatalogTable(
+ Repository repository,
+ List> attributeDescriptors,
+ FlinkDataOperator.BatchLogOptions logOptions) {
+ super(repository, attributeDescriptors, logOptions);
+ }
+
+ @Override
+ public CatalogBaseTable copy() {
+ return new BatchLogCatalogTable(getRepository(), getAttributeDescriptors(), getLogOptions());
+ }
+ }
+
+ @Getter private final Repository repository;
+ @Getter private final List<AttributeDescriptor<?>> attributeDescriptors;
+ @Getter private final OptionsT logOptions;
+
+ public LogCatalogTable(
+ Repository repository,
+ List<AttributeDescriptor<?>> attributeDescriptors,
+ OptionsT logOptions) {
+ this.repository = repository;
+ this.attributeDescriptors = attributeDescriptors;
+ this.logOptions = logOptions;
+ }
+
+ @Override
+ public boolean isPartitioned() {
+ return false;
+ }
+
+ @Override
+ public List<String> getPartitionKeys() {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public org.apache.flink.table.catalog.CatalogTable copy(Map<String, String> map) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public Map<String, String> getOptions() {
+ final Map<String, String> options = new HashMap<>();
+ options.put("connector", LogDynamicTableSourceFactory.FACTORY_IDENTIFIER);
+ return options;
+ }
+
+ @Override
+ public Schema getUnresolvedSchema() {
+ final Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("key", DataTypes.VARCHAR(VarCharType.MAX_LENGTH).notNull())
+ .column("uuid", DataTypes.VARCHAR(VarCharType.MAX_LENGTH).nullable())
+ .column("attribute", DataTypes.VARCHAR(VarCharType.MAX_LENGTH).notNull())
+ .column("attribute_prefix", DataTypes.VARCHAR(VarCharType.MAX_LENGTH).notNull())
+ .column("timestamp", DataTypes.TIMESTAMP(3).notNull())
+ .primaryKey("key")
+ .watermark("timestamp", "SOURCE_WATERMARK()");
+ for (AttributeDescriptor<?> attributeDescriptor : attributeDescriptors) {
+ schemaBuilder.column(
+ attributeDescriptor.toAttributePrefix(false),
+ toDataType(attributeDescriptor.getSchemaTypeDescriptor()));
+ }
+ return schemaBuilder.build();
+ }
+
+ @Override
+ public String getComment() {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ return Optional.empty();
+ }
+}
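How a registered table comes together, as a hedged sketch reusing the hypothetical `operator` and `status` descriptor from the earlier example: `getUnresolvedSchema()` contributes the five fixed columns, then one column per attribute, named by `toAttributePrefix(false)` and typed via `toDataType`:

```java
// Registers "status_log" (hypothetical name) in the current catalog's default
// database. Besides key, uuid, attribute, attribute_prefix and timestamp, the
// schema gains one column per attribute, here a VARCHAR column "status".
operator.registerCommitLogTable(
    "status_log",
    Collections.singletonList(status),
    ChangelogMode.insertOnly(),
    FlinkDataOperator.newCommitLogOptions().build());
Table table = operator.getTable("status_log");
```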
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogDynamicTableSourceFactory.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogDynamicTableSourceFactory.java
new file mode 100644
index 000000000..716860eda
--- /dev/null
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogDynamicTableSourceFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.flink.core.table;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+
+public class LogDynamicTableSourceFactory implements DynamicTableSourceFactory {
+
+ static final String FACTORY_IDENTIFIER = "proxima";
+
+ public LogDynamicTableSourceFactory() {}
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final LogCatalogTable<?> catalogTable =
+ (LogCatalogTable<?>) context.getCatalogTable().getOrigin();
+ if (catalogTable instanceof LogCatalogTable.CommitLogCatalogTable) {
+ final LogCatalogTable.CommitLogCatalogTable cast =
+ (LogCatalogTable.CommitLogCatalogTable) catalogTable;
+ return new CommitLogDynamicTableSource(catalogTable.getRepository(), cast);
+ }
+ if (catalogTable instanceof LogCatalogTable.BatchLogCatalogTable) {
+ final LogCatalogTable.BatchLogCatalogTable cast =
+ (LogCatalogTable.BatchLogCatalogTable) catalogTable;
+ return new BatchLogDynamicTableSource(catalogTable.getRepository(), cast);
+ }
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unknown %s implementation %s.", LogCatalogTable.class, catalogTable.getClass()));
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FACTORY_IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
+ }
+}
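Flink resolves this factory by identifier: `LogCatalogTable#getOptions()` advertises `connector = proxima`, and, provided the jar ships the matching `META-INF/services/org.apache.flink.table.factories.Factory` entry (not shown in this patch), the planner calls `createDynamicTableSource` when the table is queried. A sketch against the hypothetical `status_log` table registered above:

```java
// The planner matches connector = 'proxima' to factoryIdentifier() and
// instantiates the commit-log or batch-log source accordingly.
Table recent =
    operator.getTableEnvironment()
        .sqlQuery("SELECT key, attribute, `timestamp` FROM status_log");
```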
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java
new file mode 100644
index 000000000..05c2b3546
--- /dev/null
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java
@@ -0,0 +1,147 @@
+/**
+ * Copyright 2017-2021 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.flink.core.table;
+
+import cz.o2.proxima.flink.core.ResultExtractor;
+import cz.o2.proxima.repository.AttributeDescriptor;
+import cz.o2.proxima.scheme.AttributeValueAccessor;
+import cz.o2.proxima.scheme.AttributeValueAccessors;
+import cz.o2.proxima.scheme.SchemaDescriptors;
+import cz.o2.proxima.storage.StreamElement;
+import cz.o2.proxima.util.Optionals;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+
+/** Convert {@link StreamElement} into {@link RowData}. */
+public class RowDataResultExtractor implements ResultExtractor<RowData> {
+
+ private final List<AttributeDescriptor<?>> attributeDescriptors;
+
+ public RowDataResultExtractor(List<AttributeDescriptor<?>> attributeDescriptors) {
+ this.attributeDescriptors = attributeDescriptors;
+ }
+
+ @Override
+ public RowData toResult(StreamElement element) {
+ final AttributeDescriptor<?>[] attributeDescriptors =
+ this.attributeDescriptors.toArray(new AttributeDescriptor<?>[0]);
+ final GenericRowData data = new GenericRowData(5 + attributeDescriptors.length);
+ // key
+ data.setField(0, StringData.fromString(element.getKey()));
+ // uuid
+ data.setField(1, StringData.fromString(element.getUuid()));
+ // attribute
+ data.setField(2, StringData.fromString(element.getAttribute()));
+ // attribute prefix
+ data.setField(
+ 3, StringData.fromString(element.getAttributeDescriptor().toAttributePrefix(false)));
+ // timestamp
+ data.setField(4, TimestampData.fromEpochMillis(element.getStamp()));
+ // value
+ for (int i = 0; i < attributeDescriptors.length; i++) {
+ if (element.getAttributeDescriptor().equals(attributeDescriptors[i])) {
+ data.setField(5 + i, toFlinkObject(element));
+ }
+ }
+ return data;
+ }
+
+ @VisibleForTesting
+ static <ValueT> Object toFlinkObject(StreamElement element) {
+ @SuppressWarnings("unchecked")
+ final AttributeDescriptor<ValueT> attributeDescriptor =
+ (AttributeDescriptor<ValueT>) element.getAttributeDescriptor();
+ final ValueT value =
+ Optionals.get(attributeDescriptor.getValueSerializer().deserialize(element.getValue()));
+ final SchemaDescriptors.SchemaTypeDescriptor<ValueT> schemaDescriptor =
+ attributeDescriptor.getSchemaTypeDescriptor();
+ final AttributeValueAccessor<ValueT, ?> valueAccessor =
+ attributeDescriptor.getValueSerializer().getValueAccessor();
+ return toFlinkObject(schemaDescriptor, valueAccessor, value);
+ }
+
+ private static <ValueT> Object toFlinkObject(
+ SchemaDescriptors.SchemaTypeDescriptor<ValueT> descriptor,
+ AttributeValueAccessor<ValueT, ?> accessor,
+ ValueT value) {
+ switch (descriptor.getType()) {
+ case STRUCTURE:
+ final AttributeValueAccessors.StructureValueAccessor<ValueT> structureAccessor =
+ (AttributeValueAccessors.StructureValueAccessor<ValueT>) accessor;
+ return toFlinkStructure(descriptor.asStructureTypeDescriptor(), structureAccessor, value);
+ case STRING:
+ case ENUM:
+ return StringData.fromString((String) value);
+ case BOOLEAN:
+ case LONG:
+ case DOUBLE:
+ case FLOAT:
+ case INT:
+ case BYTE:
+ return value;
+ case ARRAY:
+ final AttributeValueAccessors.ArrayValueAccessor<ValueT, ?> arrayAccessor =
+ (AttributeValueAccessors.ArrayValueAccessor<ValueT, ?>) accessor;
+ @SuppressWarnings("unchecked")
+ final List<ValueT> values = (List<ValueT>) value;
+ return toFlinkArray(descriptor.asArrayTypeDescriptor(), arrayAccessor, values);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Type [%s] is not supported.", descriptor.getType()));
+ }
+ }
+
+ private static <ValueT> RowData toFlinkStructure(
+ SchemaDescriptors.StructureTypeDescriptor<ValueT> descriptor,
+ AttributeValueAccessors.StructureValueAccessor<ValueT> accessor,
+ ValueT value) {
+ final GenericRowData rowData = new GenericRowData(descriptor.getFields().size());
+ final AtomicInteger idx = new AtomicInteger(0);
+ descriptor
+ .getFields()
+ .forEach(
+ (fieldName, fieldType) -> {
+ @SuppressWarnings("unchecked")
+ final SchemaDescriptors.SchemaTypeDescriptor