Skip to content

Commit

Permalink
Version 2.1.0
Browse files Browse the repository at this point in the history
* Provide a way to enqueue unique messages #27
* Provide a way to add HTTP proxy for external api calls #34
* Provide a way to fetch messages based on the id and queue name
* Provide api to delete any enqueued messages #33
* Enqueue a list of Objects #37
* Do not delete queues if mode is PRODUCER #24
* Add csp meta data #38
  • Loading branch information
sonus21 committed Sep 16, 2020
1 parent 4c9c5c8 commit 6ba371f
Show file tree
Hide file tree
Showing 180 changed files with 4,703 additions and 1,671 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ env:
- ORG_GRADLE_PROJECT_sonatypeUsername=xxx
- ORG_GRADLE_PROJECT_sonatypePassword=xxx
- USER_NAME=rqueue
- REDIS_RUNNING=true
- CI_ENV=true

cache:
directories:
Expand All @@ -30,6 +32,8 @@ before_script:
- cd 9004 && redis-server ./redis.conf &
- cd 9005 && redis-server ./redis.conf &
- redis-cli --cluster create 127.0.0.1:9000 127.0.0.1:9001 127.0.0.1:9002 127.0.0.1:9003 127.0.0.1:9004 127.0.0.1:9005 --cluster-replicas 1 --cluster-yes
- df -h
- lscpu

jobs:
include:
Expand Down
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

**NOTE**: The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.1.0] - 16-Sep-2020
### Added
* Allow application to provide message id while enqueuing messages
* Unique message enqueue
* Api to check if message was enqueued or not
* Api to delete single message
* Proxy for outbound http connection
* Enqueue list of objects and process them, like batch-processing

Fixes:
* Registered queues should not be deleted when used in producer mode

## [2.0.4] - 2-Aug-2020

Expand Down Expand Up @@ -92,3 +103,4 @@
[2.0.1]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.1-RELEASE
[2.0.2]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.2-RELEASE
[2.0.4]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.4-RELEASE
[2.1.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.1.0-RELEASE
38 changes: 20 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@

## Features

* A message can be delayed for an arbitrary period or delivered immediately.
* Multiple messages can be consumed in parallel by different workers.
* Message delivery: It's guaranteed that a message is consumed **at least once**. (Message would be consumed by a worker more than once due to the failure in the underlying worker/restart-process etc, otherwise exactly one delivery)
* Support Redis cluster
* Queue metrics
* Different Redis connection for application and worker
* Web interface for queue management and queue statistics
* Automatic message serialization and deserialization
* Queue concurrency
* Group level queue priority(weighted and strict)
* Sub queue priority(weighted and strict)
* Task execution back off, exponential and fixed back off (default fixed back off)
* Callbacks for different actions
* Events 1. Bootstrap event 2. Task execution event.
* **Message Scheduling** : A message can be scheduled for any arbitrary period
* **Competing Consumers** multiple messages can be consumed in parallel by different workers.
* **Message delivery**: It's guaranteed that a message is consumed **at least once**. (Message would be consumed by a worker more than once due to the failure in the underlying worker/restart-process etc, otherwise exactly one delivery)
* **Redis cluster** : Redis cluster can be used with driver.
* **Metrics** : In flight messages, waiting for consumption and delayed messages
* **Web interface**: a web interface to manage a queue and queue insights including latency
* **Automatic message serialization and deserialization**
* **Concurrency**: Concurrency of any queue can be configured
* **Queue Priority** :
* Group level queue priority(weighted and strict)
* Sub queue priority(weighted and strict)
* **Execution Backoff** : Exponential and fixed back off (default fixed back off)
* **Callbacks** : Callbacks for dead letter queue, discard etc
* **Events** 1. Bootstrap event 2. Task execution event.
* **Unique message** : Unique message processing for a queue based on the message id
* **Redis connection**: A different redis setup can be used for Rqueue

## Getting Started

Expand All @@ -37,14 +39,14 @@
* Add dependency
* Gradle
```groovy
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.0.4-RELEASE'
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.1.0-RELEASE'
```
* Maven
```xml
<dependency>
<groupId>com.github.sonus21</groupId>
<artifactId>rqueue-spring-boot-starter</artifactId>
<version>2.0.4-RELEASE</version>
<version>2.1.0-RELEASE</version>
</dependency>
```

Expand All @@ -53,14 +55,14 @@
* Add Dependency
* Gradle
```groovy
implementation 'com.github.sonus21:rqueue-spring:2.0.4-RELEASE'
implementation 'com.github.sonus21:rqueue-spring:2.1.0-RELEASE'
```
* Maven
```xml
<dependency>
<groupId>com.github.sonus21</groupId>
<artifactId>rqueue-spring</artifactId>
<version>2.0.4-RELEASE</version>
<version>2.1.0-RELEASE</version>
</dependency>
```

Expand Down
27 changes: 13 additions & 14 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ allprojects {
apply plugin: 'signing'
apply plugin: 'jacoco'
apply plugin: 'nebula.optional-base'

sourceCompatibility = 1.8
targetCompatibility = 1.8

Expand All @@ -36,10 +37,9 @@ ext {
sl4jVersion = '1.7.28'

// testing
junitVersion = '4.12'
mockitoVersion = '3.1.0'
jupiterVersion = '5.5.0'
mockitoVersion = '3.5.0'
hamcrestVersion = '2.2'
powerMockVersion = '2.0.5'
jacocoVersion = '0.8.4'
embeddedRedisVersion = '0.7.2'
h2Version = '1.4.194'
Expand Down Expand Up @@ -67,7 +67,7 @@ ext {

subprojects {
group = 'com.github.sonus21'
version = '2.0.4-RELEASE'
version = '2.1.0-RELEASE'

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand All @@ -84,22 +84,21 @@ subprojects {
testCompile group: 'ch.qos.logback', name: 'logback-core', version: "${logbackVersion}"
// https://mvnrepository.com/artifact/ch.qos.logback/logback-classic
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
testCompile group: 'junit', name: 'junit', version: "${junitVersion}"
// https://mvnrepository.com/artifact/org.mockito/mockito-core
testCompile group: 'org.mockito', name: 'mockito-core', version: "${mockitoVersion}"
// https://mvnrepository.com/artifact/org.hamcrest/hamcrest-library
testCompile group: 'org.hamcrest', name: 'hamcrest-library', version: "${hamcrestVersion}"
testRuntime group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: "${jupiterVersion}"
testCompile group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: "${jupiterVersion}"
// https://mvnrepository.com/artifact/org.mockito/mockito-inline
testCompile group: 'org.mockito', name: 'mockito-inline', version: "${mockitoVersion}"
testCompile group: 'org.mockito', name: 'mockito-junit-jupiter', version: "${mockitoVersion}"
// https://mvnrepository.com/artifact/org.hamcrest/hamcrest
testCompile group: 'org.hamcrest', name: 'hamcrest', version: "${hamcrestVersion}"
// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
testCompile group: 'org.apache.commons', name: 'commons-lang3', version: "${lang3Version}"
testCompile("org.springframework.boot:spring-boot-test:${springBootVersion}")
// https://mvnrepository.com/artifact/org.powermock/powermock-module-junit4
testCompile group: 'org.powermock', name: 'powermock-module-junit4', version: "${powerMockVersion}"

// https://mvnrepository.com/artifact/org.powermock/powermock-api-mockito2
testCompile group: 'org.powermock', name: 'powermock-api-mockito2', version: "${powerMockVersion}"
testCompile group: 'org.junit-pioneer', name: 'junit-pioneer', version: '0.9.0'

configurations {
all*.exclude module: 'spring-boot-starter-logging'
all*.exclude module: 'junit'
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion rqueue-common-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies {
// https://mvnrepository.com/artifact/javax.annotation/javax.annotation-api
compile group: 'javax.annotation', name: 'javax.annotation-api', version: "${javaxAnnotationVersion}"

compile group: 'junit', name: 'junit', version: "${junitVersion}"
compile group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: "${jupiterVersion}"

compile group: 'org.apache.commons', name: 'commons-lang3', version: "${lang3Version}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.github.sonus21.rqueue.test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.sonus21.rqueue.annotation.RqueueListener;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.listener.RqueueMessageHeaders;
Expand All @@ -29,6 +30,8 @@
import com.github.sonus21.rqueue.test.dto.Sms;
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
import com.github.sonus21.rqueue.test.service.FailureManager;
import java.util.List;
import java.util.UUID;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -163,4 +166,13 @@ public void onMessageReservationRequestDeadLetterQueue(ReservationRequest reques
log.info("ReservationRequest Dead Letter Queue{}", request);
consumedMessageService.save(request, "reservation-request-dlq");
}

@RqueueListener(value = "${list.email.queue.name}", active = "${list.email.queue.enabled}")
public void onMessageEmailList(List<Email> emailList) throws JsonProcessingException {
log.info("onMessageEmailList {}", emailList);
String consumedId = UUID.randomUUID().toString();
for (Email email : emailList) {
consumedMessageService.save(email, consumedId);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.application;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.sql.DataSource;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import redis.embedded.RedisServer;

public abstract class ApplicationBasicConfiguration {
private static final Logger monitorLogger = LoggerFactory.getLogger("monitor");
protected RedisServer redisServer;
protected ExecutorService executorService;
protected List<RProcess> processes;
@Value("${mysql.db.name}")
protected String dbName;
@Value("${spring.redis.port}")
protected int redisPort;
@Value("${spring.redis.host}")
protected String redisHost;
@Value("${use.system.redis:false}")
protected boolean useSystemRedis;
@Value("${monitor.thread.count:0}")
protected int monitorThreads;

protected void init() {
if (monitorThreads > 0) {
executorService = Executors.newFixedThreadPool(monitorThreads);
processes = new ArrayList<>();
}
if (useSystemRedis) {
return;
}
if (redisServer == null) {
redisServer = new RedisServer(redisPort);
redisServer.start();
}
}

protected void destroy() {
if (redisServer != null) {
redisServer.stop();
}

if (processes != null) {
for (RProcess rProcess : processes) {
rProcess.process.destroy();
monitorLogger.info("RedisNode {} ", rProcess.redisNode);
for (String line : rProcess.out) {
monitorLogger.info("{}", line);
}
}
}
if (executorService != null) {
executorService.shutdown();
}
}

protected void monitor(String host, int port) {
executorService.submit(
() -> {
try {
Process process =
Runtime.getRuntime()
.exec("redis-cli " + " -h " + host + " -p " + port + " monitor");
List<String> lines = new LinkedList<>();
RProcess rProcess = new RProcess(process, new RedisNode(host, port), lines);
processes.add(rProcess);
BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
String s;
while ((s = br.readLine()) != null) {
lines.add(s);
}
process.waitFor();
} catch (Exception e) {
monitorLogger.error("Process call failed", e);
}
});
}

@Bean
public DataSource dataSource() {
EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
return builder.setType(EmbeddedDatabaseType.H2).setName(dbName).build();
}

@Bean
public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource) {
HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
vendorAdapter.setGenerateDdl(true);
LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
factory.setJpaVendorAdapter(vendorAdapter);
factory.setPackagesToScan("com.github.sonus21.rqueue.test.entity");
factory.setDataSource(dataSource);
return factory;
}

@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper();
}

@AllArgsConstructor
public static class RProcess {
Process process;
RedisNode redisNode;
List<String> out;
}
}
Loading

0 comments on commit 6ba371f

Please sign in to comment.