Skip to content

Commit 80eeaac

Browse files
committed
Test connection is closed if consumer update takes too long
References rabbitmq/rabbitmq-server#7743
1 parent e09ddc6 commit 80eeaac

File tree

3 files changed

+48
-3
lines changed

3 files changed

+48
-3
lines changed

src/test/java/com/rabbitmq/stream/Host.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
1414
package com.rabbitmq.stream;
1515

16+
import static java.lang.String.format;
17+
1618
import com.google.common.reflect.TypeToken;
1719
import com.google.gson.Gson;
1820
import java.io.BufferedReader;
@@ -29,7 +31,7 @@ public class Host {
2931

3032
private static final Gson GSON = new Gson();
3133

32-
private static String capture(InputStream is) throws IOException {
34+
public static String capture(InputStream is) throws IOException {
3335
BufferedReader br = new BufferedReader(new InputStreamReader(is));
3436
String line;
3537
StringBuilder buff = new StringBuilder();
@@ -103,7 +105,7 @@ public static Process killConnection(String connectionName) {
103105
List<ConnectionInfo> cs = listConnections();
104106
if (cs.stream().filter(c -> connectionName.equals(c.clientProvidedName())).count() != 1) {
105107
throw new IllegalArgumentException(
106-
String.format(
108+
format(
107109
"Could not find 1 connection '%s' in stream connections: %s",
108110
connectionName,
109111
cs.stream()
@@ -137,6 +139,10 @@ public static Process killStreamLeaderProcess(String stream) throws IOException
137139
+ "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'");
138140
}
139141

142+
public static void setEnv(String parameter, String value) throws IOException {
143+
rabbitmqctl(format("eval 'application:set_env(rabbitmq_stream, %s, %s).'", parameter, value));
144+
}
145+
140146
public static String rabbitmqctlCommand() {
141147
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
142148
if (rabbitmqCtl == null) {

src/test/java/com/rabbitmq/stream/impl/SacClientTest.java

+38
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_3_11_14;
1617
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ko;
1718
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok;
1819
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.responseCode;
@@ -35,9 +36,11 @@
3536
import com.rabbitmq.stream.impl.Client.CreditNotification;
3637
import com.rabbitmq.stream.impl.Client.MessageListener;
3738
import com.rabbitmq.stream.impl.Client.Response;
39+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
3840
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition;
3941
import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet;
4042
import java.nio.charset.StandardCharsets;
43+
import java.time.Duration;
4144
import java.util.Collections;
4245
import java.util.HashMap;
4346
import java.util.List;
@@ -578,6 +581,7 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep
578581
} finally {
579582
keepPublishing.set(false);
580583
deleteSuperStreamTopology(c, superStream, 3);
584+
c.close();
581585
}
582586
}
583587

@@ -593,4 +597,38 @@ void singleActiveConsumerMustHaveName() {
593597
Collections.singletonMap("single-active-consumer", "true"));
594598
assertThat(response).is(ko()).has(responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED));
595599
}
600+
601+
@Test
602+
@DisabledIfRabbitMqCtlNotSet
603+
@BrokerVersionAtLeast(RABBITMQ_3_11_14)
604+
void connectionShouldBeClosedIfConsumerUpdateTakesTooLong() throws Exception {
605+
Duration timeout = Duration.ofSeconds(1);
606+
try {
607+
Host.setEnv("request_timeout", String.valueOf(timeout.getSeconds()));
608+
CountDownLatch shutdownLatch = new CountDownLatch(1);
609+
Client client =
610+
cf.get(
611+
new ClientParameters()
612+
.consumerUpdateListener(
613+
(c, subscriptionId, active) -> {
614+
try {
615+
Thread.sleep(timeout.multipliedBy(2).toMillis());
616+
} catch (InterruptedException e) {
617+
throw new RuntimeException(e);
618+
}
619+
return OffsetSpecification.first();
620+
})
621+
.shutdownListener(shutdownContext -> shutdownLatch.countDown()));
622+
Map<String, String> parameters = new HashMap<>();
623+
parameters.put("single-active-consumer", "true");
624+
parameters.put("name", "foo");
625+
Response response =
626+
client.subscribe(b(0), stream, OffsetSpecification.first(), 1, parameters);
627+
assertThat(response).is(ok());
628+
629+
assertThat(latchAssert(shutdownLatch)).completes(timeout.multipliedBy(5));
630+
} finally {
631+
Host.setEnv("request_timeout", "60000");
632+
}
633+
}
596634
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -939,7 +939,8 @@ public enum BrokerVersion {
939939
RABBITMQ_3_11("3.11.0"),
940940
RABBITMQ_3_11_7("3.11.7"),
941941
RABBITMQ_3_11_9("3.11.9"),
942-
RABBITMQ_3_11_11("3.11.11");
942+
RABBITMQ_3_11_11("3.11.11"),
943+
RABBITMQ_3_11_14("3.11.14");
943944

944945
final String value;
945946

0 commit comments

Comments
 (0)