Skip to content

Commit

Permalink
Merge pull request #2146 from reportportal/EPMRPP-98822
Browse files Browse the repository at this point in the history
EPMRPP-98822 || Change queues from classic to quorum.
  • Loading branch information
pbortnik authored Jan 22, 2025
2 parents b6223c1 + 37e20bb commit 5c5eea5
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ public Exchange patternAnalysisExchange() {

@Bean
public Queue patternAnalysisStringQueue() {
return QueueBuilder.durable(PATTERN_ANALYSIS_STRING).build();
return QueueBuilder.durable(PATTERN_ANALYSIS_STRING).quorum().build();
}

@Bean
public Queue patternAnalysisRegexQueue() {
return QueueBuilder.durable(PATTERN_ANALYSIS_REGEX).build();
return QueueBuilder.durable(PATTERN_ANALYSIS_REGEX).quorum().build();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(prefix = "rp.searchengine", name = "host")
@Conditional(Conditions.NotTestCondition.class)
public class BackgroundProcessingConfiguration {

Expand All @@ -20,7 +23,7 @@ public class BackgroundProcessingConfiguration {

@Bean
Queue logMessageSavingQueue() {
return new Queue(LOG_MESSAGE_SAVING_QUEUE_NAME);
return QueueBuilder.durable(LOG_MESSAGE_SAVING_QUEUE_NAME).quorum().build();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.epam.ta.reportportal.core.configs.Conditions;
import com.epam.ta.reportportal.core.events.MessageBus;
import com.epam.ta.reportportal.core.events.MessageBusImpl;
import java.util.Map;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Base64UrlNamingStrategy;
Expand All @@ -27,6 +28,7 @@
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -44,15 +46,13 @@ public class InternalConfiguration {
/**
* Exchanges
*/
public static final String EXCHANGE_EVENTS = "broadcast.events";
public static final String EXCHANGE_ACTIVITY = "activity";
public static final String EXCHANGE_ATTACHMENT = "attachment";
public static final String EXCHANGE_NOTIFICATION = "notification";

/**
* Queues
*/
public static final String KEY_EVENTS = "broadcast.events";
public static final String QUEUE_ACTIVITY = "activity";
public static final String QUEUE_ACTIVITY_KEY = "activity.#";
public static final String QUEUE_ATTACHMENT_DELETE = "attachment.delete";
Expand All @@ -68,11 +68,6 @@ public MessageBus messageBus(

// Exchanges definition

@Bean
public FanoutExchange eventsExchange() {
return new FanoutExchange(EXCHANGE_EVENTS, false, false);
}

@Bean
public TopicExchange activityExchange() {
return new TopicExchange(EXCHANGE_ACTIVITY, true, false);
Expand All @@ -89,39 +84,27 @@ public DirectExchange notificationExchange() {
}

// Queues definition

@Bean
public Queue eventsQueue() {
return new AnonymousQueue(new Base64UrlNamingStrategy(KEY_EVENTS + "."));
}

@Bean
public Queue activityQueue() {
return new Queue(QUEUE_ACTIVITY);
return QueueBuilder.durable(QUEUE_ACTIVITY).quorum().build();
}

@Bean
public Queue deleteAttachmentQueue() {
return new Queue(QUEUE_ATTACHMENT_DELETE);
return QueueBuilder.durable(QUEUE_ATTACHMENT_DELETE).quorum().build();
}

@Bean
public Queue queryQueue() {
return new Queue(QUEUE_QUERY_RQ);
return QueueBuilder.durable(QUEUE_QUERY_RQ).quorum().build();
}

@Bean
public Queue emailNotificationQueue() {
return new Queue(QUEUE_EMAIL);
return QueueBuilder.durable(QUEUE_EMAIL).quorum().build();
}

// Bindings

@Bean
public Binding eventsQueueBinding() {
return BindingBuilder.bind(eventsQueue()).to(eventsExchange());
}

@Bean
public Binding eventsActivityBinding() {
return BindingBuilder.bind(activityQueue()).to(activityExchange()).with(QUEUE_ACTIVITY_KEY);
Expand All @@ -135,6 +118,7 @@ public Binding attachmentDeleteBinding() {

@Bean
public Binding emailNotificationBinding() {
return BindingBuilder.bind(emailNotificationQueue()).to(notificationExchange()).with(QUEUE_EMAIL);
return BindingBuilder.bind(emailNotificationQueue()).to(notificationExchange())
.with(QUEUE_EMAIL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ DirectExchange retryExchange() {

@Bean
Queue ttlQueue() {
return QueueBuilder.durable(TTL_QUEUE).deadLetterExchange(REPORTING_EXCHANGE)
return QueueBuilder.durable(TTL_QUEUE).quorum().deadLetterExchange(REPORTING_EXCHANGE)
.deadLetterRoutingKey(DEFAULT_CONSISTENT_HASH_ROUTING_KEY)
.build();
}
Expand All @@ -125,7 +125,7 @@ Binding ttlQueueBinding() {

@Bean
public Queue reportingParkingLot() {
return QueueBuilder.durable(REPORTING_PARKING_LOT)
return QueueBuilder.durable(REPORTING_PARKING_LOT).quorum()
.ttl((int) TimeUnit.DAYS.toMillis(parkingLotTtl))
.build();
}
Expand All @@ -140,7 +140,7 @@ private Binding buildQueueBinding(Queue queue) {
}

private Queue buildQueue(String queueName) {
Queue queue = QueueBuilder.durable(queueName).build();
Queue queue = QueueBuilder.durable(queueName).quorum().build();
queue.setShouldDeclare(true);
queue.setAdminsThatShouldDeclare(amqpAdmin);
amqpAdmin.declareQueue(queue);
Expand Down

0 comments on commit 5c5eea5

Please sign in to comment.