Skip to content

Commit d5582b3

Browse files
authored
Merge pull request #180 from kilink/simple-limiter-semaphore
Use a counting semaphore in SimpleLimiter
2 parents 6e86611 + 73d390a commit d5582b3

File tree

1 file changed

+61
-6
lines changed
  • concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter

1 file changed

+61
-6
lines changed

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
*/
1616
package com.netflix.concurrency.limits.limiter;
1717

18+
import com.netflix.concurrency.limits.Limiter;
1819
import com.netflix.concurrency.limits.MetricIds;
1920
import com.netflix.concurrency.limits.MetricRegistry;
2021

2122
import java.util.Optional;
23+
import java.util.concurrent.Semaphore;
2224

2325
public class SimpleLimiter<ContextT> extends AbstractLimiter<ContextT> {
2426
public static class Builder extends AbstractLimiter.Builder<Builder> {
@@ -35,22 +37,75 @@ protected Builder self() {
3537
public static Builder newBuilder() {
3638
return new Builder();
3739
}
38-
3940
private final MetricRegistry.SampleListener inflightDistribution;
41+
private final AdjustableSemaphore semaphore;
4042

4143
public SimpleLimiter(AbstractLimiter.Builder<?> builder) {
4244
super(builder);
4345

4446
this.inflightDistribution = builder.registry.distribution(MetricIds.INFLIGHT_NAME);
47+
this.semaphore = new AdjustableSemaphore(getLimit());
4548
}
4649

4750
@Override
48-
public Optional<Listener> acquire(ContextT context) {
49-
int currentInFlight = getInflight();
50-
inflightDistribution.addSample(currentInFlight);
51-
if (currentInFlight >= getLimit()) {
51+
public Optional<Limiter.Listener> acquire(ContextT context) {
52+
if (!semaphore.tryAcquire()) {
5253
return createRejectedListener();
5354
}
54-
return Optional.of(createListener());
55+
Listener listener = new Listener(createListener());
56+
inflightDistribution.addSample(getInflight());
57+
return Optional.of(listener);
58+
}
59+
60+
@Override
61+
protected void onNewLimit(int newLimit) {
62+
int oldLimit = this.getLimit();
63+
super.onNewLimit(newLimit);
64+
65+
if (newLimit > oldLimit) {
66+
semaphore.release(newLimit - oldLimit);
67+
} else {
68+
semaphore.reducePermits(oldLimit - newLimit);
69+
}
70+
}
71+
72+
/**
73+
* Simple Semaphore subclass that allows access to its reducePermits method.
74+
*/
75+
private static final class AdjustableSemaphore extends Semaphore {
76+
AdjustableSemaphore(int permits) {
77+
super(permits);
78+
}
79+
80+
@Override
81+
public void reducePermits(int reduction) {
82+
super.reducePermits(reduction);
83+
}
84+
}
85+
86+
private class Listener implements Limiter.Listener {
87+
private final Limiter.Listener delegate;
88+
89+
Listener(Limiter.Listener delegate) {
90+
this.delegate = delegate;
91+
}
92+
93+
@Override
94+
public void onSuccess() {
95+
delegate.onSuccess();
96+
semaphore.release();
97+
}
98+
99+
@Override
100+
public void onIgnore() {
101+
delegate.onIgnore();
102+
semaphore.release();
103+
}
104+
105+
@Override
106+
public void onDropped() {
107+
delegate.onDropped();
108+
semaphore.release();
109+
}
55110
}
56111
}

0 commit comments

Comments
 (0)