From 136d79dea017756eeb6cbb32e1474e71e11f5477 Mon Sep 17 00:00:00 2001 From: Kevin Conner Date: Wed, 26 Jul 2017 10:36:42 -0700 Subject: [PATCH] Cleanup of drain codebase in preparation for the initial release: CLOUD-1952 --- .gitignore | 3 + drain/pom.xml | 38 +-- .../ce/amq/drain/BrokerServiceDrainer.java | 142 +++++++++ .../java/org/jboss/ce/amq/drain/Main.java | 214 ------------- .../java/org/jboss/ce/amq/drain/Stats.java | 61 ---- .../java/org/jboss/ce/amq/drain/Utils.java | 10 +- .../org/jboss/ce/amq/drain/jms/Client.java | 170 ---------- .../org/jboss/ce/amq/drain/jms/Consumer.java | 135 -------- .../org/jboss/ce/amq/drain/jms/Producer.java | 59 ---- .../jboss/ce/amq/drain/jmx/AbstractJMX.java | 294 ------------------ .../org/jboss/ce/amq/drain/jmx/DTSTuple.java | 39 --- .../ce/amq/drain/jmx/DestinationHandle.java | 50 --- .../java/org/jboss/ce/amq/drain/jmx/JMX.java | 43 --- .../jboss/ce/amq/drain/jmx/JMXFactory.java | 33 -- .../org/jboss/ce/amq/drain/jmx/JarFinder.java | 48 --- .../org/jboss/ce/amq/drain/jmx/PokeJMX.java | 57 ---- .../org/jboss/ce/amq/drain/jmx/RemoteJMX.java | 119 ------- .../ce/amq/drain/log/RandomFileAppender.java | 29 -- drain/src/main/resources/log4j.properties | 5 +- .../org/jboss/test/ce/amq/ConsumerTest.java | 69 ---- .../org/jboss/test/ce/amq/ProducerTest.java | 61 ---- .../java/org/jboss/test/ce/amq/TestBase.java | 35 --- pom.xml | 21 +- 23 files changed, 167 insertions(+), 1568 deletions(-) create mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/BrokerServiceDrainer.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/Main.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/Stats.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jms/Client.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jms/Consumer.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jms/Producer.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jmx/AbstractJMX.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jmx/DTSTuple.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jmx/DestinationHandle.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jmx/JMX.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jmx/JMXFactory.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jmx/JarFinder.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jmx/PokeJMX.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/jmx/RemoteJMX.java delete mode 100644 drain/src/main/java/org/jboss/ce/amq/drain/log/RandomFileAppender.java delete mode 100644 drain/src/test/java/org/jboss/test/ce/amq/ConsumerTest.java delete mode 100644 drain/src/test/java/org/jboss/test/ce/amq/ProducerTest.java delete mode 100644 drain/src/test/java/org/jboss/test/ce/amq/TestBase.java diff --git a/.gitignore b/.gitignore index 109bfc3..06eb854 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ *.ipr *.iws drain/target/ +.classpath +.project +.settings diff --git a/drain/pom.xml b/drain/pom.xml index 4b470f2..6b06571 100644 --- a/drain/pom.xml +++ b/drain/pom.xml @@ -5,7 +5,7 @@ org.jboss.ce ce-amq-parent - 1.0.0.Alpha1 + 1.0.0.Final ../pom.xml @@ -15,6 +15,14 @@ CE AMQ - Drain Pre-stop drain hook. + + + Apache License Version 2.0 + https://repository.jboss.org/licenses/apache-2.0.txt + repo + + + @@ -32,41 +40,19 @@ - org.apache.activemq - activemq-client + activemq-broker - org.slf4j - slf4j-log4j12 + org.apache.activemq + activemq-kahadb-store log4j log4j - - - junit - junit - - - - - - - default - - true - - - true - - - - - diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/BrokerServiceDrainer.java b/drain/src/main/java/org/jboss/ce/amq/drain/BrokerServiceDrainer.java new file mode 100644 index 0000000..e0e99cf --- /dev/null +++ b/drain/src/main/java/org/jboss/ce/amq/drain/BrokerServiceDrainer.java @@ -0,0 +1,142 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2016 Red Hat Inc. and/or its affiliates and other + * contributors as indicated by the @author tags. All rights reserved. + * See the copyright.txt in the distribution for a full listing of + * individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ + +package org.jboss.ce.amq.drain; + +import java.io.File; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Ales Justin + */ +public class BrokerServiceDrainer { + private static final Logger log = LoggerFactory.getLogger(BrokerServiceDrainer.class); + + private static final String ACTIVEMQ_DATA = "activemq.data" ; + + private static final String MESH_URL_FORMAT = "kube://%s:61616/?queryInterval=%s"; + + public static void main(final String[] args) throws Exception { + final String dataDir; + switch(args.length) { + case 0: + dataDir = Utils.getSystemPropertyOrEnvVar(ACTIVEMQ_DATA); + break; + default: + dataDir = args[0]; + } + + if (dataDir == null) { + throw new IllegalArgumentException("Missing ActiveMQ data directory!"); + } else { + final File dataDirFile = new File(dataDir); + if (!dataDirFile.exists()) { + throw new IllegalArgumentException("Missing ActiveMQ data directory!"); + } else if (!dataDirFile.isDirectory()) { + throw new IllegalArgumentException("ActiveMQ data directory is not a directory!"); + } + } + + final BrokerService broker = new BrokerService(); + broker.setAdvisorySupport(false); + broker.setBrokerName(getBrokerName()); + broker.setUseJmx(true); + + broker.setDataDirectory(dataDir); + log.info(String.format("Data directory %s.", dataDir)); + + final PersistenceAdapter adaptor = new KahaDBPersistenceAdapter(); + adaptor.setDirectory(new File(dataDir + "/kahadb")); + broker.setPersistenceAdapter(adaptor); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(0); + ConditionalNetworkBridgeFilterFactory filterFactory = new ConditionalNetworkBridgeFilterFactory(); + filterFactory.setReplayWhenNoConsumers(true); + defaultEntry.setNetworkBridgeFilterFactory(filterFactory); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + + String meshServiceName = Utils.getSystemPropertyOrEnvVar("amq.mesh.service.name"); + if (meshServiceName == null) { + meshServiceName = getApplicationName() + "-amq-tcp"; + } + String queryInterval = Utils.getSystemPropertyOrEnvVar("amq.mesh.query.interval", "3"); + String meshURL = String.format(MESH_URL_FORMAT, meshServiceName, queryInterval); + + // programmatically add the draining bridge, depends on the mesh url only (could be in the xml config either) + log.info("Creating network connector."); + NetworkConnector drainingNetworkConnector = broker.addNetworkConnector(meshURL); + drainingNetworkConnector.setUserName(getUsername()); + drainingNetworkConnector.setPassword(getPassword()); + drainingNetworkConnector.setMessageTTL(-1); + drainingNetworkConnector.setConsumerTTL(1); + drainingNetworkConnector.setStaticBridge(true); + drainingNetworkConnector.setStaticallyIncludedDestinations(Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQQueue("*")})); + + log.info("Starting broker."); + broker.start(); + broker.waitUntilStarted(); + log.info("Started broker."); + + long msgs; + while ((msgs = broker.getAdminView().getTotalMessageCount()) > 0) { + log.info(String.format("Still %s msgs left to migrate ...", msgs)); + TimeUnit.SECONDS.sleep(5); + } + + broker.stop(); + + log.info("-- [CE] A-MQ migration finished. --"); + } + + public static String getApplicationName() { + return Utils.getSystemPropertyOrEnvVar("application.name"); + } + + public static String getBrokerName() { + return Utils.getSystemPropertyOrEnvVar("broker.name", Utils.getSystemPropertyOrEnvVar("hostname", "localhost")); + } + + public static String getPassword() { + return Utils.getSystemPropertyOrEnvVar("amq.password"); + } + + public static String getUsername() { + return Utils.getSystemPropertyOrEnvVar("amq.user"); + } +} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/Main.java b/drain/src/main/java/org/jboss/ce/amq/drain/Main.java deleted file mode 100644 index 2848817..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/Main.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Message; - -import org.jboss.ce.amq.drain.jms.Consumer; -import org.jboss.ce.amq.drain.jms.Producer; -import org.jboss.ce.amq.drain.jmx.DTSTuple; -import org.jboss.ce.amq.drain.jmx.DestinationHandle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Ales Justin - */ -public class Main { - private static final Logger log = LoggerFactory.getLogger(Main.class); - - private String consumerURL = Utils.getSystemPropertyOrEnvVar("consumer.url", "tcp://" + Utils.getSystemPropertyOrEnvVar("hostname", "localhost") + ":61616"); - private String consumerUsername = Utils.getSystemPropertyOrEnvVar("consumer.username", Utils.getSystemPropertyOrEnvVar("amq.user")); - private String consumerPassword = Utils.getSystemPropertyOrEnvVar("consumer.password", Utils.getSystemPropertyOrEnvVar("amq.password")); - - private String producerURL = Utils.getSystemPropertyOrEnvVar("producer.url"); - private String producerUsername = Utils.getSystemPropertyOrEnvVar("producer.username", Utils.getSystemPropertyOrEnvVar("amq.user")); - private String producerPassword = Utils.getSystemPropertyOrEnvVar("producer.password", Utils.getSystemPropertyOrEnvVar("amq.password")); - - public static void main(String[] args) { - try { - Main main = new Main(); - main.validate(); - main.run(); - } catch (Exception e) { - log.error("Error draining broker: " + e.getMessage(), e); - System.exit(1); - } - } - - protected void validate() throws Exception { - if (producerURL == null && getProducerURLRaw() == null) { - throw new IllegalArgumentException("Missing producer url!"); - } - } - - protected String getProducerURL() { - if (producerURL == null) { - producerURL = "tcp://" + getProducerURLRaw() + ":61616"; - } - return producerURL; - } - - private String getProducerURLRaw() { - String appName = Utils.getSystemPropertyOrEnvVar("application.name"); - String url = Utils.getSystemPropertyOrEnvVar(appName + ".amq.tcp.service.host"); - if (url == null) { - url = Utils.getSystemPropertyOrEnvVar("amq.service.name"); - } - return url; - } - - protected void info() { - log.info("Running A-MQ migration ..."); - } - - protected boolean check() { - try (Producer producer = new Producer(getProducerURL(), producerUsername, producerPassword)) { - producer.start(); - producer.stop(); - log.info("A-MQ service accessible ..."); - return true; - } catch (Exception e) { - log.info(String.format("Cannot connect to A-MQ service [%s]: %s", getProducerURL(), e)); - return false; - } - } - - public void run() throws Exception { - info(); - - if (!check()) { - return; - } - - final AtomicBoolean terminating = new AtomicBoolean(); - final Semaphore statsSemaphore = new Semaphore(0); - - final Stats stats = new Stats(); - try { - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - public void run() { - terminating.set(true); - try { - statsSemaphore.acquire(); - } catch (InterruptedException ignored) { - // ignore as we are terminating - } - stats.dumpStats(); - } - })); - - if (!terminating.get()) { - try (Producer queueProducer = new Producer(getProducerURL(), producerUsername, producerPassword)) { - queueProducer.start(); - - try (Consumer queueConsumer = new Consumer(consumerURL, consumerUsername, consumerPassword)) { - queueConsumer.start(); - - int msgsCounter; - - // drain queues - Collection queues = queueConsumer.getJMX().queues(); - log.info("Found queues: {}", queues); - for (DestinationHandle handle : queues) { - if (terminating.get()) { - break; - } - msgsCounter = 0; - String queue = queueConsumer.getJMX().queueName(handle); - log.info("Processing queue: '{}'", queue); - stats.setSize(queue, queueConsumer.currentQueueSize(handle)); - Producer.ProducerProcessor processor = queueProducer.processQueueMessages(queue); - Iterator iter = queueConsumer.consumeQueue(handle, queue); - while (iter.hasNext() && !terminating.get()) { - Message next = iter.next(); - processor.processMessage(next); - msgsCounter++; - stats.increment(queue); - } - log.info("Handled {} messages for queue '{}'.", msgsCounter, queue); - } - } - } - } - - if (!terminating.get()) { - try (Consumer dtsConsumer = new Consumer(consumerURL, consumerUsername, consumerPassword)) { - int msgsCounter; - // drain durable topic subscribers - Set ids = new HashSet<>(); - Collection subscribers = dtsConsumer.getJMX().durableTopicSubscribers(); - log.info("Found durable topic subscribers: {}", subscribers); - for (DestinationHandle handle : subscribers) { - if (terminating.get()) { - break; - } - msgsCounter = 0; - DTSTuple tuple = dtsConsumer.getJMX().dtsTuple(handle); - try (Producer dtsProducer = new Producer(getProducerURL(), producerUsername, producerPassword)) { - dtsProducer.start(tuple.clientId); - - dtsProducer.getTopicSubscriber(tuple.topic, tuple.subscriptionName).close(); // just create dts on producer-side - - Producer.ProducerProcessor processor = dtsProducer.processTopicMessages(tuple.topic); - dtsConsumer.getJMX().disconnect(tuple.clientId); - dtsConsumer.start(tuple.clientId); - try { - log.info("Processing topic subscriber : '{}' [{}]", tuple.topic, tuple.subscriptionName); - stats.setSize(tuple.topic + "/" + tuple.subscriptionName, dtsConsumer.currentTopicSubscriptionSize(handle)); - Iterator iter = dtsConsumer.consumeDurableTopicSubscriptions(handle, tuple.topic, tuple.subscriptionName); - while (iter.hasNext() && !terminating.get()) { - Message next = iter.next(); - if (ids.add(next.getJMSMessageID())) { - processor.processMessage(next); - msgsCounter++; - stats.increment(tuple.topic + "/" + tuple.subscriptionName); - } - } - log.info("Handled {} messages for topic subscriber '{}' [{}].", msgsCounter, tuple.topic, tuple.subscriptionName); - } finally { - //noinspection ThrowFromFinallyBlock - dtsConsumer.close(); - } - } - } - log.info("Consumed {} messages.", ids.size()); - } - } - - if (!terminating.get()) { - log.info("-- [CE] A-MQ migration finished. --"); - } - } finally { - statsSemaphore.release(); - } - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/Stats.java b/drain/src/main/java/org/jboss/ce/amq/drain/Stats.java deleted file mode 100644 index 1067ae2..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/Stats.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Ales Justin - */ -class Stats { - private static final Logger log = LoggerFactory.getLogger(Stats.class); - - private Map sizes = new ConcurrentHashMap<>(); - private Map counters = new ConcurrentHashMap<>(); - - void setSize(String destination, int size) { - sizes.put(destination, size); - } - - void increment(String destination) { - AtomicInteger x = counters.get(destination); - if (x == null) { - x = new AtomicInteger(0); - counters.put(destination, x); - } - x.incrementAndGet(); - } - - public void dumpStats() { - log.info("A-MQ migration statistics ... ('destination' -> [processed / all])"); - for (Map.Entry entry : counters.entrySet()) { - log.info(String.format("Processing stats: '%s' -> [%s / %s]", entry.getKey(), entry.getValue(), sizes.get(entry.getKey()))); - } - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/Utils.java b/drain/src/main/java/org/jboss/ce/amq/drain/Utils.java index dace6ee..bd1d6be 100644 --- a/drain/src/main/java/org/jboss/ce/amq/drain/Utils.java +++ b/drain/src/main/java/org/jboss/ce/amq/drain/Utils.java @@ -27,22 +27,18 @@ * @author Ales Justin */ public class Utils { - private static String checkForNone(String value) { - return "__none".equalsIgnoreCase(value) ? null : value; - } - private static String getSystemPropertyOrEnvVar(String systemPropertyName, String envVarName, String defaultValue) { String answer = System.getProperty(systemPropertyName); if (answer != null) { - return checkForNone(answer); + return answer; } answer = System.getenv(envVarName); if (answer != null) { - return checkForNone(answer); + return answer; } - return checkForNone(defaultValue); + return defaultValue; } private static String convertSystemPropertyNameToEnvVar(String systemPropertyName) { diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jms/Client.java b/drain/src/main/java/org/jboss/ce/amq/drain/jms/Client.java deleted file mode 100644 index ef5d438..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jms/Client.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jms; - -import java.io.Closeable; -import java.io.IOException; -import java.io.Serializable; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; - -import org.apache.activemq.ActiveMQConnectionFactory; - -/** - * @author Ales Justin - */ -public abstract class Client implements Closeable { - private String url; - private String username; - private String password; - - private boolean transacted = false; - private int mode = Session.AUTO_ACKNOWLEDGE; - - private String clientId; - - private Connection connection; - private Session session; - - public Client(String url, String username, String password) { - this(url, username, password, null); - } - - public Client(String url, String username, String password, String clientId) { - this.url = url; - this.username = username; - this.password = password; - this.clientId = clientId; - } - - public void setTransacted(boolean transacted) { - this.transacted = transacted; - } - - public void setMode(int mode) { - this.mode = mode; - } - - protected Session getSession() { - if (session == null) { - throw new IllegalStateException("No start invoked?"); - } - return session; - } - - protected ConnectionFactory getConnectionFactory() { - return new ActiveMQConnectionFactory(url); - } - - protected void init(Connection connection) throws JMSException { - if (clientId != null) { - connection.setClientID(clientId); - } - } - - public void close() throws IOException { - try { - close(connection); - } catch (JMSException ignored) { - } - } - - protected void close(Connection connection) throws JMSException { - if (connection != null) { - connection.close(); - } - } - - public void start(String clientId) throws JMSException { - this.clientId = clientId; - start(); - } - - public void start() throws JMSException { - ConnectionFactory cf = getConnectionFactory(); - connection = (username != null && password != null) ? cf.createConnection(username, password) : cf.createConnection(); - init(connection); - session = connection.createSession(transacted, mode); - connection.start(); - } - - public void stop() throws JMSException { - if (connection == null) { - throw new IllegalStateException("No start invoked?"); - } - connection.stop(); - } - - public TopicSubscriber getTopicSubscriber(String topicName, String subscriptionName) throws JMSException { - final Topic topic = getSession().createTopic(topicName); - return getSession().createDurableSubscriber(topic, subscriptionName); - } - - public TopicSubscriber topicSubscriber(String topicName, String subscriptionName) throws JMSException { - return getTopicSubscriber(topicName, subscriptionName); - } - - public Message createMessage() throws JMSException { - return getSession().createMessage(); - } - - public TextMessage createTextMessage() throws JMSException { - return getSession().createTextMessage(); - } - - public TextMessage createTextMessage(String text) throws JMSException { - return getSession().createTextMessage(text); - } - - public BytesMessage createBytesMessage() throws JMSException { - return getSession().createBytesMessage(); - } - - public ObjectMessage createObjectMessage() throws JMSException { - return getSession().createObjectMessage(); - } - - public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { - return getSession().createObjectMessage(serializable); - } - - public StreamMessage createStreamMessage() throws JMSException { - return getSession().createStreamMessage(); - } - - public MapMessage createMapMessage() throws JMSException { - return getSession().createMapMessage(); - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jms/Consumer.java b/drain/src/main/java/org/jboss/ce/amq/drain/jms/Consumer.java deleted file mode 100644 index c39344d..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jms/Consumer.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jms; - -import java.util.Iterator; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.TopicSubscriber; - -import org.jboss.ce.amq.drain.jmx.DestinationHandle; -import org.jboss.ce.amq.drain.jmx.JMX; -import org.jboss.ce.amq.drain.jmx.JMXFactory; - -/** - * @author Ales Justin - */ -public class Consumer extends Client { - - private JMX jmx; - - public Consumer(String url, String username, String password) { - this(url, username, password, null); - } - - public Consumer(String url, String username, String password, String clientId) { - super(url, username, password, clientId); - } - - public JMX getJMX() { - if (jmx == null) { - jmx = JMXFactory.createJMX(); - } - return jmx; - } - - private Queue createQueue(String queueName) throws JMSException { - return getSession().createQueue(queueName); - } - - public MessageConsumer queueConsumer(String queueName) throws JMSException { - return getSession().createConsumer(createQueue(queueName)); - } - - public int currentQueueSize(DestinationHandle handle) throws Exception { - return getJMX().getAttribute(Number.class, handle, "QueueSize").intValue(); - } - - public Iterator consumeQueue(DestinationHandle handle, String queueName) throws JMSException { - final Queue queue = createQueue(queueName); - return consumeMessages(queue, handle, "QueueSize"); - } - - public Iterator consumeDurableTopicSubscriptions(DestinationHandle handle, String topicName, String subscriptionName) throws Exception { - int pendingQueueSize = getJMX().getAttribute(Number.class, handle, "PendingQueueSize").intValue(); - TopicSubscriber subscriber = getTopicSubscriber(topicName, subscriptionName); - return consumeMessages(subscriber, new PendingQueueSizeChecker(pendingQueueSize)); - } - - public int currentTopicSubscriptionSize(DestinationHandle handle) throws Exception { - return getJMX().getAttribute(Number.class, handle, "PendingQueueSize").intValue(); - } - - private Iterator consumeMessages(Destination destination, final DestinationHandle handle, final String attributeName) throws JMSException { - final MessageConsumer consumer = getSession().createConsumer(destination); - return consumeMessages(consumer, new NextChecker() { - public boolean hasNext() throws Exception { - return getJMX().hasNextMessage(handle, attributeName); - } - }); - } - - private Iterator consumeMessages(final MessageConsumer consumer, final NextChecker checker) throws JMSException { - return new Iterator() { - public boolean hasNext() { - try { - return checker.hasNext(); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - public Message next() { - try { - return consumer.receive(); - } catch (JMSException e) { - throw new IllegalStateException(e); - } - } - - public void remove() { - } - }; - } - - private interface NextChecker { - boolean hasNext() throws Exception; - } - - private static class PendingQueueSizeChecker implements NextChecker { - private int size; - - public PendingQueueSizeChecker(int size) { - this.size = size; - } - - public boolean hasNext() throws Exception { - return (size-- > 0); - } - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jms/Producer.java b/drain/src/main/java/org/jboss/ce/amq/drain/jms/Producer.java deleted file mode 100644 index 63bd6ff..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jms/Producer.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jms; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; - -/** - * @author Ales Justin - */ -public class Producer extends Client { - public Producer(String url, String username, String password) { - super(url, username, password); - } - - public ProducerProcessor processQueueMessages(String queue) throws JMSException { - return processMessages(getSession().createQueue(queue)); - } - - public ProducerProcessor processTopicMessages(String topic) throws JMSException { - return processMessages(getSession().createTopic(topic)); - } - - private ProducerProcessor processMessages(Destination destination) throws JMSException { - final MessageProducer producer = getSession().createProducer(destination); - return new ProducerProcessor() { - public void processMessage(Message message) throws JMSException { - producer.send(message); - } - }; - } - - public interface ProducerProcessor { - void processMessage(Message message) throws JMSException; - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/AbstractJMX.java b/drain/src/main/java/org/jboss/ce/amq/drain/jmx/AbstractJMX.java deleted file mode 100644 index a635e40..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/AbstractJMX.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jmx; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.lang.reflect.Method; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.StringTokenizer; - -import javax.management.AttributeList; -import javax.management.MBeanServerConnection; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectInstance; -import javax.management.ObjectName; -import javax.management.QueryExp; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; - -import org.jboss.ce.amq.drain.Utils; - -/** - * @author ActiveMQ codebase - * @author Ales Justin - */ -abstract class AbstractJMX { - private static String DEFAULT_JMX_URL; - private static String jmxUser; - private static String jmxPassword; - - private static final String PATTERN; - - private static final String CONNECTOR_ADDRESS = "com.sun.management.jmxremote.localConnectorAddress"; - - private JMXServiceURL jmxServiceUrl; - private JMXConnector jmxConnector; - private MBeanServerConnection jmxConnection; - - static { - DEFAULT_JMX_URL = Utils.getSystemPropertyOrEnvVar("activemq.jmx.url", "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"); - jmxUser = Utils.getSystemPropertyOrEnvVar("activemq.jmx.user"); - jmxPassword = Utils.getSystemPropertyOrEnvVar("activemq.jmx.password"); - PATTERN = Utils.getSystemPropertyOrEnvVar("jmx.pattern", "activemq.jar start"); - } - - T getAttribute(Class type, ObjectName objectName, String attributeName) throws Exception { - return getAttribute(type, createJmxConnection(), objectName, attributeName); - } - - static T getAttribute(Class type, MBeanServerConnection connection, ObjectName objectName, String attributeName) throws Exception { - AttributeList attributes = connection.getAttributes(objectName, new String[]{attributeName}); - if (attributes.size() > 0) { - Object value = attributes.asList().get(0).getValue(); - return type.cast(value); - } - return null; - } - - // ActiveMQ impl details - - protected List queryMBeans(MBeanServerConnection jmxConnection, String queryString) throws Exception { - return new MBeansObjectNameQueryFilter(jmxConnection).query(queryString); - } - - protected abstract void print(String msg); - - private JMXServiceURL getJmxServiceUrl() { - return jmxServiceUrl; - } - - private void setJmxServiceUrl(JMXServiceURL jmxServiceUrl) { - this.jmxServiceUrl = jmxServiceUrl; - } - - private void setJmxServiceUrl(String jmxServiceUrl) throws MalformedURLException { - setJmxServiceUrl(new JMXServiceURL(jmxServiceUrl)); - } - - MBeanServerConnection createJmxConnection() throws IOException { - if (jmxConnection == null) { - jmxConnection = createJmxConnector().getMBeanServerConnection(); - } - return jmxConnection; - } - - private JMXConnector createJmxConnector() throws IOException { - // Reuse the previous connection - if (jmxConnector != null) { - jmxConnector.connect(); - return jmxConnector; - } - - // Create a new JMX connector - if (jmxUser != null && jmxPassword != null) { - Map props = new HashMap<>(); - props.put(JMXConnector.CREDENTIALS, new String[]{jmxUser, jmxPassword}); - jmxConnector = JMXConnectorFactory.connect(useJmxServiceUrl(), props); - } else { - jmxConnector = JMXConnectorFactory.connect(useJmxServiceUrl()); - } - return jmxConnector; - } - - private void dumpEnv() { - try { - print(String.format("user.name --> %s", System.getProperty("user.name"))); - print("---"); - String process; - // getRuntime: Returns the runtime object associated with the current Java application. - // exec: Executes the specified string command in a separate process. - Process p = Runtime.getRuntime().exec("ps auxwww"); - try (BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()))) { - while ((process = input.readLine()) != null) { - print(process); // <-- Print all Process here line by line - } - } - print("---"); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @SuppressWarnings("unchecked") - private JMXServiceURL useJmxServiceUrl() throws MalformedURLException { - if (getJmxServiceUrl() == null) { - String jmxUrl = DEFAULT_JMX_URL; - int connectingPid = -1; - try { - // Classes are all dynamically loaded, since they are specific to Sun VM - // if it fails for any reason default jmx url will be used - - // tools.jar are not always included used by default class loader, so we - // will try to use custom loader that will try to load tools.jar - - String javaHome = System.getProperty("java.home"); - print(String.format("java.home --> %s", javaHome)); - - File tools = JarFinder.findJar(javaHome, "tools.jar"); - URLClassLoader loader = new URLClassLoader(new URL[]{tools.toURI().toURL()}); - - Class virtualMachine = Class.forName("com.sun.tools.attach.VirtualMachine", true, loader); - Class virtualMachineDescriptor = Class.forName("com.sun.tools.attach.VirtualMachineDescriptor", true, loader); - - Method getVMList = virtualMachine.getMethod("list", (Class[]) null); - Method attachToVM = virtualMachine.getMethod("attach", String.class); - Method getAgentProperties = virtualMachine.getMethod("getAgentProperties", (Class[]) null); - Method loadAgent = virtualMachine.getMethod("loadAgent", String.class); - Method getVMDescriptor = virtualMachineDescriptor.getMethod("displayName", (Class[]) null); - Method getVMId = virtualMachineDescriptor.getMethod("id", (Class[]) null); - - List allVMs = (List) getVMList.invoke(null, (Object[]) null); - print(String.format("Found %s VMs ...", allVMs.size())); - - // dumpEnv(); - - for (Object vmInstance : allVMs) { - String displayName = (String) getVMDescriptor.invoke(vmInstance, (Object[]) null); - print(String.format("VM -- display-name: %s", displayName)); - if (displayName.contains(PATTERN)) { - String id = (String) getVMId.invoke(vmInstance, (Object[]) null); - - Object vm = attachToVM.invoke(null, id); - - Properties agentProperties = (Properties) getAgentProperties.invoke(vm, (Object[]) null); - String connectorAddress = agentProperties.getProperty(CONNECTOR_ADDRESS); - if (connectorAddress == null) { - String agent = JarFinder.findJar(javaHome, "management-agent.jar").getPath(); - loadAgent.invoke(vm, agent); - // re-check - agentProperties = (Properties) getAgentProperties.invoke(vm, (Object[]) null); - connectorAddress = agentProperties.getProperty(CONNECTOR_ADDRESS); - } - - if (connectorAddress != null) { - jmxUrl = connectorAddress; - connectingPid = Integer.parseInt(id); - print("useJmxServiceUrl Found JMS Url: " + jmxUrl); - } else { - print(String.format("No %s property!?", CONNECTOR_ADDRESS)); - } - break; - } - } - } catch (Exception e) { - print("Error: " + e.getMessage()); - e.printStackTrace(); - } - - if (connectingPid != -1) { - print("Connecting to pid: " + connectingPid); - } else { - print("Connecting to JMX URL: " + jmxUrl); - } - setJmxServiceUrl(jmxUrl); - } - - return getJmxServiceUrl(); - } - - private static class MBeansObjectNameQueryFilter { - static final String QUERY_DELIMETER = ","; - static final String DEFAULT_JMX_DOMAIN = Utils.getSystemPropertyOrEnvVar("jmx.domain", "org.apache.activemq"); - static final String QUERY_EXP_PREFIX = "MBeans.QueryExp."; - - private MBeanServerConnection jmxConnection; - - private MBeansObjectNameQueryFilter(MBeanServerConnection jmxConnection) { - this.jmxConnection = jmxConnection; - } - - private List query(String query) throws Exception { - // Converts string query to map query - StringTokenizer tokens = new StringTokenizer(query, QUERY_DELIMETER); - return query(Collections.list(tokens)); - } - - private List query(List queries) throws MalformedObjectNameException, IOException { - // Query all mbeans - if (queries == null || queries.isEmpty()) { - return queryMBeans(new ObjectName(DEFAULT_JMX_DOMAIN + ":*"), null); - } - - // Constructs object name query - String objNameQuery = ""; - String queryExp = ""; - String delimiter = ""; - for (Object query : queries) { - String key = (String) query; - String val = ""; - int pos = key.indexOf("="); - if (pos >= 0) { - val = key.substring(pos + 1); - key = key.substring(0, pos); - } else { - objNameQuery += delimiter + key; - } - - if (!val.startsWith(QUERY_EXP_PREFIX)) { - if (!key.equals("") && !val.equals("")) { - objNameQuery += delimiter + key + "=" + val; - delimiter = ","; - } - } - } - - return queryMBeans(new ObjectName(DEFAULT_JMX_DOMAIN + ":" + objNameQuery), queryExp); - } - - private List queryMBeans(ObjectName objName, String queryExpStr) throws IOException { - QueryExp queryExp = createQueryExp(queryExpStr); - // Convert mbeans set to list to make it standard throughout the query filter - return new ArrayList<>(jmxConnection.queryMBeans(objName, queryExp)); - } - - private QueryExp createQueryExp(@SuppressWarnings("UnusedParameters") String queryExpStr) { - // Currently unsupported - return null; - } - - } - -} \ No newline at end of file diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/DTSTuple.java b/drain/src/main/java/org/jboss/ce/amq/drain/jmx/DTSTuple.java deleted file mode 100644 index 212c3e7..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/DTSTuple.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jmx; - -/** - * @author Ales Justin - */ -public class DTSTuple { - public final String clientId; - public final String topic; - public final String subscriptionName; - - public DTSTuple(String clientId, String topic, String subscriptionName) { - this.clientId = clientId; - this.topic = topic; - this.subscriptionName = subscriptionName; - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/DestinationHandle.java b/drain/src/main/java/org/jboss/ce/amq/drain/jmx/DestinationHandle.java deleted file mode 100644 index 691d3be..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/DestinationHandle.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jmx; - -import javax.management.ObjectName; - -/** - * @author Ales Justin - */ -public class DestinationHandle implements Comparable { - private ObjectName objectName; - - public DestinationHandle(ObjectName objectName) { - this.objectName = objectName; - } - - public ObjectName getObjectName() { - return objectName; - } - - public int compareTo(DestinationHandle other) { - return getObjectName().compareTo(other.getObjectName()); - } - - @Override - public String toString() { - return getObjectName().getCanonicalName(); - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/JMX.java b/drain/src/main/java/org/jboss/ce/amq/drain/jmx/JMX.java deleted file mode 100644 index 98d11b0..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/JMX.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jmx; - -import java.util.Collection; - -/** - * @author Ales Justin - */ -public interface JMX { - Collection queues() throws Exception; - String queueName(DestinationHandle handle) throws Exception; - - Collection durableTopicSubscribers() throws Exception; - DTSTuple dtsTuple(DestinationHandle handle) throws Exception; - - void disconnect(String clientId) throws Exception; - - boolean hasNextMessage(DestinationHandle handle, String attributeName) throws Exception; - - T getAttribute(Class type, DestinationHandle handle, String attributeName) throws Exception; -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/JMXFactory.java b/drain/src/main/java/org/jboss/ce/amq/drain/jmx/JMXFactory.java deleted file mode 100644 index 8ad327c..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/JMXFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jmx; - -/** - * @author Ales Justin - */ -public class JMXFactory { - public static JMX createJMX() { - return new RemoteJMX(); - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/JarFinder.java b/drain/src/main/java/org/jboss/ce/amq/drain/jmx/JarFinder.java deleted file mode 100644 index c7c5364..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/JarFinder.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jmx; - -import java.io.File; - -/** - * @author Ales Justin - */ -class JarFinder { - static File findJar(String javaHome, String jarName) { - // try Sun/Oracle path - String sunFile = javaHome + File.separator + ".." + File.separator + "lib" + File.separator + jarName; - File file = new File(sunFile); - if (file.exists()) { - return file; - } - // try OpenJDK path - String openjdkFile = javaHome + File.separator + "lib" + File.separator + jarName; - file = new File(openjdkFile); - if (file.exists()) { - return file; - } - // error - throw new IllegalStateException(String.format("Cannot find %s!", jarName)); - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/PokeJMX.java b/drain/src/main/java/org/jboss/ce/amq/drain/jmx/PokeJMX.java deleted file mode 100644 index cd0ed22..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/PokeJMX.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jmx; - -import java.util.logging.Logger; - -import javax.management.ObjectInstance; - -/** - * @author Ales Justin - */ -public class PokeJMX extends AbstractJMX { - private static final Logger log = Logger.getLogger(PokeJMX.class.getName()); - - public static void main(String[] args) throws Exception { - if (args == null || args.length == 0) { - System.out.println(System.getProperties()); - System.out.println("------------"); - System.out.println("Sleeping ..."); - Thread.sleep(10 * 60 * 1000L); - } else { - PokeJMX jmx = new PokeJMX(); - jmx.list(args); - } - } - - private void list(String[] args) throws Exception { - for (ObjectInstance oi : queryMBeans(createJmxConnection(), args[0])) { - print(oi.getObjectName().getCanonicalName()); - } - } - - protected void print(String msg) { - log.info(msg); - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/RemoteJMX.java b/drain/src/main/java/org/jboss/ce/amq/drain/jmx/RemoteJMX.java deleted file mode 100644 index d3ea40d..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/jmx/RemoteJMX.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.ce.amq.drain.jmx; - -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import javax.management.AttributeList; -import javax.management.ObjectInstance; -import javax.management.ObjectName; - -import org.jboss.ce.amq.drain.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author Ales Justin - */ -class RemoteJMX extends AbstractJMX implements JMX { - private static final Logger log = LoggerFactory.getLogger(RemoteJMX.class); - private static final String brokerQueryString = "type=Broker,brokerName=%s"; - private static final String connectionQueryString = "type=Broker,brokerName=%s,connectionViewType=clientId,connectionName=%s"; - - private static String BROKER_NAME; - - static { - BROKER_NAME = Utils.getSystemPropertyOrEnvVar("broker.name", Utils.getSystemPropertyOrEnvVar("hostname", "localhost")); - } - - public Collection queues() throws Exception { - return destinations("Queues"); - } - - public String queueName(DestinationHandle handle) throws Exception { - return getAttribute(String.class, handle, "Name"); - } - - public Collection durableTopicSubscribers() throws Exception { - return destinations("InactiveDurableTopicSubscribers"); - } - - public DTSTuple dtsTuple(DestinationHandle handle) throws Exception { - String clientId = getAttribute(String.class, handle, "ClientId"); - String topic = getAttribute(String.class, handle, "DestinationName"); - String subscriptionName = getAttribute(String.class, handle, "SubscriptionName"); - return new DTSTuple(clientId, topic, subscriptionName); - } - - public void disconnect(String clientId) throws Exception { - String query = connectionQuery(clientId); - List mbeans = queryMBeans(createJmxConnection(), query); - for (ObjectInstance mbean : mbeans) { - createJmxConnection().invoke(mbean.getObjectName(), "stop", new Object[0], new String[0]); - } - } - - public boolean hasNextMessage(DestinationHandle handle, String attributeName) throws Exception { - AttributeList attributes = createJmxConnection().getAttributes(handle.getObjectName(), new String[]{attributeName}); - if (attributes.size() > 0) { - Object value = attributes.asList().get(0).getValue(); - Number number = Number.class.cast(value); - return (number.longValue() > 0); - } - return false; - } - - public T getAttribute(Class type, DestinationHandle handle, String attributeName) throws Exception { - return getAttribute(type, handle.getObjectName(), attributeName); - } - - private String brokerQuery() { - return String.format(brokerQueryString, BROKER_NAME); - } - - private String connectionQuery(String clientId) { - return String.format(connectionQueryString, BROKER_NAME, clientId); - } - - private Collection destinations(String type) throws Exception { - String query = brokerQuery(); - List mbeans = queryMBeans(createJmxConnection(), query); - Set destinations = new TreeSet<>(); - for (ObjectInstance mbean : mbeans) { - ObjectName objectName = mbean.getObjectName(); - ObjectName[] names = getAttribute(ObjectName[].class, objectName, type); - for (ObjectName on : names) { - destinations.add(new DestinationHandle(on)); - } - } - return destinations; - } - - protected void print(String msg) { - log.info(msg); - } -} diff --git a/drain/src/main/java/org/jboss/ce/amq/drain/log/RandomFileAppender.java b/drain/src/main/java/org/jboss/ce/amq/drain/log/RandomFileAppender.java deleted file mode 100644 index 68ab104..0000000 --- a/drain/src/main/java/org/jboss/ce/amq/drain/log/RandomFileAppender.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.jboss.ce.amq.drain.log; - -import java.security.SecureRandom; -import java.util.Random; - -import org.apache.log4j.FileAppender; - -/** - * @author Ales Justin - */ -public class RandomFileAppender extends FileAppender { - private static final Random RANDOM = new SecureRandom(); - - private String baseDir = "/opt/amq/data/"; - - @Override - public void activateOptions() { - setFile(getBaseDir() + String.format("drainlog-%s.log", RANDOM.nextInt(1000))); - super.activateOptions(); - } - - public String getBaseDir() { - return baseDir; - } - - public void setBaseDir(String baseDir) { - this.baseDir = baseDir; - } -} diff --git a/drain/src/main/resources/log4j.properties b/drain/src/main/resources/log4j.properties index 0a02ec8..788a878 100644 --- a/drain/src/main/resources/log4j.properties +++ b/drain/src/main/resources/log4j.properties @@ -8,8 +8,7 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.conversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c %3x - %m%n # Define the file appender -#log4j.appender.file=org.apache.log4j.FileAppender -#log4j.appender.file.file=/dev/termination-log -log4j.appender.file=org.jboss.ce.amq.drain.log.RandomFileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.file=migration.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.conversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c %3x - %m%n diff --git a/drain/src/test/java/org/jboss/test/ce/amq/ConsumerTest.java b/drain/src/test/java/org/jboss/test/ce/amq/ConsumerTest.java deleted file mode 100644 index a7cd526..0000000 --- a/drain/src/test/java/org/jboss/test/ce/amq/ConsumerTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.test.ce.amq; - -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.TopicSubscriber; - -import org.jboss.ce.amq.drain.Utils; -import org.jboss.ce.amq.drain.jms.Consumer; -import org.junit.Test; - -/** - * @author Ales Justin - */ -public class ConsumerTest extends TestBase { - - private static final String URL = Utils.getSystemPropertyOrEnvVar("consumer.test.url", "tcp://localhost:61616"); - - private static final long TIMEOUT = 3000; - - @Test - public void testConsumeQueue() throws Exception { - try (Consumer consumer = new Consumer(URL, null, null, "tmp123")) { - consumer.start(); - - MessageConsumer mc = consumer.queueConsumer(QUEUE); - Message message; - while ((message = mc.receive(TIMEOUT)) != null) { - System.out.println(message); - } - } - } - - @Test - public void testConsumeTopic() throws Exception { - try (Consumer consumer = new Consumer(URL, null, null, "tmp123")) { - consumer.start(); - - TopicSubscriber ts = consumer.topicSubscriber(TOPIC, SUBSCRIPTION_NAME); - Message message; - while ((message = ts.receive(TIMEOUT)) != null) { - System.out.println(message); - } - } - } - -} diff --git a/drain/src/test/java/org/jboss/test/ce/amq/ProducerTest.java b/drain/src/test/java/org/jboss/test/ce/amq/ProducerTest.java deleted file mode 100644 index cda3d85..0000000 --- a/drain/src/test/java/org/jboss/test/ce/amq/ProducerTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.test.ce.amq; - -import javax.jms.TextMessage; - -import org.jboss.ce.amq.drain.Utils; -import org.jboss.ce.amq.drain.jms.Producer; -import org.junit.Test; - -/** - * @author Ales Justin - */ -public class ProducerTest extends TestBase { - - private static final String URL = Utils.getSystemPropertyOrEnvVar("producer.test.url", "tcp://localhost:61616"); - - @Test - public void testProduceQueue() throws Exception { - try (Producer producer = new Producer(URL, null, null)) { - producer.start(); - - Producer.ProducerProcessor handle = producer.processQueueMessages(QUEUE); - TextMessage message = producer.createTextMessage("TEST -- queue + " + System.currentTimeMillis()); - handle.processMessage(message); - } - } - - @Test - public void testProduceTopic() throws Exception { - try (Producer producer = new Producer(URL, null, null)) { - producer.start(); - - Producer.ProducerProcessor handle = producer.processTopicMessages(TOPIC); - TextMessage message = producer.createTextMessage("TEST -- topic + " + System.currentTimeMillis()); - handle.processMessage(message); - } - } - -} diff --git a/drain/src/test/java/org/jboss/test/ce/amq/TestBase.java b/drain/src/test/java/org/jboss/test/ce/amq/TestBase.java deleted file mode 100644 index 9735a6a..0000000 --- a/drain/src/test/java/org/jboss/test/ce/amq/TestBase.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2016 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.jboss.test.ce.amq; - -import org.jboss.ce.amq.drain.Utils; - -/** - * @author Ales Justin - */ -public abstract class TestBase { - static final String QUEUE = Utils.getSystemPropertyOrEnvVar("test.queue", "QUEUES.FOO"); - static final String TOPIC = Utils.getSystemPropertyOrEnvVar("test.topic", "TOPICS.FOO"); - static final String SUBSCRIPTION_NAME = Utils.getSystemPropertyOrEnvVar("test.subscription", "BAR"); -} diff --git a/pom.xml b/pom.xml index cf4cb73..0d8e013 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ org.jboss.ce ce-amq-parent - 1.0.0.Alpha1 + 1.0.0.Final pom CE AMQ - Parent @@ -48,9 +48,7 @@ 5.11.0 - 1.7.5 1.2.17 - 4.11 @@ -59,17 +57,16 @@ - org.apache.activemq - activemq-client + activemq-broker ${version.org.apache.activemq} - org.slf4j - slf4j-log4j12 - ${version.log4j-slf4j} + org.apache.activemq + activemq-kahadb-store + ${version.org.apache.activemq} @@ -77,14 +74,6 @@ log4j ${version.log4j} - - - junit - junit - ${version.junit} - test - -