diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java
index ea28a3319..f6e28265d 100644
--- a/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java
@@ -59,6 +59,7 @@
*   <li>Unified metrics for monitoring and alerting.
* </ul>
*
+ * @param <OptionsT> Source options.
* @param <ReaderT> Reader to use for reading data.
* @param <ObserverT> Observer implementation.
* @param <OffsetT> Offset used by the current reader.
@@ -67,6 +68,7 @@
*/
@Slf4j
abstract class AbstractLogSourceFunction<
+ OptionsT extends FlinkDataOperator.LogOptions,
ReaderT,
ObserverT extends AbstractSourceLogObserver,
OffsetT extends Serializable,
@@ -110,6 +112,7 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)
@Getter private final RepositoryFactory repositoryFactory;
private final List<AttributeDescriptor<?>> attributeDescriptors;
+ @Getter private final OptionsT options;
private final ResultExtractor resultExtractor;
@Nullable private transient List<OffsetT> restoredOffsets;
@@ -133,9 +136,11 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)
AbstractLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
+ OptionsT options,
ResultExtractor resultExtractor) {
this.repositoryFactory = repositoryFactory;
this.attributeDescriptors = attributeDescriptors;
+ this.options = options;
this.resultExtractor = resultExtractor;
}
@@ -290,17 +295,19 @@ void finishAndMarkAsIdle(SourceContext> sourceContext) {
synchronized (sourceContext.getCheckpointLock()) {
sourceContext.emitWatermark(new Watermark(Watermarks.MAX_WATERMARK));
}
- sourceContext.markAsTemporarilyIdle();
- // while (cancelled.getCount() > 0) {
- // try {
- // cancelled.await();
- // } catch (InterruptedException e) {
- // if (cancelled.getCount() == 0) {
- // // Re-interrupt if cancelled.
- // Thread.currentThread().interrupt();
- // }
- // }
- // }
+ if (!options.shutdownFinishedSources()) {
+ sourceContext.markAsTemporarilyIdle();
+ while (cancelled.getCount() > 0) {
+ try {
+ cancelled.await();
+ } catch (InterruptedException e) {
+ if (cancelled.getCount() == 0) {
+ // Re-interrupt if cancelled.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
}
@Override
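The restored await loop above is now gated on the new option. A minimal usage sketch, assuming the Lombok-generated builder from the `FlinkDataOperator` changes further down in this diff (builder method `newBuilder`, setter prefix `with`, fluent getters):

```java
// Hedged sketch, not part of this diff: the fluent getter
// options.shutdownFinishedSources() is what finishAndMarkAsIdle() reads.
// With true, a finished source returns after emitting the final watermark
// instead of parking on the cancelled latch.
FlinkDataOperator.CommitLogOptions options =
    FlinkDataOperator.CommitLogOptions.newBuilder()
        .withShutdownFinishedSources(true)
        .build();
```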
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java
index b39fe221b..c5fdf2cc4 100644
--- a/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java
@@ -33,6 +33,7 @@
@Slf4j
public class BatchLogSourceFunction
extends AbstractLogSourceFunction<
+ FlinkDataOperator.BatchLogOptions,
BatchLogReader,
BatchLogSourceFunction.LogObserver,
Offset,
@@ -61,8 +62,9 @@ void markOffsetAsConsumed(BatchLogObserver.OnNextContext context) {
public BatchLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
+ FlinkDataOperator.BatchLogOptions options,
ResultExtractor resultExtractor) {
- super(repositoryFactory, attributeDescriptors, resultExtractor);
+ super(repositoryFactory, attributeDescriptors, options, resultExtractor);
}
@Override
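`BatchLogOptions` carries only the shared shutdown flag for now; a hedged construction sketch (builder methods are Lombok-generated, see `FlinkDataOperator` below):

```java
// Assumes the Lombok-generated builder from this diff; the default (false)
// keeps a finished batch source idling, matching the previous behaviour.
FlinkDataOperator.BatchLogOptions options =
    FlinkDataOperator.BatchLogOptions.newBuilder()
        .withShutdownFinishedSources(true) // let a finished source terminate
        .build();
```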
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java
index 13c1057bc..698bfe931 100644
--- a/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java
@@ -23,7 +23,6 @@
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.Partition;
-import cz.o2.proxima.storage.commitlog.Position;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -32,6 +31,7 @@
@Slf4j
public class CommitLogSourceFunction
extends AbstractLogSourceFunction<
+ FlinkDataOperator.CommitLogOptions,
CommitLogReader,
CommitLogSourceFunction.LogObserver,
Offset,
@@ -63,8 +63,9 @@ void markOffsetAsConsumed(CommitLogObserver.OnNextContext context) {
public CommitLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
+ FlinkDataOperator.CommitLogOptions options,
ResultExtractor resultExtractor) {
- super(repositoryFactory, attributeDescriptors, resultExtractor);
+ super(repositoryFactory, attributeDescriptors, options, resultExtractor);
}
@Override
@@ -109,8 +110,10 @@ UnifiedObserveHandle observePartitions(
List<Partition> partitions,
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver observer) {
+ final FlinkDataOperator.CommitLogOptions options = getOptions();
final ObserveHandle commitLogHandle =
- reader.observeBulkPartitions(partitions, Position.OLDEST, false, observer);
+ reader.observeBulkPartitions(
+ partitions, options.initialPosition(), options.stopAtCurrent(), observer);
return new UnifiedObserveHandle<Offset>() {
@Override
@@ -132,7 +135,7 @@ UnifiedObserveHandle observeRestoredOffsets(
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver observer) {
final cz.o2.proxima.direct.commitlog.ObserveHandle delegate =
- reader.observeBulkOffsets(offsets, false, observer);
+ reader.observeBulkOffsets(offsets, getOptions().stopAtCurrent(), observer);
return new UnifiedObserveHandle<Offset>() {
@Override
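Both observe paths above now honour the options instead of the previously hard-coded `Position.OLDEST` and `false`. A sketch of a caught-up, self-terminating source; the `StreamElement` element type and the `attribute` variable are assumptions, mirroring the tests at the end of this diff:

```java
// Illustrative only: start from the newest data and stop once caught up.
FlinkDataOperator.CommitLogOptions options =
    FlinkDataOperator.CommitLogOptions.newBuilder()
        .withInitialPosition(Position.NEWEST)
        .withStopAtCurrent(true)
        .build();
CommitLogSourceFunction<StreamElement> sourceFunction =
    new CommitLogSourceFunction<>(
        repository.asFactory(),
        Collections.singletonList(attribute),
        options,
        ResultExtractor.identity());
```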
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java
index 3307caefa..f44a4687b 100644
--- a/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/FlinkDataOperator.java
@@ -27,6 +27,7 @@
import lombok.Builder;
import lombok.Getter;
import lombok.Value;
+import lombok.experimental.Accessors;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,8 +35,47 @@
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
+/** Operate on a {@link Repository} using Apache Flink. */
public class FlinkDataOperator implements DataOperator {
+ /** Common options for all log-based sources. */
+ public interface LogOptions {
+
+ /**
+ * Shut down sources when they have no more work to do.
+ *
+ * @return True to shut down when finished.
+ */
+ boolean shutdownFinishedSources();
+ }
+
+ @Accessors(fluent = true)
+ @Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
+ @Value
+ public static class CommitLogOptions implements LogOptions {
+
+ /** @see LogOptions#shutdownFinishedSources() */
+ @Builder.Default boolean shutdownFinishedSources = false;
+
+ /**
+ * An initial position to start reading from, when the source starts from a clean state (no
+ * checkpoint to restore from).
+ */
+ @Builder.Default Position initialPosition = Position.OLDEST;
+
+ /** Signal to stop reading once event time aligns with processing time. */
+ @Builder.Default boolean stopAtCurrent = false;
+ }
+
+ @Accessors(fluent = true)
+ @Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
+ @Value
+ public static class BatchLogOptions implements LogOptions {
+
+ /** @see LogOptions#shutdownFinishedSources() */
+ @Builder.Default boolean shutdownFinishedSources = false;
+ }
+
private final Repository repository;
@Getter private final StreamExecutionEnvironment executionEnvironment;
@Getter private final StreamTableEnvironment tableEnvironment;
@@ -61,25 +101,16 @@ public Repository getRepository() {
return repository;
}
- @Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
- @Value
- public static class CommitLogOptions {
-
- @Builder.Default Position initialPosition = Position.OLDEST;
-
- @Builder.Default boolean stopAtCurrent = false;
- }
-
public DataStream createBatchLogStream(
List<AttributeDescriptor<?>> attributeDescriptors) {
- return createBatchLogStream(attributeDescriptors, CommitLogOptions.newBuilder().build());
+ return createBatchLogStream(attributeDescriptors, BatchLogOptions.newBuilder().build());
}
public DataStream createBatchLogStream(
- List<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) {
+ List<AttributeDescriptor<?>> attributeDescriptors, BatchLogOptions options) {
return executionEnvironment.addSource(
new BatchLogSourceFunction<>(
- repository.asFactory(), attributeDescriptors, ResultExtractor.identity()),
+ repository.asFactory(), attributeDescriptors, options, ResultExtractor.identity()),
sourceName("BatchLog", attributeDescriptors));
}
@@ -103,7 +134,7 @@ public DataStream createCommitLogStream(
List<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) {
return executionEnvironment.addSource(
new CommitLogSourceFunction<>(
- repository.asFactory(), attributeDescriptors, ResultExtractor.identity()),
+ repository.asFactory(), attributeDescriptors, options, ResultExtractor.identity()),
sourceName("CommitLog", attributeDescriptors));
}
@@ -114,8 +145,8 @@ public void registerCommitLogTable(
try {
catalog.createTable(
ObjectPath.fromString(String.format("%s.%s", catalog.getDefaultDatabase(), table)),
- new LogCatalogTable(repository, attributeDescriptors),
- true);
+ new LogCatalogTable(repository, attributeDescriptors, options),
+ false);
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to register a new table [%s].", table), e);
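End to end, the operator can now produce a bounded, self-terminating stream. A hedged sketch assuming an existing `operator` and `attributeDescriptors`, with the `StreamElement` element type implied by `ResultExtractor.identity()`:

```java
// Bounded read: start at the oldest offset, stop once caught up, and let the
// finished source shut down so the whole job can complete.
FlinkDataOperator.CommitLogOptions bounded =
    FlinkDataOperator.CommitLogOptions.newBuilder()
        .withInitialPosition(Position.OLDEST)
        .withStopAtCurrent(true)
        .withShutdownFinishedSources(true)
        .build();
DataStream<StreamElement> stream =
    operator.createCommitLogStream(attributeDescriptors, bounded);
```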
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicSource.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicSource.java
index 287202ccf..a1e786632 100644
--- a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicSource.java
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/CommitLogDynamicSource.java
@@ -43,12 +43,14 @@ public ChangelogMode getChangelogMode() {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ final boolean bounded = catalogTable.getLogOptions().stopAtCurrent();
return SourceFunctionProvider.of(
new CommitLogSourceFunction<>(
repository.asFactory(),
catalogTable.getAttributeDescriptors(),
+ catalogTable.getLogOptions(),
new RowDataResultExtractor(catalogTable.getAttributeDescriptors())),
- false);
+ bounded);
}
@Override
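With `stopAtCurrent = true` the planner now sees a bounded scan, so the registered table can serve batch-style queries. Sketch only; the full `registerCommitLogTable` signature is not shown in this diff, so the parameter order below is an assumption inferred from the method body above:

```java
// Assumed signature: registerCommitLogTable(tableName, attributeDescriptors, options).
operator.registerCommitLogTable(
    "events",
    attributeDescriptors,
    FlinkDataOperator.CommitLogOptions.newBuilder().withStopAtCurrent(true).build());
```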
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java
index 6c2c2c105..941b33aaa 100644
--- a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/LogCatalogTable.java
@@ -15,31 +15,76 @@
*/
package cz.o2.proxima.flink.core.table;
+import cz.o2.proxima.flink.core.FlinkDataOperator;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.scheme.SchemaDescriptors;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.VarCharType;
public class LogCatalogTable implements org.apache.flink.table.catalog.CatalogTable {
+ /**
+ * Convert {@link SchemaDescriptors.SchemaTypeDescriptor} to Flink's {@link DataType}.
+ *
+ * @param schema Schema to convert.
+ * @return Data type.
+ */
+ private static DataType toDataType(SchemaDescriptors.SchemaTypeDescriptor<?> schema) {
+ switch (schema.getType()) {
+ case INT:
+ return DataTypes.INT();
+ case BYTE:
+ return DataTypes.BYTES();
+ case ENUM:
+ throw new UnsupportedOperationException("Not implemented.");
+ case LONG:
+ return DataTypes.BIGINT();
+ case ARRAY:
+ return DataTypes.ARRAY(toDataType(schema.asArrayTypeDescriptor().getValueDescriptor()));
+ case FLOAT:
+ return DataTypes.FLOAT();
+ case DOUBLE:
+ return DataTypes.DOUBLE();
+ case STRING:
+ return DataTypes.VARCHAR(VarCharType.MAX_LENGTH);
+ case BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case STRUCTURE:
+ final DataTypes.Field[] fields =
+ schema
+ .asStructureTypeDescriptor()
+ .getFields()
+ .entrySet()
+ .stream()
+ .map(entry -> DataTypes.FIELD(entry.getKey(), toDataType(entry.getValue())))
+ .toArray(DataTypes.Field[]::new);
+ return DataTypes.ROW(fields);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown data type %s.", schema.getType()));
+ }
+ }
+
@Getter private final Repository repository;
@Getter private final List<AttributeDescriptor<?>> attributeDescriptors;
+ @Getter private final FlinkDataOperator.CommitLogOptions logOptions;
- public LogCatalogTable(Repository repository, List<AttributeDescriptor<?>> attributeDescriptors) {
+ public LogCatalogTable(
+ Repository repository,
+ List<AttributeDescriptor<?>> attributeDescriptors,
+ FlinkDataOperator.CommitLogOptions logOptions) {
this.repository = repository;
this.attributeDescriptors = attributeDescriptors;
+ this.logOptions = logOptions;
}
@Override
@@ -65,60 +110,22 @@ public Map getOptions() {
}
@Override
- public TableSchema getSchema() {
- final TableSchema.Builder schemaBuilder =
- TableSchema.builder()
- .add(TableColumn.physical("key", DataTypes.VARCHAR(VarCharType.MAX_LENGTH)))
- .add(TableColumn.physical("uuid", DataTypes.VARCHAR(VarCharType.MAX_LENGTH)))
- .add(TableColumn.physical("attribute", DataTypes.VARCHAR(VarCharType.MAX_LENGTH)))
- .add(
- TableColumn.physical("attribute_prefix", DataTypes.VARCHAR(VarCharType.MAX_LENGTH)))
- .add(TableColumn.physical("timestamp", DataTypes.TIMESTAMP()));
+ public Schema getUnresolvedSchema() {
+ final Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("key", DataTypes.VARCHAR(VarCharType.MAX_LENGTH))
+ .column("uuid", DataTypes.VARCHAR(VarCharType.MAX_LENGTH))
+ .column("attribute", DataTypes.VARCHAR(VarCharType.MAX_LENGTH))
+ .column("attribute_prefix", DataTypes.VARCHAR(VarCharType.MAX_LENGTH))
+ .column("timestamp", DataTypes.TIMESTAMP());
for (AttributeDescriptor<?> attributeDescriptor : attributeDescriptors) {
- schemaBuilder.add(
- TableColumn.physical(
- attributeDescriptor.toAttributePrefix(false),
- convert(attributeDescriptor.getSchemaTypeDescriptor())));
+ schemaBuilder.column(
+ attributeDescriptor.toAttributePrefix(false),
+ toDataType(attributeDescriptor.getSchemaTypeDescriptor()));
}
return schemaBuilder.build();
}
- private DataType convert(SchemaDescriptors.SchemaTypeDescriptor<?> schema) {
- switch (schema.getType()) {
- case INT:
- return DataTypes.INT();
- case BYTE:
- return DataTypes.BYTES();
- case ENUM:
- throw new UnsupportedOperationException("Not implemented.");
- case LONG:
- return DataTypes.BIGINT();
- case ARRAY:
- return DataTypes.ARRAY(convert(schema.asArrayTypeDescriptor().getValueDescriptor()));
- case FLOAT:
- return DataTypes.FLOAT();
- case DOUBLE:
- return DataTypes.DOUBLE();
- case STRING:
- return DataTypes.VARCHAR(VarCharType.MAX_LENGTH);
- case BOOLEAN:
- return DataTypes.BOOLEAN();
- case STRUCTURE:
- final DataTypes.Field[] fields =
- schema
- .asStructureTypeDescriptor()
- .getFields()
- .entrySet()
- .stream()
- .map(entry -> DataTypes.FIELD(entry.getKey(), convert(entry.getValue())))
- .toArray(DataTypes.Field[]::new);
- return DataTypes.ROW(fields);
- default:
- throw new IllegalArgumentException(
- String.format("Unknown data type %s.", schema.getType()));
- }
- }
-
@Override
public String getComment() {
throw new UnsupportedOperationException("Not implemented.");
@@ -126,7 +133,7 @@ public String getComment() {
@Override
public CatalogBaseTable copy() {
- return new LogCatalogTable(repository, attributeDescriptors);
+ return new LogCatalogTable(repository, attributeDescriptors, logOptions);
}
@Override
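For reference, the mapping `toDataType` implements, applied to a hypothetical STRUCTURE schema with a string field and a repeated long field (field names are illustrative):

```java
// Equivalent of toDataType(...) for { name: STRING, counts: ARRAY<LONG> }.
DataType example =
    DataTypes.ROW(
        DataTypes.FIELD("name", DataTypes.VARCHAR(VarCharType.MAX_LENGTH)),
        DataTypes.FIELD("counts", DataTypes.ARRAY(DataTypes.BIGINT())));
```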
diff --git a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java
index 27d8691a8..443580cad 100644
--- a/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java
+++ b/flink/core/src/main/java/cz/o2/proxima/flink/core/table/RowDataResultExtractor.java
@@ -31,6 +31,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
+/** Convert {@link StreamElement} into {@link RowData}. */
public class RowDataResultExtractor implements ResultExtractor<RowData> {
private final List<AttributeDescriptor<?>> attributeDescriptors;
@@ -58,13 +59,13 @@ public RowData toResult(StreamElement element) {
// value
for (int i = 0; i < attributeDescriptors.length; i++) {
if (element.getAttributeDescriptor().equals(attributeDescriptors[i])) {
- data.setField(5 + i, x(element, element.getAttributeDescriptor()));
+ data.setField(5 + i, convertStreamElement(element, element.getAttributeDescriptor()));
}
}
return data;
}
- private static <ValueT> Object x(
+ private static <ValueT> Object convertStreamElement(
StreamElement element, AttributeDescriptor<ValueT> attributeDescriptor) {
final ValueT value =
Optionals.get(attributeDescriptor.getValueSerializer().deserialize(element.getValue()));
@@ -75,7 +76,7 @@ private static Object x(
return toFlinkData(schemaDescriptor, valueAccessor, value);
}
- public static <ValueT> Object toFlinkData(
+ private static <ValueT> Object toFlinkData(
SchemaDescriptors.SchemaTypeDescriptor<ValueT> descriptor,
AttributeValueAccessor<ValueT, ?> accessor,
ValueT value) {
@@ -105,7 +106,7 @@ public static Object toFlinkData(
}
}
- public static <ValueT> RowData toFlinkStructure(
+ private static <ValueT> RowData toFlinkStructure(
SchemaDescriptors.StructureTypeDescriptor<ValueT> descriptor,
AttributeValueAccessors.StructureValueAccessor<ValueT> accessor,
ValueT value) {
@@ -127,7 +128,7 @@ public static RowData toFlinkStructure(
return rowData;
}
- public static <ValueT> ArrayData toFlinkArray(
+ private static <ValueT> ArrayData toFlinkArray(
SchemaDescriptors.ArrayTypeDescriptor<ValueT> descriptor,
AttributeValueAccessors.ArrayValueAccessor<ValueT> accessor,
List<ValueT> values) {
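The extractor above is what `CommitLogDynamicSource` hands to the source function; its standalone contract, hedged (`attributeDescriptors` and `element` are assumed to exist):

```java
// Output columns: key, uuid, attribute, attribute_prefix, timestamp, then one
// column per attribute descriptor, in the order the descriptors were supplied.
ResultExtractor<RowData> extractor = new RowDataResultExtractor(attributeDescriptors);
RowData row = extractor.toResult(element);
```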
diff --git a/flink/core/src/test/java/cz/o2/proxima/flink/core/BatchLogSourceFunctionTest.java b/flink/core/src/test/java/cz/o2/proxima/flink/core/BatchLogSourceFunctionTest.java
index f922f3977..c7018f90e 100644
--- a/flink/core/src/test/java/cz/o2/proxima/flink/core/BatchLogSourceFunctionTest.java
+++ b/flink/core/src/test/java/cz/o2/proxima/flink/core/BatchLogSourceFunctionTest.java
@@ -106,6 +106,7 @@ void testRunAndClose() throws Exception {
new BatchLogSourceFunction(
repository.asFactory(),
Collections.singletonList(attribute),
+ FlinkDataOperator.BatchLogOptions.newBuilder().build(),
ResultExtractor.identity()) {
@Override
@@ -263,6 +264,7 @@ private OperatorSubtaskState runSubtask(
new BatchLogSourceFunction(
repository.asFactory(),
Collections.singletonList(attributeDescriptor),
+ FlinkDataOperator.BatchLogOptions.newBuilder().build(),
ResultExtractor.identity()) {
@Override
diff --git a/flink/core/src/test/java/cz/o2/proxima/flink/core/CommitLogSourceFunctionTest.java b/flink/core/src/test/java/cz/o2/proxima/flink/core/CommitLogSourceFunctionTest.java
index 4e9c9df22..3be19cb98 100644
--- a/flink/core/src/test/java/cz/o2/proxima/flink/core/CommitLogSourceFunctionTest.java
+++ b/flink/core/src/test/java/cz/o2/proxima/flink/core/CommitLogSourceFunctionTest.java
@@ -100,6 +100,7 @@ void testRunAndClose() throws Exception {
new CommitLogSourceFunction<>(
repository.asFactory(),
Collections.singletonList(attribute),
+ FlinkDataOperator.CommitLogOptions.newBuilder().build(),
ResultExtractor.identity());
final AbstractStreamOperatorTestHarness testHarness =
createTestHarness(sourceFunction, 1, 0);
@@ -141,6 +142,7 @@ void testObserverErrorPropagatesToTheMainThread() throws Exception {
new CommitLogSourceFunction<>(
repository.asFactory(),
Collections.singletonList(attributeDescriptor),
+ FlinkDataOperator.CommitLogOptions.newBuilder().build(),
element -> {
throw new IllegalStateException("Test failure.");
});
@@ -284,6 +286,7 @@ private OperatorSubtaskState runSubtask(
new CommitLogSourceFunction<>(
repository.asFactory(),
Collections.singletonList(attributeDescriptor),
+ FlinkDataOperator.CommitLogOptions.newBuilder().build(),
ResultExtractor.identity());
final AbstractStreamOperatorTestHarness testHarness =
createTestHarness(sourceFunction, numSubtasks, subtaskIndex);