Skip to content

Commit

Permalink
Remove UnicastProcessor from reply router when timeout triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
dbuos committed Dec 12, 2020
1 parent 8dbf4c7 commit deca841
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static java.lang.Boolean.TRUE;
import static org.reactivecommons.async.impl.Headers.*;
Expand Down Expand Up @@ -62,6 +63,7 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class

final Mono<R> replyHolder = router.register(correlationID)
.timeout(replyTimeout)
.doOnError(TimeoutException.class, e -> router.deregister(correlationID))
.flatMap(s -> fromCallable(() -> converter.readValue(s, type)));

Map<String, Object> headers = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ public void routeReply(String correlationID, Message data) {
}
}

public <E> void routeError(String correlationID, String data) {
final UnicastProcessor<Message> processor = processors.remove(correlationID);
if (processor != null) {
processor.onError(new RuntimeException(data));
}
public void deregister(String correlationID){
processors.remove(correlationID);
}

public void routeEmpty(String correlationID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import reactor.test.StepVerifier;
import reactor.util.concurrent.Queues;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -58,6 +60,25 @@ public void init(ReactiveMessageSender sender) {
asyncGateway = new RabbitDirectAsyncGateway(config, router, sender, "exchange", converter);
}

@Test
public void shouldReleaseRouterResourcesOnTimeout(){
BrokerConfig config = new BrokerConfig(false, false, false, Duration.ofSeconds(1));
asyncGateway = new RabbitDirectAsyncGateway(config, router, senderMock, "ex", converter);
when(router.register(anyString())).thenReturn(Mono.never());
when(senderMock.sendNoConfirm(any(), anyString(), anyString(), anyMap(), anyBoolean()))
.thenReturn(Mono.empty());

AsyncQuery<String> query = new AsyncQuery<>("some.query", "data");
asyncGateway.requestReply(query, "some.target", String.class)
.as(StepVerifier::create)
.expectError(TimeoutException.class)
.verify();

ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(router).register(captor.capture());
verify(router).deregister(captor.getValue());
}

@Test
public void shouldSendInOptimalTime() throws InterruptedException {
init(getReactiveMessageSender());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.reactivecommons.async.impl.reply;

import org.junit.Test;
import org.mockito.Mockito;
import org.reactivecommons.async.impl.communications.Message;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.UUID;

public class ReactiveReplyRouterTest {

private ReactiveReplyRouter replyRouter = new ReactiveReplyRouter();

@Test
public void shouldRouteReply(){
final String uuid = UUID.randomUUID().toString();
final Mono<Message> registered = replyRouter.register(uuid);

Message message = Mockito.mock(Message.class);
replyRouter.routeReply(uuid, message);

StepVerifier.create(registered)
.expectNext(message)
.verifyComplete();

}

@Test
public void shouldRouteEmptyResponse(){
final String uuid = UUID.randomUUID().toString();
final Mono<Message> registered = replyRouter.register(uuid);

replyRouter.routeEmpty(uuid);

StepVerifier.create(registered)
.verifyComplete();
}

@Test
public void shouldDeRegisterProcessor(){
final String uuid = UUID.randomUUID().toString();
final Mono<Message> registered = replyRouter.register(uuid);

replyRouter.deregister(uuid);
replyRouter.routeEmpty(uuid);

StepVerifier.create(registered.timeout(Duration.ofSeconds(1)))
.expectTimeout(Duration.ofSeconds(3)).verify();
}

}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=1.0.0-beta2
version=1.0.0-beta3
springBootVersion=2.2.9.RELEASE
gradleVersionsVersion=0.28.0
reactorRabbitVersion=1.5.0

0 comments on commit deca841

Please sign in to comment.