diff --git a/src/main/groovy/bpipe/agent/JMSAgent.groovy b/src/main/groovy/bpipe/agent/JMSAgent.groovy index 56a294cd..8d62ebf0 100644 --- a/src/main/groovy/bpipe/agent/JMSAgent.groovy +++ b/src/main/groovy/bpipe/agent/JMSAgent.groovy @@ -273,8 +273,19 @@ class JMSAgent extends Agent { this.session = connection.createSession(false, acknowledgeMode == 'read' ? Session.AUTO_ACKNOWLEDGE : ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE) this.queue = session.createQueue((String)config.commandQueue) - this.consumer = session.createConsumer(queue) - + + if (config.containsKey('userMessageSelector') && config['userMessageSelector']) { + String username = System.getProperty('user.name') + String messageSelector = "user='$username'" + File userHome = new File(System.getProperty('user.home')) + assert userHome.canRead() && userHome.canonicalPath.endsWith(username), "Directory $userHome is not readable or does not reconcile with $username" + log.info("Creating consumer for queue=${queue.queueName} with messageSelector=$messageSelector") + this.consumer = session.createConsumer(queue, messageSelector) + } else { + log.info("Creating consumer for queue=${queue.queueName}") + this.consumer = session.createConsumer(queue) + } + log.info "Connected to ActiveMQ $config.commandQueue @ $config.brokerURL" }