From 7fe5119726a88962ed99a68555eacbb03e785ec1 Mon Sep 17 00:00:00 2001 From: dardanxh Date: Fri, 18 Dec 2020 01:30:19 +0100 Subject: [PATCH] Implement JMS Source/Sink Plugin --- .gitignore | 36 ++ checkstyle.xml | 406 ++++++++++++++++++ docs/JMS-batchsink.md | 58 +++ docs/JMS-streamingsource.md | 102 +++++ examples/JMS-batchsink.json | 107 +++++ examples/JMS-streamingsource.json | 99 +++++ pom.xml | 230 ++++++++++ .../io/cdap/plugin/jms/common/JMSConfig.java | 172 ++++++++ .../cdap/plugin/jms/common/JMSConnection.java | 343 +++++++++++++++ .../plugin/jms/common/JMSDataStructures.java | 25 ++ .../plugin/jms/common/JMSMessageMetadata.java | 58 +++ .../plugin/jms/common/JMSMessageParts.java | 33 ++ .../jms/common/JMSMessageProperties.java | 125 ++++++ .../plugin/jms/common/JMSMessageType.java | 28 ++ .../io/cdap/plugin/jms/sink/JMSBatchSink.java | 82 ++++ .../plugin/jms/sink/JMSBatchSinkConfig.java | 114 +++++ .../plugin/jms/sink/JMSBatchSinkUtils.java | 237 ++++++++++ .../cdap/plugin/jms/sink/JMSOutputFormat.java | 71 +++ .../jms/sink/JMSOutputFormatProvider.java | 50 +++ .../cdap/plugin/jms/sink/JMSRecordWriter.java | 124 ++++++ .../cdap/plugin/jms/source/JMSReceiver.java | 104 +++++ .../plugin/jms/source/JMSSourceUtils.java | 332 ++++++++++++++ .../plugin/jms/source/JMSStreamingSource.java | 78 ++++ .../jms/source/JMSStreamingSourceConfig.java | 230 ++++++++++ .../jms/source/ReferenceStreamingSource.java | 48 +++ src/test/java/JMSPluginTest.java | 75 ++++ .../plugin/jms/source/JMSSourceUtilsTest.java | 256 +++++++++++ .../jms/source/SourceByteMessageTest.java | 4 + .../jms/source/SourceMapMessageTest.java | 4 + .../plugin/jms/source/SourceMessageTest.java | 4 + .../jms/source/SourceObjectMessageTest.java | 4 + .../jms/source/SourceTextMessageTest.java | 124 ++++++ .../plugin/jms/source/common/DummyObject.java | 47 ++ .../plugin/jms/source/common/TestUtils.java | 189 ++++++++ suppressions.xml | 35 ++ widgets/JMS-batchsink.json | 100 +++++ 
widgets/JMS-streamingsource.json | 207 +++++++++ 37 files changed, 4341 insertions(+) create mode 100644 .gitignore create mode 100644 checkstyle.xml create mode 100644 docs/JMS-batchsink.md create mode 100644 docs/JMS-streamingsource.md create mode 100644 examples/JMS-batchsink.json create mode 100644 examples/JMS-streamingsource.json create mode 100644 pom.xml create mode 100644 src/main/java/io/cdap/plugin/jms/common/JMSConfig.java create mode 100644 src/main/java/io/cdap/plugin/jms/common/JMSConnection.java create mode 100644 src/main/java/io/cdap/plugin/jms/common/JMSDataStructures.java create mode 100644 src/main/java/io/cdap/plugin/jms/common/JMSMessageMetadata.java create mode 100644 src/main/java/io/cdap/plugin/jms/common/JMSMessageParts.java create mode 100644 src/main/java/io/cdap/plugin/jms/common/JMSMessageProperties.java create mode 100644 src/main/java/io/cdap/plugin/jms/common/JMSMessageType.java create mode 100644 src/main/java/io/cdap/plugin/jms/sink/JMSBatchSink.java create mode 100644 src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkConfig.java create mode 100644 src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkUtils.java create mode 100644 src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormat.java create mode 100644 src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormatProvider.java create mode 100644 src/main/java/io/cdap/plugin/jms/sink/JMSRecordWriter.java create mode 100644 src/main/java/io/cdap/plugin/jms/source/JMSReceiver.java create mode 100644 src/main/java/io/cdap/plugin/jms/source/JMSSourceUtils.java create mode 100644 src/main/java/io/cdap/plugin/jms/source/JMSStreamingSource.java create mode 100644 src/main/java/io/cdap/plugin/jms/source/JMSStreamingSourceConfig.java create mode 100644 src/main/java/io/cdap/plugin/jms/source/ReferenceStreamingSource.java create mode 100644 src/test/java/JMSPluginTest.java create mode 100644 src/test/java/io/cdap/plugin/jms/source/JMSSourceUtilsTest.java create mode 100644 
src/test/java/io/cdap/plugin/jms/source/SourceByteMessageTest.java create mode 100644 src/test/java/io/cdap/plugin/jms/source/SourceMapMessageTest.java create mode 100644 src/test/java/io/cdap/plugin/jms/source/SourceMessageTest.java create mode 100644 src/test/java/io/cdap/plugin/jms/source/SourceObjectMessageTest.java create mode 100644 src/test/java/io/cdap/plugin/jms/source/SourceTextMessageTest.java create mode 100644 src/test/java/io/cdap/plugin/jms/source/common/DummyObject.java create mode 100644 src/test/java/io/cdap/plugin/jms/source/common/TestUtils.java create mode 100644 suppressions.xml create mode 100644 widgets/JMS-batchsink.json create mode 100644 widgets/JMS-streamingsource.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b0df7b7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,36 @@ +*.class +.*.swp +.beamer +# Package Files # +*.jar +*.war +*.ear + +# Intellij Files & Dir # +*.iml +*.ipr +*.iws +atlassian-ide-plugin.xml +out/ +.DS_Store +./lib/ +.idea + +# Gradle Files & Dir # +build/ +.gradle/ +.stickyStorage +.build/ +target/ + +# Node log +npm-*.log +logs/ + +# Singlenode and test data files. 
+/templates/ +/data/ +/data-fabric-tests/data/ + +# generated by docs build +*.pyc diff --git a/checkstyle.xml b/checkstyle.xml new file mode 100644 index 0000000..2b6e777 --- /dev/null +++ b/checkstyle.xml @@ -0,0 +1,406 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/JMS-batchsink.md b/docs/JMS-batchsink.md new file mode 100644 index 0000000..74ccae6 --- /dev/null +++ b/docs/JMS-batchsink.md @@ -0,0 +1,58 @@ +# JMS batch sink + + +Description +----------- +Produces JMS messages of different types as Message, Text, Bytes and Map to a specified Queue or Topic. + +Use Case +-------- +Use this JMS Sink plugin when you want to produce messages to a JMS Queue/Topic. + + +Properties +---------- +**Connection Factory**: Name of the connection factory. If not specified, the value *ConnectionFactory* is considered by + default. + +**JMS Username**: Username to connect to JMS. This property is mandatory. + +**JMS Password**: Password to connect to JMS. This property is mandatory. + +**Provider URL**: Provider URL of the JMS Provider. This property is mandatory. For example for the *ActiveMQ* provider + you can set: `tcp://hostname:61616`. + +**Type**: Queue or Topic. Queue is considered by default. + +**Destination**: Queue/Topic name. + +**JNDI Context Factory**: Name of the context factory. This property is optional. For example for the *ActiveMQ* +provider you can set: `org.apache.activemq.jndi.ActiveMQInitialContextFactory`. 
+ +**JNDI Username**: Username for JNDI. This property is optional. + +**JNDI Password**: Password for JNDI. This property is optional. + +**Message Type**: The type of messages you intend to produce. A JMS message could be of the following types: *Message*, + *Text*, *Bytes* and *Map*. By default, *Text* message type is considered. + +Example +------- +This example reads a record from a JSON file and produces a *MapMessage* with the file's content as a payload. +For every record of the JSON file it produces one *MapMessage* to the given topic. +An example of a JSON record is shown below. + +```json +{"name":"foo", "surname":"bar", "age":23} +``` +We could see the produced message in the running JMS implementation. + +| field name | value | +| ----------------- | ------------------------------------------ | +| messageId | ID:Producer-36705-1609122138230-1:1:1:1:1 | +| messageTimestamp | 1609122138554 | +| destination | topic://MyTopic | +| deliveryNode | 2 | +| expiration | 0 | +| priority | 4 | +| payload | {"name":"foo","surname":"bar","age":23} | diff --git a/docs/JMS-streamingsource.md b/docs/JMS-streamingsource.md new file mode 100644 index 0000000..38a4101 --- /dev/null +++ b/docs/JMS-streamingsource.md @@ -0,0 +1,102 @@ +# JMS Streaming Source + + +Description +----------- +Consumes JMS messages of different types as Message, Text, Bytes and Map from a specified Queue or Topic. + +Use Case +-------- +Use this JMS Source plugin when you want to consume messages from a JMS Queue/Topic and write them to a Table. + + +Properties +---------- +**Connection Factory**: Name of the connection factory. If not specified, the value *ConnectionFactory* is considered by + default. + +**JMS Username**: Username to connect to JMS. This property is mandatory. + +**JMS Password**: Password to connect to JMS. This property is mandatory. + +**Provider URL**: Provider URL of the JMS Provider. This property is mandatory. 
For example for the *ActiveMQ* provider + you can set: `tcp://hostname:61616`. + +**Type**: Queue or Topic. Queue is considered by default. + +**Destination**: Queue/Topic name. + +**JNDI Context Factory**: Name of the context factory. This property is optional. For example for the *ActiveMQ* +provider you can set: `org.apache.activemq.jndi.ActiveMQInitialContextFactory`. + +**JNDI Username**: Username for JNDI. This property is optional. + +**JNDI Password**: Password for JNDI. This property is optional. + +**Message Type**: The type of messages you intend to consume. A JMS message could be of the following types: *Message*, + *Text*, *Bytes* and *Map*. By default, *Text* message type is considered. The *payload* field of the output schema gets + switched to the appropriate data type upon the selection of a message type and *validate* button click. + + +Example +------- +This example reads JMS messages of *TextMessage* type from the *status* topic existing in provider *tcp://hostname +:61616* with JNDI context factory name *org.apache.activemq.jndi.ActiveMQInitialContextFactory*. An example of a +TextMessage object is shown below. Since we used ActiveMQ as a provider, the message is automatically considered as an +*ActiveMQTextMessage* (an implementation of JMS TextMessage interface).
The object below shows a consumed +*ActiveMQTextMessage*: + +```text +ActiveMQTextMessage { + commandId=5, + responseRequired=true, + messageId=ID: Producer-50444-1608735228752-1: 1: 1: 1: 1, + originalDestination=null, + originalTransactionId=null, + producerId=ID: Producer-50444-1608735228752-1: 1: 1: 1, + destination=topic: topic://status, + transactionId=null, + expiration=0, + timestamp=1608735228894, + arrival=0, + brokerInTime=1608735228895, + brokerOutTime=1608735228896, + correlationId=null, + replyTo=null, + persistent=true, + type=null, + priority=4, + groupID=null, + groupSequence=0, + targetConsumerId=null, + compressed=false, + userID=null, + content=org.apache.activemq.util.ByteSequence@4b9e255, + marshalledProperties=null, + dataStructure=null, + redeliveryCounter=0, + size=0, + properties=null, + readOnlyProperties=true, + readOnlyBody=true, + droppable=false, + jmsXGroupFirstForConsumer=false, + text=DONE +} + +``` +Since the JMS Source plugin's implementation is based purely on JMS (i.e. not coupled to any JMS implementation, for +example ActiveMQ), we consider only the header data supported by JMS.
The consumer will output the +record below: + +| field name | value | +| ----------------- | ------------------------------------------ | +| messageId | ID:Producer-54511-1608749039578-1:1:1:1:1 | +| messageTimestamp | 1609122138554 | +| deliveryNode | 2 | +| payload | DONE | +| replyTo | topic://status | +| correlationId | null | +| expiration | 0 | +| type | null | +| redelivered | false | diff --git a/examples/JMS-batchsink.json b/examples/JMS-batchsink.json new file mode 100644 index 0000000..bbe2c24 --- /dev/null +++ b/examples/JMS-batchsink.json @@ -0,0 +1,107 @@ +{ + "artifact": { + "name": "cdap-data-pipeline", + "version": "6.3.0-SNAPSHOT", + "scope": "SYSTEM" + }, + "description": "Read records from a json file and generate a MapMessage for each one of them.", + "name": "JMS-batchsink", + "config": { + "resources": { + "memoryMB": 2048, + "virtualCores": 1 + }, + "driverResources": { + "memoryMB": 2048, + "virtualCores": 1 + }, + "connections": [ + { + "from": "File", + "to": "JMS" + } + ], + "comments": [], + "postActions": [], + "properties": {}, + "processTimingEnabled": true, + "stageLoggingEnabled": false, + "stages": [ + { + "name": "File", + "plugin": { + "name": "File", + "type": "batchsource", + "label": "File", + "artifact": { + "name": "core-plugins", + "version": "2.6.0-SNAPSHOT", + "scope": "USER" + }, + "properties": { + "referenceName": "JMS", + "path": "${PATH}", + "format": "json", + "delimiter": ",", + "skipHeader": "true", + "filenameOnly": "false", + "recursive": "false", + "ignoreNonExistingFolders": "false", + "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"surname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}" + } + }, + "outputSchema": [ + { + "name": "etlSchemaBody", + "schema": 
"{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"surname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}" + } + ], + "id": "File" + }, + { + "name": "JMS", + "plugin": { + "name": "JMS", + "type": "batchsink", + "label": "JMS", + "artifact": { + "name": "jms-plugins", + "version": "1.0.0-SNAPSHOT", + "scope": "USER" + }, + "properties": { + "referenceName": "JMS", + "type": "Topic", + "messageType": "Map", + "jmsUsername": "${JMS_USERNAME}", + "jmsPassword": "${JMS_PASSWORD}", + "providerUrl": "${PROVIDER_URL}", + "destination": "${DESTINATION}", + "jndiContextFactory": "${JNDI_CONTEXT_FACTORY}", + "jndiUsername": "${JNDI_USERNAME}", + "jndiPassword": "${JNDI_PASSWORD}" + } + }, + "outputSchema": [ + { + "name": "etlSchemaBody", + "schema": "" + } + ], + "inputSchema": [ + { + "name": "File", + "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"surname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}" + } + ], + "id": "JMS" + } + ], + "schedule": "0 * * * *", + "engine": "spark", + "numOfRecordsPreview": 100, + "description": "Data Pipeline Application", + "maxConcurrentRuns": 1 + } +} + diff --git a/examples/JMS-streamingsource.json b/examples/JMS-streamingsource.json new file mode 100644 index 0000000..05fde74 --- /dev/null +++ b/examples/JMS-streamingsource.json @@ -0,0 +1,99 @@ +{ + "artifact": { + "name": "cdap-data-streams", + "version": "6.3.0-SNAPSHOT", + "scope": "SYSTEM" + }, + "description": "Consume messages from a given topic and write them to a file.", + "name": "JMS-streamingsource", + "config": { + "resources": { + "memoryMB": 2048, + "virtualCores": 1 + }, + "driverResources": { + "memoryMB": 2048, + "virtualCores": 1 + }, + "connections": [ + { + "from": "JMS", + "to": "File" + } + ], + "comments": [], + "postActions": [], + "properties": { + 
"system.spark.spark.streaming.backpressure.enabled": "true", + "system.spark.spark.executor.instances": "1" + }, + "processTimingEnabled": true, + "stageLoggingEnabled": false, + "stages": [ + { + "name": "JMS", + "plugin": { + "name": "JMS", + "type": "streamingsource", + "label": "JMS", + "artifact": { + "name": "jms-plugins", + "version": "1.0.0-SNAPSHOT", + "scope": "USER" + }, + "properties": { + "referenceName": "JMS", + "schema": "{\"type\":\"record\",\"name\":\"message\",\"fields\":[{\"name\":\"messageId\",\"type\":\"string\"},{\"name\":\"messageTimestamp\",\"type\":\"long\"},{\"name\":\"correlationId\",\"type\":\"string\"},{\"name\":\"replyTo\",\"type\":\"string\"},{\"name\":\"destination\",\"type\":\"string\"},{\"name\":\"deliveryNode\",\"type\":\"int\"},{\"name\":\"redelivered\",\"type\":\"boolean\"},{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"expiration\",\"type\":\"long\"},{\"name\":\"priority\",\"type\":\"int\"},{\"name\":\"payload\",\"type\":\"string\"}]}", + "type": "Topic", + "messageType": "Text", + "jmsUsername": "${JMS_USERNAME}", + "jmsPassword": "${JMS_PASSWORD}", + "providerUrl": "${PROVIDER_URL}", + "destination": "${DESTINATION}", + "jndiContextFactory": "${JNDI_CONTEXT_FACTORY}" + } + }, + "outputSchema": "{\"type\":\"record\",\"name\":\"message\",\"fields\":[{\"name\":\"messageId\",\"type\":\"string\"},{\"name\":\"messageTimestamp\",\"type\":\"long\"},{\"name\":\"correlationId\",\"type\":\"string\"},{\"name\":\"replyTo\",\"type\":\"string\"},{\"name\":\"destination\",\"type\":\"string\"},{\"name\":\"deliveryNode\",\"type\":\"int\"},{\"name\":\"redelivered\",\"type\":\"boolean\"},{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"expiration\",\"type\":\"long\"},{\"name\":\"priority\",\"type\":\"int\"},{\"name\":\"payload\",\"type\":\"string\"}]}", + "id": "JMS" + }, + { + "name": "File", + "plugin": { + "name": "File", + "type": "batchsink", + "label": "File", + "artifact": { + "name": "core-plugins", + "version": 
"2.5.0-SNAPSHOT", + "scope": "SYSTEM" + }, + "properties": { + "suffix": "yyyy-MM-dd-HH-mm", + "format": "delimited", + "referenceName": "file", + "path": "${PATH}", + "delimiter": ";", + "schema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}" + } + }, + "outputSchema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}", + "inputSchema": [ + { + "name": "JMS", + "schema": "{\"type\":\"record\",\"name\":\"message\",\"fields\":[{\"name\":\"messageId\",\"type\":\"string\"},{\"name\":\"messageTimestamp\",\"type\":\"long\"},{\"name\":\"correlationId\",\"type\":\"string\"},{\"name\":\"replyTo\",\"type\":\"string\"},{\"name\":\"destination\",\"type\":\"string\"},{\"name\":\"deliveryNode\",\"type\":\"int\"},{\"name\":\"redelivered\",\"type\":\"boolean\"},{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"expiration\",\"type\":\"long\"},{\"name\":\"priority\",\"type\":\"int\"},{\"name\":\"payload\",\"type\":\"string\"}]}" + } + ], + "id": "File" + } + ], + "batchInterval": "10s", + "clientResources": { + "memoryMB": 2048, + "virtualCores": 1 + }, + "disableCheckpoints": true, + "stopGracefully": true, + "description": "Data Streams Application" + } +} + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..6082a1e --- /dev/null +++ b/pom.xml @@ -0,0 +1,230 @@ + + + + + + 4.0.0 + JMS Plugins + io.cdap.plugin + jms-plugins + 1.0.0-SNAPSHOT + + + UTF-8 + widgets + docs + ${project.basedir} + 6.2.3 + 2.6.0-SNAPSHOT + 2.3.1 + 2.9.1 + 4.1.16.Final + 1.3.0 + 1.8.2 + 4.12 + 27.0.1-jre + 5.11.1 + 2.24.0 + + + + + org.apache.activemq + activemq-all + ${activemq.version} + + + io.cdap.plugin + hydrator-common + ${cdap.plugin.version} + + + com.google.guava + guava + ${guava.version} + + + io.cdap.cdap + cdap-api + ${cdap.version} + provided + + + io.cdap.cdap + cdap-common + ${cdap.version} + provided + + + io.cdap.cdap + cdap-api-common + ${cdap.version} + 
provided + + + io.cdap.cdap + cdap-etl-api-spark + ${cdap.version} + provided + + + io.cdap.cdap + cdap-etl-api + ${cdap.version} + provided + + + io.cdap.cdap + cdap-formats + ${cdap.version} + + + junit + junit + ${junit.version} + test + + + org.mockito + mockito-core + ${mockito.version} + test + + + io.cdap.cdap + hydrator-test + ${cdap.version} + test + + + org.apache.spark + spark-streaming_2.11 + ${spark.version} + provided + + + antlr + antlr + 2.7.7 + compile + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.17 + + + validate + validate + + checkstyle.xml + suppressions.xml + UTF-8 + true + true + true + + + check + + + + + + com.puppycrawl.tools + checkstyle + 6.19 + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.7.0 + + 1.8 + 1.8 + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.14.1 + + -Xmx2048m -Djava.awt.headless=true -XX:MaxPermSize=256m -XX:+UseConcMarkSweepGC + -XX:OnOutOfMemoryError="kill -9 %p" -XX:+HeapDumpOnOutOfMemoryError + + false + + + + org.apache.felix + maven-bundle-plugin + 3.3.0 + + + <_exportcontents> + io.cdap.plugin.*; + org.apache.activemq.*; + org.apache.spark.streaming.*; + com.google.common.base.*; + + *;inline=false;scope=compile + true + lib + + + + + package + + bundle + + + + + + io.cdap + cdap-maven-plugin + 1.1.0 + + + system:cdap-data-pipeline[6.2.3, 7.0.0) + system:cdap-data-streams[6.2.3, 7.0.0) + + + + + create-artifact-config + prepare-package + + create-plugin-json + + + + + + + + diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSConfig.java b/src/main/java/io/cdap/plugin/jms/common/JMSConfig.java new file mode 100644 index 0000000..b9872a8 --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/common/JMSConfig.java @@ -0,0 +1,172 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.jms.common; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.plugin.common.ReferencePluginConfig; + +import javax.annotation.Nullable; + +/** + * Base config for JMS plugins. + */ +public class JMSConfig extends ReferencePluginConfig { + + public static final String NAME_CONNECTION_FACTORY = "connectionFactory"; + public static final String NAME_JMS_USERNAME = "jmsUsername"; + public static final String NAME_JMS_PASSWORD = "jmsPassword"; + public static final String NAME_PROVIDER_URL = "providerUrl"; + public static final String NAME_TYPE = "type"; + public static final String NAME_JNDI_CONTEXT_FACTORY = "jndiContextFactory"; + public static final String NAME_JNDI_USERNAME = "jndiUsername"; + public static final String NAME_JNDI_PASSWORD = "jndiPassword"; + + @Name(NAME_CONNECTION_FACTORY) + @Description("Name of the connection factory.") + @Nullable + @Macro + private String connectionFactory; // default: ConnectionFactory + + @Name(NAME_JMS_USERNAME) + @Description("Username to connect to JMS.") + @Macro + private String jmsUsername; + + @Name(NAME_JMS_PASSWORD) + @Description("Password to connect to JMS.") + @Macro + private String jmsPassword; + + @Name(NAME_PROVIDER_URL) + @Description("The URL of the JMS provider. 
For example, in case of an ActiveMQ Provider, the URL has the format " + + "tcp://hostname:61616.") + @Macro + private String providerUrl; + + @Name(NAME_TYPE) + @Description("Queue or Topic.") + @Nullable + @Macro + private String type; // default: queue + + @Name(NAME_JNDI_CONTEXT_FACTORY) + @Description("Name of the JNDI context factory. For example, in case of an ActiveMQ Provider, the JNDI Context " + + "Factory is: org.apache.activemq.jndi.ActiveMQInitialContextFactory.") + @Macro + private String jndiContextFactory; // default: org.apache.activemq.jndi.ActiveMQInitialContextFactory + + @Name(NAME_JNDI_USERNAME) + @Description("User name for the JNDI.") + @Nullable + @Macro + private String jndiUsername; + + @Name(NAME_JNDI_PASSWORD) + @Description("Password for the JNDI.") + @Nullable + @Macro + private String jndiPassword; + + public JMSConfig(String referenceName) { + super(referenceName); + this.connectionFactory = "ConnectionFactory"; + this.type = JMSDataStructures.QUEUE; + this.jndiContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"; + + } + + @VisibleForTesting + public JMSConfig(String referenceName, String connectionFactory, String jmsUsername, String jmsPassword, + String providerUrl, String type, String jndiContextFactory, String jndiUsername, + String jndiPassword) { + super(referenceName); + this.connectionFactory = connectionFactory; + this.jmsUsername = jmsUsername; + this.jmsPassword = jmsPassword; + this.providerUrl = providerUrl; + this.type = type; + this.jndiContextFactory = jndiContextFactory; + this.jndiUsername = jndiUsername; + this.jndiPassword = jndiPassword; + + } + + public String getConnectionFactory() { + return Strings.isNullOrEmpty(connectionFactory) ? "ConnectionFactory" : connectionFactory; // documented default + } + + public String getJmsUsername() { + return jmsUsername; + } + + public String getJmsPassword() { + return jmsPassword; + } + + public String getProviderUrl() { + return providerUrl; + } + + public String getType() { + return Strings.isNullOrEmpty(type) ? JMSDataStructures.QUEUE : type; // documented default + } + + public String
getJndiContextFactory() { + return jndiContextFactory; + } + + public String getJndiUsername() { + return jndiUsername; + } + + public String getJndiPassword() { + return jndiPassword; + } + + public void validateParams(FailureCollector failureCollector) { + + if (Strings.isNullOrEmpty(jmsUsername) && !containsMacro(NAME_JMS_USERNAME)) { + failureCollector + .addFailure("JMS username must be provided.", "Please provide your JMS username.") + .withConfigProperty(NAME_JMS_USERNAME); + } + + if (Strings.isNullOrEmpty(jmsPassword) && !containsMacro(NAME_JMS_PASSWORD)) { + failureCollector + .addFailure("JMS password must be provided.", "Please provide your JMS password.") + .withConfigProperty(NAME_JMS_PASSWORD); + } + + if (Strings.isNullOrEmpty(jndiContextFactory) && !containsMacro(NAME_JNDI_CONTEXT_FACTORY)) { + failureCollector + .addFailure("JNDI context factory must be provided.", "Please provide your JNDI" + + " context factory.") + .withConfigProperty(NAME_JNDI_CONTEXT_FACTORY); + } + + if (Strings.isNullOrEmpty(providerUrl) && !containsMacro(NAME_PROVIDER_URL)) { + failureCollector + .addFailure("Provider URL must be provided.", "Please provide your provider URL.") + .withConfigProperty(NAME_PROVIDER_URL); + } + } +} diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSConnection.java b/src/main/java/io/cdap/plugin/jms/common/JMSConnection.java new file mode 100644 index 0000000..ca935b7 --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/common/JMSConnection.java @@ -0,0 +1,343 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.jms.common; + +import com.google.common.base.Strings; +import io.cdap.plugin.jms.sink.JMSBatchSinkConfig; +import io.cdap.plugin.jms.source.JMSStreamingSourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +/** + * A facade class that encapsulates the necessary functionality to: get the initial context, resolve the connection + * factory, establish connection to JMS, create a session, resolve the destination by a queue or topic name, create + * producer, create consumer, set message listener to a consumer, and start connection. This class handles exceptions + * for all the functionalities provided. + */ +public class JMSConnection { + + private static final Logger LOG = LoggerFactory.getLogger(JMSConnection.class); + private final JMSConfig config; + + public JMSConnection(JMSConfig config) { + this.config = config; + } + + /** + * Gets the initial context by offering 'jndiContextFactory', 'providerUrl', 'topic'/'queue' name, `jndiUsername`, + * and `jndiPassword` config properties. 
+ * + * @return the {@link InitialContext} from the given properties + */ + public Context getContext() { + Properties properties = new Properties(); + properties.put(Context.INITIAL_CONTEXT_FACTORY, config.getJndiContextFactory()); + properties.put(Context.PROVIDER_URL, config.getProviderUrl()); + + if (config instanceof JMSBatchSinkConfig) { + String destinationName = ((JMSBatchSinkConfig) config).getDestinationName(); + if (config.getType().equals(JMSDataStructures.TOPIC)) { + properties.put(String.format("topic.%s", destinationName), destinationName); + } else { + properties.put(String.format("queue.%s", destinationName), destinationName); + } + } else { + String sourceName = ((JMSStreamingSourceConfig) config).getSourceName(); + if (config.getType().equals(JMSDataStructures.TOPIC)) { + properties.put(String.format("topic.%s", sourceName), sourceName); + } else { + properties.put(String.format("queue.%s", sourceName), sourceName); + } + } + + if (!(Strings.isNullOrEmpty(config.getJndiUsername()) && Strings.isNullOrEmpty(config.getJndiPassword()))) { + properties.put(Context.SECURITY_PRINCIPAL, Strings.nullToEmpty(config.getJndiUsername())); + properties.put(Context.SECURITY_CREDENTIALS, Strings.nullToEmpty(config.getJndiPassword())); + } // Properties rejects null values, so a missing username or password is passed as "" + + try { + return new InitialContext(properties); + } catch (NamingException e) { + throw new RuntimeException("Failed to create initial context for provider URL " + config.getProviderUrl() + + " with principal " + config.getJndiUsername(), e); + } + } + + /** + * Gets a {@link ConnectionFactory} by offering the `connectionFactory` config property.
+ * + * @param context an initial context + * @return a connection factory + */ + public ConnectionFactory getConnectionFactory(Context context) { + try { + return (ConnectionFactory) context.lookup(config.getConnectionFactory()); + } catch (NamingException e) { + throw new RuntimeException(String.format("Failed to resolve the connection factory for %s.", + config.getConnectionFactory()), e); + } + } + + /** + * Creates a {@link Connection} by offering `jmsUsername` and `jmsPassword`. If a source {@link Topic} is to be + * consumed, a `clientId` is set, which is needed by the JMS broker to identify the durable subscriber. + * + * @param connectionFactory a given connection factory + * @return a connection to the JMS broker + */ + public Connection createConnection(ConnectionFactory connectionFactory) { + Connection connection = null; + try { + connection = connectionFactory.createConnection(config.getJmsUsername(), config.getJmsPassword()); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e); // keep the cause + } + // If subscribing to a source topic, create a durable subscriber + if (config.getType().equals(JMSDataStructures.TOPIC)) { + try { + if (config instanceof JMSStreamingSourceConfig) { + String clientId = "client-id-" + ((JMSStreamingSourceConfig) config).getSourceName(); + connection.setClientID(clientId); + } + } catch (JMSException e) { + throw new RuntimeException("Cannot set Client Id", e); + } + } + return connection; + } + + /** + * Starts connection of this client to the JMS broker. + * + * @param connection a given connection + */ + public void startConnection(Connection connection) { + try { + connection.start(); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()), e); // keep the cause + } + } + + /** + * Stops connection of this client from the JMS broker.
+ * + * @param connection a given connection + */ + public void stopConnection(Connection connection) { + try { + connection.stop(); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + } + + /** + * Closes connection of this client from the JMS broker. + * + * @param connection a given connection + */ + public void closeConnection(Connection connection) { + try { + connection.close(); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + } + + /** + * Creates a {@link Session} between this client and the JMS broker. + * + * @param connection a given session + * @return a session to the JMS broker + */ + public Session createSession(Connection connection) { + try { + return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + } + + /** + * Closes the {@link Session} between this client and the JMS broker. + * + * @param session a given session + */ + public void closeSession(Session session) { + try { + session.close(); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + } + + /** + * Gets the source {@link Topic}/{@link Queue} depending on the `type` config parameter. {@link Destination} is the + * parent class of {@link Topic} and {@link Queue}. 
+ * + * @param context a given context + * @return a source topic/queue that this client is about to consume messages from + */ + public Destination getSource(Context context) { + String sourceName = ((JMSStreamingSourceConfig) config).getSourceName(); + if (config.getType().equals(JMSDataStructures.TOPIC)) { + try { + return (Topic) context.lookup(sourceName); + } catch (NamingException e) { + throw new RuntimeException("Failed to resolve the topic " + sourceName, e); + } + } else { + try { + return (Queue) context.lookup(sourceName); + } catch (NamingException e) { + throw new RuntimeException("Failed to resolve the queue " + sourceName, e); + } + } + } + + /** + * Gets a sink {@link Topic}/{@link Queue} depending on the `type` config parameter. {@link Destination} is the + * parent class of {@link Topic} and {@link Queue}. If no sink topic/queue name is provided, a sink topic/queue is + * automatically created. + * + * @param context a given context + * @param session a given session needed to create the topic/queue in case it does not exist + * @return a sink topic/queue this client is about to produce messages to + */ + public Destination getSink(Context context, Session session) { + String destinationName = ((JMSBatchSinkConfig) config).getDestinationName(); + + if (config.getType().equals(JMSDataStructures.TOPIC)) { + try { + return (Topic) context.lookup(destinationName); + } catch (NamingException e) { + LOG.warn("Failed to resolve queue " + destinationName, e); + return createSinkTopic(session); + } + } else { + try { + return (Queue) context.lookup(destinationName); + } catch (NamingException e) { + LOG.warn("Failed to resolve queue " + destinationName, e); + return createSinkQueue(session); + } + } + } + + /** + * Creates a sink {@link Topic} + * + * @param session a given session + * @return a created topic + */ + private Destination createSinkTopic(Session session) { + String destinationName = ((JMSBatchSinkConfig) config).getDestinationName(); + 
LOG.info("Creating topic " + destinationName); + try { + return session.createTopic(destinationName); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + } + + /** + * Creates a sink {@link Queue} + * + * @param session a given session + * @return a created queue + */ + private Destination createSinkQueue(Session session) { + String destinationName = ((JMSBatchSinkConfig) config).getDestinationName(); + LOG.info("Creating queue " + destinationName); + try { + return session.createQueue(destinationName); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + } + + /** + * Creates a {@link MessageConsumer} that consumes messages from a defined source {@link Topic}/{@link Queue}. In case + * of a topic-consumer, a durable subscriber is created. A durable subscriber makes the JMS broker keep the state of + * the offset consumed. Hence this client can restart consuming messages from the last offset not read. + * + * @param session a given session + * @param destination a source topic/queue this client is about to consume messages from + * @return a created message consumer + */ + public MessageConsumer createConsumer(Session session, Destination destination) { + MessageConsumer messageConsumer = null; + try { + if (destination instanceof Topic) { + String clientId = "subscriber-id-" + ((JMSStreamingSourceConfig) config).getSourceName(); + messageConsumer = session.createDurableSubscriber((Topic) destination, clientId); + } else { + messageConsumer = session.createConsumer(destination); + } + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + return messageConsumer; + } + + /** + * Creates a {@link MessageProducer} that produces messages to a defined sink {@link Topic}/{@link Queue}. 
+ * + * @param session a given session + * @param destination a sink topic/queue this client is about to produce messages to + * @return a created message producer + */ + public MessageProducer createProducer(Session session, Destination destination) { + try { + return session.createProducer(destination); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + } + + /** + * Sets a {@link MessageListener} to the {@link MessageConsumer}. This message listener has a `onMessage()` method + * that gets automatically triggered in case a new message is produced to the queue/topic while the pipeline is in + * the RUNNING state. + * + * @param messageListener a given message listener + * @param messageConsumer a given message consumer + */ + public void setMessageListener(MessageListener messageListener, MessageConsumer messageConsumer) { + try { + messageConsumer.setMessageListener(messageListener); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + } +} diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSDataStructures.java b/src/main/java/io/cdap/plugin/jms/common/JMSDataStructures.java new file mode 100644 index 0000000..4990af2 --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/common/JMSDataStructures.java @@ -0,0 +1,25 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
/**
 * A class that specifies the JMS data structures types, i.e. the values accepted by the `type` config property.
 */
public final class JMSDataStructures {
  public static final String QUEUE = "Queue";
  public static final String TOPIC = "Topic";

  private JMSDataStructures() {
    // Constants holder, not meant to be instantiated.
  }
}
+ */ +public class JMSMessageMetadata { + public static final String MESSAGE_ID = "messageId"; + public static final String MESSAGE_TIMESTAMP = "messageTimestamp"; + public static final String CORRELATION_ID = "correlationId"; + public static final String REPLY_TO = "replyTo"; + public static final String DESTINATION = "destination"; + public static final String DELIVERY_MODE = "deliveryNode"; + public static final String REDELIVERED = "redelivered"; + public static final String TYPE = "type"; + public static final String EXPIRATION = "expiration"; + public static final String PRIORITY = "priority"; + + public static Schema.Field getMessageMetadata() { + return Schema.Field.of(JMSMessageParts.METADATA, Schema.recordOf(JMSMessageParts.METADATA, Arrays.asList( + Schema.Field.of(MESSAGE_ID, Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of(MESSAGE_TIMESTAMP, Schema.nullableOf(Schema.of(Schema.Type.LONG))), + Schema.Field.of(CORRELATION_ID, Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of(REPLY_TO, Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of(DESTINATION, Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of(DELIVERY_MODE, Schema.nullableOf(Schema.of(Schema.Type.INT))), + Schema.Field.of(REDELIVERED, Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))), + Schema.Field.of(TYPE, Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of(EXPIRATION, Schema.nullableOf(Schema.of(Schema.Type.LONG))), + Schema.Field.of(PRIORITY, Schema.nullableOf(Schema.of(Schema.Type.INT)))))); + } + + public static List getJMSMessageHeaderNames() { + return Arrays.asList(MESSAGE_ID, MESSAGE_TIMESTAMP, CORRELATION_ID, REPLY_TO, DESTINATION, DELIVERY_MODE, + REDELIVERED, TYPE, EXPIRATION, PRIORITY); + } +} + diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSMessageParts.java b/src/main/java/io/cdap/plugin/jms/common/JMSMessageParts.java new file mode 100644 index 0000000..97906a3 --- /dev/null +++ 
/**
 * A class that specifies JMS message parts.
 */
public final class JMSMessageParts {
  public static final String METADATA = "metadata";
  public static final String PAYLOAD = "payload";
  public static final String PROPERTIES = "properties";

  private JMSMessageParts() {
    // Constants holder, not meant to be instantiated.
  }

  /**
   * Lists the names of all message parts.
   *
   * @return the part names: metadata, payload and properties, in that order
   */
  public static List<String> getJMSMessageParts() {
    return Arrays.asList(METADATA, PAYLOAD, PROPERTIES);
  }
}
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.jms.common; + +import com.google.gson.Gson; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import javax.jms.JMSException; +import javax.jms.Message; + +/** + * A class that specifies JMS message properties fields. + */ +public class JMSMessageProperties { + + public static void populateProperties(Schema schema, StructuredRecord.Builder builder, Message message, + String messageType) throws JMSException { + + boolean isString = schema.getField(JMSMessageParts.PROPERTIES).getSchema().getType().equals(Schema.Type.STRING); + boolean isRecord = schema.getField(JMSMessageParts.PROPERTIES).getSchema().getType().equals(Schema.Type.RECORD); + + if (isString) { + populatePropertiesOnStringSchema(builder, message); + } else if (isRecord) { + populatePropertiesOnRecordSchema(schema, builder, message, messageType); + } else { + throw new RuntimeException( + String.format("Failed to populate properties! 
Field %s can only support String or Record data types!", + JMSMessageParts.PROPERTIES) + ); + } + } + + + /** + * @param builder + * @param message + * @throws JMSException + */ + public static void populatePropertiesOnStringSchema(StructuredRecord.Builder builder, Message message) + throws JMSException { + HashMap properties = new HashMap<>(); + List listOfPropertyNames = Collections.list(message.getPropertyNames()); + + for (String propertyName : listOfPropertyNames) { + properties.put(propertyName, message.getObjectProperty(propertyName)); + } + + builder.set(JMSMessageParts.PROPERTIES, new Gson().toJson(properties)); + } + + + /** + * @param schema the entire schema of the record + * @param recordBuilder the record builder that we set the properties record into + * @param message the incoming JMS message + * @param messageType the incoming JMS message type + * @throws JMSException + */ + public static void populatePropertiesOnRecordSchema(Schema schema, StructuredRecord.Builder recordBuilder, + Message message, String messageType) throws JMSException { + Schema propertiesSchema = schema.getField(JMSMessageParts.PROPERTIES).getSchema(); + + StructuredRecord.Builder propertiesRecordBuilder = StructuredRecord.builder(propertiesSchema); + + for (Schema.Field field : propertiesSchema.getFields()) { + String name = field.getName(); + Schema.Type type = field.getSchema().getType(); + + if (!message.propertyExists(field.getName())) { + throw new RuntimeException( + String.format("Property \"%1$s\" does not exist in the incoming \"%2$s\" message! 
" + + "Make sure that you have specified a correct field name in the output schema that " + + "matches a property name in the incoming \"%2$s\" message.", field.getName(), messageType)); + } + + switch (type) { + case BOOLEAN: + propertiesRecordBuilder.set(name, message.getBooleanProperty(name)); + continue; + case BYTES: + propertiesRecordBuilder.set(name, message.getByteProperty(name)); + continue; + case INT: + propertiesRecordBuilder.set(name, message.getIntProperty(name)); +// short getShortProperty(String var1) throws JMSException; + continue; + case LONG: + propertiesRecordBuilder.set(name, message.getLongProperty(name)); + continue; + case FLOAT: + propertiesRecordBuilder.set(name, message.getFloatProperty(name)); + continue; + case DOUBLE: + propertiesRecordBuilder.set(name, message.getDoubleProperty(name)); + continue; + case STRING: + propertiesRecordBuilder.set(name, message.getStringProperty(name)); + continue; + default: + propertiesRecordBuilder.set(name, message.getObjectProperty(name)); + continue; + } + } + recordBuilder.set(JMSMessageParts.PROPERTIES, propertiesRecordBuilder.build()); + } +} diff --git a/src/main/java/io/cdap/plugin/jms/common/JMSMessageType.java b/src/main/java/io/cdap/plugin/jms/common/JMSMessageType.java new file mode 100644 index 0000000..e1d2524 --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/common/JMSMessageType.java @@ -0,0 +1,28 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * A class that specifies JMS message types, i.e. the values accepted by the `messageType` config property.
 */
public final class JMSMessageType {
  public static final String MESSAGE = "Message";
  public static final String TEXT = "Text";
  public static final String BYTES = "Bytes";
  public static final String MAP = "Map";
  public static final String OBJECT = "Object";

  private JMSMessageType() {
    // Constants holder, not meant to be instantiated.
  }
}
+ */ + +package io.cdap.plugin.jms.sink; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.batch.Output; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.etl.api.Emitter; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.common.ReferenceBatchSink; +import org.apache.hadoop.io.NullWritable; + +import java.io.IOException; +import java.util.stream.Collectors; + +/** + * A class that produces {@link StructuredRecord} to a JMS Queue or Topic. + */ +@Plugin(type = BatchSink.PLUGIN_TYPE) +@Name("JMS") +@Description("JMS sink to write events to JMS") +public class JMSBatchSink extends ReferenceBatchSink { + + private final JMSBatchSinkConfig config; + + public JMSBatchSink(JMSBatchSinkConfig config) { + super(config); + this.config = config; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + config.validateParams(pipelineConfigurer.getStageConfigurer().getFailureCollector()); + pipelineConfigurer.getStageConfigurer().getFailureCollector().getOrThrowException(); + } + + @Override + public void prepareRun(BatchSinkContext context) throws Exception { + LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName); + Schema schema = context.getInputSchema(); + + if (schema != null) { + lineageRecorder.createExternalDataset(schema); + if (schema.getFields() != null && !schema.getFields().isEmpty()) { + lineageRecorder.recordWrite("Write", "Wrote to JMS topic.", + schema.getFields().stream() + .map(Schema.Field::getName) + .collect(Collectors.toList())); + } + } + + 
context.addOutput(Output.of(config.referenceName, new JMSOutputFormatProvider(config))); + } + + @Override + public void transform(StructuredRecord input, Emitter> emitter) + throws IOException { + emitter.emit(new KeyValue<>(null, input)); + } +} diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkConfig.java b/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkConfig.java new file mode 100644 index 0000000..f933f69 --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkConfig.java @@ -0,0 +1,114 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.jms.sink; + +import com.google.common.base.Strings; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.plugin.jms.common.JMSConfig; +import io.cdap.plugin.jms.common.JMSMessageType; + +import java.io.IOException; +import java.io.Serializable; +import javax.annotation.Nullable; + +/** + * Holds configuration required for configuring {@link io.cdap.plugin.jms.source.JMSSourceUtils;} and + * {@link JMSBatchSink}. 
+ */ +public class JMSBatchSinkConfig extends JMSConfig implements Serializable { + + // Params + public static final String NAME_DESTINATION = "destinationName"; + public static final String NAME_MESSAGE_TYPE = "messageType"; + public static final String NAME_OUTPUT_SCHEMA = "schema"; + + public static final String DESC_DESTINATION = "Name of the destination Queue/Topic. If the given Queue/Topic name" + + "is not resolved, a new Queue/Topic with the given name will get created."; + public static final String DESC_MESSAGE_TYPE = "Supports the following message types: Message, Text, Bytes, Map."; + public static final String DESC_OUTPUT_SCHEMA = "Output schema."; + + @Name(NAME_DESTINATION) + @Description(DESC_DESTINATION) + @Macro + private String destinationName; + + @Name(NAME_MESSAGE_TYPE) + @Description(DESC_MESSAGE_TYPE) + @Nullable + @Macro + private String messageType; // default: Text + + @Name(NAME_OUTPUT_SCHEMA) + @Description(DESC_OUTPUT_SCHEMA) + @Nullable + @Macro + private String schema; + + public JMSBatchSinkConfig() { + super(""); + this.messageType = Strings.isNullOrEmpty(messageType) ? 
JMSMessageType.TEXT : messageType; + } + + public JMSBatchSinkConfig(String referenceName, String connectionFactory, String jmsUsername, + String jmsPassword, String providerUrl, String type, String jndiContextFactory, + String jndiUsername, String jndiPassword, String messageType, String destinationName) { + super(referenceName, connectionFactory, jmsUsername, jmsPassword, providerUrl, type, jndiContextFactory, + jndiUsername, jndiPassword); + this.destinationName = destinationName; + this.messageType = messageType; + } + + public String getDestinationName() { + return destinationName; + } + + public void validateParams(FailureCollector failureCollector) { + this.validateParams(failureCollector); + + if (Strings.isNullOrEmpty(destinationName) && !containsMacro(NAME_DESTINATION)) { + failureCollector + .addFailure("The destination topic/queue name must be provided!", "Provide your topic/queue name.") + .withConfigProperty(NAME_DESTINATION); + } + } + + public String getMessageType() { + if (!Strings.isNullOrEmpty(NAME_MESSAGE_TYPE) && !containsMacro(NAME_MESSAGE_TYPE)) { + return messageType; + } + return JMSMessageType.TEXT; + } + + /** + * @return {@link io.cdap.cdap.api.data.schema.Schema} of the dataset if one was given + * @throws IllegalArgumentException if the schema is not a valid JSON + */ + public Schema getSchema() { + if (!Strings.isNullOrEmpty(schema)) { + try { + return Schema.parseJson(schema); + } catch (IOException e) { + throw new IllegalArgumentException(String.format("Invalid schema : %s", e.getMessage()), e); + } + } + return null; + } +} diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkUtils.java b/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkUtils.java new file mode 100644 index 0000000..508ade1 --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkUtils.java @@ -0,0 +1,237 @@ +/* + * Copyright © 2021 Cask Data, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.jms.sink; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.format.StructuredRecordStringConverter; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * Utils for {@link JMSBatchSink} plugin + */ +public class JMSBatchSinkUtils { + + /** + * Converts an incoming {@link StructuredRecord} to a JMS {@link TextMessage} + * + * @param session the open session to the JMS broker + * @param record the incoming record + * @return a JMS text message + */ + public static TextMessage convertStructuredRecordToTextMessage(Session session, StructuredRecord record) { + TextMessage textMessage = null; + String payload = null; + + try { + payload = StructuredRecordStringConverter.toJsonString(record); + } catch (IOException e) { + throw new RuntimeException("Failed to convert record to json!", e); + } + + try { + textMessage = session.createTextMessage(); + textMessage.setText(payload); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + return textMessage; + } + + /** + * Converts an incoming {@link 
StructuredRecord} to a JMS {@link MapMessage} + * + * @param outputSchema he output schema necessary to properly build the map message. + * @param session the open session to the JMS broker + * @param record the incoming record + * @return a JMS map message + */ + public static MapMessage convertStructuredRecordToMapMessage(Schema outputSchema, Session session, + StructuredRecord record) { + MapMessage mapMessage = null; + + if (outputSchema == null) { + throw new RuntimeException("Output schema is empty! Please provide the output schema."); + } + + try { + mapMessage = session.createMapMessage(); + for (Schema.Field field : outputSchema.getFields()) { + String fieldName = field.getName(); + Object value = record.get(fieldName); + + switch (field.getSchema().getType()) { + case INT: + mapMessage.setInt(fieldName, cast(value, Integer.class)); + break; + case LONG: + mapMessage.setLong(fieldName, cast(value, Long.class)); + break; + case DOUBLE: + mapMessage.setDouble(fieldName, cast(value, Double.class)); + break; + case FLOAT: + mapMessage.setFloat(fieldName, cast(value, Float.class)); + break; + case BOOLEAN: + mapMessage.setBoolean(fieldName, cast(value, Boolean.class)); + break; + case BYTES: + mapMessage.setBytes(fieldName, cast(value, byte[].class)); + break; + case ARRAY: + mapMessage.setObject(fieldName, value); + break; + case RECORD: + mapMessage.setObject(fieldName, value); + break; + case MAP: + mapMessage.setObject(fieldName, value); + break; + default: + mapMessage.setString(fieldName, cast(value, String.class)); + } + } + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + return mapMessage; + } + + /** + * Converts an incoming {@link StructuredRecord} to a JMS {@link BytesMessage} + * @param session the open session to the JMS broker + * @param record the incoming record + * @return a JMS bytes message + */ + public static BytesMessage convertStructuredRecordToBytesMessage(Session 
session, StructuredRecord record) { + BytesMessage bytesMessage = null; + byte[] payload = null; + + try { + payload = StructuredRecordStringConverter.toJsonString(record).getBytes(StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Failed to convert record to json!", e); + } + + try { + bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(payload); + + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + return bytesMessage; + } + + /** + * Converts an incoming {@link StructuredRecord} to a JMS {@link ObjectMessage} + * + * @param session the open session to the JMS broker + * @param record the incoming record + * @return a JMS object message + */ + public static ObjectMessage convertStructuredRecordToObjectMessage(Session session, StructuredRecord record) { + ObjectMessage objectMessage = null; + byte[] payload = null; + + try { + payload = StructuredRecordStringConverter.toJsonString(record).getBytes(StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Failed to convert record to json!", e); + } + + try { + objectMessage = session.createObjectMessage(); + objectMessage.setObject(payload); + + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + return objectMessage; + } + + /** + * Converts an incoming {@link StructuredRecord} to a JMS {@link Message} + * + * @param session the open session to the JMS broker + * @param record the incoming record + * @return a JMS message + */ + public static Message convertStructuredRecordToMessage(Schema outputSchema, Session session, + StructuredRecord record) { + Message message = null; + + if (outputSchema == null) { + throw new RuntimeException("Output schema is empty! 
Please provide the output schema."); + } + + try { + message = session.createMessage(); + for (Schema.Field field : outputSchema.getFields()) { + String fieldName = field.getName(); + Object value = record.get(fieldName); + + switch (field.getSchema().getType()) { + case INT: + message.setIntProperty(fieldName, cast(value, Integer.class)); + break; + case LONG: + message.setLongProperty(fieldName, cast(value, Long.class)); + break; + case DOUBLE: + message.setDoubleProperty(fieldName, cast(value, Double.class)); + break; + case FLOAT: + message.setFloatProperty(fieldName, cast(value, Float.class)); + break; + case BOOLEAN: + message.setBooleanProperty(fieldName, cast(value, Boolean.class)); + break; + case ARRAY: + message.setObjectProperty(fieldName, value); + break; + case RECORD: + message.setObjectProperty(fieldName, value); + break; + case MAP: + message.setObjectProperty(fieldName, value); + break; + default: + message.setStringProperty(fieldName, cast(value, String.class)); + } + } + + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + return message; + } + + public static T cast(Object o, Class clazz) { + return o != null ? clazz.cast(o) : null; + } +} diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormat.java b/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormat.java new file mode 100644 index 0000000..a3b69b1 --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormat.java @@ -0,0 +1,71 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.jms.sink; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Output format to write to JMS. + */ +public class JMSOutputFormat extends OutputFormat { + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) { + return new JMSRecordWriter(context); + } + + @Override + public void checkOutputSpecs(JobContext jobContext) { + // no-op + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) { + return new OutputCommitter() { + @Override + public void setupJob(JobContext jobContext) { + // no-op + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) { + // no-op + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskAttemptContext) { + // no-op + } + + @Override + public void abortTask(TaskAttemptContext taskAttemptContext) { + // no-op + } + }; + } +} diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormatProvider.java b/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormatProvider.java new file mode 100644 index 0000000..d433167 --- /dev/null +++ 
b/src/main/java/io/cdap/plugin/jms/sink/JMSOutputFormatProvider.java @@ -0,0 +1,50 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.jms.sink; + +import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import io.cdap.cdap.api.data.batch.OutputFormatProvider; + +import java.util.Map; + +/** + * JMS Output format provider. + */ +public class JMSOutputFormatProvider implements OutputFormatProvider { + + public static final String PROPERTY_CONFIG_JSON = "cdap.jms.sink.config"; + private static final Gson GSON = new GsonBuilder().create(); + private final Map conf; + + public JMSOutputFormatProvider(JMSBatchSinkConfig config) { + this.conf = new ImmutableMap.Builder() + .put(PROPERTY_CONFIG_JSON, GSON.toJson(config)) + .build(); + } + + @Override + public String getOutputFormatClassName() { + return JMSOutputFormat.class.getName(); + } + + @Override + public Map getOutputFormatConfiguration() { + return conf; + } +} diff --git a/src/main/java/io/cdap/plugin/jms/sink/JMSRecordWriter.java b/src/main/java/io/cdap/plugin/jms/sink/JMSRecordWriter.java new file mode 100644 index 0000000..5342edf --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/sink/JMSRecordWriter.java @@ -0,0 +1,124 @@ +/* + * Copyright © 2021 Cask Data, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.jms.sink; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.jms.common.JMSConnection; +import io.cdap.plugin.jms.common.JMSMessageType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.Context; + +/** + * Record writer to produce messages to a JMS Topic/Queue. 
+ */ +public class JMSRecordWriter extends RecordWriter { + private static final Logger LOG = LoggerFactory.getLogger(JMSRecordWriter.class); + private static final Gson GSON = new GsonBuilder().create(); + + private final JMSBatchSinkConfig config; + private Connection connection; + private Session session; + private MessageProducer messageProducer; + private JMSConnection jmsConnection; + + public JMSRecordWriter(TaskAttemptContext context) { + Configuration config = context.getConfiguration(); + String configJson = config.get(JMSOutputFormatProvider.PROPERTY_CONFIG_JSON); + this.config = GSON.fromJson(configJson, JMSBatchSinkConfig.class); + this.jmsConnection = new JMSConnection(this.config); + establishConnection(); + } + + @Override + public void write(NullWritable key, StructuredRecord record) { + String messageType = config.getMessageType(); + Schema outputSchema = config.getSchema(); + + // when the output schema is not specified, the incoming record gets wrapped in a text payload + if (outputSchema == null) { + TextMessage textMessage = JMSBatchSinkUtils.convertStructuredRecordToTextMessage(session, record); + produceMessage(textMessage); + } else { + switch (messageType) { + case JMSMessageType.TEXT: + TextMessage textMessage = JMSBatchSinkUtils.convertStructuredRecordToTextMessage(session, record); + produceMessage(textMessage); + break; + case JMSMessageType.MAP: + MapMessage mapMessage = JMSBatchSinkUtils.convertStructuredRecordToMapMessage(outputSchema, session, record); + produceMessage(mapMessage); + break; + case JMSMessageType.BYTES: + BytesMessage bytesMessage = JMSBatchSinkUtils.convertStructuredRecordToBytesMessage(session, record); + produceMessage(bytesMessage); + break; + case JMSMessageType.MESSAGE: + Message message = JMSBatchSinkUtils.convertStructuredRecordToMessage(outputSchema, session, record); + produceMessage(message); + break; + case JMSMessageType.OBJECT: + ObjectMessage objectMessage = 
JMSBatchSinkUtils.convertStructuredRecordToObjectMessage(session, record); + produceMessage(objectMessage); + break; + } + } + } + + @Override + public void close(TaskAttemptContext taskAttemptContext) { + this.jmsConnection.stopConnection(this.connection); + this.jmsConnection.closeSession(this.session); + this.jmsConnection.closeConnection(this.connection); + } + + private void produceMessage(Message message) { + try { + messageProducer.send(message); + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + } + + private void establishConnection() { + Context context = jmsConnection.getContext(); + ConnectionFactory factory = jmsConnection.getConnectionFactory(context); + connection = jmsConnection.createConnection(factory); + session = jmsConnection.createSession(connection); + Destination destination = jmsConnection.getSink(context, session); + messageProducer = jmsConnection.createProducer(session, destination); + } +} diff --git a/src/main/java/io/cdap/plugin/jms/source/JMSReceiver.java b/src/main/java/io/cdap/plugin/jms/source/JMSReceiver.java new file mode 100644 index 0000000..74274fa --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/source/JMSReceiver.java @@ -0,0 +1,104 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.jms.source; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.plugin.jms.common.JMSConnection; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.receiver.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.Context; + +/** + * This class creates a customized message Receiver and implements the MessageListener interface. + */ +public class JMSReceiver extends Receiver implements MessageListener { + private static final Logger LOG = LoggerFactory.getLogger(JMSReceiver.class); + private JMSStreamingSourceConfig config; + private Connection connection; + private StorageLevel storageLevel; + private Session session; + private JMSConnection jmsConnection; + + public JMSReceiver(StorageLevel storageLevel, JMSStreamingSourceConfig config) { + super(storageLevel); + this.storageLevel = storageLevel; + this.config = config; + } + + @Override + public void onStart() { + this.jmsConnection = new JMSConnection(config); + Context context = jmsConnection.getContext(); + ConnectionFactory factory = jmsConnection.getConnectionFactory(context); + connection = jmsConnection.createConnection(factory); + + session = jmsConnection.createSession(connection); + Destination destination = jmsConnection.getSource(context); + MessageConsumer messageConsumer = jmsConnection.createConsumer(session, destination); + jmsConnection.setMessageListener(this, messageConsumer); + jmsConnection.startConnection(connection); + + // fetch the entire queue events + if (destination instanceof Queue) { + fetchEntireQueue(messageConsumer); + } + } + + @Override + public void onStop() { + 
this.jmsConnection.stopConnection(this.connection); + this.jmsConnection.closeSession(this.session); + this.jmsConnection.closeConnection(this.connection); + } + + @Override + public void onMessage(Message message) { + try { + store(JMSSourceUtils.convertMessage(message, this.config)); + } catch (Exception e) { + LOG.error("Message couldn't get stored in the Spark memory.", e); + throw new RuntimeException(e); + } + } + + private void fetchEntireQueue(MessageConsumer messageConsumer) { + while (true) { + try { + Message message = messageConsumer.receive(5000); + if (message != null) { + store(JMSSourceUtils.convertMessage(message, this.config)); + } else { + break; + } + } catch (JMSException e) { + throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage())); + } + } + } +} diff --git a/src/main/java/io/cdap/plugin/jms/source/JMSSourceUtils.java b/src/main/java/io/cdap/plugin/jms/source/JMSSourceUtils.java new file mode 100644 index 0000000..b58b236 --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/source/JMSSourceUtils.java @@ -0,0 +1,332 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.jms.source; + +import com.google.common.base.Strings; +import com.google.gson.Gson; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.streaming.StreamingContext; +import io.cdap.plugin.jms.common.JMSMessageMetadata; +import io.cdap.plugin.jms.common.JMSMessageParts; +import io.cdap.plugin.jms.common.JMSMessageProperties; +import io.cdap.plugin.jms.common.JMSMessageType; +import org.apache.commons.lang.SerializationUtils; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.receiver.Receiver; + +import java.io.ByteArrayOutputStream; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.TextMessage; + +/** + * An utils class that encapsulates the necessary functionality to convert different types of JMS messages to + * StructuredRecords, and creates an input stream with our customized implementation of receiver {@link JMSReceiver}. + */ +public class JMSSourceUtils { + + static JavaDStream getJavaDStream(StreamingContext context, + JMSStreamingSourceConfig config) { + Receiver jmsReceiver = new JMSReceiver(StorageLevel.MEMORY_AND_DISK_SER_2(), config); + return context.getSparkStreamingContext().receiverStream(jmsReceiver); + } + + /** + * Creates a {@link StructuredRecord} from a JMS message. 
+ * + * @param message the incoming JMS message + * @param config the {@link JMSStreamingSourceConfig} with all user provided property values + * @return the {@link StructuredRecord} built out of the JMS message fields + * @throws JMSException in case the method fails to read fields from the JMS message + * @throws IllegalArgumentException in case the user provides a non-supported message type + */ + public static StructuredRecord convertMessage(Message message, JMSStreamingSourceConfig config) + throws JMSException, IllegalArgumentException { + String messageType = null; + if (!Strings.isNullOrEmpty(config.getMessageType())) { + messageType = config.getMessageType(); + } else { + throw new RuntimeException("Message type should not be null."); + } + + if (message instanceof BytesMessage && messageType.equals(JMSMessageType.BYTES)) { + return convertJMSByteMessage(message, config); + } + if (message instanceof MapMessage && messageType.equals(JMSMessageType.MAP)) { + return convertJMSMapMessage(message, config); + } + if (message instanceof ObjectMessage && messageType.equals(JMSMessageType.OBJECT)) { + return convertJMSObjectMessage(message, config); + } + if (message instanceof Message && messageType.equals(JMSMessageType.MESSAGE)) { + return convertJMSMessage(message, config); + } + if (message instanceof TextMessage && messageType.equals(JMSMessageType.TEXT)) { + return convertJMSTextMessage(message, config); + } else { + throw new IllegalArgumentException("Message type should be one of Message, Text, Bytes, Map, or Object"); + } + } + + /** + * Creates a {@link StructuredRecord} from a JMS {@link TextMessage}. 
+ * + * @param message the incoming JMS {@link TextMessage} + * @param config the {@link JMSStreamingSourceConfig} with all user provided property values + * @return the {@link StructuredRecord} built out of the JMS {@link TextMessage} fields + * @throws JMSException in case the method fails to read fields from the JMS message + */ + public static StructuredRecord convertJMSTextMessage(Message message, JMSStreamingSourceConfig config) + throws JMSException { + StructuredRecord.Builder recordBuilder = StructuredRecord.builder(config.getSchema()); + + if (config.getMessageMetadata()) { + populateMetadata(config.getSchema(), recordBuilder, message); + } + if (config.getMessageProperties()) { + JMSMessageProperties.populateProperties(config.getSchema(), recordBuilder, message, config.getMessageType()); + } + + recordBuilder.set(JMSMessageParts.PAYLOAD, ((TextMessage) message).getText()); + return recordBuilder.build(); + } + + /** + * Creates a {@link StructuredRecord} from a JMS {@link MapMessage}. 
+ * + * @param message the incoming JMS {@link MapMessage} + * @param config the {@link JMSStreamingSourceConfig} with all user provided property values + * @return the {@link StructuredRecord} built out of the JMS {@link MapMessage} fields + * @throws JMSException in case the method fails to read fields from the JMS message + */ + public static StructuredRecord convertJMSMapMessage(Message message, JMSStreamingSourceConfig config) + throws JMSException { + Schema schema = config.getSchema(); + StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema); + + Schema payloadSchema = schema.getField(JMSMessageParts.PAYLOAD).getSchema(); + StructuredRecord.Builder payloadRecordBuilder = StructuredRecord.builder(payloadSchema); + + if (config.getMessageMetadata()) { + populateMetadata(schema, recordBuilder, message); + } + if (config.getMessageProperties()) { + JMSMessageProperties.populateProperties(schema, recordBuilder, message, JMSMessageType.MESSAGE); + } + + if (payloadSchema.getType().equals(Schema.Type.STRING)) { + Map payload = new HashMap<>(); + Enumeration names = ((MapMessage) message).getMapNames(); + while (names.hasMoreElements()) { + String key = names.nextElement(); + payload.put(key, ((MapMessage) message).getObject(key)); + } + payloadRecordBuilder.set(JMSMessageParts.PAYLOAD, new Gson().toJson(payload)); + } else { + for (Schema.Field field : config.getDataFields(config.getSchema(), null)) { + Schema.Type type = field.getSchema().getType(); + String name = field.getName(); + + if (type.equals(Schema.Type.UNION)) { + type = field.getSchema().getUnionSchema(0).getType(); + } + + switch (type) { + case BOOLEAN: + payloadRecordBuilder.set(name, ((MapMessage) message).getBoolean(name)); + break; + case INT: + payloadRecordBuilder.set(name, ((MapMessage) message).getInt(name)); + break; + case LONG: + payloadRecordBuilder.set(name, ((MapMessage) message).getLong(name)); + break; + case FLOAT: + payloadRecordBuilder.set(name, ((MapMessage) 
message).getFloat(name)); + break; + case DOUBLE: + payloadRecordBuilder.set(name, ((MapMessage) message).getDouble(name)); + break; + case BYTES: + payloadRecordBuilder.set(name, ((MapMessage) message).getByte(name)); + break; + case STRING: + payloadRecordBuilder.set(name, ((MapMessage) message).getString(name)); + break; + case ARRAY: // byte array only + Schema.Type itemType = field.getSchema().getComponentSchema().getType(); + if (itemType.equals(Schema.Type.UNION)) { + itemType = field.getSchema().getComponentSchema().getUnionSchema(0).getType(); + } + if (itemType.equals(Schema.Type.BYTES)) { + payloadRecordBuilder.set(name, ((MapMessage) message).getBytes(name)); + } + break; + } + } + } + recordBuilder.set(JMSMessageParts.PAYLOAD, payloadRecordBuilder.build()); + return recordBuilder.build(); + } + + /** + * Creates a {@link StructuredRecord} from a JMS {@link BytesMessage}. + * + * @param message the incoming JMS {@link BytesMessage} + * @param config the {@link JMSStreamingSourceConfig} with all user provided property values + * @return the {@link StructuredRecord} built out of the JMS {@link BytesMessage} fields + * @throws JMSException in case the method fails to read fields from the JMS message + */ + public static StructuredRecord convertJMSByteMessage(Message message, JMSStreamingSourceConfig config) + throws JMSException { + Schema schema = config.getSchema(); + StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema); + + if (config.getMessageMetadata()) { + populateMetadata(config.getSchema(), recordBuilder, message); + } + if (config.getMessageProperties()) { + JMSMessageProperties.populateProperties(schema, recordBuilder, message, JMSMessageType.MESSAGE); + } + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + byte[] buffer = new byte[8096]; + int currentByte; + while ((currentByte = ((BytesMessage) message).readBytes(buffer)) != -1) { + byteArrayOutputStream.write(buffer, 0, currentByte); + } + 
recordBuilder.set(JMSMessageParts.PAYLOAD, byteArrayOutputStream.toByteArray()); + return recordBuilder.build(); + } + + /** + * Creates a {@link StructuredRecord} from a JMS {@link ObjectMessage}. + * + * @param message the incoming JMS {@link ObjectMessage} + * @param config the {@link JMSStreamingSourceConfig} with all user provided property values + * @return the {@link StructuredRecord} built out of the JMS {@link ObjectMessage} fields + * @throws JMSException in case the method fails to read fields from the JMS message + */ + public static StructuredRecord convertJMSObjectMessage(Message message, JMSStreamingSourceConfig config) + throws JMSException { + Schema schema = config.getSchema(); + StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema); + + if (config.getMessageMetadata()) { + populateMetadata(config.getSchema(), recordBuilder, message); + } + if (config.getMessageProperties()) { + JMSMessageProperties.populateProperties(schema, recordBuilder, message, JMSMessageType.MESSAGE); + } + + byte[] payload = SerializationUtils.serialize(((ObjectMessage) message).getObject()); + recordBuilder.set(JMSMessageParts.PAYLOAD, payload); + return recordBuilder.build(); + } + + /** + * Creates a {@link StructuredRecord} from a JMS {@link Message}. 
+ * + * @param message the incoming JMS {@link Message} + * @param config the {@link JMSStreamingSourceConfig} with all user provided property values + * @return the {@link StructuredRecord} built out of the JMS {@link Message} fields + * @throws JMSException in case the method fails to read fields from the JMS message + */ + public static StructuredRecord convertJMSMessage(Message message, JMSStreamingSourceConfig config) + throws JMSException { + Schema schema = config.getSchema(); + StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema); + + if (config.getMessageMetadata()) { + populateMetadata(config.getSchema(), recordBuilder, message); + } + if (config.getMessageProperties()) { + JMSMessageProperties.populateProperties(schema, recordBuilder, message, JMSMessageType.MESSAGE); + } + return recordBuilder.build(); + } + + /** + * Gets header data fields from the JMS message and adds them to the passed record builder. + * + * @param schema the entire schema of the record + * @param recordBuilder the record builder that we set the metadata record into + * @param message the incoming JMS message + * @throws JMSException in case the method fails to read fields from the JMS message + */ + private static void populateMetadata(Schema schema, StructuredRecord.Builder recordBuilder, Message message) + throws JMSException { + Schema metadataSchema = schema.getField(JMSMessageParts.METADATA).getSchema(); + StructuredRecord.Builder metadataRecordBuilder = StructuredRecord.builder(metadataSchema); + + for (Schema.Field field : metadataSchema.getFields()) { + + switch (field.getName()) { + case JMSMessageMetadata.MESSAGE_ID: + metadataRecordBuilder.set(JMSMessageMetadata.MESSAGE_ID, message.getJMSMessageID()); + break; + + case JMSMessageMetadata.CORRELATION_ID: + metadataRecordBuilder.set(JMSMessageMetadata.CORRELATION_ID, message.getJMSCorrelationID()); + break; + + case JMSMessageMetadata.REPLY_TO: + //todo: handle NPE + 
metadataRecordBuilder.set(JMSMessageMetadata.REPLY_TO, message.getJMSReplyTo().toString()); + break; + + case JMSMessageMetadata.DESTINATION: + //todo: handle NPE + metadataRecordBuilder.set(JMSMessageMetadata.DESTINATION, message.getJMSDestination().toString()); + break; + + case JMSMessageMetadata.TYPE: + metadataRecordBuilder.set(JMSMessageMetadata.TYPE, message.getJMSType()); + break; + + case JMSMessageMetadata.MESSAGE_TIMESTAMP: + metadataRecordBuilder.set(JMSMessageMetadata.MESSAGE_TIMESTAMP, message.getJMSTimestamp()); + break; + + case JMSMessageMetadata.DELIVERY_MODE: + metadataRecordBuilder.set(JMSMessageMetadata.DELIVERY_MODE, message.getJMSDeliveryMode()); + break; + + case JMSMessageMetadata.REDELIVERED: + metadataRecordBuilder.set(JMSMessageMetadata.REDELIVERED, message.getJMSRedelivered()); + break; + + case JMSMessageMetadata.EXPIRATION: + metadataRecordBuilder.set(JMSMessageMetadata.EXPIRATION, message.getJMSExpiration()); + break; + + case JMSMessageMetadata.PRIORITY: + metadataRecordBuilder.set(JMSMessageMetadata.PRIORITY, message.getJMSPriority()); + break; + } + recordBuilder.set(JMSMessageParts.METADATA, metadataRecordBuilder.build()); + } + } +} diff --git a/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSource.java b/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSource.java new file mode 100644 index 0000000..16c4d21 --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSource.java @@ -0,0 +1,78 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.jms.source; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.streaming.StreamingContext; +import io.cdap.cdap.etl.api.streaming.StreamingSource; +import io.cdap.cdap.etl.api.streaming.StreamingSourceContext; +import io.cdap.plugin.common.LineageRecorder; +import org.apache.spark.streaming.api.java.JavaDStream; + +import java.util.stream.Collectors; + +/** + * This class JMSStreamingSource is a plugin that allows consuming messages from a specified JMS + * Queue/Topic and generate + * StructuredRecords out of them. + */ +@Plugin(type = StreamingSource.PLUGIN_TYPE) +@Name("JMS") +@Description("JMS (Java Messaging Service) Source") +public class JMSStreamingSource extends ReferenceStreamingSource { + + private JMSStreamingSourceConfig config; + + public JMSStreamingSource(JMSStreamingSourceConfig config) { + super(config); + this.config = config; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + super.configurePipeline(pipelineConfigurer); + config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector()); + pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema()); + } + + @Override + public void prepareRun(StreamingSourceContext context) throws Exception { + Schema schema = config.getSchema(); + // record dataset lineage + context.registerLineage(config.referenceName, schema); + + if (schema.getFields() != null) { + LineageRecorder recorder = new LineageRecorder(context, config.referenceName); + recorder.recordRead("Read", "Read from jms", + 
schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList())); + } + } + + @Override + public JavaDStream getStream(StreamingContext context) throws Exception { + FailureCollector collector = context.getFailureCollector(); + config.validate(collector); + collector.getOrThrowException(); + return JMSSourceUtils.getJavaDStream(context, config); + } +} diff --git a/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSourceConfig.java b/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSourceConfig.java new file mode 100644 index 0000000..7258103 --- /dev/null +++ b/src/main/java/io/cdap/plugin/jms/source/JMSStreamingSourceConfig.java @@ -0,0 +1,230 @@ +/* + * Copyright © 2021 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
+
+package io.cdap.plugin.jms.source;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.jms.common.JMSConfig;
+import io.cdap.plugin.jms.common.JMSMessageMetadata;
+import io.cdap.plugin.jms.common.JMSMessageParts;
+import io.cdap.plugin.jms.common.JMSMessageType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+/**
+ * Configs for {@link JMSStreamingSource}.
+ *
+ * <p>Adds the source-specific properties (source name, message type, metadata/properties switches and output
+ * schema) to the properties inherited from {@link JMSConfig}.
+ */
+public class JMSStreamingSourceConfig extends JMSConfig implements Serializable {
+  // Params
+  public static final String NAME_SOURCE = "sourceName";
+  public static final String NAME_SCHEMA = "schema";
+  public static final String NAME_MESSAGE_METADATA = "messageMetadata";
+  public static final String NAME_MESSAGE_PROPERTIES = "messageProperties";
+  public static final String NAME_MESSAGE_TYPE = "messageType";
+
+
+  @Name(NAME_SOURCE)
+  @Description("Name of the source Queue/Topic. The Queue/Topic with the given name, should exist in order to read " +
+    "messages from.")
+  @Macro
+  private String sourceName;
+
+  @Name(NAME_MESSAGE_METADATA)
+  @Description("If true, message metadata is also consumed. Otherwise, it is not.")
+  @Nullable
+  @Macro
+  private String messageMetadata;
+
+  @Name(NAME_MESSAGE_PROPERTIES)
+  @Description("If true, message properties are also consumed. Otherwise, they are not.")
+  @Nullable
+  @Macro
+  private String messageProperties;
+
+  @Name(NAME_MESSAGE_TYPE)
+  @Description("Supports the following message types: Message, Text, Bytes, Map, Object.")
+  @Nullable
+  @Macro
+  private String messageType; // default: Text
+
+  @Name(NAME_SCHEMA)
+  @Description("Specifies the schema of the records outputted from this plugin.")
+  @Macro
+  private String schema;
+
+  /**
+   * Creates a config that passes an empty string to the {@link JMSConfig} constructor and defaults to consuming
+   * message metadata and to the Text message type.
+   */
+  public JMSStreamingSourceConfig() {
+    super("");
+    // No field has been assigned yet at this point, so the defaults can be set unconditionally.
+    this.messageMetadata = "true";
+    this.messageType = JMSMessageType.TEXT;
+  }
+
+  @VisibleForTesting
+  public JMSStreamingSourceConfig(String referenceName, String connectionFactory, String jmsUsername,
+                                  String jmsPassword, String providerUrl, String type, String jndiContextFactory,
+                                  String jndiUsername, String jndiPassword, String messageMetadata,
+                                  String messageProperties, String messageType, String sourceName, String schema) {
+    super(referenceName, connectionFactory, jmsUsername, jmsPassword, providerUrl, type, jndiContextFactory,
+          jndiUsername, jndiPassword);
+    this.sourceName = sourceName;
+    this.messageMetadata = messageMetadata;
+    this.messageProperties = messageProperties;
+    this.messageType = messageType;
+    this.schema = schema;
+  }
+
+  /**
+   * Validates the inherited properties (via {@code validateParams}) and the source-specific ones. Every problem
+   * found is reported to the collector; this method itself does not throw for invalid user input.
+   *
+   * @param failureCollector collector that receives the validation failures
+   */
+  public void validate(FailureCollector failureCollector) {
+    this.validateParams(failureCollector);
+
+    if (Strings.isNullOrEmpty(messageType) && !containsMacro(NAME_MESSAGE_TYPE)) {
+      failureCollector
+        .addFailure("The message type must be provided!",
+                    "Select one of the supported message types: Message, Text, Bytes, Map, Object.")
+        .withConfigProperty(NAME_MESSAGE_TYPE);
+    }
+
+    if (Strings.isNullOrEmpty(sourceName) && !containsMacro(NAME_SOURCE)) {
+      failureCollector
+        .addFailure("The source topic/queue name must be provided!", "Provide your topic/queue name.")
+        .withConfigProperty(NAME_SOURCE);
+    }
+
+    if (!containsMacro(NAME_SCHEMA)) {
+      validateSchema(failureCollector);
+    }
+  }
+
+  /**
+   * Checks that the output schema only contains the known message parts and that the payload field has a data
+   * type compatible with the configured message type.
+   */
+  private void validateSchema(FailureCollector failureCollector) {
+    Schema outputSchema;
+    try {
+      outputSchema = getSchema();
+    } catch (IllegalArgumentException e) {
+      // An unparsable schema is a user error: report it instead of aborting the whole validation.
+      failureCollector
+        .addFailure(e.getMessage(), "Provide a valid JSON schema.")
+        .withConfigProperty(NAME_SCHEMA);
+      return;
+    }
+
+    List<Schema.Field> schemaFields = outputSchema.getFields();
+    if (schemaFields == null) {
+      // NOTE(review): assumes Schema#getFields() yields null for non-record schemas - confirm against CDAP docs.
+      failureCollector
+        .addFailure("The output schema must be a record.",
+                    String.format("Define a record with the \"%s\", \"%s\" and \"%s\" fields.",
+                                  JMSMessageParts.METADATA, JMSMessageParts.PAYLOAD, JMSMessageParts.PROPERTIES))
+        .withConfigProperty(NAME_SCHEMA);
+      return;
+    }
+
+    boolean otherFieldsExist = schemaFields
+      .stream()
+      .anyMatch(field -> !JMSMessageParts.getJMSMessageParts().contains(field.getName()));
+
+    if (otherFieldsExist) {
+      failureCollector
+        .addFailure("New fields detected in the output schema!",
+                    String.format("Only \"%s\", \"%s\" and \"%s\" fields are allowed.",
+                                  JMSMessageParts.METADATA, JMSMessageParts.PAYLOAD, JMSMessageParts.PROPERTIES))
+        .withConfigProperty(NAME_SCHEMA);
+    }
+
+    if (Strings.isNullOrEmpty(messageType)) {
+      // Either reported by validate() already or supplied through a macro: the payload type cannot be checked yet.
+      return;
+    }
+
+    switch (messageType) {
+      case JMSMessageType.TEXT:
+        if (hasUnexpectedPayloadType(outputSchema, failureCollector, Schema.Type.STRING)) {
+          failureCollector
+            .addFailure(String.format("Wrong data type for field \"%s\".", JMSMessageParts.PAYLOAD),
+                        String.format("For JMS %s type of message, \"%s\" must be of String data type.",
+                                      messageType, JMSMessageParts.PAYLOAD)).withConfigProperty(NAME_SCHEMA);
+        }
+        break;
+
+      case JMSMessageType.OBJECT:
+        if (hasUnexpectedPayloadType(outputSchema, failureCollector, Schema.Type.ARRAY)) {
+          failureCollector
+            .addFailure(String.format("Wrong data type for field \"%s\".", JMSMessageParts.PAYLOAD),
+                        String.format("For JMS %s type of message, \"%s\" must be of Array[Bytes] data type.",
+                                      messageType, JMSMessageParts.PAYLOAD)).withConfigProperty(NAME_SCHEMA);
+        }
+        break;
+
+      case JMSMessageType.BYTES:
+      case JMSMessageType.MAP:
+      case JMSMessageType.MESSAGE:
+        if (hasUnexpectedPayloadType(outputSchema, failureCollector, Schema.Type.STRING, Schema.Type.RECORD)) {
+          failureCollector
+            .addFailure(String.format("Wrong data type for field \"%s\"", JMSMessageParts.PAYLOAD),
+                        String.format("For JMS %s type of message, \"%s\" must be of String or Record data type.",
+                                      messageType, JMSMessageParts.PAYLOAD)).withConfigProperty(NAME_SCHEMA);
+        }
+        break;
+
+      default:
+        // Message types other than the ones above are not checked here.
+        break;
+    }
+  }
+
+  /**
+   * Tells whether the payload field exists with a type outside {@code allowedTypes}. A missing payload field is
+   * reported to the collector right here (it used to surface as a NullPointerException) and yields {@code false}.
+   */
+  private static boolean hasUnexpectedPayloadType(Schema outputSchema, FailureCollector failureCollector,
+                                                  Schema.Type... allowedTypes) {
+    Schema.Field payloadField = outputSchema.getField(JMSMessageParts.PAYLOAD);
+    if (payloadField == null) {
+      failureCollector
+        .addFailure(String.format("Field \"%s\" is missing from the output schema.", JMSMessageParts.PAYLOAD),
+                    String.format("Add a \"%s\" field to the output schema.", JMSMessageParts.PAYLOAD))
+        .withConfigProperty(NAME_SCHEMA);
+      return false;
+    }
+    return !Arrays.asList(allowedTypes).contains(payloadField.getSchema().getType());
+  }
+
+  public String getSourceName() {
+    return sourceName;
+  }
+
+  /**
+   * Returns the configured output schema, or a schema derived from the other properties when none was given:
+   * an optional metadata field, an optional properties field (String) and a payload field whose type depends on
+   * the message type (Text when the message type is not set).
+   *
+   * @return {@link io.cdap.cdap.api.data.schema.Schema} of the records emitted by the source
+   * @throws IllegalArgumentException if the schema is not a valid JSON
+   */
+  public Schema getSchema() {
+
+    // containsMacro() expects the property name, not the property value.
+    if (!Strings.isNullOrEmpty(schema) && !containsMacro(NAME_SCHEMA)) {
+      try {
+        return Schema.parseJson(schema);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(String.format("Invalid schema : %s.", e.getMessage()), e);
+      }
+    }
+
+    List<Schema.Field> fields = new ArrayList<>();
+
+    if (getMessageMetadata()) {
+      fields.add(JMSMessageMetadata.getMessageMetadata());
+    }
+
+    if (getMessageProperties()) {
+      fields.add(Schema.Field.of(JMSMessageParts.PROPERTIES, Schema.of(Schema.Type.STRING)));
+    }
+
+    // switch on a null String throws, so fall back to the documented default message type.
+    String effectiveMessageType = Strings.isNullOrEmpty(messageType) ? JMSMessageType.TEXT : messageType;
+    switch (effectiveMessageType) {
+      case JMSMessageType.OBJECT:
+        fields.add(Schema.Field.of(JMSMessageParts.PAYLOAD, Schema.arrayOf(Schema.of(Schema.Type.BYTES))));
+        break;
+      case JMSMessageType.MAP:
+      case JMSMessageType.MESSAGE:
+      case JMSMessageType.TEXT:
+      case JMSMessageType.BYTES:
+        fields.add(Schema.Field.of(JMSMessageParts.PAYLOAD, Schema.of(Schema.Type.STRING)));
+        break;
+      default:
+        // Unknown message type: no payload field is added.
+        break;
+    }
+    return Schema.recordOf("record", fields);
+  }
+
+  @Nullable
+  public String getMessageType() {
+    return messageType;
+  }
+
+  /**
+   * @return whether message metadata is consumed; {@code true} when the property is not set, matching the
+   * default assigned by the no-argument constructor
+   */
+  public boolean getMessageMetadata() {
+    return this.messageMetadata == null || this.messageMetadata.equalsIgnoreCase("true");
+  }
+
+  /**
+   * @return whether message properties are consumed; {@code false} when the property is not set
+   */
+  public boolean getMessageProperties() {
+    // Constant-first comparison: the property is nullable and has no default.
+    return "true".equalsIgnoreCase(this.messageProperties);
+  }
+
+  /**
+   * Returns the fields of {@code schema} that are neither JMS message header fields nor the field named
+   * {@code skipFieldName}.
+   *
+   * @param schema        record schema whose fields are filtered
+   * @param skipFieldName name of an additional field to leave out
+   * @return the remaining fields, in schema order
+   */
+  public List<Schema.Field> getDataFields(Schema schema, String skipFieldName) {
+    return schema
+      .getFields()
+      .stream()
+      .filter(field -> !JMSMessageMetadata.getJMSMessageHeaderNames().contains(field.getName()))
+      .filter(field -> !field.getName().equals(skipFieldName))
+      .collect(Collectors.toList());
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/jms/source/ReferenceStreamingSource.java b/src/main/java/io/cdap/plugin/jms/source/ReferenceStreamingSource.java
new file mode 100644
index 0000000..ddc7bf0
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/jms/source/ReferenceStreamingSource.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.source;
+
+import io.cdap.cdap.api.dataset.DatasetProperties;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.streaming.StreamingSource;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.common.IdUtils;
+import io.cdap.plugin.common.ReferencePluginConfig;
+
+/**
+ * Base streaming source that validates the reference name of its config and registers an External Dataset under
+ * that name when the pipeline is configured.
+ *
+ * @param <T> type of object read by the source.
+ */
+public abstract class ReferenceStreamingSource<T> extends StreamingSource<T> {
+  private final ReferencePluginConfig conf;
+
+  /**
+   * @param conf plugin config providing the reference name
+   */
+  public ReferenceStreamingSource(ReferencePluginConfig conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Validates the reference name and creates the External Dataset named after it.
+   *
+   * @param pipelineConfigurer configurer of the pipeline this source is part of
+   */
+  @Override
+  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
+    super.configurePipeline(pipelineConfigurer);
+    FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
+    IdUtils.validateReferenceName(conf.referenceName, collector);
+    // Stop here on an invalid reference name so that no dataset is created for it.
+    collector.getOrThrowException();
+    pipelineConfigurer.createDataset(conf.referenceName, Constants.EXTERNAL_DATASET_TYPE, DatasetProperties.EMPTY);
+  }
+}
diff --git a/src/test/java/JMSPluginTest.java b/src/test/java/JMSPluginTest.java
new file mode 100644
index 0000000..5a0caef
--- /dev/null
+++ b/src/test/java/JMSPluginTest.java
@@ -0,0 +1,75 @@
+///*
+// * Copyright © 2021 Cask Data, Inc.
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// * use this file except in compliance with the License. You may obtain a copy of
+// * the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// * License for the specific language governing permissions and limitations under
+// * the License.
+// */ +// +//import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +//import io.cdap.plugin.jms.common.JMSConfig; +//import io.cdap.plugin.jms.source.JMSStreamingSourceConfig; +//import org.junit.Assert; +//import org.junit.Test; +//import org.mockito.internal.util.reflection.FieldSetter; +// +///** +// * Unit tests for the plugin +// */ +//public class JMSPluginTest { +// private JMSConfig getValidConfig(String fileSystemProperties) throws NoSuchFieldException { +// JMSConfig jmsConfig = new JMSStreamingSourceConfig(); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("connectionFactory"), "ConFactory"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jmsUsername"), "jms_username"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jmsPassword"), "jms_password"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("providerUrl"), "tcp://0.0.0.0:61616"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("type"), "Queue"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiContextFactory"), "ContextFactory"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiUsername"), "jndi-username"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiPassword"), "jndi-password"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("messageType"), "Text"); +// FieldSetter.setField(jmsConfig, JMSStreamingSourceConfig.class.getDeclaredField("sourceName"), "test-topic"); +// FieldSetter.setField(jmsConfig, JMSStreamingSourceConfig.class.getDeclaredField("keepMessageHeaders"), "false"); +// return jmsConfig; +// } +// +// private JMSConfig getInvalidConfig(String fileSystemProperties) throws NoSuchFieldException { +// JMSConfig jmsConfig = new JMSStreamingSourceConfig(); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("connectionFactory"), "ConFactory"); +// 
FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jmsUsername"), "jms_username"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jmsPassword"), "jms_password"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("providerUrl"), "tcp://0.0.0.0:61616"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("type"), "Queue"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiContextFactory"), "ContextFactory"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiUsername"), "jndi-username"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("jndiPassword"), "jndi-password"); +// FieldSetter.setField(jmsConfig, JMSConfig.class.getDeclaredField("messageType"), "Text"); +// FieldSetter.setField(jmsConfig, JMSStreamingSourceConfig.class.getDeclaredField("sourceName"), null); +// FieldSetter.setField(jmsConfig, JMSStreamingSourceConfig.class.getDeclaredField("keepMessageHeaders"), "false"); +// return jmsConfig; +// } +// +// @Test +// public void testValidFSProperties() throws NoSuchFieldException { +// JMSConfig jmsConfig = getValidConfig("{\"key\":\"val\"}"); +// MockFailureCollector collector = new MockFailureCollector("jms_failure_collector"); +// ((JMSStreamingSourceConfig) jmsConfig).validateParams(collector); +// Assert.assertEquals(0, collector.getValidationFailures().size()); +// } +// +// @Test +// public void testInvalidFSProperties() throws NoSuchFieldException { +// JMSConfig jmsConfig = getInvalidConfig("{\"key\":\"val\"}"); +// MockFailureCollector collector = new MockFailureCollector("jms_failure_collector"); +// ((JMSStreamingSourceConfig) jmsConfig).validateParams(collector); +// Assert.assertEquals(1, collector.getValidationFailures().size()); +// } +//} diff --git a/src/test/java/io/cdap/plugin/jms/source/JMSSourceUtilsTest.java b/src/test/java/io/cdap/plugin/jms/source/JMSSourceUtilsTest.java new file mode 100644 
index 0000000..2a7440c --- /dev/null +++ b/src/test/java/io/cdap/plugin/jms/source/JMSSourceUtilsTest.java @@ -0,0 +1,256 @@ +///* +// * Copyright © 2021 Cask Data, Inc. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); you may not +// * use this file except in compliance with the License. You may obtain a copy of +// * the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// * License for the specific language governing permissions and limitations under +// * the License. +// */ +// +//package io.cdap.plugin.jms.source; +// +// +//import com.google.gson.Gson; +//import io.cdap.cdap.api.data.format.StructuredRecord; +//import io.cdap.cdap.api.data.schema.Schema; +//import io.cdap.plugin.jms.common.JMSMessageMetadata; +//import io.cdap.plugin.jms.common.JMSMessageType; +//import io.cdap.plugin.jms.source.common.DummyObject; +//import org.apache.commons.lang.SerializationUtils; +//import org.junit.Assert; +//import org.junit.Test; +// +//import java.nio.charset.StandardCharsets; +//import java.util.Arrays; +//import java.util.HashMap; +//import java.util.List; +//import java.util.Map; +//import java.util.Properties; +//import javax.jms.Destination; +//import javax.jms.JMSException; +//import javax.jms.MapMessage; +//import javax.jms.ObjectMessage; +//import javax.jms.TextMessage; +// +//import static org.mockito.Mockito.mock; +//import static org.mockito.Mockito.when; +// +//public class JMSSourceUtilsTest { +// +// +// @Test +// public void testSchema() throws JMSException { +// List headers = Arrays.asList( +// Schema.Field.of("header_1", Schema.of(Schema.Type.INT)), +// Schema.Field.of("header_2", Schema.of(Schema.Type.STRING)), +// Schema.Field.of("header_3", 
Schema.of(Schema.Type.DOUBLE)) +// ); +// List payload = Arrays.asList( +// Schema.Field.of("payload_1", Schema.of(Schema.Type.INT)), +// Schema.Field.of("payload_2", Schema.of(Schema.Type.STRING)), +// Schema.Field.of("payload_3", Schema.of(Schema.Type.DOUBLE)) +// ); +// List properties = Arrays.asList( +// Schema.Field.of("property_1", Schema.of(Schema.Type.INT)), +// Schema.Field.of("property_2", Schema.of(Schema.Type.STRING)), +// Schema.Field.of("property_3", Schema.of(Schema.Type.DOUBLE)) +// ); +// +// Schema s = Schema.recordOf("record", +// Schema.Field.of("headers", Schema.recordOf("headers", headers)), +// Schema.Field.of("payload", Schema.recordOf("payload", headers)), +// Schema.Field.of("properties", Schema.recordOf("properties", headers)) +// ); +// +// s.getField("payload").getSchema().getType(); +// } +// +// +// +// @Test +// public void testConvertMapMessageWithPayload() throws JMSException { +// /* +// * This tests if a JMS MapMessage is properly converted to a StructuredRecord given that the automatic schema is +// * kept. This schema does have a single field called "payload" that will include the entire JMS MapMessage +// * properties as a Json String. 
+// */ +// +// MapMessage mapMessage = mock(MapMessage.class); +// mockMapMessage(mapMessage); +// boolean keepMessageHeaders = false; +// String schema = null; +// JMSStreamingSourceConfig config = getJMSStreamingSourceConfig(false, false, JMSMessageType.MAP, schema); +// StructuredRecord actualRecord = JMSSourceUtils.convertJMSMapMessage(mapMessage, config); +// +// Schema expectedSchema = Schema.recordOf("record", Schema.Field.of("payload", Schema.of(Schema.Type.STRING))); +// +// Map expectedValues = new HashMap<>(); +// expectedValues.put("byteVal", (byte) 5); +// expectedValues.put("doubleVal", 1.0d); +// expectedValues.put("longVal", 1L); +// expectedValues.put("intVal", 1); +// expectedValues.put("stringVal", "Hello World!"); +// expectedValues.put("charVal", 'a'); +// expectedValues.put("floatVal", 1.0f); +// expectedValues.put("shortVal", 5); +// expectedValues.put("booleanVal", true); +// // expectedValues.put("bytesVal",); +// // expectedValues.put("objectVal", ); +// +// Gson gson = new Gson(); +// String json = gson.toJson(expectedValues); +// +// StructuredRecord expectedRecord = StructuredRecord.builder(expectedSchema) +// .set("payload", json) +// .build(); +// +// assertEqualsStructuredRecords(expectedRecord, actualRecord); +// } +// +// @Test +// public void testConvertMapMessageWithoutPayload() throws JMSException { +// /* +// * This tests if a JMS MapMessage is properly converted to a StructuredRecord given that the schema is +// * predefined. This means that there is no "payload" field in the schema, and for every property of JMS MapMessage +// * that we want to get included into the StructuredRecord, we define a field in the output schema. +// */ +// +// MapMessage mapMessage = mock(MapMessage.class); +// mockMapMessage(mapMessage); +// boolean keepMessageHeaders = false; +// +// // Schema is set manually. No "payload" field in the schema. 
+// List fields = Arrays.asList( +// Schema.Field.of("booleanVal", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN))), +// Schema.Field.of("byteVal", Schema.nullableOf(Schema.of(Schema.Type.BYTES))), +// Schema.Field.of("intVal", Schema.nullableOf(Schema.of(Schema.Type.INT))), +// Schema.Field.of("longVal", Schema.nullableOf(Schema.of(Schema.Type.LONG))), +// Schema.Field.of("floatVal", Schema.nullableOf(Schema.of(Schema.Type.FLOAT))), +// Schema.Field.of("doubleVal", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))), +// Schema.Field.of("stringVal", Schema.nullableOf(Schema.of(Schema.Type.STRING))) +//// Schema.Field.of("bytesVal", Schema.arrayOf(Schema.nullableOf(Schema.of(Schema.Type.BYTES))) +// ); +// Schema schema = Schema.recordOf("record", fields); +// +// JMSStreamingSourceConfig config = getJMSStreamingSourceConfig(false, false, JMSMessageType.MAP, +// schema.toString()); +// StructuredRecord actual = JMSSourceUtils.convertJMSMapMessage(mapMessage, config); +// +// StructuredRecord expected = StructuredRecord.builder(schema) +// .set("booleanVal", true) +// .set("byteVal", (byte) 5) +// .set("intVal", 1) +// .set("longVal", 1L) +// .set("floatVal", 1.0f) +// .set("doubleVal", 1.0d) +// .set("stringVal", "Hello World!") +//// .set("bytesVal", "Hello World!".getBytes(StandardCharsets.UTF_8)) +// .build(); +// +// assertEqualsStructuredRecords(expected, actual); +// } +// +// @Test +// public void testConvertBytesMessage() throws JMSException { +// /* +// * This tests if the JMS TextMessage is properly converted to a StructuredRecord given that no headers are +// * included. 
+// */ +// +//// BytesMessage bytesMessage = mock(BytesMessage.class); +//// +//// when(bytesMessage.readBytes(new byte[8096])).thenReturn(1); +//// +//// boolean keepMessageHeaders = false; +//// JMSStreamingSourceConfig config = getJMSStreamingSourceConfig(keepMessageHeaders, null); +//// StructuredRecord actual = JMSSourceUtils.convertJMSByteMessage(bytesMessage, config); +//// +//// StructuredRecord expected = StructuredRecord +//// .builder(Schema.recordOf("record", Schema.Field.of("payload", Schema.of(Schema.Type.STRING)))) +//// .set("payload", "Hello World!").build(); +//// +//// Assert.assertEquals(expected, actual); +// } +// +// @Test +// public void testConvertObjectMessage() throws JMSException { +// /* +// * This tests if the JMS BytesMessage is properly converted to a StructuredRecord given that no headers are +// * included. +// */ +// +// ObjectMessage objectMessage = mock(ObjectMessage.class); +// DummyObject dummyObj = new DummyObject("Hello", 123); +// +// when(objectMessage.getObject()).thenReturn(dummyObj); +// boolean keepMessageHeaders = false; +// +// JMSStreamingSourceConfig config = getJMSStreamingSourceConfig(false, false, JMSMessageType.OBJECT, null); +// StructuredRecord actual = JMSSourceUtils.convertJMSObjectMessage(objectMessage, config); +// +// StructuredRecord expected = StructuredRecord +// .builder(Schema.recordOf("record", Schema.Field.of("payload", Schema.arrayOf(Schema.of(Schema.Type.BYTES))))) +// .set("payload", SerializationUtils.serialize(dummyObj)).build(); +// +// assertEqualsStructuredRecords(expected, actual); +// } +// +// @Test +// public void testConvertMessage() { +// +// } +// +// +// +// +// +// +// +// private void mockMapMessage(MapMessage mapMessage) throws JMSException { +// Properties prop = new Properties(); +// prop.put("booleanVal", true); +// prop.put("byteVal", (byte) 5); +// prop.put("shortVal", (short) 5); +// prop.put("charVal", 'a'); +// prop.put("intVal", 1); +// prop.put("longVal", 1L); +// 
prop.put("floatVal", 1.0f); +// prop.put("doubleVal", 1.0d); +// prop.put("stringVal", "Hello World!"); +//// prop.put("bytesVal", "Hello World!".getBytes(StandardCharsets.UTF_8)); +//// prop.put("objectVal", new Object()); +// +// when(mapMessage.getObject("booleanVal")).thenReturn(true); +// when(mapMessage.getObject("byteVal")).thenReturn((byte) 5); +// when(mapMessage.getObject("shortVal")).thenReturn((short) 5); +// when(mapMessage.getObject("charVal")).thenReturn('a'); +// when(mapMessage.getObject("intVal")).thenReturn(1); +// when(mapMessage.getObject("longVal")).thenReturn(1L); +// when(mapMessage.getObject("floatVal")).thenReturn(1.0f); +// when(mapMessage.getObject("doubleVal")).thenReturn(1.0d); +// when(mapMessage.getObject("stringVal")).thenReturn("Hello World!"); +//// when(mapMessage.getObject("bytesVal")).thenReturn("Hello World!".getBytes(StandardCharsets.UTF_8)); +//// when(mapMessage.getObject("objectVal")).thenReturn(new Object()); +// when(mapMessage.getMapNames()).thenReturn(prop.propertyNames()); +// +// +// when(mapMessage.getBoolean("booleanVal")).thenReturn(true); +// when(mapMessage.getByte("byteVal")).thenReturn((byte) 5); +// when(mapMessage.getShort("shortVal")).thenReturn((short) 5); +// when(mapMessage.getChar("charVal")).thenReturn('a'); +// when(mapMessage.getInt("intVal")).thenReturn(1); +// when(mapMessage.getLong("longVal")).thenReturn(1L); +// when(mapMessage.getFloat("floatVal")).thenReturn(1.0f); +// when(mapMessage.getDouble("doubleVal")).thenReturn(1.0d); +// when(mapMessage.getString("stringVal")).thenReturn("Hello World!"); +//// when(mapMessage.getBytes("bytesVal")).thenReturn("Hello World!".getBytes(StandardCharsets.UTF_8)); +//// when(mapMessage.getObject("objectVal")).thenReturn(new Object()); +// } +//} diff --git a/src/test/java/io/cdap/plugin/jms/source/SourceByteMessageTest.java b/src/test/java/io/cdap/plugin/jms/source/SourceByteMessageTest.java new file mode 100644 index 0000000..ef68fbd --- /dev/null +++ 
b/src/test/java/io/cdap/plugin/jms/source/SourceByteMessageTest.java
@@ -0,0 +1,4 @@
+package io.cdap.plugin.jms.source;
+
+public class SourceByteMessageTest {
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/SourceMapMessageTest.java b/src/test/java/io/cdap/plugin/jms/source/SourceMapMessageTest.java
new file mode 100644
index 0000000..569a1e6
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/SourceMapMessageTest.java
@@ -0,0 +1,4 @@
+package io.cdap.plugin.jms.source;
+
+public class SourceMapMessageTest {
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/SourceMessageTest.java b/src/test/java/io/cdap/plugin/jms/source/SourceMessageTest.java
new file mode 100644
index 0000000..1ef6dd3
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/SourceMessageTest.java
@@ -0,0 +1,4 @@
+package io.cdap.plugin.jms.source;
+
+public class SourceMessageTest {
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/SourceObjectMessageTest.java b/src/test/java/io/cdap/plugin/jms/source/SourceObjectMessageTest.java
new file mode 100644
index 0000000..f3640db
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/SourceObjectMessageTest.java
@@ -0,0 +1,4 @@
+package io.cdap.plugin.jms.source;
+
+public class SourceObjectMessageTest {
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/SourceTextMessageTest.java b/src/test/java/io/cdap/plugin/jms/source/SourceTextMessageTest.java
new file mode 100644
index 0000000..c9245d6
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/SourceTextMessageTest.java
@@ -0,0 +1,124 @@
+package io.cdap.plugin.jms.source;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.jms.common.JMSMessageMetadata;
+import io.cdap.plugin.jms.common.JMSMessageParts;
+import io.cdap.plugin.jms.common.JMSMessageType;
+import io.cdap.plugin.jms.source.common.TestUtils;
+import org.junit.Test;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the conversion of a JMS {@link TextMessage} into a {@link StructuredRecord} by
+ * {@code JMSSourceUtils.convertJMSTextMessage}.
+ */
+public class SourceTextMessageTest {
+
+  private static final String PAYLOAD_CONTENT = "Hello World!";
+
+  /**
+   * Without metadata, properties or an explicit schema the record only carries the payload as a String.
+   */
+  @Test
+  public void convertJMSTextMessage_withNoMetadataNoPropertiesNoSchema_successfully() throws JMSException {
+    TextMessage textMessage = mockTextMessage(false);
+    JMSStreamingSourceConfig config = TestUtils.getSourceConfig(false, false, JMSMessageType.TEXT, null);
+
+    // expected
+    Schema expectedSchema = Schema
+      .recordOf("record", Schema.Field.of(JMSMessageParts.PAYLOAD, Schema.of(Schema.Type.STRING)));
+    StructuredRecord expectedRecord = StructuredRecord
+      .builder(expectedSchema)
+      .set(JMSMessageParts.PAYLOAD, PAYLOAD_CONTENT)
+      .build();
+
+    // actual
+    StructuredRecord actualRecord = JMSSourceUtils.convertJMSTextMessage(textMessage, config);
+
+    // asserts
+    TestUtils.assertEqualsStructuredRecords(expectedRecord, actualRecord);
+  }
+
+  /**
+   * With metadata and properties enabled but no explicit schema, the generated schema holds the metadata field,
+   * the properties as a String and the payload as a String.
+   */
+  @Test
+  public void convertJMSTextMessage_withMetadataAndPropertiesButNoSchema_successfully() throws JMSException {
+    TextMessage textMessage = mockTextMessage(true);
+    JMSStreamingSourceConfig config = TestUtils.getSourceConfig(true, true, JMSMessageType.TEXT, null);
+
+    // expected
+    Schema expectedSchema = Schema.recordOf("record", JMSMessageMetadata.getMessageMetadata(),
+                                            Schema.Field.of(JMSMessageParts.PROPERTIES,
+                                                            Schema.of(Schema.Type.STRING)),
+                                            Schema.Field.of(JMSMessageParts.PAYLOAD, Schema.of(Schema.Type.STRING)));
+    StructuredRecord expectedRecord = buildExpectedRecord(expectedSchema);
+
+    // actual
+    StructuredRecord actualRecord = JMSSourceUtils.convertJMSTextMessage(textMessage, config);
+
+    // asserts
+    TestUtils.assertEqualsStructuredRecords(expectedRecord, actualRecord);
+  }
+
+  /**
+   * With an explicit schema that models the properties as a record, the record follows that schema.
+   */
+  @Test
+  public void convertJMSTextMessage_withMetadataAndPropertiesAndSchema_successfully() throws JMSException {
+    TextMessage textMessage = mockTextMessage(true);
+
+    Schema.Field propertiesField = Schema.Field.of(
+      JMSMessageParts.PROPERTIES,
+      Schema.recordOf(
+        JMSMessageParts.PROPERTIES,
+        Schema.Field.of("boolean-prop", Schema.of(Schema.Type.BOOLEAN)),
+        Schema.Field.of("string-prop", Schema.of(Schema.Type.STRING)),
+        // "int-prop" holds an int (300) in both the mocked message and the expected record.
+        Schema.Field.of("int-prop", Schema.of(Schema.Type.INT))
+      )
+    );
+
+    Schema outputSchema = Schema.recordOf(
+      "record",
+      JMSMessageMetadata.getMessageMetadata(),
+      propertiesField,
+      Schema.Field.of(JMSMessageParts.PAYLOAD, Schema.of(Schema.Type.STRING))
+    );
+
+    JMSStreamingSourceConfig config = TestUtils
+      .getSourceConfig(true, true, JMSMessageType.TEXT, outputSchema.toString());
+
+    // expected
+    StructuredRecord expectedRecord = buildExpectedRecord(outputSchema);
+
+    // actual
+    StructuredRecord actualRecord = JMSSourceUtils.convertJMSTextMessage(textMessage, config);
+
+    // asserts
+    TestUtils.assertEqualsStructuredRecords(expectedRecord, actualRecord);
+  }
+
+  /**
+   * Creates a mocked text message returning {@link #PAYLOAD_CONTENT}; optionally also mocks the dummy metadata
+   * and properties provided by {@link TestUtils}.
+   */
+  private static TextMessage mockTextMessage(boolean withMetadataAndProperties) throws JMSException {
+    TextMessage textMessage = mock(TextMessage.class);
+    when(textMessage.getText()).thenReturn(PAYLOAD_CONTENT);
+    if (withMetadataAndProperties) {
+      TestUtils.mockMetadata(textMessage);
+      TestUtils.mockProperties(textMessage);
+    }
+    return textMessage;
+  }
+
+  /**
+   * Builds the expected record for a schema that contains the metadata, properties and payload fields.
+   */
+  private static StructuredRecord buildExpectedRecord(Schema recordSchema) {
+    StructuredRecord.Builder expectedRecordBuilder = StructuredRecord.builder(recordSchema);
+    TestUtils.populateDummyMetadata(expectedRecordBuilder, recordSchema);
+    TestUtils.populateDummyProperties(expectedRecordBuilder, recordSchema);
+    expectedRecordBuilder.set(JMSMessageParts.PAYLOAD, PAYLOAD_CONTENT);
+    return expectedRecordBuilder.build();
+  }
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/common/DummyObject.java b/src/test/java/io/cdap/plugin/jms/source/common/DummyObject.java
new file mode 100644
index 0000000..9139a00
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/common/DummyObject.java
@@ -0,0 +1,47 @@
+package io.cdap.plugin.jms.source.common;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Simple serializable value object (a String and an int) for use as a test fixture.
+ */
+public class DummyObject implements Serializable {
+  private String dummyStr;
+  private int dummyInt;
+
+  public DummyObject(String dummyStr, int dummyInt) {
+    this.dummyStr = dummyStr;
+    this.dummyInt = dummyInt;
+  }
+
+  public String getDummyStr() {
+    return dummyStr;
+  }
+
+  public void setDummyStr(String dummyStr) {
+    this.dummyStr = dummyStr;
+  }
+
+  public int getDummyInt() {
+    return dummyInt;
+  }
+
+  public void setDummyInt(int dummyInt) {
+    this.dummyInt = dummyInt;
+  }
+
+  /**
+   * Two dummy objects are equal when they are of the same class and both of their values are equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DummyObject that = (DummyObject) o;
+    return dummyInt == that.dummyInt && Objects.equals(dummyStr, that.dummyStr);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(dummyStr, dummyInt);
+  }
+}
diff --git a/src/test/java/io/cdap/plugin/jms/source/common/TestUtils.java b/src/test/java/io/cdap/plugin/jms/source/common/TestUtils.java
new file mode 100644
index 0000000..72f0e58
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/jms/source/common/TestUtils.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jms.source.common;
+
+import com.google.gson.Gson;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.jms.common.JMSMessageMetadata;
+import io.cdap.plugin.jms.common.JMSMessageParts;
+import io.cdap.plugin.jms.source.JMSStreamingSourceConfig;
+import org.junit.Assert;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Helpers shared by the JMS source tests: config creation, mocking of message metadata/properties, building the
+ * matching expected values and comparing records.
+ */
+public class TestUtils {
+
+  private TestUtils() {
+    // static helpers only
+  }
+
+  /**
+   * Creates a source config with fixed dummy connection settings.
+   *
+   * @param messageMetadata   whether message metadata is consumed
+   * @param messageProperties whether message properties are consumed
+   * @param messageType       JMS message type
+   * @param schema            output schema as JSON, or {@code null} for none
+   */
+  public static JMSStreamingSourceConfig getSourceConfig(boolean messageMetadata, boolean messageProperties,
+                                                         String messageType, String schema) {
+    return new JMSStreamingSourceConfig("referenceName", "Connection Factory", "jms-username", "jms-password",
+                                        "tcp://0.0.0.0:61616", "Queue", "jndi-context-factory", "jndi-username",
+                                        "jndi-password", String.valueOf(messageMetadata),
+                                        String.valueOf(messageProperties), messageType, "MyQueue", schema);
+  }
+
+  /**
+   * Sets the metadata field of {@code builder} to the values returned by a message prepared with
+   * {@link #mockMetadata(Message)}.
+   */
+  public static void populateDummyMetadata(StructuredRecord.Builder builder, Schema schema) {
+    StructuredRecord.Builder metadataBuilder = StructuredRecord
+      .builder(schema.getField(JMSMessageParts.METADATA).getSchema());
+
+    metadataBuilder.set(JMSMessageMetadata.MESSAGE_ID, "jms-message-id");
+    metadataBuilder.set(JMSMessageMetadata.MESSAGE_TIMESTAMP, 112233445566L);
+    metadataBuilder.set(JMSMessageMetadata.CORRELATION_ID, "jms-correlation-id");
+    metadataBuilder.set(JMSMessageMetadata.REPLY_TO, getDummyDestination().toString());
+    metadataBuilder.set(JMSMessageMetadata.DESTINATION, getDummyDestination().toString());
+    metadataBuilder.set(JMSMessageMetadata.DELIVERY_MODE, 0);
+    metadataBuilder.set(JMSMessageMetadata.REDELIVERED, false);
+    metadataBuilder.set(JMSMessageMetadata.TYPE, "jms-type");
+    metadataBuilder.set(JMSMessageMetadata.EXPIRATION, 112233445566L);
+    metadataBuilder.set(JMSMessageMetadata.PRIORITY, 0);
+
+    builder.set(JMSMessageParts.METADATA, metadataBuilder.build());
+  }
+
+
+  /**
+   * Sets the properties field of {@code builder} to the values returned by a message prepared with
+   * {@link #mockProperties(Message)}: as a JSON string when the field is a String, as a nested record when it is
+   * a Record. Any other field type leaves the builder untouched.
+   */
+  public static void populateDummyProperties(StructuredRecord.Builder builder, Schema schema) {
+    Schema propertiesSchema = schema.getField(JMSMessageParts.PROPERTIES).getSchema();
+    Schema.Type propertiesType = propertiesSchema.getType();
+
+    if (propertiesType == Schema.Type.STRING) {
+      HashMap<String, Object> properties = new HashMap<>();
+      properties.put("boolean-prop", true);
+      properties.put("byte-prop", (byte) 100);
+      properties.put("short-prop", (short) 200);
+      properties.put("int-prop", 300);
+      properties.put("long-prop", 400L);
+      properties.put("float-prop", 500.0f);
+      properties.put("double-prop", 600.0d);
+      properties.put("string-prop", "string-prop");
+      builder.set(JMSMessageParts.PROPERTIES, new Gson().toJson(properties));
+    } else if (propertiesType == Schema.Type.RECORD) {
+      StructuredRecord propertiesRecord = StructuredRecord
+        .builder(propertiesSchema)
+        .set("boolean-prop", true)
+        // same value as the mocked message and the JSON form above
+        .set("string-prop", "string-prop")
+        .set("int-prop", 300)
+        .build();
+
+      builder.set(JMSMessageParts.PROPERTIES, propertiesRecord);
+    }
+  }
+
+  /**
+   * Stubs the JMS header getters of the given (mocked) message with fixed dummy values.
+   */
+  public static void mockMetadata(Message message) throws JMSException {
+    when(message.getJMSMessageID()).thenReturn("jms-message-id");
+    when(message.getJMSCorrelationID()).thenReturn("jms-correlation-id");
+    when(message.getJMSReplyTo()).thenReturn(getDummyDestination());
+    when(message.getJMSDestination()).thenReturn(getDummyDestination());
+    when(message.getJMSType()).thenReturn("jms-type");
+    when(message.getJMSTimestamp()).thenReturn(112233445566L);
+    when(message.getJMSDeliveryMode()).thenReturn(0);
+    when(message.getJMSRedelivered()).thenReturn(false);
+    when(message.getJMSExpiration()).thenReturn(112233445566L);
+    when(message.getJMSPriority()).thenReturn(0);
+  }
+
+  /**
+   * Stubs the property getters of the given (mocked) message. The typed getters and {@code getObjectProperty}
+   * return the same value for each property.
+   */
+  public static void mockProperties(Message message) throws JMSException {
+    List<String> propertyNames = Arrays.asList("boolean-prop", "byte-prop", "short-prop", "int-prop", "long-prop",
+                                               "float-prop", "double-prop", "string-prop");
+    when(message.getPropertyNames()).thenReturn(Collections.enumeration(propertyNames));
+    when(message.getBooleanProperty("boolean-prop")).thenReturn(true);
+    when(message.getByteProperty("byte-prop")).thenReturn((byte) 100);
+    when(message.getShortProperty("short-prop")).thenReturn((short) 200);
+    when(message.getIntProperty("int-prop")).thenReturn(300);
+    when(message.getLongProperty("long-prop")).thenReturn(400L);
+    when(message.getFloatProperty("float-prop")).thenReturn(500.0f);
+    when(message.getDoubleProperty("double-prop")).thenReturn(600.0d);
+    when(message.getStringProperty("string-prop")).thenReturn("string-prop");
+
+    when(message.getObjectProperty("boolean-prop")).thenReturn(true);
+    when(message.getObjectProperty("byte-prop")).thenReturn((byte) 100);
+    when(message.getObjectProperty("short-prop")).thenReturn((short) 200);
+    when(message.getObjectProperty("int-prop")).thenReturn(300);
+    when(message.getObjectProperty("long-prop")).thenReturn(400L);
+    when(message.getObjectProperty("float-prop")).thenReturn(500.0f);
+    when(message.getObjectProperty("double-prop")).thenReturn(600.0d);
+    when(message.getObjectProperty("string-prop")).thenReturn("string-prop");
+
+    for (String propertyName : propertyNames) {
+      when(message.propertyExists(propertyName)).thenReturn(true);
+    }
+  }
+
+  private static Destination getDummyDestination() {
+    return new Destination() {
+      @Override
+      public String toString() {
+        return "Destination";
+      }
+    };
+  }
+
+  /**
+   * Asserts that both records have the same schema and equal values in their top-level fields of a primitive,
+   * String or Array type. Array values are read as {@code byte[]} and compared as UTF-8 text.
+   *
+   * <p>NOTE(review): fields of any other type, including nested records such as the metadata, are not compared.
+   */
+  public static void assertEqualsStructuredRecords(StructuredRecord expected, StructuredRecord actual) {
+    Assert.assertEquals(expected.getSchema(), actual.getSchema());
+    for (Schema.Field field : expected.getSchema().getFields()) {
+      String fieldName = field.getName();
+
+      switch (field.getSchema().getType()) {
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          // Bound to Object so that the assertEquals(Object, Object) overload is selected explicitly.
+          Object expectedValue = expected.get(fieldName);
+          Object actualValue = actual.get(fieldName);
+          Assert.assertEquals(expectedValue, actualValue);
+          break;
+        case ARRAY:
+          byte[] expectedBytes = expected.get(fieldName);
+          byte[] actualBytes = actual.get(fieldName);
+          Assert.assertEquals(new String(expectedBytes, StandardCharsets.UTF_8),
+                              new String(actualBytes, StandardCharsets.UTF_8));
+          break;
+        default:
+          break;
+      }
+    }
+  }
+
+}
diff --git a/suppressions.xml b/suppressions.xml
new file mode 100644
index 0000000..f1ba54a
--- /dev/null
+++ 
b/suppressions.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + diff --git a/widgets/JMS-batchsink.json b/widgets/JMS-batchsink.json new file mode 100644 index 0000000..d39d46a --- /dev/null +++ b/widgets/JMS-batchsink.json @@ -0,0 +1,100 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "JMS", + "configuration-groups": [ + { + "label": "Basic", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName" + }, + { + "widget-type": "textbox", + "label": "Connection Factory", + "name": "connectionFactory", + "widget-attributes": { + "default": "ConnectionFactory" + } + }, + { + "widget-type": "textbox", + "label": "JMS Username", + "name": "jmsUsername" + }, + { + "widget-type": "password", + "label": "JMS Password", + "name": "jmsPassword" + }, + { + "widget-type": "text", + "label": "Provider URL", + "name": "providerUrl", + "widget-attributes": { + "placeholder": "tcp://hostname:61616" + } + }, + { + "widget-type": "select", + "label": "Type", + "name": "type", + "widget-attributes": { + "values": [ + "Queue", + "Topic" + ], + "default": "Queue" + } + }, + { + "widget-type": "textbox", + "label": "Destination Queue or Topic Name", + "name": "destinationName" + }, + { + "widget-type": "textbox", + "label": "JNDI Context Factory", + "name": "jndiContextFactory", + "widget-attributes": { + "default": "org.apache.activemq.jndi.ActiveMQInitialContextFactory" + } + }, + { + "widget-type": "textbox", + "label": "JNDI Username", + "name": "jndiUsername" + }, + { + "widget-type": "password", + "label": "JNDI Password", + "name": "jndiPassword" + }, + { + "widget-type": "select", + "label": "Message Type", + "name": "messageType", + "widget-attributes": { + "values": [ + "Message", + "Text", + "Bytes", + "Map" + ], + "default": "Text" + } + } + ] + } + ], + "outputs": [ + { + "name": "schema", + "widget-type": "schema", + "widget-attributes": {} + } + ] +} diff --git a/widgets/JMS-streamingsource.json 
b/widgets/JMS-streamingsource.json new file mode 100644 index 0000000..61abfa6 --- /dev/null +++ b/widgets/JMS-streamingsource.json @@ -0,0 +1,207 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "JMS", + "configuration-groups": [ + { + "label": "Basic", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName" + }, + { + "widget-type": "textbox", + "label": "Connection Factory", + "name": "connectionFactory", + "widget-attributes": { + "default": "ConnectionFactory" + } + }, + { + "widget-type": "textbox", + "label": "JMS Username", + "name": "jmsUsername" + }, + { + "widget-type": "password", + "label": "JMS Password", + "name": "jmsPassword" + }, + { + "widget-type": "text", + "label": "Provider URL", + "name": "providerUrl", + "widget-attributes": { + "placeholder": "tcp://hostname:61616" + } + }, + { + "widget-type": "select", + "label": "Type", + "name": "type", + "widget-attributes": { + "values": [ + "Queue", + "Topic" + ], + "default": "Queue" + } + }, + { + "widget-type": "textbox", + "label": "Source Queue or Topic Name", + "name": "sourceName" + }, + { + "widget-type": "textbox", + "label": "JNDI Context Factory", + "name": "jndiContextFactory", + "widget-attributes": { + "default": "org.apache.activemq.jndi.ActiveMQInitialContextFactory" + } + }, + { + "widget-type": "textbox", + "label": "JNDI Username", + "name": "jndiUsername" + }, + { + "widget-type": "password", + "label": "JNDI Password", + "name": "jndiPassword" + }, + { + "widget-type": "toggle", + "name": "messageMetadata", + "label": "Keep Message Metadata", + "widget-attributes": { + "default": "true", + "on": { + "value": "true", + "label": "True" + }, + "off": { + "value": "false", + "label": "False" + } + } + }, + { + "widget-type": "toggle", + "name": "messageProperties", + "label": "Keep Message Properties", + "widget-attributes": { + "default": "true", + "on": { + "value": "true", + "label": "True" + }, + "off": { + 
"value": "false", + "label": "False" + } + } + }, + { + "widget-type": "select", + "label": "Message Type", + "name": "messageType", + "widget-attributes": { + "values": [ + "Message", + "Text", + "Bytes", + "Map", + "Object" + ], + "default": "Text" + } + } + ] + } + ], + "outputs": [ + { + "name": "schema", + "widget-type": "schema", + "widget-attributes": { + "default-schema": { + "name": "etlSchemaBody", + "type": "record", + "fields": [ + { + "name": "metadata", + "type": { + "type": "record", + "name": "metadata", + "fields": [ + { + "name": "messageId", + "type": ["string", "null"], + "default": null + }, + { + "name": "messageTimestamp", + "type": ["long", "null"], + "default": null + }, + { + "name": "correlationId", + "type": ["string", "null"], + "default": null + }, + { + "name": "replyTo", + "type": ["string", "null"], + "default": null + }, + { + "name": "destination", + "type": ["string", "null"], + "default": null + }, + { + "name": "deliveryMode", + "type": ["int", "null"], + "default": null + }, + { + "name": "redelivered", + "type": ["boolean", "null"], + "default": null + }, + { + "name": "type", + "type": ["string", "null"], + "default": null + }, + { + "name": "expiration", + "type": ["long", "null"], + "default": null + }, + { + "name": "priority", + "type": ["int", "null"], + "default": null + } + ] + } + }, + { + "name": "payload", + "type": "string" + }, + { + "name": "properties", + "type": "string" + } + ] + } + } + } + ] +}