WIP: Flink data operator fixes #701

Open · wants to merge 3 commits into master
5 changes: 4 additions & 1 deletion flink/core/pom.xml
@@ -86,7 +86,6 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.version}</artifactId>
<version>${flink.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -123,6 +122,10 @@
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
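The new com.google.auto.service:auto-service dependency carries no <version> element, so the version is presumably managed by a dependencyManagement section in a parent POM. AutoService is a compile-time annotation processor that generates META-INF/services registration files. A minimal sketch of the pattern it enables follows; the service interface name is illustrative, since the diff does not show which class gets annotated:

import com.google.auto.service.AutoService;

// Illustrative service interface; the actual type registered by this PR is
// not visible in the diff.
interface OperatorFactory {
  String operatorName();
}

// The auto-service processor writes this class name into
// META-INF/services/OperatorFactory at compile time, so the implementation
// becomes discoverable via java.util.ServiceLoader.load(OperatorFactory.class).
@AutoService(OperatorFactory.class)
public class FlinkOperatorFactory implements OperatorFactory {
  @Override
  public String operatorName() {
    return "flink";
  }
}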
flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractLogSourceFunction.java
@@ -59,6 +59,7 @@
* <li>Unified metrics for monitoring and alerting.
* </ul>
*
* @param <OptionsT> Source options.
* @param <ReaderT> Reader to use for reading data.
* @param <ObserverT> Observer implementation.
* @param <OffsetT> Offset used by the current reader.
@@ -67,6 +68,7 @@
*/
@Slf4j
abstract class AbstractLogSourceFunction<
OptionsT extends FlinkDataOperator.LogOptions,
ReaderT,
ObserverT extends AbstractSourceLogObserver<OffsetT, ContextT, OutputT>,
OffsetT extends Serializable,
@@ -110,6 +112,7 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)

@Getter private final RepositoryFactory repositoryFactory;
private final List<AttributeDescriptor<?>> attributeDescriptors;
@Getter private final OptionsT options;
private final ResultExtractor<OutputT> resultExtractor;

@Nullable private transient List<OffsetT> restoredOffsets;
@@ -133,9 +136,11 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)
AbstractLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
OptionsT options,
ResultExtractor<OutputT> resultExtractor) {
this.repositoryFactory = repositoryFactory;
this.attributeDescriptors = attributeDescriptors;
this.options = options;
this.resultExtractor = resultExtractor;
}

@@ -190,12 +195,14 @@ public void open(Configuration parameters) {
* cz.o2.proxima.storage.StreamElement}.
* @param skipFirstElement List of partitions we want to skip the first element from. See {@link
*     #getSkipFirstElementFromPartitions(List)} for more details.
* @param attributesToEmit List of requested attributes to emit.
* @return Log observer.
*/
abstract ObserverT createLogObserver(
SourceContext<OutputT> sourceContext,
ResultExtractor<OutputT> resultExtractor,
Set<Partition> skipFirstElement);
Set<Partition> skipFirstElement,
List<AttributeDescriptor<?>> attributesToEmit);

/**
* Observe partitions using a given reader. This method is called when starting a source from the
@@ -254,11 +261,15 @@ public void run(SourceContext<OutputT> sourceContext) throws Exception {
== getRuntimeContext().getIndexOfThisSubtask())
.collect(Collectors.toList());
final Set<Partition> skipFirstElement = getSkipFirstElementFromPartitions(filteredOffsets);
observer = createLogObserver(sourceContext, resultExtractor, skipFirstElement);
observer =
createLogObserver(
sourceContext, resultExtractor, skipFirstElement, attributeDescriptors);
observeHandle =
observeRestoredOffsets(reader, filteredOffsets, attributeDescriptors, observer);
} else {
observer = createLogObserver(sourceContext, resultExtractor, Collections.emptySet());
observer =
createLogObserver(
sourceContext, resultExtractor, Collections.emptySet(), attributeDescriptors);
observeHandle = observePartitions(reader, partitions, attributeDescriptors, observer);
}
log.info("Source [{}]: RUNNING", this);
@@ -290,14 +301,16 @@ void finishAndMarkAsIdle(SourceContext<?> sourceContext) {
synchronized (sourceContext.getCheckpointLock()) {
sourceContext.emitWatermark(new Watermark(Watermarks.MAX_WATERMARK));
}
sourceContext.markAsTemporarilyIdle();
while (cancelled.getCount() > 0) {
try {
cancelled.await();
} catch (InterruptedException e) {
if (cancelled.getCount() == 0) {
// Re-interrupt if cancelled.
Thread.currentThread().interrupt();
if (!options.shutdownFinishedSources()) {
sourceContext.markAsTemporarilyIdle();
while (cancelled.getCount() > 0) {
try {
cancelled.await();
} catch (InterruptedException e) {
if (cancelled.getCount() == 0) {
// Re-interrupt if cancelled.
Thread.currentThread().interrupt();
}
}
}
}
@@ -323,7 +336,7 @@ public void notifyCheckpointComplete(long l) {
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
Objects.requireNonNull(persistedOffsets).clear();
if (observeHandle != null) {
for (OffsetT offset : observeHandle.getConsumedOffsets()) {
for (OffsetT offset : Objects.requireNonNull(observeHandle).getConsumedOffsets()) {
persistedOffsets.add(offset);
}
}
@@ -339,12 +352,12 @@ public void initializeState(FunctionInitializationContext context) throws Exception {
restoredOffsets = new ArrayList<>();
Objects.requireNonNull(persistedOffsets).get().forEach(restoredOffsets::add);
log.info(
"BatchLog subtask {} restored state: {}.",
"LogSource subtask {} restored state: {}.",
getRuntimeContext().getIndexOfThisSubtask(),
restoredOffsets);
} else {
log.info(
"BatchLog subtask {} has no state to restore.",
"LogSource subtask {} has no state to restore.",
getRuntimeContext().getIndexOfThisSubtask());
}
}
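The new OptionsT type parameter threads per-source configuration through the shared base class. The diff shows only accessor call sites (shutdownFinishedSources() here; initialPosition() and stopAtCurrent() in the commit-log subclass; startTimestamp() and endTimestamp() in the batch subclass), so the following is a sketch of the shape those calls imply, not the actual FlinkDataOperator definition, which may well be a set of Lombok-built value classes:

import cz.o2.proxima.storage.commitlog.Position;
import java.io.Serializable;

// Sketch of the options hierarchy implied by this PR's call sites; the real
// classes are nested in FlinkDataOperator, which is not part of this diff.
public class FlinkDataOperator {

  interface LogOptions extends Serializable {
    // A finished source always emits a final MAX_WATERMARK first. When this
    // flag is true it then returns from run() so the task can finish; when
    // false it marks itself temporarily idle and parks until cancelled.
    boolean shutdownFinishedSources();
  }

  interface CommitLogOptions extends LogOptions {
    Position initialPosition(); // replaces the hard-coded Position.OLDEST
    boolean stopAtCurrent();    // replaces the hard-coded false
  }

  interface BatchLogOptions extends LogOptions {
    long startTimestamp();
    long endTimestamp();
  }
}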
flink/core/src/main/java/cz/o2/proxima/flink/core/AbstractSourceLogObserver.java
@@ -16,11 +16,13 @@
package cz.o2.proxima.flink.core;

import cz.o2.proxima.direct.LogObserver;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.time.Watermarks;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -47,6 +49,8 @@ abstract class AbstractSourceLogObserver<
@Getter private final SourceFunction.SourceContext<OutputT> sourceContext;
private final ResultExtractor<OutputT> resultExtractor;

private final List<AttributeDescriptor<?>> attributesToEmit;

/**
* When restoring from a checkpoint, we need to skip the first element in each partition, because it
* has already been consumed.
@@ -60,9 +64,11 @@
AbstractSourceLogObserver(
SourceFunction.SourceContext<OutputT> sourceContext,
ResultExtractor<OutputT> resultExtractor,
Set<Partition> skipFirstElementFromEachPartition) {
Set<Partition> skipFirstElementFromEachPartition,
List<AttributeDescriptor<?>> attributesToEmit) {
this.sourceContext = sourceContext;
this.resultExtractor = resultExtractor;
this.attributesToEmit = attributesToEmit;
this.skipFirstElementFromEachPartition = skipFirstElementFromEachPartition;
}

@@ -93,13 +99,15 @@ public void onCancelled() {
}

@Override
public boolean onNext(StreamElement ingest, ContextT context) {
public boolean onNext(StreamElement element, ContextT context) {
final boolean skipElement =
skipFirstElementFromEachPartition.contains(context.getPartition())
&& seenPartitions.add(context.getPartition());
if (!skipElement) {
synchronized (sourceContext.getCheckpointLock()) {
sourceContext.collectWithTimestamp(resultExtractor.toResult(ingest), ingest.getStamp());
if (attributesToEmit.contains(element.getAttributeDescriptor())) {
sourceContext.collectWithTimestamp(resultExtractor.toResult(element), element.getStamp());
}
markOffsetAsConsumed(context);
}
}
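onNext now consults attributesToEmit before collecting an element, while markOffsetAsConsumed still runs for every non-skipped element. The distinction matters when several attributes share one storage family: the reader delivers all of them, only the requested ones are emitted downstream, and checkpointed offsets still advance past the filtered elements so they are not replayed on restore. A self-contained model of that decision, with illustrative types standing in for StreamElement and the observer context:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal model of the emit decision in onNext(); the real code works with
// StreamElement, Partition and an OnNextContext.
final class EmitDecision {

  private final Set<Integer> seenPartitions = new HashSet<>();
  private final Set<Integer> skipFirstElementFrom;
  private final List<String> attributesToEmit;

  EmitDecision(Set<Integer> skipFirstElementFrom, List<String> attributesToEmit) {
    this.skipFirstElementFrom = skipFirstElementFrom;
    this.attributesToEmit = attributesToEmit;
  }

  boolean shouldEmit(int partition, String attribute) {
    // Skip exactly the first element seen in each restored partition; it was
    // already consumed before the checkpoint was taken.
    boolean skip = skipFirstElementFrom.contains(partition) && seenPartitions.add(partition);
    // For non-skipped elements the offset is marked as consumed even when the
    // attribute is filtered out; only requested attributes are collected.
    return !skip && attributesToEmit.contains(attribute);
  }
}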
flink/core/src/main/java/cz/o2/proxima/flink/core/BatchLogSourceFunction.java
@@ -35,6 +35,7 @@
@Slf4j
public class BatchLogSourceFunction<OutputT>
extends AbstractLogSourceFunction<
FlinkDataOperator.BatchLogOptions,
BatchLogReader,
BatchLogSourceFunction.LogObserver<OutputT>,
Offset,
@@ -48,8 +49,9 @@ static class LogObserver<OutputT>
LogObserver(
SourceContext<OutputT> sourceContext,
ResultExtractor<OutputT> resultExtractor,
Set<Partition> skipFirstElementFromEachPartition) {
super(sourceContext, resultExtractor, skipFirstElementFromEachPartition);
Set<Partition> skipFirstElementFromEachPartition,
List<AttributeDescriptor<?>> attributesToEmit) {
super(sourceContext, resultExtractor, skipFirstElementFromEachPartition, attributesToEmit);
}

@Override
@@ -63,8 +65,9 @@ void markOffsetAsConsumed(BatchLogObserver.OnNextContext context) {
public BatchLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
FlinkDataOperator.BatchLogOptions options,
ResultExtractor<OutputT> resultExtractor) {
super(repositoryFactory, attributeDescriptors, resultExtractor);
super(repositoryFactory, attributeDescriptors, options, resultExtractor);
}

@Override
@@ -84,7 +87,7 @@ BatchLogReader createLogReader(List<AttributeDescriptor<?>> attributeDescriptors

@Override
List<Partition> getPartitions(BatchLogReader reader) {
return reader.getPartitions();
return reader.getPartitions(getOptions().startTimestamp(), getOptions().endTimestamp());
}

@Override
@@ -106,8 +109,9 @@ Set<Partition> getSkipFirstElementFromPartitions(List<Offset> offsets) {
LogObserver<OutputT> createLogObserver(
SourceContext<OutputT> sourceContext,
ResultExtractor<OutputT> resultExtractor,
Set<Partition> skipFirstElement) {
return new LogObserver<>(sourceContext, resultExtractor, skipFirstElement);
Set<Partition> skipFirstElement,
List<AttributeDescriptor<?>> attributesToEmit) {
return new LogObserver<>(sourceContext, resultExtractor, skipFirstElement, attributesToEmit);
}

@Override
@@ -116,6 +120,7 @@ UnifiedObserveHandle<Offset> observeRestoredOffsets(
List<Offset> offsets,
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver<OutputT> observer) {
@SuppressWarnings("resource")
final OffsetTrackingBatchLogReader.OffsetTrackingObserveHandle delegate =
(OffsetTrackingBatchLogReader.OffsetTrackingObserveHandle)
reader.observeOffsets(offsets, attributeDescriptors, wrapSourceObserver(observer));
@@ -144,6 +149,7 @@ UnifiedObserveHandle<Offset> observePartitions(
List<Partition> partitions,
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver<OutputT> observer) {
@SuppressWarnings("resource")
final ObserveHandle batchReaderHandle =
reader.observe(partitions, attributeDescriptors, wrapSourceObserver(observer));
// We've wrapped BatchLogReader with the OffsetTrackingBatchLogReader, so we can safely cast its
@@ -165,6 +171,7 @@
@Override
public void close() {
offsetTrackingHandle.close();
batchReaderHandle.close();
}
};
}
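With BatchLogOptions threaded through, getPartitions now restricts the batch read to partitions overlapping the configured time range, and the returned handle closes both the wrapping and the wrapped observe handles. A hypothetical construction of a time-bounded source; the entity and attribute names are illustrative, the options builder is not shown in this PR, and the lambda assumes ResultExtractor is a functional interface:

import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import java.util.Collections;

class BatchSourceExample {

  static BatchLogSourceFunction<StreamElement> timeBoundedSource(
      Repository repo, FlinkDataOperator.BatchLogOptions options) {
    // Illustrative entity and attribute; any attribute readable from batch
    // storage would do.
    EntityDescriptor gateway = repo.getEntity("gateway");
    AttributeDescriptor<?> status = gateway.getAttribute("status");
    return new BatchLogSourceFunction<>(
        repo.asFactory(),
        Collections.singletonList(status),
        options, // assumed built with explicit startTimestamp()/endTimestamp()
        element -> element); // assumption: ResultExtractor is a functional interface
  }
}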
flink/core/src/main/java/cz/o2/proxima/flink/core/CommitLogSourceFunction.java
@@ -15,6 +15,8 @@
*/
package cz.o2.proxima.flink.core;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import cz.o2.proxima.annotations.Experimental;
import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
@@ -24,31 +26,48 @@
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.commitlog.Position;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;

@Experimental(value = "API can be changed.")
@Slf4j
public class CommitLogSourceFunction<OutputT>
extends AbstractLogSourceFunction<
FlinkDataOperator.CommitLogOptions,
CommitLogReader,
CommitLogSourceFunction.LogObserver<OutputT>,
Offset,
CommitLogObserver.OnNextContext,
OutputT> {

private static final String CONSUMER_NAME_STATE_NAME = "consumer-name";

@Nullable private transient ListState<String> consumerNameState;

@VisibleForTesting
@Getter(AccessLevel.PACKAGE)
private String consumerName;

static class LogObserver<OutputT>
extends AbstractSourceLogObserver<Offset, CommitLogObserver.OnNextContext, OutputT>
implements CommitLogObserver {

LogObserver(
SourceContext<OutputT> sourceContext,
ResultExtractor<OutputT> resultExtractor,
Set<Partition> skipFirstElementFromEachPartition) {
super(sourceContext, resultExtractor, skipFirstElementFromEachPartition);
Set<Partition> skipFirstElementFromEachPartition,
List<AttributeDescriptor<?>> attributesToEmit) {
super(sourceContext, resultExtractor, skipFirstElementFromEachPartition, attributesToEmit);
}

@Override
Expand All @@ -63,10 +82,13 @@ void markOffsetAsConsumed(CommitLogObserver.OnNextContext context) {
}

public CommitLogSourceFunction(
String consumerName,
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
FlinkDataOperator.CommitLogOptions options,
ResultExtractor<OutputT> resultExtractor) {
super(repositoryFactory, attributeDescriptors, resultExtractor);
super(repositoryFactory, attributeDescriptors, options, resultExtractor);
this.consumerName = Objects.requireNonNull(consumerName);
}

@Override
@@ -101,8 +123,29 @@ Set<Partition> getSkipFirstElementFromPartitions(List<Offset> offsets) {
LogObserver<OutputT> createLogObserver(
SourceContext<OutputT> sourceContext,
ResultExtractor<OutputT> resultExtractor,
Set<Partition> skipFirstElement) {
return new LogObserver<>(sourceContext, resultExtractor, skipFirstElement);
Set<Partition> skipFirstElement,
List<AttributeDescriptor<?>> attributesToEmit) {
return new LogObserver<>(sourceContext, resultExtractor, skipFirstElement, attributesToEmit);
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
super.initializeState(context);
consumerNameState =
context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>(CONSUMER_NAME_STATE_NAME, String.class));
if (context.isRestored()) {
Objects.requireNonNull(consumerNameState);
consumerName = Iterables.getFirst(consumerNameState.get(), consumerName);
}
}

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
super.snapshotState(functionSnapshotContext);
Objects.requireNonNull(consumerNameState).clear();
consumerNameState.add(Objects.requireNonNull(consumerName));
}

@Override
@@ -111,8 +154,10 @@ UnifiedObserveHandle<Offset> observePartitions(
List<Partition> partitions,
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver<OutputT> observer) {
final FlinkDataOperator.CommitLogOptions options = getOptions();
final ObserveHandle commitLogHandle =
reader.observeBulkPartitions(partitions, Position.OLDEST, false, observer);
reader.observeBulkPartitions(
consumerName, partitions, options.initialPosition(), options.stopAtCurrent(), observer);
return new UnifiedObserveHandle<Offset>() {

@Override
@@ -133,8 +178,9 @@ UnifiedObserveHandle<Offset> observeRestoredOffsets(
List<Offset> offsets,
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver<OutputT> observer) {
final cz.o2.proxima.direct.commitlog.ObserveHandle delegate =
reader.observeBulkOffsets(offsets, false, observer);
@SuppressWarnings("resource")
final ObserveHandle delegate =
reader.observeBulkOffsets(offsets, getOptions().stopAtCurrent(), observer);
return new UnifiedObserveHandle<Offset>() {

@Override
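Persisting the consumer name in operator list state means a job restored from a checkpoint keeps consuming under its original name even if the constructor argument has changed since, because initializeState prefers the restored value (Iterables.getFirst(consumerNameState.get(), consumerName)). A hypothetical wiring; the consumer and attribute names are illustrative and the lambda assumes ResultExtractor is a functional interface:

import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import java.util.Collections;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class CommitLogSourceExample {

  static void attachSource(
      StreamExecutionEnvironment env,
      Repository repo,
      AttributeDescriptor<?> status,
      FlinkDataOperator.CommitLogOptions options) {
    CommitLogSourceFunction<StreamElement> source =
        new CommitLogSourceFunction<>(
            "billing-consumer", // stable name, snapshotted with each checkpoint
            repo.asFactory(),
            Collections.singletonList(status),
            options,
            element -> element); // assumption: ResultExtractor is a functional interface
    // A stable uid lets Flink match the operator state (including the
    // consumer name) when the job is restored from a savepoint.
    env.addSource(source).uid("commit-log-source").name("commit-log-source");
  }
}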