[FLINK-27919][connectors] Add FLIP-27-based Data Generator Source. #1

Draft
wants to merge 12 commits into base: master
@@ -225,6 +225,11 @@ public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
public UserCodeClassLoader getUserCodeClassLoader() {
return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
}

@Override
public int currentParallelism() {
return 1;
}
}

private static final class NoOpReaderOutput<E> implements ReaderOutput<E> {
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.io.ratelimiting;

import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;

/** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */
public class GuavaRateLimiter implements org.apache.flink.api.common.io.ratelimiting.RateLimiter {
Owner:

Why the fully qualified path to RateLimiter here?

Author (@afedulov, Jul 18, 2022):

It clashes with Guava's RateLimiter (see the import of org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter).


private final RateLimiter rateLimiter;

public GuavaRateLimiter(long maxPerSecond, int numParallelExecutors) {
final float maxPerSecondPerSubtask = (float) maxPerSecond / numParallelExecutors;
this.rateLimiter = RateLimiter.create(maxPerSecondPerSubtask);
}

@Override
public int acquire() {
return (int) (1000 * rateLimiter.acquire());
}
}
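For illustration, a minimal usage sketch of the limiter (not part of this change; the numbers are arbitrary). A target of 100 events per second shared by 4 parallel executors throttles each caller to roughly 25 permits per second:

// Hypothetical example: overall target of 100 events/second split across 4 subtasks.
GuavaRateLimiter limiter = new GuavaRateLimiter(100, 4);
for (int i = 0; i < 10; i++) {
    // Blocks until a permit is available; returns the time spent waiting in milliseconds.
    int blockedMillis = limiter.acquire();
    System.out.println("Blocked for " + blockedMillis + " ms");
}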
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.io.ratelimiting;

import java.io.Serializable;

/** The interface that can be used to throttle execution of methods. */
public interface RateLimiter extends Serializable {

/**
* Acquire is a blocking call that is intended to be used in places where the rate at which
* results are produced, or at which other functions are called, needs to be limited.
*
* @return The number of milliseconds this call blocked its caller.
* @throws InterruptedException The interrupted exception.
*/
int acquire() throws InterruptedException;
}
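As a sketch of the contract (not part of this change), a trivial implementation that never throttles could look like the following; the class name is hypothetical:

/** Hypothetical pass-through limiter: acquire() never blocks and reports zero wait time. */
public class NoOpRateLimiter implements RateLimiter {

    @Override
    public int acquire() {
        return 0; // no waiting was performed
    }
}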
@@ -63,4 +63,11 @@ public interface SourceReaderContext {
* @see UserCodeClassLoader
*/
UserCodeClassLoader getUserCodeClassLoader();

/**
* Get the current parallelism of this Source.
*
* @return the parallelism of the Source.
*/
int currentParallelism();
}
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.source.lib;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.ratelimiting.GuavaRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.RateLimiter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
import org.apache.flink.api.connector.source.lib.util.GeneratingIteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.RateLimitedSourceReader;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.util.Collection;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A data source that produces N data points in parallel. This source is useful for testing and for
* cases that just need a stream of N events of any kind.
*
* <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
* source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
* limited to one, this will produce one sequence in order.
*
* <p>This source is always bounded. For very long sequences (for example over the entire domain of
* long integer values), users may want to consider executing the application in a streaming manner,
* because, even though the produced stream is bounded, the end bound is very far away.
*/
@Experimental
public class DataGeneratorSourceV3<OUT>
implements Source<
OUT,
NumberSequenceSource.NumberSequenceSplit,
Collection<NumberSequenceSource.NumberSequenceSplit>>,
ResultTypeQueryable<OUT> {

private static final long serialVersionUID = 1L;

private final TypeInformation<OUT> typeInfo;

private final MapFunction<Long, OUT> generatorFunction;

private final NumberSequenceSource numberSource;

private long maxPerSecond = -1;

/**
* Creates a new {@code DataGeneratorSource} that produces {@code count} records in parallel.
*
* @param generatorFunction The generator function that receives index numbers and translates
* them into events of the output type.
* @param count The number of events to be produced.
* @param typeInfo The type information of the returned events.
*/
public DataGeneratorSourceV3(
MapFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) {
this.typeInfo = checkNotNull(typeInfo);
this.generatorFunction = checkNotNull(generatorFunction);
this.numberSource = new NumberSequenceSource(0, count);
Owner:

It is better to have only one constructor that assigns the fields, to avoid mistakes; the remaining constructors should delegate to it via this(...) with default values where needed.

Author:

Fixed.

}

/**
* Creates a new {@code DataGeneratorSource} that produces {@code count} records in parallel.
*
* @param generatorFunction The generator function that receives index numbers and translates
* them into events of the output type.
* @param count The number of events to be produced.
* @param sourceRatePerSecond The maximum number of events per second that this generator aims
* to produce. This is a target rate for the whole source; the individual parallel
* source instances automatically adjust their rate based on the {@code
* sourceRatePerSecond} and the source parallelism.
* @param typeInfo The type information of the returned events.
*/
public DataGeneratorSourceV3(
MapFunction<Long, OUT> generatorFunction,
long count,
long sourceRatePerSecond,
TypeInformation<OUT> typeInfo) {
checkArgument(sourceRatePerSecond > 0, "sourceRatePerSecond has to be a positive number");
this.typeInfo = checkNotNull(typeInfo);
this.generatorFunction = checkNotNull(generatorFunction);
this.numberSource = new NumberSequenceSource(0, count);
this.maxPerSecond = sourceRatePerSecond;
}

/** @return The number of records produced by this source. */
public long getCount() {
return numberSource.getTo();
}

// ------------------------------------------------------------------------
// source methods
// ------------------------------------------------------------------------

@Override
public TypeInformation<OUT> getProducedType() {
return typeInfo;
}

@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
}

@Override
public SourceReader<OUT, NumberSequenceSplit> createReader(SourceReaderContext readerContext)
throws Exception {
if (maxPerSecond > 0) {
int parallelism = readerContext.currentParallelism();
RateLimiter rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism);
return new RateLimitedSourceReader<>(
new GeneratingIteratorSourceReader<>(readerContext, generatorFunction),
rateLimiter);
Owner:

So far it is not clear to me how I would use a different RateLimiter if I needed one. Maybe that does not make much sense, and if it has already been discussed then fine. But just as an idea, it might be worth considering passing the RateLimiter as a parameter into this class. I.e. instead of:

 new DataGeneratorSourceV3<>(generator, 1000, 2, Types.STRING);

it would be:

 new DataGeneratorSourceV3<>(generator, 1000, RateLimiters.guava(2), Types.STRING);

Although most likely it would not be the limiter itself but rather its builder, since the readerContext needs to be passed to it somehow. Overall, I am not sure it is worth the extra effort, but it is something to think about.

Author:

I decided to make the whole generator more extendable, with the option of providing a factory for SourceReaders. In that factory the user can decide what to initialize and how.

} else {
return new GeneratingIteratorSourceReader<>(readerContext, generatorFunction);
}
}

@Override
public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> restoreEnumerator(
SplitEnumeratorContext<NumberSequenceSplit> enumContext,
Collection<NumberSequenceSplit> checkpoint)
throws Exception {
return new IteratorSourceEnumerator<>(enumContext, checkpoint);
}

@Override
public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> createEnumerator(
final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
final List<NumberSequenceSplit> splits =
numberSource.splitNumberRange(0, getCount(), enumContext.currentParallelism());
return new IteratorSourceEnumerator<>(enumContext, splits);
}

@Override
public SimpleVersionedSerializer<NumberSequenceSplit> getSplitSerializer() {
return numberSource.getSplitSerializer();
}

@Override
public SimpleVersionedSerializer<Collection<NumberSequenceSplit>>
getEnumeratorCheckpointSerializer() {
return numberSource.getEnumeratorCheckpointSerializer();
}
}
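For context, a minimal usage sketch of the rate-limited constructor in a DataStream job (the generator lambda, sink, and job name are illustrative and not part of this change):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Produce 1000 generated strings overall, targeting roughly 2 events/second across the whole source.
MapFunction<Long, String> generator = index -> "Event #" + index;
DataGeneratorSourceV3<String> source =
        new DataGeneratorSourceV3<>(generator, 1000, 2, Types.STRING);

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Data Generator")
        .print();

env.execute("DataGeneratorSourceV3 example");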