Skip to content

Commit

Permalink
Merge pull request #179 from coekie/lifo-limiter-fix-race
Browse files Browse the repository at this point in the history
Fix race condition in LifoBlockingLimiter
  • Loading branch information
pandeyabhi1987 authored Oct 27, 2022
2 parents ff90760 + 0e911af commit 3ce3644
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,29 @@
import com.netflix.concurrency.limits.Limiter;

/**
* {@link Limiter} decorator that blocks the caller when the limit has been reached. This
* {@link Limiter} decorator that blocks the caller when the limit has been reached. This
* strategy ensures the resource is properly protected but favors availability over latency
* by not fast failing requests when the limit has been reached. To help keep success latencies
* low and minimize timeouts any blocked requests are processed in last in/first out order.
*
* Use this limiter only when the threading model allows the limiter to be blocked.
*
* low and minimize timeouts any blocked requests are processed in last in/first out order.
*
* Use this limiter only when the threading model allows the limiter to be blocked.
*
* @param <ContextT>
*/
public final class LifoBlockingLimiter<ContextT> implements Limiter<ContextT> {
public static class Builder<ContextT> {

private final Limiter<ContextT> delegate;
private int maxBacklogSize = 100;
private Function<ContextT, Long> maxBacklogTimeoutMillis = context -> 1_000L;

private Builder(Limiter<ContextT> delegate) {
this.delegate = delegate;
}

/**
* Set maximum number of blocked threads
*
*
* @param size New max size. Default is 100.
* @return Chainable builder
*/
Expand All @@ -68,19 +68,19 @@ public Builder<ContextT> maxBacklogSize(int size) {
/**
* Set maximum timeout for threads blocked on the limiter.
* Default is 1 second.
*
*
* @param timeout
* @param units
* @return Chainable builder
*/
public Builder<ContextT> backlogTimeout(long timeout, TimeUnit units) {
return backlogTimeoutMillis(units.toMillis(timeout));
}

/**
* Set maximum timeout for threads blocked on the limiter.
* Default is 1 second.
*
*
* @param timeout
* @return Chainable builder
*/
Expand All @@ -91,7 +91,7 @@ public Builder<ContextT> backlogTimeoutMillis(long timeout) {

/**
* Function to derive the backlog timeout from the request context. This allows timeouts
* to be set dynamically based on things like request deadlines.
* to be set dynamically based on things like request deadlines.
* @param mapper
* @param units
* @return
Expand All @@ -105,59 +105,59 @@ public LifoBlockingLimiter<ContextT> build() {
return new LifoBlockingLimiter<ContextT>(this);
}
}

public static <ContextT> Builder<ContextT> newBuilder(Limiter<ContextT> delegate) {
return new Builder<ContextT>(delegate);
}

private final Limiter<ContextT> delegate;

private static class ListenerHolder<ContextT> {
private volatile Optional<Listener> listener;
private volatile Optional<Listener> listener = Optional.empty();
private final CountDownLatch latch = new CountDownLatch(1);
private ContextT context;

public ListenerHolder(ContextT context) {
this.context = context;
}

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return latch.await(timeout, unit);
}

public void set(Optional<Listener> listener) {
this.listener = listener;
latch.countDown();
}

}

/**
* Lock used to block and unblock callers as the limit is reached
*/
private final Deque<ListenerHolder<ContextT>> backlog = new LinkedList<>();

private final AtomicInteger backlogCounter = new AtomicInteger();

private final int backlogSize;

private final Function<ContextT, Long> backlogTimeoutMillis;

private final Object lock = new Object();

private LifoBlockingLimiter(Builder<ContextT> builder) {
this.delegate = builder.delegate;
this.backlogSize = builder.maxBacklogSize;
this.backlogTimeoutMillis = builder.maxBacklogTimeoutMillis;
}

private Optional<Listener> tryAcquire(ContextT context) {
// Try to acquire a token and return immediately if successful
final Optional<Listener> listener = delegate.acquire(context);
if (listener.isPresent()) {
return listener;
}

// Restrict backlog size so the queue doesn't grow unbounded during an outage
if (backlogCounter.get() >= this.backlogSize) {
return Optional.empty();
Expand All @@ -172,27 +172,30 @@ private Optional<Listener> tryAcquire(ContextT context) {
synchronized (lock) {
backlog.addFirst(event);
}

if (!event.await(backlogTimeoutMillis.apply(context), TimeUnit.MILLISECONDS)) {
// Remove the holder from the backlog. This item is likely to be at the end of the
// list so do a removeLastOccurance to minimize the number of items to traverse
synchronized (lock) {
backlog.removeLastOccurrence(event);
}
return Optional.empty();
// if we acquired a token just as we were timing out then return it, otherwise the
// token would get lost
return event.listener;
}
return event.listener;
} catch (InterruptedException e) {
synchronized (lock) {
backlog.removeFirstOccurrence(event);
}
Thread.currentThread().interrupt();
return Optional.empty();
// if we acquired a token just as we were interrupted, then return it
return event.listener;
} finally {
backlogCounter.decrementAndGet();
}
}

private void unblock() {
synchronized (lock) {
if (!backlog.isEmpty()) {
Expand All @@ -209,7 +212,7 @@ private void unblock() {
}
}
}

@Override
public Optional<Listener> acquire(ContextT context) {
return tryAcquire(context).map(delegate -> new Listener() {
Expand All @@ -232,7 +235,7 @@ public void onDropped() {
}
});
}

@Override
public String toString() {
return "BlockingLimiter [" + delegate + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class LifoBlockingLimiterTest {
private static final Logger LOGGER = LoggerFactory.getLogger(LifoBlockingLimiterTest.class);

final Executor executor = Executors.newCachedThreadPool();
final ExecutorService executor = Executors.newCachedThreadPool();

final SettableLimit limit = SettableLimit.startingAt(4);

Expand Down Expand Up @@ -64,7 +68,7 @@ public void unblockWhenFullBeforeTimeout() {
long start = System.nanoTime();
Optional<Limiter.Listener> listener = blockingLimiter.acquire(null);
long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
Assert.assertTrue(duration >= 250);
Assert.assertTrue("Duration = " + duration, duration >= 200);
Assert.assertTrue(listener.isPresent());
}

Expand Down Expand Up @@ -162,6 +166,51 @@ public void verifyFifoOrder() {
Assert.assertEquals(Arrays.asList(4, 3, 2, 1, 0), values);
}

// this test reproduces the condition where a thread acquires a token just as it is timing out.
// before that was fixed, it would lead to a token getting lost.
@Test
public void timeoutAcquireRaceCondition() throws InterruptedException, ExecutionException {
// a limiter with a short timeout, and large backlog (we don't want it to hit that limit)
LifoBlockingLimiter<Void> limiter = LifoBlockingLimiter.newBuilder(simpleLimiter)
.backlogSize(1000)
.backlogTimeout(10, TimeUnit.MILLISECONDS)
.build();

// acquire all except one token
acquireN(limiter, 3);

// try to reproduce the problem a couple of times
for (int round = 0; round < 10; round++) {
// indicates if there has already been a timeout
AtomicBoolean firstTimeout = new AtomicBoolean(false);
// take the last token
Limiter.Listener one = limiter.acquire(null).get();
// in a bunch of threads in parallel, try to take one more. all of these will start to
// time out at around the same time
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
futures.add(executor.submit(() -> {
Optional<Limiter.Listener> listener = limiter.acquire(null);
if (listener.isPresent()) {
// if we got the last token, release it again. this might give it to a
// thread that is in the process of timing out
listener.get().onSuccess();
} else if (firstTimeout.compareAndSet(false, true)) {
// if this is the first one that times out, then other threads are going to
// start timing out soon too, so it's time to release a token
one.onSuccess();
}
return null;
}));
}
// wait for this round to finish
for (Future<?> future : futures) {
future.get();
}
Assert.assertEquals(3, simpleLimiter.getInflight());
}
}

private List<Optional<Limiter.Listener>> acquireN(Limiter<Void> limiter, int N) {
return IntStream.range(0, N)
.mapToObj(i -> limiter.acquire(null))
Expand Down

0 comments on commit 3ce3644

Please sign in to comment.