-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-27919][connectors] Add FLIP-27-based Data Generator Source. #1
base: master
Are you sure you want to change the base?
Changes from all commits
a4a4e5b
357c036
42f1a2a
71ac36d
f5e84f9
78f441d
460a61a
2da82fa
798c106
4e576f8
ca4a232
38febef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.api.common.io.ratelimiting; | ||
|
||
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter; | ||
|
||
/** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */ | ||
public class GuavaRateLimiter implements org.apache.flink.api.common.io.ratelimiting.RateLimiter { | ||
|
||
private final RateLimiter rateLimiter; | ||
|
||
public GuavaRateLimiter(long maxPerSecond, int numParallelExecutors) { | ||
final float maxPerSecondPerSubtask = (float) maxPerSecond / numParallelExecutors; | ||
this.rateLimiter = RateLimiter.create(maxPerSecondPerSubtask); | ||
} | ||
|
||
@Override | ||
public int acquire() { | ||
return (int) (1000 * rateLimiter.acquire()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.api.common.io.ratelimiting; | ||
|
||
import java.io.Serializable; | ||
|
||
/**
 * Contract for components that throttle how frequently a piece of code is allowed to run.
 *
 * <p>Implementations are required to be {@link Serializable}.
 */
public interface RateLimiter extends Serializable {

    /**
     * Blocks the caller until it is allowed to proceed. Intended for call sites that need to cap
     * the rate at which results are emitted or other functions are invoked.
     *
     * @return The time, in milliseconds, that this call kept its caller blocked.
     * @throws InterruptedException If the wait is interrupted.
     */
    int acquire() throws InterruptedException;
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.api.connector.source.lib; | ||
|
||
import org.apache.flink.annotation.Experimental; | ||
import org.apache.flink.api.common.functions.MapFunction; | ||
import org.apache.flink.api.common.io.ratelimiting.GuavaRateLimiter; | ||
import org.apache.flink.api.common.io.ratelimiting.RateLimiter; | ||
import org.apache.flink.api.common.typeinfo.TypeInformation; | ||
import org.apache.flink.api.connector.source.Boundedness; | ||
import org.apache.flink.api.connector.source.Source; | ||
import org.apache.flink.api.connector.source.SourceReader; | ||
import org.apache.flink.api.connector.source.SourceReaderContext; | ||
import org.apache.flink.api.connector.source.SplitEnumerator; | ||
import org.apache.flink.api.connector.source.SplitEnumeratorContext; | ||
import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit; | ||
import org.apache.flink.api.connector.source.lib.util.GeneratingIteratorSourceReader; | ||
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator; | ||
import org.apache.flink.api.connector.source.lib.util.RateLimitedSourceReader; | ||
import org.apache.flink.api.java.typeutils.ResultTypeQueryable; | ||
import org.apache.flink.core.io.SimpleVersionedSerializer; | ||
|
||
import java.util.Collection; | ||
import java.util.List; | ||
|
||
import static org.apache.flink.util.Preconditions.checkArgument; | ||
import static org.apache.flink.util.Preconditions.checkNotNull; | ||
|
||
/** | ||
* A data source that produces N data points in parallel. This source is useful for testing and for | ||
* cases that just need a stream of N events of any kind. | ||
* | ||
* <p>The source splits the sequence into as many parallel sub-sequences as there are parallel | ||
* source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is | ||
* limited to one, this will produce one sequence in order. | ||
* | ||
* <p>This source is always bounded. For very long sequences (for example over the entire domain of | ||
* long integer values), user may want to consider executing the application in a s treaming manner, | ||
* because, despite the fact that the produced stream is bounded, the end bound is pretty far away. | ||
*/ | ||
@Experimental | ||
public class DataGeneratorSourceV3<OUT> | ||
implements Source< | ||
OUT, | ||
NumberSequenceSource.NumberSequenceSplit, | ||
Collection<NumberSequenceSource.NumberSequenceSplit>>, | ||
ResultTypeQueryable<OUT> { | ||
|
||
private static final long serialVersionUID = 1L; | ||
|
||
private final TypeInformation<OUT> typeInfo; | ||
|
||
private final MapFunction<Long, OUT> generatorFunction; | ||
|
||
private final NumberSequenceSource numberSource; | ||
|
||
private long maxPerSecond = -1; | ||
|
||
/** | ||
* Creates a new {@code DataGeneratorSource} that produces {@code count} records in parallel. | ||
* | ||
* @param generatorFunction The generator function that receives index numbers and translates | ||
* them into events of the output type. | ||
* @param count The number of events to be produced. | ||
* @param typeInfo The type information of the returned events. | ||
*/ | ||
public DataGeneratorSourceV3( | ||
MapFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) { | ||
this.typeInfo = checkNotNull(typeInfo); | ||
this.generatorFunction = checkNotNull(generatorFunction); | ||
this.numberSource = new NumberSequenceSource(0, count); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Лучше иметь только один конструктор с присвоением значений, для избежания ошибок, а остальные должны использовать this(*) с дефолтными значениями если нужно. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. поправил. |
||
} | ||
|
||
/** | ||
* Creates a new {@code DataGeneratorSource} that produces {@code count} records in parallel. | ||
* | ||
* @param generatorFunction The generator function that receives index numbers and translates | ||
* them into events of the output type. | ||
* @param count The number of events to be produced. | ||
* @param sourceRatePerSecond The maximum number of events per seconds that this generator aims | ||
* to produce. This is a target number for the whole source and the individual parallel | ||
* source instances automatically adjust their rate taking based on the {@code | ||
* sourceRatePerSecond} and the source parallelism. | ||
* @param typeInfo The type information of the returned events. | ||
*/ | ||
public DataGeneratorSourceV3( | ||
afedulov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
MapFunction<Long, OUT> generatorFunction, | ||
long count, | ||
long sourceRatePerSecond, | ||
TypeInformation<OUT> typeInfo) { | ||
checkArgument(sourceRatePerSecond > 0, "maxPerSeconds has to be a positive number"); | ||
this.typeInfo = checkNotNull(typeInfo); | ||
this.generatorFunction = checkNotNull(generatorFunction); | ||
this.numberSource = new NumberSequenceSource(0, count); | ||
this.maxPerSecond = sourceRatePerSecond; | ||
} | ||
|
||
/** @return The number of records produced by this source. */ | ||
public long getCount() { | ||
|
||
return numberSource.getTo(); | ||
} | ||
|
||
// ------------------------------------------------------------------------ | ||
// source methods | ||
// ------------------------------------------------------------------------ | ||
|
||
@Override | ||
public TypeInformation<OUT> getProducedType() { | ||
return typeInfo; | ||
} | ||
|
||
@Override | ||
public Boundedness getBoundedness() { | ||
return Boundedness.BOUNDED; | ||
} | ||
|
||
@Override | ||
public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext) | ||
throws Exception { | ||
if (maxPerSecond > 0) { | ||
int parallelism = readerContext.currentParallelism(); | ||
RateLimiter rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism); | ||
return new RateLimitedSourceReader<>( | ||
new GeneratingIteratorSourceReader<>(readerContext, generatorFunction), | ||
rateLimiter); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Пока мне не понятно, как мне использовать другой RateLimiter если понадобиться. Может быть это и не особо имеет смысл, и если это уже было обсуждено то ок. Но просто как идея, возможно стоит рассмотреть передачу RateLimiter как параметра в этот класс. Т.е. вместо:
будет
Хотя скорее всего это будет не сам Limiter а его билдер, т.к. ему нужно как передать There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Я решил сделать весь генератор более extendable с возможностью factory для SourceReader's . Пользователь сможет в этой factory решить что и как инициализировать. |
||
} else { | ||
return new GeneratingIteratorSourceReader<>(readerContext, generatorFunction); | ||
} | ||
} | ||
|
||
@Override | ||
public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> restoreEnumerator( | ||
SplitEnumeratorContext<NumberSequenceSplit> enumContext, | ||
Collection<NumberSequenceSplit> checkpoint) | ||
throws Exception { | ||
return new IteratorSourceEnumerator<>(enumContext, checkpoint); | ||
} | ||
|
||
@Override | ||
public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> createEnumerator( | ||
final SplitEnumeratorContext<NumberSequenceSplit> enumContext) { | ||
final List<NumberSequenceSplit> splits = | ||
numberSource.splitNumberRange(0, getCount(), enumContext.currentParallelism()); | ||
return new IteratorSourceEnumerator<>(enumContext, splits); | ||
} | ||
|
||
@Override | ||
public SimpleVersionedSerializer<NumberSequenceSplit> getSplitSerializer() { | ||
return numberSource.getSplitSerializer(); | ||
} | ||
|
||
@Override | ||
public SimpleVersionedSerializer<Collection<NumberSequenceSplit>> | ||
getEnumeratorCheckpointSerializer() { | ||
return numberSource.getEnumeratorCheckpointSerializer(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Зачем тут полный путь до RateLimiter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Клэш с гуавой (см
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter
)