diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b0df7b7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,36 @@
+*.class
+.*.swp
+.beamer
+# Package Files #
+*.jar
+*.war
+*.ear
+
+# Intellij Files & Dir #
+*.iml
+*.ipr
+*.iws
+atlassian-ide-plugin.xml
+out/
+.DS_Store
+./lib/
+.idea
+
+# Gradle Files & Dir #
+build/
+.gradle/
+.stickyStorage
+.build/
+target/
+
+# Node log
+npm-*.log
+logs/
+
+# Singlenode and test data files.
+/templates/
+/data/
+/data-fabric-tests/data/
+
+# generated by docs build
+*.pyc
diff --git a/checkstyle.xml b/checkstyle.xml
new file mode 100644
index 0000000..2b6e777
--- /dev/null
+++ b/checkstyle.xml
@@ -0,0 +1,406 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docs/JMS-batchsink.md b/docs/JMS-batchsink.md
new file mode 100644
index 0000000..74ccae6
--- /dev/null
+++ b/docs/JMS-batchsink.md
@@ -0,0 +1,58 @@
+# JMS batch sink
+
+
+Description
+-----------
+Produces JMS messages of different types as Message, Text, Bytes and Map to a specified Queue or Topic.
+
+Use Case
+--------
+Use this JMS Sink plugin when you want to produce messages to a JMS Queue/Topic.
+
+
+Properties
+----------
+**Connection Factory**: Name of the connection factory. If not specified, the value *ConnectionFactory* is considered by
+ default.
+
+**JMS Username**: Username to connect to JMS. This property is mandatory.
+
+**JMS Password**: Password to connect to JMS. This property is mandatory.
+
+**Provider URL**: Provider URL of the JMS Provider. This property is mandatory. For example for the *ActiveMQ* provider
+ you can set: `tcp://hostname:61616`.
+
+**Type**: Queue or Topic. Queue is considered by default.
+
+**Destination**: Queue/Topic name.
+
+**JNDI Context Factory**: Name of the context factory. This property is optional. For example for the *ActiveMQ*
+provider you can set: `org.apache.activemq.jndi.ActiveMQInitialContextFactory`.
+
+**JNDI Username**: Username for JNDI. This property is optional.
+
+**JNDI Password**: Password for JNDI. This property is optional.
+
+**Message Type**: The type of messages you intend to produce. A JMS message could be of the following types: *Message*,
+ *Text*, *Bytes* and *Map*. By default, *Text* message type is considered.
+
+Example
+-------
+This example reads a record from a JSON file and produces a *MapMessage* with the file's content as a payload.
+For every record of the JSON file it produces one *MapMessage* to the given topic.
+An example of a JSON record is shown below.
+
+```json
+{"name":"foo", "surname":"bar", "age":23}
+```
+We could see the produced message in the running JMS implementation.
+
+| field name | value |
+| ----------------- | ------------------------------------------ |
+| messageId | ID:Producer-36705-1609122138230-1:1:1:1:1 |
+| messageTimestamp | 1609122138554 |
+| destination | topic://MyTopic |
+| deliveryNode | 2 |
+| expiration | 0 |
+| priority | 4 |
+| payload | {"name":"foo","surname":"bar","age":23} |
diff --git a/docs/JMS-streamingsource.md b/docs/JMS-streamingsource.md
new file mode 100644
index 0000000..38a4101
--- /dev/null
+++ b/docs/JMS-streamingsource.md
@@ -0,0 +1,102 @@
+# JMS Streaming Source
+
+
+Description
+-----------
+Consumes JMS messages of different types as Message, Text, Bytes and Map from a specified Queue or Topic.
+
+Use Case
+--------
+Use this JMS Source plugin when you want to consume messages from a JMS Queue/Topic and write them to a Table.
+
+
+Properties
+----------
+**Connection Factory**: Name of the connection factory. If not specified, the value *ConnectionFactory* is considered by
+ default.
+
+**JMS Username**: Username to connect to JMS. This property is mandatory.
+
+**JMS Password**: Password to connect to JMS. This property is mandatory.
+
+**Provider URL**: Provider URL of the JMS Provider. This property is mandatory. For example for the *ActiveMQ* provider
+ you can set: `tcp://hostname:61616`.
+
+**Type**: Queue or Topic. Queue is considered by default.
+
+**Destination**: Queue/Topic name.
+
+**JNDI Context Factory**: Name of the context factory. This property is optional. For example for the *ActiveMQ*
+provider you can set: `org.apache.activemq.jndi.ActiveMQInitialContextFactory`.
+
+**JNDI Username**: Username for JNDI. This property is optional.
+
+**JNDI Password**: Password for JNDI. This property is optional.
+
+**Message Type**: The type of messages you intend to consume. A JMS message could be of the following types: *Message*,
+ *Text*, *Bytes* and *Map*. By default, *Text* message type is considered. The *payload* field of the output schema gets
+ switched to the appropriate data type upon the selection of a message type and *validate* button click.
+
+
+Example
+-------
+This example reads JMS messages of *TextMessage* type from the *status* topic existing in provider *tcp://hostname
+:616161* with JNDI context factory name *org.apache.activemq.jndi.ActiveMQInitialContextFactory*. An example of a
+TextMessage object is shown below. Since we used ActiveMQ as a provider, the message is automatically considered as an
+*ActiveMQTextMessage* (an implementation of JMS TextMessage interface). The object below shows an
+*ActiveMQTextMessage* consumed:
+
+```text
+ActiveMQTextMessage {
+ commandId=5,
+ responseRequired=true,
+ messageId=ID: Producer-50444-1608735228752-1: 1: 1: 1: 1,
+ originalDestination=null,
+ originalTransactionId=null,
+ producerId=ID: Producer-50444-1608735228752-1: 1: 1: 1,
+ destination=topic: topic://status,
+ transactionId=null,
+ expiration=0,
+ timestamp=1608735228894,
+ arrival=0,
+ brokerInTime=1608735228895,
+ brokerOutTime=1608735228896,
+ correlationId=null,
+ replyTo=null,
+ persistent=true,
+ type=null,
+ priority=4,
+ groupID=null,
+ groupSequence=0,
+ targetConsumerId=null,
+ compressed=false,
+ userID=null,
+ content=org.apache.activemq.util.ByteSequence@4b9e255,
+ marshalledProperties=null,
+ dataStructure=null,
+ redeliveryCounter=0,
+ size=0,
+ properties=null,
+ readOnlyProperties=true,
+ readOnlyBody=true,
+ droppable=false,
+ jmsXGroupFirstForConsumer=false,
+ text=DONE
+}
+
+```
+Since the JMS Source plugin's implementation is purely based in JMS (ie. not coupled in any JMS implementation as for
+example ActiveMQ), we consider only the header data supported by JMS. The consumed will output the below
+record:
+
+| field name | value |
+| ----------------- | ------------------------------------------ |
+| messageId | ID:Producer-54511-1608749039578-1:1:1:1:1 |
+| messageTimestamp | 1609122138554 |
+| deliveryNode | 2 |
+| payload | DONE |
+| replyTo | topic://status |
+| correlationId | null |
+| expiration | 0 |
+| type | null |
+| redelivered | false |
diff --git a/examples/JMS-batchsink.json b/examples/JMS-batchsink.json
new file mode 100644
index 0000000..bbe2c24
--- /dev/null
+++ b/examples/JMS-batchsink.json
@@ -0,0 +1,107 @@
+{
+ "artifact": {
+ "name": "cdap-data-pipeline",
+ "version": "6.3.0-SNAPSHOT",
+ "scope": "SYSTEM"
+ },
+ "description": "Read records from a json file and generate a MapMessage for each one of them.",
+ "name": "JMS-batchsink",
+ "config": {
+ "resources": {
+ "memoryMB": 2048,
+ "virtualCores": 1
+ },
+ "driverResources": {
+ "memoryMB": 2048,
+ "virtualCores": 1
+ },
+ "connections": [
+ {
+ "from": "File",
+ "to": "JMS"
+ }
+ ],
+ "comments": [],
+ "postActions": [],
+ "properties": {},
+ "processTimingEnabled": true,
+ "stageLoggingEnabled": false,
+ "stages": [
+ {
+ "name": "File",
+ "plugin": {
+ "name": "File",
+ "type": "batchsource",
+ "label": "File",
+ "artifact": {
+ "name": "core-plugins",
+ "version": "2.6.0-SNAPSHOT",
+ "scope": "USER"
+ },
+ "properties": {
+ "referenceName": "JMS",
+ "path": "${PATH}",
+ "format": "json",
+ "delimiter": ",",
+ "skipHeader": "true",
+ "filenameOnly": "false",
+ "recursive": "false",
+ "ignoreNonExistingFolders": "false",
+ "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"surname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
+ }
+ },
+ "outputSchema": [
+ {
+ "name": "etlSchemaBody",
+ "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"surname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
+ }
+ ],
+ "id": "File"
+ },
+ {
+ "name": "JMS",
+ "plugin": {
+ "name": "JMS",
+ "type": "batchsink",
+ "label": "JMS",
+ "artifact": {
+ "name": "jms-plugins",
+ "version": "1.0.0-SNAPSHOT",
+ "scope": "USER"
+ },
+ "properties": {
+ "referenceName": "JMS",
+ "type": "Topic",
+ "messageType": "Map",
+ "jmsUsername": "${JMS_USERNAME}",
+ "jmsPassword": "${JMS_PASSWORD}",
+ "providerUrl": "${PROVIDER_URL}",
+ "destination": "${DESTINATION}",
+ "jndiContextFactory": "${JNDI_CONTEXT_FACTORY}",
+ "jndiUsername": "${JNDI_USERNAME}",
+ "jndiPassword": "${JNDI_PASSWORD}"
+ }
+ },
+ "outputSchema": [
+ {
+ "name": "etlSchemaBody",
+ "schema": ""
+ }
+ ],
+ "inputSchema": [
+ {
+ "name": "File",
+ "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"surname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
+ }
+ ],
+ "id": "JMS"
+ }
+ ],
+ "schedule": "0 * * * *",
+ "engine": "spark",
+ "numOfRecordsPreview": 100,
+ "description": "Data Pipeline Application",
+ "maxConcurrentRuns": 1
+ }
+}
+
diff --git a/examples/JMS-streamingsource.json b/examples/JMS-streamingsource.json
new file mode 100644
index 0000000..05fde74
--- /dev/null
+++ b/examples/JMS-streamingsource.json
@@ -0,0 +1,99 @@
+{
+ "artifact": {
+ "name": "cdap-data-streams",
+ "version": "6.3.0-SNAPSHOT",
+ "scope": "SYSTEM"
+ },
+ "description": "Consume messages from a given topic and write them to a file.",
+ "name": "JMS-streamingsource",
+ "config": {
+ "resources": {
+ "memoryMB": 2048,
+ "virtualCores": 1
+ },
+ "driverResources": {
+ "memoryMB": 2048,
+ "virtualCores": 1
+ },
+ "connections": [
+ {
+ "from": "JMS",
+ "to": "File"
+ }
+ ],
+ "comments": [],
+ "postActions": [],
+ "properties": {
+ "system.spark.spark.streaming.backpressure.enabled": "true",
+ "system.spark.spark.executor.instances": "1"
+ },
+ "processTimingEnabled": true,
+ "stageLoggingEnabled": false,
+ "stages": [
+ {
+ "name": "JMS",
+ "plugin": {
+ "name": "JMS",
+ "type": "streamingsource",
+ "label": "JMS",
+ "artifact": {
+ "name": "jms-plugins",
+ "version": "1.0.0-SNAPSHOT",
+ "scope": "USER"
+ },
+ "properties": {
+ "referenceName": "JMS",
+ "schema": "{\"type\":\"record\",\"name\":\"message\",\"fields\":[{\"name\":\"messageId\",\"type\":\"string\"},{\"name\":\"messageTimestamp\",\"type\":\"long\"},{\"name\":\"correlationId\",\"type\":\"string\"},{\"name\":\"replyTo\",\"type\":\"string\"},{\"name\":\"destination\",\"type\":\"string\"},{\"name\":\"deliveryNode\",\"type\":\"int\"},{\"name\":\"redelivered\",\"type\":\"boolean\"},{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"expiration\",\"type\":\"long\"},{\"name\":\"priority\",\"type\":\"int\"},{\"name\":\"payload\",\"type\":\"string\"}]}",
+ "type": "Topic",
+ "messageType": "Text",
+ "jmsUsername": "${JMS_USERNAME}",
+ "jmsPassword": "${JMS_PASSWORD}",
+ "providerUrl": "${PROVIDER_URL}",
+ "destination": "${DESTINATION}",
+ "jndiContextFactory": "${JNDI_CONTEXT_FACTORY}"
+ }
+ },
+ "outputSchema": "{\"type\":\"record\",\"name\":\"message\",\"fields\":[{\"name\":\"messageId\",\"type\":\"string\"},{\"name\":\"messageTimestamp\",\"type\":\"long\"},{\"name\":\"correlationId\",\"type\":\"string\"},{\"name\":\"replyTo\",\"type\":\"string\"},{\"name\":\"destination\",\"type\":\"string\"},{\"name\":\"deliveryNode\",\"type\":\"int\"},{\"name\":\"redelivered\",\"type\":\"boolean\"},{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"expiration\",\"type\":\"long\"},{\"name\":\"priority\",\"type\":\"int\"},{\"name\":\"payload\",\"type\":\"string\"}]}",
+ "id": "JMS"
+ },
+ {
+ "name": "File",
+ "plugin": {
+ "name": "File",
+ "type": "batchsink",
+ "label": "File",
+ "artifact": {
+ "name": "core-plugins",
+ "version": "2.5.0-SNAPSHOT",
+ "scope": "SYSTEM"
+ },
+ "properties": {
+ "suffix": "yyyy-MM-dd-HH-mm",
+ "format": "delimited",
+ "referenceName": "file",
+ "path": "${PATH}",
+ "delimiter": ";",
+ "schema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}"
+ }
+ },
+ "outputSchema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}",
+ "inputSchema": [
+ {
+ "name": "JMS",
+ "schema": "{\"type\":\"record\",\"name\":\"message\",\"fields\":[{\"name\":\"messageId\",\"type\":\"string\"},{\"name\":\"messageTimestamp\",\"type\":\"long\"},{\"name\":\"correlationId\",\"type\":\"string\"},{\"name\":\"replyTo\",\"type\":\"string\"},{\"name\":\"destination\",\"type\":\"string\"},{\"name\":\"deliveryNode\",\"type\":\"int\"},{\"name\":\"redelivered\",\"type\":\"boolean\"},{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"expiration\",\"type\":\"long\"},{\"name\":\"priority\",\"type\":\"int\"},{\"name\":\"payload\",\"type\":\"string\"}]}"
+ }
+ ],
+ "id": "File"
+ }
+ ],
+ "batchInterval": "10s",
+ "clientResources": {
+ "memoryMB": 2048,
+ "virtualCores": 1
+ },
+ "disableCheckpoints": true,
+ "stopGracefully": true,
+ "description": "Data Streams Application"
+ }
+}
+
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..6082a1e
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,230 @@
+
+
+
+
+
+ 4.0.0
+ JMS Plugins
+ io.cdap.plugin
+ jms-plugins
+ 1.0.0-SNAPSHOT
+
+
+ UTF-8
+ widgets
+ docs
+ ${project.basedir}
+ 6.2.3
+ 2.6.0-SNAPSHOT
+ 2.3.1
+ 2.9.1
+ 4.1.16.Final
+ 1.3.0
+ 1.8.2
+ 4.12
+ 27.0.1-jre
+ 5.11.1
+ 2.24.0
+
+
+
+
+ org.apache.activemq
+ activemq-all
+ ${activemq.version}
+
+
+ io.cdap.plugin
+ hydrator-common
+ ${cdap.plugin.version}
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
+
+ io.cdap.cdap
+ cdap-api
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-common
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-api-common
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-etl-api-spark
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-etl-api
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-formats
+ ${cdap.version}
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
+
+ io.cdap.cdap
+ hydrator-test
+ ${cdap.version}
+ test
+
+
+ org.apache.spark
+ spark-streaming_2.11
+ ${spark.version}
+ provided
+
+
+ antlr
+ antlr
+ 2.7.7
+ compile
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 2.17
+
+
+ validate
+ validate
+
+ checkstyle.xml
+ suppressions.xml
+ UTF-8
+ true
+ true
+ true
+
+
+ check
+
+
+
+
+
+ com.puppycrawl.tools
+ checkstyle
+ 6.19
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.7.0
+
+ 1.8
+ 1.8
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.14.1
+
+ -Xmx2048m -Djava.awt.headless=true -XX:MaxPermSize=256m -XX:+UseConcMarkSweepGC
+ -XX:OnOutOfMemoryError="kill -9 %p" -XX:+HeapDumpOnOutOfMemoryError
+
+ false
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ 3.3.0
+
+
+ <_exportcontents>
+ io.cdap.plugin.*;
+ org.apache.activemq.*;
+ org.apache.spark.streaming.*;
+ com.google.common.base.*;
+
+ *;inline=false;scope=compile
+ true
+ lib
+
+
+
+
+ package
+
+ bundle
+
+
+
+
+
+ io.cdap
+ cdap-maven-plugin
+ 1.1.0
+
+
+ system:cdap-data-pipeline[6.2.3, 7.0.0)
+ system:cdap-data-streams[6.2.3, 7.0.0)
+
+
+
+
+ create-artifact-config
+ prepare-package
+
+ create-plugin-json
+
+
+
+
+
+
+
+
diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSConfig.java b/src/main/java/io/cdap/plugin/jms/common/JMSConfig.java
new file mode 100644
index 0000000..b9872a8
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/common/JMSConfig.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.common.ReferencePluginConfig;
+
+import javax.annotation.Nullable;
+
+/**
+ * Base config for JMS plugins.
+ */
+public class JMSConfig extends ReferencePluginConfig {
+
+ public static final String NAME_CONNECTION_FACTORY = "connectionFactory";
+ public static final String NAME_JMS_USERNAME = "jmsUsername";
+ public static final String NAME_JMS_PASSWORD = "jmsPassword";
+ public static final String NAME_PROVIDER_URL = "providerUrl";
+ public static final String NAME_TYPE = "type";
+ public static final String NAME_JNDI_CONTEXT_FACTORY = "jndiContextFactory";
+ public static final String NAME_JNDI_USERNAME = "jndiUsername";
+ public static final String NAME_JNDI_PASSWORD = "jndiPassword";
+
+ @Name(NAME_CONNECTION_FACTORY)
+ @Description("Name of the connection factory.")
+ @Nullable
+ @Macro
+ private String connectionFactory; // default: ConnectionFactory
+
+ @Name(NAME_JMS_USERNAME)
+ @Description("Username to connect to JMS.")
+ @Macro
+ private String jmsUsername;
+
+ @Name(NAME_JMS_PASSWORD)
+ @Description("Password to connect to JMS.")
+ @Macro
+ private String jmsPassword;
+
+ @Name(NAME_PROVIDER_URL)
+ @Description("The URL of the JMS provider. For example, in case of an ActiveMQ Provider, the URL has the format " +
+ "tcp://hostname:61616.")
+ @Macro
+ private String providerUrl;
+
+ @Name(NAME_TYPE)
+ @Description("Queue or Topic.")
+ @Nullable
+ @Macro
+ private String type; // default: queue
+
+ @Name(NAME_JNDI_CONTEXT_FACTORY)
+ @Description("Name of the JNDI context factory. For example, in case of an ActiveMQ Provider, the JNDI Context " +
+ "Factory is: org.apache.activemq.jndi.ActiveMQInitialContextFactory.")
+ @Macro
+ private String jndiContextFactory; // default: org.apache.activemq.jndi.ActiveMQInitialContextFactory
+
+ @Name(NAME_JNDI_USERNAME)
+ @Description("User name for the JNDI.")
+ @Nullable
+ @Macro
+ private String jndiUsername;
+
+ @Name(NAME_JNDI_PASSWORD)
+ @Description("Password for the JNDI.")
+ @Nullable
+ @Macro
+ private String jndiPassword;
+
+ public JMSConfig(String referenceName) {
+ super(referenceName);
+ this.connectionFactory = "ConnectionFactory";
+ this.type = JMSDataStructures.QUEUE;
+ this.jndiContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
+
+ }
+
+ @VisibleForTesting
+ public JMSConfig(String referenceName, String connectionFactory, String jmsUsername, String jmsPassword,
+ String providerUrl, String type, String jndiContextFactory, String jndiUsername,
+ String jndiPassword) {
+ super(referenceName);
+ this.connectionFactory = connectionFactory;
+ this.jmsUsername = jmsUsername;
+ this.jmsPassword = jmsPassword;
+ this.providerUrl = providerUrl;
+ this.type = type;
+ this.jndiContextFactory = jndiContextFactory;
+ this.jndiUsername = jndiUsername;
+ this.jndiPassword = jndiPassword;
+
+ }
+
+ public String getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ public String getJmsUsername() {
+ return jmsUsername;
+ }
+
+ public String getJmsPassword() {
+ return jmsPassword;
+ }
+
+ public String getProviderUrl() {
+ return providerUrl;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getJndiContextFactory() {
+ return jndiContextFactory;
+ }
+
+ public String getJndiUsername() {
+ return jndiUsername;
+ }
+
+ public String getJndiPassword() {
+ return jndiPassword;
+ }
+
+ public void validateParams(FailureCollector failureCollector) {
+
+ if (Strings.isNullOrEmpty(jmsUsername) && !containsMacro(NAME_JMS_USERNAME)) {
+ failureCollector
+ .addFailure("JMS username must be provided.", "Please provide your JMS username.")
+ .withConfigProperty(NAME_JMS_USERNAME);
+ }
+
+ if (Strings.isNullOrEmpty(jmsPassword) && !containsMacro(NAME_JMS_PASSWORD)) {
+ failureCollector
+ .addFailure("JMS password must be provided.", "Please provide your JMS password.")
+ .withConfigProperty(NAME_JMS_PASSWORD);
+ }
+
+ if (Strings.isNullOrEmpty(jndiContextFactory) && !containsMacro(NAME_JNDI_CONTEXT_FACTORY)) {
+ failureCollector
+ .addFailure("JNDI context factory must be provided.", "Please provide your JNDI" +
+ " context factory.")
+ .withConfigProperty(NAME_JNDI_CONTEXT_FACTORY);
+ }
+
+ if (Strings.isNullOrEmpty(providerUrl) && !containsMacro(NAME_PROVIDER_URL)) {
+ failureCollector
+ .addFailure("Provider URL must be provided.", "Please provide your provider URL.")
+ .withConfigProperty(NAME_PROVIDER_URL);
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSConnection.java b/src/main/java/io/cdap/plugin/jms/common/JMSConnection.java
new file mode 100644
index 0000000..ca935b7
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/common/JMSConnection.java
@@ -0,0 +1,343 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.common;
+
+import com.google.common.base.Strings;
+import io.cdap.plugin.jms.sink.JMSBatchSinkConfig;
+import io.cdap.plugin.jms.source.JMSStreamingSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+/**
+ * A facade class that encapsulates the necessary functionality to: get the initial context, resolve the connection
+ * factory, establish connection to JMS, create a session, resolve the destination by a queue or topic name, create
+ * producer, create consumer, set message listener to a consumer, and start connection. This class handles exceptions
+ * for all the functionalities provided.
+ */
+public class JMSConnection {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JMSConnection.class);
+ private final JMSConfig config;
+
+ public JMSConnection(JMSConfig config) {
+ this.config = config;
+ }
+
+ /**
+ * Gets the initial context by offering 'jndiContextFactory', 'providerUrl', 'topic'/'queue' name, `jndiUsername`,
+ * and `jndiPassword` config properties.
+ *
+ * @return the {@link InitialContext} from the given properties
+ */
+ public Context getContext() {
+ Properties properties = new Properties();
+ properties.put(Context.INITIAL_CONTEXT_FACTORY, config.getJndiContextFactory());
+ properties.put(Context.PROVIDER_URL, config.getProviderUrl());
+
+ if (config instanceof JMSBatchSinkConfig) {
+ String destinationName = ((JMSBatchSinkConfig) config).getDestinationName();
+ if (config.getType().equals(JMSDataStructures.TOPIC)) {
+ properties.put(String.format("topic.%s", destinationName), destinationName);
+ } else {
+ properties.put(String.format("queue.%s", destinationName), destinationName);
+ }
+ } else {
+ String sourceName = ((JMSStreamingSourceConfig) config).getSourceName();
+ if (config.getType().equals(JMSDataStructures.TOPIC)) {
+ properties.put(String.format("topic.%s", sourceName), sourceName);
+ } else {
+ properties.put(String.format("queue.%s", sourceName), sourceName);
+ }
+ }
+
+ if (!(Strings.isNullOrEmpty(config.getJndiUsername()) && Strings.isNullOrEmpty(config.getJndiPassword()))) {
+ properties.put(Context.SECURITY_PRINCIPAL, config.getJndiUsername());
+ properties.put(Context.SECURITY_CREDENTIALS, config.getJndiPassword());
+ }
+
+ try {
+ return new InitialContext(properties);
+ } catch (NamingException e) {
+ throw new RuntimeException("Failed to create initial context for provider URL " + config.getProviderUrl() +
+ " with principal " + config.getJndiUsername(), e);
+ }
+ }
+
+ /**
+ * Gets a {@link ConnectionFactory} by offering the `connectionFactory` config property.
+ *
+ * @param context an initial context
+ * @return a connection factory
+ */
+ public ConnectionFactory getConnectionFactory(Context context) {
+ try {
+ return (ConnectionFactory) context.lookup(config.getConnectionFactory());
+ } catch (NamingException e) {
+ throw new RuntimeException(String.format("Failed to resolve the connection factory for %s.",
+ config.getConnectionFactory()), e);
+ }
+ }
+
+ /**
+ * Creates a {@link Connection} by offering `jmsUsername` and `jmsPassword`. If a source {@link Topic} is to be
+ * consumed set `clientId` which is needed by the JMS broker to identify the durable subscriber.
+ *
+ * @param connectionFactory a given connection factory
+ * @return a connection to the JMS broker
+ */
+ public Connection createConnection(ConnectionFactory connectionFactory) {
+ Connection connection = null;
+ try {
+ connection = connectionFactory.createConnection(config.getJmsUsername(), config.getJmsPassword());
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ // If subscribing to a source topic, create a durable subscriber
+ if (config.getType().equals(JMSDataStructures.TOPIC)) {
+ try {
+ if (config instanceof JMSStreamingSourceConfig) {
+ String clientId = "client-id-" + ((JMSStreamingSourceConfig) config).getSourceName();
+ connection.setClientID(clientId);
+ }
+ } catch (JMSException e) {
+ throw new RuntimeException("Cannot set Client Id", e);
+ }
+ }
+ return connection;
+ }
+
+ /**
+ * Starts connection of this client to the JMS broker.
+ *
+ * @param connection a given connection
+ */
+ public void startConnection(Connection connection) {
+ try {
+ connection.start();
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+
+ /**
+ * Stops connection of this client from the JMS broker.
+ *
+ * @param connection a given connection
+ */
+ public void stopConnection(Connection connection) {
+ try {
+ connection.stop();
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+
+ /**
+ * Closes connection of this client from the JMS broker.
+ *
+ * @param connection a given connection
+ */
+ public void closeConnection(Connection connection) {
+ try {
+ connection.close();
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+
+ /**
+ * Creates a {@link Session} between this client and the JMS broker.
+ *
+ * @param connection a given session
+ * @return a session to the JMS broker
+ */
+ public Session createSession(Connection connection) {
+ try {
+ return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+
+ /**
+ * Closes the {@link Session} between this client and the JMS broker.
+ *
+ * @param session a given session
+ */
+ public void closeSession(Session session) {
+ try {
+ session.close();
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+
+ /**
+ * Gets the source {@link Topic}/{@link Queue} depending on the `type` config parameter. {@link Destination} is the
+ * parent class of {@link Topic} and {@link Queue}.
+ *
+ * @param context a given context
+ * @return a source topic/queue that this client is about to consume messages from
+ */
+ public Destination getSource(Context context) {
+ String sourceName = ((JMSStreamingSourceConfig) config).getSourceName();
+ if (config.getType().equals(JMSDataStructures.TOPIC)) {
+ try {
+ return (Topic) context.lookup(sourceName);
+ } catch (NamingException e) {
+ throw new RuntimeException("Failed to resolve the topic " + sourceName, e);
+ }
+ } else {
+ try {
+ return (Queue) context.lookup(sourceName);
+ } catch (NamingException e) {
+ throw new RuntimeException("Failed to resolve the queue " + sourceName, e);
+ }
+ }
+ }
+
+ /**
+ * Gets a sink {@link Topic}/{@link Queue} depending on the `type` config parameter. {@link Destination} is the
+ * parent class of {@link Topic} and {@link Queue}. If no sink topic/queue name is provided, a sink topic/queue is
+ * automatically created.
+ *
+ * @param context a given context
+ * @param session a given session needed to create the topic/queue in case it does not exist
+ * @return a sink topic/queue this client is about to produce messages to
+ */
+ public Destination getSink(Context context, Session session) {
+ String destinationName = ((JMSBatchSinkConfig) config).getDestinationName();
+
+ if (config.getType().equals(JMSDataStructures.TOPIC)) {
+ try {
+ return (Topic) context.lookup(destinationName);
+ } catch (NamingException e) {
+ LOG.warn("Failed to resolve queue " + destinationName, e);
+ return createSinkTopic(session);
+ }
+ } else {
+ try {
+ return (Queue) context.lookup(destinationName);
+ } catch (NamingException e) {
+ LOG.warn("Failed to resolve queue " + destinationName, e);
+ return createSinkQueue(session);
+ }
+ }
+ }
+
+ /**
+ * Creates a sink {@link Topic}
+ *
+ * @param session a given session
+ * @return a created topic
+ */
+ private Destination createSinkTopic(Session session) {
+ String destinationName = ((JMSBatchSinkConfig) config).getDestinationName();
+ LOG.info("Creating topic " + destinationName);
+ try {
+ return session.createTopic(destinationName);
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+
+ /**
+ * Creates a sink {@link Queue}
+ *
+ * @param session a given session
+ * @return a created queue
+ */
+ private Destination createSinkQueue(Session session) {
+ String destinationName = ((JMSBatchSinkConfig) config).getDestinationName();
+ LOG.info("Creating queue " + destinationName);
+ try {
+ return session.createQueue(destinationName);
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+
+ /**
+ * Creates a {@link MessageConsumer} that consumes messages from a defined source {@link Topic}/{@link Queue}. In case
+ * of a topic-consumer, a durable subscriber is created. A durable subscriber makes the JMS broker keep the state of
+ * the offset consumed. Hence this client can restart consuming messages from the last offset not read.
+ *
+ * @param session a given session
+ * @param destination a source topic/queue this client is about to consume messages from
+ * @return a created message consumer
+ */
+ public MessageConsumer createConsumer(Session session, Destination destination) {
+ MessageConsumer messageConsumer = null;
+ try {
+ if (destination instanceof Topic) {
+ String clientId = "subscriber-id-" + ((JMSStreamingSourceConfig) config).getSourceName();
+ messageConsumer = session.createDurableSubscriber((Topic) destination, clientId);
+ } else {
+ messageConsumer = session.createConsumer(destination);
+ }
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ return messageConsumer;
+ }
+
+ /**
+ * Creates a {@link MessageProducer} that produces messages to a defined sink {@link Topic}/{@link Queue}.
+ *
+ * @param session a given session
+ * @param destination a sink topic/queue this client is about to produce messages to
+ * @return a created message producer
+ */
+ public MessageProducer createProducer(Session session, Destination destination) {
+ try {
+ return session.createProducer(destination);
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+
+ /**
+ * Sets a {@link MessageListener} to the {@link MessageConsumer}. This message listener has a `onMessage()` method
+ * that gets automatically triggered in case a new message is produced to the queue/topic while the pipeline is in
+ * the RUNNING state.
+ *
+ * @param messageListener a given message listener
+ * @param messageConsumer a given message consumer
+ */
+ public void setMessageListener(MessageListener messageListener, MessageConsumer messageConsumer) {
+ try {
+ messageConsumer.setMessageListener(messageListener);
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSDataStructures.java b/src/main/java/io/cdap/plugin/jms/common/JMSDataStructures.java
new file mode 100644
index 0000000..4990af2
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/common/JMSDataStructures.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.common;
+
+/**
+ * A class that specifies the JMS data structures types.
+ */
+public class JMSDataStructures {
+ public static final String QUEUE = "Queue";
+ public static final String TOPIC = "Topic";
+}
diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSMessageMetadata.java b/src/main/java/io/cdap/plugin/jms/common/JMSMessageMetadata.java
new file mode 100644
index 0000000..d389ada
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/common/JMSMessageMetadata.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.common;
+
+import io.cdap.cdap.api.data.schema.Schema;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A class that specifies JMS message metadata fields.
+ */
+public class JMSMessageMetadata {
+ public static final String MESSAGE_ID = "messageId";
+ public static final String MESSAGE_TIMESTAMP = "messageTimestamp";
+ public static final String CORRELATION_ID = "correlationId";
+ public static final String REPLY_TO = "replyTo";
+ public static final String DESTINATION = "destination";
+ public static final String DELIVERY_MODE = "deliveryNode";
+ public static final String REDELIVERED = "redelivered";
+ public static final String TYPE = "type";
+ public static final String EXPIRATION = "expiration";
+ public static final String PRIORITY = "priority";
+
+ public static Schema.Field getMessageMetadata() {
+ return Schema.Field.of(JMSMessageParts.METADATA, Schema.recordOf(JMSMessageParts.METADATA, Arrays.asList(
+ Schema.Field.of(MESSAGE_ID, Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of(MESSAGE_TIMESTAMP, Schema.nullableOf(Schema.of(Schema.Type.LONG))),
+ Schema.Field.of(CORRELATION_ID, Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of(REPLY_TO, Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of(DESTINATION, Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of(DELIVERY_MODE, Schema.nullableOf(Schema.of(Schema.Type.INT))),
+ Schema.Field.of(REDELIVERED, Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))),
+ Schema.Field.of(TYPE, Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of(EXPIRATION, Schema.nullableOf(Schema.of(Schema.Type.LONG))),
+ Schema.Field.of(PRIORITY, Schema.nullableOf(Schema.of(Schema.Type.INT))))));
+ }
+
+ public static List getJMSMessageHeaderNames() {
+ return Arrays.asList(MESSAGE_ID, MESSAGE_TIMESTAMP, CORRELATION_ID, REPLY_TO, DESTINATION, DELIVERY_MODE,
+ REDELIVERED, TYPE, EXPIRATION, PRIORITY);
+ }
+}
+
diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSMessageParts.java b/src/main/java/io/cdap/plugin/jms/common/JMSMessageParts.java
new file mode 100644
index 0000000..97906a3
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/common/JMSMessageParts.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.common;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A class that specifies JMS message parts.
+ */
+public class JMSMessageParts {
+ public static final String METADATA = "metadata";
+ public static final String PAYLOAD = "payload";
+ public static final String PROPERTIES = "properties";
+
+ public static List getJMSMessageParts() {
+ return Arrays.asList(METADATA, PAYLOAD, PROPERTIES);
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSMessageProperties.java b/src/main/java/io/cdap/plugin/jms/common/JMSMessageProperties.java
new file mode 100644
index 0000000..ce04678
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/common/JMSMessageProperties.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.common;
+
+import com.google.gson.Gson;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * A class that specifies JMS message properties fields.
+ */
+public class JMSMessageProperties {
+
+ public static void populateProperties(Schema schema, StructuredRecord.Builder builder, Message message,
+ String messageType) throws JMSException {
+
+ boolean isString = schema.getField(JMSMessageParts.PROPERTIES).getSchema().getType().equals(Schema.Type.STRING);
+ boolean isRecord = schema.getField(JMSMessageParts.PROPERTIES).getSchema().getType().equals(Schema.Type.RECORD);
+
+ if (isString) {
+ populatePropertiesOnStringSchema(builder, message);
+ } else if (isRecord) {
+ populatePropertiesOnRecordSchema(schema, builder, message, messageType);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to populate properties! Field %s can only support String or Record data types!",
+ JMSMessageParts.PROPERTIES)
+ );
+ }
+ }
+
+
+ /**
+ * @param builder
+ * @param message
+ * @throws JMSException
+ */
+ public static void populatePropertiesOnStringSchema(StructuredRecord.Builder builder, Message message)
+ throws JMSException {
+ HashMap properties = new HashMap<>();
+ List listOfPropertyNames = Collections.list(message.getPropertyNames());
+
+ for (String propertyName : listOfPropertyNames) {
+ properties.put(propertyName, message.getObjectProperty(propertyName));
+ }
+
+ builder.set(JMSMessageParts.PROPERTIES, new Gson().toJson(properties));
+ }
+
+
+ /**
+ * @param schema the entire schema of the record
+ * @param recordBuilder the record builder that we set the properties record into
+ * @param message the incoming JMS message
+ * @param messageType the incoming JMS message type
+ * @throws JMSException
+ */
+ public static void populatePropertiesOnRecordSchema(Schema schema, StructuredRecord.Builder recordBuilder,
+ Message message, String messageType) throws JMSException {
+ Schema propertiesSchema = schema.getField(JMSMessageParts.PROPERTIES).getSchema();
+
+ StructuredRecord.Builder propertiesRecordBuilder = StructuredRecord.builder(propertiesSchema);
+
+ for (Schema.Field field : propertiesSchema.getFields()) {
+ String name = field.getName();
+ Schema.Type type = field.getSchema().getType();
+
+ if (!message.propertyExists(field.getName())) {
+ throw new RuntimeException(
+ String.format("Property \"%1$s\" does not exist in the incoming \"%2$s\" message! " +
+ "Make sure that you have specified a correct field name in the output schema that " +
+ "matches a property name in the incoming \"%2$s\" message.", field.getName(), messageType));
+ }
+
+ switch (type) {
+ case BOOLEAN:
+ propertiesRecordBuilder.set(name, message.getBooleanProperty(name));
+ continue;
+ case BYTES:
+ propertiesRecordBuilder.set(name, message.getByteProperty(name));
+ continue;
+ case INT:
+ propertiesRecordBuilder.set(name, message.getIntProperty(name));
+// short getShortProperty(String var1) throws JMSException;
+ continue;
+ case LONG:
+ propertiesRecordBuilder.set(name, message.getLongProperty(name));
+ continue;
+ case FLOAT:
+ propertiesRecordBuilder.set(name, message.getFloatProperty(name));
+ continue;
+ case DOUBLE:
+ propertiesRecordBuilder.set(name, message.getDoubleProperty(name));
+ continue;
+ case STRING:
+ propertiesRecordBuilder.set(name, message.getStringProperty(name));
+ continue;
+ default:
+ propertiesRecordBuilder.set(name, message.getObjectProperty(name));
+ continue;
+ }
+ }
+ recordBuilder.set(JMSMessageParts.PROPERTIES, propertiesRecordBuilder.build());
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSMessageType.java b/src/main/java/io/cdap/plugin/jms/common/JMSMessageType.java
new file mode 100644
index 0000000..e1d2524
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/common/JMSMessageType.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.common;
+
+/**
+ * A class that specifies JMS message types.
+ */
+public class JMSMessageType {
+ public static final String MESSAGE = "Message";
+ public static final String TEXT = "Text";
+ public static final String BYTES = "Bytes";
+ public static final String MAP = "Map";
+ public static final String OBJECT = "Object";
+}
diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSink.java b/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSink.java
new file mode 100644
index 0000000..88196fc
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSink.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.sink;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.batch.Output;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.lib.KeyValue;
+import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.common.ReferenceBatchSink;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+import java.util.stream.Collectors;
+
+/**
+ * A class that produces {@link StructuredRecord} to a JMS Queue or Topic.
+ */
+@Plugin(type = BatchSink.PLUGIN_TYPE)
+@Name("JMS")
+@Description("JMS sink to write events to JMS")
+public class JMSBatchSink extends ReferenceBatchSink {
+
+ private final JMSBatchSinkConfig config;
+
+ public JMSBatchSink(JMSBatchSinkConfig config) {
+ super(config);
+ this.config = config;
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ super.configurePipeline(pipelineConfigurer);
+ config.validateParams(pipelineConfigurer.getStageConfigurer().getFailureCollector());
+ pipelineConfigurer.getStageConfigurer().getFailureCollector().getOrThrowException();
+ }
+
+ @Override
+ public void prepareRun(BatchSinkContext context) throws Exception {
+ LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
+ Schema schema = context.getInputSchema();
+
+ if (schema != null) {
+ lineageRecorder.createExternalDataset(schema);
+ if (schema.getFields() != null && !schema.getFields().isEmpty()) {
+ lineageRecorder.recordWrite("Write", "Wrote to JMS topic.",
+ schema.getFields().stream()
+ .map(Schema.Field::getName)
+ .collect(Collectors.toList()));
+ }
+ }
+
+ context.addOutput(Output.of(config.referenceName, new JMSOutputFormatProvider(config)));
+ }
+
+ @Override
+ public void transform(StructuredRecord input, Emitter> emitter)
+ throws IOException {
+ emitter.emit(new KeyValue<>(null, input));
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkConfig.java b/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkConfig.java
new file mode 100644
index 0000000..f933f69
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkConfig.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.sink;
+
+import com.google.common.base.Strings;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.jms.common.JMSConfig;
+import io.cdap.plugin.jms.common.JMSMessageType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+
+/**
+ * Holds configuration required for configuring {@link io.cdap.plugin.jms.source.JMSSourceUtils;} and
+ * {@link JMSBatchSink}.
+ */
+public class JMSBatchSinkConfig extends JMSConfig implements Serializable {
+
+ // Params
+ public static final String NAME_DESTINATION = "destinationName";
+ public static final String NAME_MESSAGE_TYPE = "messageType";
+ public static final String NAME_OUTPUT_SCHEMA = "schema";
+
+ public static final String DESC_DESTINATION = "Name of the destination Queue/Topic. If the given Queue/Topic name" +
+ "is not resolved, a new Queue/Topic with the given name will get created.";
+ public static final String DESC_MESSAGE_TYPE = "Supports the following message types: Message, Text, Bytes, Map.";
+ public static final String DESC_OUTPUT_SCHEMA = "Output schema.";
+
+ @Name(NAME_DESTINATION)
+ @Description(DESC_DESTINATION)
+ @Macro
+ private String destinationName;
+
+ @Name(NAME_MESSAGE_TYPE)
+ @Description(DESC_MESSAGE_TYPE)
+ @Nullable
+ @Macro
+ private String messageType; // default: Text
+
+ @Name(NAME_OUTPUT_SCHEMA)
+ @Description(DESC_OUTPUT_SCHEMA)
+ @Nullable
+ @Macro
+ private String schema;
+
+ public JMSBatchSinkConfig() {
+ super("");
+ this.messageType = Strings.isNullOrEmpty(messageType) ? JMSMessageType.TEXT : messageType;
+ }
+
+ public JMSBatchSinkConfig(String referenceName, String connectionFactory, String jmsUsername,
+ String jmsPassword, String providerUrl, String type, String jndiContextFactory,
+ String jndiUsername, String jndiPassword, String messageType, String destinationName) {
+ super(referenceName, connectionFactory, jmsUsername, jmsPassword, providerUrl, type, jndiContextFactory,
+ jndiUsername, jndiPassword);
+ this.destinationName = destinationName;
+ this.messageType = messageType;
+ }
+
+ public String getDestinationName() {
+ return destinationName;
+ }
+
+ public void validateParams(FailureCollector failureCollector) {
+ this.validateParams(failureCollector);
+
+ if (Strings.isNullOrEmpty(destinationName) && !containsMacro(NAME_DESTINATION)) {
+ failureCollector
+ .addFailure("The destination topic/queue name must be provided!", "Provide your topic/queue name.")
+ .withConfigProperty(NAME_DESTINATION);
+ }
+ }
+
+ public String getMessageType() {
+ if (!Strings.isNullOrEmpty(NAME_MESSAGE_TYPE) && !containsMacro(NAME_MESSAGE_TYPE)) {
+ return messageType;
+ }
+ return JMSMessageType.TEXT;
+ }
+
+ /**
+ * @return {@link io.cdap.cdap.api.data.schema.Schema} of the dataset if one was given
+ * @throws IllegalArgumentException if the schema is not a valid JSON
+ */
+ public Schema getSchema() {
+ if (!Strings.isNullOrEmpty(schema)) {
+ try {
+ return Schema.parseJson(schema);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Invalid schema : %s", e.getMessage()), e);
+ }
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkUtils.java b/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkUtils.java
new file mode 100644
index 0000000..508ade1
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkUtils.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.sink;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.format.StructuredRecordStringConverter;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * Utils for {@link JMSBatchSink} plugin
+ */
+public class JMSBatchSinkUtils {
+
+ /**
+ * Converts an incoming {@link StructuredRecord} to a JMS {@link TextMessage}
+ *
+ * @param session the open session to the JMS broker
+ * @param record the incoming record
+ * @return a JMS text message
+ */
+ public static TextMessage convertStructuredRecordToTextMessage(Session session, StructuredRecord record) {
+ TextMessage textMessage = null;
+ String payload = null;
+
+ try {
+ payload = StructuredRecordStringConverter.toJsonString(record);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to convert record to json!", e);
+ }
+
+ try {
+ textMessage = session.createTextMessage();
+ textMessage.setText(payload);
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ return textMessage;
+ }
+
+ /**
+ * Converts an incoming {@link StructuredRecord} to a JMS {@link MapMessage}
+ *
+ * @param outputSchema he output schema necessary to properly build the map message.
+ * @param session the open session to the JMS broker
+ * @param record the incoming record
+ * @return a JMS map message
+ */
+ public static MapMessage convertStructuredRecordToMapMessage(Schema outputSchema, Session session,
+ StructuredRecord record) {
+ MapMessage mapMessage = null;
+
+ if (outputSchema == null) {
+ throw new RuntimeException("Output schema is empty! Please provide the output schema.");
+ }
+
+ try {
+ mapMessage = session.createMapMessage();
+ for (Schema.Field field : outputSchema.getFields()) {
+ String fieldName = field.getName();
+ Object value = record.get(fieldName);
+
+ switch (field.getSchema().getType()) {
+ case INT:
+ mapMessage.setInt(fieldName, cast(value, Integer.class));
+ break;
+ case LONG:
+ mapMessage.setLong(fieldName, cast(value, Long.class));
+ break;
+ case DOUBLE:
+ mapMessage.setDouble(fieldName, cast(value, Double.class));
+ break;
+ case FLOAT:
+ mapMessage.setFloat(fieldName, cast(value, Float.class));
+ break;
+ case BOOLEAN:
+ mapMessage.setBoolean(fieldName, cast(value, Boolean.class));
+ break;
+ case BYTES:
+ mapMessage.setBytes(fieldName, cast(value, byte[].class));
+ break;
+ case ARRAY:
+ mapMessage.setObject(fieldName, value);
+ break;
+ case RECORD:
+ mapMessage.setObject(fieldName, value);
+ break;
+ case MAP:
+ mapMessage.setObject(fieldName, value);
+ break;
+ default:
+ mapMessage.setString(fieldName, cast(value, String.class));
+ }
+ }
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ return mapMessage;
+ }
+
+ /**
+ * Converts an incoming {@link StructuredRecord} to a JMS {@link BytesMessage}
+ * @param session the open session to the JMS broker
+ * @param record the incoming record
+ * @return a JMS bytes message
+ */
+ public static BytesMessage convertStructuredRecordToBytesMessage(Session session, StructuredRecord record) {
+ BytesMessage bytesMessage = null;
+ byte[] payload = null;
+
+ try {
+ payload = StructuredRecordStringConverter.toJsonString(record).getBytes(StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to convert record to json!", e);
+ }
+
+ try {
+ bytesMessage = session.createBytesMessage();
+ bytesMessage.writeBytes(payload);
+
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ return bytesMessage;
+ }
+
+ /**
+ * Converts an incoming {@link StructuredRecord} to a JMS {@link ObjectMessage}
+ *
+ * @param session the open session to the JMS broker
+ * @param record the incoming record
+ * @return a JMS object message
+ */
+ public static ObjectMessage convertStructuredRecordToObjectMessage(Session session, StructuredRecord record) {
+ ObjectMessage objectMessage = null;
+ byte[] payload = null;
+
+ try {
+ payload = StructuredRecordStringConverter.toJsonString(record).getBytes(StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to convert record to json!", e);
+ }
+
+ try {
+ objectMessage = session.createObjectMessage();
+ objectMessage.setObject(payload);
+
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ return objectMessage;
+ }
+
+ /**
+ * Converts an incoming {@link StructuredRecord} to a JMS {@link Message}
+ *
+ * @param session the open session to the JMS broker
+ * @param record the incoming record
+ * @return a JMS message
+ */
+ public static Message convertStructuredRecordToMessage(Schema outputSchema, Session session,
+ StructuredRecord record) {
+ Message message = null;
+
+ if (outputSchema == null) {
+ throw new RuntimeException("Output schema is empty! Please provide the output schema.");
+ }
+
+ try {
+ message = session.createMessage();
+ for (Schema.Field field : outputSchema.getFields()) {
+ String fieldName = field.getName();
+ Object value = record.get(fieldName);
+
+ switch (field.getSchema().getType()) {
+ case INT:
+ message.setIntProperty(fieldName, cast(value, Integer.class));
+ break;
+ case LONG:
+ message.setLongProperty(fieldName, cast(value, Long.class));
+ break;
+ case DOUBLE:
+ message.setDoubleProperty(fieldName, cast(value, Double.class));
+ break;
+ case FLOAT:
+ message.setFloatProperty(fieldName, cast(value, Float.class));
+ break;
+ case BOOLEAN:
+ message.setBooleanProperty(fieldName, cast(value, Boolean.class));
+ break;
+ case ARRAY:
+ message.setObjectProperty(fieldName, value);
+ break;
+ case RECORD:
+ message.setObjectProperty(fieldName, value);
+ break;
+ case MAP:
+ message.setObjectProperty(fieldName, value);
+ break;
+ default:
+ message.setStringProperty(fieldName, cast(value, String.class));
+ }
+ }
+
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ return message;
+ }
+
+ public static T cast(Object o, Class clazz) {
+ return o != null ? clazz.cast(o) : null;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormat.java b/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormat.java
new file mode 100644
index 0000000..a3b69b1
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormat.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.sink;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Output format to write to JMS.
+ */
+public class JMSOutputFormat extends OutputFormat {
+
+ @Override
+ public RecordWriter getRecordWriter(TaskAttemptContext context) {
+ return new JMSRecordWriter(context);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) {
+ // no-op
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) {
+ return new OutputCommitter() {
+ @Override
+ public void setupJob(JobContext jobContext) {
+ // no-op
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskAttemptContext) {
+ // no-op
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskAttemptContext) {
+ // no-op
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskAttemptContext) {
+ // no-op
+ }
+ };
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormatProvider.java b/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormatProvider.java
new file mode 100644
index 0000000..d433167
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormatProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.sink;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.cdap.cdap.api.data.batch.OutputFormatProvider;
+
+import java.util.Map;
+
+/**
+ * JMS Output format provider.
+ */
+public class JMSOutputFormatProvider implements OutputFormatProvider {
+
+ public static final String PROPERTY_CONFIG_JSON = "cdap.jms.sink.config";
+ private static final Gson GSON = new GsonBuilder().create();
+ private final Map conf;
+
+ public JMSOutputFormatProvider(JMSBatchSinkConfig config) {
+ this.conf = new ImmutableMap.Builder()
+ .put(PROPERTY_CONFIG_JSON, GSON.toJson(config))
+ .build();
+ }
+
+ @Override
+ public String getOutputFormatClassName() {
+ return JMSOutputFormat.class.getName();
+ }
+
+ @Override
+ public Map getOutputFormatConfiguration() {
+ return conf;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSRecordWriter.java b/src/main/java/io/cdap/plugin/jms/sink/JMSRecordWriter.java
new file mode 100644
index 0000000..5342edf
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/sink/JMSRecordWriter.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.sink;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.jms.common.JMSConnection;
+import io.cdap.plugin.jms.common.JMSMessageType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+
+/**
+ * Record writer to produce messages to a JMS Topic/Queue.
+ */
+public class JMSRecordWriter extends RecordWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(JMSRecordWriter.class);
+ private static final Gson GSON = new GsonBuilder().create();
+
+ private final JMSBatchSinkConfig config;
+ private Connection connection;
+ private Session session;
+ private MessageProducer messageProducer;
+ private JMSConnection jmsConnection;
+
+ public JMSRecordWriter(TaskAttemptContext context) {
+ Configuration config = context.getConfiguration();
+ String configJson = config.get(JMSOutputFormatProvider.PROPERTY_CONFIG_JSON);
+ this.config = GSON.fromJson(configJson, JMSBatchSinkConfig.class);
+ this.jmsConnection = new JMSConnection(this.config);
+ establishConnection();
+ }
+
+ @Override
+ public void write(NullWritable key, StructuredRecord record) {
+ String messageType = config.getMessageType();
+ Schema outputSchema = config.getSchema();
+
+ // when the output schema is not specified, the incoming record gets wrapped in a text payload
+ if (outputSchema == null) {
+ TextMessage textMessage = JMSBatchSinkUtils.convertStructuredRecordToTextMessage(session, record);
+ produceMessage(textMessage);
+ } else {
+ switch (messageType) {
+ case JMSMessageType.TEXT:
+ TextMessage textMessage = JMSBatchSinkUtils.convertStructuredRecordToTextMessage(session, record);
+ produceMessage(textMessage);
+ break;
+ case JMSMessageType.MAP:
+ MapMessage mapMessage = JMSBatchSinkUtils.convertStructuredRecordToMapMessage(outputSchema, session, record);
+ produceMessage(mapMessage);
+ break;
+ case JMSMessageType.BYTES:
+ BytesMessage bytesMessage = JMSBatchSinkUtils.convertStructuredRecordToBytesMessage(session, record);
+ produceMessage(bytesMessage);
+ break;
+ case JMSMessageType.MESSAGE:
+ Message message = JMSBatchSinkUtils.convertStructuredRecordToMessage(outputSchema, session, record);
+ produceMessage(message);
+ break;
+ case JMSMessageType.OBJECT:
+ ObjectMessage objectMessage = JMSBatchSinkUtils.convertStructuredRecordToObjectMessage(session, record);
+ produceMessage(objectMessage);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) {
+ this.jmsConnection.stopConnection(this.connection);
+ this.jmsConnection.closeSession(this.session);
+ this.jmsConnection.closeConnection(this.connection);
+ }
+
+ private void produceMessage(Message message) {
+ try {
+ messageProducer.send(message);
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+
+ private void establishConnection() {
+ Context context = jmsConnection.getContext();
+ ConnectionFactory factory = jmsConnection.getConnectionFactory(context);
+ connection = jmsConnection.createConnection(factory);
+ session = jmsConnection.createSession(connection);
+ Destination destination = jmsConnection.getSink(context, session);
+ messageProducer = jmsConnection.createProducer(session, destination);
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/source/JMSReceiver.java b/src/main/java/io/cdap/plugin/jms/source/JMSReceiver.java
new file mode 100644
index 0000000..74274fa
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/source/JMSReceiver.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.source;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.plugin.jms.common.JMSConnection;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Context;
+
+/**
+ * This class creates a customized message Receiver and implements the MessageListener
interface.
+ */
+public class JMSReceiver extends Receiver implements MessageListener {
+ private static final Logger LOG = LoggerFactory.getLogger(JMSReceiver.class);
+ private JMSStreamingSourceConfig config;
+ private Connection connection;
+ private StorageLevel storageLevel;
+ private Session session;
+ private JMSConnection jmsConnection;
+
+ public JMSReceiver(StorageLevel storageLevel, JMSStreamingSourceConfig config) {
+ super(storageLevel);
+ this.storageLevel = storageLevel;
+ this.config = config;
+ }
+
+ @Override
+ public void onStart() {
+ this.jmsConnection = new JMSConnection(config);
+ Context context = jmsConnection.getContext();
+ ConnectionFactory factory = jmsConnection.getConnectionFactory(context);
+ connection = jmsConnection.createConnection(factory);
+
+ session = jmsConnection.createSession(connection);
+ Destination destination = jmsConnection.getSource(context);
+ MessageConsumer messageConsumer = jmsConnection.createConsumer(session, destination);
+ jmsConnection.setMessageListener(this, messageConsumer);
+ jmsConnection.startConnection(connection);
+
+ // fetch the entire queue events
+ if (destination instanceof Queue) {
+ fetchEntireQueue(messageConsumer);
+ }
+ }
+
+ @Override
+ public void onStop() {
+ this.jmsConnection.stopConnection(this.connection);
+ this.jmsConnection.closeSession(this.session);
+ this.jmsConnection.closeConnection(this.connection);
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ try {
+ store(JMSSourceUtils.convertMessage(message, this.config));
+ } catch (Exception e) {
+ LOG.error("Message couldn't get stored in the Spark memory.", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void fetchEntireQueue(MessageConsumer messageConsumer) {
+ while (true) {
+ try {
+ Message message = messageConsumer.receive(5000);
+ if (message != null) {
+ store(JMSSourceUtils.convertMessage(message, this.config));
+ } else {
+ break;
+ }
+ } catch (JMSException e) {
+ throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/source/JMSSourceUtils.java b/src/main/java/io/cdap/plugin/jms/source/JMSSourceUtils.java
new file mode 100644
index 0000000..b58b236
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/source/JMSSourceUtils.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.source;
+
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.streaming.StreamingContext;
+import io.cdap.plugin.jms.common.JMSMessageMetadata;
+import io.cdap.plugin.jms.common.JMSMessageParts;
+import io.cdap.plugin.jms.common.JMSMessageProperties;
+import io.cdap.plugin.jms.common.JMSMessageType;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.receiver.Receiver;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+
+/**
+ * An utils class that encapsulates the necessary functionality to convert different types of JMS messages to
+ * StructuredRecords, and creates an input stream with our customized implementation of receiver {@link JMSReceiver}.
+ */
+public class JMSSourceUtils {
+
+ static JavaDStream getJavaDStream(StreamingContext context,
+ JMSStreamingSourceConfig config) {
+ Receiver jmsReceiver = new JMSReceiver(StorageLevel.MEMORY_AND_DISK_SER_2(), config);
+ return context.getSparkStreamingContext().receiverStream(jmsReceiver);
+ }
+
+ /**
+ * Creates a {@link StructuredRecord} from a JMS message.
+ *
+ * @param message the incoming JMS message
+ * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
+ * @return the {@link StructuredRecord} built out of the JMS message fields
+ * @throws JMSException in case the method fails to read fields from the JMS message
+ * @throws IllegalArgumentException in case the user provides a non-supported message type
+ */
+ public static StructuredRecord convertMessage(Message message, JMSStreamingSourceConfig config)
+ throws JMSException, IllegalArgumentException {
+ String messageType = null;
+ if (!Strings.isNullOrEmpty(config.getMessageType())) {
+ messageType = config.getMessageType();
+ } else {
+ throw new RuntimeException("Message type should not be null.");
+ }
+
+ if (message instanceof BytesMessage && messageType.equals(JMSMessageType.BYTES)) {
+ return convertJMSByteMessage(message, config);
+ }
+ if (message instanceof MapMessage && messageType.equals(JMSMessageType.MAP)) {
+ return convertJMSMapMessage(message, config);
+ }
+ if (message instanceof ObjectMessage && messageType.equals(JMSMessageType.OBJECT)) {
+ return convertJMSObjectMessage(message, config);
+ }
+ if (message instanceof Message && messageType.equals(JMSMessageType.MESSAGE)) {
+ return convertJMSMessage(message, config);
+ }
+ if (message instanceof TextMessage && messageType.equals(JMSMessageType.TEXT)) {
+ return convertJMSTextMessage(message, config);
+ } else {
+ throw new IllegalArgumentException("Message type should be one of Message, Text, Bytes, Map, or Object");
+ }
+ }
+
+ /**
+ * Creates a {@link StructuredRecord} from a JMS {@link TextMessage}.
+ *
+ * @param message the incoming JMS {@link TextMessage}
+ * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
+ * @return the {@link StructuredRecord} built out of the JMS {@link TextMessage} fields
+ * @throws JMSException in case the method fails to read fields from the JMS message
+ */
+ public static StructuredRecord convertJMSTextMessage(Message message, JMSStreamingSourceConfig config)
+ throws JMSException {
+ StructuredRecord.Builder recordBuilder = StructuredRecord.builder(config.getSchema());
+
+ if (config.getMessageMetadata()) {
+ populateMetadata(config.getSchema(), recordBuilder, message);
+ }
+ if (config.getMessageProperties()) {
+ JMSMessageProperties.populateProperties(config.getSchema(), recordBuilder, message, config.getMessageType());
+ }
+
+ recordBuilder.set(JMSMessageParts.PAYLOAD, ((TextMessage) message).getText());
+ return recordBuilder.build();
+ }
+
+ /**
+ * Creates a {@link StructuredRecord} from a JMS {@link MapMessage}.
+ *
+ * @param message the incoming JMS {@link MapMessage}
+ * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
+ * @return the {@link StructuredRecord} built out of the JMS {@link MapMessage} fields
+ * @throws JMSException in case the method fails to read fields from the JMS message
+ */
+ public static StructuredRecord convertJMSMapMessage(Message message, JMSStreamingSourceConfig config)
+ throws JMSException {
+ Schema schema = config.getSchema();
+ StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema);
+
+ Schema payloadSchema = schema.getField(JMSMessageParts.PAYLOAD).getSchema();
+ StructuredRecord.Builder payloadRecordBuilder = StructuredRecord.builder(payloadSchema);
+
+ if (config.getMessageMetadata()) {
+ populateMetadata(schema, recordBuilder, message);
+ }
+ if (config.getMessageProperties()) {
+ JMSMessageProperties.populateProperties(schema, recordBuilder, message, JMSMessageType.MESSAGE);
+ }
+
+ if (payloadSchema.getType().equals(Schema.Type.STRING)) {
+ Map payload = new HashMap<>();
+ Enumeration names = ((MapMessage) message).getMapNames();
+ while (names.hasMoreElements()) {
+ String key = names.nextElement();
+ payload.put(key, ((MapMessage) message).getObject(key));
+ }
+ payloadRecordBuilder.set(JMSMessageParts.PAYLOAD, new Gson().toJson(payload));
+ } else {
+ for (Schema.Field field : config.getDataFields(config.getSchema(), null)) {
+ Schema.Type type = field.getSchema().getType();
+ String name = field.getName();
+
+ if (type.equals(Schema.Type.UNION)) {
+ type = field.getSchema().getUnionSchema(0).getType();
+ }
+
+ switch (type) {
+ case BOOLEAN:
+ payloadRecordBuilder.set(name, ((MapMessage) message).getBoolean(name));
+ break;
+ case INT:
+ payloadRecordBuilder.set(name, ((MapMessage) message).getInt(name));
+ break;
+ case LONG:
+ payloadRecordBuilder.set(name, ((MapMessage) message).getLong(name));
+ break;
+ case FLOAT:
+ payloadRecordBuilder.set(name, ((MapMessage) message).getFloat(name));
+ break;
+ case DOUBLE:
+ payloadRecordBuilder.set(name, ((MapMessage) message).getDouble(name));
+ break;
+ case BYTES:
+ payloadRecordBuilder.set(name, ((MapMessage) message).getByte(name));
+ break;
+ case STRING:
+ payloadRecordBuilder.set(name, ((MapMessage) message).getString(name));
+ break;
+ case ARRAY: // byte array only
+ Schema.Type itemType = field.getSchema().getComponentSchema().getType();
+ if (itemType.equals(Schema.Type.UNION)) {
+ itemType = field.getSchema().getComponentSchema().getUnionSchema(0).getType();
+ }
+ if (itemType.equals(Schema.Type.BYTES)) {
+ payloadRecordBuilder.set(name, ((MapMessage) message).getBytes(name));
+ }
+ break;
+ }
+ }
+ }
+ recordBuilder.set(JMSMessageParts.PAYLOAD, payloadRecordBuilder.build());
+ return recordBuilder.build();
+ }
+
+ /**
+ * Creates a {@link StructuredRecord} from a JMS {@link BytesMessage}.
+ *
+ * @param message the incoming JMS {@link BytesMessage}
+ * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
+ * @return the {@link StructuredRecord} built out of the JMS {@link BytesMessage} fields
+ * @throws JMSException in case the method fails to read fields from the JMS message
+ */
+ public static StructuredRecord convertJMSByteMessage(Message message, JMSStreamingSourceConfig config)
+ throws JMSException {
+ Schema schema = config.getSchema();
+ StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema);
+
+ if (config.getMessageMetadata()) {
+ populateMetadata(config.getSchema(), recordBuilder, message);
+ }
+ if (config.getMessageProperties()) {
+ JMSMessageProperties.populateProperties(schema, recordBuilder, message, JMSMessageType.MESSAGE);
+ }
+
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ byte[] buffer = new byte[8096];
+ int currentByte;
+ while ((currentByte = ((BytesMessage) message).readBytes(buffer)) != -1) {
+ byteArrayOutputStream.write(buffer, 0, currentByte);
+ }
+ recordBuilder.set(JMSMessageParts.PAYLOAD, byteArrayOutputStream.toByteArray());
+ return recordBuilder.build();
+ }
+
+ /**
+ * Creates a {@link StructuredRecord} from a JMS {@link ObjectMessage}.
+ *
+ * @param message the incoming JMS {@link ObjectMessage}
+ * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
+ * @return the {@link StructuredRecord} built out of the JMS {@link ObjectMessage} fields
+ * @throws JMSException in case the method fails to read fields from the JMS message
+ */
+ public static StructuredRecord convertJMSObjectMessage(Message message, JMSStreamingSourceConfig config)
+ throws JMSException {
+ Schema schema = config.getSchema();
+ StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema);
+
+ if (config.getMessageMetadata()) {
+ populateMetadata(config.getSchema(), recordBuilder, message);
+ }
+ if (config.getMessageProperties()) {
+ JMSMessageProperties.populateProperties(schema, recordBuilder, message, JMSMessageType.MESSAGE);
+ }
+
+ byte[] payload = SerializationUtils.serialize(((ObjectMessage) message).getObject());
+ recordBuilder.set(JMSMessageParts.PAYLOAD, payload);
+ return recordBuilder.build();
+ }
+
+ /**
+ * Creates a {@link StructuredRecord} from a JMS {@link Message}.
+ *
+ * @param message the incoming JMS {@link Message}
+ * @param config the {@link JMSStreamingSourceConfig} with all user provided property values
+ * @return the {@link StructuredRecord} built out of the JMS {@link Message} fields
+ * @throws JMSException in case the method fails to read fields from the JMS message
+ */
+ public static StructuredRecord convertJMSMessage(Message message, JMSStreamingSourceConfig config)
+ throws JMSException {
+ Schema schema = config.getSchema();
+ StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema);
+
+ if (config.getMessageMetadata()) {
+ populateMetadata(config.getSchema(), recordBuilder, message);
+ }
+ if (config.getMessageProperties()) {
+ JMSMessageProperties.populateProperties(schema, recordBuilder, message, JMSMessageType.MESSAGE);
+ }
+ return recordBuilder.build();
+ }
+
+ /**
+ * Gets header data fields from the JMS message and adds them to the passed record builder.
+ *
+ * @param schema the entire schema of the record
+ * @param recordBuilder the record builder that we set the metadata record into
+ * @param message the incoming JMS message
+ * @throws JMSException in case the method fails to read fields from the JMS message
+ */
+ private static void populateMetadata(Schema schema, StructuredRecord.Builder recordBuilder, Message message)
+ throws JMSException {
+ Schema metadataSchema = schema.getField(JMSMessageParts.METADATA).getSchema();
+ StructuredRecord.Builder metadataRecordBuilder = StructuredRecord.builder(metadataSchema);
+
+ for (Schema.Field field : metadataSchema.getFields()) {
+
+ switch (field.getName()) {
+ case JMSMessageMetadata.MESSAGE_ID:
+ metadataRecordBuilder.set(JMSMessageMetadata.MESSAGE_ID, message.getJMSMessageID());
+ break;
+
+ case JMSMessageMetadata.CORRELATION_ID:
+ metadataRecordBuilder.set(JMSMessageMetadata.CORRELATION_ID, message.getJMSCorrelationID());
+ break;
+
+ case JMSMessageMetadata.REPLY_TO:
+ //todo: handle NPE
+ metadataRecordBuilder.set(JMSMessageMetadata.REPLY_TO, message.getJMSReplyTo().toString());
+ break;
+
+ case JMSMessageMetadata.DESTINATION:
+ //todo: handle NPE
+ metadataRecordBuilder.set(JMSMessageMetadata.DESTINATION, message.getJMSDestination().toString());
+ break;
+
+ case JMSMessageMetadata.TYPE:
+ metadataRecordBuilder.set(JMSMessageMetadata.TYPE, message.getJMSType());
+ break;
+
+ case JMSMessageMetadata.MESSAGE_TIMESTAMP:
+ metadataRecordBuilder.set(JMSMessageMetadata.MESSAGE_TIMESTAMP, message.getJMSTimestamp());
+ break;
+
+ case JMSMessageMetadata.DELIVERY_MODE:
+ metadataRecordBuilder.set(JMSMessageMetadata.DELIVERY_MODE, message.getJMSDeliveryMode());
+ break;
+
+ case JMSMessageMetadata.REDELIVERED:
+ metadataRecordBuilder.set(JMSMessageMetadata.REDELIVERED, message.getJMSRedelivered());
+ break;
+
+ case JMSMessageMetadata.EXPIRATION:
+ metadataRecordBuilder.set(JMSMessageMetadata.EXPIRATION, message.getJMSExpiration());
+ break;
+
+ case JMSMessageMetadata.PRIORITY:
+ metadataRecordBuilder.set(JMSMessageMetadata.PRIORITY, message.getJMSPriority());
+ break;
+ }
+ recordBuilder.set(JMSMessageParts.METADATA, metadataRecordBuilder.build());
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSource.java b/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSource.java
new file mode 100644
index 0000000..16c4d21
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSource.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.source;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.streaming.StreamingContext;
+import io.cdap.cdap.etl.api.streaming.StreamingSource;
+import io.cdap.cdap.etl.api.streaming.StreamingSourceContext;
+import io.cdap.plugin.common.LineageRecorder;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+import java.util.stream.Collectors;
+
+/**
+ * This class JMSStreamingSource
is a plugin that allows consuming messages from a specified JMS
+ * Queue/Topic and generate
+ * StructuredRecords out of them.
+ */
+@Plugin(type = StreamingSource.PLUGIN_TYPE)
+@Name("JMS")
+@Description("JMS (Java Messaging Service) Source")
+public class JMSStreamingSource extends ReferenceStreamingSource {
+
+ private JMSStreamingSourceConfig config;
+
+ public JMSStreamingSource(JMSStreamingSourceConfig config) {
+ super(config);
+ this.config = config;
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ super.configurePipeline(pipelineConfigurer);
+ config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
+ }
+
+ @Override
+ public void prepareRun(StreamingSourceContext context) throws Exception {
+ Schema schema = config.getSchema();
+ // record dataset lineage
+ context.registerLineage(config.referenceName, schema);
+
+ if (schema.getFields() != null) {
+ LineageRecorder recorder = new LineageRecorder(context, config.referenceName);
+ recorder.recordRead("Read", "Read from jms",
+ schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
+ }
+ }
+
+ @Override
+ public JavaDStream getStream(StreamingContext context) throws Exception {
+ FailureCollector collector = context.getFailureCollector();
+ config.validate(collector);
+ collector.getOrThrowException();
+ return JMSSourceUtils.getJavaDStream(context, config);
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSourceConfig.java b/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSourceConfig.java
new file mode 100644
index 0000000..7258103
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSourceConfig.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.source;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.jms.common.JMSConfig;
+import io.cdap.plugin.jms.common.JMSMessageMetadata;
+import io.cdap.plugin.jms.common.JMSMessageParts;
+import io.cdap.plugin.jms.common.JMSMessageType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+/**
+ * Configs for {@link JMSStreamingSource}
+ */
+public class JMSStreamingSourceConfig extends JMSConfig implements Serializable {
+ // Params
+ public static final String NAME_SOURCE = "sourceName";
+ public static final String NAME_SCHEMA = "schema";
+ public static final String NAME_MESSAGE_METADATA = "messageMetadata";
+ public static final String NAME_MESSAGE_PROPERTIES = "messageProperties";
+ public static final String NAME_MESSAGE_TYPE = "messageType";
+
+
+ @Name(NAME_SOURCE)
+ @Description("Name of the source Queue/Topic. The Queue/Topic with the given name, should exist in order to read " +
+ "messages from.")
+ @Macro
+ private String sourceName;
+
+ @Name(NAME_MESSAGE_METADATA)
+ @Description("If true, message metadata is also consumed. Otherwise, it is not.")
+ @Nullable
+ @Macro
+ private String messageMetadata;
+
+ @Name(NAME_MESSAGE_PROPERTIES)
+ @Description("If true, message properties are also consumed. Otherwise, they are not.")
+ @Nullable
+ @Macro
+ private String messageProperties;
+
+ @Name(NAME_MESSAGE_TYPE)
+ @Description("Supports the following message types: Message, Text, Bytes, Map, Object.")
+ @Nullable
+ @Macro
+ private String messageType; // default: Text
+
+ @Name(NAME_SCHEMA)
+ @Description("Specifies the schema of the records outputted from this plugin.")
+ @Macro
+ private String schema;
+
+ public JMSStreamingSourceConfig() {
+ super("");
+ this.messageMetadata = Strings.isNullOrEmpty(messageMetadata) ? "true" : messageMetadata;
+ this.messageType = Strings.isNullOrEmpty(messageType) ? JMSMessageType.TEXT : messageType;
+ }
+
+ @VisibleForTesting
+ public JMSStreamingSourceConfig(String referenceName, String connectionFactory, String jmsUsername,
+ String jmsPassword, String providerUrl, String type, String jndiContextFactory,
+ String jndiUsername, String jndiPassword, String messageMetadata,
+ String messageProperties, String messageType, String sourceName, String schema) {
+ super(referenceName, connectionFactory, jmsUsername, jmsPassword, providerUrl, type, jndiContextFactory,
+ jndiUsername, jndiPassword);
+ this.sourceName = sourceName;
+ this.messageMetadata = messageMetadata;
+ this.messageProperties = messageProperties;
+ this.messageType = messageType;
+ this.schema = schema;
+ }
+
+ public void validate(FailureCollector failureCollector) {
+ this.validateParams(failureCollector);
+
+ if (Strings.isNullOrEmpty(messageType) && !containsMacro(NAME_MESSAGE_TYPE)) {
+ failureCollector
+ .addFailure("The source topic/queue name must be provided!", "Provide your topic/queue name.")
+ .withConfigProperty(NAME_MESSAGE_TYPE);
+ }
+
+ if (Strings.isNullOrEmpty(sourceName) && !containsMacro(NAME_SOURCE)) {
+ failureCollector
+ .addFailure("The source topic/queue name must be provided!", "Provide your topic/queue name.")
+ .withConfigProperty(NAME_SOURCE);
+ }
+
+ if (!containsMacro(NAME_SCHEMA)) {
+
+ Schema schema = getSchema();
+
+ boolean otherFieldsExist = schema
+ .getFields()
+ .stream()
+ .anyMatch(field -> !JMSMessageParts.getJMSMessageParts().contains(field.getName()));
+
+ if (otherFieldsExist) {
+ failureCollector
+ .addFailure("New fields detected in the output schema!",
+ String.format("Only \"%s\", \"%s\" and \"%s\" fields are allowed.",
+ JMSMessageParts.METADATA, JMSMessageParts.PAYLOAD, JMSMessageParts.PROPERTIES))
+ .withConfigProperty(NAME_SCHEMA);
+ }
+
+ switch (messageType) {
+ case JMSMessageType.TEXT:
+ if (!schema.getField(JMSMessageParts.PAYLOAD).getSchema().getType().equals(Schema.Type.STRING)) {
+ failureCollector
+ .addFailure(String.format("Wrong data type for field \"%s\".", JMSMessageParts.PAYLOAD),
+ String.format("For JMS %s type of message, \"%s\" must be of String data type.",
+ messageType, JMSMessageParts.PAYLOAD)).withConfigProperty(NAME_SCHEMA);
+ }
+ break;
+
+ case JMSMessageType.OBJECT:
+ if (!schema.getField(JMSMessageParts.PAYLOAD).getSchema().getType().equals(Schema.Type.ARRAY)) {
+ failureCollector
+ .addFailure(String.format("Wrong data type for field \"%s\".", JMSMessageParts.PAYLOAD),
+ String.format("For JMS %s type of message, \"%s\" must be of Array[Bytes] data type.",
+ messageType, JMSMessageParts.PAYLOAD)).withConfigProperty(NAME_SCHEMA);
+ }
+ break;
+
+ case JMSMessageType.BYTES:
+ case JMSMessageType.MAP:
+ case JMSMessageType.MESSAGE:
+ if (!Arrays.asList(Schema.Type.STRING, Schema.Type.RECORD)
+ .contains(schema.getField(JMSMessageParts.PAYLOAD).getSchema().getType())) {
+ failureCollector
+ .addFailure(String.format("Wrong data type for field \"%s\"", JMSMessageParts.PAYLOAD),
+ String.format("For JMS %s type of message, \"%s\" must be of String or Record data type.",
+ messageType, JMSMessageParts.PAYLOAD)).withConfigProperty(NAME_SCHEMA);
+ }
+ break;
+ }
+ }
+ }
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ /**
+ * @return {@link io.cdap.cdap.api.data.schema.Schema} of the dataset if one was given
+ * @throws IllegalArgumentException if the schema is not a valid JSON
+ */
+ public Schema getSchema() {
+
+ if (!Strings.isNullOrEmpty(schema) && !containsMacro(schema)) {
+ try {
+ return Schema.parseJson(schema);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Invalid schema : %s.", e.getMessage()), e);
+ }
+ }
+
+ List fields = new ArrayList<>();
+
+ if (getMessageMetadata()) {
+ fields.add(JMSMessageMetadata.getMessageMetadata());
+ }
+
+ if (getMessageProperties()) {
+ fields.add(Schema.Field.of(JMSMessageParts.PROPERTIES, Schema.of(Schema.Type.STRING)));
+ }
+
+ switch (messageType) {
+ case JMSMessageType.OBJECT:
+ fields.add(Schema.Field.of(JMSMessageParts.PAYLOAD, Schema.arrayOf(Schema.of(Schema.Type.BYTES))));
+ return Schema.recordOf("record", fields);
+ case JMSMessageType.MAP:
+ case JMSMessageType.MESSAGE:
+ case JMSMessageType.TEXT:
+ case JMSMessageType.BYTES:
+ fields.add(Schema.Field.of(JMSMessageParts.PAYLOAD, Schema.of(Schema.Type.STRING)));
+ return Schema.recordOf("record", fields);
+ default:
+ return Schema.recordOf("record", fields);
+ }
+ }
+
+ @Nullable
+ public String getMessageType() {
+ return messageType;
+ }
+
+ public boolean getMessageMetadata() {
+ return this.messageMetadata.equalsIgnoreCase("true");
+ }
+
+ public boolean getMessageProperties() {
+ return this.messageProperties.equalsIgnoreCase("true");
+ }
+
+ public List getDataFields(Schema schema, String skipFieldName) {
+ return schema
+ .getFields()
+ .stream()
+ .filter(field -> !JMSMessageMetadata.getJMSMessageHeaderNames().contains(field.getName()))
+ .filter(field -> !field.getName().equals(skipFieldName))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/source/ReferenceStreamingSource.java b/src/main/java/io/cdap/plugin/jms/source/ReferenceStreamingSource.java
new file mode 100644
index 0000000..ddc7bf0
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/source/ReferenceStreamingSource.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.source;
+
+import io.cdap.cdap.api.dataset.DatasetProperties;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.streaming.StreamingSource;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.common.IdUtils;
+import io.cdap.plugin.common.ReferencePluginConfig;
+
+/**
+ * Base streaming source that adds an External Dataset for a reference name, and performs a single getDataset()
+ * call to make sure CDAP records that it was accessed.
+ *
+ * @param type of object read by the source.
+ */
+public abstract class ReferenceStreamingSource extends StreamingSource {
+ private final ReferencePluginConfig conf;
+
+ public ReferenceStreamingSource(ReferencePluginConfig conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
+ super.configurePipeline(pipelineConfigurer);
+ FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
+ IdUtils.validateReferenceName(conf.referenceName, collector);
+ collector.getOrThrowException();
+ pipelineConfigurer.createDataset(conf.referenceName, Constants.EXTERNAL_DATASET_TYPE, DatasetProperties.EMPTY);
+ }
+}
diff --git a/src/test/java/JMSPluginTest.java b/src/test/java/JMSPluginTest.java
new file mode 100644
index 0000000..5a0caef
--- /dev/null
+++ b/src/test/java/JMSPluginTest.java
@@ -0,0 +1,75 @@
+///*
+// * Copyright © 2021 Cask Data, Inc.
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// * use this file except in compliance with the License. You may obtain a copy of
+// * the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// * License for the specific language governing permissions and limitations under
+// * the License.
+// */
+//
+//import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
+//import io.cdap.plugin.jms.common.JMSConfig;
+//import io.cdap.plugin.jms.source.JMSStreamingSourceConfig;
+//import org.junit.Assert;
+//import org.junit.Test;
+//import org.mockito.internal.util.reflection.FieldSetter;
+//
+///**
+// * Unit tests for the plugin
+// */
+//public class JMSPluginTest {
+// private JMSConfig getValidConfig(String fileSystemProperties) throws NoSuchFieldException {
+// JMSConfig jmsConfig = new JMSStreamingSourceConfig();
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("connectionFactory"), "ConFactory");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jmsUsername"), "jms_username");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jmsPassword"), "jms_password");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("providerUrl"), "tcp://0.0.0.0:61616");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("type"), "Queue");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiContextFactory"), "ContextFactory");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiUsername"), "jndi-username");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiPassword"), "jndi-password");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("messageType"), "Text");
+// FieldSetter.setField(jmsConfig, JMSStreamingSourceConfig.class.getDeclaredField("sourceName"), "test-topic");
+// FieldSetter.setField(jmsConfig, JMSStreamingSourceConfig.class.getDeclaredField("keepMessageHeaders"), "false");
+// return jmsConfig;
+// }
+//
+// private JMSConfig getInvalidConfig(String fileSystemProperties) throws NoSuchFieldException {
+// JMSConfig jmsConfig = new JMSStreamingSourceConfig();
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("connectionFactory"), "ConFactory");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jmsUsername"), "jms_username");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jmsPassword"), "jms_password");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("providerUrl"), "tcp://0.0.0.0:61616");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("type"), "Queue");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiContextFactory"), "ContextFactory");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiUsername"), "jndi-username");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiPassword"), "jndi-password");
+// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("messageType"), "Text");
+// FieldSetter.setField(jmsConfig, JMSStreamingSourceConfig.class.getDeclaredField("sourceName"), null);
+// FieldSetter.setField(jmsConfig, JMSStreamingSourceConfig.class.getDeclaredField("keepMessageHeaders"), "false");
+// return jmsConfig;
+// }
+//
+// @Test
+// public void testValidFSProperties() throws NoSuchFieldException {
+// JMSConfig jmsConfig = getValidConfig("{\"key\":\"val\"}");
+// MockFailureCollector collector = new MockFailureCollector("jms_failure_collector");
+// ((JMSStreamingSourceConfig) jmsConfig).validateParams(collector);
+// Assert.assertEquals(0, collector.getValidationFailures().size());
+// }
+//
+// @Test
+// public void testInvalidFSProperties() throws NoSuchFieldException {
+// JMSConfig jmsConfig = getInvalidConfig("{\"key\":\"val\"}");
+// MockFailureCollector collector = new MockFailureCollector("jms_failure_collector");
+// ((JMSStreamingSourceConfig) jmsConfig).validateParams(collector);
+// Assert.assertEquals(1, collector.getValidationFailures().size());
+// }
+//}
diff --git a/src/test/java/io/cdap/plugin/jms/source/JMSSourceUtilsTest.java b/src/test/java/io/cdap/plugin/jms/source/JMSSourceUtilsTest.java
new file mode 100644
index 0000000..2a7440c
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/JMSSourceUtilsTest.java
@@ -0,0 +1,256 @@
+///*
+// * Copyright © 2021 Cask Data, Inc.
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// * use this file except in compliance with the License. You may obtain a copy of
+// * the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// * License for the specific language governing permissions and limitations under
+// * the License.
+// */
+//
+//package io.cdap.plugin.jms.source;
+//
+//
+//import com.google.gson.Gson;
+//import io.cdap.cdap.api.data.format.StructuredRecord;
+//import io.cdap.cdap.api.data.schema.Schema;
+//import io.cdap.plugin.jms.common.JMSMessageMetadata;
+//import io.cdap.plugin.jms.common.JMSMessageType;
+//import io.cdap.plugin.jms.source.common.DummyObject;
+//import org.apache.commons.lang.SerializationUtils;
+//import org.junit.Assert;
+//import org.junit.Test;
+//
+//import java.nio.charset.StandardCharsets;
+//import java.util.Arrays;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.Properties;
+//import javax.jms.Destination;
+//import javax.jms.JMSException;
+//import javax.jms.MapMessage;
+//import javax.jms.ObjectMessage;
+//import javax.jms.TextMessage;
+//
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.when;
+//
+//public class JMSSourceUtilsTest {
+//
+//
+// @Test
+// public void testSchema() throws JMSException {
+// List headers = Arrays.asList(
+// Schema.Field.of("header_1", Schema.of(Schema.Type.INT)),
+// Schema.Field.of("header_2", Schema.of(Schema.Type.STRING)),
+// Schema.Field.of("header_3", Schema.of(Schema.Type.DOUBLE))
+// );
+// List payload = Arrays.asList(
+// Schema.Field.of("payload_1", Schema.of(Schema.Type.INT)),
+// Schema.Field.of("payload_2", Schema.of(Schema.Type.STRING)),
+// Schema.Field.of("payload_3", Schema.of(Schema.Type.DOUBLE))
+// );
+// List properties = Arrays.asList(
+// Schema.Field.of("property_1", Schema.of(Schema.Type.INT)),
+// Schema.Field.of("property_2", Schema.of(Schema.Type.STRING)),
+// Schema.Field.of("property_3", Schema.of(Schema.Type.DOUBLE))
+// );
+//
+// Schema s = Schema.recordOf("record",
+// Schema.Field.of("headers", Schema.recordOf("headers", headers)),
+// Schema.Field.of("payload", Schema.recordOf("payload", headers)),
+// Schema.Field.of("properties", Schema.recordOf("properties", headers))
+// );
+//
+// s.getField("payload").getSchema().getType();
+// }
+//
+//
+//
+// @Test
+// public void testConvertMapMessageWithPayload() throws JMSException {
+// /*
+// * This tests if a JMS MapMessage is properly converted to a StructuredRecord given that the automatic schema is
+// * kept. This schema does have a single field called "payload" that will include the entire JMS MapMessage
+// * properties as a Json String.
+// */
+//
+// MapMessage mapMessage = mock(MapMessage.class);
+// mockMapMessage(mapMessage);
+// boolean keepMessageHeaders = false;
+// String schema = null;
+// JMSStreamingSourceConfig config = getJMSStreamingSourceConfig(false, false, JMSMessageType.MAP, schema);
+// StructuredRecord actualRecord = JMSSourceUtils.convertJMSMapMessage(mapMessage, config);
+//
+// Schema expectedSchema = Schema.recordOf("record", Schema.Field.of("payload", Schema.of(Schema.Type.STRING)));
+//
+// Map expectedValues = new HashMap<>();
+// expectedValues.put("byteVal", (byte) 5);
+// expectedValues.put("doubleVal", 1.0d);
+// expectedValues.put("longVal", 1L);
+// expectedValues.put("intVal", 1);
+// expectedValues.put("stringVal", "Hello World!");
+// expectedValues.put("charVal", 'a');
+// expectedValues.put("floatVal", 1.0f);
+// expectedValues.put("shortVal", 5);
+// expectedValues.put("booleanVal", true);
+// // expectedValues.put("bytesVal",);
+// // expectedValues.put("objectVal", );
+//
+// Gson gson = new Gson();
+// String json = gson.toJson(expectedValues);
+//
+// StructuredRecord expectedRecord = StructuredRecord.builder(expectedSchema)
+// .set("payload", json)
+// .build();
+//
+// assertEqualsStructuredRecords(expectedRecord, actualRecord);
+// }
+//
+// @Test
+// public void testConvertMapMessageWithoutPayload() throws JMSException {
+// /*
+// * This tests if a JMS MapMessage is properly converted to a StructuredRecord given that the schema is
+// * predefined. This means that there is no "payload" field in the schema, and for every property of JMS MapMessage
+// * that we want to get included into the StructuredRecord, we define a field in the output schema.
+// */
+//
+// MapMessage mapMessage = mock(MapMessage.class);
+// mockMapMessage(mapMessage);
+// boolean keepMessageHeaders = false;
+//
+// // Schema is set manually. No "payload" field in the schema.
+// List fields = Arrays.asList(
+// Schema.Field.of("booleanVal", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))),
+// Schema.Field.of("byteVal", Schema.nullableOf(Schema.of(Schema.Type.BYTES))),
+// Schema.Field.of("intVal", Schema.nullableOf(Schema.of(Schema.Type.INT))),
+// Schema.Field.of("longVal", Schema.nullableOf(Schema.of(Schema.Type.LONG))),
+// Schema.Field.of("floatVal", Schema.nullableOf(Schema.of(Schema.Type.FLOAT))),
+// Schema.Field.of("doubleVal", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
+// Schema.Field.of("stringVal", Schema.nullableOf(Schema.of(Schema.Type.STRING)))
+//// Schema.Field.of("bytesVal", Schema.arrayOf(Schema.nullableOf(Schema.of(Schema.Type.BYTES)))
+// );
+// Schema schema = Schema.recordOf("record", fields);
+//
+// JMSStreamingSourceConfig config = getJMSStreamingSourceConfig(false, false, JMSMessageType.MAP,
+// schema.toString());
+// StructuredRecord actual = JMSSourceUtils.convertJMSMapMessage(mapMessage, config);
+//
+// StructuredRecord expected = StructuredRecord.builder(schema)
+// .set("booleanVal", true)
+// .set("byteVal", (byte) 5)
+// .set("intVal", 1)
+// .set("longVal", 1L)
+// .set("floatVal", 1.0f)
+// .set("doubleVal", 1.0d)
+// .set("stringVal", "Hello World!")
+//// .set("bytesVal", "Hello World!".getBytes(StandardCharsets.UTF_8))
+// .build();
+//
+// assertEqualsStructuredRecords(expected, actual);
+// }
+//
+// @Test
+// public void testConvertBytesMessage() throws JMSException {
+// /*
+// * This tests if the JMS TextMessage is properly converted to a StructuredRecord given that no headers are
+// * included.
+// */
+//
+//// BytesMessage bytesMessage = mock(BytesMessage.class);
+////
+//// when(bytesMessage.readBytes(new byte[8096])).thenReturn(1);
+////
+//// boolean keepMessageHeaders = false;
+//// JMSStreamingSourceConfig config = getJMSStreamingSourceConfig(keepMessageHeaders, null);
+//// StructuredRecord actual = JMSSourceUtils.convertJMSByteMessage(bytesMessage, config);
+////
+//// StructuredRecord expected = StructuredRecord
+//// .builder(Schema.recordOf("record", Schema.Field.of("payload", Schema.of(Schema.Type.STRING))))
+//// .set("payload", "Hello World!").build();
+////
+//// Assert.assertEquals(expected, actual);
+// }
+//
+// @Test
+// public void testConvertObjectMessage() throws JMSException {
+// /*
+// * This tests if the JMS BytesMessage is properly converted to a StructuredRecord given that no headers are
+// * included.
+// */
+//
+// ObjectMessage objectMessage = mock(ObjectMessage.class);
+// DummyObject dummyObj = new DummyObject("Hello", 123);
+//
+// when(objectMessage.getObject()).thenReturn(dummyObj);
+// boolean keepMessageHeaders = false;
+//
+// JMSStreamingSourceConfig config = getJMSStreamingSourceConfig(false, false, JMSMessageType.OBJECT, null);
+// StructuredRecord actual = JMSSourceUtils.convertJMSObjectMessage(objectMessage, config);
+//
+// StructuredRecord expected = StructuredRecord
+// .builder(Schema.recordOf("record", Schema.Field.of("payload", Schema.arrayOf(Schema.of(Schema.Type.BYTES)))))
+// .set("payload", SerializationUtils.serialize(dummyObj)).build();
+//
+// assertEqualsStructuredRecords(expected, actual);
+// }
+//
+// @Test
+// public void testConvertMessage() {
+//
+// }
+//
+//
+//
+//
+//
+//
+//
+// private void mockMapMessage(MapMessage mapMessage) throws JMSException {
+// Properties prop = new Properties();
+// prop.put("booleanVal", true);
+// prop.put("byteVal", (byte) 5);
+// prop.put("shortVal", (short) 5);
+// prop.put("charVal", 'a');
+// prop.put("intVal", 1);
+// prop.put("longVal", 1L);
+// prop.put("floatVal", 1.0f);
+// prop.put("doubleVal", 1.0d);
+// prop.put("stringVal", "Hello World!");
+//// prop.put("bytesVal", "Hello World!".getBytes(StandardCharsets.UTF_8));
+//// prop.put("objectVal", new Object());
+//
+// when(mapMessage.getObject("booleanVal")).thenReturn(true);
+// when(mapMessage.getObject("byteVal")).thenReturn((byte) 5);
+// when(mapMessage.getObject("shortVal")).thenReturn((short) 5);
+// when(mapMessage.getObject("charVal")).thenReturn('a');
+// when(mapMessage.getObject("intVal")).thenReturn(1);
+// when(mapMessage.getObject("longVal")).thenReturn(1L);
+// when(mapMessage.getObject("floatVal")).thenReturn(1.0f);
+// when(mapMessage.getObject("doubleVal")).thenReturn(1.0d);
+// when(mapMessage.getObject("stringVal")).thenReturn("Hello World!");
+//// when(mapMessage.getObject("bytesVal")).thenReturn("Hello World!".getBytes(StandardCharsets.UTF_8));
+//// when(mapMessage.getObject("objectVal")).thenReturn(new Object());
+// when(mapMessage.getMapNames()).thenReturn(prop.propertyNames());
+//
+//
+// when(mapMessage.getBoolean("booleanVal")).thenReturn(true);
+// when(mapMessage.getByte("byteVal")).thenReturn((byte) 5);
+// when(mapMessage.getShort("shortVal")).thenReturn((short) 5);
+// when(mapMessage.getChar("charVal")).thenReturn('a');
+// when(mapMessage.getInt("intVal")).thenReturn(1);
+// when(mapMessage.getLong("longVal")).thenReturn(1L);
+// when(mapMessage.getFloat("floatVal")).thenReturn(1.0f);
+// when(mapMessage.getDouble("doubleVal")).thenReturn(1.0d);
+// when(mapMessage.getString("stringVal")).thenReturn("Hello World!");
+//// when(mapMessage.getBytes("bytesVal")).thenReturn("Hello World!".getBytes(StandardCharsets.UTF_8));
+//// when(mapMessage.getObject("objectVal")).thenReturn(new Object());
+// }
+//}
diff --git a/src/test/java/io/cdap/plugin/jms/source/SourceByteMessageTest.java b/src/test/java/io/cdap/plugin/jms/source/SourceByteMessageTest.java
new file mode 100644
index 0000000..ef68fbd
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/SourceByteMessageTest.java
@@ -0,0 +1,4 @@
+package io.cdap.plugin.jms.source;
+
+public class SourceByteMessageTest {
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/SourceMapMessageTest.java b/src/test/java/io/cdap/plugin/jms/source/SourceMapMessageTest.java
new file mode 100644
index 0000000..569a1e6
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/SourceMapMessageTest.java
@@ -0,0 +1,4 @@
+package io.cdap.plugin.jms.source;
+
+public class SourceMapMessageTest {
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/SourceMessageTest.java b/src/test/java/io/cdap/plugin/jms/source/SourceMessageTest.java
new file mode 100644
index 0000000..1ef6dd3
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/SourceMessageTest.java
@@ -0,0 +1,4 @@
+package io.cdap.plugin.jms.source;
+
+public class SourceMessageTest {
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/SourceObjectMessageTest.java b/src/test/java/io/cdap/plugin/jms/source/SourceObjectMessageTest.java
new file mode 100644
index 0000000..f3640db
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/SourceObjectMessageTest.java
@@ -0,0 +1,4 @@
+package io.cdap.plugin.jms.source;
+
+public class SourceObjectMessageTest {
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/SourceTextMessageTest.java b/src/test/java/io/cdap/plugin/jms/source/SourceTextMessageTest.java
new file mode 100644
index 0000000..c9245d6
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/SourceTextMessageTest.java
@@ -0,0 +1,124 @@
+package io.cdap.plugin.jms.source;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.jms.common.JMSMessageMetadata;
+import io.cdap.plugin.jms.common.JMSMessageParts;
+import io.cdap.plugin.jms.common.JMSMessageType;
+import io.cdap.plugin.jms.source.common.TestUtils;
+import org.junit.Test;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SourceTextMessageTest {
+
+ @Test
+ public void convertJMSTextMessage_withNoMetadataNoPropertiesNoSchema_successfully() throws JMSException {
+ String payloadContent = "Hello World!";
+
+ TextMessage textMessage = mock(TextMessage.class);
+ when(textMessage.getText()).thenReturn(payloadContent);
+
+ boolean keepMetadata = false;
+ boolean keepProperties = false;
+ String schema = null;
+ JMSStreamingSourceConfig config = TestUtils
+ .getSourceConfig(keepMetadata, keepProperties, JMSMessageType.TEXT, schema);
+
+ // expected
+ Schema expectedSchema = Schema
+ .recordOf("record", Schema.Field.of(JMSMessageParts.PAYLOAD, Schema.of(Schema.Type.STRING)));
+ StructuredRecord expectedRecord = StructuredRecord
+ .builder(expectedSchema)
+ .set(JMSMessageParts.PAYLOAD, "Hello World!").build();
+
+ // actual
+ StructuredRecord actualRecord = JMSSourceUtils.convertJMSTextMessage(textMessage, config);
+
+ // asserts
+ TestUtils.assertEqualsStructuredRecords(expectedRecord, actualRecord);
+ }
+
+ @Test
+ public void convertJMSTextMessage_withMetadataAndPropertiesButNoSchema_successfully() throws JMSException {
+ String payloadContent = "Hello World!";
+
+ TextMessage textMessage = mock(TextMessage.class);
+ when(textMessage.getText()).thenReturn(payloadContent);
+ TestUtils.mockMetadata(textMessage);
+ TestUtils.mockProperties(textMessage);
+
+ boolean keepMetadata = true;
+ boolean keepProperties = true;
+ String schema = null;
+ JMSStreamingSourceConfig config = TestUtils
+ .getSourceConfig(keepMetadata, keepProperties, JMSMessageType.TEXT, schema);
+
+ // expected
+ Schema expectedSchema = Schema.recordOf("record", JMSMessageMetadata.getMessageMetadata(),
+ Schema.Field.of(JMSMessageParts.PROPERTIES, Schema.of(Schema.Type.STRING)),
+ Schema.Field.of(JMSMessageParts.PAYLOAD, Schema.of(Schema.Type.STRING)));
+ StructuredRecord.Builder expectedRecordBuilder = StructuredRecord.builder(expectedSchema);
+ TestUtils.populateDummyMetadata(expectedRecordBuilder, expectedSchema);
+ TestUtils.populateDummyProperties(expectedRecordBuilder, expectedSchema);
+ expectedRecordBuilder.set(JMSMessageParts.PAYLOAD, payloadContent);
+ StructuredRecord expectedRecord = expectedRecordBuilder.build();
+
+ // actual
+ StructuredRecord actualRecord = JMSSourceUtils.convertJMSTextMessage(textMessage, config);
+
+ // asserts
+ TestUtils.assertEqualsStructuredRecords(expectedRecord, actualRecord);
+ }
+
+ @Test
+ public void convertJMSTextMessage_withMetadataAndPropertiesAndSchema_successfully() throws JMSException {
+ String payloadContent = "Hello World!";
+
+ TextMessage textMessage = mock(TextMessage.class);
+ when(textMessage.getText()).thenReturn(payloadContent);
+ TestUtils.mockMetadata(textMessage);
+ TestUtils.mockProperties(textMessage);
+
+ Schema.Field propertiesField = Schema.Field.of(
+ JMSMessageParts.PROPERTIES,
+ Schema.recordOf(
+ JMSMessageParts.PROPERTIES,
+ Schema.Field.of("boolean-prop", Schema.of(Schema.Type.BOOLEAN)),
+ Schema.Field.of("string-prop", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("int-prop", Schema.of(Schema.Type.BOOLEAN))
+ )
+ );
+
+ Schema outputSchema = Schema.recordOf(
+ "record",
+ JMSMessageMetadata.getMessageMetadata(),
+ propertiesField,
+ Schema.Field.of(JMSMessageParts.PAYLOAD, Schema.of(Schema.Type.STRING))
+ );
+
+ boolean keepMetadata = true;
+ boolean keepProperties = true;
+ String schema = outputSchema.toString();
+
+ JMSStreamingSourceConfig config = TestUtils
+ .getSourceConfig(keepMetadata, keepProperties, JMSMessageType.TEXT, schema);
+
+ // expected
+ StructuredRecord.Builder expectedRecordBuilder = StructuredRecord.builder(outputSchema);
+ TestUtils.populateDummyMetadata(expectedRecordBuilder, outputSchema);
+ TestUtils.populateDummyProperties(expectedRecordBuilder, outputSchema);
+ expectedRecordBuilder.set(JMSMessageParts.PAYLOAD, payloadContent);
+ StructuredRecord expectedRecord = expectedRecordBuilder.build();
+
+ // actual
+ StructuredRecord actualRecord = JMSSourceUtils.convertJMSTextMessage(textMessage, config);
+
+ // asserts
+ TestUtils.assertEqualsStructuredRecords(expectedRecord, actualRecord);
+ }
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/common/DummyObject.java b/src/test/java/io/cdap/plugin/jms/source/common/DummyObject.java
new file mode 100644
index 0000000..9139a00
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/common/DummyObject.java
@@ -0,0 +1,47 @@
+package io.cdap.plugin.jms.source.common;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class DummyObject implements Serializable {
+ String dummyStr;
+ int dummyInt;
+
+ public DummyObject(String dummyStr, int dummyInt) {
+ this.dummyStr = dummyStr;
+ this.dummyInt = dummyInt;
+ }
+
+ public String getDummyStr() {
+ return dummyStr;
+ }
+
+ public void setDummyStr(String dummyStr) {
+ this.dummyStr = dummyStr;
+ }
+
+ public int getDummyInt() {
+ return dummyInt;
+ }
+
+ public void setDummyInt(int dummyInt) {
+ this.dummyInt = dummyInt;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DummyObject that = (DummyObject) o;
+ return dummyInt == that.dummyInt && Objects.equals(dummyStr, that.dummyStr);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dummyStr, dummyInt);
+ }
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/common/TestUtils.java b/src/test/java/io/cdap/plugin/jms/source/common/TestUtils.java
new file mode 100644
index 0000000..72f0e58
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/common/TestUtils.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.source.common;
+
+import com.google.gson.Gson;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.jms.common.JMSMessageMetadata;
+import io.cdap.plugin.jms.common.JMSMessageParts;
+import io.cdap.plugin.jms.source.JMSStreamingSourceConfig;
+import org.junit.Assert;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import static org.mockito.Mockito.when;
+
+public class TestUtils {
+
+ public static JMSStreamingSourceConfig getSourceConfig(boolean messageMetadata, boolean messageProperties,
+ String messageType, String schema) {
+ return new JMSStreamingSourceConfig("referenceName", "Connection Factory", "jms-username", "jms-password",
+ "tcp://0.0.0.0:61616", "Queue", "jndi-context-factory", "jndi-username",
+ "jndi-password", String.valueOf(messageMetadata),
+ String.valueOf(messageProperties), messageType, "MyQueue", schema);
+ }
+
+ public static void populateDummyMetadata(StructuredRecord.Builder builder, Schema schema) {
+ StructuredRecord.Builder metadataBuilder = StructuredRecord
+ .builder(schema.getField(JMSMessageParts.METADATA).getSchema());
+
+ metadataBuilder.set(JMSMessageMetadata.MESSAGE_ID, "jms-message-id");
+ metadataBuilder.set(JMSMessageMetadata.MESSAGE_TIMESTAMP, 112233445566L);
+ metadataBuilder.set(JMSMessageMetadata.CORRELATION_ID, "jms-correlation-id");
+ metadataBuilder.set(JMSMessageMetadata.REPLY_TO, getDummyDestination().toString());
+ metadataBuilder.set(JMSMessageMetadata.DESTINATION, getDummyDestination().toString());
+ metadataBuilder.set(JMSMessageMetadata.DELIVERY_MODE, 0);
+ metadataBuilder.set(JMSMessageMetadata.REDELIVERED, false);
+ metadataBuilder.set(JMSMessageMetadata.TYPE, "jms-type");
+ metadataBuilder.set(JMSMessageMetadata.EXPIRATION, 112233445566L);
+ metadataBuilder.set(JMSMessageMetadata.PRIORITY, 0);
+
+ builder.set(JMSMessageParts.METADATA, metadataBuilder.build());
+ }
+
+
+ public static void populateDummyProperties(StructuredRecord.Builder builder, Schema schema) {
+ boolean isPropertiesString = schema.getField(JMSMessageParts.PROPERTIES)
+ .getSchema()
+ .getType()
+ .equals(Schema.Type.STRING);
+
+ boolean isPropertiesRecord = schema.getField(JMSMessageParts.PROPERTIES)
+ .getSchema()
+ .getType()
+ .equals(Schema.Type.RECORD);
+
+ if (isPropertiesString) {
+ HashMap properties = new HashMap<>();
+ properties.put("boolean-prop", true);
+ properties.put("byte-prop", (byte) 100);
+ properties.put("short-prop", (short) 200);
+ properties.put("int-prop", 300);
+ properties.put("long-prop", 400L);
+ properties.put("float-prop", 500.0f);
+ properties.put("double-prop", 600.0d);
+ properties.put("string-prop", "string-prop");
+ builder.set(JMSMessageParts.PROPERTIES, new Gson().toJson(properties));
+ } else if (isPropertiesRecord) {
+ StructuredRecord propertiesRecord = StructuredRecord
+ .builder(schema.getField(JMSMessageParts.PROPERTIES).getSchema())
+ .set("boolean-prop", true)
+ .set("string-prop", "Hello World!")
+ .set("int-prop", 300)
+ .build();
+
+ builder.set(JMSMessageParts.PROPERTIES, propertiesRecord);
+ }
+ }
+
+ public static void mockMetadata(Message message) throws JMSException {
+ when(message.getJMSMessageID()).thenReturn("jms-message-id");
+ when(message.getJMSCorrelationID()).thenReturn("jms-correlation-id");
+ when(message.getJMSReplyTo()).thenReturn(getDummyDestination());
+ when(message.getJMSDestination()).thenReturn(getDummyDestination());
+ when(message.getJMSType()).thenReturn("jms-type");
+ when(message.getJMSTimestamp()).thenReturn(112233445566L);
+ when(message.getJMSDeliveryMode()).thenReturn(0);
+ when(message.getJMSRedelivered()).thenReturn(false);
+ when(message.getJMSExpiration()).thenReturn(112233445566L);
+ when(message.getJMSPriority()).thenReturn(0);
+ }
+
+ public static void mockProperties(Message message) throws JMSException {
+ List propertyNames = Arrays.asList("boolean-prop", "byte-prop", "short-prop", "int-prop", "long-prop",
+ "float-prop", "double-prop", "string-prop");
+ when(message.getPropertyNames()).thenReturn(Collections.enumeration(propertyNames));
+ when(message.getBooleanProperty("boolean-prop")).thenReturn(true);
+ when(message.getByteProperty("byte-prop")).thenReturn((byte) 100);
+ when(message.getShortProperty("short-prop")).thenReturn((short) 100);
+ when(message.getIntProperty("int-prop")).thenReturn(300);
+ when(message.getLongProperty("long-prop")).thenReturn(400L);
+ when(message.getFloatProperty("float-prop")).thenReturn(500.0f);
+ when(message.getDoubleProperty("double-prop")).thenReturn(600.0d);
+ when(message.getStringProperty("string-prop")).thenReturn("string-prop");
+
+ when(message.getObjectProperty("boolean-prop")).thenReturn(true);
+ when(message.getObjectProperty("byte-prop")).thenReturn((byte) 100);
+ when(message.getObjectProperty("short-prop")).thenReturn((short) 200);
+ when(message.getObjectProperty("int-prop")).thenReturn(300);
+ when(message.getObjectProperty("long-prop")).thenReturn(400L);
+ when(message.getObjectProperty("float-prop")).thenReturn(500.0f);
+ when(message.getObjectProperty("double-prop")).thenReturn(600.0d);
+ when(message.getObjectProperty("string-prop")).thenReturn("string-prop");
+
+ when(message.propertyExists("boolean-prop")).thenReturn(true);
+ when(message.propertyExists("byte-prop")).thenReturn(true);
+ when(message.propertyExists("short-prop")).thenReturn(true);
+ when(message.propertyExists("int-prop")).thenReturn(true);
+ when(message.propertyExists("long-prop")).thenReturn(true);
+ when(message.propertyExists("float-prop")).thenReturn(true);
+ when(message.propertyExists("double-prop")).thenReturn(true);
+ when(message.propertyExists("string-prop")).thenReturn(true);
+ }
+
+ private static Destination getDummyDestination() {
+ return new Destination() {
+ @Override
+ public String toString() {
+ return "Destination";
+ }
+ };
+ }
+
+ public static void assertEqualsStructuredRecords(StructuredRecord expected, StructuredRecord actual) {
+ Assert.assertEquals(expected.getSchema(), actual.getSchema());
+ for (Schema.Field field : expected.getSchema().getFields()) {
+
+ switch (field.getSchema().getType()) {
+ case BOOLEAN:
+ Assert.assertEquals(expected.get(field.getName()), actual.get(field.getName()));
+ break;
+ case INT:
+ Assert.assertEquals(expected.get(field.getName()), actual.get(field.getName()));
+ break;
+ case LONG:
+ Assert.assertEquals(expected.get(field.getName()), actual.get(field.getName()));
+ break;
+ case FLOAT:
+ Assert.assertEquals(expected.get(field.getName()), actual.get(field.getName()));
+ break;
+ case DOUBLE:
+ Assert.assertEquals(expected.get(field.getName()), actual.get(field.getName()));
+ break;
+ case BYTES:
+ Assert.assertEquals(expected.get(field.getName()), actual.get(field.getName()));
+ break;
+ case STRING:
+ Assert.assertEquals(expected.get(field.getName()), actual.get(field.getName()));
+ break;
+ case ARRAY:
+ Assert.assertEquals(new String(expected.get(field.getName()), StandardCharsets.UTF_8),
+ new String(actual.get(field.getName()), StandardCharsets.UTF_8));
+ break;
+ }
+ }
+ }
+
+}
diff --git a/suppressions.xml b/suppressions.xml
new file mode 100644
index 0000000..f1ba54a
--- /dev/null
+++ b/suppressions.xml
@@ -0,0 +1,35 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/widgets/JMS-batchsink.json b/widgets/JMS-batchsink.json
new file mode 100644
index 0000000..d39d46a
--- /dev/null
+++ b/widgets/JMS-batchsink.json
@@ -0,0 +1,100 @@
+{
+ "metadata": {
+ "spec-version": "1.5"
+ },
+ "display-name": "JMS",
+ "configuration-groups": [
+ {
+ "label": "Basic",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Reference Name",
+ "name": "referenceName"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Connection Factory",
+ "name": "connectionFactory",
+ "widget-attributes": {
+ "default": "ConnectionFactory"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "JMS Username",
+ "name": "jmsUsername"
+ },
+ {
+ "widget-type": "password",
+ "label": "JMS Password",
+ "name": "jmsPassword"
+ },
+ {
+ "widget-type": "text",
+ "label": "Provider URL",
+ "name": "providerUrl",
+ "widget-attributes": {
+ "placeholder": "tcp://hostname:61616"
+ }
+ },
+ {
+ "widget-type": "select",
+ "label": "Type",
+ "name": "type",
+ "widget-attributes": {
+ "values": [
+ "Queue",
+ "Topic"
+ ],
+ "default": "Queue"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Destination Queue or Topic Name",
+ "name": "destinationName"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "JNDI Context Factory",
+ "name": "jndiContextFactory",
+ "widget-attributes": {
+ "default": "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "JNDI Username",
+ "name": "jndiUsername"
+ },
+ {
+ "widget-type": "password",
+ "label": "JNDI Password",
+ "name": "jndiPassword"
+ },
+ {
+ "widget-type": "select",
+ "label": "Message Type",
+ "name": "messageType",
+ "widget-attributes": {
+ "values": [
+ "Message",
+ "Text",
+ "Bytes",
+ "Map"
+ ],
+ "default": "Text"
+ }
+ }
+ ]
+ }
+ ],
+ "outputs": [
+ {
+ "name": "schema",
+ "widget-type": "schema",
+ "widget-attributes": {}
+ }
+ ]
+}
diff --git a/widgets/JMS-streamingsource.json b/widgets/JMS-streamingsource.json
new file mode 100644
index 0000000..61abfa6
--- /dev/null
+++ b/widgets/JMS-streamingsource.json
@@ -0,0 +1,207 @@
+{
+ "metadata": {
+ "spec-version": "1.5"
+ },
+ "display-name": "JMS",
+ "configuration-groups": [
+ {
+ "label": "Basic",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Reference Name",
+ "name": "referenceName"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Connection Factory",
+ "name": "connectionFactory",
+ "widget-attributes": {
+ "default": "ConnectionFactory"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "JMS Username",
+ "name": "jmsUsername"
+ },
+ {
+ "widget-type": "password",
+ "label": "JMS Password",
+ "name": "jmsPassword"
+ },
+ {
+ "widget-type": "text",
+ "label": "Provider URL",
+ "name": "providerUrl",
+ "widget-attributes": {
+ "placeholder": "tcp://hostname:61616"
+ }
+ },
+ {
+ "widget-type": "select",
+ "label": "Type",
+ "name": "type",
+ "widget-attributes": {
+ "values": [
+ "Queue",
+ "Topic"
+ ],
+ "default": "Queue"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Source Queue or Topic Name",
+ "name": "sourceName"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "JNDI Context Factory",
+ "name": "jndiContextFactory",
+ "widget-attributes": {
+ "default": "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "JNDI Username",
+ "name": "jndiUsername"
+ },
+ {
+ "widget-type": "password",
+ "label": "JNDI Password",
+ "name": "jndiPassword"
+ },
+ {
+ "widget-type": "toggle",
+ "name": "messageMetadata",
+ "label": "Keep Message Metadata",
+ "widget-attributes": {
+ "default": "true",
+ "on": {
+ "value": "true",
+ "label": "True"
+ },
+ "off": {
+ "value": "false",
+ "label": "False"
+ }
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "name": "messageProperties",
+ "label": "Keep Message Properties",
+ "widget-attributes": {
+ "default": "true",
+ "on": {
+ "value": "true",
+ "label": "True"
+ },
+ "off": {
+ "value": "false",
+ "label": "False"
+ }
+ }
+ },
+ {
+ "widget-type": "select",
+ "label": "Message Type",
+ "name": "messageType",
+ "widget-attributes": {
+ "values": [
+ "Message",
+ "Text",
+ "Bytes",
+ "Map",
+ "Object"
+ ],
+ "default": "Text"
+ }
+ }
+ ]
+ }
+ ],
+ "outputs": [
+ {
+ "name": "schema",
+ "widget-type": "schema",
+ "widget-attributes": {
+ "default-schema": {
+ "name": "etlSchemaBody",
+ "type": "record",
+ "fields": [
+ {
+ "name": "metadata",
+ "type": {
+ "type": "record",
+ "name": "metadata",
+ "fields": [
+ {
+ "name": "messageId",
+ "type": ["string", "null"],
+ "default": null
+ },
+ {
+ "name": "messageTimestamp",
+ "type": ["long", "null"],
+ "default": null
+ },
+ {
+ "name": "correlationId",
+ "type": ["string", "null"],
+ "default": null
+ },
+ {
+ "name": "replyTo",
+ "type": ["string", "null"],
+ "default": null
+ },
+ {
+ "name": "destination",
+ "type": ["string", "null"],
+ "default": null
+ },
+ {
+ "name": "deliveryNode",
+ "type": ["int", "null"],
+ "default": null
+ },
+ {
+ "name": "redelivered",
+ "type": ["boolean", "null"],
+ "default": null
+ },
+ {
+ "name": "type",
+ "type": ["boolean", "null"],
+ "default": null
+ },
+ {
+ "name": "expiration",
+ "type": ["long", "null"],
+ "default": null
+ },
+ {
+ "name": "priority",
+ "type": ["int", "null"],
+ "default": null
+ }
+ ]
+ }
+ },
+ {
+ "name": "payload",
+ "type": "string"
+ },
+ {
+ "name": "properties",
+ "type": "string"
+ }
+ ]
+ }
+ }
+ }
+ ]
+}