diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiter.java index 3eeed610..65a97edc 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/LifoBlockingLimiter.java @@ -26,29 +26,29 @@ import com.netflix.concurrency.limits.Limiter; /** - * {@link Limiter} decorator that blocks the caller when the limit has been reached. This + * {@link Limiter} decorator that blocks the caller when the limit has been reached. This * strategy ensures the resource is properly protected but favors availability over latency * by not fast failing requests when the limit has been reached. To help keep success latencies - * low and minimize timeouts any blocked requests are processed in last in/first out order. - * - * Use this limiter only when the threading model allows the limiter to be blocked. - * + * low and minimize timeouts any blocked requests are processed in last in/first out order. + * + * Use this limiter only when the threading model allows the limiter to be blocked. + * * @param */ public final class LifoBlockingLimiter implements Limiter { public static class Builder { - + private final Limiter delegate; private int maxBacklogSize = 100; private Function maxBacklogTimeoutMillis = context -> 1_000L; - + private Builder(Limiter delegate) { this.delegate = delegate; } /** * Set maximum number of blocked threads - * + * * @param size New max size. Default is 100. * @return Chainable builder */ @@ -68,7 +68,7 @@ public Builder maxBacklogSize(int size) { /** * Set maximum timeout for threads blocked on the limiter. * Default is 1 second. - * + * * @param timeout * @param units * @return Chainable builder @@ -76,11 +76,11 @@ public Builder maxBacklogSize(int size) { public Builder backlogTimeout(long timeout, TimeUnit units) { return backlogTimeoutMillis(units.toMillis(timeout)); } - + /** * Set maximum timeout for threads blocked on the limiter. * Default is 1 second. - * + * * @param timeout * @return Chainable builder */ @@ -91,7 +91,7 @@ public Builder backlogTimeoutMillis(long timeout) { /** * Function to derive the backlog timeout from the request context. This allows timeouts - * to be set dynamically based on things like request deadlines. + * to be set dynamically based on things like request deadlines. * @param mapper * @param units * @return @@ -105,18 +105,18 @@ public LifoBlockingLimiter build() { return new LifoBlockingLimiter(this); } } - + public static Builder newBuilder(Limiter delegate) { return new Builder(delegate); } - + private final Limiter delegate; - + private static class ListenerHolder { - private volatile Optional listener; + private volatile Optional listener = Optional.empty(); private final CountDownLatch latch = new CountDownLatch(1); private ContextT context; - + public ListenerHolder(ContextT context) { this.context = context; } @@ -124,40 +124,40 @@ public ListenerHolder(ContextT context) { public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return latch.await(timeout, unit); } - + public void set(Optional listener) { this.listener = listener; latch.countDown(); } - + } - + /** * Lock used to block and unblock callers as the limit is reached */ private final Deque> backlog = new LinkedList<>(); - + private final AtomicInteger backlogCounter = new AtomicInteger(); - + private final int backlogSize; - + private final Function backlogTimeoutMillis; - + private final Object lock = new Object(); - + private LifoBlockingLimiter(Builder builder) { this.delegate = builder.delegate; this.backlogSize = builder.maxBacklogSize; this.backlogTimeoutMillis = builder.maxBacklogTimeoutMillis; } - + private Optional tryAcquire(ContextT context) { // Try to acquire a token and return immediately if successful final Optional listener = delegate.acquire(context); if (listener.isPresent()) { return listener; } - + // Restrict backlog size so the queue doesn't grow unbounded during an outage if (backlogCounter.get() >= this.backlogSize) { return Optional.empty(); @@ -172,7 +172,7 @@ private Optional tryAcquire(ContextT context) { synchronized (lock) { backlog.addFirst(event); } - + if (!event.await(backlogTimeoutMillis.apply(context), TimeUnit.MILLISECONDS)) { // Remove the holder from the backlog. This item is likely to be at the end of the // list so do a removeLastOccurance to minimize the number of items to traverse @@ -181,10 +181,7 @@ private Optional tryAcquire(ContextT context) { } // if we acquired a token just as we were timing out then return it, otherwise the // token would get lost - if (event.listener != null) { - return event.listener; - } - return Optional.empty(); + return event.listener; } return event.listener; } catch (InterruptedException e) { @@ -193,15 +190,12 @@ private Optional tryAcquire(ContextT context) { } Thread.currentThread().interrupt(); // if we acquired a token just as we were interrupted, then return it - if (event.listener != null) { - return event.listener; - } - return Optional.empty(); + return event.listener; } finally { backlogCounter.decrementAndGet(); } } - + private void unblock() { synchronized (lock) { if (!backlog.isEmpty()) { @@ -218,7 +212,7 @@ private void unblock() { } } } - + @Override public Optional acquire(ContextT context) { return tryAcquire(context).map(delegate -> new Listener() { @@ -241,7 +235,7 @@ public void onDropped() { } }); } - + @Override public String toString() { return "BlockingLimiter [" + delegate + "]";