Skip to content

Commit

Permalink
[FLINK-31980][Connectors/Kinesis] Initial implementation of EFO subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
hlteoh37 committed Sep 12, 2024
1 parent 7e2468b commit 164d1e1
Show file tree
Hide file tree
Showing 8 changed files with 473 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@
import org.apache.flink.connector.aws.util.AWSClientUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
import org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSplitReader;
import org.apache.flink.connector.kinesis.source.reader.polling.PollingKinesisShardSplitReader;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
Expand All @@ -51,14 +55,21 @@

import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.utils.AttributeMap;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.EFO_CONSUMER_ARN;
import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.READER_TYPE;

/**
* The {@link KinesisStreamsSource} is an exactly-once parallel streaming data source that
* subscribes to a single AWS Kinesis data stream. It is able to handle resharding of streams, and
Expand Down Expand Up @@ -125,20 +136,11 @@ public Boundedness getBoundedness() {
public SourceReader<T, KinesisShardSplit> createReader(SourceReaderContext readerContext)
throws Exception {
setUpDeserializationSchema(readerContext);

Map<String, KinesisShardMetrics> shardMetricGroupMap = new ConcurrentHashMap<>();

// We create a new stream proxy for each split reader since they have their own independent
// lifecycle.
Supplier<PollingKinesisShardSplitReader> splitReaderSupplier =
() ->
new PollingKinesisShardSplitReader(
createKinesisStreamProxy(sourceConfig), shardMetricGroupMap);
KinesisStreamsRecordEmitter<T> recordEmitter =
new KinesisStreamsRecordEmitter<>(deserializationSchema);

return new KinesisStreamsSourceReader<>(
new SingleThreadFetcherManager<>(splitReaderSupplier::get),
new SingleThreadFetcherManager<>(getKinesisShardSplitReaderSupplier(sourceConfig, shardMetricGroupMap)),
recordEmitter,
sourceConfig,
readerContext,
Expand Down Expand Up @@ -178,6 +180,58 @@ public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
}

/**
 * Resolves the {@link SplitReader} supplier that matches the configured reader type.
 *
 * @param sourceConfig source configuration holding the reader type
 * @param shardMetricGroupMap shared map of per-shard metrics
 * @return supplier creating a fresh split reader for each fetcher
 */
private Supplier<SplitReader<Record, KinesisShardSplit>> getKinesisShardSplitReaderSupplier(
        Configuration sourceConfig, Map<String, KinesisShardMetrics> shardMetricGroupMap) {
    final KinesisStreamsSourceConfigConstants.ReaderType configuredReaderType =
            sourceConfig.get(READER_TYPE);
    switch (configuredReaderType) {
        case POLLING:
            return getPollingKinesisShardSplitReaderSupplier(sourceConfig, shardMetricGroupMap);
        case EFO:
            return getFanOutKinesisShardSplitReaderSupplier(sourceConfig, shardMetricGroupMap);
        default:
            throw new IllegalArgumentException("Unsupported reader type: " + configuredReaderType);
    }
}

/**
 * Builds the supplier for polling-based split readers.
 *
 * @param sourceConfig source configuration used to build the Kinesis client
 * @param shardMetricGroupMap shared map of per-shard metrics
 * @return supplier creating a fresh polling split reader for each fetcher
 */
private Supplier<SplitReader<Record, KinesisShardSplit>>
        getPollingKinesisShardSplitReaderSupplier(
                Configuration sourceConfig,
                Map<String, KinesisShardMetrics> shardMetricGroupMap) {
    // Each split reader gets its own stream proxy because the reader owns and manages
    // the proxy's lifecycle independently.
    return () -> {
        KinesisStreamProxy streamProxy = createKinesisStreamProxy(sourceConfig);
        return new PollingKinesisShardSplitReader(streamProxy, shardMetricGroupMap);
    };
}

/**
 * Builds the supplier for Enhanced Fan-Out (EFO) split readers.
 *
 * @param sourceConfig source configuration holding the EFO consumer ARN
 * @param shardMetricGroupMap shared map of per-shard metrics
 * @return supplier creating a fresh fan-out split reader for each fetcher
 */
private Supplier<SplitReader<Record, KinesisShardSplit>>
        getFanOutKinesisShardSplitReaderSupplier(
                Configuration sourceConfig,
                Map<String, KinesisShardMetrics> shardMetricGroupMap) {
    final String consumerArn = sourceConfig.get(EFO_CONSUMER_ARN);
    // TODO: EFO consumer registration logic

    // Each split reader gets its own async stream proxy because the reader owns and
    // manages the proxy's lifecycle independently.
    return () -> {
        KinesisAsyncStreamProxy asyncStreamProxy = createKinesisAsyncStreamProxy(sourceConfig);
        return new FanOutKinesisShardSplitReader(
                asyncStreamProxy, consumerArn, shardMetricGroupMap);
    };
}

/**
 * Creates an async stream proxy backed by a Netty-based {@link KinesisAsyncClient}, used for
 * EFO SubscribeToShard calls.
 *
 * <p>A new proxy (with its own HTTP client) is created per invocation, since each split
 * reader owns and closes its proxy independently.
 *
 * @param consumerConfig configuration used to build the Kinesis client properties
 * @return a new {@link KinesisAsyncStreamProxy}
 * @throws IllegalStateException if the AWS region cannot be determined from the stream ARN
 */
private KinesisAsyncStreamProxy createKinesisAsyncStreamProxy(Configuration consumerConfig) {
    SdkAsyncHttpClient asyncHttpClient =
            AWSGeneralUtil.createAsyncHttpClient(
                    AttributeMap.builder().build(), NettyNioAsyncHttpClient.builder());
    // The client must target the region the stream lives in, parsed from the stream ARN.
    String region =
            AWSGeneralUtil.getRegionFromArn(streamArn)
                    .orElseThrow(
                            () ->
                                    new IllegalStateException(
                                            "Unable to determine region from stream arn"));
    Properties kinesisClientProperties = new Properties();
    // Fix: use the consumerConfig parameter. The original body read the sourceConfig field,
    // leaving the parameter unused (callers currently pass sourceConfig, so behavior for
    // existing call sites is unchanged).
    consumerConfig.addAllToProperties(kinesisClientProperties);
    kinesisClientProperties.put(AWSConfigConstants.AWS_REGION, region);

    KinesisAsyncClient kinesisAsyncClient =
            AWSClientUtil.createAwsAsyncClient(
                    kinesisClientProperties,
                    asyncHttpClient,
                    KinesisAsyncClient.builder(),
                    KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
                    KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
    return new KinesisAsyncStreamProxy(kinesisAsyncClient, asyncHttpClient);
}

private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) {
SdkHttpClient httpClient =
AWSGeneralUtil.createSyncHttpClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ public enum InitialPosition {
AT_TIMESTAMP
}

/** Defines mechanism used to consume records from Kinesis stream. */
public enum ReaderType {
    // Consumes records by polling the stream (see PollingKinesisShardSplitReader).
    POLLING,
    // Enhanced Fan-Out: consumes records via a SubscribeToShard subscription using a
    // registered consumer ARN (see FanOutKinesisShardSplitReader).
    EFO
}

/** Strategy used to register the EFO consumer against the Kinesis stream. */
public enum EfoConsumerRegistrationStrategy {
    // Consumer with the specified name is registered if it does not already exist
    // (per the EFO_CONSUMER_REGISTRATION_STRATEGY option description).
    EAGER,
    // No registration is performed — presumably an already-registered consumer ARN is
    // expected to be supplied; confirm once registration logic lands.
    NONE
}

public static final ConfigOption<InitialPosition> STREAM_INITIAL_POSITION =
ConfigOptions.key("flink.stream.initpos")
.enumType(InitialPosition.class)
Expand All @@ -57,4 +68,26 @@ public enum InitialPosition {
.longType()
.defaultValue(10000L)
.withDescription("The interval between each attempt to discover new shards.");

/** The mechanism used to consume records from the Kinesis stream. */
public static final ConfigOption<ReaderType> READER_TYPE =
        ConfigOptions.key("type")
                .enumType(ReaderType.class)
                .defaultValue(ReaderType.POLLING)
                .withDescription("The type of reader used to read from the Kinesis stream.");

// NOTE(review): the key ends in "registration.type" while the constant and enum are named
// "strategy" — consider aligning before release; the key is kept here for compatibility.
public static final ConfigOption<EfoConsumerRegistrationStrategy>
        EFO_CONSUMER_REGISTRATION_STRATEGY =
                ConfigOptions.key("efo.consumer.registration.type")
                        .enumType(EfoConsumerRegistrationStrategy.class)
                        .defaultValue(EfoConsumerRegistrationStrategy.EAGER)
                        .withDescription("Strategy used for EFO consumer registration. If EAGER is selected, consumer with specified name will be registered if it doesn't exist.");

/** Name of the EFO consumer to register against the Kinesis stream. */
public static final ConfigOption<String> EFO_CONSUMER_NAME =
        ConfigOptions.key("efo.consumer.name")
                .stringType()
                .noDefaultValue()
                .withDescription("Name of the EFO consumer to register against the Kinesis stream.");

/** ARN of the EFO consumer used to subscribe to shards. */
public static final ConfigOption<String> EFO_CONSUMER_ARN =
        ConfigOptions.key("efo.consumer.arn")
                .stringType()
                .noDefaultValue()
                .withDescription("ARN of the EFO consumer used to subscribe to shards.");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kinesis.source.proxy;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;

import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

/** Interface for interactions with async client of Kinesis streams. */
@Internal
public interface AsyncStreamProxy extends Closeable {
    /**
     * Opens a SubscribeToShard subscription on the given shard using the given registered
     * consumer.
     *
     * @param consumerArn ARN of the registered consumer to subscribe with
     * @param shardId id of the shard to subscribe to
     * @param startingPosition position in the shard at which to start reading
     * @param responseHandler handler that receives the subscription events
     * @return future that completes when the subscription call completes
     */
    CompletableFuture<Void> subscribeToShard(
            String consumerArn,
            String shardId,
            StartingPosition startingPosition,
            SubscribeToShardResponseHandler responseHandler);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.apache.flink.connector.kinesis.source.proxy;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;

import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** Implementation of async stream proxy for the Kinesis client. */
@Internal
public class KinesisAsyncStreamProxy implements AsyncStreamProxy {
    private final KinesisAsyncClient kinesisAsyncClient;
    private final SdkAsyncHttpClient asyncHttpClient;

    /**
     * @param kinesisAsyncClient async Kinesis client used for SubscribeToShard calls
     * @param asyncHttpClient underlying HTTP client, closed together with this proxy
     */
    public KinesisAsyncStreamProxy(
            KinesisAsyncClient kinesisAsyncClient, SdkAsyncHttpClient asyncHttpClient) {
        this.kinesisAsyncClient = kinesisAsyncClient;
        this.asyncHttpClient = asyncHttpClient;
    }

    @Override
    public CompletableFuture<Void> subscribeToShard(
            String consumerArn,
            String shardId,
            StartingPosition startingPosition,
            SubscribeToShardResponseHandler responseHandler) {
        SubscribeToShardRequest subscribeRequest =
                SubscribeToShardRequest.builder()
                        .consumerARN(consumerArn)
                        .shardId(shardId)
                        .startingPosition(startingPosition.getSdkStartingPosition())
                        .build();
        return kinesisAsyncClient.subscribeToShard(subscribeRequest, responseHandler);
    }

    @Override
    public void close() throws IOException {
        // Close the SDK client first, then the HTTP transport it was built on.
        kinesisAsyncClient.close();
        asyncHttpClient.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
Collections.emptyIterator(), splitState.getSplitId(), true);
}

if (recordBatch == null) {
assignedSplits.add(splitState);
return INCOMPLETE_SHARD_EMPTY_RECORDS;
}

if (!recordBatch.isCompleted()) {
assignedSplits.add(splitState);
}
Expand Down Expand Up @@ -124,7 +129,7 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
* Main method implementations must implement to fetch records from Kinesis.
*
* @param splitState split to fetch records for
* @return RecordBatch containing the fetched records and metadata
* @return RecordBatch containing the fetched records and metadata. Returns null if there are no records but fetching should be retried at a later time.
*/
protected abstract RecordBatch fetchRecords(KinesisShardSplitState splitState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@
package org.apache.flink.connector.kinesis.source.reader.fanout;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;

import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -31,15 +37,44 @@
*/
@Internal
public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
// Proxy used to open SubscribeToShard subscriptions for assigned splits.
private final KinesisAsyncStreamProxy asyncStreamProxy;
// ARN of the registered EFO consumer used for every subscription.
private final String consumerArn;

// Active subscription per split; entries are added in handleSplitsChanges and removed
// once a shard is fully consumed.
private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions = new HashMap<>();

/**
 * @param asyncStreamProxy proxy used to subscribe to shards; owned and closed by this reader
 * @param consumerArn ARN of the registered EFO consumer
 * @param shardMetricGroupMap shared map of per-shard metrics
 */
public FanOutKinesisShardSplitReader(
        KinesisAsyncStreamProxy asyncStreamProxy,
        String consumerArn,
        Map<String, KinesisShardMetrics> shardMetricGroupMap) {
    super(shardMetricGroupMap);
    this.asyncStreamProxy = asyncStreamProxy;
    this.consumerArn = consumerArn;
}

/**
 * Returns the next batch of records buffered by the split's EFO subscription, or null when
 * no event is available yet (the base class then retries the fetch later).
 *
 * @param splitState split to fetch records for
 * @return the next {@link RecordBatch}, or null if no event is currently buffered
 */
@Override
protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
    // NOTE(review): subscriptions are stored under split.splitId() in handleSplitsChanges
    // but looked up by shardId here — this relies on splitId == shardId for
    // KinesisShardSplit; confirm, or key both sides identically.
    FanOutKinesisShardSubscription subscription =
            splitSubscriptions.get(splitState.getShardId());

    SubscribeToShardEvent event = subscription.nextEvent();
    if (event == null) {
        // No event buffered yet; signal the base class to retry later.
        return null;
    }

    // A null continuation sequence number indicates the shard has been fully consumed.
    boolean shardCompleted = event.continuationSequenceNumber() == null;
    if (shardCompleted) {
        splitSubscriptions.remove(splitState.getShardId());
    }
    return new RecordBatch(event.records(), event.millisBehindLatest(), shardCompleted);
}

/**
 * Registers newly assigned splits and eagerly opens an EFO subscription for each of them.
 *
 * @param splitsChanges the split additions delivered by the fetcher framework
 */
@Override
public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChanges) {
    super.handleSplitsChanges(splitsChanges);
    for (KinesisShardSplit split : splitsChanges.splits()) {
        FanOutKinesisShardSubscription subscription =
                new FanOutKinesisShardSubscription(
                        asyncStreamProxy,
                        consumerArn,
                        split.getShardId(),
                        split.getStartingPosition());
        // Activate immediately so events start buffering before the first fetch.
        subscription.activateSubscription();
        splitSubscriptions.put(split.splitId(), subscription);
    }
}

/** Releases reader resources. Fix: the original left the body empty, leaking the proxy. */
@Override
public void close() throws Exception {
    // The reader owns its async stream proxy (created per reader) and must close it,
    // releasing the underlying Kinesis client and HTTP transport.
    asyncStreamProxy.close();
}
}
Loading

0 comments on commit 164d1e1

Please sign in to comment.