π¨ August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink.
π¨ This example refers to an old Apache Flink version (1.11) and managed service runtime. For newer examples, refer to then new Blueprints repository and general Amazon Managed Service for Apache Flink examples
Amazon Kinesis Data Analytics Flink Starter Kit helps you with the development of Flink Application with Kinesis Stream as a source and Amazon S3 as a sink. This demonstrates the use of Session Window with AggregateFunction.
Contents:
- Architecture
- Application Overview
- Build Instructions
- Deployment Instructions
- Testing Instructions
- Future Releases
The Architecture of this starter kit is shown in the below diagram
- JDK 11
- IDE for e.g. Eclipse or Spring Tools or Intellij IDEA
- Apache Maven
- AWS CLI
- This starter kit tested with the Apache Flink Version 1.11.1
The following AWS services are required to deploy this starter kit:
- 1 Amazon S3 Bucket
- 1 Amazon Kinesis Data Stream
- 1 Amazon Kinesis Data Analytics Flink Application
- 1 IAM role with 4 policies
- Clone this starter kit to your Laptop / MacBook
- It has Maven nature, so you can import it to your IDE.
- Build the Jar file using one of the steps below:
- Using standalone Maven, go to project home directory and run command
mvn -X clean install
- From Eclipse or STS, run command
-X clean install
. Navigation: Project right click --> Run As --> Maven Build (Option 4)
- Using standalone Maven, go to project home directory and run command
- Build process will generate a jar file
amazon-kinesis-data-analytics-flink-starter-kit-1.0.jar
. - Note: The size of the jar file is around 46 MB
You can deploy the Starter Kit using either AWS CLI or AWS Console.
-
The Starter Kit requires the following properties
Key Value Description region us-east-1 AWS region input_stream_name kda_flink_starter_kit_kinesis_stream Input Kinesis Data Stream Name session_time_out_in_minutes 10 Session timeout in minutes stream_initial_position TRIM_HORIZON Refer documentation here for more details s3_output_path s3a://<bucket_name>/kda_flink_starter_kit_output s3 path for Flink Application output bucket_check_interval_in_seconds 2 interval for checking time based rolling policies rolling_interval_in_seconds 2 the max time a part file can stay open before having to roll inactivity_interval_in_seconds 2 Sets the interval of allowed inactivity after which a part file will have to roll
-
Log onto AWS console and go to S3, select the bucket you will use. If not create a new bucket and go to the bucket
-
Create a folder with name
kda_flink_starter_kit_jar
-
Create a folder with name
kda_flink_starter_kit_output
-
Open command prompt on your Laptop / MacBook
-
Upload Flink Application Jar file to S3 bucket
aws s3 cp amazon-kinesis-data-analytics-flink-starter-kit-1.0.jar s3://bucket_name/kda_flink_starter_kit_jar/
-
Create Kinesis Stream
aws kinesis create-stream --stream-name kda_flink_starter_kit_kinesis_stream --shard-count 4
-
Create IAM policies. On your terminal, navigate to folder /amazon-kinesis-data-analytics-flink-starter-kit/src/main/resources
-
Policy for CloudWatch Logs
aws iam create-policy --policy-name flink_starter_kit_iam_policy_cloudwatch_logs \ --policy-document file://flink_starter_kit_iam_policy_cloudwatch_logs.json
-
Policy for CloudWatch
aws iam create-policy --policy-name flink_starter_kit_iam_policy_cloudwatch \ --policy-document file://flink_starter_kit_iam_policy_cloudwatch.json
-
Policy for Kinesis Data Stream
aws iam create-policy --policy-name flink_starter_kit_iam_policy_kinesis \ --policy-document file://flink_starter_kit_iam_policy_kinesis.json
-
Policy for S3
aws iam create-policy --policy-name flink_starter_kit_iam_policy_s3 \ --policy-document file://flink_starter_kit_iam_policy_s3.json
-
-
Create an IAM role
aws iam create-role --role-name flink_starter_kit_role --assume-role-policy-document file://flink_starter_kit_assume-role-policy-document.json
-
Attach policies to IAM role
flink_starter_kit_role
. Replace <1234567890> with your AWS Account Id before running the commands.-
Policy for CloudWatch Logs
aws iam attach-role-policy --role-name flink_starter_kit_role \ --policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_cloudwatch_logs
-
Policy for CloudWatch
aws iam attach-role-policy --role-name flink_starter_kit_role \ --policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_cloudwatch
-
Policy for Kinesis
aws iam attach-role-policy --role-name flink_starter_kit_role \ --policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_kinesis
-
Policy for S3
aws iam attach-role-policy --role-name flink_starter_kit_role \ --policy-arn arn:aws:iam::<1234567890>:policy/flink_starter_kit_iam_policy_s3
-
-
Open flink_starter_kit_def_stream_position_trim_horizon.json and update the following values:
- AWS account number in attributes ServiceExecutionRole and LogStreamARN
- S3 bucket name for attribute BucketARN
- S3 bucket name for parameter s3_output_path under PropertyMaps
-
Create Log group in CloudWatch Logs
aws logs create-log-group --log-group-name /aws/kinesis-analytics/kda_flink_starter_kit
-
Create Log stream in under the Log group
aws logs create-log-stream --log-group-name /aws/kinesis-analytics/kda_flink_starter_kit \ --log-stream-name kda_flink_starter_kit
-
Run this command to create Kinesis Data Analytics Flink application
aws kinesisanalyticsv2 create-application \ --cli-input-json file://flink_starter_kit_def_stream_position_trim_horizon.json
-
Run this command to start the application
aws kinesisanalyticsv2 start-application \ --cli-input-json file://flink_starter_kit_start_configuration.json
- Login to AWS Console
- Choose or create an S3 bucket to be used to runs this Quick Start
- Go to the S3 bucket, create a folder called
kda_flink_starter_kit
- Go to the folder and upload the Jar generated in the previous section
- Create following IAM policies
- IAM policy with name
flink_starter_kit_iam_policy_s3
using Policy summary sample - IAM policy with name
flink_starter_kit_iam_policy_kinesis
using Policy summary sample - IAM policy with name
flink_starter_kit_iam_policy_cloudwatch
using Policy summary sample - IAM policy with name
flink_starter_kit_iam_policy_cloudwatch_logs
using Policy summary sample
- IAM policy with name
- Create an IAM role with name
kda_flink_starter_kit
and attach above policies - Create a Kinesis Data Stream
- Name =
kda_flink_starter_kit_kinesis_stream
- Number of shards =
6
- Name =
- Create Kinesis Data Analysis Application as follows:
- Application name =
amazon_kda_flink_starter_kit
- Runtime = Apache Flink. Select version 1.8
- Application name =
- Click on Configure
- Amazon S3 bucket = Choose the bucket you selected in Step # 2
- Path to Amazon S3 object = must be the prefix for
amazon-kinesis-data-analytics-flink-starter-kit-1.0.jar
- Under section Access to application resources select Choose from IAM roles that Kinesis Data Analytics can assume
- IAM role = Choose the IAM role created above
- Using the Jar file generated in the above step
- Select the Runtime as Flink 1.8
- IAM role =
the IAM role created above
- Snapshot = Enable
- Monitoring -> Monitoring metrics level = Parallelism
- Monitoring -> Monitoring with CloudWatch Logs -> Enable, Monitoring log level = Info
- Scaling -> Parallelism = 10, Parallelism per KPU = 1
- Under Properties, click on Add group and provide the Group ID as
FlinkAppProperties
. Create properties defined in the section Flink Application Properties
You can use Amazon Kinesis Data Analytics Flink β Benchmarking Utility to generate sample data, test Apache Flink Session Window, and to prove the architecture of this starter kit.
The future releases of this starter kit will include the following features
- Add example(s) for Event Time / Processing Time based streaming application. Refer Apache Flink documentation for more details.
Contributions are welcome, refer CONTRIBUTING.md for more details.
This sample code is made available under the MIT-0 license. See the LICENSE file.