
Commit

dmvk committed Jun 8, 2021
1 parent 75e1fae commit cd2f3e5
Showing 17 changed files with 1,191 additions and 15 deletions.
@@ -380,7 +380,7 @@ public boolean onError(Throwable error) {
}

@Test
-  public void testOrderedObserverLifycycle() {
+  public void testOrderedObserverLifecycle() {
StreamElement update =
StreamElement.upsert(
entity,
@@ -59,6 +59,7 @@
* <li>Unified metrics for monitoring and alerting.
* </ul>
*
+ * @param <OptionsT> Source options.
* @param <ReaderT> Reader to use, for reading data.
* @param <ObserverT> Observer implementation.
* @param <OffsetT> Offset used by the current reader.
@@ -67,6 +68,7 @@
*/
@Slf4j
abstract class AbstractLogSourceFunction<
+    OptionsT extends FlinkDataOperator.LogOptions,
ReaderT,
ObserverT extends AbstractSourceLogObserver<OffsetT, ContextT, OutputT>,
OffsetT extends Serializable,
@@ -110,6 +112,7 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)

@Getter private final RepositoryFactory repositoryFactory;
private final List<AttributeDescriptor<?>> attributeDescriptors;
+  @Getter private final OptionsT options;
private final ResultExtractor<OutputT> resultExtractor;

@Nullable private transient List<OffsetT> restoredOffsets;
@@ -133,9 +136,11 @@ private static int getSubtaskIndex(Partition partition, int numParallelSubtasks)
AbstractLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
+      OptionsT options,
ResultExtractor<OutputT> resultExtractor) {
this.repositoryFactory = repositoryFactory;
this.attributeDescriptors = attributeDescriptors;
+    this.options = options;
this.resultExtractor = resultExtractor;
}

@@ -290,14 +295,16 @@ void finishAndMarkAsIdle(SourceContext<?> sourceContext) {
synchronized (sourceContext.getCheckpointLock()) {
sourceContext.emitWatermark(new Watermark(Watermarks.MAX_WATERMARK));
}
-    sourceContext.markAsTemporarilyIdle();
-    while (cancelled.getCount() > 0) {
-      try {
-        cancelled.await();
-      } catch (InterruptedException e) {
-        if (cancelled.getCount() == 0) {
-          // Re-interrupt if cancelled.
-          Thread.currentThread().interrupt();
+    if (!options.shutdownFinishedSources()) {
+      sourceContext.markAsTemporarilyIdle();
+      while (cancelled.getCount() > 0) {
+        try {
+          cancelled.await();
+        } catch (InterruptedException e) {
+          if (cancelled.getCount() == 0) {
+            // Re-interrupt if cancelled.
+            Thread.currentThread().interrupt();
+          }
}
}
}
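The hunk above gates the existing idle-and-wait behaviour behind the new shutdownFinishedSources() option. A minimal standalone sketch of the two behaviours, using only the JDK and simplified to a single await (the class and method names here are stand-ins, not the project's code):

import java.util.concurrent.CountDownLatch;

class IdleOrShutdownSketch {
  // Stands in for the source function's "cancelled" latch.
  private final CountDownLatch cancelled = new CountDownLatch(1);

  // Mirrors the new check on options.shutdownFinishedSources().
  void onSourceFinished(boolean shutdownFinishedSources) throws InterruptedException {
    if (!shutdownFinishedSources) {
      // Default behaviour: stay registered but idle until the job cancels the source.
      cancelled.await();
    }
    // With the option enabled the method returns immediately, so the source task can finish.
  }

  void cancel() {
    cancelled.countDown();
  }
}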
@@ -33,6 +33,7 @@
@Slf4j
public class BatchLogSourceFunction<OutputT>
extends AbstractLogSourceFunction<
+        FlinkDataOperator.BatchLogOptions,
BatchLogReader,
BatchLogSourceFunction.LogObserver<OutputT>,
Offset,
@@ -61,8 +62,9 @@ void markOffsetAsConsumed(BatchLogObserver.OnNextContext context) {
public BatchLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
+      FlinkDataOperator.BatchLogOptions options,
ResultExtractor<OutputT> resultExtractor) {
-    super(repositoryFactory, attributeDescriptors, resultExtractor);
+    super(repositoryFactory, attributeDescriptors, options, resultExtractor);
}

@Override
@@ -82,7 +84,7 @@ BatchLogReader createLogReader(List<AttributeDescriptor<?>> attributeDescriptors

@Override
List<Partition> getPartitions(BatchLogReader reader) {
-    return reader.getPartitions();
+    return reader.getPartitions(getOptions().startTimestamp(), getOptions().endTimestamp());
}
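With BatchLogOptions in place, the batch source lists partitions for a bounded time range instead of all partitions. A small sketch of the resulting call, assuming the timestamps are epoch milliseconds and that BatchLogReader lives in cz.o2.proxima.direct.batch; only the two-argument getPartitions call is visible in this commit:

import cz.o2.proxima.direct.batch.BatchLogReader;
import cz.o2.proxima.storage.Partition;
import java.time.Instant;
import java.util.List;

class TimeBoundedPartitionsSketch {
  // Lists only the partitions overlapping [start, end), as the new
  // startTimestamp()/endTimestamp() accessors imply.
  static List<Partition> partitionsBetween(BatchLogReader reader, Instant start, Instant end) {
    return reader.getPartitions(start.toEpochMilli(), end.toEpochMilli());
  }
}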

@Override
@@ -23,7 +23,6 @@
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.Partition;
-import cz.o2.proxima.storage.commitlog.Position;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -32,6 +31,7 @@
@Slf4j
public class CommitLogSourceFunction<OutputT>
extends AbstractLogSourceFunction<
+        FlinkDataOperator.CommitLogOptions,
CommitLogReader,
CommitLogSourceFunction.LogObserver<OutputT>,
Offset,
@@ -63,8 +63,9 @@ void markOffsetAsConsumed(CommitLogObserver.OnNextContext context) {
public CommitLogSourceFunction(
RepositoryFactory repositoryFactory,
List<AttributeDescriptor<?>> attributeDescriptors,
+      FlinkDataOperator.CommitLogOptions options,
ResultExtractor<OutputT> resultExtractor) {
-    super(repositoryFactory, attributeDescriptors, resultExtractor);
+    super(repositoryFactory, attributeDescriptors, options, resultExtractor);
}

@Override
@@ -109,8 +110,10 @@ UnifiedObserveHandle<Offset> observePartitions(
List<Partition> partitions,
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver<OutputT> observer) {
+    final FlinkDataOperator.CommitLogOptions options = getOptions();
final ObserveHandle commitLogHandle =
-        reader.observeBulkPartitions(partitions, Position.OLDEST, false, observer);
+        reader.observeBulkPartitions(
+            partitions, options.initialPosition(), options.stopAtCurrent(), observer);
return new UnifiedObserveHandle<Offset>() {

@Override
@@ -132,7 +135,7 @@ UnifiedObserveHandle<Offset> observeRestoredOffsets(
List<AttributeDescriptor<?>> attributeDescriptors,
LogObserver<OutputT> observer) {
final cz.o2.proxima.direct.commitlog.ObserveHandle delegate =
-        reader.observeBulkOffsets(offsets, false, observer);
+        reader.observeBulkOffsets(offsets, getOptions().stopAtCurrent(), observer);
return new UnifiedObserveHandle<Offset>() {

@Override
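For completeness, a sketch of wiring the options-aware constructor introduced by this commit; apart from the constructor arguments shown above, package paths and the helper itself are assumptions:

import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import java.util.List;

class SourceWiringSketch {
  // The options object carries initialPosition(), stopAtCurrent() and
  // shutdownFinishedSources(); how it is constructed is not shown in this commit.
  static <OutputT> CommitLogSourceFunction<OutputT> newCommitLogSource(
      RepositoryFactory repositoryFactory,
      List<AttributeDescriptor<?>> attributeDescriptors,
      FlinkDataOperator.CommitLogOptions options,
      ResultExtractor<OutputT> resultExtractor) {
    return new CommitLogSourceFunction<>(
        repositoryFactory, attributeDescriptors, options, resultExtractor);
  }
}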
