diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java index 3c175a80c8d58..bb44462ab051e 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java @@ -225,6 +225,11 @@ public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {} public UserCodeClassLoader getUserCodeClassLoader() { return SimpleUserCodeClassLoader.create(getClass().getClassLoader()); } + + @Override + public int currentParallelism() { + return 1; + } } private static final class NoOpReaderOutput implements ReaderOutput { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/GuavaRateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/GuavaRateLimiter.java new file mode 100644 index 0000000000000..a6c3216d04ee9 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/GuavaRateLimiter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.io.ratelimiting; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter; + +/** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */ +public class GuavaRateLimiter implements org.apache.flink.api.common.io.ratelimiting.RateLimiter { + + private final RateLimiter rateLimiter; + + public GuavaRateLimiter(long maxPerSecond, int numParallelExecutors) { + final float maxPerSecondPerSubtask = (float) maxPerSecond / numParallelExecutors; + this.rateLimiter = RateLimiter.create(maxPerSecondPerSubtask); + } + + @Override + public int acquire() { + return (int) (1000 * rateLimiter.acquire()); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/RateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/RateLimiter.java new file mode 100644 index 0000000000000..577897f41ae51 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/RateLimiter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.io.ratelimiting; + +import java.io.Serializable; + +/** The interface that can be used to throttle execution of methods. */ +public interface RateLimiter extends Serializable { + + /** + * Acquire method is a blocking call that is intended to be used in places where it is required + * to limit the rate at which results are produced or other functions are called. + * + * @return The number of milliseconds this call blocked its caller. + * @throws InterruptedException The interrupted exception. + */ + int acquire() throws InterruptedException; +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java index 304afddf90a83..f2861758b6865 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java @@ -63,4 +63,11 @@ public interface SourceReaderContext { * @see UserCodeClassLoader */ UserCodeClassLoader getUserCodeClassLoader(); + + /** + * Get the current parallelism of this Source. + * + * @return the parallelism of the Source. + */ + int currentParallelism(); } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java new file mode 100644 index 0000000000000..21f4297ea2b5c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
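
Taken on their own, the two classes above define the throttling contract used further down in this patch: acquire() blocks the calling thread and reports how long it was held up, and GuavaRateLimiter divides one global records-per-second budget by the parallelism reported through SourceReaderContext#currentParallelism(). A minimal usage sketch, not part of the patch itself (the class name and the numbers are only illustrative):

    import org.apache.flink.api.common.io.ratelimiting.GuavaRateLimiter;
    import org.apache.flink.api.common.io.ratelimiting.RateLimiter;

    public class RateLimiterSketch {
        public static void main(String[] args) throws InterruptedException {
            // A global budget of 100 records/s shared by 4 parallel readers gives
            // each subtask a local budget of 25 records/s.
            RateLimiter rateLimiter = new GuavaRateLimiter(100, 4);

            long blockedMillis = 0;
            for (int i = 0; i < 50; i++) {
                // acquire() blocks until the next permit is available and returns
                // the time the caller spent waiting, in milliseconds.
                blockedMillis += rateLimiter.acquire();
            }
            System.out.println("Spent " + blockedMillis + " ms waiting for permits.");
        }
    }
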
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.io.ratelimiting.GuavaRateLimiter; +import org.apache.flink.api.common.io.ratelimiting.RateLimiter; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit; +import org.apache.flink.api.connector.source.lib.util.GeneratingIteratorSourceReader; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator; +import org.apache.flink.api.connector.source.lib.util.RateLimitedSourceReader; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A data source that produces N data points in parallel. This source is useful for testing and for + * cases that just need a stream of N events of any kind. + * + *
<p>
The source splits the sequence into as many parallel sub-sequences as there are parallel + * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is + * limited to one, this will produce one sequence in order. + * + *
<p>
This source is always bounded. For very long sequences (for example over the entire domain of
+ * long integer values), users may want to consider executing the application in a streaming manner,
+ * because, despite the fact that the produced stream is bounded, the end bound is pretty far away.
+ */
+@Experimental
+public class DataGeneratorSourceV3<OUT>
+        implements Source<
+                        OUT,
+                        NumberSequenceSource.NumberSequenceSplit,
+                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
+                ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TypeInformation<OUT> typeInfo;
+
+    private final MapFunction<Long, OUT> generatorFunction;
+
+    private final NumberSequenceSource numberSource;
+
+    private long maxPerSecond = -1;
+
+    /**
+     * Creates a new {@code DataGeneratorSourceV3} that produces {@code count} records in parallel.
+     *
+     * @param generatorFunction The generator function that receives index numbers and translates
+     *     them into events of the output type.
+     * @param count The number of events to be produced.
+     * @param typeInfo The type information of the returned events.
+     */
+    public DataGeneratorSourceV3(
+            MapFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.generatorFunction = checkNotNull(generatorFunction);
+        this.numberSource = new NumberSequenceSource(0, count);
+    }
+
+    /**
+     * Creates a new {@code DataGeneratorSourceV3} that produces {@code count} records in parallel
+     * at a bounded rate.
+     *
+     * @param generatorFunction The generator function that receives index numbers and translates
+     *     them into events of the output type.
+     * @param count The number of events to be produced.
+     * @param sourceRatePerSecond The maximum number of events per second that this generator aims
+     *     to produce. This is a target number for the whole source; the individual parallel
+     *     source instances automatically adjust their rate based on the {@code
+     *     sourceRatePerSecond} and the source parallelism.
+     * @param typeInfo The type information of the returned events.
+     */
+    public DataGeneratorSourceV3(
+            MapFunction<Long, OUT> generatorFunction,
+            long count,
+            long sourceRatePerSecond,
+            TypeInformation<OUT> typeInfo) {
+        checkArgument(sourceRatePerSecond > 0, "sourceRatePerSecond has to be a positive number");
+        this.typeInfo = checkNotNull(typeInfo);
+        this.generatorFunction = checkNotNull(generatorFunction);
+        this.numberSource = new NumberSequenceSource(0, count);
+        this.maxPerSecond = sourceRatePerSecond;
+    }
+
+    /** @return The number of records produced by this source.
*/ + public long getCount() { + + return numberSource.getTo(); + } + + // ------------------------------------------------------------------------ + // source methods + // ------------------------------------------------------------------------ + + @Override + public TypeInformation getProducedType() { + return typeInfo; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) + throws Exception { + if (maxPerSecond > 0) { + int parallelism = readerContext.currentParallelism(); + RateLimiter rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism); + return new RateLimitedSourceReader<>( + new GeneratingIteratorSourceReader<>(readerContext, generatorFunction), + rateLimiter); + } else { + return new GeneratingIteratorSourceReader<>(readerContext, generatorFunction); + } + } + + @Override + public SplitEnumerator> restoreEnumerator( + SplitEnumeratorContext enumContext, + Collection checkpoint) + throws Exception { + return new IteratorSourceEnumerator<>(enumContext, checkpoint); + } + + @Override + public SplitEnumerator> createEnumerator( + final SplitEnumeratorContext enumContext) { + final List splits = + numberSource.splitNumberRange(0, getCount(), enumContext.currentParallelism()); + return new IteratorSourceEnumerator<>(enumContext, splits); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return numberSource.getSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer> + getEnumeratorCheckpointSerializer() { + return numberSource.getEnumeratorCheckpointSerializer(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV4.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV4.java new file mode 100644 index 0000000000000..21ac37a5e71fc --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV4.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
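
With the reader wiring above, using the V3 generator from a pipeline only takes a MapFunction from Long indices to events, a record count, and an optional global rate cap. A rough usage sketch, not part of the patch (event format, numbers and job name are made up):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.connector.source.lib.DataGeneratorSourceV3;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DataGeneratorSourceV3Example {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            MapFunction<Long, String> generator = index -> "Event #" + index;

            // 1000 records in total, throttled to at most 100 records/s across the whole source.
            DataGeneratorSourceV3<String> source =
                    new DataGeneratorSourceV3<>(generator, 1000, 100, Types.STRING);

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "generator source").print();

            env.execute("DataGeneratorSourceV3 example");
        }
    }
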
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A data source that produces N data points in parallel. This source is useful for testing and for + * cases that just need a stream of N events of any kind. + * + *
<p>
The source splits the sequence into as many parallel sub-sequences as there are parallel + * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is + * limited to one, this will produce one sequence in order. + * + *
<p>
This source is always bounded. For very long sequences (for example over the entire domain of + * long integer values), user may want to consider executing the application in a s treaming manner, + * because, despite the fact that the produced stream is bounded, the end bound is pretty far away. + */ +@Experimental +public class DataGeneratorSourceV4 + implements Source>, + ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private final SourceReaderFactory sourceReaderFactory; + private final TypeInformation typeInfo; + + private final NumberSequenceSource numberSource; + + public DataGeneratorSourceV4( + SourceReaderFactory sourceReaderFactory, + long count, + TypeInformation typeInfo) { + this.sourceReaderFactory = checkNotNull(sourceReaderFactory); + this.typeInfo = checkNotNull(typeInfo); + this.numberSource = new NumberSequenceSource(0, count); + } + + public DataGeneratorSourceV4( + GeneratorFunction generatorFunction, + long count, + long sourceRatePerSecond, + TypeInformation typeInfo) { + this( + new GeneratorSourceReaderFactory<>(generatorFunction, sourceRatePerSecond), + count, + typeInfo); + } + + public DataGeneratorSourceV4( + GeneratorFunction generatorFunction, + long count, + TypeInformation typeInfo) { + this(generatorFunction, count, -1, typeInfo); + } + + /** @return The number of records produced by this source. */ + public long getCount() { + return numberSource.getTo(); + } + + // ------------------------------------------------------------------------ + // source methods + // ------------------------------------------------------------------------ + + @Override + public TypeInformation getProducedType() { + return typeInfo; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) + throws Exception { + return sourceReaderFactory.newSourceReader(readerContext); + } + + @Override + public SplitEnumerator> restoreEnumerator( + SplitEnumeratorContext enumContext, + Collection checkpoint) + throws Exception { + return new IteratorSourceEnumerator<>(enumContext, checkpoint); + } + + @Override + public SplitEnumerator> createEnumerator( + final SplitEnumeratorContext enumContext) { + final List splits = + numberSource.splitNumberRange(0, getCount(), enumContext.currentParallelism()); + return new IteratorSourceEnumerator<>(enumContext, splits); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return numberSource.getSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer> + getEnumeratorCheckpointSerializer() { + return numberSource.getEnumeratorCheckpointSerializer(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorFunction.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorFunction.java new file mode 100644 index 0000000000000..ea506b2923156 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorFunction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.connector.source.SourceReaderContext; + +@Experimental +public interface GeneratorFunction extends Function { + + /** + * Initialization method for the function. It is called once before the actual working process + * methods. + */ + default void open(SourceReaderContext readerContext) throws Exception {} + + /** Tear-down method for the function. */ + default void close() throws Exception {} + + O map(T value) throws Exception; +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorSourceReaderFactory.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorSourceReaderFactory.java new file mode 100644 index 0000000000000..003f058875e61 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorSourceReaderFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
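
Since open() hands the GeneratorFunction its SourceReaderContext, a generator can initialize per-reader state there (the POC at the end of this patch grabs the metric group that way). A small illustrative implementation, with an invented class name and output format:

    import org.apache.flink.api.connector.source.SourceReaderContext;
    import org.apache.flink.api.connector.source.lib.GeneratorFunction;

    /** Emits strings that record how many parallel readers the source runs with. */
    public class SubtaskAwareGenerator implements GeneratorFunction<Long, String> {

        private transient int parallelism;

        @Override
        public void open(SourceReaderContext readerContext) {
            // Called once before map() is invoked for the first time.
            parallelism = readerContext.currentParallelism();
        }

        @Override
        public String map(Long index) {
            return "record " + index + " (produced by 1 of " + parallelism + " readers)";
        }
    }
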
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.common.io.ratelimiting.GuavaRateLimiter; +import org.apache.flink.api.common.io.ratelimiting.RateLimiter; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.lib.util.GeneratingIteratorSourceReader; +import org.apache.flink.api.connector.source.lib.util.RateLimitedSourceReader; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class GeneratorSourceReaderFactory + implements SourceReaderFactory { + + private final GeneratorFunction generatorFunction; + private final long sourceRatePerSecond; + + public GeneratorSourceReaderFactory( + GeneratorFunction generatorFunction, long sourceRatePerSecond) { + this.generatorFunction = checkNotNull(generatorFunction); + this.sourceRatePerSecond = sourceRatePerSecond; + } + + @Override + public SourceReader newSourceReader( + SourceReaderContext readerContext) { + if (sourceRatePerSecond > 0) { + int parallelism = readerContext.currentParallelism(); + RateLimiter rateLimiter = new GuavaRateLimiter(sourceRatePerSecond, parallelism); + return new RateLimitedSourceReader<>( + new GeneratingIteratorSourceReader<>(readerContext, generatorFunction), + rateLimiter); + } else { + return new GeneratingIteratorSourceReader<>(readerContext, generatorFunction); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java index 53e6a2832b7be..f8814d3c72981 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java @@ -247,7 +247,7 @@ static NumberSequenceSplit deserializeV1(DataInputView in) throws IOException { } } - private static final class CheckpointSerializer + static final class CheckpointSerializer implements SimpleVersionedSerializer> { private static final int CURRENT_VERSION = 1; diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/SourceReaderFactory.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/SourceReaderFactory.java new file mode 100644 index 0000000000000..5b2ce7f95c7c4 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/SourceReaderFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
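
Because DataGeneratorSourceV4 only depends on the SourceReaderFactory abstraction, the rate-limiting behaviour above is no longer baked into the source: the convenience constructor simply passes a GeneratorSourceReaderFactory, and callers can hand in that factory (or a different one) themselves. A sketch of the explicit wiring; the generic parameters used here are an assumption, not taken verbatim from the patch:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.connector.source.lib.DataGeneratorSourceV4;
    import org.apache.flink.api.connector.source.lib.GeneratorFunction;
    import org.apache.flink.api.connector.source.lib.GeneratorSourceReaderFactory;
    import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
    import org.apache.flink.api.connector.source.lib.SourceReaderFactory;

    public class FactoryWiringSketch {

        public static DataGeneratorSourceV4<String> rateLimitedSource() {
            GeneratorFunction<Long, String> generator = index -> "payload-" + index;

            // The same factory the convenience constructor uses internally: it wraps the
            // generating reader in a RateLimitedSourceReader when the rate is positive.
            SourceReaderFactory<String, NumberSequenceSplit> factory =
                    new GeneratorSourceReaderFactory<>(generator, 500);

            return new DataGeneratorSourceV4<>(factory, 10_000, Types.STRING);
        }
    }
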
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; + +import java.io.Serializable; + +public interface SourceReaderFactory extends Serializable { + SourceReader newSourceReader(SourceReaderContext readerContext); +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GeneratingIteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GeneratingIteratorSourceReader.java new file mode 100644 index 0000000000000..8f702724ba384 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GeneratingIteratorSourceReader.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib.util; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.lib.GeneratorFunction; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Iterator; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +@Experimental +public class GeneratingIteratorSourceReader< + E, O, IterT extends Iterator, SplitT extends IteratorSourceSplit> + extends IteratorSourceReaderBase { + + private final GeneratorFunction generatorFunction; + + // TODO: [tmp] kept for compatibility with V3. To remove if V4 is accepted. 
+ public GeneratingIteratorSourceReader( + SourceReaderContext context, MapFunction generatorFunction) { + super(context); + checkNotNull(generatorFunction); + this.generatorFunction = generatorFunction::map; + } + + public GeneratingIteratorSourceReader( + SourceReaderContext context, GeneratorFunction generatorFunction) { + super(context); + this.generatorFunction = checkNotNull(generatorFunction); + } + + // ------------------------------------------------------------------------ + + @Override + public InputStatus pollNext(ReaderOutput output) { + if (iterator != null) { + if (iterator.hasNext()) { + E next = iterator.next(); + try { + O mapped = generatorFunction.map(next); + output.collect(mapped); + } catch (Exception e) { + String message = + String.format( + "A user-provided generator function threw an exception on this input: %s", + next.toString()); + throw new FlinkRuntimeException(message); + } + return InputStatus.MORE_AVAILABLE; + } else { + finishSplit(); + } + } + return tryMoveToNextSplit(); + } + + @Override + public void close() throws Exception { + super.close(); + generatorFunction.close(); + } + + @Override + public void start() { + super.start(); + try { + generatorFunction.open(context); + } catch (Exception e) { + throw new FlinkRuntimeException("Failed to open the GeneratorFunction", e); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java index d7a63c06a186b..396bc57f4f97d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java @@ -24,17 +24,7 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.core.io.InputStatus; -import javax.annotation.Nullable; - -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * A {@link SourceReader} that returns the values of an iterator, supplied via an {@link @@ -52,47 +42,14 @@ @Public public class IteratorSourceReader< E, IterT extends Iterator, SplitT extends IteratorSourceSplit> - implements SourceReader { - - /** The context for this reader, to communicate with the enumerator. */ - private final SourceReaderContext context; - - /** The availability future. This reader is available as soon as a split is assigned. */ - private CompletableFuture availability; - - /** - * The iterator producing data. Non-null after a split has been assigned. This field is null or - * non-null always together with the {@link #currentSplit} field. - */ - @Nullable private IterT iterator; - - /** - * The split whose data we return. Non-null after a split has been assigned. This field is null - * or non-null always together with the {@link #iterator} field. - */ - @Nullable private SplitT currentSplit; - - /** The remaining splits that were assigned but not yet processed. 
*/ - private final Queue remainingSplits; - - private boolean noMoreSplits; + extends IteratorSourceReaderBase { public IteratorSourceReader(SourceReaderContext context) { - this.context = checkNotNull(context); - this.availability = new CompletableFuture<>(); - this.remainingSplits = new ArrayDeque<>(); + super(context); } // ------------------------------------------------------------------------ - @Override - public void start() { - // request a split if we don't have one - if (remainingSplits.isEmpty()) { - context.sendSplitRequest(); - } - } - @Override public InputStatus pollNext(ReaderOutput output) { if (iterator != null) { @@ -103,77 +60,6 @@ public InputStatus pollNext(ReaderOutput output) { finishSplit(); } } - return tryMoveToNextSplit(); } - - private void finishSplit() { - iterator = null; - currentSplit = null; - - // request another split if no other is left - // we do this only here in the finishSplit part to avoid requesting a split - // whenever the reader is polled and doesn't currently have a split - if (remainingSplits.isEmpty() && !noMoreSplits) { - context.sendSplitRequest(); - } - } - - private InputStatus tryMoveToNextSplit() { - currentSplit = remainingSplits.poll(); - if (currentSplit != null) { - iterator = currentSplit.getIterator(); - return InputStatus.MORE_AVAILABLE; - } else if (noMoreSplits) { - return InputStatus.END_OF_INPUT; - } else { - // ensure we are not called in a loop by resetting the availability future - if (availability.isDone()) { - availability = new CompletableFuture<>(); - } - - return InputStatus.NOTHING_AVAILABLE; - } - } - - @Override - public CompletableFuture isAvailable() { - return availability; - } - - @Override - public void addSplits(List splits) { - remainingSplits.addAll(splits); - // set availability so that pollNext is actually called - availability.complete(null); - } - - @Override - public void notifyNoMoreSplits() { - noMoreSplits = true; - // set availability so that pollNext is actually called - availability.complete(null); - } - - @Override - public List snapshotState(long checkpointId) { - if (currentSplit == null && remainingSplits.isEmpty()) { - return Collections.emptyList(); - } - - final ArrayList allSplits = new ArrayList<>(1 + remainingSplits.size()); - if (iterator != null && iterator.hasNext()) { - assert currentSplit != null; - - @SuppressWarnings("unchecked") - final SplitT inProgressSplit = - (SplitT) currentSplit.getUpdatedSplitForIterator(iterator); - allSplits.add(inProgressSplit); - } - allSplits.addAll(remainingSplits); - return allSplits; - } - - @Override - public void close() throws Exception {} } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java new file mode 100644 index 0000000000000..c8957bc124339 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
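
After this refactoring, IteratorSourceReaderBase (introduced below) owns all of the split bookkeeping — the availability future, the queue of remaining splits, and snapshotting — and a concrete reader only has to implement pollNext() on top of the protected iterator, finishSplit() and tryMoveToNextSplit() members. The typical shape of such a subclass is sketched here; the reader name and the upper-casing logic are invented, and the class has to live in the same package because the base class is package-private:

    package org.apache.flink.api.connector.source.lib.util;

    import org.apache.flink.api.connector.source.ReaderOutput;
    import org.apache.flink.api.connector.source.SourceReaderContext;
    import org.apache.flink.core.io.InputStatus;

    import java.util.Iterator;

    /** A reader that upper-cases every string supplied by its split iterators. */
    class UppercasingReader<
                    IterT extends Iterator<String>,
                    SplitT extends IteratorSourceSplit<String, IterT>>
            extends IteratorSourceReaderBase<String, String, IterT, SplitT> {

        UppercasingReader(SourceReaderContext context) {
            super(context);
        }

        @Override
        public InputStatus pollNext(ReaderOutput<String> output) {
            if (iterator != null) {
                if (iterator.hasNext()) {
                    output.collect(iterator.next().toUpperCase());
                    return InputStatus.MORE_AVAILABLE;
                } else {
                    finishSplit(); // releases the split and may request the next one
                }
            }
            return tryMoveToNextSplit(); // MORE_AVAILABLE, NOTHING_AVAILABLE or END_OF_INPUT
        }
    }
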
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib.util; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; + +import javax.annotation.Nullable; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link SourceReader} that returns the values of an iterator, supplied via an {@link + * IteratorSourceSplit}. + * + *
<p>
The {@code IteratorSourceSplit} is also responsible for taking the current iterator and + * turning it back into a split for checkpointing. + * + * @param The type of events returned by the reader. + * @param The type of the iterator that produces the events. This type exists to make the + * conversion between iterator and {@code IteratorSourceSplit} type safe. + * @param The concrete type of the {@code IteratorSourceSplit} that creates and converts + * the iterator that produces this reader's elements. + */ +@Public +abstract class IteratorSourceReaderBase< + E, O, IterT extends Iterator, SplitT extends IteratorSourceSplit> + implements SourceReader { + + /** The context for this reader, to communicate with the enumerator. */ + protected final SourceReaderContext context; + + /** The availability future. This reader is available as soon as a split is assigned. */ + protected CompletableFuture availability; + + /** + * The iterator producing data. Non-null after a split has been assigned. This field is null or + * non-null always together with the {@link #currentSplit} field. + */ + @Nullable protected IterT iterator; + + /** + * The split whose data we return. Non-null after a split has been assigned. This field is null + * or non-null always together with the {@link #iterator} field. + */ + @Nullable protected SplitT currentSplit; + + /** The remaining splits that were assigned but not yet processed. */ + protected final Queue remainingSplits; + + protected boolean noMoreSplits; + + public IteratorSourceReaderBase(SourceReaderContext context) { + this.context = checkNotNull(context); + this.availability = new CompletableFuture<>(); + this.remainingSplits = new ArrayDeque<>(); + } + + // ------------------------------------------------------------------------ + + @Override + public void start() { + // request a split if we don't have one + if (remainingSplits.isEmpty()) { + context.sendSplitRequest(); + } + } + + protected void finishSplit() { + iterator = null; + currentSplit = null; + + // request another split if no other is left + // we do this only here in the finishSplit part to avoid requesting a split + // whenever the reader is polled and doesn't currently have a split + if (remainingSplits.isEmpty() && !noMoreSplits) { + context.sendSplitRequest(); + } + } + + protected InputStatus tryMoveToNextSplit() { + currentSplit = remainingSplits.poll(); + if (currentSplit != null) { + iterator = currentSplit.getIterator(); + return InputStatus.MORE_AVAILABLE; + } else if (noMoreSplits) { + return InputStatus.END_OF_INPUT; + } else { + // ensure we are not called in a loop by resetting the availability future + if (availability.isDone()) { + availability = new CompletableFuture<>(); + } + + return InputStatus.NOTHING_AVAILABLE; + } + } + + @Override + public CompletableFuture isAvailable() { + return availability; + } + + @Override + public void addSplits(List splits) { + remainingSplits.addAll(splits); + // set availability so that pollNext is actually called + availability.complete(null); + } + + @Override + public void notifyNoMoreSplits() { + noMoreSplits = true; + // set availability so that pollNext is actually called + availability.complete(null); + } + + @Override + public List snapshotState(long checkpointId) { + if (currentSplit == null && remainingSplits.isEmpty()) { + return Collections.emptyList(); + } + + final ArrayList allSplits = new ArrayList<>(1 + remainingSplits.size()); + if (iterator != null && iterator.hasNext()) { + assert currentSplit != null; + + 
@SuppressWarnings("unchecked") + final SplitT inProgressSplit = + (SplitT) currentSplit.getUpdatedSplitForIterator(iterator); + allSplits.add(inProgressSplit); + } + allSplits.addAll(remainingSplits); + return allSplits; + } + + @Override + public void close() throws Exception {} +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java new file mode 100644 index 0000000000000..084b0a945f98d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib.util; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.io.ratelimiting.RateLimiter; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +@Experimental +public class RateLimitedSourceReader + implements SourceReader { + + private final SourceReader sourceReader; + private final RateLimiter rateLimiter; + + public RateLimitedSourceReader(SourceReader sourceReader, RateLimiter rateLimiter) { + checkNotNull(sourceReader); + checkNotNull(rateLimiter); + this.sourceReader = sourceReader; + this.rateLimiter = rateLimiter; + } + + // ------------------------------------------------------------------------ + + @Override + public void start() { + sourceReader.start(); + } + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + rateLimiter.acquire(); + return sourceReader.pollNext(output); + } + + @Override + public CompletableFuture isAvailable() { + return sourceReader.isAvailable(); + } + + @Override + public void addSplits(List splits) { + sourceReader.addSplits(splits); + } + + @Override + public void notifyNoMoreSplits() { + sourceReader.notifyNoMoreSplits(); + } + + @Override + public List snapshotState(long checkpointId) { + return sourceReader.snapshotState(checkpointId); + } + + @Override + public void close() throws Exception { + sourceReader.close(); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java index 5944201d1ad7d..ce97ff7126911 100644 --- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java @@ -143,6 +143,11 @@ public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {} public UserCodeClassLoader getUserCodeClassLoader() { return SimpleUserCodeClassLoader.create(getClass().getClassLoader()); } + + @Override + public int currentParallelism() { + return 1; + } } private static final class TestingReaderOutput implements ReaderOutput { diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourcePOC.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourcePOC.java new file mode 100644 index 0000000000000..4c568cb2a50cf --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourcePOC.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.wordcount; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.lib.DataGeneratorSourceV4; +import org.apache.flink.api.connector.source.lib.GeneratorFunction; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.time.Duration; + +public class GeneratorSourcePOC { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + int parallelism = 2; + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + env.setParallelism(parallelism); + + MapFunction generator = index -> ">>> " + index; + + /* V0 */ + /* + DataGeneratorSourceV0 source = + new DataGeneratorSourceV0<>(generator, 1000, Types.STRING); + */ + + /* V1 */ + /* + DataGeneratorSource source = + new DataGeneratorSource<>(generator, 1000, 2, Types.STRING); + */ + + /* V2 */ + /* + + DataGeneratorSourceV2 source = + new DataGeneratorSourceV2<>(generator, 1000, 2, Types.STRING); + */ + + /* V3 */ + /* + DataGeneratorSourceV3 source = + new DataGeneratorSourceV3<>(generator, 1000, 2, Types.STRING); + */ + + GeneratorFunction generatorFunction = + new GeneratorFunction() { + + transient SourceReaderMetricGroup sourceReaderMetricGroup; + + @Override + public void open(SourceReaderContext readerContext) { + sourceReaderMetricGroup = readerContext.metricGroup(); + } + + @Override + public String map(Long value) { + return "Generated: >> " + + value.toString() + + "; local metric group: " + + sourceReaderMetricGroup.hashCode(); + } + }; + + GeneratorFunction generatorFunctionStateless = index -> ">>> " + index; + + DataGeneratorSourceV4 source = + new DataGeneratorSourceV4<>(generatorFunction, 1000, 2, Types.STRING); + + DataStreamSource watermarked = + env.fromSource( + source, + WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)), + "watermarked"); + watermarked.print(); + + env.execute("Generator Source POC"); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 53621bd1b6e0b..b379653ceab7f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -281,6 +281,11 @@ public void registerReleaseHookIfAbsent( } }; } + + @Override + public int currentParallelism() { + return getRuntimeContext().getNumberOfParallelSubtasks(); + } }; sourceReader = readerFactory.apply(context); diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java index d83f8978684d9..4855ced967b34 100644 --- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java @@ -86,6 +86,11 @@ public UserCodeClassLoader getUserCodeClassLoader() { return SimpleUserCodeClassLoader.create(getClass().getClassLoader()); } + @Override + public int currentParallelism() { + return 1; + } + // ------------------------------------------------------------------------ public int getNumSplitRequests() {
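
Taken together, the runtime wiring is a plain decorator chain: SourceOperator now exposes the subtask parallelism through currentParallelism(), GeneratorSourceReaderFactory uses it to size a GuavaRateLimiter, and RateLimitedSourceReader calls acquire() once before delegating every pollNext() to the wrapped reader. Any RateLimiter implementation can be dropped into that chain; a deliberately naive sketch (FixedDelayLimiter is invented for illustration and is not part of the patch):

    import org.apache.flink.api.common.io.ratelimiting.RateLimiter;

    /** A toy limiter: every acquisition simply sleeps for a fixed delay. */
    public class FixedDelayLimiter implements RateLimiter {

        private static final long serialVersionUID = 1L;

        private final long delayMillis;

        public FixedDelayLimiter(long delayMillis) {
            this.delayMillis = delayMillis;
        }

        @Override
        public int acquire() throws InterruptedException {
            Thread.sleep(delayMillis);
            return (int) delayMillis; // report how long the caller was blocked
        }
    }

Wrapping a reader as new RateLimitedSourceReader<>(reader, new FixedDelayLimiter(10)) caps it at roughly 100 records per second per subtask.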