Skip to content

Commit

Permalink
Moved to annotation-based plug-in declaration
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Spector committed Jun 20, 2017
1 parent 97aeb26 commit 3c53b02
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 64 deletions.
2 changes: 0 additions & 2 deletions assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
<outputDirectory>.</outputDirectory>
<includes>
<include>*.jar</include>
<include>logo.png</include>
<include>plugin.xml</include>
<include>version.xml</include>
</includes>
</fileSet>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ruckuswireless.pentaho.kafka.consumer;
package org.pentaho.di.trans.kafka.consumer;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -31,10 +31,10 @@
*
* @author Michael Spector
*/
public class KafkaConsumerStep extends BaseStep implements StepInterface {
public class KafkaConsumer extends BaseStep implements StepInterface {
public static final String CONSUMER_TIMEOUT_KEY = "consumer.timeout.ms";

public KafkaConsumerStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
public KafkaConsumer(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
Trans trans) {
super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}
Expand All @@ -58,12 +58,12 @@ public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
}
} else {
if (substProperties.containsKey(CONSUMER_TIMEOUT_KEY)) {
logError(Messages.getString("KafkaConsumerStep.WarnConsumerTimeout"));
logError(Messages.getString("KafkaConsumer.WarnConsumerTimeout"));
}
}
ConsumerConfig consumerConfig = new ConsumerConfig(substProperties);

logBasic(Messages.getString("KafkaConsumerStep.CreateKafkaConsumer.Message", consumerConfig.zkConnect()));
logBasic(Messages.getString("KafkaConsumer.CreateKafkaConsumer.Message", consumerConfig.zkConnect()));
data.consumer = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
String topic = environmentSubstitute(meta.getTopic());
Expand Down Expand Up @@ -137,7 +137,7 @@ protected void messageReceived(byte[] key, byte[] message) throws KettleExceptio
putRow(data.outputRowMeta, newRow);

if (isRowLevel()) {
logRowlevel(Messages.getString("KafkaConsumerStep.Log.OutputRow",
logRowlevel(Messages.getString("KafkaConsumer.Log.OutputRow",
Long.toString(getLinesWritten()), data.outputRowMeta.getString(newRow)));
}
}
Expand All @@ -162,7 +162,7 @@ protected void messageReceived(byte[] key, byte[] message) throws KettleExceptio
}
} catch (KettleException e) {
if (!getStepMeta().isDoingErrorHandling()) {
logError(Messages.getString("KafkaConsumerStep.ErrorInStepRunning", e.getMessage()));
logError(Messages.getString("KafkaConsumer.ErrorInStepRunning", e.getMessage()));
setErrors(1);
stopAll();
setOutputDone();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ruckuswireless.pentaho.kafka.consumer;
package org.pentaho.di.trans.kafka.consumer;

import java.util.concurrent.Callable;

Expand All @@ -16,9 +16,9 @@ public abstract class KafkaConsumerCallable implements Callable<Object> {

private KafkaConsumerData data;
private KafkaConsumerMeta meta;
private KafkaConsumerStep step;
private KafkaConsumer step;

public KafkaConsumerCallable(KafkaConsumerMeta meta, KafkaConsumerData data, KafkaConsumerStep step) {
public KafkaConsumerCallable(KafkaConsumerMeta meta, KafkaConsumerData data, KafkaConsumer step) {
this.meta = meta;
this.data = data;
this.step = step;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ruckuswireless.pentaho.kafka.consumer;
package org.pentaho.di.trans.kafka.consumer;

import kafka.consumer.ConsumerIterator;
import kafka.javaapi.consumer.ConsumerConnector;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ruckuswireless.pentaho.kafka.consumer;
package org.pentaho.di.trans.kafka.consumer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand All @@ -7,6 +7,7 @@
import java.util.Map;
import java.util.Properties;

import org.pentaho.di.core.annotations.Step;
import org.pentaho.di.core.CheckResult;
import org.pentaho.di.core.CheckResultInterface;
import org.pentaho.di.core.Const;
Expand Down Expand Up @@ -39,6 +40,13 @@
*
* @author Michael Spector
*/
@Step(
id = "KafkaConsumer",
image = "org/pentaho/di/trans/kafka/consumer/resources/kafka_consumer.png",
i18nPackageName="org.pentaho.di.trans.kafka.consumer",
name="KafkaConsumerDialog.Shell.Title",
description = "KafkaConsumerDialog.Shell.Tooltip",
categoryDescription="i18n:org.pentaho.di.trans.step:BaseStep.Category.Input")
public class KafkaConsumerMeta extends BaseStepMeta implements StepMetaInterface {

public static final String[] KAFKA_PROPERTIES_NAMES = new String[] { "zookeeper.connect", "group.id", "consumer.id",
Expand All @@ -61,7 +69,7 @@ public class KafkaConsumerMeta extends BaseStepMeta implements StepMetaInterface
private String timeout;
private boolean stopOnEmptyTopic;

Properties getKafkaProperties() {
public Properties getKafkaProperties() {
return kafkaProperties;
}

Expand Down Expand Up @@ -181,7 +189,7 @@ public void check(List<CheckResultInterface> remarks, TransMeta transMeta, StepM

public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta transMeta,
Trans trans) {
return new KafkaConsumerStep(stepMeta, stepDataInterface, cnr, transMeta, trans);
return new KafkaConsumer(stepMeta, stepDataInterface, cnr, transMeta, trans);
}

public StepDataInterface getStepData() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ruckuswireless.pentaho.kafka.consumer;
package org.pentaho.di.trans.kafka.consumer;

import org.pentaho.di.i18n.BaseMessages;

Expand Down Expand Up @@ -33,4 +33,4 @@ public static String getString(String key, String param1, String param2, String
String param5, String param6) {
return BaseMessages.getString(clazz, key, param1, param2, param3, param4, param5, param6);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ruckuswireless.pentaho.kafka.consumer;
package org.pentaho.di.ui.trans.kafka.consumer;

import java.util.Arrays;
import java.util.Properties;
Expand Down Expand Up @@ -33,6 +33,9 @@
import org.pentaho.di.ui.core.widget.TextVar;
import org.pentaho.di.ui.trans.step.BaseStepDialog;

import org.pentaho.di.trans.kafka.consumer.KafkaConsumerMeta;
import org.pentaho.di.trans.kafka.consumer.Messages;

/**
* UI for the Kafka Consumer step
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
KafkaConsumerStep.CreateKafkaConsumer.Message=Creating Kafka consumer listening on zookeeper\: {0}
KafkaConsumerStep.Log.OutputRow=Outputting row {0} : {1}
KafkaConsumerStep.ErrorInStepRunning=Error running step \: {0}
KafkaConsumerStep.WarnConsumerTimeout=WARNING\! You have set a consumer timeout, but have not requested termination on an empty topic. This could lead to a transformation failure if the queue becomes empty!
KafkaConsumer.CreateKafkaConsumer.Message=Creating Kafka consumer listening on zookeeper\: {0}
KafkaConsumer.Log.OutputRow=Outputting row {0} : {1}
KafkaConsumer.ErrorInStepRunning=Error running step \: {0}
KafkaConsumer.WarnConsumerTimeout=WARNING\! You have set a consumer timeout, but have not requested termination on an empty topic. This could lead to a transformation failure if the queue becomes empty!
KafkaConsumerMeta.Exception.loadXml=Unable to read step information from XML
KafkaConsumerMeta.Exception.loadRep=Unexpected error reading step information from the repository
KafkaConsumerMeta.Exception.saveRep=Unexpected error writing step information to the repository
KafkaConsumerMeta.Check.InvalidTopic=Topic name must be set\!
KafkaConsumerMeta.Check.InvalidField=Field name must be set\!
KafkaConsumerMeta.Check.InvalidKeyField=Key field name must be set\!
KafkaConsumerDialog.Shell.Title=Apache Kafka Consumer
KafkaConsumerDialog.Shell.Tooltip=Read messages throug a specific topic from a Kafka stream
KafkaConsumerDialog.StepName.Label=Step name
KafkaConsumerDialog.TopicName.Label=Topic name
KafkaConsumerDialog.FieldName.Label=Target message field name
Expand All @@ -18,4 +19,4 @@ KafkaConsumerDialog.Timeout.Label=Maximum duration of consumption (ms)
KafkaConsumerDialog.StopOnEmpty.Label=Stop on empty topic
KafkaConsumerDialog.TableView.Label=Kafka Properties
KafkaConsumerDialog.TableView.NameCol.Label=Name
KafkaConsumerDialog.TableView.ValueCol.Label=Value
KafkaConsumerDialog.TableView.ValueCol.Label=Value
File renamed without changes
40 changes: 0 additions & 40 deletions src/main/resources/plugin.xml

This file was deleted.

0 comments on commit 3c53b02

Please sign in to comment.