diff --git a/pom.xml b/pom.xml index ea1f061..250a13e 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ edu.stanford.protege webprotege-ipc - 0.10.3-SNAPSHOT + 0.10.4 webprotege-ipc Inter Process Communication framework @@ -57,7 +57,7 @@ edu.stanford.protege webprotege-common - 0.9.5 + 0.9.6-SNAPSHOT diff --git a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventDispatcher.java b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventDispatcher.java index 31e97a8..c412b4a 100644 --- a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventDispatcher.java +++ b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventDispatcher.java @@ -8,6 +8,7 @@ import edu.stanford.protege.webprotege.ipc.EventDispatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -49,9 +50,9 @@ public void dispatchEvent(Event event) { var projectId = ((ProjectEvent) event).projectId().value(); message.getMessageProperties().getHeaders().put(PROJECT_ID, projectId); } - eventRabbitTemplate.send(message); - logger.info("Sent event message"); - } catch (JsonProcessingException e) { + eventRabbitTemplate.convertAndSend(RabbitMQEventsConfiguration.EVENT_EXCHANGE, "", message); + logger.info("Sent event message!"); + } catch (JsonProcessingException | AmqpException e) { logger.info("Could not serialize event: {}", e.getMessage(), e); } diff --git a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventHandlerWrapper.java b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventHandlerWrapper.java index 79d5749..51b527c 100644 --- a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventHandlerWrapper.java +++ b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventHandlerWrapper.java @@ -32,7 +32,7 @@ public void onMessage(Message message) { EventHandler eventHandler = eventHandlers.stream() .filter(handler -> { String channel = String.valueOf(message.getMessageProperties().getHeaders().get(CHANNEL)); - return handler.getChannelName().equals(channel); + return channel.contains(handler.getChannelName()); }).findFirst() .orElse(null); if(eventHandler != null) { diff --git a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventsConfiguration.java b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventsConfiguration.java new file mode 100644 index 0000000..bb2c9f2 --- /dev/null +++ b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventsConfiguration.java @@ -0,0 +1,85 @@ +package edu.stanford.protege.webprotege.ipc.impl; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import edu.stanford.protege.webprotege.common.Event; +import edu.stanford.protege.webprotege.ipc.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.ArrayList; +import java.util.List; + +@Configuration +public class RabbitMQEventsConfiguration { + private final static Logger logger = LoggerFactory.getLogger(RabbitMQEventsConfiguration.class); + + @Value("${webprotege.rabbitmq.timeout}") + public Long rabbitMqTimeout; + + @Value("${webprotege.rabbitmq.eventsqueue:EVENTS_QUEUE}") + public String eventsQueue; + + + public static final String EVENT_EXCHANGE = "webprotege-event-exchange"; + + + @Autowired(required = false) + private List> eventHandlers = new ArrayList<>(); + + @Autowired + private ObjectMapper objectMapper; + + @Bean + public Queue eventsQueue() { + return new Queue(eventsQueue, true); + } + + + @Bean + FanoutExchange eventExchange() { + return new FanoutExchange(EVENT_EXCHANGE, true, false); + } + + + @Bean(name = "eventRabbitTemplate") + public RabbitTemplate eventRabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setReplyTimeout(rabbitMqTimeout); + rabbitTemplate.setExchange(EVENT_EXCHANGE); + return rabbitTemplate; + } + + @Bean + @ConditionalOnProperty(havingValue = "true", prefix = "webprotege.rabbitmq", name = "event-subscribe") + public SimpleMessageListenerContainer eventsListenerContainer(ConnectionFactory connectionFactory) { + logger.info("[RabbitMQEventConfiguration] Listening to event queue {}", eventsQueue); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.setQueueNames(eventsQueue); + container.setMessageListener(new RabbitMQEventHandlerWrapper(eventHandlers, objectMapper)); + return container; + } + + @Bean + @ConditionalOnProperty(havingValue = "true", prefix = "webprotege.rabbitmq", name = "event-subscribe") + public Binding binding(Queue eventsQueue, FanoutExchange fanoutExchange) { + logger.info("[RabbitMQEventConfiguration] Binding to event queue {}", eventsQueue); + + return BindingBuilder.bind(eventsQueue).to(fanoutExchange); + } + + +} diff --git a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java index 383d97c..f425d9f 100644 --- a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java +++ b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java @@ -5,13 +5,11 @@ import com.rabbitmq.client.Channel; import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusRequest; import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusResponse; -import edu.stanford.protege.webprotege.common.Event; import edu.stanford.protege.webprotege.common.Request; import edu.stanford.protege.webprotege.common.Response; import edu.stanford.protege.webprotege.ipc.CommandExecutor; -import org.springframework.context.ApplicationContext; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import edu.stanford.protege.webprotege.ipc.CommandHandler; -import edu.stanford.protege.webprotege.ipc.EventHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; @@ -33,6 +31,7 @@ import java.util.concurrent.TimeoutException; @Configuration +@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true) public class RabbitMqConfiguration { private final static Logger logger = LoggerFactory.getLogger(RabbitMqConfiguration.class); @@ -50,56 +49,41 @@ public class RabbitMqConfiguration { public static final String COMMANDS_EXCHANGE = "webprotege-exchange"; - public static final String EVENT_EXCHANGE = "webprotege-event-exchange"; - - public static final String EVENT_QUEUE = "webprotege-event-queue"; @Autowired private ConnectionFactory connectionFactory; - @Autowired(required = false) - private List> eventHandlers = new ArrayList<>(); - + @Autowired + private ObjectMapper objectMapper; @Autowired(required = false) private List> handlers = new ArrayList<>(); - - @Autowired - private ApplicationContext applicationContext; - - @Autowired - private ObjectMapper objectMapper; - @Autowired private CommandExecutor authorizationStatusExecutor; @Bean + @ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true) Queue msgQueue() { return new Queue(getCommandQueue(), true); } @Bean + @ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true) Queue replyQueue() { return new Queue(getCommandResponseQueue(), true); } @Bean - public Queue eventsQueue() { - return new Queue(EVENT_QUEUE, true); - } - - @Bean + @ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true) DirectExchange exchange() { return new DirectExchange(COMMANDS_EXCHANGE, true, false); } - @Bean - FanoutExchange eventExchange() { - return new FanoutExchange(EVENT_EXCHANGE, true, false); - } + @Bean(name = "rabbitTemplate") + @ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true) public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setReplyTimeout(rabbitMqTimeout); @@ -108,29 +92,15 @@ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { } @Bean(name = "asyncRabbitTemplate") + @ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true) public AsyncRabbitTemplate asyncRabbitTemplate(@Qualifier("rabbitTemplate") RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer replyListenerContainer) { return new AsyncRabbitTemplate(rabbitTemplate, replyListenerContainer, getCommandResponseQueue()); } - @Bean(name = "eventRabbitTemplate") - public RabbitTemplate eventRabbitTemplate(ConnectionFactory connectionFactory) { - RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); - rabbitTemplate.setReplyTimeout(rabbitMqTimeout); - rabbitTemplate.setExchange(EVENT_EXCHANGE); - rabbitTemplate.setUseDirectReplyToContainer(false); - return rabbitTemplate; - } - @Bean - public SimpleMessageListenerContainer eventsListenerContainer(ConnectionFactory connectionFactory, Queue eventsQueue, @Qualifier("eventRabbitTemplate") RabbitTemplate eventRabbitTemplate) { - SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); - container.setConnectionFactory(connectionFactory); - container.setQueueNames(EVENT_QUEUE); - container.setMessageListener(new RabbitMQEventHandlerWrapper(eventHandlers, objectMapper)); - return container; - } @Bean + @ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true) public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory connectionFactory, Queue replyQueue) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); @@ -139,11 +109,13 @@ public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory c } @Bean + @ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true) public RabbitMqCommandHandlerWrapper rabbitMqCommandHandlerWrapper(){ return new RabbitMqCommandHandlerWrapper<>(handlers, objectMapper, authorizationStatusExecutor); } @Bean + @ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true) public SimpleMessageListenerContainer messageListenerContainers() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setQueueNames(getCommandQueue()); @@ -152,29 +124,8 @@ public SimpleMessageListenerContainer messageListenerContainers() { return container; } - @Bean - public List eventsBindings(FanoutExchange fanoutExchange, Queue eventsQueue) { - var response = new ArrayList(); - - try (Connection connection = connectionFactory.createConnection(); - Channel channel = connection.createChannel(true)) { - channel.exchangeDeclare(EVENT_EXCHANGE, "fanout", true); - channel.queueDeclare(EVENT_QUEUE, true, false, false, null); - - for(EventHandler eventHandler : eventHandlers) { - channel.queueBind(eventsQueue.getName(), fanoutExchange.getName(), eventHandler.getChannelName()); - response.add(BindingBuilder.bind(eventsQueue).to(fanoutExchange)); - } - } catch (IOException | TimeoutException e) { - logger.error("Error initialize bindings", e); - throw new RuntimeException("Error initialize bindings", e); - } - - - return response; - } - @PostConstruct + @ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true) public void createBindings() { try (Connection connection = connectionFactory.createConnection(); Channel channel = connection.createChannel(true)) { diff --git a/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutionException_Tests.java b/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutionException_Tests.java index 9a2b425..0f1dea2 100644 --- a/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutionException_Tests.java +++ b/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutionException_Tests.java @@ -7,6 +7,7 @@ import org.springframework.boot.test.json.JacksonTester; import org.springframework.http.HttpStatus; import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; import java.io.IOException; @@ -20,6 +21,7 @@ @SpringBootTest @AutoConfigureJsonTesters @ContextConfiguration(classes = WebProtegeIpcApplication.class) +@TestPropertySource(properties = "webprotege.rabbitmq.commands-subscribe=false") public class CommandExecutionException_Tests extends IntegrationTestsExtension { @Autowired diff --git a/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutor_CommandHandler_ExceptionThrowing_TestsCase.java b/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutor_CommandHandler_ExceptionThrowing_TestsCase.java index 56bae1a..263029b 100644 --- a/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutor_CommandHandler_ExceptionThrowing_TestsCase.java +++ b/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutor_CommandHandler_ExceptionThrowing_TestsCase.java @@ -31,7 +31,6 @@ * 2022-02-09 */ @SpringBootTest -@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) public class CommandExecutor_CommandHandler_ExceptionThrowing_TestsCase extends IntegrationTestsExtension { @Autowired diff --git a/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutor_CommandHandler_Test.java b/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutor_CommandHandler_Test.java index 255178c..2797062 100644 --- a/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutor_CommandHandler_Test.java +++ b/src/test/java/edu/stanford/protege/webprotege/ipc/CommandExecutor_CommandHandler_Test.java @@ -31,7 +31,6 @@ * 2021-08-03 */ @SpringBootTest -@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) public class CommandExecutor_CommandHandler_Test extends IntegrationTestsExtension { @Autowired diff --git a/src/test/java/edu/stanford/protege/webprotege/ipc/EventHandler_TestCase.java b/src/test/java/edu/stanford/protege/webprotege/ipc/EventHandler_TestCase.java index 4c8671d..9025fa0 100644 --- a/src/test/java/edu/stanford/protege/webprotege/ipc/EventHandler_TestCase.java +++ b/src/test/java/edu/stanford/protege/webprotege/ipc/EventHandler_TestCase.java @@ -10,6 +10,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.context.TestPropertySource; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -25,6 +26,7 @@ * 2022-02-08 */ @SpringBootTest(classes = WebProtegeIpcApplication.class) +@TestPropertySource(properties = "webprotege.rabbitmq.event-subscribe=true") public class EventHandler_TestCase extends IntegrationTestsExtension { public static CountDownLatch countDownLatch; diff --git a/src/test/java/edu/stanford/protege/webprotege/ipc/IntegrationTestsExtension.java b/src/test/java/edu/stanford/protege/webprotege/ipc/IntegrationTestsExtension.java index d4e5081..1e4f954 100644 --- a/src/test/java/edu/stanford/protege/webprotege/ipc/IntegrationTestsExtension.java +++ b/src/test/java/edu/stanford/protege/webprotege/ipc/IntegrationTestsExtension.java @@ -1,9 +1,12 @@ package edu.stanford.protege.webprotege.ipc; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.RabbitMQContainer; @@ -21,6 +24,7 @@ @SpringBootTest(properties = {"spring.mongodb.embedded.version=5.0.6"}) @AutoConfigureWebTestClient @Testcontainers +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) public class IntegrationTestsExtension { private static Logger logger = LoggerFactory.getLogger(IntegrationTestsExtension.class); @@ -36,4 +40,14 @@ static void configure(DynamicPropertyRegistry registry) { registry.add("spring.rabbitmq.host", rabbitContainer::getHost); registry.add("spring.rabbitmq.port", rabbitContainer::getAmqpPort); } + + @BeforeAll + public static void containerSetUp(){ + rabbitContainer.start(); + } + + @AfterAll + public static void containerTearDown(){ + rabbitContainer.stop(); + } }