diff --git a/src/main/java/com/appdynamics/extensions/kafka/JMXConnectionAdapter.java b/src/main/java/com/appdynamics/extensions/kafka/JMXConnectionAdapter.java index 589f67c..06c0817 100644 --- a/src/main/java/com/appdynamics/extensions/kafka/JMXConnectionAdapter.java +++ b/src/main/java/com/appdynamics/extensions/kafka/JMXConnectionAdapter.java @@ -56,7 +56,7 @@ JMXConnector open(Map connectionMap) throws IOException { JMXConnector jmxConnector; final Map env = new HashMap<>(); - if(Boolean.valueOf((String)connectionMap.get(Constants.USE_SSL))) { + if(Boolean.valueOf(connectionMap.get(Constants.USE_SSL).toString())) { //TODO this is not needed as even if you comment it the SSL connections still work. SslRMIClientSocketFactory sslRMIClientSocketFactory = new SslRMIClientSocketFactory(); env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, sslRMIClientSocketFactory); diff --git a/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java b/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java index 407603d..0342380 100644 --- a/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java +++ b/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java @@ -13,11 +13,21 @@ import com.appdynamics.extensions.kafka.utils.Constants; import com.appdynamics.extensions.kafka.utils.SslUtils; import com.appdynamics.extensions.util.AssertUtils; +import com.google.common.collect.Maps; +import com.singularity.ee.agent.systemagent.api.exception.TaskExecutionException; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.PatternLayout; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static com.appdynamics.extensions.kafka.utils.Constants.DEFAULT_METRIC_PREFIX; @@ -60,4 +70,31 @@ protected int getTaskCount () { return servers.size(); } + public static void main(String[] args) throws TaskExecutionException, IOException { + + ConsoleAppender ca = new ConsoleAppender(); + ca.setWriter(new OutputStreamWriter(System.out)); + ca.setLayout(new PatternLayout("%-5p [%t]: %m%n")); + ca.setThreshold(Level.DEBUG); + org.apache.log4j.Logger.getRootLogger().addAppender(ca); + + + KafkaMonitor monitor = new KafkaMonitor(); + final Map taskArgs = Maps.newHashMap(); + taskArgs.put("config-file", "/Users/vishaka.sekar/AppDynamics/kafka-monitoring-extension/src/main/resources/conf/config.yml"); + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate(new Runnable() { + public void run() { + try { + monitor.execute(taskArgs, null); + } catch (Exception e) { + logger.error("Error while running the task", e); + } + } + }, 2, 60, TimeUnit.SECONDS); + + } + + + } \ No newline at end of file