diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index 745e09d2..9db712e1 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -31,15 +31,18 @@ import org.apache.flink.connector.aws.util.AWSClientUtil; import org.apache.flink.connector.aws.util.AWSGeneralUtil; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants; import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner; import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator; import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState; import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; +import org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy; import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy; import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter; import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader; +import org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSplitReader; import org.apache.flink.connector.kinesis.source.reader.polling.PollingKinesisShardSplitReader; import 
org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; @@ -51,7 +54,11 @@ import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.utils.AttributeMap; import java.util.Map; @@ -130,10 +137,14 @@ public SourceReader createReader(SourceReaderContext reade // We create a new stream proxy for each split reader since they have their own independent // lifecycle. - Supplier splitReaderSupplier = +// Supplier splitReaderSupplier = +// () -> +// new PollingKinesisShardSplitReader( +// createKinesisStreamProxy(sourceConfig), shardMetricGroupMap); + Supplier> splitReaderSupplier = () -> - new PollingKinesisShardSplitReader( - createKinesisStreamProxy(sourceConfig), shardMetricGroupMap); + new FanOutKinesisShardSplitReader(createKinesisAsyncStreamProxy(sourceConfig), + "arn:aws:kinesis:us-east-1:290038087681:stream/efo-test-large/consumer/efo-test:1722967044", shardMetricGroupMap); KinesisStreamsRecordEmitter recordEmitter = new KinesisStreamsRecordEmitter<>(deserializationSchema); @@ -178,6 +189,26 @@ public SimpleVersionedSerializer getSplitSerializer() { return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer()); } + private KinesisAsyncStreamProxy createKinesisAsyncStreamProxy(Configuration consumerConfig) { + SdkAsyncHttpClient asyncHttpClient = AWSGeneralUtil.createAsyncHttpClient(AttributeMap.builder().build(), NettyNioAsyncHttpClient.builder()); + String region = AWSGeneralUtil.getRegionFromArn(streamArn) + .orElseThrow( + () -> + 
new IllegalStateException( + "Unable to determine region from stream arn")); + Properties kinesisClientProperties = new Properties(); + sourceConfig.addAllToProperties(kinesisClientProperties); + kinesisClientProperties.put(AWSConfigConstants.AWS_REGION, region); + + KinesisAsyncClient kinesisAsyncClient = + AWSClientUtil.createAwsAsyncClient(kinesisClientProperties, + asyncHttpClient, + KinesisAsyncClient.builder(), + KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, + KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX); + return new KinesisAsyncStreamProxy(kinesisAsyncClient, asyncHttpClient); + } + private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) { SdkHttpClient httpClient = AWSGeneralUtil.createSyncHttpClient( diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/AsyncStreamProxy.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/AsyncStreamProxy.java new file mode 100644 index 00000000..9cdcb40d --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/AsyncStreamProxy.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.kinesis.source.proxy;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;

import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

/** Interface for interactions with async client of Kinesis streams. */
@Internal
public interface AsyncStreamProxy extends Closeable {
    /**
     * Opens a SubscribeToShard subscription against an enhanced fan-out consumer.
     *
     * @param consumerArn ARN of the registered enhanced fan-out consumer
     * @param shardId shard to subscribe to
     * @param startingPosition position in the shard from which to start receiving records
     * @param responseHandler handler that receives the event stream for this subscription
     * @return future that completes when the subscription call itself completes; events are
     *     delivered asynchronously via {@code responseHandler}
     */
    CompletableFuture<Void> subscribeToShard(
            String consumerArn,
            String shardId,
            StartingPosition startingPosition,
            SubscribeToShardResponseHandler responseHandler);
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.kinesis.source.proxy;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;

import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** Implementation of async stream proxy for the Kinesis client. */
@Internal
public class KinesisAsyncStreamProxy implements AsyncStreamProxy {
    private final KinesisAsyncClient kinesisAsyncClient;
    // Held only so it can be closed together with the client; all requests go through the client.
    private final SdkAsyncHttpClient asyncHttpClient;

    /**
     * @param kinesisAsyncClient async Kinesis client used to issue SubscribeToShard calls
     * @param asyncHttpClient underlying HTTP transport; owned (and closed) by this proxy
     */
    public KinesisAsyncStreamProxy(KinesisAsyncClient kinesisAsyncClient, SdkAsyncHttpClient asyncHttpClient) {
        this.kinesisAsyncClient = kinesisAsyncClient;
        this.asyncHttpClient = asyncHttpClient;
    }

    /**
     * Opens a SubscribeToShard subscription; events are delivered via {@code responseHandler}.
     * The returned future surfaces errors encountered while establishing the subscription.
     */
    @Override
    public CompletableFuture<Void> subscribeToShard(String consumerArn, String shardId, StartingPosition startingPosition, SubscribeToShardResponseHandler responseHandler) {
        SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                .consumerARN(consumerArn)
                .shardId(shardId)
                .startingPosition(startingPosition.getSdkStartingPosition())
                .build();
        return kinesisAsyncClient.subscribeToShard(request, responseHandler);
    }

    @Override
    public void close() throws IOException {
        // Close the client before the transport it runs on.
        kinesisAsyncClient.close();
        asyncHttpClient.close();
    }
}
IOException { Collections.emptyIterator(), splitState.getSplitId(), true); } + if (recordBatch == null) { + assignedSplits.add(splitState); + return INCOMPLETE_SHARD_EMPTY_RECORDS; + } + if (!recordBatch.isCompleted()) { assignedSplits.add(splitState); } @@ -124,7 +129,7 @@ public RecordsWithSplitIds fetch() throws IOException { * Main method implementations must implement to fetch records from Kinesis. * * @param splitState split to fetch records for - * @return RecordBatch containing the fetched records and metadata + * @return RecordBatch containing the fetched records and metadata. Returns null if there are no records but fetching should be retried at a later time. */ protected abstract RecordBatch fetchRecords(KinesisShardSplitState splitState); diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java index 21d0e81f..f4c48f72 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java @@ -19,10 +19,16 @@ package org.apache.flink.connector.kinesis.source.reader.fanout; import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; +import org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy; import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import 
org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; + +import java.util.HashMap; import java.util.Map; /** @@ -31,15 +37,44 @@ */ @Internal public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase { - protected FanOutKinesisShardSplitReader(Map shardMetricGroupMap) { + private final KinesisAsyncStreamProxy asyncStreamProxy; + private final String consumerArn; + + private final Map splitSubscriptions = new HashMap<>(); + + public FanOutKinesisShardSplitReader(KinesisAsyncStreamProxy asyncStreamProxy, String consumerArn, Map shardMetricGroupMap) { super(shardMetricGroupMap); + this.asyncStreamProxy = asyncStreamProxy; + this.consumerArn = consumerArn; } @Override protected RecordBatch fetchRecords(KinesisShardSplitState splitState) { - return null; + FanOutKinesisShardSubscription subscription = splitSubscriptions.get(splitState.getShardId()); + + SubscribeToShardEvent event = subscription.nextEvent(); + if (event == null) { + return null; + } + + boolean shardCompleted = event.continuationSequenceNumber() == null; + if (shardCompleted) { + splitSubscriptions.remove(splitState.getShardId()); + } + return new RecordBatch(event.records(), event.millisBehindLatest(), shardCompleted); } @Override - public void close() throws Exception {} + public void handleSplitsChanges(SplitsChange splitsChanges) { + super.handleSplitsChanges(splitsChanges); + for (KinesisShardSplit split : splitsChanges.splits()) { + FanOutKinesisShardSubscription subscription = new FanOutKinesisShardSubscription(asyncStreamProxy, consumerArn, split.getShardId(), split.getStartingPosition()); + subscription.activateSubscription(); + splitSubscriptions.put(split.splitId(), subscription); + } + } + + @Override + public void close() throws Exception { + } } diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.kinesis.source.reader.fanout;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * FanOutSubscription class responsible for handling the subscription to a single shard of the
 * Kinesis stream. Given a shardId, it will manage the lifecycle of the subscription, and eagerly
 * keep the next batch of records available for consumption when next polled.
 *
 * <p>Lifecycle: {@link #activateSubscription()} issues the SubscribeToShard call and arms a
 * timeout watchdog; delivered events are buffered in a bounded queue and drained by
 * {@link #nextEvent()}. On normal stream completion ({@code onComplete}) the subscription
 * re-activates itself. Unrecoverable errors are stashed in {@code subscriptionException} and
 * rethrown from {@code nextEvent()}.
 *
 * <p>NOTE(review): accessed from both the split reader's fetcher thread (nextEvent) and the SDK's
 * event-loop threads (subscriber callbacks) — confirm the intended threading contract.
 */
@Internal
public class FanOutKinesisShardSubscription {
    private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSubscription.class);

    // TODO: configure these
    // Bounded queue for eager retrieval of records: at most 2 event batches are buffered ahead
    // of the consumer. put() in onNext blocks once full, back-pressuring the SDK delivery thread.
    private final BlockingQueue<SubscribeToShardEvent> eventQueue = new LinkedBlockingQueue<>(2);
    private final AsyncStreamProxy kinesis;

    private final String consumerArn;
    private final String shardId;

    // TODO: configure these
    // Maximum time to wait for the SubscribeToShard call to be acknowledged via onSubscribe.
    private final Duration subscriptionTimeout = Duration.ofSeconds(60);

    // Store the current starting position for this subscription. Will be updated each time a new
    // batch of records is consumed, so a re-subscription resumes from the right point.
    private StartingPosition startingPosition;
    private final AtomicBoolean subscriptionActive = new AtomicBoolean(false);
    // First unrecoverable error wins; subsequent errors are logged and dropped.
    private final AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    private FanOutShardSubscriber shardSubscriber;

    /**
     * @param kinesis async proxy used to issue SubscribeToShard calls
     * @param consumerArn ARN of the enhanced fan-out consumer
     * @param shardId shard this subscription reads
     * @param startingPosition position from which to start (advanced as events are consumed)
     */
    public FanOutKinesisShardSubscription(AsyncStreamProxy kinesis, String consumerArn, String shardId, StartingPosition startingPosition) {
        this.kinesis = kinesis;
        this.consumerArn = consumerArn;
        this.shardId = shardId;
        this.startingPosition = startingPosition;
    }

    /**
     * Method to allow eager activation of the subscription.
     *
     * <p>No-op if the subscription is already active. Subscription success is tracked via a
     * dedicated latch counted down in {@code onSubscribe}; a timeout watchdog converts a missed
     * latch into a {@link TimeoutException}.
     */
    public void activateSubscription() {
        LOG.info("Activating subscription to shard {} with starting position {} for consumer {}.", shardId, startingPosition, consumerArn);
        if (subscriptionActive.get()) {
            LOG.warn("Skipping activation of subscription since it is already active.");
            return;
        }

        // We have to use our own CountDownLatch to wait for subscription to be acquired because
        // subscription event is tracked via the handler.
        CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
        shardSubscriber = new FanOutShardSubscriber(waitForSubscriptionLatch);
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler.builder()
                .subscriber(() -> shardSubscriber)
                .onError(throwable -> {
                    // Errors that occur when obtaining a subscription are thrown here.
                    // After subscription is acquired, these errors can be ignored
                    // (post-acquisition errors are handled by the subscriber's onError).
                    if (waitForSubscriptionLatch.getCount() > 0) {
                        terminateSubscription(throwable);
                        waitForSubscriptionLatch.countDown();
                    }})
                .build();

        // We don't need to keep track of the future here because we monitor subscription success
        // using our own CountDownLatch.
        kinesis.subscribeToShard(consumerArn, shardId, startingPosition, responseHandler)
                .exceptionally(throwable -> {
                    LOG.error("Error subscribing to shard {} with starting position {} for consumer {}.", shardId, startingPosition, consumerArn, throwable);
                    terminateSubscription(throwable);
                    return null;
                });

        // We have to handle timeout for subscriptions separately because Java 8 does not support
        // a fluent orTimeout() method on CompletableFuture.
        // NOTE(review): runAsync uses ForkJoinPool.commonPool() by default — confirm that is
        // acceptable here rather than a dedicated executor.
        CompletableFuture.runAsync(() -> {
            try {
                if (waitForSubscriptionLatch.await(subscriptionTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    LOG.info("Successfully subscribed to shard {} with starting position {} for consumer {}.", shardId, startingPosition, consumerArn);
                    subscriptionActive.set(true);
                    // Request first batch of records.
                    shardSubscriber.requestRecords();
                } else {
                    String errorMessage = "Timeout when subscribing to shard "+shardId+" with starting position "+startingPosition+" for consumer "+consumerArn+".";
                    LOG.error(errorMessage);
                    terminateSubscription(new TimeoutException(errorMessage));
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for subscription to complete.", e);
                terminateSubscription(e);
                Thread.currentThread().interrupt();
            }
        });
    }

    /**
     * Records the first unrecoverable error (later ones are only logged) and cancels the
     * in-flight subscription, if any.
     */
    private void terminateSubscription(Throwable t) {
        if (!subscriptionException.compareAndSet(null, t)) {
            LOG.warn("Another subscription exception has been queued, ignoring subsequent exceptions", t);
        }
        shardSubscriber.cancel();
    }

    /**
     * This is the main entrypoint for this subscription class.
     * It will retrieve the next batch of records from the Kinesis stream shard.
     * It will throw any unrecoverable exceptions encountered during the subscription process.
     *
     * @return next FanOut subscription event containing records. Returns null if subscription is
     *     not yet active (or the buffer is empty) and fetching should be retried at a later time.
     * @throws KinesisStreamsSourceException if the subscription hit an unrecoverable error
     */
    public SubscribeToShardEvent nextEvent() {
        Throwable throwable = subscriptionException.get();
        if (throwable != null) {
            LOG.error("Subscription encountered unrecoverable exception.", throwable);
            throw new KinesisStreamsSourceException("Subscription encountered unrecoverable exception.", throwable);
        }

        if (!subscriptionActive.get()) {
            LOG.debug("Subscription to shard {} for consumer {} is not yet active. Skipping.", shardId, consumerArn);
            return null;
        }

        // Non-blocking: null when no event is buffered yet.
        return eventQueue.poll();
    }

    /**
     * Implementation of {@link Subscriber} to retrieve events from Kinesis stream using Reactive
     * Streams. Requests exactly one event at a time; each consumed event triggers the request for
     * the next, keeping the bounded queue topped up.
     */
    private class FanOutShardSubscriber implements Subscriber<SubscribeToShardEventStream> {
        private final CountDownLatch subscriptionLatch;

        private Subscription subscription;

        private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
            this.subscriptionLatch = subscriptionLatch;
        }

        /** Requests one more event from the upstream subscription. */
        public void requestRecords() {
            subscription.request(1);
        }

        /** Cancels the upstream subscription; no-op if the subscription never became active. */
        public void cancel() {
            if (!subscriptionActive.get()) {
                LOG.warn("Trying to cancel inactive subscription. Ignoring.");
                return;
            }
            subscriptionActive.set(false);
            subscription.cancel();
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            LOG.info("Successfully subscribed to shard {} at {} using consumer {}.", shardId, startingPosition, consumerArn);
            this.subscription = subscription;
            // Unblocks the activation watchdog in activateSubscription().
            subscriptionLatch.countDown();
        }

        @Override
        public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
            subscribeToShardEventStream.accept(new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    try {
                        LOG.debug("Received event: {}, {}", event.getClass().getSimpleName(), event);
                        // Blocks when the queue is full — back-pressure on the SDK thread.
                        eventQueue.put(event);

                        // Update the starting position in case we have to recreate the subscription
                        startingPosition = StartingPosition.continueFromSequenceNumber(event.continuationSequenceNumber());

                        // Replace the record just consumed in the Queue
                        requestRecords();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new KinesisStreamsSourceException("Interrupted while adding Kinesis record to internal buffer.", e);
                    }
                }
            });
        }

        @Override
        public void onError(Throwable throwable) {
            // TODO: Add recoverable error handling (currently every error is terminal).
            if (!subscriptionException.compareAndSet(null, throwable)) {
                LOG.warn("Another subscription exception has been queued, ignoring subsequent exceptions", throwable);
            }
        }

        @Override
        public void onComplete() {
            // SubscribeToShard subscriptions naturally expire; re-subscribe from the last
            // recorded starting position.
            LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
            subscriptionActive.set(false);
            activateSubscription();
        }
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.kinesis.source.reader.fanout;

/**
 * Placeholder for a fan-out subscription event type.
 *
 * <p>NOTE(review): this class is empty and unreferenced by the rest of this change — confirm it
 * is still needed, or remove it before merging.
 */
public class FanOutSubscriptionEvent {
}
+ */ @Internal public final class StartingPosition { @@ -52,6 +55,26 @@ public Object getStartingMarker() { return startingMarker; } + public software.amazon.awssdk.services.kinesis.model.StartingPosition getSdkStartingPosition() { + software.amazon.awssdk.services.kinesis.model.StartingPosition.Builder builder = software.amazon.awssdk.services.kinesis.model.StartingPosition.builder() + .type(shardIteratorType); + + switch (shardIteratorType) { + case LATEST: + case TRIM_HORIZON: + return builder.build(); + case AT_TIMESTAMP: + Preconditions.checkArgument(startingMarker instanceof Instant, "Invalid StartingPosition. When ShardIteratorType is AT_TIMESTAMP, startingMarker must be an Instant."); + return builder.timestamp((Instant) startingMarker).build(); + case AT_SEQUENCE_NUMBER: + case AFTER_SEQUENCE_NUMBER: + Preconditions.checkArgument(startingMarker instanceof String, "Invalid StartingPosition. When ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER, startingMarker must be a String."); + return builder.sequenceNumber((String) startingMarker).build(); + } + throw new IllegalArgumentException( + "Unsupported shardIteratorType " + shardIteratorType); + } + public static StartingPosition fromTimestamp(final Instant timestamp) { return new StartingPosition(AT_TIMESTAMP, timestamp); }