From 1cb7e7c12571680f15b4713d550179fb54be1abf Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Mon, 13 Jul 2020 23:59:02 +0530 Subject: [PATCH] Async profiler and JDK dynamic proxy --- build.gradle | 2 +- .../rqueue/example/RQueueApplication.java | 4 ++++ .../spring/boot/RqueueMetricsAutoConfig.java | 8 ++++--- .../rqueue/spring/RqueueListenerConfig.java | 8 ++++--- .../sonus21/rqueue/core/MessageScheduler.java | 2 +- .../rqueue/listener/RqueueExecutor.java | 11 +++++---- .../RqueueMessageListenerContainer.java | 8 +++---- .../sonus21/rqueue/metrics/RqueueCounter.java | 4 +++- .../sonus21/rqueue/metrics/RqueueMetrics.java | 8 +++---- .../rqueue/metrics/RqueueMetricsCounter.java | 23 ++++++++++++++++++ .../rqueue/metrics/RqueueMetricsRegistry.java | 24 +++++++++++++++++++ .../service/RqueueSystemManagerService.java | 4 +++- .../impl/RqueueSystemManagerServiceImpl.java | 4 +--- 13 files changed, 84 insertions(+), 26 deletions(-) create mode 100644 rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java create mode 100644 rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsRegistry.java diff --git a/build.gradle b/build.gradle index 73dbc725..829ed493 100644 --- a/build.gradle +++ b/build.gradle @@ -67,7 +67,7 @@ ext { subprojects { group = 'com.github.sonus21' - version = '2.0.1-RELEASE' + version = '2.0.2-RELEASE' dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging diff --git a/rqueue-spring-boot-example/src/main/java/com/github/sonus21/rqueue/example/RQueueApplication.java b/rqueue-spring-boot-example/src/main/java/com/github/sonus21/rqueue/example/RQueueApplication.java index 00b7169e..da665c80 100644 --- a/rqueue-spring-boot-example/src/main/java/com/github/sonus21/rqueue/example/RQueueApplication.java +++ b/rqueue-spring-boot-example/src/main/java/com/github/sonus21/rqueue/example/RQueueApplication.java @@ -23,11 +23,15 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.repository.configuration.EnableRedisRepositories; +import org.springframework.scheduling.annotation.EnableAsync; @SpringBootApplication @EnableRedisRepositories +@EnableAsync +@EnableCaching public class RQueueApplication { @Value("${workers.count:3}") private int workersCount; diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueMetricsAutoConfig.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueMetricsAutoConfig.java index 514e97c1..7fddb09c 100644 --- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueMetricsAutoConfig.java +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueMetricsAutoConfig.java @@ -20,6 +20,8 @@ import com.github.sonus21.rqueue.metrics.QueueCounter; import com.github.sonus21.rqueue.metrics.RqueueCounter; import com.github.sonus21.rqueue.metrics.RqueueMetrics; +import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter; +import com.github.sonus21.rqueue.metrics.RqueueMetricsRegistry; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; import java.lang.reflect.Method; @@ -41,7 +43,7 @@ @Import(RqueueMetricsProperties.class) public class RqueueMetricsAutoConfig { @Bean - public RqueueMetrics rqueueMetrics( + public RqueueMetricsRegistry rqueueMetricsRegistry( MetricsProperties metricsProperties, @Qualifier("stringRqueueRedisTemplate") RqueueRedisTemplate rqueueRedisTemplate, RqueueMetricsProperties rqueueMetricsProperties) { @@ -68,7 +70,7 @@ private Map getTags(MetricsProperties metricsProperties) { } @Bean - public RqueueCounter rqueueCounter(RqueueMetrics rqueueMetrics) { - return new RqueueCounter(rqueueMetrics.getQueueCounter()); + public RqueueMetricsCounter rqueueMetricsCounter(RqueueMetricsRegistry rqueueMetricsRegistry) { + return new RqueueCounter(rqueueMetricsRegistry.getQueueCounter()); } } diff --git a/rqueue-spring/src/main/java/com/github/sonus21/rqueue/spring/RqueueListenerConfig.java b/rqueue-spring/src/main/java/com/github/sonus21/rqueue/spring/RqueueListenerConfig.java index dd8debcf..11b260aa 100644 --- a/rqueue-spring/src/main/java/com/github/sonus21/rqueue/spring/RqueueListenerConfig.java +++ b/rqueue-spring/src/main/java/com/github/sonus21/rqueue/spring/RqueueListenerConfig.java @@ -27,6 +27,8 @@ import com.github.sonus21.rqueue.metrics.QueueCounter; import com.github.sonus21.rqueue.metrics.RqueueCounter; import com.github.sonus21.rqueue.metrics.RqueueMetrics; +import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter; +import com.github.sonus21.rqueue.metrics.RqueueMetricsRegistry; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; @@ -76,7 +78,7 @@ public RqueueMessageSender rqueueMessageSender(RqueueMessageTemplate rqueueMessa @Bean @Conditional(MetricsEnabled.class) @DependsOn({"meterRegistry", "rqueueMetricsProperties"}) - public RqueueMetrics rqueueMetrics( + public RqueueMetricsRegistry rqueueMetricsRegistry( @Qualifier("stringRqueueRedisTemplate") RqueueRedisTemplate rqueueRedisTemplate) { QueueCounter queueCounter = new QueueCounter(); return new RqueueMetrics(rqueueRedisTemplate, queueCounter); @@ -84,7 +86,7 @@ public RqueueMetrics rqueueMetrics( @Bean @Conditional(MetricsEnabled.class) - public RqueueCounter rqueueCounter(RqueueMetrics rqueueMetrics) { - return new RqueueCounter(rqueueMetrics.getQueueCounter()); + public RqueueMetricsCounter rqueueMetricsCounter(RqueueMetricsRegistry rqueueMetricsRegistry) { + return new RqueueCounter(rqueueMetricsRegistry.getQueueCounter()); } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java index 7fed9ee8..58c91c6c 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java @@ -51,7 +51,7 @@ import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; -abstract class MessageScheduler +public abstract class MessageScheduler implements DisposableBean, ApplicationListener { @Autowired protected RqueueSchedulerConfig rqueueSchedulerConfig; private RedisScript redisScript; diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java index 1f9313a1..20701102 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java @@ -22,7 +22,7 @@ import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.support.MessageProcessor; import com.github.sonus21.rqueue.exception.UnknownSwitchCase; -import com.github.sonus21.rqueue.metrics.RqueueCounter; +import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter; import com.github.sonus21.rqueue.models.db.MessageMetadata; import com.github.sonus21.rqueue.models.db.TaskStatus; import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent; @@ -122,14 +122,15 @@ private void callMessageProcessor(TaskStatus status, RqueueMessage rqueueMessage } private void updateCounter(boolean fail) { - RqueueCounter rqueueCounter = Objects.requireNonNull(container.get()).getRqueueCounter(); - if (rqueueCounter == null) { + RqueueMetricsCounter counter = + Objects.requireNonNull(container.get()).getRqueueMetricsCounter(); + if (counter == null) { return; } if (fail) { - rqueueCounter.updateFailureCount(queueDetail.getName()); + counter.updateFailureCount(queueDetail.getName()); } else { - rqueueCounter.updateExecutionCount(queueDetail.getName()); + counter.updateExecutionCount(queueDetail.getName()); } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index 7d657510..c110344f 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -26,7 +26,7 @@ import com.github.sonus21.rqueue.core.QueueRegistry; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import com.github.sonus21.rqueue.core.support.MessageProcessor; -import com.github.sonus21.rqueue.metrics.RqueueCounter; +import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter; import com.github.sonus21.rqueue.models.Concurrency; import com.github.sonus21.rqueue.models.enums.PriorityMode; import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; @@ -87,7 +87,7 @@ public class RqueueMessageListenerContainer @Autowired private RqueueConfig rqueueConfig; @Autowired(required = false) - private RqueueCounter rqueueCounter; + private RqueueMetricsCounter rqueueMetricsCounter; @Autowired private RqueueMessageMetadataService rqueueMessageMetadataService; private AsyncTaskExecutor taskExecutor; @@ -594,8 +594,8 @@ public void setPriorityMode(PriorityMode priorityMode) { this.priorityMode = priorityMode; } - RqueueCounter getRqueueCounter() { - return rqueueCounter; + RqueueMetricsCounter getRqueueMetricsCounter() { + return rqueueMetricsCounter; } RqueueWebConfig getRqueueWebConfig() { diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java index ef318676..f6d73cbc 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueCounter.java @@ -21,17 +21,19 @@ * many messages have been processed and how many of them have been failed. In the case of failure * count increases. */ -public class RqueueCounter { +public class RqueueCounter implements RqueueMetricsCounter { private final QueueCounter queueCounter; public RqueueCounter(QueueCounter queueCounter) { this.queueCounter = queueCounter; } + @Override public void updateFailureCount(String queueName) { queueCounter.updateFailureCount(queueName); } + @Override public void updateExecutionCount(String queueName) { queueCounter.updateExecutionCount(queueName); } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java index 0e1e2949..d27da8ca 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java @@ -26,7 +26,6 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; import org.springframework.scheduling.annotation.Async; /** @@ -35,16 +34,16 @@ * queue messages can be in delayed queue because time has not reached. Some messages can be in dead * letter queue if dead letter queue is configured. */ -public class RqueueMetrics implements ApplicationListener { +public class RqueueMetrics implements RqueueMetricsRegistry { static final String QUEUE_KEY = "key"; private static final String QUEUE_SIZE = "queue.size"; private static final String DELAYED_QUEUE_SIZE = "delayed.queue.size"; private static final String PROCESSING_QUEUE_SIZE = "processing.queue.size"; private static final String DEAD_LETTER_QUEUE_SIZE = "dead.letter.queue.size"; - private RqueueRedisTemplate rqueueMessageTemplate; + private final RqueueRedisTemplate rqueueMessageTemplate; + private final QueueCounter queueCounter; @Autowired private MetricsProperties metricsProperties; @Autowired private MeterRegistry meterRegistry; - private QueueCounter queueCounter; public RqueueMetrics( RqueueRedisTemplate rqueueMessageTemplate, QueueCounter queueCounter) { @@ -107,6 +106,7 @@ public void onApplicationEvent(RqueueBootstrapEvent event) { } } + @Override public QueueCounter getQueueCounter() { return this.queueCounter; } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java new file mode 100644 index 00000000..b1b5e9d5 --- /dev/null +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsCounter.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.metrics; + +public interface RqueueMetricsCounter { + void updateFailureCount(String queueName); + + void updateExecutionCount(String queueName); +} diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsRegistry.java b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsRegistry.java new file mode 100644 index 00000000..aa000e16 --- /dev/null +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsRegistry.java @@ -0,0 +1,24 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.metrics; + +import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; +import org.springframework.context.ApplicationListener; + +public interface RqueueMetricsRegistry extends ApplicationListener { + QueueCounter getQueueCounter(); +} diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerService.java b/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerService.java index 189b90db..2b7c4c7a 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerService.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerService.java @@ -17,11 +17,13 @@ package com.github.sonus21.rqueue.web.service; import com.github.sonus21.rqueue.models.db.QueueConfig; +import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; import com.github.sonus21.rqueue.models.response.BaseResponse; import java.util.Collection; import java.util.List; +import org.springframework.context.ApplicationListener; -public interface RqueueSystemManagerService { +public interface RqueueSystemManagerService extends ApplicationListener { BaseResponse deleteQueue(String queueName); List getQueues(); diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java b/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java index ca27a7c7..f980e027 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java @@ -37,14 +37,12 @@ import java.util.Set; import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @Service -public class RqueueSystemManagerServiceImpl - implements RqueueSystemManagerService, ApplicationListener { +public class RqueueSystemManagerServiceImpl implements RqueueSystemManagerService { private final RqueueConfig rqueueConfig; private final RqueueRedisTemplate stringRqueueRedisTemplate; private final RqueueSystemConfigDao rqueueSystemConfigDao;