From bd4f4ff54f9f73cb4cb9256117059a922df6707d Mon Sep 17 00:00:00 2001 From: "tommy.li" Date: Fri, 16 Jun 2023 14:47:14 +1000 Subject: [PATCH] feat: User agent JMS message selector --- src/main/groovy/bpipe/agent/JMSAgent.groovy | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/groovy/bpipe/agent/JMSAgent.groovy b/src/main/groovy/bpipe/agent/JMSAgent.groovy index 56a294cd..8d62ebf0 100644 --- a/src/main/groovy/bpipe/agent/JMSAgent.groovy +++ b/src/main/groovy/bpipe/agent/JMSAgent.groovy @@ -273,8 +273,19 @@ class JMSAgent extends Agent { this.session = connection.createSession(false, acknowledgeMode == 'read' ? Session.AUTO_ACKNOWLEDGE : ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE) this.queue = session.createQueue((String)config.commandQueue) - this.consumer = session.createConsumer(queue) - + + if (config.containsKey('userMessageSelector') && config['userMessageSelector']) { + String username = System.getProperty('user.name') + String messageSelector = "user='$username'" + File userHome = new File(System.getProperty('user.home')) + assert userHome.canRead() && userHome.canonicalPath.endsWith(username), "Directory $userHome is not readable or does not reconcile with $username" + log.info("Creating consumer for queue=${queue.queueName} with messageSelector=$messageSelector") + this.consumer = session.createConsumer(queue, messageSelector) + } else { + log.info("Creating consumer for queue=${queue.queueName}") + this.consumer = session.createConsumer(queue) + } + log.info "Connected to ActiveMQ $config.commandQueue @ $config.brokerURL" }