From 018ff3382dcd1af20f729f85dab5e3d82fd315ff Mon Sep 17 00:00:00 2001 From: Pavel Bortnik Date: Thu, 16 Jan 2025 13:15:01 +0300 Subject: [PATCH] EPMRPP-98822 || Change queues from classic to quorum. --- .../PatternAnalysisRabbitConfiguration.java | 4 +-- .../BackgroundProcessingConfiguration.java | 5 ++- .../configs/rabbit/InternalConfiguration.java | 32 +++++-------------- .../ReportingTopologyConfiguration.java | 6 ++-- 4 files changed, 17 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/epam/ta/reportportal/core/analyzer/config/PatternAnalysisRabbitConfiguration.java b/src/main/java/com/epam/ta/reportportal/core/analyzer/config/PatternAnalysisRabbitConfiguration.java index d4dbc28ca5..c0a5c2dcba 100644 --- a/src/main/java/com/epam/ta/reportportal/core/analyzer/config/PatternAnalysisRabbitConfiguration.java +++ b/src/main/java/com/epam/ta/reportportal/core/analyzer/config/PatternAnalysisRabbitConfiguration.java @@ -49,12 +49,12 @@ public Exchange patternAnalysisExchange() { @Bean public Queue patternAnalysisStringQueue() { - return QueueBuilder.durable(PATTERN_ANALYSIS_STRING).build(); + return QueueBuilder.durable(PATTERN_ANALYSIS_STRING).quorum().build(); } @Bean public Queue patternAnalysisRegexQueue() { - return QueueBuilder.durable(PATTERN_ANALYSIS_REGEX).build(); + return QueueBuilder.durable(PATTERN_ANALYSIS_REGEX).quorum().build(); } @Bean diff --git a/src/main/java/com/epam/ta/reportportal/core/configs/rabbit/BackgroundProcessingConfiguration.java b/src/main/java/com/epam/ta/reportportal/core/configs/rabbit/BackgroundProcessingConfiguration.java index 895f250f8b..d4af043581 100644 --- a/src/main/java/com/epam/ta/reportportal/core/configs/rabbit/BackgroundProcessingConfiguration.java +++ b/src/main/java/com/epam/ta/reportportal/core/configs/rabbit/BackgroundProcessingConfiguration.java @@ -5,12 +5,15 @@ import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; @Configuration +@ConditionalOnProperty(prefix = "rp.searchengine", name = "host") @Conditional(Conditions.NotTestCondition.class) public class BackgroundProcessingConfiguration { @@ -20,7 +23,7 @@ public class BackgroundProcessingConfiguration { @Bean Queue logMessageSavingQueue() { - return new Queue(LOG_MESSAGE_SAVING_QUEUE_NAME); + return QueueBuilder.durable(LOG_MESSAGE_SAVING_QUEUE_NAME).quorum().build(); } @Bean diff --git a/src/main/java/com/epam/ta/reportportal/core/configs/rabbit/InternalConfiguration.java b/src/main/java/com/epam/ta/reportportal/core/configs/rabbit/InternalConfiguration.java index bbc8fcd843..f4f7028b48 100644 --- a/src/main/java/com/epam/ta/reportportal/core/configs/rabbit/InternalConfiguration.java +++ b/src/main/java/com/epam/ta/reportportal/core/configs/rabbit/InternalConfiguration.java @@ -19,6 +19,7 @@ import com.epam.ta.reportportal.core.configs.Conditions; import com.epam.ta.reportportal.core.events.MessageBus; import com.epam.ta.reportportal.core.events.MessageBusImpl; +import java.util.Map; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.AnonymousQueue; import org.springframework.amqp.core.Base64UrlNamingStrategy; @@ -27,6 +28,7 @@ import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -44,7 +46,6 @@ public class InternalConfiguration { /** * Exchanges */ - public static final String EXCHANGE_EVENTS = "broadcast.events"; public static final String EXCHANGE_ACTIVITY = "activity"; public static final String EXCHANGE_ATTACHMENT = "attachment"; public static final String EXCHANGE_NOTIFICATION = "notification"; @@ -52,7 +53,6 @@ public class InternalConfiguration { /** * Queues */ - public static final String KEY_EVENTS = "broadcast.events"; public static final String QUEUE_ACTIVITY = "activity"; public static final String QUEUE_ACTIVITY_KEY = "activity.#"; public static final String QUEUE_ATTACHMENT_DELETE = "attachment.delete"; @@ -68,11 +68,6 @@ public MessageBus messageBus( // Exchanges definition - @Bean - public FanoutExchange eventsExchange() { - return new FanoutExchange(EXCHANGE_EVENTS, false, false); - } - @Bean public TopicExchange activityExchange() { return new TopicExchange(EXCHANGE_ACTIVITY, true, false); @@ -89,39 +84,27 @@ public DirectExchange notificationExchange() { } // Queues definition - - @Bean - public Queue eventsQueue() { - return new AnonymousQueue(new Base64UrlNamingStrategy(KEY_EVENTS + ".")); - } - @Bean public Queue activityQueue() { - return new Queue(QUEUE_ACTIVITY); + return QueueBuilder.durable(QUEUE_ACTIVITY).quorum().build(); } @Bean public Queue deleteAttachmentQueue() { - return new Queue(QUEUE_ATTACHMENT_DELETE); + return QueueBuilder.durable(QUEUE_ATTACHMENT_DELETE).quorum().build(); } @Bean public Queue queryQueue() { - return new Queue(QUEUE_QUERY_RQ); + return QueueBuilder.durable(QUEUE_QUERY_RQ).quorum().build(); } @Bean public Queue emailNotificationQueue() { - return new Queue(QUEUE_EMAIL); + return QueueBuilder.durable(QUEUE_EMAIL).quorum().build(); } // Bindings - - @Bean - public Binding eventsQueueBinding() { - return BindingBuilder.bind(eventsQueue()).to(eventsExchange()); - } - @Bean public Binding eventsActivityBinding() { return BindingBuilder.bind(activityQueue()).to(activityExchange()).with(QUEUE_ACTIVITY_KEY); @@ -135,6 +118,7 @@ public Binding attachmentDeleteBinding() { @Bean public Binding emailNotificationBinding() { - return BindingBuilder.bind(emailNotificationQueue()).to(notificationExchange()).with(QUEUE_EMAIL); + return BindingBuilder.bind(emailNotificationQueue()).to(notificationExchange()) + .with(QUEUE_EMAIL); } } diff --git a/src/main/java/com/epam/ta/reportportal/reporting/async/config/ReportingTopologyConfiguration.java b/src/main/java/com/epam/ta/reportportal/reporting/async/config/ReportingTopologyConfiguration.java index fc6a27401a..5414fc5b94 100644 --- a/src/main/java/com/epam/ta/reportportal/reporting/async/config/ReportingTopologyConfiguration.java +++ b/src/main/java/com/epam/ta/reportportal/reporting/async/config/ReportingTopologyConfiguration.java @@ -113,7 +113,7 @@ DirectExchange retryExchange() { @Bean Queue ttlQueue() { - return QueueBuilder.durable(TTL_QUEUE).deadLetterExchange(REPORTING_EXCHANGE) + return QueueBuilder.durable(TTL_QUEUE).quorum().deadLetterExchange(REPORTING_EXCHANGE) .deadLetterRoutingKey(DEFAULT_CONSISTENT_HASH_ROUTING_KEY) .build(); } @@ -125,7 +125,7 @@ Binding ttlQueueBinding() { @Bean public Queue reportingParkingLot() { - return QueueBuilder.durable(REPORTING_PARKING_LOT) + return QueueBuilder.durable(REPORTING_PARKING_LOT).quorum() .ttl((int) TimeUnit.DAYS.toMillis(parkingLotTtl)) .build(); } @@ -140,7 +140,7 @@ private Binding buildQueueBinding(Queue queue) { } private Queue buildQueue(String queueName) { - Queue queue = QueueBuilder.durable(queueName).build(); + Queue queue = QueueBuilder.durable(queueName).quorum().build(); queue.setShouldDeclare(true); queue.setAdminsThatShouldDeclare(amqpAdmin); amqpAdmin.declareQueue(queue);