Skip to content

Commit

Permalink
update tests to ms level
Browse files Browse the repository at this point in the history
  • Loading branch information
bohhyang committed Jan 5, 2024
1 parent 929d4ab commit c40e072
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class TestLoadBalancer implements LoadBalancerWithFacilities, WarmUpServi
private int _warmUpDelayMs = 0;
private int _serviceDataDelayMs = 0;

private final int DELAY_STANDARD_DEVIATION = 10; //ms
private final int DELAY_STANDARD_DEVIATION = 5; //ms
private final ScheduledExecutorService _executorService = Executors.newSingleThreadScheduledExecutor();

public TestLoadBalancer() {}
Expand All @@ -75,14 +75,15 @@ public void getClient(Request request, RequestContext requestContext, Callback<T
@Override
public void warmUpService(String serviceName, Callback<None> callback)
{
double g = Math.min(1.0, Math.max(-1.0, new Random().nextGaussian()));
int actualDelay = Math.max(0,
_warmUpDelayMs + ((int) g * DELAY_STANDARD_DEVIATION)); // +/- DELAY_STANDARD_DEVIATION ms
_requestCount.incrementAndGet();
_executorService.schedule(() ->
{
_completedRequestCount.incrementAndGet();
callback.onSuccess(None.none());
}, Math.max(0, _warmUpDelayMs
// +/- DELAY_STANDARD_DEVIATION ms (any kind of random delay works for the test)
+ ((int) new Random().nextGaussian() * DELAY_STANDARD_DEVIATION)), TimeUnit.MILLISECONDS);
}, actualDelay, TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linkedin.d2.balancer.util;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.LoadBalancerWithFacilities;
Expand Down Expand Up @@ -44,6 +45,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
Expand All @@ -69,7 +71,7 @@ public class WarmUpLoadBalancer extends LoadBalancerWithFacilitiesDelegator {
private WarmUpService _serviceWarmupper;
private final String _d2FsDirPath;
private final String _d2ServicePath;
private final int _warmUpTimeoutSeconds;
private final int _warmUpTimeoutMillis;
private final int _concurrentRequests;
private final ScheduledExecutorService _executorService;
private final DownstreamServicesFetcher _downstreamServicesFetcher;
Expand All @@ -79,6 +81,7 @@ public class WarmUpLoadBalancer extends LoadBalancerWithFacilitiesDelegator {
private volatile boolean _shuttingDown = false;
private long _allStartTime;
private List<String> _servicesToWarmUp = null;
private Supplier<Long> _timeSupplier = () -> SystemClock.instance().currentTimeMillis();

/**
* Since the list might from the fetcher might not be complete (new behavior, old data, etc..), and the user might
Expand All @@ -88,28 +91,43 @@ public class WarmUpLoadBalancer extends LoadBalancerWithFacilitiesDelegator {
private final Set<String> _usedServices;

public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper,
ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath, DownstreamServicesFetcher downstreamServicesFetcher,
int warmUpTimeoutSeconds, int concurrentRequests) {
ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath,
DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutSeconds, int concurrentRequests) {
this(balancer, serviceWarmupper, executorService, d2FsDirPath, d2ServicePath, downstreamServicesFetcher,
warmUpTimeoutSeconds, concurrentRequests, null, false);
}

public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper,
ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath, DownstreamServicesFetcher downstreamServicesFetcher,
int warmUpTimeoutSeconds, int concurrentRequests, DualReadStateManager dualReadStateManager, boolean isIndis) {
ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath,
DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutSeconds, int concurrentRequests,
DualReadStateManager dualReadStateManager, boolean isIndis) {
this(balancer, serviceWarmupper, executorService, d2FsDirPath, d2ServicePath, downstreamServicesFetcher,
warmUpTimeoutSeconds * 1000, concurrentRequests, dualReadStateManager, isIndis, null);
}

@VisibleForTesting
WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper,
ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath,
DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutMillis, int concurrentRequests,
DualReadStateManager dualReadStateManager, boolean isIndis, Supplier<Long> timeSupplierForTest)
{
super(balancer);
_serviceWarmupper = serviceWarmupper;
_executorService = executorService;
_d2FsDirPath = d2FsDirPath;
_d2ServicePath = d2ServicePath;
_downstreamServicesFetcher = downstreamServicesFetcher;
_warmUpTimeoutSeconds = warmUpTimeoutSeconds;
_warmUpTimeoutMillis = warmUpTimeoutMillis;
_concurrentRequests = concurrentRequests;
_outstandingRequests = new ConcurrentLinkedDeque<>();
_usedServices = new HashSet<>();
_dualReadStateManager = dualReadStateManager;
_isIndis = isIndis;
_printName = String.format("%s WarmUp", _isIndis ? "xDS" : "ZK");
if (timeSupplierForTest != null)
{
_timeSupplier = timeSupplierForTest;
}
}

@Override
Expand All @@ -121,7 +139,7 @@ public void start(Callback<None> callback) {
public void onError(Throwable e) {
if (e instanceof TimeoutException)
{
LOG.info("{} hit timeout: {}s. The WarmUp will continue in background", _printName, _warmUpTimeoutSeconds);
LOG.info("{} hit timeout: {}ms. The WarmUp will continue in background", _printName, _warmUpTimeoutMillis);
callback.onSuccess(None.none());
}
else
Expand All @@ -145,7 +163,7 @@ public void onError(Throwable e) {

@Override
public void onSuccess(None result) {
_allStartTime = SystemClock.instance().currentTimeMillis();
_allStartTime = _timeSupplier.get();
_executorService.submit(() -> prepareWarmUp(prepareWarmUpCallback));
}
});
Expand All @@ -161,8 +179,8 @@ private void prepareWarmUp(Callback<None> callback)
// The downstreamServicesFetcher is the core group of the services that will be used during the lifecycle
_usedServices.addAll(serviceNames);

LOG.info("{} starting to fetch dual read mode with timeout: {}s, for {} services: [{}]",
_printName, _warmUpTimeoutSeconds, serviceNames.size(), String.join(", ", serviceNames));
LOG.info("{} starting to fetch dual read mode with timeout: {}ms, for {} services: [{}]",
_printName, _warmUpTimeoutMillis, serviceNames.size(), String.join(", ", serviceNames));

_servicesToWarmUp = serviceNames;

Expand All @@ -188,7 +206,7 @@ private void prepareWarmUp(Callback<None> callback)
_servicesToWarmUp.forEach(serviceName -> {
// check timeout before continue
if (!hasTimedOut.get()
&& SystemClock.instance().currentTimeMillis() - _allStartTime > _warmUpTimeoutSeconds * 1000L)
&& _timeSupplier.get() - _allStartTime > _warmUpTimeoutMillis)
{
hasTimedOut.set(true);
callback.onError(new TimeoutException());
Expand All @@ -213,7 +231,7 @@ public void onSuccess(ServiceProperties result) {
});

LOG.info("{} fetched dual read mode for {} services in {}ms. {} services need to warm up.",
_printName, serviceNames.size(), SystemClock.instance().currentTimeMillis() - _allStartTime,
_printName, serviceNames.size(), _timeSupplier.get() - _allStartTime,
_servicesToWarmUp.size());
}

Expand Down Expand Up @@ -250,8 +268,7 @@ private void continueWarmUp(Callback<None> callback)
*/
private void warmUpServices(Callback<None> startUpCallback)
{
long timeoutMilli = Math.max(0,
_warmUpTimeoutSeconds * 1000L - (SystemClock.instance().currentTimeMillis() - _allStartTime));
long timeoutMilli = Math.max(0, _warmUpTimeoutMillis - (_timeSupplier.get() - _allStartTime));
LOG.info("{} starting to warm up with timeout: {}ms for {} services: [{}]",
_printName, timeoutMilli, _servicesToWarmUp.size(), String.join(", ", _servicesToWarmUp));

Expand All @@ -263,7 +280,7 @@ public void onError(Throwable e)
{
LOG.info("{} hit timeout after {}ms since initial start time, continuing startup. "
+ "Warmup will continue in background",
_printName, SystemClock.instance().currentTimeMillis() - _allStartTime, e);
_printName, _timeSupplier.get() - _allStartTime, e);
startUpCallback.onSuccess(None.none());
}

Expand Down Expand Up @@ -322,7 +339,7 @@ private class WarmUpTask

void execute()
{
final long startTime = SystemClock.instance().currentTimeMillis();
final long startTime = _timeSupplier.get();

final String serviceName = _serviceNamesQueue.poll();
if (serviceName == null || _shuttingDown)
Expand All @@ -342,7 +359,7 @@ private void executeNextTask()
if (_requestCompletedCount.incrementAndGet() == _serviceNames.size())
{
LOG.info("{} completed warming up {} services in {}ms",
_printName, _serviceNames.size(), SystemClock.instance().currentTimeMillis() - _allStartTime);
_printName, _serviceNames.size(), _timeSupplier.get() - _allStartTime);
_callback.onSuccess(None.none());
_outstandingRequests.clear();
return;
Expand All @@ -362,7 +379,7 @@ public void onError(Throwable e)
public void onSuccess(None result)
{
LOG.info("{} completed warming up service {} in {}ms, completed {}/{}",
_printName, serviceName, SystemClock.instance().currentTimeMillis() - startTime,
_printName, serviceName, _timeSupplier.get() - startTime,
_requestCompletedCount.get() + 1, _serviceNames.size());
executeNextTask();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.linkedin.d2.balancer.dualread.DualReadStateManager;
import com.linkedin.d2.balancer.util.downstreams.DownstreamServicesFetcher;
import com.linkedin.d2.balancer.util.downstreams.FSBasedDownstreamServicesFetcher;
import com.linkedin.d2.util.TestDataHelper;
import com.linkedin.r2.message.RequestContext;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -61,7 +62,6 @@ public class WarmUpLoadBalancerTest
);

private static final List<String> VALID_AND_UNVALID_FILES = new ArrayList<>();
private static final int DEFAULT_DEVIATION = 100; // ms
private FSBasedDownstreamServicesFetcher _FSBasedDownstreamServicesFetcher;

static
Expand All @@ -73,6 +73,7 @@ public class WarmUpLoadBalancerTest
private File _tmpdir;
private DualReadModeProvider _dualReadModeProvider;
private DualReadStateManager _dualReadStateManager;
private static final int[] TIME_FREEZED_CALLS = {5}; // the first call in warmUpServices which sets timeout

@BeforeMethod
public void beforeTest() throws IOException
Expand Down Expand Up @@ -106,7 +107,7 @@ public void afterTest() throws IOException
_dualReadStateManager = null;
}

@Test(timeOut = 10000)
@Test
public void testMakingWarmUpRequests() throws URISyntaxException, InterruptedException, ExecutionException, TimeoutException
{
createDefaultServicesIniFiles();
Expand All @@ -120,7 +121,7 @@ public void testMakingWarmUpRequests() throws URISyntaxException, InterruptedExc

FutureCallback<None> callback = new FutureCallback<>();
warmUpLoadBalancer.start(callback);
callback.get(50, TimeUnit.MILLISECONDS); // 3 services should take at most 3 * 10ms
callback.get(30, TimeUnit.MILLISECONDS); // 3 services should take at most 3 * 5ms

Assert.assertEquals(VALID_FILES.size(), requestCount.get());
}
Expand Down Expand Up @@ -375,21 +376,22 @@ public Object[][] modesToWarmUpDataProvider()
public void testSuccessWithDualRead(DualReadModeProvider.DualReadMode mode, Boolean isIndis)
throws InterruptedException, ExecutionException, TimeoutException
{
int warmUpTimeout = 4;
int timeoutMillis = 65;
createDefaultServicesIniFiles();
setDualReadMode(mode);

// 3 dual read fetches take 1.5s, 3 warmups take at most 3 * (500 +/- 10) ms. Total at most is 3030 ms.
TestLoadBalancer balancer = new TestLoadBalancer(500, 500);
// 3 dual read fetches take 30ms, 3 warmups take at most 3 * (5 +/- 5) ms. Total at most is 60 ms.
TestLoadBalancer balancer = new TestLoadBalancer(5, 10);
AtomicInteger completedWarmUpCount = balancer.getCompletedRequestCount();
LoadBalancer warmUpLb = new WarmUpLoadBalancer(balancer, balancer, Executors.newSingleThreadScheduledExecutor(),
_tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, warmUpTimeout,
WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis);
_tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, timeoutMillis,
WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis,
TestDataHelper.getTimeSupplier(10, TIME_FREEZED_CALLS));

FutureCallback<None> callback = new FutureCallback<>();
warmUpLb.start(callback);

callback.get(warmUpTimeout * 1000 + DEFAULT_DEVIATION, TimeUnit.MILLISECONDS);
callback.get(timeoutMillis, TimeUnit.MILLISECONDS);
// all dual read (service data) fetched
verify(_dualReadStateManager, times(VALID_FILES.size())).updateCluster(any(), any());
// all warmups completed
Expand All @@ -400,21 +402,22 @@ public void testSuccessWithDualRead(DualReadModeProvider.DualReadMode mode, Bool
public void testDualReadHitTimeout(DualReadModeProvider.DualReadMode mode, Boolean isIndis)
throws InterruptedException, ExecutionException, TimeoutException
{
int warmUpTimeout = 1;
int timeoutMillis = 80;
createDefaultServicesIniFiles();
setDualReadMode(mode);

// 3 dual read fetches take 1.5s
TestLoadBalancer balancer = new TestLoadBalancer(0, 500);
// 3 dual read fetches take 90ms
TestLoadBalancer balancer = new TestLoadBalancer(0, 30);
AtomicInteger completedWarmUpCount = balancer.getCompletedRequestCount();
LoadBalancer warmUpLb = new WarmUpLoadBalancer(balancer, balancer, Executors.newSingleThreadScheduledExecutor(),
_tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, warmUpTimeout,
WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis);
_tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, timeoutMillis,
WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis,
TestDataHelper.getTimeSupplier(30, TIME_FREEZED_CALLS));

FutureCallback<None> callback = new FutureCallback<>();
warmUpLb.start(callback);

callback.get(warmUpTimeout * 1000 + DEFAULT_DEVIATION, TimeUnit.MILLISECONDS);
callback.get(timeoutMillis, TimeUnit.MILLISECONDS);
// verify that at most 2 service data were fetched within the timeout
verify(_dualReadStateManager, atMost(2)).updateCluster(any(), any());
// warmups are not started
Expand All @@ -425,21 +428,22 @@ public void testDualReadHitTimeout(DualReadModeProvider.DualReadMode mode, Boole
public void testDualReadCompleteWarmUpHitTimeout(DualReadModeProvider.DualReadMode mode, Boolean isIndis)
throws InterruptedException, ExecutionException, TimeoutException
{
int warmUpTimeout = 2;
int timeoutMillis = 120;
createDefaultServicesIniFiles();
setDualReadMode(mode);

// 3 dual read fetches take 1.5s, 3 warmups take 3 * (500 +/- 10) ms
TestLoadBalancer balancer = new TestLoadBalancer(500, 500);
// 3 dual read fetches take 90ms, 3 warmups take 3 * (30 +/- 5) ms
TestLoadBalancer balancer = new TestLoadBalancer(30, 30);
AtomicInteger completedWarmUpCount = balancer.getCompletedRequestCount();
LoadBalancer warmUpLb = new WarmUpLoadBalancer(balancer, balancer, Executors.newSingleThreadScheduledExecutor(),
_tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, warmUpTimeout,
WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis);
_tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, timeoutMillis,
WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis,
TestDataHelper.getTimeSupplier(30, TIME_FREEZED_CALLS));

FutureCallback<None> callback = new FutureCallback<>();
warmUpLb.start(callback);

callback.get(warmUpTimeout * 1000 + DEFAULT_DEVIATION, TimeUnit.MILLISECONDS);
callback.get(timeoutMillis, TimeUnit.MILLISECONDS);
// verify dual read (service data) are all fetched
verify(_dualReadStateManager, times(VALID_FILES.size())).updateCluster(any(), any());
// only partial warmups completed
Expand All @@ -458,20 +462,21 @@ public Object[][] modesToSkipDataProvider()
@Test(dataProvider = "modesToSkipDataProvider")
public void testSkipWarmup(DualReadModeProvider.DualReadMode mode, Boolean isIndis)
throws ExecutionException, InterruptedException, TimeoutException {
int warmUpTimeout = 1;
int timeoutMillis = 40;
createDefaultServicesIniFiles();
setDualReadMode(mode);

TestLoadBalancer balancer = new TestLoadBalancer(0, 0);
AtomicInteger completedWarmUpCount = balancer.getCompletedRequestCount();
LoadBalancer warmUpLb = new WarmUpLoadBalancer(balancer, balancer, Executors.newSingleThreadScheduledExecutor(),
_tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, warmUpTimeout,
WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis);
_tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, timeoutMillis,
WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis,
TestDataHelper.getTimeSupplier(0, TIME_FREEZED_CALLS));

FutureCallback<None> callback = new FutureCallback<>();
warmUpLb.start(callback);

callback.get(DEFAULT_DEVIATION, TimeUnit.MILLISECONDS); // skipping warmup should call back nearly immediately
callback.get(timeoutMillis, TimeUnit.MILLISECONDS); // skipping warmup should call back nearly immediately
// no service data fetched
verify(_dualReadStateManager, never()).updateCluster(any(), any());
// warmups are not started
Expand Down
Loading

0 comments on commit c40e072

Please sign in to comment.