Skip to content

Commit

Permalink
Improve NetBandwidthLimitedTest which seems to have issues with CI
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Dec 10, 2024
1 parent 27cb961 commit aaa9aac
Showing 1 changed file with 59 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import io.vertx.core.net.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -31,7 +34,6 @@
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
Expand All @@ -49,6 +51,7 @@ public class NetBandwidthLimitingTest extends VertxTestBase {

private SocketAddress testAddress;
private NetClient client = null;
private final List<NetServer> servers = Collections.synchronizedList(new ArrayList<>());

@Before
public void setUp() throws Exception {
Expand All @@ -63,12 +66,22 @@ public void setUp() throws Exception {
client = vertx.createNetClient();
}

@After
public void after() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
client.close().onComplete(v -> countDownLatch.countDown());
vertx.close().onComplete(v -> countDownLatch.countDown());
awaitLatch(countDownLatch);
@Override
protected void tearDown() throws Exception {
if (client != null) {
try {
client.close().await();
} finally {
client = null;
}
}
while (!servers.isEmpty()) {
Iterator<NetServer> it = servers.iterator();
NetServer server = it.next();
it.remove();
server.close().await();
}
super.tearDown();
}

@Test
Expand All @@ -77,7 +90,7 @@ public void sendBufferThrottled() {

Buffer expected = TestUtils.randomBuffer(64 * 1024 * 4);
Buffer received = Buffer.buffer();
NetServer server = netServer(vertx);
NetServer server = netServer();
server.connectHandler(sock -> {
sock.handler(buf -> {
sock.write(expected);
Expand Down Expand Up @@ -112,7 +125,7 @@ public void sendFileIsThrottled() throws Exception {
File file = setupFile(fDir.toString(), "some-file.txt", content);
Buffer expected = Buffer.buffer(content);
Buffer received = Buffer.buffer();
NetServer server = netServer(vertx);
NetServer server = netServer();
server.connectHandler(sock -> {
sock.handler(buf -> {
sock.sendFile(file.getAbsolutePath());
Expand Down Expand Up @@ -144,7 +157,7 @@ public void dataUploadIsThrottled() {

Buffer expected = TestUtils.randomBuffer(64 * 1024 * 4);
Buffer received = Buffer.buffer();
NetServer server = netServer(vertx);
NetServer server = netServer();
server.connectHandler(sock -> {
sock.handler(buff -> {
received.appendBuffer(buff);
Expand All @@ -159,14 +172,12 @@ public void dataUploadIsThrottled() {
// Send some data to the client to trigger the buffer write
sock.write("foo");
});
Future<NetServer> result = server.listen(testAddress);
result.onComplete(onSuccess(resp -> {
Future<NetSocket> clientConnect = client.connect(testAddress);
clientConnect.onComplete(onSuccess(sock -> {
sock.handler(buf -> {
sock.write(expected);
});
}));
server.listen(testAddress).await();
Future<NetSocket> clientConnect = client.connect(testAddress);
clientConnect.onComplete(onSuccess(sock -> {
sock.handler(buf -> {
sock.write(expected);
});
}));
await();
}
Expand All @@ -180,7 +191,7 @@ public void fileUploadIsThrottled() throws Exception {
File file = setupFile(fDir.toString(), "some-file.txt", content);
Buffer expected = Buffer.buffer(content);
Buffer received = Buffer.buffer();
NetServer server = netServer(vertx);
NetServer server = netServer();
server.connectHandler(sock -> {
sock.handler(buff -> {
received.appendBuffer(buff);
Expand All @@ -195,14 +206,12 @@ public void fileUploadIsThrottled() throws Exception {
// Send some data to the client to trigger the sendfile
sock.write("foo");
});
Future<NetServer> result = server.listen(testAddress);
result.onComplete(onSuccess(resp -> {
Future<NetSocket> clientConnect = client.connect(testAddress);
clientConnect.onComplete(onSuccess(sock -> {
sock.handler(buf -> {
sock.sendFile(file.getAbsolutePath());
});
}));
server.listen(testAddress).await();
Future<NetSocket> clientConnect = client.connect(testAddress);
clientConnect.onComplete(onSuccess(sock -> {
sock.handler(buf -> {
sock.sendFile(file.getAbsolutePath());
});
}));
await();
}
Expand All @@ -215,7 +224,7 @@ public void testSendBufferIsTrafficShapedWithSharedServers() throws Exception {
Future<String> listenLatch = vertx.deployVerticle(() -> new AbstractVerticle() {
@Override
public void start(Promise<Void> startPromise) {
NetServer testServer = netServer(vertx);
NetServer testServer = netServer();
testServer.connectHandler(sock -> {
sock.handler(buf -> {
sock.write(expected);
Expand Down Expand Up @@ -256,7 +265,7 @@ public void testDynamicInboundRateUpdate() {

Buffer expected = TestUtils.randomBuffer(64 * 1024 * 4);
Buffer received = Buffer.buffer();
NetServer server = netServer(vertx);
NetServer server = netServer();

server.connectHandler(sock -> {
sock.handler(buff -> {
Expand All @@ -271,21 +280,19 @@ public void testDynamicInboundRateUpdate() {
// Send some data to the client to trigger the buffer write
sock.write("foo");
});
Future<NetServer> result = server.listen(testAddress);
server.listen(testAddress).await();

// update rate
TrafficShapingOptions trafficOptions = new TrafficShapingOptions()
.setOutboundGlobalBandwidth(OUTBOUND_LIMIT) // unchanged
.setInboundGlobalBandwidth(2 * INBOUND_LIMIT);
server.updateTrafficShapingOptions(trafficOptions);

result.onComplete(onSuccess(resp -> {
Future<NetSocket> clientConnect = client.connect(testAddress);
clientConnect.onComplete(onSuccess(sock -> {
sock.handler(buf -> {
sock.write(expected);
});
}));
Future<NetSocket> clientConnect = client.connect(testAddress);
clientConnect.onComplete(onSuccess(sock -> {
sock.handler(buf -> {
sock.write(expected);
});
}));
await();
}
Expand All @@ -296,7 +303,7 @@ public void testDynamicOutboundRateUpdate() {

Buffer expected = TestUtils.randomBuffer(64 * 1024 * 4);
Buffer received = Buffer.buffer();
NetServer server = netServer(vertx);
NetServer server = netServer();
server.connectHandler(sock -> {
sock.handler(buf -> {
sock.write(expected);
Expand Down Expand Up @@ -331,7 +338,7 @@ public void testDynamicOutboundRateUpdate() {
@Test(expected = IllegalStateException.class)
public void testRateUpdateWhenServerStartedWithoutTrafficShaping() {
NetServerOptions options = new NetServerOptions().setHost(DEFAULT_HOST).setPort(DEFAULT_PORT);
NetServer testServer = vertx.createNetServer(options);
NetServer testServer = netServer(options);

// update inbound rate to twice the limit
TrafficShapingOptions trafficOptions = new TrafficShapingOptions()
Expand Down Expand Up @@ -368,15 +375,19 @@ private void assertTimeTakenFallsInRange(long expectedTimeInMillis, long actualT
Assert.assertTrue(actualTimeInMillis <= expectedTimeInMillis + 2000); // +/- 2000 millis considered to be tolerant of time pauses during CI runs
}

private NetServer netServer(Vertx vertx) {
NetServerOptions options = new NetServerOptions()
.setHost(DEFAULT_HOST)
.setPort(DEFAULT_PORT)
.setTrafficShapingOptions(new TrafficShapingOptions()
.setInboundGlobalBandwidth(NetBandwidthLimitingTest.INBOUND_LIMIT)
.setOutboundGlobalBandwidth(NetBandwidthLimitingTest.OUTBOUND_LIMIT));
private NetServer netServer() {
return netServer(new NetServerOptions()
.setHost(DEFAULT_HOST)
.setPort(DEFAULT_PORT)
.setTrafficShapingOptions(new TrafficShapingOptions()
.setInboundGlobalBandwidth(NetBandwidthLimitingTest.INBOUND_LIMIT)
.setOutboundGlobalBandwidth(NetBandwidthLimitingTest.OUTBOUND_LIMIT)));
}

return vertx.createNetServer(options);
private NetServer netServer(NetServerOptions options) {
NetServer server = vertx.createNetServer(options);
servers.add(server);
return server;
}

private File setupFile(String testDir, String fileName, String content) throws Exception {
Expand Down

0 comments on commit aaa9aac

Please sign in to comment.