From ca5d2b7d0f26c0c748236bd35bdb7b0d4b344611 Mon Sep 17 00:00:00 2001
From: Nikolay Dimitrov
Date: Wed, 10 Jul 2019 13:54:41 +0300
Subject: [PATCH 1/5] Kafka timeouts now throw an exception

---
 .../symmetric/load/KafkaWriterFilter.java     | 43 ++++++++++++++-----
 1 file changed, 33 insertions(+), 10 deletions(-)

diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
index 7c11394328..308221ca43 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
@@ -10,8 +10,11 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -26,6 +29,7 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.jumpmind.db.model.Table;
 import org.jumpmind.symmetric.common.ParameterConstants;
@@ -231,7 +235,8 @@ else if (Long.class.equals(propertyTypeClass)) {
                         }
                     }
                 }
-                sendKafkaMessageByObject(pojo, kafkaDataKey);
+                Future<RecordMetadata> kafkaResult = sendKafkaMessageByObject(pojo, kafkaDataKey);
+                kafkaResult.get();
             } else {
                 throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
             }
@@ -247,7 +252,13 @@ else if (Long.class.equals(propertyTypeClass)) {
             } catch (InstantiationException e) {
                 log.info("Unable to instantiate a constructor on POJO based on table " + tableName, e);
                 throw new RuntimeException(e);
-            }
+            } catch (InterruptedException e) {
+                log.info("Unable to write to Kafka, table " + tableName, e);
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                log.info("Unable to write to Kafka, table " + tableName, e);
+                throw new RuntimeException(e);
+            }
         } else {
             GenericData.Record avroRecord = new GenericData.Record(schema);
             avroRecord.put("table", table.getName());
@@ -394,6 +405,8 @@ public void earlyCommit(DataContext context) {
     }
 
     public void batchComplete(DataContext context) {
+        LinkedList<Future<RecordMetadata>> pending = new LinkedList<Future<RecordMetadata>>();
+
         if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) {
             String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();
@@ -404,22 +417,30 @@ public void batchComplete(DataContext context) {
 
             KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
             try {
                 if (confluentUrl == null && kafkaDataMap.size() > 0) {
                     StringBuffer kafkaText = new StringBuffer();
 
                     for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
-                        for (String row : entry.getValue()) {
+
+                        for (String row : entry.getValue()) {
                             if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
-                                sendKafkaMessage(producer, row, entry.getKey());
+                                Future<RecordMetadata> result = sendKafkaMessage(producer, row, entry.getKey());
+                                pending.add(result);
                             } else {
                                 kafkaText.append(row);
                             }
                         }
                         if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
-                            sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
+                            Future<RecordMetadata> result = sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
+                            pending.add(result);
                         }
                     }
+
+                    for (Future<RecordMetadata> request: pending) {
+                        request.get();
+                    }
                     kafkaDataMap = new HashMap<String, List<String>>();
                 }
             } catch (Exception e) {
                 log.warn("Unable to write batch to Kafka " + batchFileName, e);
                 e.printStackTrace();
+                throw new RuntimeException("Unable to write batch to Kafka " + batchFileName, e);
             } finally {
                 producer.close();
                 context.put(KAFKA_TEXT_CACHE, new HashMap<String, List<String>>());
@@ -435,15 +456,17 @@ public void batchCommitted(DataContext context) {
     public void batchRolledback(DataContext context) {
     }
 
-    public void sendKafkaMessage(KafkaProducer<String, String> producer, String kafkaText, String topic) {
-        producer.send(new ProducerRecord<String, String>(topic, kafkaText));
-        log.debug("Data to be sent to Kafka-" + kafkaText);
+    public Future<RecordMetadata> sendKafkaMessage(KafkaProducer<String, String> producer, String kafkaText, String topic) {
+        log.debug("Data to be sent to Kafka-" + kafkaText);
+        return producer.send(new ProducerRecord<String, String>(topic, kafkaText));
+
     }
 
-    public void sendKafkaMessageByObject(Object bean, String topic) {
+    public Future<RecordMetadata> sendKafkaMessageByObject(Object bean, String topic) {
         KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(configs);
-        producer.send(new ProducerRecord<String, Object>(topic, bean));
+        Future<RecordMetadata> result = producer.send(new ProducerRecord<String, Object>(topic, bean));
         producer.close();
+        return result;
     }
 
     public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {

From 75073bb92e3d1d76367b7840a428139073aaea03 Mon Sep 17 00:00:00 2001
From: Nikolay Dimitrov
Date: Wed, 10 Jul 2019 14:01:26 +0300
Subject: [PATCH 2/5] Formatted according to JumpMind specs

---
 .../symmetric/load/KafkaWriterFilter.java     | 68 ++++++++-----------
 1 file changed, 28 insertions(+), 40 deletions(-)

diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
index 308221ca43..9471da4d2c 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
@@ -74,19 +74,10 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
 
     private String schemaPackage;
 
-    private String[] parseDatePatterns = new String[] {
-            "yyyy/MM/dd HH:mm:ss.SSSSSS",
-            "yyyy-MM-dd HH:mm:ss",
-            "ddMMMyyyy:HH:mm:ss.SSS Z",
-            "ddMMMyyyy:HH:mm:ss.SSS",
-            "yyyy-MM-dd HH:mm:ss.SSS",
-            "ddMMMyyyy:HH:mm:ss.SSSSSS",
-            "yyyy-MM-dd",
-            "yyyy-MM-dd'T'HH:mmZZZZ",
-            "yyyy-MM-dd'T'HH:mm:ssZZZZ",
-            "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ"
-    };
-
+    private String[] parseDatePatterns = new String[] { "yyyy/MM/dd HH:mm:ss.SSSSSS", "yyyy-MM-dd HH:mm:ss", "ddMMMyyyy:HH:mm:ss.SSS Z",
+            "ddMMMyyyy:HH:mm:ss.SSS", "yyyy-MM-dd HH:mm:ss.SSS", "ddMMMyyyy:HH:mm:ss.SSSSSS", "yyyy-MM-dd", "yyyy-MM-dd'T'HH:mmZZZZ",
+            "yyyy-MM-dd'T'HH:mm:ssZZZZ", "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ" };
+
     private List<String> schemaPackageClassNames = new ArrayList<String>();
 
     public final static String KAFKA_FORMAT_XML = "XML";
@@ -144,7 +135,7 @@ public KafkaWriterFilter(IParameterService parameterService) {
     }
 
     public boolean beforeWrite(DataContext context, Table table, CsvData data) {
-
+
         if (table.getNameLowerCase().startsWith("sym_")) {
             return true;
         } else {
@@ -219,18 +210,15 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
                 Class<?> propertyTypeClass = PropertyUtils.getPropertyType(pojo, colName);
                 if (CharSequence.class.equals(propertyTypeClass)) {
                     PropertyUtils.setSimpleProperty(pojo, colName, rowData[i]);
-                }
-                else if (Long.class.equals(propertyTypeClass)) {
+                } else if (Long.class.equals(propertyTypeClass)) {
                     Date date = null;
                     try {
                         date = DateUtils.parseDate(rowData[i], parseDatePatterns);
-                    }
-                    catch (Exception e) {
+                    } catch (Exception e) {
                         log.debug(rowData[i] + " was not a recognized date format so treating it as a long.");
                     }
                     BeanUtils.setProperty(pojo, colName, date != null ? date.getTime() : rowData[i]);
-                }
-                else {
+                } else {
                     BeanUtils.setProperty(pojo, colName, rowData[i]);
                 }
             }
@@ -238,7 +226,8 @@ else if (Long.class.equals(propertyTypeClass)) {
                 Future<RecordMetadata> kafkaResult = sendKafkaMessageByObject(pojo, kafkaDataKey);
                 kafkaResult.get();
             } else {
-                throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
+                throw new RuntimeException(
+                        "Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
             }
         } catch (NoSuchMethodException e) {
             log.info("Unable to find setter on POJO based on table " + table.getName(), e);
@@ -255,10 +244,10 @@ else if (Long.class.equals(propertyTypeClass)) {
             } catch (InterruptedException e) {
                 log.info("Unable to write to Kafka, table " + tableName, e);
                 throw new RuntimeException(e);
-            } catch (ExecutionException e) {
+            } catch (ExecutionException e) {
                 log.info("Unable to write to Kafka, table " + tableName, e);
                 throw new RuntimeException(e);
-            }
+            }
         } else {
             GenericData.Record avroRecord = new GenericData.Record(schema);
             avroRecord.put("table", table.getName());
@@ -405,38 +394,37 @@ public void earlyCommit(DataContext context) {
     }
 
     public void batchComplete(DataContext context) {
-        LinkedList<Future<RecordMetadata>> pending = new LinkedList<Future<RecordMetadata>>();
-
+        LinkedList<Future<RecordMetadata>> pending = new LinkedList<Future<RecordMetadata>>();
+
         if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) {
             String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();
-
+
             KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
             try {
                 if (confluentUrl == null && kafkaDataMap.size() > 0) {
                     StringBuffer kafkaText = new StringBuffer();
-
-
+
                     for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
-
-                        for (String row : entry.getValue()) {
+
+                        for (String row : entry.getValue()) {
                             if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
-                                Future<RecordMetadata> result = sendKafkaMessage(producer, row, entry.getKey());
-                                pending.add(result);
+                                Future<RecordMetadata> result = sendKafkaMessage(producer, row, entry.getKey());
+                                pending.add(result);
                             } else {
                                 kafkaText.append(row);
                             }
                         }
                         if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
-                            Future<RecordMetadata> result = sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
-                            pending.add(result);
+                            Future<RecordMetadata> result = sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
+                            pending.add(result);
                         }
                     }
-
-                    for (Future<RecordMetadata> request: pending) {
-                        request.get();
+
+                    for (Future<RecordMetadata> request : pending) {
+                        request.get();
                     }
                     kafkaDataMap = new HashMap<String, List<String>>();
-                }
+                }
             } catch (Exception e) {
                 log.warn("Unable to write batch to Kafka " + batchFileName, e);
                 e.printStackTrace();
@@ -457,9 +445,9 @@ public void batchRolledback(DataContext context) {
     }
 
     public Future<RecordMetadata> sendKafkaMessage(KafkaProducer<String, String> producer, String kafkaText, String topic) {
-        log.debug("Data to be sent to Kafka-" + kafkaText);
+        log.debug("Data to be sent to Kafka-" + kafkaText);
         return producer.send(new ProducerRecord<String, String>(topic, kafkaText));
-
+
     }
 
     public Future<RecordMetadata> sendKafkaMessageByObject(Object bean, String topic) {

From a54dbaf4d4e947586b63ea3e4e68f4143e724992 Mon Sep 17 00:00:00 2001
From: Nikolay Dimitrov
Date: Wed, 10 Jul 2019 14:02:48 +0300
Subject: [PATCH 3/5] Added comments

---
 .../java/org/jumpmind/symmetric/load/KafkaWriterFilter.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
index 9471da4d2c..7f4d50c9e7 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
@@ -224,7 +224,7 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
                 }
                 Future<RecordMetadata> kafkaResult = sendKafkaMessageByObject(pojo, kafkaDataKey);
-                kafkaResult.get();
+                kafkaResult.get(); //Wait for Kafka to send pending messages or throw an exception
             } else {
                 throw new RuntimeException(
                         "Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
@@ -421,6 +421,7 @@ public void batchComplete(DataContext context) {
                     }
 
                     for (Future<RecordMetadata> request : pending) {
+                        //Wait for Kafka to send pending messages or throw an exception
                         request.get();
                     }
                     kafkaDataMap = new HashMap<String, List<String>>();

From 67e3c6f775f2f415b5bff0d33f5f96cffd03670d Mon Sep 17 00:00:00 2001
From: Nikolay Dimitrov
Date: Fri, 19 Jul 2019 18:37:02 +0300
Subject: [PATCH 4/5] Skip CREATE events

---
 .../java/org/jumpmind/symmetric/load/KafkaWriterFilter.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
index 7f4d50c9e7..7093bfd596 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
@@ -145,6 +145,9 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
             if (data.getDataEventType() == DataEventType.DELETE) {
                 rowData = data.getParsedData(CsvData.OLD_DATA);
             }
+            else if (data.getDataEventType() == DataEventType.CREATE) {
+                return false;
+            }
 
             StringBuffer kafkaText = new StringBuffer();

From 87ab226bfe0f87459ff4b78031527ae23ff9266f Mon Sep 17 00:00:00 2001
From: Nikolay Dimitrov
Date: Thu, 15 Aug 2019 15:30:58 +0300
Subject: [PATCH 5/5] JSON for Kafka is now created using GSON instead of
 manually in order to escape special symbols properly

---
 .../symmetric/load/KafkaWriterFilter.java     | 30 +++++++++----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
index 7093bfd596..15552a01c8 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
@@ -49,6 +49,9 @@
 import org.springframework.util.ClassUtils;
 import org.springframework.util.SystemPropertyUtils;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
 
@@ -80,6 +83,8 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
 
     private List<String> schemaPackageClassNames = new ArrayList<String>();
 
+    private Gson gson = new Gson();
+
     public final static String KAFKA_FORMAT_XML = "XML";
     public final static String KAFKA_FORMAT_JSON = "JSON";
     public final static String KAFKA_FORMAT_AVRO = "AVRO";
@@ -163,23 +168,18 @@ else if (data.getDataEventType() == DataEventType.CREATE) {
             List<String> kafkaDataList = kafkaDataMap.get(kafkaDataKey);
 
             if (outputFormat.equals(KAFKA_FORMAT_JSON)) {
-                kafkaText.append("{\"").append(table.getName()).append("\": {").append("\"eventType\": \"" + data.getDataEventType() + "\",")
-                        .append("\"data\": { ");
+                JsonObject jsonData = new JsonObject();
                 for (int i = 0; i < table.getColumnNames().length; i++) {
-                    kafkaText.append("\"").append(table.getColumnNames()[i]).append("\": ");
-
-                    if (rowData[i] != null) {
-                        kafkaText.append("\"");
-                    }
-                    kafkaText.append(rowData[i]);
-                    if (rowData[i] != null) {
-                        kafkaText.append("\"");
-                    }
-                    if (i + 1 < table.getColumnNames().length) {
-                        kafkaText.append(",");
-                    }
+                    jsonData.addProperty(table.getColumnNames()[i], rowData[i]);
                 }
-                kafkaText.append(" } } }");
+                JsonObject change = new JsonObject();
+                change.addProperty("eventType", data.getDataEventType().toString());
+                change.add("data", jsonData);
+
+                JsonObject kafkaMessage = new JsonObject();
+                kafkaMessage.add(table.getName(), change);
+
+                kafkaText.append(gson.toJson(kafkaMessage));
             } else if (outputFormat.equals(KAFKA_FORMAT_CSV)) {
                 kafkaText.append("\nTABLE").append(",").append(table.getName()).append(",").append("EVENT").append(",")
                         .append(data.getDataEventType()).append(",");
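Note on patches 1-3: the behavior they rely on is that KafkaProducer.send() is asynchronous -- it only enqueues the record and returns a Future<RecordMetadata>, so a broker timeout never surfaces at the call site. Calling get() on that future is what turns a silently dropped batch into an exception the loader can react to. The standalone sketch below illustrates the same pattern outside SymmetricDS; the bootstrap address and topic name are placeholders, not values from these patches.

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SyncSendSketch {
        public static void main(String[] args) {
            Properties configs = new Properties();
            // Placeholder broker address; replace with a real bootstrap server.
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
            try {
                // send() is asynchronous: it queues the record and returns immediately.
                Future<RecordMetadata> pending = producer.send(
                        new ProducerRecord<String, String>("demo-topic", "payload"));
                // get() blocks until the broker acknowledges the write; a delivery
                // timeout or broker error is rethrown here as an ExecutionException.
                RecordMetadata meta = pending.get();
                System.out.println("acked at partition " + meta.partition() + ", offset " + meta.offset());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Unable to write to Kafka", e);
            } catch (ExecutionException e) {
                // Same idea as the patch: fail loudly instead of losing the batch.
                throw new RuntimeException("Unable to write to Kafka", e);
            } finally {
                producer.close();
            }
        }
    }

Collecting the futures in a list and waiting on them only at batch completion, as batchComplete() does, keeps the sends pipelined while still guaranteeing that a failure anywhere in the batch aborts the commit.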
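Note on patch 5: the old StringBuffer concatenation copied rowData values into the JSON verbatim, so any quote, backslash, or control character in column data produced an invalid message. Gson's tree model escapes those characters when the tree is serialized. Below is a minimal sketch of the same construction; the table name, column names, and row values are made up for illustration.

    import com.google.gson.Gson;
    import com.google.gson.JsonObject;

    public class GsonEscapingSketch {
        public static void main(String[] args) {
            // Hypothetical table and row, chosen to break naive string concatenation.
            String tableName = "item";
            String[] columnNames = { "id", "note" };
            String[] rowData = { "42", "say \"hi\" and C:\\temp" };

            JsonObject jsonData = new JsonObject();
            for (int i = 0; i < columnNames.length; i++) {
                // addProperty stores the raw value; escaping happens at serialization.
                jsonData.addProperty(columnNames[i], rowData[i]);
            }

            JsonObject change = new JsonObject();
            change.addProperty("eventType", "INSERT");
            change.add("data", jsonData);

            JsonObject kafkaMessage = new JsonObject();
            kafkaMessage.add(tableName, change);

            // Prints: {"item":{"eventType":"INSERT","data":{"id":"42","note":"say \"hi\" and C:\\temp"}}}
            System.out.println(new Gson().toJson(kafkaMessage));
        }
    }

Building the JsonObject tree mirrors the message shape the filter emits: the table name wraps an object carrying the eventType and a data object of column/value pairs, so consumers see the same structure as before, just with correct escaping.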