Skip to content

Commit

Permalink
kinesis data streams
Browse files Browse the repository at this point in the history
  • Loading branch information
omenking committed Mar 4, 2024
1 parent ea4ba35 commit af2ea15
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 0 deletions.
104 changes: 104 additions & 0 deletions kinesis/datastreams/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
## KPL

Generate a new java project.
This will create a new directory call kpl-example with a bare bones java project.

```sh
mvn archetype:generate \
-DgroupId=co.exampro \
-DartifactId=kpl-example \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false
```

In the pom.xml we will add into the project tag profiles tag
and change our build directory for the outputed binary.

You will have to change the directory based on your developer enviroment.
In our configuration we are using Gitpod so we are leveraging an env var.
You could hardcode this value to go where you need it to go.

```xml
<profiles>
<profile>
<id>dev</id>
<build>
<directory>${env.THEIA_WORKSPACE_ROOT}/kinesis/datastreams/kpl-example/output</directory>
</build>
</profile>
</profiles>
```

We need to add the KPL to our pom.xml depenednecies

https://central.sonatype.com/artifact/com.amazonaws/amazon-kinesis-producer

```xml
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.15.9</version>
</dependency>
```

To package the dependencies will need to use Maven Shade Plugin
- provides the capability to package the artifact in an uber-jar, including its dependencies and to shade

```xml
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>co.exampro.App</mainClass>
</transformer>
</transformers>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedArtifactId>application</shadedArtifactId>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
```

Build your binary

```sh
mvn clean package shade:shade -Pdev
```

Run your binary

cd $THEIA_WORKSPACE_ROOT/output
java -jar application-1.0-SNAPSHOT-shaded.jar

# Set your env vars

export DATA_STREAM_NAME="datastream-MyStream-mVSDDqynVfJj"
export DATA_STREAM_PARTITION_KEY="456"
export DATA_STREAM_SHARD="shardId-000000000000"

# Test your Datastream using the AWS CLI

Write data
```sh
echo 'Send reinforcements' | base64
aws kinesis put-record --stream-name $DATA_STREAM_NAME --partition-key $DATA_STREAM_PARTITION_KEY --data U2VuZCByZWluZm9yY2VtZW50cwo=
```

Get data

```sh
export SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id $DATA_STREAM_SHARD --shard-iterator-type TRIM_HORIZON --stream-name $DATA_STREAM_NAME --query 'ShardIterator')
aws kinesis get-records --shard-iterator $SHARD_ITERATOR
```
13 changes: 13 additions & 0 deletions kinesis/datastreams/deploy
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

echo "== deploy Data Stream"

STACK_NAME="datastream"

# https://awscli.amazonaws.com/v2/documentation/api/latest/reference/cloudformation/deploy/index.html
aws cloudformation deploy \
--template-file template.yaml \
--capabilities CAPABILITY_NAMED_IAM \
--no-execute-changeset \
--region ca-central-1 \
--stack-name $STACK_NAME
114 changes: 114 additions & 0 deletions kinesis/datastreams/kpl-example/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<profiles>
<profile>
<id>dev</id>
<build>
<directory>${env.THEIA_WORKSPACE_ROOT}/kinesis/datastreams/kpl-example/output</directory>
</build>
</profile>
</profiles>

<groupId>co.exampro</groupId>
<artifactId>kpl-example</artifactId>
<version>1.0-SNAPSHOT</version>

<name>kpl-example</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.15.9</version>
</dependency>
</dependencies>

<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>co.exampro.App</mainClass>
</transformer>
</transformers>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedArtifactId>application</shadedArtifactId>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</pluginManagement>
</build>
</project>
68 changes: 68 additions & 0 deletions kinesis/datastreams/kpl-example/src/main/java/co/exampro/App.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package co.exampro;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import java.nio.ByteBuffer;

import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.amazonaws.services.kinesis.producer.Attempt;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.LinkedList;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException; // For handling the exceptions from Future.get()
import java.nio.charset.StandardCharsets; // For using StandardCharsets.UTF_8


public class App
{
public static void main( String[] args )
{
// Need to update these!
String streamName = System.getenv("DATA_STREAM_NAME");
String partitionKey = System.getenv("DATA_STREAM_PARTITION_KEY");

System.out.println("Hello World!");

System.out.println("DATA_STREAM_NAME: " + streamName);
System.out.println("DATA_PARTITION_KEY: " + partitionKey);

KinesisProducer kinesis = new KinesisProducer();
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>();

for (int i = 0; i < 5; ++i) {
ByteBuffer data = null;
System.out.println("LOOP");
try {
data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
e.printStackTrace(); // handle the exception here
}
// doesn't block

putFutures.add(
kinesis.addUserRecord(streamName, partitionKey, data)
);
}


for (Future<UserRecordResult> f : putFutures) {
try {
UserRecordResult result = f.get(); // This does block
if (result.isSuccessful()) {
System.out.println("Put record into shard " + result.getShardId());
} else {
for (Attempt attempt : result.getAttempts()) {
// Analyze and respond to the failure
System.out.println("Record Put Failed");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // handle the InterruptedException
System.out.println("Interrupted Exception");
} catch (ExecutionException e) {
System.out.println("Execution Exception");
e.printStackTrace(); // handle the ExecutionException
}
}
System.out.println( "Goodbye World!" );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package co.exampro;

import static org.junit.Assert.assertTrue;

import org.junit.Test;

/**
* Unit test for simple App.
*/
public class AppTest
{
/**
* Rigorous Test :-)
*/
@Test
public void shouldAnswerWithTrue()
{
assertTrue( true );
}
}
Binary file not shown.
Binary file not shown.
11 changes: 11 additions & 0 deletions kinesis/datastreams/template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
AWSTemplateFormatVersion: "2010-09-09"
Description: A Simple Kinesis Data Streams Provisioned
Resources:
MyStream:
Type: AWS::Kinesis::Stream
Properties:
# Name: MyStream
# RetentionPeriodHours: 24
ShardCount: 1
StreamModeDetails:
StreamMode: PROVISIONED
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit af2ea15

Please sign in to comment.