Skip to content

Commit

Permalink
Refactoring and fixing issues
Browse files Browse the repository at this point in the history
  • Loading branch information
dardanxh committed Apr 22, 2021
1 parent 57be4c9 commit a929316
Show file tree
Hide file tree
Showing 27 changed files with 225 additions and 150 deletions.
83 changes: 58 additions & 25 deletions src/main/java/io/cdap/plugin/jms/common/SchemaValidationUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.cdap.plugin.jms.source.JMSStreamingSourceConfig;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -60,20 +61,25 @@ public class SchemaValidationUtils {
WRONG_BODY_DATA_TYPE_FOR_HEADER_ACTION = "The \"header\" field must be of Record data type.",

NOT_SUPPORTED_FIELDS_IN_HEADER_RECORD_ERROR = "Not supported fields set in the header record!",
NOT_SUPPORTED_FIELDS_IN_HEADER_RECORD_ACTION = "The field \"header\" support only the fields " +
NOT_SUPPORTED_FIELDS_IN_HEADER_RECORD_ACTION = "The field \"header\" support only fields: " +
JMSMessageHeader.describe(),

WRONG_PROPERTIES_DATA_TYPE_ERROR = "The field \"properties\" has a not supported data type set!",
WRONG_PROPERTIES_DATA_TYPE_ACTION = "The field \"properties\" is mandatory to be of String or Record data type.",

NOT_SUPPORTED_ROOT_FIELDS_IN_MESSAGE_ERROR = "Not supported root fields in the schema!",
NOT_SUPPORTED_ROOT_FIELDS_IN_MESSAGE_ACTION = "JMS \"Message\" message type supports only \"header\" and " +
"\"properties\" as root fields.";
"\"properties\" as root fields.",

HEADER_AND_PROPERTIES_MISSING_IN_MESSAGE_ERROR = "Fields \"Header\" and \"Properties\" are missing!",
HEADER_AND_PROPERTIES_MISSING_IN_MESSAGE_ACTION = "When JMS \"Message\" message type is selected, it is " +
"mandatory that either \"Header\" or \"Properties\" root fields to be present in schema. " +
"Set at least one of \"Keep Message Header\" or \"Keep Message Properties\" to true.";

/**
* Throws an error if schema is null.
*
* @param schema the user defined schema
* @param schema the user defined schema
* @param collector the failure collector
*/
public static void validateIfSchemaIsNull(Schema schema, FailureCollector collector) {
Expand All @@ -85,7 +91,7 @@ public static void validateIfSchemaIsNull(Schema schema, FailureCollector collec
/**
* Throws an error if the input schema contains any other root fields except of "header", "properties", and "body".
*
* @param schema the user defined schema
* @param schema the user defined schema
* @param collector the failure collector
*/
public static void validateIfAnyNotSupportedRootFieldExists(Schema schema, FailureCollector collector) {
Expand All @@ -104,7 +110,7 @@ public static void validateIfAnyNotSupportedRootFieldExists(Schema schema, Failu
* Throws an error if the input schema does not contain the root field "body". JMS "Message" type is the only message
* type allowed to have the schema without the root field "body".
*
* @param schema the user defined schema
* @param schema the user defined schema
* @param collector the failure collector
*/
public static void validateIfBodyNotInSchema(Schema schema, FailureCollector collector) {
Expand All @@ -123,7 +129,7 @@ public static void validateIfBodyNotInSchema(Schema schema, FailureCollector col
/**
* Throws an error if the root field "body" is not of type "string" when JMS "TextMessage" is selected.
*
* @param schema the user defined schema
* @param schema the user defined schema
* @param collector the failure collector
*/
public static void validateTextMessageSchema(Schema schema, FailureCollector collector) {
Expand All @@ -138,15 +144,15 @@ public static void validateTextMessageSchema(Schema schema, FailureCollector col
/**
* Throws an error if the root field "body" is not of type "string" or "record" when JMS "MapMessage" is selected.
*
* @param schema the user defined schema
* @param schema the user defined schema
* @param collector the failure collector
*/
public static void validateMapMessageSchema(Schema schema, FailureCollector collector) {
Schema.Type type = schema.getField(JMSMessageParts.BODY).getSchema().getType();
boolean isTypeString = type.equals(Schema.Type.STRING);
boolean isTypeRecord = type.equals(Schema.Type.RECORD);

if (!isTypeString || !isTypeRecord) {
if (!isTypeString && !isTypeRecord) {
tell(collector, WRONG_BODY_DATA_TYPE_IN_MAP_MESSAGE_ERROR, WRONG_BODY_DATA_TYPE_IN_MAP_MESSAGE_ACTION);
}
}
Expand All @@ -155,25 +161,35 @@ public static void validateMapMessageSchema(Schema schema, FailureCollector coll
* Throws an error if the input schema contains any other root fields except of "header", and "properties" when JMS
* "Message" type is selected.
*
* @param schema the user defined schema
* @param schema the user defined schema
* @param collector the failure collector
*/
public static void validateMessageSchema(Schema schema, FailureCollector collector) {
boolean areNonSupportedRootFieldsPresent = schema
.getFields()
.stream()
.map(field -> field.getName())
.anyMatch(f -> !Arrays.asList(JMSMessageParts.HEADER, JMSMessageParts.PROPERTIES).contains(f));
List<String> fieldNames = schema.getFields().stream().map(field -> field.getName()).collect(Collectors.toList());

boolean areNonSupportedRootFieldsPresent = false;
for (String fieldName: fieldNames) {
if (Arrays.asList(JMSMessageParts.PROPERTIES, JMSMessageParts.HEADER).contains(fieldName)) {
areNonSupportedRootFieldsPresent = true;
break;
}
}

if (areNonSupportedRootFieldsPresent) {
tell(collector, NOT_SUPPORTED_ROOT_FIELDS_IN_MESSAGE_ERROR, NOT_SUPPORTED_ROOT_FIELDS_IN_MESSAGE_ACTION);
}

boolean areHeaderAndPropertiesMissing = !fieldNames.contains(JMSMessageParts.HEADER) &&
!fieldNames.contains(JMSMessageParts.PROPERTIES);
if (areHeaderAndPropertiesMissing) {
tell(collector, HEADER_AND_PROPERTIES_MISSING_IN_MESSAGE_ERROR, HEADER_AND_PROPERTIES_MISSING_IN_MESSAGE_ACTION);
}
}

/**
* Throws an error if the root field "body" is not of type "array of bytes" when JMS "ObjectMessage" is selected.
*
* @param schema the user defined schema
* @param schema the user defined schema
* @param collector the failure collector
*/
public static void validateObjectMessageSchema(Schema schema, FailureCollector collector) {
Expand Down Expand Up @@ -205,15 +221,15 @@ public static void validateObjectMessageSchema(Schema schema, FailureCollector c
/**
* Throws an error if the root field "body" is not of type "string" or "record" when JMS "BytesMessage" is selected.
*
* @param schema the user defined schema
* @param schema the user defined schema
* @param collector the failure collector
*/
public static void validateBytesMessageSchema(Schema schema, FailureCollector collector) {
Schema.Type type = schema.getField(JMSMessageParts.BODY).getSchema().getType();
boolean isTypeString = type.equals(Schema.Type.STRING);
boolean isTypeRecord = type.equals(Schema.Type.RECORD);

if (!isTypeString || !isTypeRecord) {
if (!isTypeString && !isTypeRecord) {
tell(collector, WRONG_BODY_DATA_TYPE_IN_BYTES_MESSAGE_ERROR, WRONG_BODY_DATA_TYPE_IN_BYTES_MESSAGE_ACTION);
}
}
Expand All @@ -223,10 +239,15 @@ public static void validateBytesMessageSchema(Schema schema, FailureCollector co
* contains other fields except of "messageId", "messageTimestamp", "correlationId", "replyTo", "destination",
* "deliveryNode", "redelivered", "type", "expiration", and "priority".
*
* @param schema the user defined schema
* @param schema the user defined schema
* @param collector the failure collector
*/
public static void validateHeaderSchema(Schema schema, FailureCollector collector) {

if (!isFieldPresent(schema, JMSMessageParts.HEADER)) {
return;
}

boolean isTypeRecord = schema
.getField(JMSMessageParts.HEADER)
.getSchema()
Expand All @@ -238,6 +259,8 @@ public static void validateHeaderSchema(Schema schema, FailureCollector collecto
}

boolean areNonSupportedHeaderFieldsPresent = schema
.getField(JMSMessageParts.HEADER)
.getSchema()
.getFields()
.stream()
.map(field -> field.getName())
Expand All @@ -251,40 +274,50 @@ public static void validateHeaderSchema(Schema schema, FailureCollector collecto
/**
* Throws an error if the root field "properties" is not of type "string" or "record".
*
* @param schema the user defined schema
* @param schema the user defined schema
* @param collector the failure collector
*/
public static void validatePropertiesSchema(Schema schema, FailureCollector collector) {

if (!isFieldPresent(schema, JMSMessageParts.PROPERTIES)) {
return;
}

Schema.Type type = schema.getField(JMSMessageParts.PROPERTIES).getSchema().getType();
boolean isTypeString = type.equals(Schema.Type.STRING);
boolean isTypeRecord = type.equals(Schema.Type.RECORD);

if (!isTypeString || !isTypeRecord) {
if (!isTypeString && !isTypeRecord) {
tell(collector, WRONG_PROPERTIES_DATA_TYPE_ERROR, WRONG_PROPERTIES_DATA_TYPE_ACTION);
}
}

/**
* Throws an error and also add the error in the failure collector if one is provided.
*
* @param collector the failure collector
* @param errorMessage the error message
* @param collector the failure collector
* @param errorMessage the error message
* @param correctiveAction the action that the user should perform to resolve the error
*/
private static void tell(FailureCollector collector, String errorMessage, String correctiveAction) {
public static void tell(FailureCollector collector, String errorMessage, String correctiveAction) {
String errorNature = "Error during schema validation";

if (collector != null) {
collector.addFailure(errorNature + ": " + errorMessage, correctiveAction)
.withConfigProperty(JMSStreamingSourceConfig.NAME_SCHEMA);
} else {
throw new RuntimeException(concatenate(errorNature + ": " + errorMessage, correctiveAction));
}
}

throw new RuntimeException(concatenate(errorNature + ": " + errorMessage, correctiveAction));
private static boolean isFieldPresent(Schema schema, String fieldName) {
return schema.getField(fieldName) != null;
}

/**
* Concatenates two strings with a space in between
* @param left the left string
*
* @param left the left string
* @param right the right string
* @return the concatenated string
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
package io.cdap.plugin.jms.sink.converters;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.format.StructuredRecordStringConverter;
import io.cdap.cdap.api.data.schema.Schema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.jms.BytesMessage;
import javax.jms.JMSException;

Expand All @@ -31,24 +29,47 @@ public class RecordToBytesMessageConverter {

/**
* Converts an incoming {@link StructuredRecord} to a JMS {@link BytesMessage}
*
* @param bytesMessage the jms message to be populated with data
* @param record the incoming record
* @param record the incoming record
* @return a JMS bytes message
*/
public static BytesMessage convertStructuredRecordToBytesMessage(BytesMessage bytesMessage, StructuredRecord record) {
byte[] body = null;

public static BytesMessage toBytesMessage(BytesMessage bytesMessage, StructuredRecord record) {
try {
body = StructuredRecordStringConverter.toJsonString(record).getBytes(StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException("Failed to convert record to json!", e);
}
for (Schema.Field field : record.getSchema().getFields()) {
String fieldName = field.getName();
Object value = record.get(fieldName);

try {
bytesMessage.writeBytes(body);
switch (field.getSchema().getType()) {
case INT:
bytesMessage.writeInt(cast(value, Integer.class));
break;
case LONG:
bytesMessage.writeLong(cast(value, Long.class));
break;
case DOUBLE:
bytesMessage.writeDouble(cast(value, Double.class));
break;
case FLOAT:
bytesMessage.writeFloat(cast(value, Float.class));
break;
case BOOLEAN:
bytesMessage.writeBoolean(cast(value, Boolean.class));
break;
case BYTES:
bytesMessage.writeBytes(cast(value, byte[].class));
break;
default:
bytesMessage.writeUTF(cast(value, String.class));
}
}
} catch (JMSException e) {
throw new RuntimeException(String.format("%s: %s", e.getErrorCode(), e.getMessage()));
}
return bytesMessage;
}

public static <T> T cast(Object o, Class<T> clazz) {
return o != null ? clazz.cast(o) : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class RecordToMapMessageConverter {
* @param record the incoming record
* @return a JMS map message
*/
public static MapMessage convertStructuredRecordToMapMessage(MapMessage mapMessage, StructuredRecord record) {
public static MapMessage toMapMessage(MapMessage mapMessage, StructuredRecord record) {
try {
for (Schema.Field field : record.getSchema().getFields()) {
String fieldName = field.getName();
Expand All @@ -59,15 +59,6 @@ public static MapMessage convertStructuredRecordToMapMessage(MapMessage mapMessa
case BYTES:
mapMessage.setBytes(fieldName, cast(value, byte[].class));
break;
case ARRAY:
mapMessage.setObject(fieldName, value);
break;
case RECORD:
mapMessage.setObject(fieldName, value);
break;
case MAP:
mapMessage.setObject(fieldName, value);
break;
default:
mapMessage.setString(fieldName, cast(value, String.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class RecordToMessageConverter {
* @param record the incoming record
* @return a JMS message
*/
public static Message convertRecordToMessage(Message message, StructuredRecord record) {
public static Message toMessage(Message message, StructuredRecord record) {
try {
for (Schema.Field field : record.getSchema().getFields()) {
String fieldName = field.getName();
Expand All @@ -56,14 +56,8 @@ public static Message convertRecordToMessage(Message message, StructuredRecord r
case BOOLEAN:
message.setBooleanProperty(fieldName, cast(value, Boolean.class));
break;
case ARRAY:
message.setObjectProperty(fieldName, value);
break;
case RECORD:
message.setObjectProperty(fieldName, value);
break;
case MAP:
message.setObjectProperty(fieldName, value);
case BYTES:
message.setByteProperty(fieldName, cast(value, Byte.class));
break;
default:
message.setStringProperty(fieldName, cast(value, String.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class RecordToObjectMessageConverter {
* @param record the incoming record
* @return a JMS object message
*/
public static ObjectMessage convertRecordToObjectMessage(ObjectMessage objectMessage, StructuredRecord record) {
public static ObjectMessage toObjectMessage(ObjectMessage objectMessage, StructuredRecord record) {
byte[] body = null;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ public class RecordToTextMessageConverter {
* @param record the incoming record
* @return a JMS text message
*/
public static TextMessage convertRecordToTextMessage(TextMessage textMessage, StructuredRecord record) {
public static TextMessage toTextMessage(TextMessage textMessage, StructuredRecord record) {
int numFields = record.getSchema().getFields().size();

if (numFields == 1) {
return convertRecordWithSingleField(textMessage, record);
return withSingleField(textMessage, record);
}
return convertRecordWithMultipleFields(textMessage, record);
return withMultipleFields(textMessage, record);
}

/**
Expand All @@ -53,7 +53,7 @@ public static TextMessage convertRecordToTextMessage(TextMessage textMessage, St
* @param record the incoming record
* @return a JMS text message
*/
private static TextMessage convertRecordWithSingleField(TextMessage textMessage, StructuredRecord record) {
private static TextMessage withSingleField(TextMessage textMessage, StructuredRecord record) {
Schema.Field singleField = record.getSchema().getFields().get(0);
String body = record.get(singleField.getName()).toString();
try {
Expand All @@ -72,7 +72,7 @@ private static TextMessage convertRecordWithSingleField(TextMessage textMessage,
* @param record the incoming record
* @return a JMS text message
*/
private static TextMessage convertRecordWithMultipleFields(TextMessage textMessage, StructuredRecord record) {
private static TextMessage withMultipleFields(TextMessage textMessage, StructuredRecord record) {
String body;

try {
Expand Down
Loading

0 comments on commit a929316

Please sign in to comment.