Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement JMS Source/Sink Plugin
Browse files Browse the repository at this point in the history
dardanxh committed Apr 9, 2021
1 parent 6cf8ea3 commit 7fe5119
Showing 37 changed files with 4,341 additions and 0 deletions.
36 changes: 36 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
*.class
.*.swp
.beamer
# Package Files #
*.jar
*.war
*.ear

# Intellij Files & Dir #
*.iml
*.ipr
*.iws
atlassian-ide-plugin.xml
out/
.DS_Store
/lib/
.idea

# Gradle Files & Dir #
build/
.gradle/
.stickyStorage
.build/
target/

# Node log
npm-*.log
logs/

# Singlenode and test data files.
/templates/
/data/
/data-fabric-tests/data/

# generated by docs build
*.pyc
406 changes: 406 additions & 0 deletions checkstyle.xml

Large diffs are not rendered by default.

58 changes: 58 additions & 0 deletions docs/JMS-batchsink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# JMS batch sink


Description
-----------
Produces JMS messages of different types (*Message*, *Text*, *Bytes* and *Map*) to a specified Queue or Topic.

Use Case
--------
Use this JMS Sink plugin when you want to produce messages to a JMS Queue/Topic.


Properties
----------
**Connection Factory**: Name of the connection factory. If not specified, the value *ConnectionFactory* is considered by
default.

**JMS Username**: Username to connect to JMS. This property is mandatory.

**JMS Password**: Password to connect to JMS. This property is mandatory.

**Provider URL**: Provider URL of the JMS Provider. This property is mandatory. For example for the *ActiveMQ* provider
you can set: `tcp://hostname:61616`.

**Type**: Queue or Topic. Queue is considered by default.

**Destination**: Queue/Topic name.

**JNDI Context Factory**: Name of the context factory. This property is mandatory. For example for the *ActiveMQ*
provider you can set: `org.apache.activemq.jndi.ActiveMQInitialContextFactory`.

**JNDI Username**: Username for JNDI. This property is optional.

**JNDI Password**: Password for JNDI. This property is optional.

**Message Type**: The type of messages you intend to produce. A JMS message could be of the following types: *Message*,
*Text*, *Bytes* and *Map*. By default, *Text* message type is considered.

Example
-------
This example reads a record from a JSON file and produces a *MapMessage* with the file's content as a payload.
For every record of the JSON file it produces one *MapMessage* to the given topic.
An example of a JSON record is shown below.

```json
{"name":"foo", "surname":"bar", "age":23}
```
We could see the produced message in the running JMS implementation.

| field name | value |
| ----------------- | ------------------------------------------ |
| messageId | ID:Producer-36705-1609122138230-1:1:1:1:1 |
| messageTimestamp | 1609122138554 |
| destination | topic://MyTopic |
| deliveryNode | 2 |
| expiration | 0 |
| priority | 4 |
| payload | {"name":"foo","surname":"bar","age":23} |
102 changes: 102 additions & 0 deletions docs/JMS-streamingsource.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# JMS Streaming Source


Description
-----------
Consumes JMS messages of different types (*Message*, *Text*, *Bytes* and *Map*) from a specified Queue or Topic.

Use Case
--------
Use this JMS Source plugin when you want to consume messages from a JMS Queue/Topic and write them to a Table.


Properties
----------
**Connection Factory**: Name of the connection factory. If not specified, the value *ConnectionFactory* is considered by
default.

**JMS Username**: Username to connect to JMS. This property is mandatory.

**JMS Password**: Password to connect to JMS. This property is mandatory.

**Provider URL**: Provider URL of the JMS Provider. This property is mandatory. For example for the *ActiveMQ* provider
you can set: `tcp://hostname:61616`.

**Type**: Queue or Topic. Queue is considered by default.

**Destination**: Queue/Topic name.

**JNDI Context Factory**: Name of the context factory. This property is mandatory. For example for the *ActiveMQ*
provider you can set: `org.apache.activemq.jndi.ActiveMQInitialContextFactory`.

**JNDI Username**: Username for JNDI. This property is optional.

**JNDI Password**: Password for JNDI. This property is optional.

**Message Type**: The type of messages you intend to consume. A JMS message could be of the following types: *Message*,
*Text*, *Bytes* and *Map*. By default, *Text* message type is considered. The *payload* field of the output schema gets
switched to the appropriate data type upon the selection of a message type and *validate* button click.


Example
-------
This example reads JMS messages of *TextMessage* type from the *status* topic existing in provider
*tcp://hostname:61616* with JNDI context factory name *org.apache.activemq.jndi.ActiveMQInitialContextFactory*. An
example of a TextMessage object is shown below. Since we used ActiveMQ as a provider, the message is automatically
considered as an *ActiveMQTextMessage* (an implementation of the JMS TextMessage interface). The object below shows an
*ActiveMQTextMessage* consumed:

```text
ActiveMQTextMessage {
commandId=5,
responseRequired=true,
messageId=ID: Producer-50444-1608735228752-1: 1: 1: 1: 1,
originalDestination=null,
originalTransactionId=null,
producerId=ID: Producer-50444-1608735228752-1: 1: 1: 1,
destination=topic: topic://status,
transactionId=null,
expiration=0,
timestamp=1608735228894,
arrival=0,
brokerInTime=1608735228895,
brokerOutTime=1608735228896,
correlationId=null,
replyTo=null,
persistent=true,
type=null,
priority=4,
groupID=null,
groupSequence=0,
targetConsumerId=null,
compressed=false,
userID=null,
content=org.apache.activemq.util.ByteSequence@4b9e255,
marshalledProperties=null,
dataStructure=null,
redeliveryCounter=0,
size=0,
properties=null,
readOnlyProperties=true,
readOnlyBody=true,
droppable=false,
jmsXGroupFirstForConsumer=false,
text=DONE
}
```
Since the JMS Source plugin's implementation is purely based on JMS (i.e. not coupled to any JMS implementation such
as ActiveMQ), we consider only the header data supported by JMS. The consumed message will produce the record
below:

| field name | value |
| ----------------- | ------------------------------------------ |
| messageId | ID:Producer-54511-1608749039578-1:1:1:1:1 |
| messageTimestamp | 1609122138554 |
| deliveryNode | 2 |
| payload | DONE |
| replyTo | topic://status |
| correlationId | null |
| expiration | 0 |
| type | null |
| redelivered | false |
107 changes: 107 additions & 0 deletions examples/JMS-batchsink.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
{
"artifact": {
"name": "cdap-data-pipeline",
"version": "6.3.0-SNAPSHOT",
"scope": "SYSTEM"
},
"description": "Read records from a json file and generate a MapMessage for each one of them.",
"name": "JMS-batchsink",
"config": {
"resources": {
"memoryMB": 2048,
"virtualCores": 1
},
"driverResources": {
"memoryMB": 2048,
"virtualCores": 1
},
"connections": [
{
"from": "File",
"to": "JMS"
}
],
"comments": [],
"postActions": [],
"properties": {},
"processTimingEnabled": true,
"stageLoggingEnabled": false,
"stages": [
{
"name": "File",
"plugin": {
"name": "File",
"type": "batchsource",
"label": "File",
"artifact": {
"name": "core-plugins",
"version": "2.6.0-SNAPSHOT",
"scope": "USER"
},
"properties": {
"referenceName": "JMS",
"path": "${PATH}",
"format": "json",
"delimiter": ",",
"skipHeader": "true",
"filenameOnly": "false",
"recursive": "false",
"ignoreNonExistingFolders": "false",
"schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"surname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
}
},
"outputSchema": [
{
"name": "etlSchemaBody",
"schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"surname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
}
],
"id": "File"
},
{
"name": "JMS",
"plugin": {
"name": "JMS",
"type": "batchsink",
"label": "JMS",
"artifact": {
"name": "jms-plugins",
"version": "1.0.0-SNAPSHOT",
"scope": "USER"
},
"properties": {
"referenceName": "JMS",
"type": "Topic",
"messageType": "Map",
"jmsUsername": "${JMS_USERNAME}",
"jmsPassword": "${JMS_PASSWORD}",
"providerUrl": "${PROVIDER_URL}",
"destination": "${DESTINATION}",
"jndiContextFactory": "${JNDI_CONTEXT_FACTORY}",
"jndiUsername": "${JNDI_USERNAME}",
"jndiPassword": "${JNDI_PASSWORD}"
}
},
"outputSchema": [
{
"name": "etlSchemaBody",
"schema": ""
}
],
"inputSchema": [
{
"name": "File",
"schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"surname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
}
],
"id": "JMS"
}
],
"schedule": "0 * * * *",
"engine": "spark",
"numOfRecordsPreview": 100,
"description": "Data Pipeline Application",
"maxConcurrentRuns": 1
}
}

99 changes: 99 additions & 0 deletions examples/JMS-streamingsource.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{
"artifact": {
"name": "cdap-data-streams",
"version": "6.3.0-SNAPSHOT",
"scope": "SYSTEM"
},
"description": "Consume messages from a given topic and write them to a file.",
"name": "JMS-streamingsource",
"config": {
"resources": {
"memoryMB": 2048,
"virtualCores": 1
},
"driverResources": {
"memoryMB": 2048,
"virtualCores": 1
},
"connections": [
{
"from": "JMS",
"to": "File"
}
],
"comments": [],
"postActions": [],
"properties": {
"system.spark.spark.streaming.backpressure.enabled": "true",
"system.spark.spark.executor.instances": "1"
},
"processTimingEnabled": true,
"stageLoggingEnabled": false,
"stages": [
{
"name": "JMS",
"plugin": {
"name": "JMS",
"type": "streamingsource",
"label": "JMS",
"artifact": {
"name": "jms-plugins",
"version": "1.0.0-SNAPSHOT",
"scope": "USER"
},
"properties": {
"referenceName": "JMS",
"schema": "{\"type\":\"record\",\"name\":\"message\",\"fields\":[{\"name\":\"messageId\",\"type\":\"string\"},{\"name\":\"messageTimestamp\",\"type\":\"long\"},{\"name\":\"correlationId\",\"type\":\"string\"},{\"name\":\"replyTo\",\"type\":\"string\"},{\"name\":\"destination\",\"type\":\"string\"},{\"name\":\"deliveryNode\",\"type\":\"int\"},{\"name\":\"redelivered\",\"type\":\"boolean\"},{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"expiration\",\"type\":\"long\"},{\"name\":\"priority\",\"type\":\"int\"},{\"name\":\"payload\",\"type\":\"string\"}]}",
"type": "Topic",
"messageType": "Text",
"jmsUsername": "${JMS_USERNAME}",
"jmsPassword": "${JMS_PASSWORD}",
"providerUrl": "${PROVIDER_URL}",
"destination": "${DESTINATION}",
"jndiContextFactory": "${JNDI_CONTEXT_FACTORY}"
}
},
"outputSchema": "{\"type\":\"record\",\"name\":\"message\",\"fields\":[{\"name\":\"messageId\",\"type\":\"string\"},{\"name\":\"messageTimestamp\",\"type\":\"long\"},{\"name\":\"correlationId\",\"type\":\"string\"},{\"name\":\"replyTo\",\"type\":\"string\"},{\"name\":\"destination\",\"type\":\"string\"},{\"name\":\"deliveryNode\",\"type\":\"int\"},{\"name\":\"redelivered\",\"type\":\"boolean\"},{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"expiration\",\"type\":\"long\"},{\"name\":\"priority\",\"type\":\"int\"},{\"name\":\"payload\",\"type\":\"string\"}]}",
"id": "JMS"
},
{
"name": "File",
"plugin": {
"name": "File",
"type": "batchsink",
"label": "File",
"artifact": {
"name": "core-plugins",
"version": "2.5.0-SNAPSHOT",
"scope": "SYSTEM"
},
"properties": {
"suffix": "yyyy-MM-dd-HH-mm",
"format": "delimited",
"referenceName": "file",
"path": "${PATH}",
"delimiter": ";",
"schema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}"
}
},
"outputSchema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}",
"inputSchema": [
{
"name": "JMS",
"schema": "{\"type\":\"record\",\"name\":\"message\",\"fields\":[{\"name\":\"messageId\",\"type\":\"string\"},{\"name\":\"messageTimestamp\",\"type\":\"long\"},{\"name\":\"correlationId\",\"type\":\"string\"},{\"name\":\"replyTo\",\"type\":\"string\"},{\"name\":\"destination\",\"type\":\"string\"},{\"name\":\"deliveryNode\",\"type\":\"int\"},{\"name\":\"redelivered\",\"type\":\"boolean\"},{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"expiration\",\"type\":\"long\"},{\"name\":\"priority\",\"type\":\"int\"},{\"name\":\"payload\",\"type\":\"string\"}]}"
}
],
"id": "File"
}
],
"batchInterval": "10s",
"clientResources": {
"memoryMB": 2048,
"virtualCores": 1
},
"disableCheckpoints": true,
"stopGracefully": true,
"description": "Data Streams Application"
}
}

230 changes: 230 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2021 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<name>JMS Plugins</name>
<groupId>io.cdap.plugin</groupId>
<artifactId>jms-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<widgets.dir>widgets</widgets.dir>
<docs.dir>docs</docs.dir>
<main.basedir>${project.basedir}</main.basedir>
<cdap.version>6.2.3</cdap.version>
<cdap.plugin.version>2.6.0-SNAPSHOT</cdap.plugin.version>
<spark.version>2.3.1</spark.version>
<hadoop.version>2.9.1</hadoop.version>
<netty.version>4.1.16.Final</netty.version>
<netty-http.version>1.3.0</netty-http.version>
<avro.version>1.8.2</avro.version>
<junit.version>4.12</junit.version>
<guava.version>27.0.1-jre</guava.version>
<activemq.version>5.11.1</activemq.version>
<mockito.version>2.24.0</mockito.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.plugin</groupId>
<artifactId>hydrator-common</artifactId>
<version>${cdap.plugin.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-api</artifactId>
<version>${cdap.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-common</artifactId>
<version>${cdap.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-api-common</artifactId>
<version>${cdap.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-etl-api-spark</artifactId>
<version>${cdap.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-etl-api</artifactId>
<version>${cdap.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-formats</artifactId>
<version>${cdap.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>hydrator-test</artifactId>
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>antlr</groupId>
<artifactId>antlr</artifactId>
<version>2.7.7</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<configuration>
<configLocation>checkstyle.xml</configLocation>
<suppressionsLocation>suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>6.19</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.14.1</version>
<configuration>
<argLine>-Xmx2048m -Djava.awt.headless=true -XX:MaxPermSize=256m -XX:+UseConcMarkSweepGC
-XX:OnOutOfMemoryError="kill -9 %p" -XX:+HeapDumpOnOutOfMemoryError
</argLine>
<reuseForks>false</reuseForks>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<instructions>
<_exportcontents>
io.cdap.plugin.*;
org.apache.activemq.*;
org.apache.spark.streaming.*;
com.google.common.base.*;
</_exportcontents>
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
<Embed-Directory>lib</Embed-Directory>
</instructions>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>bundle</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.cdap</groupId>
<artifactId>cdap-maven-plugin</artifactId>
<version>1.1.0</version>
<configuration>
<cdapArtifacts>
<parent>system:cdap-data-pipeline[6.2.3, 7.0.0)</parent>
<parent>system:cdap-data-streams[6.2.3, 7.0.0)</parent>
</cdapArtifacts>
</configuration>
<executions>
<execution>
<id>create-artifact-config</id>
<phase>prepare-package</phase>
<goals>
<goal>create-plugin-json</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
172 changes: 172 additions & 0 deletions src/main/java/io/cdap/plugin/jms/common/JMSConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.common;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.ReferencePluginConfig;

import javax.annotation.Nullable;

/**
* Base config for JMS plugins.
*/
public class JMSConfig extends ReferencePluginConfig {

  public static final String NAME_CONNECTION_FACTORY = "connectionFactory";
  public static final String NAME_JMS_USERNAME = "jmsUsername";
  public static final String NAME_JMS_PASSWORD = "jmsPassword";
  public static final String NAME_PROVIDER_URL = "providerUrl";
  public static final String NAME_TYPE = "type";
  public static final String NAME_JNDI_CONTEXT_FACTORY = "jndiContextFactory";
  public static final String NAME_JNDI_USERNAME = "jndiUsername";
  public static final String NAME_JNDI_PASSWORD = "jndiPassword";

  // Documented defaults, shared by the defaulting constructor and the null-safe getters.
  private static final String DEFAULT_CONNECTION_FACTORY = "ConnectionFactory";
  private static final String DEFAULT_JNDI_CONTEXT_FACTORY =
    "org.apache.activemq.jndi.ActiveMQInitialContextFactory";

  @Name(NAME_CONNECTION_FACTORY)
  @Description("Name of the connection factory.")
  @Nullable
  @Macro
  private String connectionFactory; // default: ConnectionFactory

  @Name(NAME_JMS_USERNAME)
  @Description("Username to connect to JMS.")
  @Macro
  private String jmsUsername;

  @Name(NAME_JMS_PASSWORD)
  @Description("Password to connect to JMS.")
  @Macro
  private String jmsPassword;

  @Name(NAME_PROVIDER_URL)
  @Description("The URL of the JMS provider. For example, in case of an ActiveMQ Provider, the URL has the format " +
    "tcp://hostname:61616.")
  @Macro
  private String providerUrl;

  @Name(NAME_TYPE)
  @Description("Queue or Topic.")
  @Nullable
  @Macro
  private String type; // default: queue

  @Name(NAME_JNDI_CONTEXT_FACTORY)
  @Description("Name of the JNDI context factory. For example, in case of an ActiveMQ Provider, the JNDI Context " +
    "Factory is: org.apache.activemq.jndi.ActiveMQInitialContextFactory.")
  @Macro
  private String jndiContextFactory; // default: org.apache.activemq.jndi.ActiveMQInitialContextFactory

  @Name(NAME_JNDI_USERNAME)
  @Description("User name for the JNDI.")
  @Nullable
  @Macro
  private String jndiUsername;

  @Name(NAME_JNDI_PASSWORD)
  @Description("Password for the JNDI.")
  @Nullable
  @Macro
  private String jndiPassword;

  /**
   * Creates a config pre-populated with the default connection factory name, destination type and JNDI context
   * factory.
   *
   * @param referenceName the reference name of the plugin stage
   */
  public JMSConfig(String referenceName) {
    super(referenceName);
    this.connectionFactory = DEFAULT_CONNECTION_FACTORY;
    this.type = JMSDataStructures.QUEUE;
    this.jndiContextFactory = DEFAULT_JNDI_CONTEXT_FACTORY;
  }

  /**
   * Creates a config with every property given explicitly. No defaults are stored by this constructor.
   */
  @VisibleForTesting
  public JMSConfig(String referenceName, String connectionFactory, String jmsUsername, String jmsPassword,
                   String providerUrl, String type, String jndiContextFactory, String jndiUsername,
                   String jndiPassword) {
    super(referenceName);
    this.connectionFactory = connectionFactory;
    this.jmsUsername = jmsUsername;
    this.jmsPassword = jmsPassword;
    this.providerUrl = providerUrl;
    this.type = type;
    this.jndiContextFactory = jndiContextFactory;
    this.jndiUsername = jndiUsername;
    this.jndiPassword = jndiPassword;
  }

  /**
   * @return the configured connection factory name, or {@code ConnectionFactory} when none was provided
   */
  public String getConnectionFactory() {
    // The field is @Nullable; fall back to the documented default instead of handing null to the JNDI lookup.
    return Strings.isNullOrEmpty(connectionFactory) ? DEFAULT_CONNECTION_FACTORY : connectionFactory;
  }

  /**
   * @return the JMS username
   */
  public String getJmsUsername() {
    return jmsUsername;
  }

  /**
   * @return the JMS password
   */
  public String getJmsPassword() {
    return jmsPassword;
  }

  /**
   * @return the URL of the JMS provider
   */
  public String getProviderUrl() {
    return providerUrl;
  }

  /**
   * @return the configured destination type, or {@link JMSDataStructures#QUEUE} when none was provided
   */
  public String getType() {
    // The field is @Nullable; callers invoke equals() on the result, so never return null.
    return Strings.isNullOrEmpty(type) ? JMSDataStructures.QUEUE : type;
  }

  /**
   * @return the JNDI context factory class name
   */
  public String getJndiContextFactory() {
    return jndiContextFactory;
  }

  /**
   * @return the JNDI username, or {@code null} when not configured
   */
  @Nullable
  public String getJndiUsername() {
    return jndiUsername;
  }

  /**
   * @return the JNDI password, or {@code null} when not configured
   */
  @Nullable
  public String getJndiPassword() {
    return jndiPassword;
  }

  /**
   * Checks that the mandatory properties (JMS username, JMS password, JNDI context factory and provider URL) are
   * set. A property that contains a macro is skipped. Every problem found is reported to the given collector.
   *
   * @param failureCollector collector that receives one failure per missing property
   */
  public void validateParams(FailureCollector failureCollector) {

    if (Strings.isNullOrEmpty(jmsUsername) && !containsMacro(NAME_JMS_USERNAME)) {
      failureCollector
        .addFailure("JMS username must be provided.", "Please provide your JMS username.")
        .withConfigProperty(NAME_JMS_USERNAME);
    }

    if (Strings.isNullOrEmpty(jmsPassword) && !containsMacro(NAME_JMS_PASSWORD)) {
      failureCollector
        .addFailure("JMS password must be provided.", "Please provide your JMS password.")
        .withConfigProperty(NAME_JMS_PASSWORD);
    }

    if (Strings.isNullOrEmpty(jndiContextFactory) && !containsMacro(NAME_JNDI_CONTEXT_FACTORY)) {
      failureCollector
        .addFailure("JNDI context factory must be provided.", "Please provide your JNDI" +
          " context factory.")
        .withConfigProperty(NAME_JNDI_CONTEXT_FACTORY);
    }

    if (Strings.isNullOrEmpty(providerUrl) && !containsMacro(NAME_PROVIDER_URL)) {
      failureCollector
        .addFailure("Provider URL must be provided.", "Please provide your provider URL.")
        .withConfigProperty(NAME_PROVIDER_URL);
    }
  }
}
343 changes: 343 additions & 0 deletions src/main/java/io/cdap/plugin/jms/common/JMSConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.common;

import com.google.common.base.Strings;
import io.cdap.plugin.jms.sink.JMSBatchSinkConfig;
import io.cdap.plugin.jms.source.JMSStreamingSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
* A facade class that encapsulates the necessary functionality to: get the initial context, resolve the connection
* factory, establish connection to JMS, create a session, resolve the destination by a queue or topic name, create
* producer, create consumer, set message listener to a consumer, and start connection. This class handles exceptions
* for all the functionalities provided.
*/
public class JMSConnection {

private static final Logger LOG = LoggerFactory.getLogger(JMSConnection.class);
private final JMSConfig config;

/**
 * Creates a facade that reads all of its connection settings from the given plugin config.
 *
 * @param config the JMS plugin config (sink or source) backing this facade
 */
public JMSConnection(JMSConfig config) {
  this.config = config;
}

/**
 * Builds the JNDI {@link InitialContext} from the 'jndiContextFactory' and 'providerUrl' config properties, and
 * registers the configured topic/queue name under a {@code topic.<name>} or {@code queue.<name>} entry. The JNDI
 * username and password are added as security principal/credentials when configured.
 *
 * @return the {@link InitialContext} created from the given properties
 * @throws RuntimeException if the initial context cannot be created
 */
public Context getContext() {
  Properties properties = new Properties();
  properties.put(Context.INITIAL_CONTEXT_FACTORY, config.getJndiContextFactory());
  properties.put(Context.PROVIDER_URL, config.getProviderUrl());

  // A sink config carries a destination name; any other config is treated as a streaming source config.
  String name = config instanceof JMSBatchSinkConfig
    ? ((JMSBatchSinkConfig) config).getDestinationName()
    : ((JMSStreamingSourceConfig) config).getSourceName();
  // Constant-first comparison: a missing type falls back to queue instead of throwing a NullPointerException.
  String prefix = JMSDataStructures.TOPIC.equals(config.getType()) ? "topic" : "queue";
  properties.put(String.format("%s.%s", prefix, name), name);

  String jndiUsername = config.getJndiUsername();
  String jndiPassword = config.getJndiPassword();
  if (!Strings.isNullOrEmpty(jndiUsername) || !Strings.isNullOrEmpty(jndiPassword)) {
    // Properties rejects null values, so only add the credentials that are actually present.
    if (jndiUsername != null) {
      properties.put(Context.SECURITY_PRINCIPAL, jndiUsername);
    }
    if (jndiPassword != null) {
      properties.put(Context.SECURITY_CREDENTIALS, jndiPassword);
    }
  }

  try {
    return new InitialContext(properties);
  } catch (NamingException e) {
    throw new RuntimeException("Failed to create initial context for provider URL " + config.getProviderUrl() +
                                 " with principal " + config.getJndiUsername(), e);
  }
}

/**
 * Looks up the {@link ConnectionFactory} registered under the `connectionFactory` config property.
 *
 * @param context the initial context used for the lookup
 * @return the resolved connection factory
 * @throws RuntimeException if the lookup fails
 */
public ConnectionFactory getConnectionFactory(Context context) {
  final Object resolved;
  try {
    resolved = context.lookup(config.getConnectionFactory());
  } catch (NamingException e) {
    String message = String.format("Failed to resolve the connection factory for %s.",
                                   config.getConnectionFactory());
    throw new RuntimeException(message, e);
  }
  return (ConnectionFactory) resolved;
}

/**
 * Creates a {@link Connection} using the `jmsUsername` and `jmsPassword` config properties. When the config is a
 * streaming source config of type {@link Topic}, the connection's client id is set to
 * {@code client-id-<sourceName>}, which the JMS broker needs to identify the durable subscriber.
 *
 * @param connectionFactory a given connection factory
 * @return a connection to the JMS broker
 * @throws RuntimeException if the connection cannot be created or the client id cannot be set
 */
public Connection createConnection(ConnectionFactory connectionFactory) {
  Connection connection;
  try {
    connection = connectionFactory.createConnection(config.getJmsUsername(), config.getJmsPassword());
  } catch (JMSException e) {
    // Keep the original exception as the cause so the provider's stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }

  // If subscribing to a source topic, the connection needs a client id for the durable subscriber.
  boolean topicSource = JMSDataStructures.TOPIC.equals(config.getType())
    && config instanceof JMSStreamingSourceConfig;
  if (topicSource) {
    String clientId = "client-id-" + ((JMSStreamingSourceConfig) config).getSourceName();
    try {
      connection.setClientID(clientId);
    } catch (JMSException e) {
      RuntimeException failure = new RuntimeException("Cannot set Client Id", e);
      // The connection is already open; release it so a failed setup does not leak broker resources.
      try {
        connection.close();
      } catch (JMSException closeError) {
        failure.addSuppressed(closeError);
      }
      throw failure;
    }
  }
  return connection;
}

/**
 * Starts connection of this client to the JMS broker.
 *
 * @param connection a given connection
 * @throws RuntimeException if the connection cannot be started
 */
public void startConnection(Connection connection) {
  try {
    connection.start();
  } catch (JMSException e) {
    // Keep the original exception as the cause so the provider's stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }
}

/**
 * Stops connection of this client from the JMS broker.
 *
 * @param connection a given connection
 * @throws RuntimeException if the connection cannot be stopped
 */
public void stopConnection(Connection connection) {
  try {
    connection.stop();
  } catch (JMSException e) {
    // Keep the original exception as the cause so the provider's stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }
}

/**
 * Closes connection of this client from the JMS broker.
 *
 * @param connection a given connection
 * @throws RuntimeException if the connection cannot be closed
 */
public void closeConnection(Connection connection) {
  try {
    connection.close();
  } catch (JMSException e) {
    // Keep the original exception as the cause so the provider's stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }
}

/**
 * Creates a non-transacted, auto-acknowledging {@link Session} between this client and the JMS broker.
 *
 * @param connection a given connection
 * @return a session to the JMS broker
 * @throws RuntimeException if the session cannot be created
 */
public Session createSession(Connection connection) {
  try {
    return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  } catch (JMSException e) {
    // Keep the original exception as the cause so the provider's stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }
}

/**
 * Closes the {@link Session} between this client and the JMS broker.
 *
 * @param session a given session
 * @throws RuntimeException if the session cannot be closed
 */
public void closeSession(Session session) {
  try {
    session.close();
  } catch (JMSException e) {
    // Keep the original exception as the cause so the provider's stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }
}

/**
 * Looks up the source {@link Topic}/{@link Queue} by the source name of the streaming source config. The `type`
 * config parameter decides which of the two is expected; any type other than topic (including a missing one) is
 * treated as a queue. {@link Destination} is the parent interface of {@link Topic} and {@link Queue}.
 *
 * @param context a given context
 * @return a source topic/queue that this client is about to consume messages from
 * @throws RuntimeException if the source cannot be resolved
 */
public Destination getSource(Context context) {
  String sourceName = ((JMSStreamingSourceConfig) config).getSourceName();
  // Constant-first comparison: a missing type falls back to queue instead of throwing a NullPointerException.
  boolean isTopic = JMSDataStructures.TOPIC.equals(config.getType());
  try {
    Object resolved = context.lookup(sourceName);
    // Cast to the expected interface so a mismatching JNDI entry fails here rather than at consumer creation.
    return isTopic ? (Topic) resolved : (Queue) resolved;
  } catch (NamingException e) {
    throw new RuntimeException("Failed to resolve the " + (isTopic ? "topic " : "queue ") + sourceName, e);
  }
}

/**
* Gets a sink {@link Topic}/{@link Queue} depending on the `type` config parameter. {@link Destination} is the
* parent class of {@link Topic} and {@link Queue}. If no sink topic/queue name is provided, a sink topic/queue is
* automatically created.
*
* @param context a given context
* @param session a given session needed to create the topic/queue in case it does not exist
* @return a sink topic/queue this client is about to produce messages to
*/
public Destination getSink(Context context, Session session) {
  String destinationName = ((JMSBatchSinkConfig) config).getDestinationName();

  if (config.getType().equals(JMSDataStructures.TOPIC)) {
    try {
      return (Topic) context.lookup(destinationName);
    } catch (NamingException e) {
      // The lookup failed for a topic, so say "topic" (this branch used to log "queue").
      LOG.warn("Failed to resolve topic " + destinationName, e);
      // Fall back to asking the session for a topic with the configured name.
      return createSinkTopic(session);
    }
  } else {
    try {
      return (Queue) context.lookup(destinationName);
    } catch (NamingException e) {
      LOG.warn("Failed to resolve queue " + destinationName, e);
      // Fall back to asking the session for a queue with the configured name.
      return createSinkQueue(session);
    }
  }
}

/**
* Creates a sink {@link Topic}
*
* @param session a given session
* @return a created topic
*/
private Destination createSinkTopic(Session session) {
  String destinationName = ((JMSBatchSinkConfig) config).getDestinationName();
  LOG.info("Creating topic " + destinationName);
  try {
    return session.createTopic(destinationName);
  } catch (JMSException e) {
    // Same message as before, but the JMSException is chained so its stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }
}

/**
* Creates a sink {@link Queue}
*
* @param session a given session
* @return a created queue
*/
private Destination createSinkQueue(Session session) {
  String destinationName = ((JMSBatchSinkConfig) config).getDestinationName();
  LOG.info("Creating queue " + destinationName);
  try {
    return session.createQueue(destinationName);
  } catch (JMSException e) {
    // Same message as before, but the JMSException is chained so its stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }
}

/**
* Creates a {@link MessageConsumer} that consumes messages from a defined source {@link Topic}/{@link Queue}. In case
* of a topic-consumer, a durable subscriber is created. A durable subscriber makes the JMS broker keep the state of
* the offset consumed. Hence this client can restart consuming messages from the last offset not read.
*
* @param session a given session
* @param destination a source topic/queue this client is about to consume messages from
* @return a created message consumer
*/
public MessageConsumer createConsumer(Session session, Destination destination) {
  try {
    if (destination instanceof Topic) {
      // The second argument of createDurableSubscriber is the durable subscription name; it is derived from the
      // configured source name.
      String subscriptionName = "subscriber-id-" + ((JMSStreamingSourceConfig) config).getSourceName();
      return session.createDurableSubscriber((Topic) destination, subscriptionName);
    }
    // Any non-topic destination gets a plain consumer.
    return session.createConsumer(destination);
  } catch (JMSException e) {
    // Same message as before, but the JMSException is chained so its stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }
}

/**
* Creates a {@link MessageProducer} that produces messages to a defined sink {@link Topic}/{@link Queue}.
*
* @param session a given session
* @param destination a sink topic/queue this client is about to produce messages to
* @return a created message producer
*/
public MessageProducer createProducer(Session session, Destination destination) {
  try {
    return session.createProducer(destination);
  } catch (JMSException e) {
    // Same message as before, but the JMSException is chained so its stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }
}

/**
* Sets a {@link MessageListener} to the {@link MessageConsumer}. This message listener has a `onMessage()` method
* that gets automatically triggered in case a new message is produced to the queue/topic while the pipeline is in
* the RUNNING state.
*
* @param messageListener a given message listener
* @param messageConsumer a given message consumer
*/
public void setMessageListener(MessageListener messageListener, MessageConsumer messageConsumer) {
  try {
    messageConsumer.setMessageListener(messageListener);
  } catch (JMSException e) {
    // Same message as before, but the JMSException is chained so its stack trace is not lost.
    throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }
}
}
25 changes: 25 additions & 0 deletions src/main/java/io/cdap/plugin/jms/common/JMSDataStructures.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.common;

/**
* A class that specifies the JMS data structures types.
*/
public class JMSDataStructures {

  /** Name used for the topic destination kind. */
  public static final String TOPIC = "Topic";

  /** Name used for the queue destination kind. */
  public static final String QUEUE = "Queue";
}
58 changes: 58 additions & 0 deletions src/main/java/io/cdap/plugin/jms/common/JMSMessageMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.common;

import io.cdap.cdap.api.data.schema.Schema;

import java.util.Arrays;
import java.util.List;

/**
* A class that specifies JMS message metadata fields.
*/
public class JMSMessageMetadata {
  // Field names of the metadata record.
  public static final String MESSAGE_ID = "messageId";
  public static final String MESSAGE_TIMESTAMP = "messageTimestamp";
  public static final String CORRELATION_ID = "correlationId";
  public static final String REPLY_TO = "replyTo";
  public static final String DESTINATION = "destination";
  // NOTE(review): "deliveryNode" looks like a typo for "deliveryMode". It is a schema field name, so renaming it
  // would change the emitted schema - confirm against docs/widgets before changing.
  public static final String DELIVERY_MODE = "deliveryNode";
  public static final String REDELIVERED = "redelivered";
  public static final String TYPE = "type";
  public static final String EXPIRATION = "expiration";
  public static final String PRIORITY = "priority";

  /**
   * Builds the schema field describing the metadata record.
   *
   * @return a field named {@link JMSMessageParts#METADATA} whose schema is a record of nullable metadata fields
   */
  public static Schema.Field getMessageMetadata() {
    Schema metadataRecord = Schema.recordOf(JMSMessageParts.METADATA, Arrays.asList(
      nullableField(MESSAGE_ID, Schema.Type.STRING),
      nullableField(MESSAGE_TIMESTAMP, Schema.Type.LONG),
      nullableField(CORRELATION_ID, Schema.Type.STRING),
      nullableField(REPLY_TO, Schema.Type.STRING),
      nullableField(DESTINATION, Schema.Type.STRING),
      nullableField(DELIVERY_MODE, Schema.Type.INT),
      nullableField(REDELIVERED, Schema.Type.BOOLEAN),
      nullableField(TYPE, Schema.Type.STRING),
      nullableField(EXPIRATION, Schema.Type.LONG),
      nullableField(PRIORITY, Schema.Type.INT)));
    return Schema.Field.of(JMSMessageParts.METADATA, metadataRecord);
  }

  /**
   * @return the metadata field names, in the same order as they are declared in the metadata record
   */
  public static List<String> getJMSMessageHeaderNames() {
    String[] headerNames = {
      MESSAGE_ID, MESSAGE_TIMESTAMP, CORRELATION_ID, REPLY_TO, DESTINATION, DELIVERY_MODE,
      REDELIVERED, TYPE, EXPIRATION, PRIORITY
    };
    return Arrays.asList(headerNames);
  }

  /** Creates a field with the given name whose schema is the nullable form of the given simple type. */
  private static Schema.Field nullableField(String name, Schema.Type type) {
    return Schema.Field.of(name, Schema.nullableOf(Schema.of(type)));
  }
}

33 changes: 33 additions & 0 deletions src/main/java/io/cdap/plugin/jms/common/JMSMessageParts.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.common;

import java.util.Arrays;
import java.util.List;

/**
* A class that specifies JMS message parts.
*/
public class JMSMessageParts {
  /** Name of the metadata part. */
  public static final String METADATA = "metadata";
  /** Name of the payload part. */
  public static final String PAYLOAD = "payload";
  /** Name of the properties part. */
  public static final String PROPERTIES = "properties";

  /**
   * @return the names of all message parts: metadata, payload and properties, in that order
   */
  public static List<String> getJMSMessageParts() {
    String[] parts = {METADATA, PAYLOAD, PROPERTIES};
    return Arrays.asList(parts);
  }
}
125 changes: 125 additions & 0 deletions src/main/java/io/cdap/plugin/jms/common/JMSMessageProperties.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.common;

import com.google.gson.Gson;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;

/**
* A class that specifies JMS message properties fields.
*/
public class JMSMessageProperties {

  // Shared serializer; avoids creating a new Gson instance for every message.
  private static final Gson GSON = new Gson();

  /**
   * Copies the properties of a JMS message into the {@link JMSMessageParts#PROPERTIES} field of a record. The
   * representation depends on that field's schema: a string field receives all properties as one JSON object, a
   * record field receives one value per declared sub-field.
   *
   * @param schema      the entire schema of the record; must declare the properties field
   * @param builder     the record builder that the properties are set into
   * @param message     the incoming JMS message
   * @param messageType the incoming JMS message type, used in error messages
   * @throws JMSException     if a property cannot be read from the message
   * @throws RuntimeException if the properties field is missing or is neither a string nor a record
   */
  public static void populateProperties(Schema schema, StructuredRecord.Builder builder, Message message,
                                        String messageType) throws JMSException {
    Schema.Type propertiesType = getPropertiesSchema(schema).getType();

    if (propertiesType == Schema.Type.STRING) {
      populatePropertiesOnStringSchema(builder, message);
    } else if (propertiesType == Schema.Type.RECORD) {
      populatePropertiesOnRecordSchema(schema, builder, message, messageType);
    } else {
      throw new RuntimeException(
        String.format("Failed to populate properties! Field %s can only support String or Record data types!",
                      JMSMessageParts.PROPERTIES)
      );
    }
  }

  /**
   * Sets the properties field to a JSON object that maps every property name of the message to its value.
   *
   * @param builder the record builder that the JSON string is set into
   * @param message the incoming JMS message
   * @throws JMSException if the property names or values cannot be read from the message
   */
  public static void populatePropertiesOnStringSchema(StructuredRecord.Builder builder, Message message)
    throws JMSException {
    HashMap<String, Object> properties = new HashMap<>();
    // The JMS API returns the names as an untyped enumeration, hence the unchecked conversion.
    @SuppressWarnings("unchecked")
    List<String> propertyNames = Collections.list(message.getPropertyNames());

    for (String propertyName : propertyNames) {
      properties.put(propertyName, message.getObjectProperty(propertyName));
    }

    builder.set(JMSMessageParts.PROPERTIES, GSON.toJson(properties));
  }

  /**
   * Sets the properties field to a record holding one message property per field declared in the properties schema.
   * Every declared field must exist as a property on the message.
   *
   * @param schema        the entire schema of the record
   * @param recordBuilder the record builder that we set the properties record into
   * @param message       the incoming JMS message
   * @param messageType   the incoming JMS message type
   * @throws JMSException     if a property cannot be read or converted to the declared type
   * @throws RuntimeException if a declared field has no matching property in the message
   */
  public static void populatePropertiesOnRecordSchema(Schema schema, StructuredRecord.Builder recordBuilder,
                                                      Message message, String messageType) throws JMSException {
    Schema propertiesSchema = getPropertiesSchema(schema);

    StructuredRecord.Builder propertiesRecordBuilder = StructuredRecord.builder(propertiesSchema);

    for (Schema.Field field : propertiesSchema.getFields()) {
      String name = field.getName();
      Schema.Type type = field.getSchema().getType();

      if (!message.propertyExists(name)) {
        throw new RuntimeException(
          String.format("Property \"%1$s\" does not exist in the incoming \"%2$s\" message! " +
                          "Make sure that you have specified a correct field name in the output schema that " +
                          "matches a property name in the incoming \"%2$s\" message.", name, messageType));
      }

      switch (type) {
        case BOOLEAN:
          propertiesRecordBuilder.set(name, message.getBooleanProperty(name));
          break;
        case BYTES:
          // getByteProperty yields a single byte, while a BYTES field holds a byte sequence, so the byte is
          // wrapped in a one-element array instead of being stored as a boxed Byte.
          propertiesRecordBuilder.set(name, new byte[] {message.getByteProperty(name)});
          break;
        case INT:
          propertiesRecordBuilder.set(name, message.getIntProperty(name));
          break;
        case LONG:
          propertiesRecordBuilder.set(name, message.getLongProperty(name));
          break;
        case FLOAT:
          propertiesRecordBuilder.set(name, message.getFloatProperty(name));
          break;
        case DOUBLE:
          propertiesRecordBuilder.set(name, message.getDoubleProperty(name));
          break;
        case STRING:
          propertiesRecordBuilder.set(name, message.getStringProperty(name));
          break;
        default:
          // Any other declared type (including unions) receives the property value as-is.
          propertiesRecordBuilder.set(name, message.getObjectProperty(name));
          break;
      }
    }
    recordBuilder.set(JMSMessageParts.PROPERTIES, propertiesRecordBuilder.build());
  }

  /**
   * Looks up the schema of the properties field, unwrapping it when it is declared nullable.
   *
   * @param schema the entire schema of the record
   * @return the non-nullable schema of the properties field
   * @throws IllegalArgumentException if the schema does not declare the properties field
   */
  private static Schema getPropertiesSchema(Schema schema) {
    Schema.Field propertiesField = schema.getField(JMSMessageParts.PROPERTIES);
    if (propertiesField == null) {
      throw new IllegalArgumentException(
        String.format("Failed to populate properties! Field %s is missing from the schema.",
                      JMSMessageParts.PROPERTIES));
    }
    Schema propertiesSchema = propertiesField.getSchema();
    return propertiesSchema.isNullable() ? propertiesSchema.getNonNullable() : propertiesSchema;
  }
}
28 changes: 28 additions & 0 deletions src/main/java/io/cdap/plugin/jms/common/JMSMessageType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.common;

/**
* A class that specifies JMS message types.
*/
public class JMSMessageType {
  /** Name of the plain message type. */
  public static final String MESSAGE = "Message";

  /** Name of the text message type. */
  public static final String TEXT = "Text";

  /** Name of the bytes message type. */
  public static final String BYTES = "Bytes";

  /** Name of the map message type. */
  public static final String MAP = "Map";

  /** Name of the object message type. */
  public static final String OBJECT = "Object";
}
82 changes: 82 additions & 0 deletions src/main/java/io/cdap/plugin/jms/sink/JMSBatchSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.sink;

import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.ReferenceBatchSink;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;
import java.util.stream.Collectors;

/**
* A class that produces {@link StructuredRecord} to a JMS Queue or Topic.
*/
@Plugin(type = BatchSink.PLUGIN_TYPE)
@Name("JMS")
@Description("JMS sink to write events to JMS")
public class JMSBatchSink extends ReferenceBatchSink<StructuredRecord, NullWritable, StructuredRecord> {

  private final JMSBatchSinkConfig config;

  public JMSBatchSink(JMSBatchSinkConfig config) {
    super(config);
    this.config = config;
  }

  /**
   * Validates the sink configuration at deployment time and fails if any validation failure was collected.
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
    config.validateParams(pipelineConfigurer.getStageConfigurer().getFailureCollector());
    pipelineConfigurer.getStageConfigurer().getFailureCollector().getOrThrowException();
  }

  /**
   * Records lineage for the input schema (when one is available) and registers the JMS output format.
   */
  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    LineageRecorder recorder = new LineageRecorder(context, config.referenceName);
    Schema inputSchema = context.getInputSchema();

    if (inputSchema != null) {
      recordLineage(recorder, inputSchema);
    }

    context.addOutput(Output.of(config.referenceName, new JMSOutputFormatProvider(config)));
  }

  /**
   * Passes each incoming record through unchanged, paired with a null key.
   */
  @Override
  public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, StructuredRecord>> emitter)
    throws IOException {
    KeyValue<NullWritable, StructuredRecord> output = new KeyValue<>(null, input);
    emitter.emit(output);
  }

  /**
   * Registers the external dataset and, when the schema declares at least one field, records a write operation
   * listing the field names.
   */
  private void recordLineage(LineageRecorder recorder, Schema inputSchema) {
    recorder.createExternalDataset(inputSchema);
    if (inputSchema.getFields() == null || inputSchema.getFields().isEmpty()) {
      return;
    }
    recorder.recordWrite("Write", "Wrote to JMS topic.",
                         inputSchema.getFields().stream()
                           .map(Schema.Field::getName)
                           .collect(Collectors.toList()));
  }
}
114 changes: 114 additions & 0 deletions src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.sink;

import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.jms.common.JMSConfig;
import io.cdap.plugin.jms.common.JMSMessageType;

import java.io.IOException;
import java.io.Serializable;
import javax.annotation.Nullable;

/**
 * Holds configuration required for configuring {@link io.cdap.plugin.jms.source.JMSSourceUtils} and
* {@link JMSBatchSink}.
*/
public class JMSBatchSinkConfig extends JMSConfig implements Serializable {

  // Params
  public static final String NAME_DESTINATION = "destinationName";
  public static final String NAME_MESSAGE_TYPE = "messageType";
  public static final String NAME_OUTPUT_SCHEMA = "schema";

  public static final String DESC_DESTINATION = "Name of the destination Queue/Topic. If the given Queue/Topic name" +
    " is not resolved, a new Queue/Topic with the given name will get created.";
  public static final String DESC_MESSAGE_TYPE = "Supports the following message types: Message, Text, Bytes, Map.";
  public static final String DESC_OUTPUT_SCHEMA = "Output schema.";

  @Name(NAME_DESTINATION)
  @Description(DESC_DESTINATION)
  @Macro
  private String destinationName;

  @Name(NAME_MESSAGE_TYPE)
  @Description(DESC_MESSAGE_TYPE)
  @Nullable
  @Macro
  private String messageType; // default: Text

  @Name(NAME_OUTPUT_SCHEMA)
  @Description(DESC_OUTPUT_SCHEMA)
  @Nullable
  @Macro
  private String schema;

  /**
   * Creates a config with an empty reference name and the default message type
   * ({@link JMSMessageType#TEXT}).
   */
  public JMSBatchSinkConfig() {
    super("");
    // The field is always unset at this point, so the default applies unconditionally.
    this.messageType = JMSMessageType.TEXT;
  }

  public JMSBatchSinkConfig(String referenceName, String connectionFactory, String jmsUsername,
                            String jmsPassword, String providerUrl, String type, String jndiContextFactory,
                            String jndiUsername, String jndiPassword, String messageType, String destinationName) {
    super(referenceName, connectionFactory, jmsUsername, jmsPassword, providerUrl, type, jndiContextFactory,
          jndiUsername, jndiPassword);
    this.destinationName = destinationName;
    this.messageType = messageType;
  }

  public String getDestinationName() {
    return destinationName;
  }

  /**
   * Validates the common JMS parameters and then the sink-specific ones. A failure is added to the collector when
   * the destination name is empty and is not provided through a macro.
   *
   * @param failureCollector collector that validation failures are added to
   */
  public void validateParams(FailureCollector failureCollector) {
    // Delegate to the parent validation; calling this.validateParams here would recurse without end.
    super.validateParams(failureCollector);

    if (Strings.isNullOrEmpty(destinationName) && !containsMacro(NAME_DESTINATION)) {
      failureCollector
        .addFailure("The destination topic/queue name must be provided!", "Provide your topic/queue name.")
        .withConfigProperty(NAME_DESTINATION);
    }
  }

  /**
   * @return the configured message type, or {@link JMSMessageType#TEXT} when none is set or when the value is
   * still an unevaluated macro
   */
  public String getMessageType() {
    // Check the configured value itself, not the name of the property.
    if (!Strings.isNullOrEmpty(messageType) && !containsMacro(NAME_MESSAGE_TYPE)) {
      return messageType;
    }
    return JMSMessageType.TEXT;
  }

  /**
   * @return {@link io.cdap.cdap.api.data.schema.Schema} of the dataset if one was given, otherwise {@code null}
   * @throws IllegalArgumentException if the schema is not a valid JSON
   */
  @Nullable
  public Schema getSchema() {
    if (!Strings.isNullOrEmpty(schema)) {
      try {
        return Schema.parseJson(schema);
      } catch (IOException e) {
        throw new IllegalArgumentException(String.format("Invalid schema : %s", e.getMessage()), e);
      }
    }
    return null;
  }
}
237 changes: 237 additions & 0 deletions src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.sink;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.format.StructuredRecordStringConverter;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
* Utils for {@link JMSBatchSink} plugin
*/
public class JMSBatchSinkUtils {

  /**
   * Converts an incoming {@link StructuredRecord} to a JMS {@link TextMessage} whose text is the record serialized
   * as JSON.
   *
   * @param session the open session to the JMS broker
   * @param record  the incoming record
   * @return a JMS text message
   */
  public static TextMessage convertStructuredRecordToTextMessage(Session session, StructuredRecord record) {
    String payload = toJsonString(record);

    try {
      TextMessage textMessage = session.createTextMessage();
      textMessage.setText(payload);
      return textMessage;
    } catch (JMSException e) {
      throw toRuntimeException(e);
    }
  }

  /**
   * Converts an incoming {@link StructuredRecord} to a JMS {@link MapMessage} with one entry per field of the output
   * schema. Nullable fields are written using their non-null type; a primitive field whose value is null is left
   * out of the map.
   *
   * @param outputSchema the output schema necessary to properly build the map message
   * @param session      the open session to the JMS broker
   * @param record       the incoming record
   * @return a JMS map message
   */
  public static MapMessage convertStructuredRecordToMapMessage(Schema outputSchema, Session session,
                                                               StructuredRecord record) {
    if (outputSchema == null) {
      throw new RuntimeException("Output schema is empty! Please provide the output schema.");
    }

    try {
      MapMessage mapMessage = session.createMapMessage();
      for (Schema.Field field : outputSchema.getFields()) {
        String fieldName = field.getName();
        Object value = record.get(fieldName);
        Schema.Type type = nonNullableType(field);

        // A null cannot be unboxed into a primitive setter argument, so such an entry is omitted.
        if (value == null && isPrimitive(type)) {
          continue;
        }

        switch (type) {
          case INT:
            mapMessage.setInt(fieldName, cast(value, Integer.class));
            break;
          case LONG:
            mapMessage.setLong(fieldName, cast(value, Long.class));
            break;
          case DOUBLE:
            mapMessage.setDouble(fieldName, cast(value, Double.class));
            break;
          case FLOAT:
            mapMessage.setFloat(fieldName, cast(value, Float.class));
            break;
          case BOOLEAN:
            mapMessage.setBoolean(fieldName, cast(value, Boolean.class));
            break;
          case BYTES:
            mapMessage.setBytes(fieldName, toByteArray(value));
            break;
          case ARRAY:
          case RECORD:
          case MAP:
            // NOTE(review): the value is handed to the provider unchanged; whether a provider accepts
            // list/record/map objects here should be confirmed.
            mapMessage.setObject(fieldName, value);
            break;
          default:
            mapMessage.setString(fieldName, cast(value, String.class));
        }
      }
      return mapMessage;
    } catch (JMSException e) {
      throw toRuntimeException(e);
    }
  }

  /**
   * Converts an incoming {@link StructuredRecord} to a JMS {@link BytesMessage} whose body is the UTF-8 encoded JSON
   * form of the record.
   *
   * @param session the open session to the JMS broker
   * @param record  the incoming record
   * @return a JMS bytes message
   */
  public static BytesMessage convertStructuredRecordToBytesMessage(Session session, StructuredRecord record) {
    byte[] payload = toJsonString(record).getBytes(StandardCharsets.UTF_8);

    try {
      BytesMessage bytesMessage = session.createBytesMessage();
      bytesMessage.writeBytes(payload);
      return bytesMessage;
    } catch (JMSException e) {
      throw toRuntimeException(e);
    }
  }

  /**
   * Converts an incoming {@link StructuredRecord} to a JMS {@link ObjectMessage} whose object is the UTF-8 encoded
   * JSON form of the record, as a byte array.
   *
   * @param session the open session to the JMS broker
   * @param record  the incoming record
   * @return a JMS object message
   */
  public static ObjectMessage convertStructuredRecordToObjectMessage(Session session, StructuredRecord record) {
    byte[] payload = toJsonString(record).getBytes(StandardCharsets.UTF_8);

    try {
      ObjectMessage objectMessage = session.createObjectMessage();
      objectMessage.setObject(payload);
      return objectMessage;
    } catch (JMSException e) {
      throw toRuntimeException(e);
    }
  }

  /**
   * Converts an incoming {@link StructuredRecord} to a JMS {@link Message} that carries every field of the output
   * schema as a message property. Nullable fields are written using their non-null type; a primitive field whose
   * value is null is not set as a property.
   *
   * @param outputSchema the output schema listing the fields to copy into message properties
   * @param session      the open session to the JMS broker
   * @param record       the incoming record
   * @return a JMS message
   */
  public static Message convertStructuredRecordToMessage(Schema outputSchema, Session session,
                                                         StructuredRecord record) {
    if (outputSchema == null) {
      throw new RuntimeException("Output schema is empty! Please provide the output schema.");
    }

    try {
      Message message = session.createMessage();
      for (Schema.Field field : outputSchema.getFields()) {
        String fieldName = field.getName();
        Object value = record.get(fieldName);
        Schema.Type type = nonNullableType(field);

        // A null cannot be unboxed into a primitive setter argument, so such a property is omitted.
        if (value == null && isPrimitive(type)) {
          continue;
        }

        switch (type) {
          case INT:
            message.setIntProperty(fieldName, cast(value, Integer.class));
            break;
          case LONG:
            message.setLongProperty(fieldName, cast(value, Long.class));
            break;
          case DOUBLE:
            message.setDoubleProperty(fieldName, cast(value, Double.class));
            break;
          case FLOAT:
            message.setFloatProperty(fieldName, cast(value, Float.class));
            break;
          case BOOLEAN:
            message.setBooleanProperty(fieldName, cast(value, Boolean.class));
            break;
          case ARRAY:
          case RECORD:
          case MAP:
            // NOTE(review): the value is handed to the provider unchanged; whether a provider accepts
            // list/record/map objects as property values should be confirmed.
            message.setObjectProperty(fieldName, value);
            break;
          default:
            // Every remaining type is expected to hold a String value.
            message.setStringProperty(fieldName, cast(value, String.class));
        }
      }
      return message;
    } catch (JMSException e) {
      throw toRuntimeException(e);
    }
  }

  /**
   * Null-safe checked cast.
   *
   * @param o     the object to cast; may be null
   * @param clazz the target class
   * @return {@code o} cast to {@code clazz}, or null when {@code o} is null
   * @throws ClassCastException if {@code o} is non-null and not an instance of {@code clazz}
   */
  public static <T> T cast(Object o, Class<T> clazz) {
    return o != null ? clazz.cast(o) : null;
  }

  /** Serializes the record to JSON, converting the checked I/O failure into a runtime exception. */
  private static String toJsonString(StructuredRecord record) {
    try {
      return StructuredRecordStringConverter.toJsonString(record);
    } catch (IOException e) {
      throw new RuntimeException("Failed to convert record to json!", e);
    }
  }

  /** Wraps a JMS failure as "errorCode: message" while keeping the original exception as the cause. */
  private static RuntimeException toRuntimeException(JMSException e) {
    return new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
  }

  /** Returns the type of the field, looking through a nullable wrapper when there is one. */
  private static Schema.Type nonNullableType(Schema.Field field) {
    Schema fieldSchema = field.getSchema();
    return fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
  }

  /** Tells whether values of the type are passed to JMS setters that take a Java primitive. */
  private static boolean isPrimitive(Schema.Type type) {
    switch (type) {
      case INT:
      case LONG:
      case DOUBLE:
      case FLOAT:
      case BOOLEAN:
        return true;
      default:
        return false;
    }
  }

  /** Returns the bytes of a value given either as a byte array or as a byte buffer; null stays null. */
  private static byte[] toByteArray(Object value) {
    if (value instanceof java.nio.ByteBuffer) {
      // Read through a duplicate so the position of the record's own buffer is left untouched.
      java.nio.ByteBuffer buffer = ((java.nio.ByteBuffer) value).duplicate();
      byte[] bytes = new byte[buffer.remaining()];
      buffer.get(bytes);
      return bytes;
    }
    return cast(value, byte[].class);
  }
}
71 changes: 71 additions & 0 deletions src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.sink;

import io.cdap.cdap.api.data.format.StructuredRecord;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
* Output format to write to JMS.
*/
public class JMSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {

  /**
   * @return a new {@link JMSRecordWriter} built from the task attempt context
   */
  @Override
  public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext context) {
    return new JMSRecordWriter(context);
  }

  @Override
  public void checkOutputSpecs(JobContext jobContext) {
    // no-op
  }

  /**
   * @return a committer whose hooks all do nothing and which reports that no task commit is needed
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) {
    return new NoOpOutputCommitter();
  }

  /**
   * Committer with empty job/task hooks.
   */
  private static final class NoOpOutputCommitter extends OutputCommitter {

    @Override
    public void setupJob(JobContext jobContext) {
      // no-op
    }

    @Override
    public void setupTask(TaskAttemptContext taskAttemptContext) {
      // no-op
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
      return false;
    }

    @Override
    public void commitTask(TaskAttemptContext taskAttemptContext) {
      // no-op
    }

    @Override
    public void abortTask(TaskAttemptContext taskAttemptContext) {
      // no-op
    }
  }
}
50 changes: 50 additions & 0 deletions src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormatProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.sink;

import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;

import java.util.Map;

/**
* JMS Output format provider.
*/
public class JMSOutputFormatProvider implements OutputFormatProvider {

  /** Configuration key under which the sink config is stored as JSON. */
  public static final String PROPERTY_CONFIG_JSON = "cdap.jms.sink.config";
  private static final Gson GSON = new GsonBuilder().create();
  private final Map<String, String> conf;

  /**
   * Serializes the given sink config to JSON and keeps it as the only entry of the output format configuration.
   *
   * @param config the sink configuration to serialize
   */
  public JMSOutputFormatProvider(JMSBatchSinkConfig config) {
    String serializedConfig = GSON.toJson(config);
    this.conf = ImmutableMap.of(PROPERTY_CONFIG_JSON, serializedConfig);
  }

  /**
   * @return the class name of {@link JMSOutputFormat}
   */
  @Override
  public String getOutputFormatClassName() {
    return JMSOutputFormat.class.getName();
  }

  /**
   * @return the immutable configuration map built in the constructor
   */
  @Override
  public Map<String, String> getOutputFormatConfiguration() {
    return conf;
  }
}
124 changes: 124 additions & 0 deletions src/main/java/io/cdap/plugin/jms/sink/JMSRecordWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.sink;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.jms.common.JMSConnection;
import io.cdap.plugin.jms.common.JMSMessageType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;

/**
 * Record writer that converts every incoming {@link StructuredRecord} into a JMS message and sends it to the
 * configured JMS Topic/Queue.
 *
 * <p>The sink configuration is read as JSON from the Hadoop configuration under
 * {@link JMSOutputFormatProvider#PROPERTY_CONFIG_JSON}, and the JMS connection, session and producer are created
 * when the writer is constructed.
 */
public class JMSRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
  private static final Logger LOG = LoggerFactory.getLogger(JMSRecordWriter.class);
  private static final Gson GSON = new GsonBuilder().create();

  private final JMSBatchSinkConfig config;
  private final JMSConnection jmsConnection;
  private Connection connection;
  private Session session;
  private MessageProducer messageProducer;

  /**
   * Restores the sink configuration from the task configuration and opens the JMS connection.
   *
   * @param context the task attempt context whose configuration carries the serialized sink config
   */
  public JMSRecordWriter(TaskAttemptContext context) {
    // Named differently from the 'config' field to avoid shadowing it.
    Configuration hadoopConf = context.getConfiguration();
    String configJson = hadoopConf.get(JMSOutputFormatProvider.PROPERTY_CONFIG_JSON);
    this.config = GSON.fromJson(configJson, JMSBatchSinkConfig.class);
    this.jmsConnection = new JMSConnection(this.config);
    establishConnection();
  }

  /**
   * Converts the record to the configured JMS message type and sends it.
   *
   * @param key ignored
   * @param record the record to publish
   * @throws IllegalArgumentException if an output schema is set but the configured message type is missing or
   *                                  not supported
   * @throws RuntimeException if the JMS provider fails to send the message
   */
  @Override
  public void write(NullWritable key, StructuredRecord record) {
    Schema outputSchema = config.getSchema();

    // when the output schema is not specified, the incoming record gets wrapped in a text payload
    if (outputSchema == null) {
      produceMessage(JMSBatchSinkUtils.convertStructuredRecordToTextMessage(session, record));
      return;
    }

    String messageType = config.getMessageType();
    if (messageType == null) {
      throw new IllegalArgumentException("Message type should not be null.");
    }

    Message message;
    switch (messageType) {
      case JMSMessageType.TEXT:
        message = JMSBatchSinkUtils.convertStructuredRecordToTextMessage(session, record);
        break;
      case JMSMessageType.MAP:
        message = JMSBatchSinkUtils.convertStructuredRecordToMapMessage(outputSchema, session, record);
        break;
      case JMSMessageType.BYTES:
        message = JMSBatchSinkUtils.convertStructuredRecordToBytesMessage(session, record);
        break;
      case JMSMessageType.MESSAGE:
        message = JMSBatchSinkUtils.convertStructuredRecordToMessage(outputSchema, session, record);
        break;
      case JMSMessageType.OBJECT:
        message = JMSBatchSinkUtils.convertStructuredRecordToObjectMessage(session, record);
        break;
      default:
        // Fail loudly: silently skipping the record would lose data without any trace.
        throw new IllegalArgumentException(
          String.format("Unsupported message type '%s'. Message type should be one of Message, Text, Bytes, "
                          + "Map, or Object", messageType));
    }
    produceMessage(message);
  }

  /**
   * Stops the connection, then closes the session and the connection. Each step is attempted even when a
   * previous one throws, so a failure while stopping does not leave the session or connection open.
   *
   * @param taskAttemptContext unused
   */
  @Override
  public void close(TaskAttemptContext taskAttemptContext) {
    try {
      this.jmsConnection.stopConnection(this.connection);
    } finally {
      try {
        this.jmsConnection.closeSession(this.session);
      } finally {
        this.jmsConnection.closeConnection(this.connection);
      }
    }
  }

  /**
   * Sends a message through the producer.
   *
   * @param message the message to send
   * @throws RuntimeException wrapping the {@link JMSException} (kept as cause) when sending fails
   */
  private void produceMessage(Message message) {
    try {
      messageProducer.send(message);
    } catch (JMSException e) {
      // Keep the original exception as cause so the provider stack trace is not lost.
      throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
    }
  }

  /**
   * Creates the connection, session, destination and producer, in that order, through {@link JMSConnection}.
   */
  private void establishConnection() {
    Context context = jmsConnection.getContext();
    ConnectionFactory factory = jmsConnection.getConnectionFactory(context);
    connection = jmsConnection.createConnection(factory);
    session = jmsConnection.createSession(connection);
    Destination destination = jmsConnection.getSink(context, session);
    messageProducer = jmsConnection.createProducer(session, destination);
  }
}
104 changes: 104 additions & 0 deletions src/main/java/io/cdap/plugin/jms/source/JMSReceiver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.source;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.plugin.jms.common.JMSConnection;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;

/**
 * Customized Spark streaming {@link Receiver} that reads messages from a JMS Queue/Topic, converts them to
 * {@link StructuredRecord}s and stores them in Spark. It implements the <code>MessageListener</code> interface
 * so that it can be registered on the message consumer.
 */
public class JMSReceiver extends Receiver<StructuredRecord> implements MessageListener {
  private static final Logger LOG = LoggerFactory.getLogger(JMSReceiver.class);
  private JMSStreamingSourceConfig config;
  private StorageLevel storageLevel;
  // Live JMS handles are created in onStart() and released in onStop(); they are marked transient so they are
  // never written out together with the receiver object.
  private transient Connection connection;
  private transient Session session;
  private transient JMSConnection jmsConnection;

  /**
   * @param storageLevel the Spark storage level used for received records
   * @param config the streaming source configuration
   */
  public JMSReceiver(StorageLevel storageLevel, JMSStreamingSourceConfig config) {
    super(storageLevel);
    this.storageLevel = storageLevel;
    this.config = config;
  }

  /**
   * Opens the JMS connection and session, registers this receiver as message listener on the consumer, starts
   * the connection and, for queues, drains the messages that are currently available.
   */
  @Override
  public void onStart() {
    this.jmsConnection = new JMSConnection(config);
    Context context = jmsConnection.getContext();
    ConnectionFactory factory = jmsConnection.getConnectionFactory(context);
    connection = jmsConnection.createConnection(factory);

    session = jmsConnection.createSession(connection);
    Destination destination = jmsConnection.getSource(context);
    MessageConsumer messageConsumer = jmsConnection.createConsumer(session, destination);
    jmsConnection.setMessageListener(this, messageConsumer);
    jmsConnection.startConnection(connection);

    // fetch the entire queue events
    // NOTE(review): the consumer already has this receiver registered as listener above; presumably JMS
    // providers reject a synchronous receive() on such a consumer - confirm against JMSConnection and the
    // provider in use.
    // NOTE(review): this loop blocks onStart() until the queue stays empty for 5 seconds - confirm this is
    // acceptable for a Spark receiver.
    if (destination instanceof Queue) {
      fetchEntireQueue(messageConsumer);
    }
  }

  /**
   * Stops the connection and closes the session and the connection. Safe to call when {@link #onStart()} did
   * not get far enough to create them; every cleanup step is attempted even if a previous one throws.
   */
  @Override
  public void onStop() {
    if (this.jmsConnection == null) {
      // onStart() never created the JMS wrapper, so there is nothing to release.
      return;
    }
    try {
      if (this.connection != null) {
        this.jmsConnection.stopConnection(this.connection);
      }
    } finally {
      try {
        if (this.session != null) {
          this.jmsConnection.closeSession(this.session);
        }
      } finally {
        if (this.connection != null) {
          this.jmsConnection.closeConnection(this.connection);
        }
      }
    }
  }

  /**
   * Converts an asynchronously delivered message and stores it in Spark.
   *
   * @param message the delivered JMS message
   * @throws RuntimeException wrapping any failure during conversion or storing
   */
  @Override
  public void onMessage(Message message) {
    try {
      store(JMSSourceUtils.convertMessage(message, this.config));
    } catch (Exception e) {
      LOG.error("Message couldn't get stored in the Spark memory.", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Synchronously receives and stores messages until no message arrives within 5 seconds.
   *
   * @param messageConsumer the consumer to read from
   * @throws RuntimeException wrapping the {@link JMSException} (kept as cause) when receiving fails
   */
  private void fetchEntireQueue(MessageConsumer messageConsumer) {
    while (true) {
      try {
        Message message = messageConsumer.receive(5000);
        if (message == null) {
          // receive timed out: nothing left to drain
          break;
        }
        store(JMSSourceUtils.convertMessage(message, this.config));
      } catch (JMSException e) {
        // Keep the original exception as cause so the provider stack trace is not lost.
        throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e);
      }
    }
  }
}
332 changes: 332 additions & 0 deletions src/main/java/io/cdap/plugin/jms/source/JMSSourceUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.source;

import com.google.common.base.Strings;
import com.google.gson.Gson;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.plugin.jms.common.JMSMessageMetadata;
import io.cdap.plugin.jms.common.JMSMessageParts;
import io.cdap.plugin.jms.common.JMSMessageProperties;
import io.cdap.plugin.jms.common.JMSMessageType;
import org.apache.commons.lang.SerializationUtils;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.receiver.Receiver;

import java.io.ByteArrayOutputStream;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;

/**
 * A utility class that encapsulates the functionality needed to convert the different types of JMS messages to
 * StructuredRecords, and that creates an input stream backed by our customized receiver, {@link JMSReceiver}.
*/
public class JMSSourceUtils {

/**
 * Creates the input stream of {@link StructuredRecord}s backed by a {@link JMSReceiver}.
 *
 * @param context the streaming context that provides the Spark streaming context
 * @param config the {@link JMSStreamingSourceConfig} handed to the receiver
 * @return the receiver stream created from the Spark streaming context
 */
static JavaDStream<StructuredRecord> getJavaDStream(StreamingContext context,
                                                    JMSStreamingSourceConfig config) {
  StorageLevel receiverStorage = StorageLevel.MEMORY_AND_DISK_SER_2();
  Receiver<StructuredRecord> receiver = new JMSReceiver(receiverStorage, config);
  return context.getSparkStreamingContext().receiverStream(receiver);
}

/**
 * Creates a {@link StructuredRecord} from a JMS message by dispatching on the configured message type.
 *
 * @param message the incoming JMS message
 * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
 * @return the {@link StructuredRecord} built out of the JMS message fields
 * @throws JMSException in case the method fails to read fields from the JMS message
 * @throws IllegalArgumentException in case the message type is missing or not supported, or the incoming
 *                                  message is not an instance of the configured message type
 */
public static StructuredRecord convertMessage(Message message, JMSStreamingSourceConfig config)
  throws JMSException, IllegalArgumentException {
  String messageType = config.getMessageType();
  if (Strings.isNullOrEmpty(messageType)) {
    // IllegalArgumentException is what this method documents; it is still a RuntimeException for callers.
    throw new IllegalArgumentException("Message type should not be null.");
  }

  switch (messageType) {
    case JMSMessageType.BYTES:
      if (message instanceof BytesMessage) {
        return convertJMSByteMessage(message, config);
      }
      break;
    case JMSMessageType.MAP:
      if (message instanceof MapMessage) {
        return convertJMSMapMessage(message, config);
      }
      break;
    case JMSMessageType.OBJECT:
      if (message instanceof ObjectMessage) {
        return convertJMSObjectMessage(message, config);
      }
      break;
    case JMSMessageType.MESSAGE:
      // every non-null JMS message qualifies; only headers and properties are read
      if (message != null) {
        return convertJMSMessage(message, config);
      }
      break;
    case JMSMessageType.TEXT:
      if (message instanceof TextMessage) {
        return convertJMSTextMessage(message, config);
      }
      break;
    default:
      throw new IllegalArgumentException("Message type should be one of Message, Text, Bytes, Map, or Object");
  }

  // The configured type is valid, but the message that arrived is of a different kind.
  throw new IllegalArgumentException(
    String.format("Expected a JMS message of type '%s' but received %s", messageType,
                  message == null ? "null" : message.getClass().getName()));
}

/**
 * Builds a {@link StructuredRecord} out of a JMS {@link TextMessage}: optional metadata, optional properties and
 * the text body as payload.
 *
 * @param message the incoming JMS {@link TextMessage}
 * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
 * @return the {@link StructuredRecord} built out of the JMS {@link TextMessage} fields
 * @throws JMSException in case the method fails to read fields from the JMS message
 */
public static StructuredRecord convertJMSTextMessage(Message message, JMSStreamingSourceConfig config)
  throws JMSException {
  StructuredRecord.Builder builder = StructuredRecord.builder(config.getSchema());

  boolean withMetadata = config.getMessageMetadata();
  if (withMetadata) {
    populateMetadata(config.getSchema(), builder, message);
  }

  boolean withProperties = config.getMessageProperties();
  if (withProperties) {
    JMSMessageProperties.populateProperties(config.getSchema(), builder, message, config.getMessageType());
  }

  TextMessage textMessage = (TextMessage) message;
  String body = textMessage.getText();
  builder.set(JMSMessageParts.PAYLOAD, body);
  return builder.build();
}

/**
 * Creates a {@link StructuredRecord} from a JMS {@link MapMessage}.
 *
 * <p>When the payload field of the schema is a string, all map entries are written to the payload as one JSON
 * object. Otherwise the payload is expected to be a record and is filled field by field from the map message.
 *
 * @param message the incoming JMS {@link MapMessage}
 * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
 * @return the {@link StructuredRecord} built out of the JMS {@link MapMessage} fields
 * @throws JMSException in case the method fails to read fields from the JMS message
 */
public static StructuredRecord convertJMSMapMessage(Message message, JMSStreamingSourceConfig config)
  throws JMSException {
  Schema schema = config.getSchema();
  StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema);
  MapMessage mapMessage = (MapMessage) message;

  // The payload field may be declared nullable; work with the underlying schema.
  Schema payloadSchema = unwrapUnion(schema.getField(JMSMessageParts.PAYLOAD).getSchema());

  if (config.getMessageMetadata()) {
    populateMetadata(schema, recordBuilder, message);
  }
  if (config.getMessageProperties()) {
    JMSMessageProperties.populateProperties(schema, recordBuilder, message, JMSMessageType.MESSAGE);
  }

  if (payloadSchema.getType() == Schema.Type.STRING) {
    // String payload: the JSON text goes straight into the outer record. A record builder cannot be created
    // from a string schema, and a nested record must not be stored in a string field.
    Map<String, Object> payload = new HashMap<>();
    Enumeration<?> names = mapMessage.getMapNames();
    while (names.hasMoreElements()) {
      String key = (String) names.nextElement();
      payload.put(key, mapMessage.getObject(key));
    }
    recordBuilder.set(JMSMessageParts.PAYLOAD, new Gson().toJson(payload));
    return recordBuilder.build();
  }

  StructuredRecord.Builder payloadRecordBuilder = StructuredRecord.builder(payloadSchema);
  for (Schema.Field field : config.getDataFields(config.getSchema(), null)) {
    String name = field.getName();
    Schema fieldSchema = unwrapUnion(field.getSchema());

    switch (fieldSchema.getType()) {
      case BOOLEAN:
        payloadRecordBuilder.set(name, mapMessage.getBoolean(name));
        break;
      case INT:
        payloadRecordBuilder.set(name, mapMessage.getInt(name));
        break;
      case LONG:
        payloadRecordBuilder.set(name, mapMessage.getLong(name));
        break;
      case FLOAT:
        payloadRecordBuilder.set(name, mapMessage.getFloat(name));
        break;
      case DOUBLE:
        payloadRecordBuilder.set(name, mapMessage.getDouble(name));
        break;
      case BYTES:
        // NOTE(review): reads a single byte for a BYTES field while byte arrays are handled under ARRAY below;
        // presumably this mirrors the sink-side mapping - confirm before changing.
        payloadRecordBuilder.set(name, mapMessage.getByte(name));
        break;
      case STRING:
        payloadRecordBuilder.set(name, mapMessage.getString(name));
        break;
      case ARRAY: // byte array only
        // Uses the unwrapped field schema so that a nullable array still exposes its component schema.
        Schema itemSchema = unwrapUnion(fieldSchema.getComponentSchema());
        if (itemSchema.getType() == Schema.Type.BYTES) {
          payloadRecordBuilder.set(name, mapMessage.getBytes(name));
        }
        break;
      default:
        // other types are not read from the map message; the field is left unset
        break;
    }
  }
  recordBuilder.set(JMSMessageParts.PAYLOAD, payloadRecordBuilder.build());
  return recordBuilder.build();
}

/**
 * Resolves a union schema to the schema used for reading values: the non-null branch of a nullable schema, or
 * the first branch of any other union. Non-union schemas are returned unchanged.
 */
private static Schema unwrapUnion(Schema schema) {
  if (schema.getType() != Schema.Type.UNION) {
    return schema;
  }
  return schema.isNullable() ? schema.getNonNullable() : schema.getUnionSchema(0);
}

/**
 * Builds a {@link StructuredRecord} out of a JMS {@link BytesMessage}: optional metadata, optional properties
 * and the complete message body as a byte array payload.
 *
 * @param message the incoming JMS {@link BytesMessage}
 * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
 * @return the {@link StructuredRecord} built out of the JMS {@link BytesMessage} fields
 * @throws JMSException in case the method fails to read fields from the JMS message
 */
public static StructuredRecord convertJMSByteMessage(Message message, JMSStreamingSourceConfig config)
  throws JMSException {
  Schema recordSchema = config.getSchema();
  StructuredRecord.Builder builder = StructuredRecord.builder(recordSchema);

  if (config.getMessageMetadata()) {
    populateMetadata(config.getSchema(), builder, message);
  }
  if (config.getMessageProperties()) {
    JMSMessageProperties.populateProperties(recordSchema, builder, message, JMSMessageType.MESSAGE);
  }

  // Copy the body chunk by chunk until readBytes signals the end of the stream with -1.
  ByteArrayOutputStream body = new ByteArrayOutputStream();
  byte[] chunk = new byte[8096];
  BytesMessage bytesMessage = (BytesMessage) message;
  for (int read = bytesMessage.readBytes(chunk); read != -1; read = bytesMessage.readBytes(chunk)) {
    body.write(chunk, 0, read);
  }

  builder.set(JMSMessageParts.PAYLOAD, body.toByteArray());
  return builder.build();
}

/**
 * Builds a {@link StructuredRecord} out of a JMS {@link ObjectMessage}: optional metadata, optional properties
 * and the serialized form of the carried object as payload.
 *
 * @param message the incoming JMS {@link ObjectMessage}
 * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
 * @return the {@link StructuredRecord} built out of the JMS {@link ObjectMessage} fields
 * @throws JMSException in case the method fails to read fields from the JMS message
 */
public static StructuredRecord convertJMSObjectMessage(Message message, JMSStreamingSourceConfig config)
  throws JMSException {
  Schema recordSchema = config.getSchema();
  StructuredRecord.Builder builder = StructuredRecord.builder(recordSchema);

  if (config.getMessageMetadata()) {
    populateMetadata(config.getSchema(), builder, message);
  }
  if (config.getMessageProperties()) {
    JMSMessageProperties.populateProperties(recordSchema, builder, message, JMSMessageType.MESSAGE);
  }

  ObjectMessage objectMessage = (ObjectMessage) message;
  builder.set(JMSMessageParts.PAYLOAD, SerializationUtils.serialize(objectMessage.getObject()));
  return builder.build();
}

/**
 * Builds a {@link StructuredRecord} out of a plain JMS {@link Message}. Only the optional metadata and the
 * optional properties are populated; no payload is set.
 *
 * @param message the incoming JMS {@link Message}
 * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
 * @return the {@link StructuredRecord} built out of the JMS {@link Message} fields
 * @throws JMSException in case the method fails to read fields from the JMS message
 */
public static StructuredRecord convertJMSMessage(Message message, JMSStreamingSourceConfig config)
  throws JMSException {
  Schema recordSchema = config.getSchema();
  StructuredRecord.Builder builder = StructuredRecord.builder(recordSchema);

  boolean withMetadata = config.getMessageMetadata();
  if (withMetadata) {
    populateMetadata(config.getSchema(), builder, message);
  }

  boolean withProperties = config.getMessageProperties();
  if (withProperties) {
    JMSMessageProperties.populateProperties(recordSchema, builder, message, JMSMessageType.MESSAGE);
  }

  return builder.build();
}

/**
 * Gets header data fields from the JMS message and adds them, as one metadata record, to the passed record
 * builder. Only the headers that appear as fields in the metadata schema are read.
 *
 * <p>The reply-to and destination headers are left unset when the message does not carry them.
 *
 * @param schema the entire schema of the record
 * @param recordBuilder the record builder that we set the metadata record into
 * @param message the incoming JMS message
 * @throws JMSException in case the method fails to read fields from the JMS message
 */
private static void populateMetadata(Schema schema, StructuredRecord.Builder recordBuilder, Message message)
  throws JMSException {
  Schema metadataSchema = schema.getField(JMSMessageParts.METADATA).getSchema();
  StructuredRecord.Builder metadataRecordBuilder = StructuredRecord.builder(metadataSchema);

  for (Schema.Field field : metadataSchema.getFields()) {

    switch (field.getName()) {
      case JMSMessageMetadata.MESSAGE_ID:
        metadataRecordBuilder.set(JMSMessageMetadata.MESSAGE_ID, message.getJMSMessageID());
        break;

      case JMSMessageMetadata.CORRELATION_ID:
        metadataRecordBuilder.set(JMSMessageMetadata.CORRELATION_ID, message.getJMSCorrelationID());
        break;

      case JMSMessageMetadata.REPLY_TO: {
        // The reply-to header may be absent; calling toString() on it unconditionally throws an NPE.
        // NOTE(review): an unset field presumably requires the schema field to be nullable - confirm.
        Object replyTo = message.getJMSReplyTo();
        if (replyTo != null) {
          metadataRecordBuilder.set(JMSMessageMetadata.REPLY_TO, replyTo.toString());
        }
        break;
      }

      case JMSMessageMetadata.DESTINATION: {
        // Same null guard as for the reply-to header.
        Object destination = message.getJMSDestination();
        if (destination != null) {
          metadataRecordBuilder.set(JMSMessageMetadata.DESTINATION, destination.toString());
        }
        break;
      }

      case JMSMessageMetadata.TYPE:
        metadataRecordBuilder.set(JMSMessageMetadata.TYPE, message.getJMSType());
        break;

      case JMSMessageMetadata.MESSAGE_TIMESTAMP:
        metadataRecordBuilder.set(JMSMessageMetadata.MESSAGE_TIMESTAMP, message.getJMSTimestamp());
        break;

      case JMSMessageMetadata.DELIVERY_MODE:
        metadataRecordBuilder.set(JMSMessageMetadata.DELIVERY_MODE, message.getJMSDeliveryMode());
        break;

      case JMSMessageMetadata.REDELIVERED:
        metadataRecordBuilder.set(JMSMessageMetadata.REDELIVERED, message.getJMSRedelivered());
        break;

      case JMSMessageMetadata.EXPIRATION:
        metadataRecordBuilder.set(JMSMessageMetadata.EXPIRATION, message.getJMSExpiration());
        break;

      case JMSMessageMetadata.PRIORITY:
        metadataRecordBuilder.set(JMSMessageMetadata.PRIORITY, message.getJMSPriority());
        break;

      default:
        // fields that do not correspond to a known JMS header are not populated
        break;
    }
  }

  // Build the metadata record once, after every field has been visited, instead of rebuilding a partially
  // filled record on each loop iteration.
  recordBuilder.set(JMSMessageParts.METADATA, metadataRecordBuilder.build());
}
}
78 changes: 78 additions & 0 deletions src/main/java/io/cdap/plugin/jms/source/JMSStreamingSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.jms.source;

import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
import io.cdap.cdap.etl.api.streaming.StreamingSourceContext;
import io.cdap.plugin.common.LineageRecorder;
import org.apache.spark.streaming.api.java.JavaDStream;

import java.util.stream.Collectors;

/**
 * <code>JMSStreamingSource</code> is a streaming source plugin that consumes messages from a specified JMS
 * Queue/Topic and generates StructuredRecords out of them.
 */
@Plugin(type = StreamingSource.PLUGIN_TYPE)
@Name("JMS")
@Description("JMS (Java Messaging Service) Source")
public class JMSStreamingSource extends ReferenceStreamingSource<StructuredRecord> {

  private JMSStreamingSourceConfig config;

  /**
   * @param config the configuration of this source
   */
  public JMSStreamingSource(JMSStreamingSourceConfig config) {
    super(config);
    this.config = config;
  }

  /**
   * Validates the configuration against the stage's failure collector and publishes the configured schema as
   * the output schema of the stage.
   *
   * @param pipelineConfigurer the configurer of the pipeline this stage belongs to
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
    FailureCollector failures = pipelineConfigurer.getStageConfigurer().getFailureCollector();
    config.validate(failures);
    pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
  }

  /**
   * Registers dataset lineage for the reference name and, when the schema has fields, records a field-level
   * read operation listing all field names.
   *
   * @param context the streaming source context
   * @throws Exception if lineage registration fails
   */
  @Override
  public void prepareRun(StreamingSourceContext context) throws Exception {
    Schema schema = config.getSchema();
    // record dataset lineage
    context.registerLineage(config.referenceName, schema);

    if (schema.getFields() == null) {
      return;
    }

    LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
    lineageRecorder.recordRead(
      "Read",
      "Read from jms",
      schema.getFields().stream().map(field -> field.getName()).collect(Collectors.toList()));
  }

  /**
   * Validates the configuration, fails on any collected error and creates the JMS input stream.
   *
   * @param context the streaming context
   * @return the stream of records received from JMS
   * @throws Exception if the validation fails or the stream cannot be created
   */
  @Override
  public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws Exception {
    FailureCollector failures = context.getFailureCollector();
    config.validate(failures);
    failures.getOrThrowException();
    return JMSSourceUtils.getJavaDStream(context, config);
  }
}
Loading

0 comments on commit 7fe5119

Please sign in to comment.