Skip to content

Commit

Permalink
[FLINK-35904][test] Make async test harness extend exsit test harness (
Browse files Browse the repository at this point in the history
  • Loading branch information
fredia authored Dec 26, 2024
1 parent e858969 commit 0a55677
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void processRecordAttributes(int idx, RecordAttributes recordAttributes)
getCastedOperator().getInputs().get(idx).processRecordAttributes(recordAttributes);
}

private MultipleInputStreamOperator<OUT> getCastedOperator() {
protected MultipleInputStreamOperator<OUT> getCastedOperator() {
return (MultipleInputStreamOperator<OUT>) operator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.List;
Expand All @@ -48,10 +48,10 @@
* async processing, please use methods of test harness instead of operator.
*/
public class AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
extends AbstractStreamOperatorTestHarness<OUT> {
extends MultiInputStreamOperatorTestHarness<OUT> {

/** The executor service for async state processing. */
private ExecutorService executor;
private final ExecutorService executor;

public static <K, OUT> AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT> create(
StreamOperatorFactory<OUT> operatorFactory,
Expand Down Expand Up @@ -108,23 +108,31 @@ public void setKeySelector(int idx, KeySelector<?, K> keySelector) {
config.serializeAllConfigs();
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public void processElement(int idx, StreamRecord<?> element) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
ThrowingConsumer<StreamRecord<?>, Exception> inputProcessor =
RecordProcessorUtils.getRecordProcessor(input);
execute(executor, (ignore) -> inputProcessor.accept(element)).get();
}

@Override
@SuppressWarnings("rawtypes")
public void processWatermark(int idx, Watermark mark) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
execute(executor, (ignore) -> input.processWatermark(mark)).get();
}

@Override
@SuppressWarnings("rawtypes")
public void processWatermarkStatus(int idx, WatermarkStatus watermarkStatus) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
execute(executor, (ignore) -> input.processWatermarkStatus(watermarkStatus)).get();
}

@Override
@SuppressWarnings("rawtypes")
public void processRecordAttributes(int idx, RecordAttributes recordAttributes)
throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
Expand All @@ -137,16 +145,7 @@ public void drainStateRequests() throws Exception {

@Override
public void close() throws Exception {
execute(
executor,
(ignore) -> {
super.close();
})
.get();
execute(executor, (ignore) -> super.close()).get();
executor.shutdown();
}

private MultipleInputStreamOperator<OUT> getCastedOperator() {
return (MultipleInputStreamOperator<OUT>) operator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand All @@ -34,7 +35,7 @@
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.ArrayList;
Expand All @@ -54,13 +55,15 @@
* async processing, please use methods of test harness instead of operator.
*/
public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
extends AbstractStreamOperatorTestHarness<OUT> {
extends OneInputStreamOperatorTestHarness<IN, OUT> {

/** Empty if the {@link #operator} is not {@link MultipleInputStreamOperator}. */
private final List<Input> inputs = new ArrayList<>();
private final List<Input<IN>> inputs = new ArrayList<>();

private long currentWatermark;

/** The executor service for async state processing. */
private ExecutorService executor;
private final ExecutorService executor;

public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> create(
OneInputStreamOperator<IN, OUT> operator,
Expand Down Expand Up @@ -120,6 +123,7 @@ protected AsyncKeyedOneInputStreamOperatorTestHarness(
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public void setup(TypeSerializer<OUT> outputSerializer) {
super.setup(outputSerializer);
if (operator instanceof MultipleInputStreamOperator) {
Expand All @@ -128,10 +132,7 @@ public void setup(TypeSerializer<OUT> outputSerializer) {
}
}

public OneInputStreamOperator<IN, OUT> getOneInputOperator() {
return (OneInputStreamOperator<IN, OUT>) this.operator;
}

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
processElementInternal(element).get();
}
Expand All @@ -140,6 +141,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
* Submit an element processing in an executor thread. This method is mainly used for internal
* testing, please use {@link #processElement} for common operator testing.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public CompletableFuture<Void> processElementInternal(StreamRecord<IN> element)
throws Exception {
if (inputs.isEmpty()) {
Expand All @@ -160,22 +162,24 @@ public CompletableFuture<Void> processElementInternal(StreamRecord<IN> element)
}
}

@Override
public void processWatermark(long watermark) throws Exception {
processWatermarkInternal(watermark).get();
}

/** For internal testing. */
public CompletableFuture<Void> processWatermarkInternal(long watermark) throws Exception {
public CompletableFuture<Void> processWatermarkInternal(long watermark) {
return processWatermarkInternal(new Watermark(watermark));
}

@Override
public void processWatermarkStatus(WatermarkStatus status) throws Exception {
processWatermarkStatusInternal(status).get();
}

/** For internal testing. */
public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus status)
throws Exception {
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus status) {
if (inputs.isEmpty()) {
return execute(
executor, (ignore) -> getOneInputOperator().processWatermarkStatus(status));
Expand All @@ -186,12 +190,22 @@ public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus st
}
}

@Override
public void processWatermark(Watermark mark) throws Exception {
processWatermarkInternal(mark).get();
}

@Override
public void endInput() throws Exception {
if (operator instanceof BoundedOneInput) {
execute(executor, (ignore) -> ((BoundedOneInput) operator).endInput()).get();
}
}

/** For internal testing. */
public CompletableFuture<Void> processWatermarkInternal(Watermark mark) throws Exception {
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processWatermarkInternal(Watermark mark) {
currentWatermark = mark.getTimestamp();
if (inputs.isEmpty()) {
return execute(executor, (ignore) -> getOneInputOperator().processWatermark(mark));
} else {
Expand All @@ -206,6 +220,7 @@ public void processLatencyMarker(LatencyMarker marker) throws Exception {
}

/** For internal testing. */
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker marker) {
if (inputs.isEmpty()) {
return execute(
Expand All @@ -217,11 +232,18 @@ public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker marker
}
}

@Override
public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
processRecordAttributesInternal(recordAttributes).get();
}

@Override
public long getCurrentWatermark() {
return currentWatermark;
}

/** For internal testing. */
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processRecordAttributesInternal(
RecordAttributes recordAttributes) {
if (inputs.isEmpty()) {
Expand All @@ -246,12 +268,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {

@Override
public void close() throws Exception {
execute(
executor,
(ignore) -> {
super.close();
})
.get();
execute(executor, (ignore) -> super.close()).get();
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand All @@ -30,7 +31,7 @@
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

Expand All @@ -48,15 +49,15 @@
* async processing, please use methods of test harness instead of operator.
*/
public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
extends AbstractStreamOperatorTestHarness<OUT> {
extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {

private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;

private ThrowingConsumer<StreamRecord<IN1>, Exception> processor1;
private ThrowingConsumer<StreamRecord<IN2>, Exception> processor2;

/** The executor service for async state processing. */
private ExecutorService executor;
private final ExecutorService executor;

public static <K, IN1, IN2, OUT>
AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> create(
Expand Down Expand Up @@ -132,62 +133,85 @@ private ThrowingConsumer<StreamRecord<IN2>, Exception> getRecordProcessor2() {
return processor2;
}

@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
execute(executor, (ignore) -> getRecordProcessor1().accept(element)).get();
}

@Override
public void processElement1(IN1 value, long timestamp) throws Exception {
processElement1(new StreamRecord<>(value, timestamp));
}

@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
execute(executor, (ignore) -> getRecordProcessor2().accept(element)).get();
}

@Override
public void processElement2(IN2 value, long timestamp) throws Exception {
processElement2(new StreamRecord<>(value, timestamp));
}

@Override
public void processWatermark1(Watermark mark) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermark1(mark)).get();
}

@Override
public void processWatermark2(Watermark mark) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermark2(mark)).get();
}

@Override
public void processBothWatermarks(Watermark mark) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermark1(mark)).get();
execute(executor, (ignore) -> twoInputOperator.processWatermark2(mark)).get();
}

@Override
public void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermarkStatus1(watermarkStatus))
.get();
}

@Override
public void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermarkStatus2(watermarkStatus))
.get();
}

@Override
public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processRecordAttributes1(recordAttributes))
.get();
}

@Override
public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processRecordAttributes2(recordAttributes))
.get();
}

public void endInput1() throws Exception {
if (operator instanceof BoundedMultiInput) {
execute(executor, (ignore) -> ((BoundedMultiInput) operator).endInput(1)).get();
}
}

public void endInput2() throws Exception {
if (operator instanceof BoundedMultiInput) {
execute(executor, (ignore) -> ((BoundedMultiInput) operator).endInput(2)).get();
}
}

public void drainStateRequests() throws Exception {
execute(executor, (ignore) -> drain(operator)).get();
}

@Override
public void close() throws Exception {
execute(
executor,
(ignore) -> {
super.close();
})
.get();
execute(executor, (ignore) -> super.close()).get();
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static <OUT> void drain(StreamOperator<OUT> operator) {

public static CompletableFuture<Void> execute(
ExecutorService executor, ThrowingConsumer<Void, Exception> processor) {
CompletableFuture<Void> future = new CompletableFuture();
CompletableFuture<Void> future = new CompletableFuture<>();
executor.execute(
() -> {
try {
Expand Down

0 comments on commit 0a55677

Please sign in to comment.