From a4a4e5bb1daeec7089b937e19cab248a35fa8ff0 Mon Sep 17 00:00:00 2001
From: Alexander Fedulov <1492164+afedulov@users.noreply.github.com>
Date: Wed, 11 May 2022 09:59:36 +0200
Subject: [PATCH 01/12] DataGeneratorSource that is composed of
NumberSequenceSource.
---
.../source/lib/DataGeneratorSource.java | 304 ++++++++++++++++++
1 file changed, 304 insertions(+)
create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
new file mode 100644
index 0000000000000..142cffa28519f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.DataGeneratorSource.GeneratorSequenceSplit;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces generators N data points in parallel. This source is useful for
+ * testing and for cases that just need a stream of N events of any kind.
+ *
+ * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
+ * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
+ * limited to one, this will produce one sequence in order.
+ *
+ * <p>This source is always bounded. For very long sequences (for example over the entire domain of
+ * long integer values), user may want to consider executing the application in a streaming manner,
+ * because, despite the fact that the produced stream is bounded, the end bound is pretty far away.
+ */
+@Public
+public class DataGeneratorSource<OUT>
+        implements Source<
+                        OUT, GeneratorSequenceSplit<OUT>, Collection<GeneratorSequenceSplit<OUT>>>,
+                ResultTypeQueryable<OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+    private final TypeInformation<OUT> typeInfo;
+
+    public final MapFunction<Long, OUT> generatorFunction;
+
+ /** The end Generator in the sequence, inclusive. */
+ private final NumberSequenceSource numberSource;
+
+ /**
+     * Creates a new {@code DataGeneratorSource} that produces <code>count</code> records in
+ * parallel.
+ *
+ * @param typeInfo the type info
+ * @param generatorFunction the generator function
+ * @param count The count
+ */
+    public DataGeneratorSource(
+            MapFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {
+ this.typeInfo = checkNotNull(typeInfo);
+ this.generatorFunction = checkNotNull(generatorFunction);
+ this.numberSource = new NumberSequenceSource(0, count);
+ }
+
+ public long getCount() {
+ return numberSource.getTo();
+ }
+
+ // ------------------------------------------------------------------------
+ // source methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeInformation getProducedType() {
+ return typeInfo;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+    public SourceReader<OUT, GeneratorSequenceSplit<OUT>> createReader(
+            SourceReaderContext readerContext) {
+ return new IteratorSourceReader<>(readerContext);
+ }
+
+ @Override
+    public SplitEnumerator<GeneratorSequenceSplit<OUT>, Collection<GeneratorSequenceSplit<OUT>>>
+            createEnumerator(
+                    final SplitEnumeratorContext<GeneratorSequenceSplit<OUT>> enumContext) {
+
+ final List splits =
+ numberSource.splitNumberRange(0, getCount(), enumContext.currentParallelism());
+ return new IteratorSourceEnumerator<>(enumContext, wrapSplits(splits, generatorFunction));
+ }
+
+ @Override
+ public SplitEnumerator, Collection>>
+ restoreEnumerator(
+ final SplitEnumeratorContext> enumContext,
+ Collection> checkpoint) {
+ return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+ }
+
+ @Override
+ public SimpleVersionedSerializer> getSplitSerializer() {
+ return new SplitSerializer<>(numberSource.getSplitSerializer(), generatorFunction);
+ }
+
+ @Override
+ public SimpleVersionedSerializer>>
+ getEnumeratorCheckpointSerializer() {
+ return new CheckpointSerializer<>(
+ numberSource.getEnumeratorCheckpointSerializer(), generatorFunction);
+ }
+
+ // ------------------------------------------------------------------------
+ // splits & checkpoint
+ // ------------------------------------------------------------------------
+    public static class GeneratorSequenceIterator<T> implements Iterator<T> {
+
+ private final MapFunction generatorFunction;
+ private final NumberSequenceIterator numSeqIterator;
+
+ public GeneratorSequenceIterator(
+ NumberSequenceIterator numSeqIterator, MapFunction generatorFunction) {
+ this.generatorFunction = generatorFunction;
+ this.numSeqIterator = numSeqIterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return numSeqIterator.hasNext();
+ }
+
+ public long getCurrent() {
+ return numSeqIterator.getCurrent();
+ }
+
+ public long getTo() {
+ return numSeqIterator.getTo();
+ }
+
+ @Override
+ public T next() {
+ try {
+ return generatorFunction.map(numSeqIterator.next());
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(
+ String.format(
+ "Exception while generating element %d",
+ numSeqIterator.getCurrent()),
+ e);
+ }
+ }
+ }
+
+ /** A split of the source, representing a Generator sub-sequence. */
+    public static class GeneratorSequenceSplit<T>
+            implements IteratorSourceSplit<T, GeneratorSequenceIterator<T>> {
+
+ public GeneratorSequenceSplit(
+ NumberSequenceSplit numberSequenceSplit, MapFunction generatorFunction) {
+ this.numberSequenceSplit = numberSequenceSplit;
+ this.generatorFunction = generatorFunction;
+ }
+
+ private final NumberSequenceSplit numberSequenceSplit;
+
+ private final MapFunction generatorFunction;
+
+ public GeneratorSequenceIterator getIterator() {
+ return new GeneratorSequenceIterator<>(
+ numberSequenceSplit.getIterator(), generatorFunction);
+ }
+
+ @Override
+ public String splitId() {
+ return numberSequenceSplit.splitId();
+ }
+
+ @Override
+ public IteratorSourceSplit> getUpdatedSplitForIterator(
+ final GeneratorSequenceIterator iterator) {
+ return new GeneratorSequenceSplit<>(
+ (NumberSequenceSplit)
+ numberSequenceSplit.getUpdatedSplitForIterator(iterator.numSeqIterator),
+ generatorFunction);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "GeneratorSequenceSplit [%d, %d] (%s)",
+ numberSequenceSplit.from(),
+ numberSequenceSplit.to(),
+ numberSequenceSplit.splitId());
+ }
+ }
+
+ private static final class SplitSerializer
+ implements SimpleVersionedSerializer> {
+
+ private final SimpleVersionedSerializer numberSplitSerializer;
+ private final MapFunction generatorFunction;
+
+ private SplitSerializer(
+ SimpleVersionedSerializer numberSplitSerializer,
+ MapFunction generatorFunction) {
+ this.numberSplitSerializer = numberSplitSerializer;
+ this.generatorFunction = generatorFunction;
+ }
+
+ @Override
+ public int getVersion() {
+ return numberSplitSerializer.getVersion();
+ }
+
+ @Override
+ public byte[] serialize(GeneratorSequenceSplit split) throws IOException {
+ return numberSplitSerializer.serialize(split.numberSequenceSplit);
+ }
+
+ @Override
+ public GeneratorSequenceSplit deserialize(int version, byte[] serialized)
+ throws IOException {
+ return new GeneratorSequenceSplit<>(
+ numberSplitSerializer.deserialize(version, serialized), generatorFunction);
+ }
+ }
+
+ private static final class CheckpointSerializer
+ implements SimpleVersionedSerializer>> {
+
+ private final SimpleVersionedSerializer>
+ numberCheckpointSerializer;
+ private final MapFunction generatorFunction;
+
+ public CheckpointSerializer(
+ SimpleVersionedSerializer>
+ numberCheckpointSerializer,
+ MapFunction generatorFunction) {
+ this.numberCheckpointSerializer = numberCheckpointSerializer;
+ this.generatorFunction = generatorFunction;
+ }
+
+ @Override
+ public int getVersion() {
+ return numberCheckpointSerializer.getVersion();
+ }
+
+ @Override
+ public byte[] serialize(Collection> checkpoint)
+ throws IOException {
+ return numberCheckpointSerializer.serialize(
+ checkpoint.stream()
+ .map(split -> split.numberSequenceSplit)
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public Collection> deserialize(int version, byte[] serialized)
+ throws IOException {
+ Collection numberSequenceSplits =
+ numberCheckpointSerializer.deserialize(version, serialized);
+ return wrapSplits(numberSequenceSplits, generatorFunction);
+ }
+ }
+
+ private static List> wrapSplits(
+ Collection numberSequenceSplits,
+ MapFunction generatorFunction) {
+ return numberSequenceSplits.stream()
+ .map(split -> new GeneratorSequenceSplit<>(split, generatorFunction))
+ .collect(Collectors.toList());
+ }
+}
From 357c036a2fc931aa5ecec706573b07832273e7ea Mon Sep 17 00:00:00 2001
From: Alexander Fedulov <1492164+afedulov@users.noreply.github.com>
Date: Fri, 10 Jun 2022 13:06:46 +0200
Subject: [PATCH 02/12] GeneratorSourceCheck
---
.../wordcount/GeneratorSourceCheck.java | 85 +++++++++++++++++++
1 file changed, 85 insertions(+)
create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
new file mode 100644
index 0000000000000..6c91dcfb41dc6
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.lib.DataGeneratorSource;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.time.Duration;
+
+public class GeneratorSourceCheck {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.setParallelism(2);
+
+ // MapFunction generator = value -> ">>> " + value;
+ // DataGeneratorSource source = new DataGeneratorSource<>(generator, 10,
+ // Types.STRING);
+ // DataStreamSource watermarked =
+ // env.fromSource(source, WatermarkStrategy.noWatermarks(), "watermarked");
+ // watermarked.print();
+
+ MapFunction generator = index -> ">>> " + index;
+ DataGeneratorSource source = new DataGeneratorSource<>(generator, 10, Types.STRING);
+ DataStreamSource watermarked =
+ env.fromSource(
+ source,
+ WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
+ "watermarked");
+ watermarked.print();
+
+ env.fromSequence(1, 10).print();
+
+ // DataStreamSource ds = env.fromGenerator(generator, 10, Types.STRING);
+ // ds.print();
+
+ /* MapFunction generator = value -> ">>> " + value;
+ GeneratorSource from = GeneratorSource.from(generator, 10, Types.STRING);
+ DataStreamSource watermarked =
+ env.fromSource(from, WatermarkStrategy.noWatermarks(), "watermarked");
+ watermarked.print();*/
+
+ // DataStreamSource ds = env.fromGenerator(generator, 10, Types.STRING);
+ // ds.print();
+
+ // DataStreamSource longDataStreamSource = env.fromSequence(0, 10);
+ // longDataStreamSource.print();
+
+ // ---
+ // MapFunction generator2 = value -> ">>>>>> " + value;
+ // SingleOutputStreamOperator ds2 = env.fromFunction(generator2, 10);
+ // ds2.print();
+
+ // Apache Flink applications are composed lazily. Calling execute
+ // submits the Job and begins processing.
+ env.execute("WordCount");
+ }
+}
From 42f1a2a5bf4f6ed2548beb705eccd65795333b26 Mon Sep 17 00:00:00 2001
From: Alexander Fedulov <1492164+afedulov@users.noreply.github.com>
Date: Thu, 30 Jun 2022 20:41:03 +0200
Subject: [PATCH 03/12] WIP: RateLimiter
---
.../source/lib/DataGeneratorSource.java | 75 +++++++++++++----
.../source/lib/util/NoOpRateLimiter.java | 33 ++++++++
.../source/lib/util/RateLimiter.java | 34 ++++++++
.../source/lib/util/SimpleRateLimiter.java | 80 +++++++++++++++++++
.../wordcount/GeneratorSourceCheck.java | 11 ++-
5 files changed, 215 insertions(+), 18 deletions(-)
create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoOpRateLimiter.java
create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java
create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SimpleRateLimiter.java
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
index 142cffa28519f..696d838e6d8e8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
@@ -32,11 +32,16 @@
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.connector.source.lib.util.NoOpRateLimiter;
+import org.apache.flink.api.connector.source.lib.util.RateLimiter;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NumberSequenceIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
@@ -63,15 +68,19 @@ public class DataGeneratorSource
OUT, GeneratorSequenceSplit, Collection>>,
ResultTypeQueryable {
+ private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class);
+
private static final long serialVersionUID = 1L;
private final TypeInformation typeInfo;
- public final MapFunction generatorFunction;
+ private final MapFunction generatorFunction;
/** The end Generator in the sequence, inclusive. */
private final NumberSequenceSource numberSource;
+ private RateLimiter rateLimiter = new NoOpRateLimiter();
+
/**
* Creates a new {@code DataGeneratorSource} that produces count
records in
* parallel.
@@ -87,6 +96,18 @@ public DataGeneratorSource(
this.numberSource = new NumberSequenceSource(0, count);
}
+ public DataGeneratorSource(
+ MapFunction generatorFunction,
+ long count,
+ RateLimiter rateLimiter,
+ TypeInformation typeInfo) {
+ // TODO: checkArgument(maxPerSecond > 0, "maxPerSeconds has to be a positive number");
+ this.typeInfo = checkNotNull(typeInfo);
+ this.generatorFunction = checkNotNull(generatorFunction);
+ this.numberSource = new NumberSequenceSource(0, count);
+ this.rateLimiter = rateLimiter;
+ }
+
public long getCount() {
return numberSource.getTo();
}
@@ -116,9 +137,12 @@ public SourceReader> createReader(
createEnumerator(
final SplitEnumeratorContext> enumContext) {
+ final int parallelism = enumContext.currentParallelism();
final List splits =
- numberSource.splitNumberRange(0, getCount(), enumContext.currentParallelism());
- return new IteratorSourceEnumerator<>(enumContext, wrapSplits(splits, generatorFunction));
+ numberSource.splitNumberRange(0, getCount(), parallelism);
+
+ return new IteratorSourceEnumerator<>(
+ enumContext, wrapSplits(splits, generatorFunction, rateLimiter));
}
@Override
@@ -131,14 +155,15 @@ public SourceReader> createReader(
@Override
public SimpleVersionedSerializer> getSplitSerializer() {
- return new SplitSerializer<>(numberSource.getSplitSerializer(), generatorFunction);
+ return new SplitSerializer<>(
+ numberSource.getSplitSerializer(), generatorFunction, rateLimiter);
}
@Override
public SimpleVersionedSerializer>>
getEnumeratorCheckpointSerializer() {
return new CheckpointSerializer<>(
- numberSource.getEnumeratorCheckpointSerializer(), generatorFunction);
+ numberSource.getEnumeratorCheckpointSerializer(), generatorFunction, rateLimiter);
}
// ------------------------------------------------------------------------
@@ -148,11 +173,15 @@ public static class GeneratorSequenceIterator implements Iterator {
private final MapFunction generatorFunction;
private final NumberSequenceIterator numSeqIterator;
+ private final RateLimiter rateLimiter;
public GeneratorSequenceIterator(
- NumberSequenceIterator numSeqIterator, MapFunction generatorFunction) {
+ NumberSequenceIterator numSeqIterator,
+ MapFunction generatorFunction,
+ RateLimiter rateLimiter) {
this.generatorFunction = generatorFunction;
this.numSeqIterator = numSeqIterator;
+ this.rateLimiter = rateLimiter;
}
@Override
@@ -171,6 +200,8 @@ public long getTo() {
@Override
public T next() {
try {
+ LOG.error("NEXT!");
+ rateLimiter.acquire();
return generatorFunction.map(numSeqIterator.next());
} catch (Exception e) {
throw new FlinkRuntimeException(
@@ -187,18 +218,22 @@ public static class GeneratorSequenceSplit
implements IteratorSourceSplit> {
public GeneratorSequenceSplit(
- NumberSequenceSplit numberSequenceSplit, MapFunction generatorFunction) {
+ NumberSequenceSplit numberSequenceSplit,
+ MapFunction generatorFunction,
+ RateLimiter rateLimiter) {
this.numberSequenceSplit = numberSequenceSplit;
this.generatorFunction = generatorFunction;
+ this.rateLimiter = rateLimiter;
}
private final NumberSequenceSplit numberSequenceSplit;
private final MapFunction generatorFunction;
+ private final RateLimiter rateLimiter;
public GeneratorSequenceIterator getIterator() {
return new GeneratorSequenceIterator<>(
- numberSequenceSplit.getIterator(), generatorFunction);
+ numberSequenceSplit.getIterator(), generatorFunction, rateLimiter);
}
@Override
@@ -212,7 +247,8 @@ public IteratorSourceSplit> getUpdatedSplitForIt
return new GeneratorSequenceSplit<>(
(NumberSequenceSplit)
numberSequenceSplit.getUpdatedSplitForIterator(iterator.numSeqIterator),
- generatorFunction);
+ generatorFunction,
+ rateLimiter);
}
@Override
@@ -230,12 +266,15 @@ private static final class SplitSerializer
private final SimpleVersionedSerializer numberSplitSerializer;
private final MapFunction generatorFunction;
+ private final RateLimiter rateLimiter;
private SplitSerializer(
SimpleVersionedSerializer numberSplitSerializer,
- MapFunction generatorFunction) {
+ MapFunction generatorFunction,
+ RateLimiter rateLimiter) {
this.numberSplitSerializer = numberSplitSerializer;
this.generatorFunction = generatorFunction;
+ this.rateLimiter = rateLimiter;
}
@Override
@@ -252,7 +291,9 @@ public byte[] serialize(GeneratorSequenceSplit split) throws IOException {
public GeneratorSequenceSplit deserialize(int version, byte[] serialized)
throws IOException {
return new GeneratorSequenceSplit<>(
- numberSplitSerializer.deserialize(version, serialized), generatorFunction);
+ numberSplitSerializer.deserialize(version, serialized),
+ generatorFunction,
+ rateLimiter);
}
}
@@ -262,13 +303,16 @@ private static final class CheckpointSerializer
private final SimpleVersionedSerializer>
numberCheckpointSerializer;
private final MapFunction generatorFunction;
+ private final RateLimiter throttler;
public CheckpointSerializer(
SimpleVersionedSerializer>
numberCheckpointSerializer,
- MapFunction generatorFunction) {
+ MapFunction generatorFunction,
+ RateLimiter throttler) {
this.numberCheckpointSerializer = numberCheckpointSerializer;
this.generatorFunction = generatorFunction;
+ this.throttler = throttler;
}
@Override
@@ -290,15 +334,16 @@ public Collection> deserialize(int version, byte[] ser
throws IOException {
Collection numberSequenceSplits =
numberCheckpointSerializer.deserialize(version, serialized);
- return wrapSplits(numberSequenceSplits, generatorFunction);
+ return wrapSplits(numberSequenceSplits, generatorFunction, throttler);
}
}
private static List> wrapSplits(
Collection numberSequenceSplits,
- MapFunction generatorFunction) {
+ MapFunction generatorFunction,
+ RateLimiter throttler) {
return numberSequenceSplits.stream()
- .map(split -> new GeneratorSequenceSplit<>(split, generatorFunction))
+ .map(split -> new GeneratorSequenceSplit<>(split, generatorFunction, throttler))
.collect(Collectors.toList());
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoOpRateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoOpRateLimiter.java
new file mode 100644
index 0000000000000..9ba4f69e44f8b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoOpRateLimiter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NoOpRateLimiter implements RateLimiter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NoOpRateLimiter.class);
+
+ @Override
+ public int acquire() throws InterruptedException {
+ LOG.error("!!!THROTTLE!");
+ return 0;
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java
new file mode 100644
index 0000000000000..dcd11b0d4c932
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.io.Serializable;
+
+/** The interface that can be used to throttle execution of methods. */
+public interface RateLimiter extends Serializable {
+
+ /**
+ * Acquire method is a blocking call that is intended to be used in places where it is required
+ * to limit the rate at which results are produced or other functions are called.
+ *
+ * @return The number of milliseconds this call blocked its caller.
+ * @throws InterruptedException The interrupted exception.
+ */
+ int acquire() throws InterruptedException;
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SimpleRateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SimpleRateLimiter.java
new file mode 100644
index 0000000000000..8dfc905af12d2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SimpleRateLimiter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utility to throttle a thread to a given number of executions (records) per second. */
+public final class SimpleRateLimiter implements RateLimiter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleRateLimiter.class);
+
+ private long maxPerBucket;
+ private long nanosInBucket;
+
+ private long endOfCurrentBucketNanos;
+ private int inCurrentBucket;
+
+ private static final int DEFAULT_BUCKETS_PER_SECOND = 10;
+ private static final long NANOS_IN_ONE_SECOND = 1_000_000_000L;
+
+ public SimpleRateLimiter(long maxPerSecond, int numParallelExecutors) {
+ this(maxPerSecond, numParallelExecutors, DEFAULT_BUCKETS_PER_SECOND);
+ }
+
+ public SimpleRateLimiter(long maxPerSecond, int numParallelExecutors, int bucketsPerSecond) {
+ checkArgument(maxPerSecond > 0, "maxPerSecond must be a positive number");
+ checkArgument(numParallelExecutors > 0, "numParallelExecutors must be greater than 0");
+
+ final float maxPerSecondPerSubtask = (float) maxPerSecond / numParallelExecutors;
+
+ maxPerBucket = ((int) (maxPerSecondPerSubtask / bucketsPerSecond)) + 1;
+ nanosInBucket = ((int) (NANOS_IN_ONE_SECOND / maxPerSecondPerSubtask)) * maxPerBucket;
+
+ this.endOfCurrentBucketNanos = System.nanoTime() + nanosInBucket;
+ this.inCurrentBucket = 0;
+ }
+
+ // TODO: JavaDoc, returns number of seconds idling on this call.
+ public int acquire() throws InterruptedException {
+ LOG.error("THROTTLE!");
+ if (++inCurrentBucket != maxPerBucket) {
+ return 0;
+ }
+ // The current bucket is "full". Wait until the next bucket.
+ final long now = System.nanoTime();
+ final int millisRemaining = (int) ((endOfCurrentBucketNanos - now) / 1_000_000);
+ inCurrentBucket = 0;
+
+ if (millisRemaining > 0) {
+ endOfCurrentBucketNanos += nanosInBucket;
+ Thread.sleep(millisRemaining);
+ return millisRemaining;
+ } else {
+ // Throttle was not called often enough so that the bucket's capacity was not exhausted
+ // "in time". We need to push the bucket's "end time" further to compensate and avoid
+ // bursts in case polling behaviour catches up.
+ endOfCurrentBucketNanos = now + nanosInBucket;
+ return 0;
+ }
+ }
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
index 6c91dcfb41dc6..e191494fb8aee 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
@@ -23,6 +23,8 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.lib.DataGeneratorSource;
+import org.apache.flink.api.connector.source.lib.util.RateLimiter;
+import org.apache.flink.api.connector.source.lib.util.SimpleRateLimiter;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -37,8 +39,9 @@ public class GeneratorSourceCheck {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ int parallelism = 2;
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
- env.setParallelism(2);
+ env.setParallelism(parallelism);
// MapFunction generator = value -> ">>> " + value;
// DataGeneratorSource source = new DataGeneratorSource<>(generator, 10,
@@ -48,7 +51,9 @@ public static void main(String[] args) throws Exception {
// watermarked.print();
MapFunction generator = index -> ">>> " + index;
- DataGeneratorSource source = new DataGeneratorSource<>(generator, 10, Types.STRING);
+ RateLimiter throttler = new SimpleRateLimiter(1, parallelism);
+ DataGeneratorSource source =
+ new DataGeneratorSource<>(generator, 100, throttler, Types.STRING);
DataStreamSource watermarked =
env.fromSource(
source,
@@ -56,7 +61,7 @@ public static void main(String[] args) throws Exception {
"watermarked");
watermarked.print();
- env.fromSequence(1, 10).print();
+ // env.fromSequence(1, 10).print();
// DataStreamSource ds = env.fromGenerator(generator, 10, Types.STRING);
// ds.print();
From 71ac36d649b1ff184e3336baaf7d645de78a07bf Mon Sep 17 00:00:00 2001
From: Alexander Fedulov <1492164+afedulov@users.noreply.github.com>
Date: Fri, 1 Jul 2022 01:03:02 +0200
Subject: [PATCH 04/12] WIP: parallelism in Splits
---
.../source/lib/DataGeneratorSource.java | 181 ++++++++++++------
1 file changed, 125 insertions(+), 56 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
index 696d838e6d8e8..18bac125d4895 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
@@ -32,10 +32,14 @@
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
-import org.apache.flink.api.connector.source.lib.util.NoOpRateLimiter;
import org.apache.flink.api.connector.source.lib.util.RateLimiter;
+import org.apache.flink.api.connector.source.lib.util.SimpleRateLimiter;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NumberSequenceIterator;
@@ -43,11 +47,14 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -79,7 +86,7 @@ public class DataGeneratorSource
/** The end Generator in the sequence, inclusive. */
private final NumberSequenceSource numberSource;
- private RateLimiter rateLimiter = new NoOpRateLimiter();
+ private long maxPerSecond = -1;
/**
* Creates a new {@code DataGeneratorSource} that produces count
records in
@@ -99,13 +106,13 @@ public DataGeneratorSource(
public DataGeneratorSource(
MapFunction generatorFunction,
long count,
- RateLimiter rateLimiter,
+ long maxPerSecond,
TypeInformation typeInfo) {
- // TODO: checkArgument(maxPerSecond > 0, "maxPerSeconds has to be a positive number");
+ checkArgument(maxPerSecond > 0, "maxPerSeconds has to be a positive number");
this.typeInfo = checkNotNull(typeInfo);
this.generatorFunction = checkNotNull(generatorFunction);
this.numberSource = new NumberSequenceSource(0, count);
- this.rateLimiter = rateLimiter;
+ this.maxPerSecond = maxPerSecond;
}
public long getCount() {
@@ -142,7 +149,7 @@ public SourceReader> createReader(
numberSource.splitNumberRange(0, getCount(), parallelism);
return new IteratorSourceEnumerator<>(
- enumContext, wrapSplits(splits, generatorFunction, rateLimiter));
+ enumContext, wrapSplits(splits, generatorFunction, maxPerSecond, parallelism));
}
@Override
@@ -155,15 +162,13 @@ public SourceReader> createReader(
@Override
public SimpleVersionedSerializer> getSplitSerializer() {
- return new SplitSerializer<>(
- numberSource.getSplitSerializer(), generatorFunction, rateLimiter);
+ return new SplitSerializer<>(generatorFunction, maxPerSecond);
}
@Override
public SimpleVersionedSerializer>>
getEnumeratorCheckpointSerializer() {
- return new CheckpointSerializer<>(
- numberSource.getEnumeratorCheckpointSerializer(), generatorFunction, rateLimiter);
+ return new CheckpointSerializer<>(generatorFunction, maxPerSecond);
}
// ------------------------------------------------------------------------
@@ -178,10 +183,11 @@ public static class GeneratorSequenceIterator implements Iterator {
public GeneratorSequenceIterator(
NumberSequenceIterator numSeqIterator,
MapFunction generatorFunction,
- RateLimiter rateLimiter) {
+ long maxPerSecond,
+ int parallelism) {
this.generatorFunction = generatorFunction;
this.numSeqIterator = numSeqIterator;
- this.rateLimiter = rateLimiter;
+ this.rateLimiter = new SimpleRateLimiter(maxPerSecond, parallelism);
}
@Override
@@ -200,7 +206,6 @@ public long getTo() {
@Override
public T next() {
try {
- LOG.error("NEXT!");
rateLimiter.acquire();
return generatorFunction.map(numSeqIterator.next());
} catch (Exception e) {
@@ -220,20 +225,25 @@ public static class GeneratorSequenceSplit
public GeneratorSequenceSplit(
NumberSequenceSplit numberSequenceSplit,
MapFunction generatorFunction,
- RateLimiter rateLimiter) {
+ long maxPerSecond,
+ int parallelism) {
this.numberSequenceSplit = numberSequenceSplit;
this.generatorFunction = generatorFunction;
- this.rateLimiter = rateLimiter;
+ this.maxPerSecond = maxPerSecond;
+ this.parallelism = parallelism;
}
private final NumberSequenceSplit numberSequenceSplit;
-
private final MapFunction generatorFunction;
- private final RateLimiter rateLimiter;
+ private final long maxPerSecond;
+ private final int parallelism;
public GeneratorSequenceIterator getIterator() {
return new GeneratorSequenceIterator<>(
- numberSequenceSplit.getIterator(), generatorFunction, rateLimiter);
+ numberSequenceSplit.getIterator(),
+ generatorFunction,
+ maxPerSecond,
+ parallelism);
}
@Override
@@ -248,7 +258,8 @@ public IteratorSourceSplit> getUpdatedSplitForIt
(NumberSequenceSplit)
numberSequenceSplit.getUpdatedSplitForIterator(iterator.numSeqIterator),
generatorFunction,
- rateLimiter);
+ maxPerSecond,
+ parallelism);
}
@Override
@@ -264,86 +275,144 @@ public String toString() {
private static final class SplitSerializer
implements SimpleVersionedSerializer> {
- private final SimpleVersionedSerializer numberSplitSerializer;
+ private static final int CURRENT_VERSION = 1;
+
private final MapFunction generatorFunction;
- private final RateLimiter rateLimiter;
+ private final long maxPerSecond;
- private SplitSerializer(
- SimpleVersionedSerializer numberSplitSerializer,
- MapFunction generatorFunction,
- RateLimiter rateLimiter) {
- this.numberSplitSerializer = numberSplitSerializer;
+ private SplitSerializer(MapFunction generatorFunction, long maxPerSecond) {
this.generatorFunction = generatorFunction;
- this.rateLimiter = rateLimiter;
+ this.maxPerSecond = maxPerSecond;
}
@Override
public int getVersion() {
- return numberSplitSerializer.getVersion();
+ return CURRENT_VERSION;
}
@Override
public byte[] serialize(GeneratorSequenceSplit split) throws IOException {
- return numberSplitSerializer.serialize(split.numberSequenceSplit);
+ checkArgument(
+ split.getClass() == GeneratorSequenceSplit.class,
+ "cannot serialize subclasses");
+
+ // We will serialize 2 longs (16 bytes) plus the UTF representation of the string (2 +
+ // length)
+ final DataOutputSerializer out =
+ new DataOutputSerializer(split.splitId().length() + 18);
+ serializeV1(out, split);
+ return out.getCopyOfBuffer();
}
@Override
public GeneratorSequenceSplit deserialize(int version, byte[] serialized)
throws IOException {
+ if (version != CURRENT_VERSION) {
+ throw new IOException("Unrecognized version: " + version);
+ }
+ final DataInputDeserializer in = new DataInputDeserializer(serialized);
+ return deserializeV1(in, generatorFunction, maxPerSecond);
+ }
+
+ static void serializeV1(DataOutputView out, GeneratorSequenceSplit split)
+ throws IOException {
+ serializeNumberSequenceSplit(out, split.numberSequenceSplit);
+ out.writeInt(split.parallelism);
+ }
+
+ static void serializeNumberSequenceSplit(
+ DataOutputView out, NumberSequenceSplit numberSequenceSplit) throws IOException {
+ out.writeUTF(numberSequenceSplit.splitId());
+ out.writeLong(numberSequenceSplit.from());
+ out.writeLong(numberSequenceSplit.to());
+ }
+
+ static GeneratorSequenceSplit deserializeV1(
+ DataInputView in, MapFunction generatorFunction, long maxPerSecond)
+ throws IOException {
+ NumberSequenceSplit numberSequenceSplit = deserializeNumberSequenceSplit(in);
+ int parallelism = in.readInt();
return new GeneratorSequenceSplit<>(
- numberSplitSerializer.deserialize(version, serialized),
- generatorFunction,
- rateLimiter);
+ numberSequenceSplit, generatorFunction, maxPerSecond, parallelism);
+ }
+
+ private static NumberSequenceSplit deserializeNumberSequenceSplit(DataInputView in)
+ throws IOException {
+ return new NumberSequenceSplit(in.readUTF(), in.readLong(), in.readLong());
}
}
private static final class CheckpointSerializer
implements SimpleVersionedSerializer>> {
- private final SimpleVersionedSerializer>
- numberCheckpointSerializer;
- private final MapFunction generatorFunction;
- private final RateLimiter throttler;
-
- public CheckpointSerializer(
- SimpleVersionedSerializer>
- numberCheckpointSerializer,
- MapFunction generatorFunction,
- RateLimiter throttler) {
- this.numberCheckpointSerializer = numberCheckpointSerializer;
- this.generatorFunction = generatorFunction;
- this.throttler = throttler;
- }
+ private static final int CURRENT_VERSION = 1;
@Override
public int getVersion() {
- return numberCheckpointSerializer.getVersion();
+ return CURRENT_VERSION;
+ }
+
+ private final MapFunction generatorFunction;
+ private final long maxPerSecond;
+
+ public CheckpointSerializer(MapFunction generatorFunction, long maxPerSecond) {
+ this.generatorFunction = generatorFunction;
+ this.maxPerSecond = maxPerSecond;
}
@Override
public byte[] serialize(Collection> checkpoint)
throws IOException {
- return numberCheckpointSerializer.serialize(
- checkpoint.stream()
- .map(split -> split.numberSequenceSplit)
- .collect(Collectors.toList()));
+ // Each split needs 2 longs (16 bytes) plus the UTF representation of the string (2 +
+ // length).
+ // Assuming at most 4 digit split IDs, 22 bytes per split avoids any intermediate array
+ // resizing.
+ // Plus four bytes for the length field.
+ // Plus four bytes for the parallelism.
+ final DataOutputSerializer out =
+ new DataOutputSerializer(checkpoint.size() * 22 + 4 + 4);
+ out.writeInt(checkpoint.size());
+ for (GeneratorSequenceSplit split : checkpoint) {
+ DataGeneratorSource.SplitSerializer.serializeNumberSequenceSplit(
+ out, split.numberSequenceSplit);
+ }
+
+ final Optional> aSplit = checkpoint.stream().findFirst();
+ if (aSplit.isPresent()) {
+ int parallelism = aSplit.get().parallelism;
+ out.writeInt(parallelism);
+ }
+
+ return out.getCopyOfBuffer();
}
@Override
public Collection> deserialize(int version, byte[] serialized)
throws IOException {
- Collection numberSequenceSplits =
- numberCheckpointSerializer.deserialize(version, serialized);
- return wrapSplits(numberSequenceSplits, generatorFunction, throttler);
+ if (version != CURRENT_VERSION) {
+ throw new IOException("Unrecognized version: " + version);
+ }
+ final DataInputDeserializer in = new DataInputDeserializer(serialized);
+ final int num = in.readInt();
+ final List result = new ArrayList<>(num);
+ for (int remaining = num; remaining > 0; remaining--) {
+ result.add(SplitSerializer.deserializeNumberSequenceSplit(in));
+ }
+ final int parallelism = in.readInt();
+ return wrapSplits(result, generatorFunction, maxPerSecond, parallelism);
}
}
private static List> wrapSplits(
Collection numberSequenceSplits,
MapFunction generatorFunction,
- RateLimiter throttler) {
+ long maxPerSecond,
+ int parallelism) {
return numberSequenceSplits.stream()
- .map(split -> new GeneratorSequenceSplit<>(split, generatorFunction, throttler))
+ .map(
+ split ->
+ new GeneratorSequenceSplit<>(
+ split, generatorFunction, maxPerSecond, parallelism))
.collect(Collectors.toList());
}
}
From f5e84f9cd14ad518a051c77370dd167ce09aa29f Mon Sep 17 00:00:00 2001
From: Alexander Fedulov <1492164+afedulov@users.noreply.github.com>
Date: Fri, 1 Jul 2022 17:02:39 +0200
Subject: [PATCH 05/12] WIP: Move ratelimiters
---
.../ratelimiting/BucketingRateLimiter.java} | 18 +++------
.../io/ratelimiting/GuavaRateLimiter.java | 37 +++++++++++++++++++
.../io/ratelimiting}/NoOpRateLimiter.java | 8 +---
.../io/ratelimiting}/RateLimiter.java | 12 +++---
.../source/lib/DataGeneratorSource.java | 11 ++++--
.../wordcount/GeneratorSourceCheck.java | 10 +++--
6 files changed, 63 insertions(+), 33 deletions(-)
rename flink-core/src/main/java/org/apache/flink/api/{connector/source/lib/util/SimpleRateLimiter.java => common/io/ratelimiting/BucketingRateLimiter.java} (84%)
create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/GuavaRateLimiter.java
rename flink-core/src/main/java/org/apache/flink/api/{connector/source/lib/util => common/io/ratelimiting}/NoOpRateLimiter.java (80%)
rename flink-core/src/main/java/org/apache/flink/api/{connector/source/lib/util => common/io/ratelimiting}/RateLimiter.java (73%)
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SimpleRateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/BucketingRateLimiter.java
similarity index 84%
rename from flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SimpleRateLimiter.java
rename to flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/BucketingRateLimiter.java
index 8dfc905af12d2..8ff619c2bf30f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SimpleRateLimiter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/BucketingRateLimiter.java
@@ -16,20 +16,15 @@
* limitations under the License.
*/
-package org.apache.flink.api.connector.source.lib.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.flink.api.common.io.ratelimiting;
import static org.apache.flink.util.Preconditions.checkArgument;
/** Utility to throttle a thread to a given number of executions (records) per second. */
-public final class SimpleRateLimiter implements RateLimiter {
-
- private static final Logger LOG = LoggerFactory.getLogger(SimpleRateLimiter.class);
+public final class BucketingRateLimiter implements RateLimiter {
- private long maxPerBucket;
- private long nanosInBucket;
+ private final long maxPerBucket;
+ private final long nanosInBucket;
private long endOfCurrentBucketNanos;
private int inCurrentBucket;
@@ -37,11 +32,11 @@ public final class SimpleRateLimiter implements RateLimiter {
private static final int DEFAULT_BUCKETS_PER_SECOND = 10;
private static final long NANOS_IN_ONE_SECOND = 1_000_000_000L;
- public SimpleRateLimiter(long maxPerSecond, int numParallelExecutors) {
+ public BucketingRateLimiter(long maxPerSecond, int numParallelExecutors) {
this(maxPerSecond, numParallelExecutors, DEFAULT_BUCKETS_PER_SECOND);
}
- public SimpleRateLimiter(long maxPerSecond, int numParallelExecutors, int bucketsPerSecond) {
+ public BucketingRateLimiter(long maxPerSecond, int numParallelExecutors, int bucketsPerSecond) {
checkArgument(maxPerSecond > 0, "maxPerSecond must be a positive number");
checkArgument(numParallelExecutors > 0, "numParallelExecutors must be greater than 0");
@@ -56,7 +51,6 @@ public SimpleRateLimiter(long maxPerSecond, int numParallelExecutors, int bucket
// TODO: JavaDoc, returns number of seconds idling on this call.
public int acquire() throws InterruptedException {
- LOG.error("THROTTLE!");
if (++inCurrentBucket != maxPerBucket) {
return 0;
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/GuavaRateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/GuavaRateLimiter.java
new file mode 100644
index 0000000000000..264e8fff1d0ab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/GuavaRateLimiter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io.ratelimiting;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+/** An implementation of {@link org.apache.flink.api.common.io.ratelimiting.RateLimiter} based on Guava's RateLimiter. */
+public class GuavaRateLimiter implements org.apache.flink.api.common.io.ratelimiting.RateLimiter {
+
+ private final RateLimiter rateLimiter;
+
+ public GuavaRateLimiter(long maxPerSecond, int numParallelExecutors) {
+ final float maxPerSecondPerSubtask = (float) maxPerSecond / numParallelExecutors;
+ this.rateLimiter = RateLimiter.create(maxPerSecondPerSubtask);
+ }
+
+ @Override
+ public int acquire() {
+ return (int) rateLimiter.acquire();
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoOpRateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/NoOpRateLimiter.java
similarity index 80%
rename from flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoOpRateLimiter.java
rename to flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/NoOpRateLimiter.java
index 9ba4f69e44f8b..10d55c4211ca8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoOpRateLimiter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/NoOpRateLimiter.java
@@ -16,18 +16,12 @@
* limitations under the License.
*/
-package org.apache.flink.api.connector.source.lib.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.flink.api.common.io.ratelimiting;
public class NoOpRateLimiter implements RateLimiter {
- private static final Logger LOG = LoggerFactory.getLogger(NoOpRateLimiter.class);
-
@Override
public int acquire() throws InterruptedException {
- LOG.error("!!!THROTTLE!");
return 0;
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/RateLimiter.java
similarity index 73%
rename from flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java
rename to flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/RateLimiter.java
index dcd11b0d4c932..577897f41ae51 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ratelimiting/RateLimiter.java
@@ -9,14 +9,14 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.flink.api.connector.source.lib.util;
+package org.apache.flink.api.common.io.ratelimiting;
import java.io.Serializable;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
index 18bac125d4895..224a144d9acc7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java
@@ -20,6 +20,9 @@
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.ratelimiting.GuavaRateLimiter;
+import org.apache.flink.api.common.io.ratelimiting.NoOpRateLimiter;
+import org.apache.flink.api.common.io.ratelimiting.RateLimiter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
@@ -32,8 +35,6 @@
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
-import org.apache.flink.api.connector.source.lib.util.RateLimiter;
-import org.apache.flink.api.connector.source.lib.util.SimpleRateLimiter;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -178,7 +179,7 @@ public static class GeneratorSequenceIterator implements Iterator {
private final MapFunction generatorFunction;
private final NumberSequenceIterator numSeqIterator;
- private final RateLimiter rateLimiter;
+ private RateLimiter rateLimiter = new NoOpRateLimiter();
public GeneratorSequenceIterator(
NumberSequenceIterator numSeqIterator,
@@ -187,7 +188,9 @@ public GeneratorSequenceIterator(
int parallelism) {
this.generatorFunction = generatorFunction;
this.numSeqIterator = numSeqIterator;
- this.rateLimiter = new SimpleRateLimiter(maxPerSecond, parallelism);
+ if (maxPerSecond > 0) {
+ this.rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism);
+ }
}
@Override
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
index e191494fb8aee..d4b4b1907b283 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
@@ -21,10 +21,10 @@
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.ratelimiting.BucketingRateLimiter;
+import org.apache.flink.api.common.io.ratelimiting.RateLimiter;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.lib.DataGeneratorSource;
-import org.apache.flink.api.connector.source.lib.util.RateLimiter;
-import org.apache.flink.api.connector.source.lib.util.SimpleRateLimiter;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,9 +51,11 @@ public static void main(String[] args) throws Exception {
// watermarked.print();
MapFunction generator = index -> ">>> " + index;
- RateLimiter throttler = new SimpleRateLimiter(1, parallelism);
+ RateLimiter rateLimiter = new BucketingRateLimiter(1, parallelism);
+ // DataGeneratorSource source =
+ // new DataGeneratorSource<>(generator, 100, rateLimiter, Types.STRING);
DataGeneratorSource source =
- new DataGeneratorSource<>(generator, 100, throttler, Types.STRING);
+ new DataGeneratorSource<>(generator, 10, 2, Types.STRING);
DataStreamSource watermarked =
env.fromSource(
source,
From 78f441da34b16e40ee23fa3a01a9b184192c94d2 Mon Sep 17 00:00:00 2001
From: Alexander Fedulov <1492164+afedulov@users.noreply.github.com>
Date: Mon, 4 Jul 2022 13:44:35 +0200
Subject: [PATCH 06/12] WIP: parallelism in SourceReaderContext
---
.../connector/file/src/FileSourceHeavyThroughputTest.java | 5 +++++
.../flink/api/connector/source/SourceReaderContext.java | 3 +++
.../api/connector/source/lib/NumberSequenceSourceTest.java | 5 +++++
.../testutils/source/reader/TestingReaderContext.java | 5 +++++
4 files changed, 18 insertions(+)
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
index 3c175a80c8d58..bb44462ab051e 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -225,6 +225,11 @@ public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
public UserCodeClassLoader getUserCodeClassLoader() {
return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
}
+
+ @Override
+ public int currentParallelism() {
+ return 1;
+ }
}
private static final class NoOpReaderOutput implements ReaderOutput {
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 304afddf90a83..d5763cc41ae70 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -63,4 +63,7 @@ public interface SourceReaderContext {
* @see UserCodeClassLoader
*/
UserCodeClassLoader getUserCodeClassLoader();
+
+ // TODO: add javadoc
+ int currentParallelism();
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
index 5944201d1ad7d..ce97ff7126911 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
@@ -143,6 +143,11 @@ public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
public UserCodeClassLoader getUserCodeClassLoader() {
return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
}
+
+ @Override
+ public int currentParallelism() {
+ return 1;
+ }
}
private static final class TestingReaderOutput implements ReaderOutput {
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
index d83f8978684d9..4855ced967b34 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
@@ -86,6 +86,11 @@ public UserCodeClassLoader getUserCodeClassLoader() {
return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
}
+ @Override
+ public int currentParallelism() {
+ return 1;
+ }
+
// ------------------------------------------------------------------------
public int getNumSplitRequests() {
From 460a61aa50b44ccd9c283ed11fb4ef6918279953 Mon Sep 17 00:00:00 2001
From: Alexander Fedulov <1492164+afedulov@users.noreply.github.com>
Date: Mon, 4 Jul 2022 17:10:58 +0200
Subject: [PATCH 07/12] WIP: working V2
---
.../src/FileSourceHeavyThroughputTest.java | 6 +
.../connector/source/SourceReaderContext.java | 14 +-
.../source/lib/DataGeneratorSourceV2.java | 324 ++++++++++++++++++
.../RateLimitedIteratorSourceReaderNew.java | 98 ++++++
.../source/lib/NumberSequenceSourceTest.java | 6 +
.../wordcount/GeneratorSourceCheck.java | 11 +-
.../api/operators/SourceOperator.java | 12 +
7 files changed, 465 insertions(+), 6 deletions(-)
create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV2.java
create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedIteratorSourceReaderNew.java
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
index bb44462ab051e..4528b221c1efa 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.file.src;
import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.ReaderOutput;
@@ -230,6 +231,11 @@ public UserCodeClassLoader getUserCodeClassLoader() {
public int currentParallelism() {
return 1;
}
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ return null;
+ }
}
private static final class NoOpReaderOutput implements ReaderOutput {
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index d5763cc41ae70..bbce2687af46e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.util.UserCodeClassLoader;
@@ -27,7 +28,9 @@
@Public
public interface SourceReaderContext {
- /** @return The metric group this source belongs to. */
+ /**
+ * @return The metric group this source belongs to.
+ */
SourceReaderMetricGroup metricGroup();
/** Gets the configuration with which Flink was started. */
@@ -39,7 +42,9 @@ public interface SourceReaderContext {
*/
String getLocalHostName();
- /** @return The index of this subtask. */
+ /**
+ * @return The index of this subtask.
+ */
int getIndexOfSubtask();
/**
@@ -64,6 +69,9 @@ public interface SourceReaderContext {
*/
UserCodeClassLoader getUserCodeClassLoader();
- // TODO: add javadoc
+ // TODO: add JavaDoc
int currentParallelism();
+
+ // TODO: add JavaDoc
+ RuntimeContext getRuntimeContext();
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV2.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV2.java
new file mode 100644
index 0000000000000..894f9908db842
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV2.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.ratelimiting.GuavaRateLimiter;
+import org.apache.flink.api.common.io.ratelimiting.RateLimiter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.connector.source.lib.util.RateLimitedIteratorSourceReaderNew;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces generators N data points in parallel. This source is useful for
+ * testing and for cases that just need a stream of N events of any kind.
+ *
+ * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
+ * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
+ * limited to one, this will produce one sequence in order.
+ *
+ *
+ * <p>This source is always bounded. For very long sequences (for example over the entire domain of
+ * long integer values), user may want to consider executing the application in a streaming manner,
+ * because, despite the fact that the produced stream is bounded, the end bound is pretty far away.
+ */
+@Public
+public class DataGeneratorSourceV2<OUT>
+        implements Source<
+                        OUT,
+                        DataGeneratorSourceV2.GeneratorSequenceSplit<OUT>,
+                        Collection<DataGeneratorSourceV2.GeneratorSequenceSplit<OUT>>>,
+                ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Type information of the produced records. */
+    private final TypeInformation<OUT> typeInfo;
+
+    /** Maps each value of the underlying Long sequence to a produced record. */
+    public final MapFunction<Long, OUT> generatorFunction;
+
+    /** The underlying number sequence; its end index (inclusive) is the record count. */
+    private final NumberSequenceSource numberSource;
+
+    /** Maximum records per second across all subtasks; -1 means unthrottled. */
+    private long maxPerSecond = -1;
+
+    /**
+     * Creates a new {@code DataGeneratorSourceV2} that produces <code>count</code> records in
+     * parallel.
+     *
+     * @param generatorFunction the generator function
+     * @param count The count
+     * @param typeInfo the type info
+     */
+    public DataGeneratorSourceV2(
+            MapFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.generatorFunction = checkNotNull(generatorFunction);
+        this.numberSource = new NumberSequenceSource(0, count);
+    }
+
+    /**
+     * Creates a new {@code DataGeneratorSourceV2} that produces <code>count</code> records in
+     * parallel, rate-limited to {@code maxPerSecond} records per second overall.
+     *
+     * @param generatorFunction the generator function
+     * @param count The count
+     * @param maxPerSecond maximum overall records per second; must be positive
+     * @param typeInfo the type info
+     */
+    public DataGeneratorSourceV2(
+            MapFunction<Long, OUT> generatorFunction,
+            long count,
+            long maxPerSecond,
+            TypeInformation<OUT> typeInfo) {
+        checkArgument(maxPerSecond > 0, "maxPerSeconds has to be a positive number");
+        this.typeInfo = checkNotNull(typeInfo);
+        this.generatorFunction = checkNotNull(generatorFunction);
+        this.numberSource = new NumberSequenceSource(0, count);
+        this.maxPerSecond = maxPerSecond;
+    }
+
+    public long getCount() {
+        return numberSource.getTo();
+    }
+
+    // ------------------------------------------------------------------------
+    //  source methods
+    // ------------------------------------------------------------------------
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return typeInfo;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, GeneratorSequenceSplit<OUT>> createReader(
+            SourceReaderContext readerContext) {
+        RateLimiter rateLimiter =
+                new GuavaRateLimiter(maxPerSecond, readerContext.currentParallelism());
+        return new RateLimitedIteratorSourceReaderNew<>(readerContext, rateLimiter);
+    }
+
+    @Override
+    public SplitEnumerator<GeneratorSequenceSplit<OUT>, Collection<GeneratorSequenceSplit<OUT>>>
+            createEnumerator(
+                    final SplitEnumeratorContext<GeneratorSequenceSplit<OUT>> enumContext) {
+
+        final List<NumberSequenceSplit> splits =
+                numberSource.splitNumberRange(0, getCount(), enumContext.currentParallelism());
+        return new IteratorSourceEnumerator<>(enumContext, wrapSplits(splits, generatorFunction));
+    }
+
+    @Override
+    public SplitEnumerator<GeneratorSequenceSplit<OUT>, Collection<GeneratorSequenceSplit<OUT>>>
+            restoreEnumerator(
+                    final SplitEnumeratorContext<GeneratorSequenceSplit<OUT>> enumContext,
+                    Collection<GeneratorSequenceSplit<OUT>> checkpoint) {
+        return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<GeneratorSequenceSplit<OUT>> getSplitSerializer() {
+        return new SplitSerializer<>(numberSource.getSplitSerializer(), generatorFunction);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<GeneratorSequenceSplit<OUT>>>
+            getEnumeratorCheckpointSerializer() {
+        return new CheckpointSerializer<>(
+                numberSource.getEnumeratorCheckpointSerializer(), generatorFunction);
+    }
+
+    // ------------------------------------------------------------------------
+    //  splits & checkpoint
+    // ------------------------------------------------------------------------
+
+    /** An iterator that maps each value of a {@link NumberSequenceIterator} to a record. */
+    public static class GeneratorSequenceIterator<T> implements Iterator<T> {
+
+        private final MapFunction<Long, T> generatorFunction;
+        private final NumberSequenceIterator numSeqIterator;
+
+        public GeneratorSequenceIterator(
+                NumberSequenceIterator numSeqIterator, MapFunction<Long, T> generatorFunction) {
+            this.generatorFunction = generatorFunction;
+            this.numSeqIterator = numSeqIterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return numSeqIterator.hasNext();
+        }
+
+        public long getCurrent() {
+            return numSeqIterator.getCurrent();
+        }
+
+        public long getTo() {
+            return numSeqIterator.getTo();
+        }
+
+        @Override
+        public T next() {
+            try {
+                return generatorFunction.map(numSeqIterator.next());
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(
+                        String.format(
+                                "Exception while generating element %d",
+                                numSeqIterator.getCurrent()),
+                        e);
+            }
+        }
+    }
+
+    /** A split of the source, representing a Generator sub-sequence. */
+    public static class GeneratorSequenceSplit<T>
+            implements IteratorSourceSplit<T, GeneratorSequenceIterator<T>> {
+
+        public GeneratorSequenceSplit(
+                NumberSequenceSplit numberSequenceSplit,
+                MapFunction<Long, T> generatorFunction) {
+            this.numberSequenceSplit = numberSequenceSplit;
+            this.generatorFunction = generatorFunction;
+        }
+
+        private final NumberSequenceSplit numberSequenceSplit;
+
+        private final MapFunction<Long, T> generatorFunction;
+
+        @Override
+        public GeneratorSequenceIterator<T> getIterator() {
+            return new GeneratorSequenceIterator<>(
+                    numberSequenceSplit.getIterator(), generatorFunction);
+        }
+
+        @Override
+        public String splitId() {
+            return numberSequenceSplit.splitId();
+        }
+
+        @Override
+        public IteratorSourceSplit<T, GeneratorSequenceIterator<T>> getUpdatedSplitForIterator(
+                final GeneratorSequenceIterator<T> iterator) {
+            return new GeneratorSequenceSplit<>(
+                    (NumberSequenceSplit)
+                            numberSequenceSplit.getUpdatedSplitForIterator(iterator.numSeqIterator),
+                    generatorFunction);
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "GeneratorSequenceSplit [%d, %d] (%s)",
+                    numberSequenceSplit.from(),
+                    numberSequenceSplit.to(),
+                    numberSequenceSplit.splitId());
+        }
+    }
+
+    /** Serializer for {@link GeneratorSequenceSplit}, delegating to the number-split serializer. */
+    private static final class SplitSerializer<T>
+            implements SimpleVersionedSerializer<GeneratorSequenceSplit<T>> {
+
+        private final SimpleVersionedSerializer<NumberSequenceSplit> numberSplitSerializer;
+        private final MapFunction<Long, T> generatorFunction;
+
+        private SplitSerializer(
+                SimpleVersionedSerializer<NumberSequenceSplit> numberSplitSerializer,
+                MapFunction<Long, T> generatorFunction) {
+            this.numberSplitSerializer = numberSplitSerializer;
+            this.generatorFunction = generatorFunction;
+        }
+
+        @Override
+        public int getVersion() {
+            return numberSplitSerializer.getVersion();
+        }
+
+        @Override
+        public byte[] serialize(GeneratorSequenceSplit<T> split) throws IOException {
+            return numberSplitSerializer.serialize(split.numberSequenceSplit);
+        }
+
+        @Override
+        public GeneratorSequenceSplit<T> deserialize(int version, byte[] serialized)
+                throws IOException {
+            return new GeneratorSequenceSplit<>(
+                    numberSplitSerializer.deserialize(version, serialized), generatorFunction);
+        }
+    }
+
+    /** Serializer for enumerator checkpoints (collections of {@link GeneratorSequenceSplit}). */
+    private static final class CheckpointSerializer<T>
+            implements SimpleVersionedSerializer<Collection<GeneratorSequenceSplit<T>>> {
+
+        private final SimpleVersionedSerializer<Collection<NumberSequenceSplit>>
+                numberCheckpointSerializer;
+        private final MapFunction<Long, T> generatorFunction;
+
+        public CheckpointSerializer(
+                SimpleVersionedSerializer<Collection<NumberSequenceSplit>>
+                        numberCheckpointSerializer,
+                MapFunction<Long, T> generatorFunction) {
+            this.numberCheckpointSerializer = numberCheckpointSerializer;
+            this.generatorFunction = generatorFunction;
+        }
+
+        @Override
+        public int getVersion() {
+            return numberCheckpointSerializer.getVersion();
+        }
+
+        @Override
+        public byte[] serialize(Collection<GeneratorSequenceSplit<T>> checkpoint)
+                throws IOException {
+            return numberCheckpointSerializer.serialize(
+                    checkpoint.stream()
+                            .map(split -> split.numberSequenceSplit)
+                            .collect(Collectors.toList()));
+        }
+
+        @Override
+        public Collection<GeneratorSequenceSplit<T>> deserialize(int version, byte[] serialized)
+                throws IOException {
+            Collection<NumberSequenceSplit> numberSequenceSplits =
+                    numberCheckpointSerializer.deserialize(version, serialized);
+            return wrapSplits(numberSequenceSplits, generatorFunction);
+        }
+    }
+
+    private static <T> List<GeneratorSequenceSplit<T>> wrapSplits(
+            Collection<NumberSequenceSplit> numberSequenceSplits,
+            MapFunction<Long, T> generatorFunction) {
+        return numberSequenceSplits.stream()
+                .map(split -> new GeneratorSequenceSplit<>(split, generatorFunction))
+                .collect(Collectors.toList());
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedIteratorSourceReaderNew.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedIteratorSourceReaderNew.java
new file mode 100644
index 0000000000000..1741b796a5502
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedIteratorSourceReaderNew.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.io.ratelimiting.RateLimiter;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceReader} that returns the values of an iterator, supplied via an {@link
+ * IteratorSourceSplit}.
+ *
+ * <p>The {@code IteratorSourceSplit} is also responsible for taking the current iterator and
+ * turning it back into a split for checkpointing.
+ *
+ * @param <E> The type of events returned by the reader.
+ * @param <IterT> The type of the iterator that produces the events. This type exists to make the
+ *     conversion between iterator and {@code IteratorSourceSplit} type safe.
+ * @param <SplitT> The concrete type of the {@code IteratorSourceSplit} that creates and converts
+ *     the iterator that produces this reader's elements.
+ */
+@Public
+public class RateLimitedIteratorSourceReaderNew<
+                E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
+        implements SourceReader<E, SplitT> {
+
+    /** The wrapped reader that actually produces elements from the iterator splits. */
+    private final IteratorSourceReader<E, IterT, SplitT> iteratorSourceReader;
+
+    /** Limiter consulted before each poll to throttle the emission rate. */
+    private final RateLimiter rateLimiter;
+
+    public RateLimitedIteratorSourceReaderNew(
+            SourceReaderContext readerContext, RateLimiter rateLimiter) {
+        checkNotNull(readerContext);
+        iteratorSourceReader = new IteratorSourceReader<>(readerContext);
+        this.rateLimiter = rateLimiter;
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void start() {
+        iteratorSourceReader.start();
+    }
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<E> output) throws InterruptedException {
+        // Block until the rate limiter grants a permit, then delegate the actual poll.
+        rateLimiter.acquire();
+        return iteratorSourceReader.pollNext(output);
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        return iteratorSourceReader.isAvailable();
+    }
+
+    @Override
+    public void addSplits(List<SplitT> splits) {
+        iteratorSourceReader.addSplits(splits);
+    }
+
+    @Override
+    public void notifyNoMoreSplits() {
+        iteratorSourceReader.notifyNoMoreSplits();
+    }
+
+    @Override
+    public List<SplitT> snapshotState(long checkpointId) {
+        return iteratorSourceReader.snapshotState(checkpointId);
+    }
+
+    @Override
+    public void close() throws Exception {
+        iteratorSourceReader.close();
+    }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
index ce97ff7126911..daee302729223 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.connector.source.lib;
import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
@@ -148,6 +149,11 @@ public UserCodeClassLoader getUserCodeClassLoader() {
public int currentParallelism() {
return 1;
}
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ return null;
+ }
}
private static final class TestingReaderOutput implements ReaderOutput {
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
index d4b4b1907b283..8c227bc1a8fff 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourceCheck.java
@@ -24,7 +24,7 @@
import org.apache.flink.api.common.io.ratelimiting.BucketingRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.RateLimiter;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.connector.source.lib.DataGeneratorSource;
+import org.apache.flink.api.connector.source.lib.DataGeneratorSourceV2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -54,8 +54,13 @@ public static void main(String[] args) throws Exception {
RateLimiter rateLimiter = new BucketingRateLimiter(1, parallelism);
// DataGeneratorSource source =
// new DataGeneratorSource<>(generator, 100, rateLimiter, Types.STRING);
- DataGeneratorSource source =
- new DataGeneratorSource<>(generator, 10, 2, Types.STRING);
+
+ // DataGeneratorSource