Skip to content

Commit 650d463

Browse files
authored
[FLINK-23362][tests] Remove timeouts
1 parent d2002c1 commit 650d463

File tree

1 file changed

+27
-50
lines changed

1 file changed

+27
-50
lines changed

flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java

+27-50
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,9 @@
8282
import java.util.concurrent.Future;
8383
import java.util.concurrent.LinkedBlockingQueue;
8484
import java.util.concurrent.ThreadLocalRandom;
85-
import java.util.concurrent.TimeUnit;
8685
import java.util.concurrent.atomic.AtomicBoolean;
8786
import java.util.concurrent.atomic.AtomicReference;
8887

89-
import scala.concurrent.duration.Deadline;
90-
import scala.concurrent.duration.FiniteDuration;
91-
9288
import static org.junit.Assert.assertArrayEquals;
9389
import static org.junit.Assert.assertEquals;
9490
import static org.junit.Assert.assertNotNull;
@@ -100,8 +96,6 @@ public class ClientTest extends TestLogger {
10096

10197
private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
10298

103-
private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(20L, TimeUnit.SECONDS);
104-
10599
// Thread pool for client bootstrap (shared between tests)
106100
private NioEventLoopGroup nioGroup;
107101

@@ -114,14 +108,13 @@ public void setUp() throws Exception {
114108
public void tearDown() throws Exception {
115109
if (nioGroup != null) {
116110
// note: no "quiet period" to not trigger Netty#4357
117-
nioGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);
111+
nioGroup.shutdownGracefully();
118112
}
119113
}
120114

121115
/** Tests simple queries, of which half succeed and half fail. */
122116
@Test
123117
public void testSimpleRequests() throws Exception {
124-
Deadline deadline = TEST_TIMEOUT.fromNow();
125118
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
126119

127120
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
@@ -173,7 +166,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
173166
Exception testException = new RuntimeException("Expected test Exception");
174167

175168
for (long i = 0L; i < numQueries; i++) {
176-
ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
169+
ByteBuf buf = received.take();
177170
assertNotNull("Receive timed out", buf);
178171

179172
Channel ch = channel.get();
@@ -205,14 +198,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
205198
for (long i = 0L; i < numQueries; i++) {
206199

207200
if (i % 2L == 0L) {
208-
KvStateResponse serializedResult =
209-
futures.get((int) i)
210-
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
201+
KvStateResponse serializedResult = futures.get((int) i).get();
211202
assertArrayEquals(expected, serializedResult.getContent());
212203
} else {
213204
try {
214-
futures.get((int) i)
215-
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
205+
futures.get((int) i).get();
216206
fail("Did not throw expected Exception");
217207
} catch (ExecutionException e) {
218208

@@ -228,9 +218,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
228218
long expectedRequests = numQueries / 2L;
229219

230220
// Counts can take some time to propagate
231-
while (deadline.hasTimeLeft()
232-
&& (stats.getNumSuccessful() != expectedRequests
233-
|| stats.getNumFailed() != expectedRequests)) {
221+
while (stats.getNumSuccessful() != expectedRequests
222+
|| stats.getNumFailed() != expectedRequests) {
234223
Thread.sleep(100L);
235224
}
236225

@@ -246,7 +235,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
246235
// this is why we now simply wait a bit so that everything is
247236
// shut down and then we check
248237

249-
client.shutdown().get(10L, TimeUnit.SECONDS);
238+
client.shutdown().get();
250239
} catch (Exception e) {
251240
exc = e;
252241
LOG.error("An exception occurred while shutting down netty.", e);
@@ -267,7 +256,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
267256
/** Tests that a request to an unavailable host is failed with ConnectException. */
268257
@Test
269258
public void testRequestUnavailableHost() throws Exception {
270-
Deadline deadline = TEST_TIMEOUT.fromNow();
271259
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
272260

273261
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
@@ -290,7 +278,7 @@ public void testRequestUnavailableHost() throws Exception {
290278
CompletableFuture<KvStateResponse> future = client.sendRequest(serverAddress, request);
291279

292280
try {
293-
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
281+
future.get();
294282
fail("Did not throw expected ConnectException");
295283
} catch (ExecutionException e) {
296284
if (!(e.getCause() instanceof ConnectException)) {
@@ -301,7 +289,7 @@ public void testRequestUnavailableHost() throws Exception {
301289
} finally {
302290
if (client != null) {
303291
try {
304-
client.shutdown().get(10L, TimeUnit.SECONDS);
292+
client.shutdown().get();
305293
} catch (Exception e) {
306294
e.printStackTrace();
307295
}
@@ -315,7 +303,6 @@ public void testRequestUnavailableHost() throws Exception {
315303
/** Multiple threads concurrently fire queries. */
316304
@Test
317305
public void testConcurrentQueries() throws Exception {
318-
Deadline deadline = TEST_TIMEOUT.fromNow();
319306
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
320307

321308
final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
@@ -389,19 +376,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
389376

390377
// Verify results
391378
for (Future<List<CompletableFuture<KvStateResponse>>> future : futures) {
392-
List<CompletableFuture<KvStateResponse>> results =
393-
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
379+
List<CompletableFuture<KvStateResponse>> results = future.get();
394380
for (CompletableFuture<KvStateResponse> result : results) {
395-
KvStateResponse actual =
396-
result.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
381+
KvStateResponse actual = result.get();
397382
assertArrayEquals(serializedResult, actual.getContent());
398383
}
399384
}
400385

401386
int totalQueries = numQueryTasks * numQueriesPerTask;
402387

403388
// Counts can take some time to propagate
404-
while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) {
389+
while (stats.getNumSuccessful() != totalQueries) {
405390
Thread.sleep(100L);
406391
}
407392

@@ -418,7 +403,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
418403

419404
if (client != null) {
420405
try {
421-
client.shutdown().get(10L, TimeUnit.SECONDS);
406+
client.shutdown().get();
422407
} catch (Exception e) {
423408
e.printStackTrace();
424409
}
@@ -435,7 +420,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
435420
*/
436421
@Test
437422
public void testFailureClosesChannel() throws Exception {
438-
Deadline deadline = TEST_TIMEOUT.fromNow();
439423
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
440424

441425
final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
@@ -478,11 +462,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
478462
futures.add(client.sendRequest(serverAddress, request));
479463
futures.add(client.sendRequest(serverAddress, request));
480464

481-
ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
465+
ByteBuf buf = received.take();
482466
assertNotNull("Receive timed out", buf);
483467
buf.release();
484468

485-
buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
469+
buf = received.take();
486470
assertNotNull("Receive timed out", buf);
487471
buf.release();
488472

@@ -498,7 +482,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
498482
new RuntimeException("Expected test server failure")));
499483

500484
try {
501-
futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
485+
futures.remove(0).get();
502486
fail("Did not throw expected server failure");
503487
} catch (ExecutionException e) {
504488

@@ -509,7 +493,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
509493
}
510494

511495
try {
512-
futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
496+
futures.remove(0).get();
513497
fail("Did not throw expected server failure");
514498
} catch (ExecutionException e) {
515499

@@ -522,8 +506,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
522506
assertEquals(0L, stats.getNumConnections());
523507

524508
// Counts can take some time to propagate
525-
while (deadline.hasTimeLeft()
526-
&& (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L)) {
509+
while (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L) {
527510
Thread.sleep(100L);
528511
}
529512

@@ -533,7 +516,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
533516
} finally {
534517
if (client != null) {
535518
try {
536-
client.shutdown().get(10L, TimeUnit.SECONDS);
519+
client.shutdown().get();
537520
} catch (Exception e) {
538521
e.printStackTrace();
539522
}
@@ -554,7 +537,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
554537
*/
555538
@Test
556539
public void testServerClosesChannel() throws Exception {
557-
Deadline deadline = TEST_TIMEOUT.fromNow();
558540
AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
559541

560542
final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
@@ -594,17 +576,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
594576
new KvStateInternalRequest(new KvStateID(), new byte[0]);
595577
Future<KvStateResponse> future = client.sendRequest(serverAddress, request);
596578

597-
while (!received.get() && deadline.hasTimeLeft()) {
579+
while (!received.get()) {
598580
Thread.sleep(50L);
599581
}
600582
assertTrue("Receive timed out", received.get());
601583

602584
assertEquals(1, stats.getNumConnections());
603585

604-
channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
586+
channel.get().close().await();
605587

606588
try {
607-
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
589+
future.get();
608590
fail("Did not throw expected server failure");
609591
} catch (ExecutionException e) {
610592
if (!(e.getCause() instanceof ClosedChannelException)) {
@@ -616,8 +598,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
616598
assertEquals(0L, stats.getNumConnections());
617599

618600
// Counts can take some time to propagate
619-
while (deadline.hasTimeLeft()
620-
&& (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L)) {
601+
while (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L) {
621602
Thread.sleep(100L);
622603
}
623604

@@ -627,7 +608,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
627608
} finally {
628609
if (client != null) {
629610
try {
630-
client.shutdown().get(10L, TimeUnit.SECONDS);
611+
client.shutdown().get();
631612
} catch (Exception e) {
632613
e.printStackTrace();
633614
}
@@ -679,8 +660,6 @@ public void testClientServerIntegration() throws Throwable {
679660
Collections.emptyList(),
680661
new CloseableRegistry());
681662

682-
final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
683-
684663
AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
685664

686665
final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
@@ -787,9 +766,7 @@ public void testClientServerIntegration() throws Throwable {
787766
int targetServer = random.get(j) % numServers;
788767

789768
Future<KvStateResponse> future = futures.get(j);
790-
byte[] buf =
791-
future.get(timeout.toMillis(), TimeUnit.MILLISECONDS)
792-
.getContent();
769+
byte[] buf = future.get().getContent();
793770
int value =
794771
KvStateSerializer.deserializeValue(
795772
buf, IntSerializer.INSTANCE);
@@ -811,7 +788,7 @@ public void testClientServerIntegration() throws Throwable {
811788
}
812789

813790
try {
814-
client.shutdown().get(10L, TimeUnit.SECONDS);
791+
client.shutdown().get();
815792
} catch (Exception e) {
816793
e.printStackTrace();
817794
}
@@ -855,7 +832,7 @@ public void testClientServerIntegration() throws Throwable {
855832
} finally {
856833
if (client != null) {
857834
try {
858-
client.shutdown().get(10L, TimeUnit.SECONDS);
835+
client.shutdown().get();
859836
} catch (Exception e) {
860837
e.printStackTrace();
861838
}

0 commit comments

Comments
 (0)