Commit d7e817a

[flink-core] Introduce LogOptions. Add javadocs.

dmvk committed Jun 4, 2021
1 parent 20c6bcf commit d7e817a

Showing 9 changed files with 150 additions and 92 deletions.

@@ -59,6 +59,7 @@
* <li>Unified metrics for monitoring and alerting.
* </ul>
*
* @param <OptionsT> Source options.
* @param <ReaderT> Reader to use, for reading data.
* @param <ObserverT> Observer implementation.
* @param <OffsetT> Offset used by the current reader.
@@ -67,6 +68,7 @@
*/
@Slf4j
abstract class AbstractLogSourceFunction<
OptionsT extends FlinkDataOperator.LogOptions,
ReaderT,
ObserverT extends AbstractSourceLogObserver<OffsetT, ContextT, OutputT>,
OffsetT extends Serializable,
@@ -110,6 +112,7 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)

@Getter private final RepositoryFactory repositoryFactory;
private final List<AttributeDescriptor<?>> attributeDescriptors;
@Getter private final OptionsT options;
private final ResultExtractor<OutputT> resultExtractor;

@Nullable private transient List<OffsetT> restoredOffsets;
@@ -133,9 +136,11 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)
AbstractLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
OptionsT options,
ResultExtractor<OutputT> resultExtractor) {
this.repositoryFactory = repositoryFactory;
this.attributeDescriptors = attributeDescriptors;
this.options = options;
this.resultExtractor = resultExtractor;
}

@@ -290,17 +295,19 @@ void finishAndMarkAsIdle(SourceContext<?> sourceContext) {
synchronized (sourceContext.getCheckpointLock()) {
sourceContext.emitWatermark(new Watermark(Watermarks.MAX_WATERMARK));
}
sourceContext.markAsTemporarilyIdle();
// while (cancelled.getCount() > 0) {
// try {
// cancelled.await();
// } catch (InterruptedException e) {
// if (cancelled.getCount() == 0) {
// // Re-interrupt if cancelled.
// Thread.currentThread().interrupt();
// }
// }
// }
if (!options.shutdownFinishedSources()) {
sourceContext.markAsTemporarilyIdle();
while (cancelled.getCount() > 0) {
try {
cancelled.await();
} catch (InterruptedException e) {
if (cancelled.getCount() == 0) {
// Re-interrupt if cancelled.
Thread.currentThread().interrupt();
}
}
}
}
}

@Override
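
The reworked finishAndMarkAsIdle() above is where the new LogOptions flag is consulted: with shutdownFinishedSources() left at its default of false, a source that has no more work emits MAX_WATERMARK, marks itself temporarily idle and parks on the cancelled latch until the job is cancelled, keeping the task alive; returning true skips the wait so the source can finish. A hedged sketch of opting in, using the Lombok builder declared on the options classes further down in this diff (BatchLogOptions is the minimal implementation, carrying only this one flag; the variable name is illustrative):

// Sketch only: builder names follow
// @Builder(builderMethodName = "newBuilder", setterPrefix = "with") on the options classes.
FlinkDataOperator.BatchLogOptions finishWhenDone =
    FlinkDataOperator.BatchLogOptions.newBuilder()
        .withShutdownFinishedSources(true) // default is false: stay idle until cancelled
        .build();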

@@ -33,6 +33,7 @@
@Slf4j
public class BatchLogSourceFunction<OutputT>
extends AbstractLogSourceFunction<
FlinkDataOperator.BatchLogOptions,
BatchLogReader,
BatchLogSourceFunction.LogObserver<OutputT>,
Offset,
@@ -61,8 +62,9 @@ void markOffsetAsConsumed(BatchLogObserver.OnNextContext context) {
public BatchLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
FlinkDataOperator.BatchLogOptions options,
ResultExtractor<OutputT> resultExtractor) {
super(repositoryFactory, attributeDescriptors, resultExtractor);
super(repositoryFactory, attributeDescriptors, options, resultExtractor);
}

@Override

@@ -23,7 +23,6 @@
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.commitlog.Position;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -32,6 +31,7 @@
@Slf4j
public class CommitLogSourceFunction<OutputT>
extends AbstractLogSourceFunction<
FlinkDataOperator.CommitLogOptions,
CommitLogReader,
CommitLogSourceFunction.LogObserver<OutputT>,
Offset,
@@ -63,8 +63,9 @@ void markOffsetAsConsumed(CommitLogObserver.OnNextContext context) {
public CommitLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
FlinkDataOperator.CommitLogOptions options,
ResultExtractor<OutputT> resultExtractor) {
super(repositoryFactory, attributeDescriptors, resultExtractor);
super(repositoryFactory, attributeDescriptors, options, resultExtractor);
}

@Override
Expand Down Expand Up @@ -109,8 +110,10 @@ UnifiedObserveHandle<Offset> observePartitions(
List<Partition> partitions,
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver<OutputT> observer) {
final FlinkDataOperator.CommitLogOptions options = getOptions();
final ObserveHandle commitLogHandle =
reader.observeBulkPartitions(partitions, Position.OLDEST, false, observer);
reader.observeBulkPartitions(
partitions, options.initialPosition(), options.stopAtCurrent(), observer);
return new UnifiedObserveHandle<Offset>() {

@Override
@@ -132,7 +135,7 @@ UnifiedObserveHandle<Offset> observeRestoredOffsets(
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver<OutputT> observer) {
final cz.o2.proxima.direct.commitlog.ObserveHandle delegate =
reader.observeBulkOffsets(offsets, false, observer);
reader.observeBulkOffsets(offsets, getOptions().stopAtCurrent(), observer);
return new UnifiedObserveHandle<Offset>() {

@Override
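
The two observe calls above are where CommitLogOptions takes effect: a clean start opens the bulk observation at options.initialPosition() with options.stopAtCurrent(), while a restart from a Flink checkpoint replays the stored offsets and only stopAtCurrent() is still consulted. A hedged sketch of what the forwarded values amount to at the CommitLogReader level; reader, partitions, offsets and observer stand for values already in scope in the methods above, they are not new API:

// Unbounded read from the beginning of the log, the previously hard-coded behavior and
// still the default (Position.OLDEST, stopAtCurrent = false):
reader.observeBulkPartitions(partitions, Position.OLDEST, false, observer);

// Bounded catch-up read: withStopAtCurrent(true) forwards (Position.OLDEST, true), so
// observation stops once it catches up with current data instead of waiting for new writes.
reader.observeBulkPartitions(partitions, Position.OLDEST, true, observer);

// Restored run: the checkpointed offsets decide where reading resumes; only
// stopAtCurrent() is still applied.
reader.observeBulkOffsets(offsets, false, observer);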

@@ -27,15 +27,55 @@
import lombok.Builder;
import lombok.Getter;
import lombok.Value;
import lombok.experimental.Accessors;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;

/** Operate {@link Repository} using Apache Flink. */
public class FlinkDataOperator implements DataOperator {

/** Common options for all log based sources. */
public interface LogOptions {

/**
* Shut down sources when they have no more work to do.
*
* @return True to shut down when finished.
*/
boolean shutdownFinishedSources();
}

@Accessors(fluent = true)
@Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
@Value
public static class CommitLogOptions implements LogOptions {

/** @see LogOptions#shutdownFinishedSources() */
@Builder.Default boolean shutdownFinishedSources = false;

/**
* An initial position to start reading from when the source starts from a clean state (no
* checkpoint to restore from).
*/
@Builder.Default Position initialPosition = Position.OLDEST;

/** Signal to stop reading when event time aligns with processing time. */
@Builder.Default boolean stopAtCurrent = false;
}

@Accessors(fluent = true)
@Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
@Value
public static class BatchLogOptions implements LogOptions {

/** @see LogOptions#shutdownFinishedSources() */
@Builder.Default boolean shutdownFinishedSources = false;
}

private final Repository repository;
@Getter private final StreamExecutionEnvironment executionEnvironment;
@Getter private final StreamTableEnvironment tableEnvironment;
@@ -61,25 +101,16 @@ public Repository getRepository() {
return repository;
}

@Builder(builderMethodName = "newBuilder", setterPrefix = "with", toBuilder = true)
@Value
public static class CommitLogOptions {

@Builder.Default Position initialPosition = Position.OLDEST;

@Builder.Default boolean stopAtCurrent = false;
}

public DataStream<StreamElement> createBatchLogStream(
List<AttributeDescriptor<?>> attributeDescriptors) {
return createBatchLogStream(attributeDescriptors, CommitLogOptions.newBuilder().build());
return createBatchLogStream(attributeDescriptors, BatchLogOptions.newBuilder().build());
}

public DataStream<StreamElement> createBatchLogStream(
List<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) {
List<AttributeDescriptor<?>> attributeDescriptors, BatchLogOptions options) {
return executionEnvironment.addSource(
new BatchLogSourceFunction<>(
repository.asFactory(), attributeDescriptors, ResultExtractor.identity()),
repository.asFactory(), attributeDescriptors, options, ResultExtractor.identity()),
sourceName("BatchLog", attributeDescriptors));
}

Expand All @@ -103,7 +134,7 @@ public DataStream<StreamElement> createCommitLogStream(
List<AttributeDescriptor<?>> attributeDescriptors, CommitLogOptions options) {
return executionEnvironment.addSource(
new CommitLogSourceFunction<>(
repository.asFactory(), attributeDescriptors, ResultExtractor.identity()),
repository.asFactory(), attributeDescriptors, options, ResultExtractor.identity()),
sourceName("CommitLog", attributeDescriptors));
}

Expand All @@ -114,8 +145,8 @@ public void registerCommitLogTable(
try {
catalog.createTable(
ObjectPath.fromString(String.format("%s.%s", catalog.getDefaultDatabase(), table)),
new LogCatalogTable(repository, attributeDescriptors),
true);
new LogCatalogTable(repository, attributeDescriptors, options),
false);
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to register a new table [%s].", table), e);
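
Taken together, the new options classes give createCommitLogStream and createBatchLogStream an explicit configuration surface. A hedged usage sketch of a bounded catch-up read follows; the helper class, its placement alongside FlinkDataOperator, and the already constructed operator are assumptions, while the builder and method signatures come from this diff:

import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.storage.StreamElement;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;

class CatchUpExample {

  /** Consume the commit log up to currently written data, then let the source finish. */
  static DataStream<StreamElement> catchUp(
      FlinkDataOperator operator, List<AttributeDescriptor<?>> attributeDescriptors) {
    final FlinkDataOperator.CommitLogOptions options =
        FlinkDataOperator.CommitLogOptions.newBuilder()
            .withStopAtCurrent(true) // bounded: stop once the read reaches current data
            .withShutdownFinishedSources(true) // finish the task instead of idling
            .build();
    return operator.createCommitLogStream(attributeDescriptors, options);
  }
}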

@@ -43,12 +43,14 @@ public ChangelogMode getChangelogMode() {

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
final boolean bounded = catalogTable.getLogOptions().stopAtCurrent();
return SourceFunctionProvider.of(
new CommitLogSourceFunction<>(
repository.asFactory(),
catalogTable.getAttributeDescriptors(),
catalogTable.getLogOptions(),
new RowDataResultExtractor(catalogTable.getAttributeDescriptors())),
false);
bounded);
}

@Override