Skip to content

Commit

Permalink
Make ListenerHolder.listener non-nullable
Browse files Browse the repository at this point in the history
An Optional field that can be null is unexpected.
  • Loading branch information
coekie committed Sep 30, 2022
1 parent d217530 commit 0e911af
Showing 1 changed file with 33 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,29 @@
import com.netflix.concurrency.limits.Limiter;

/**
* {@link Limiter} decorator that blocks the caller when the limit has been reached. This
* {@link Limiter} decorator that blocks the caller when the limit has been reached. This
* strategy ensures the resource is properly protected but favors availability over latency
* by not fast failing requests when the limit has been reached. To help keep success latencies
* low and minimize timeouts any blocked requests are processed in last in/first out order.
*
* Use this limiter only when the threading model allows the limiter to be blocked.
*
* low and minimize timeouts any blocked requests are processed in last in/first out order.
*
* Use this limiter only when the threading model allows the limiter to be blocked.
*
* @param <ContextT>
*/
public final class LifoBlockingLimiter<ContextT> implements Limiter<ContextT> {
public static class Builder<ContextT> {

private final Limiter<ContextT> delegate;
private int maxBacklogSize = 100;
private Function<ContextT, Long> maxBacklogTimeoutMillis = context -> 1_000L;

private Builder(Limiter<ContextT> delegate) {
this.delegate = delegate;
}

/**
* Set maximum number of blocked threads
*
*
* @param size New max size. Default is 100.
* @return Chainable builder
*/
Expand All @@ -68,19 +68,19 @@ public Builder<ContextT> maxBacklogSize(int size) {
/**
* Set maximum timeout for threads blocked on the limiter.
* Default is 1 second.
*
*
* @param timeout
* @param units
* @return Chainable builder
*/
public Builder<ContextT> backlogTimeout(long timeout, TimeUnit units) {
return backlogTimeoutMillis(units.toMillis(timeout));
}

/**
* Set maximum timeout for threads blocked on the limiter.
* Default is 1 second.
*
*
* @param timeout
* @return Chainable builder
*/
Expand All @@ -91,7 +91,7 @@ public Builder<ContextT> backlogTimeoutMillis(long timeout) {

/**
* Function to derive the backlog timeout from the request context. This allows timeouts
* to be set dynamically based on things like request deadlines.
* to be set dynamically based on things like request deadlines.
* @param mapper
* @param units
* @return
Expand All @@ -105,59 +105,59 @@ public LifoBlockingLimiter<ContextT> build() {
return new LifoBlockingLimiter<ContextT>(this);
}
}

public static <ContextT> Builder<ContextT> newBuilder(Limiter<ContextT> delegate) {
return new Builder<ContextT>(delegate);
}

private final Limiter<ContextT> delegate;

private static class ListenerHolder<ContextT> {
private volatile Optional<Listener> listener;
private volatile Optional<Listener> listener = Optional.empty();
private final CountDownLatch latch = new CountDownLatch(1);
private ContextT context;

public ListenerHolder(ContextT context) {
this.context = context;
}

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return latch.await(timeout, unit);
}

public void set(Optional<Listener> listener) {
this.listener = listener;
latch.countDown();
}

}

/**
* Lock used to block and unblock callers as the limit is reached
*/
private final Deque<ListenerHolder<ContextT>> backlog = new LinkedList<>();

private final AtomicInteger backlogCounter = new AtomicInteger();

private final int backlogSize;

private final Function<ContextT, Long> backlogTimeoutMillis;

private final Object lock = new Object();

private LifoBlockingLimiter(Builder<ContextT> builder) {
this.delegate = builder.delegate;
this.backlogSize = builder.maxBacklogSize;
this.backlogTimeoutMillis = builder.maxBacklogTimeoutMillis;
}

private Optional<Listener> tryAcquire(ContextT context) {
// Try to acquire a token and return immediately if successful
final Optional<Listener> listener = delegate.acquire(context);
if (listener.isPresent()) {
return listener;
}

// Restrict backlog size so the queue doesn't grow unbounded during an outage
if (backlogCounter.get() >= this.backlogSize) {
return Optional.empty();
Expand All @@ -172,7 +172,7 @@ private Optional<Listener> tryAcquire(ContextT context) {
synchronized (lock) {
backlog.addFirst(event);
}

if (!event.await(backlogTimeoutMillis.apply(context), TimeUnit.MILLISECONDS)) {
// Remove the holder from the backlog. This item is likely to be at the end of the
// list so do a removeLastOccurance to minimize the number of items to traverse
Expand All @@ -181,10 +181,7 @@ private Optional<Listener> tryAcquire(ContextT context) {
}
// if we acquired a token just as we were timing out then return it, otherwise the
// token would get lost
if (event.listener != null) {
return event.listener;
}
return Optional.empty();
return event.listener;
}
return event.listener;
} catch (InterruptedException e) {
Expand All @@ -193,15 +190,12 @@ private Optional<Listener> tryAcquire(ContextT context) {
}
Thread.currentThread().interrupt();
// if we acquired a token just as we were interrupted, then return it
if (event.listener != null) {
return event.listener;
}
return Optional.empty();
return event.listener;
} finally {
backlogCounter.decrementAndGet();
}
}

private void unblock() {
synchronized (lock) {
if (!backlog.isEmpty()) {
Expand All @@ -218,7 +212,7 @@ private void unblock() {
}
}
}

@Override
public Optional<Listener> acquire(ContextT context) {
return tryAcquire(context).map(delegate -> new Listener() {
Expand All @@ -241,7 +235,7 @@ public void onDropped() {
}
});
}

@Override
public String toString() {
return "BlockingLimiter [" + delegate + "]";
Expand Down

0 comments on commit 0e911af

Please sign in to comment.