diff --git a/concurrency-limits-core/dependencies.lock b/concurrency-limits-core/dependencies.lock index 5d0e8ad1..37efc834 100644 --- a/concurrency-limits-core/dependencies.lock +++ b/concurrency-limits-core/dependencies.lock @@ -22,10 +22,10 @@ }, "testRuntimeClasspath": { "org.junit.jupiter:junit-jupiter-engine": { - "locked": "5.9.0" + "locked": "5.10.2" }, "org.junit.vintage:junit-vintage-engine": { - "locked": "5.9.0" + "locked": "5.10.2" }, "org.slf4j:slf4j-api": { "locked": "1.7.36" diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java index 08b1a25a..cafe685a 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java @@ -24,12 +24,41 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import java.util.function.Supplier; public abstract class AbstractLimiter implements Limiter { public static final String ID_TAG = "id"; public static final String STATUS_TAG = "status"; + /** + * Constructs a new builder with a list of bypass resolvers. + * If the predicate condition in any of the resolver is satisfied, + * the call is bypassed without increasing the limiter inflight count + * and affecting the algorithm. + */ + public abstract static class BypassLimiterBuilder, ContextT> extends Builder { + + private final Predicate ALWAYS_FALSE = (context) -> false; + private Predicate bypassResolver = ALWAYS_FALSE; + + /** + * Add a chainable bypass resolver predicate from context. Multiple resolvers may be added and if any of the + * predicate condition returns true the call is bypassed without increasing the limiter inflight count and + * affecting the algorithm. Will not bypass any calls by default if no resolvers are added. + * @param shouldBypass Predicate condition to bypass limit + * @return Chainable builder + */ + public BuilderT bypassLimitResolver(Predicate shouldBypass) { + if (this.bypassResolver == ALWAYS_FALSE) { + this.bypassResolver = shouldBypass; + } else { + this.bypassResolver = bypassResolver.or(shouldBypass); + } + return self(); + } + } + public abstract static class Builder> { private static final AtomicInteger idCounter = new AtomicInteger(); @@ -69,6 +98,8 @@ public BuilderT metricRegistry(MetricRegistry registry) { private final MetricRegistry.Counter droppedCounter; private final MetricRegistry.Counter ignoredCounter; private final MetricRegistry.Counter rejectedCounter; + private final MetricRegistry.Counter bypassCounter; + private Predicate bypassResolver = (context) -> false; private volatile int limit; @@ -77,12 +108,19 @@ protected AbstractLimiter(Builder builder) { this.limitAlgorithm = builder.limit; this.limit = limitAlgorithm.getLimit(); this.limitAlgorithm.notifyOnChange(this::onNewLimit); - + if (builder instanceof BypassLimiterBuilder) { + this.bypassResolver = ((BypassLimiterBuilder) builder).bypassResolver; + } builder.registry.gauge(MetricIds.LIMIT_NAME, this::getLimit); this.successCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "success"); this.droppedCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "dropped"); this.ignoredCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "ignored"); this.rejectedCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "rejected"); + this.bypassCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "bypassed"); + } + + protected boolean shouldBypass(ContextT context){ + return bypassResolver.test(context); } protected Optional createRejectedListener() { @@ -90,6 +128,27 @@ protected Optional createRejectedListener() { return Optional.empty(); } + protected Optional createBypassListener() { + this.bypassCounter.increment(); + return Optional.of(new Listener() { + + @Override + public void onSuccess() { + // Do nothing + } + + @Override + public void onIgnore() { + // Do nothing + } + + @Override + public void onDropped() { + // Do nothing + } + }); + } + protected Listener createListener() { final long startTime = clock.get(); final int currentInflight = inFlight.incrementAndGet(); diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java index 57d1e8f1..53b28940 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java @@ -37,7 +37,7 @@ public abstract class AbstractPartitionedLimiter extends AbstractLimit private static final Logger LOG = LoggerFactory.getLogger(AbstractPartitionedLimiter.class); private static final String PARTITION_TAG_NAME = "partition"; - public abstract static class Builder, ContextT> extends AbstractLimiter.Builder { + public abstract static class Builder, ContextT> extends AbstractLimiter.BypassLimiterBuilder { private List> partitionResolvers = new ArrayList<>(); private final Map partitions = new LinkedHashMap<>(); private int maxDelayedThreads = 100; @@ -215,6 +215,9 @@ public Optional acquire(ContextT context) { try { lock.lock(); + if (shouldBypass(context)){ + return createBypassListener(); + } if (getInflight() >= getLimit() && partition.isLimitExceeded()) { lock.unlock(); if (partition.backoffMillis > 0 && delayedThreads.get() < maxDelayedThreads) { diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java index 1d916028..6792e054 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java @@ -23,6 +23,18 @@ import java.util.concurrent.Semaphore; public class SimpleLimiter extends AbstractLimiter { + + public static class BypassLimiterBuilder extends AbstractLimiter.BypassLimiterBuilder, ContextT> { + public SimpleLimiter build() { + return new SimpleLimiter<>(this); + } + + @Override + protected BypassLimiterBuilder self() { + return this; + } + } + public static class Builder extends AbstractLimiter.Builder { public SimpleLimiter build() { return new SimpleLimiter<>(this); @@ -34,6 +46,10 @@ protected Builder self() { } } + public static BypassLimiterBuilder newBypassLimiterBuilder() { + return new BypassLimiterBuilder<>(); + } + public static Builder newBuilder() { return new Builder(); } @@ -42,7 +58,6 @@ public static Builder newBuilder() { public SimpleLimiter(AbstractLimiter.Builder builder) { super(builder); - this.inflightDistribution = builder.registry.distribution(MetricIds.INFLIGHT_NAME); this.semaphore = new AdjustableSemaphore(getLimit()); } @@ -50,13 +65,15 @@ public SimpleLimiter(AbstractLimiter.Builder builder) { @Override public Optional acquire(ContextT context) { Optional listener; - if (!semaphore.tryAcquire()) { + if (shouldBypass(context)) { + listener = createBypassListener(); + } + else if (!semaphore.tryAcquire()) { listener = createRejectedListener(); } else { listener = Optional.of(new Listener(createListener())); } - inflightDistribution.addSample(getInflight()); return listener; } diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java index cf81b191..b2a548ae 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java @@ -8,6 +8,7 @@ import java.util.Optional; import java.util.function.Function; +import java.util.function.Predicate; public class AbstractPartitionedLimiterTest { public static class TestPartitionedLimiter extends AbstractPartitionedLimiter { @@ -27,6 +28,13 @@ public TestPartitionedLimiter(Builder builder) { } } + public static class ShouldBypassPredicate implements Predicate { + @Override + public boolean test(String s) { + return s.contains("admin"); + } + } + @Test public void limitAllocatedToBins() { AbstractPartitionedLimiter limiter = (AbstractPartitionedLimiter) TestPartitionedLimiter.newBuilder() @@ -156,4 +164,67 @@ public void setLimitReservesBusy() { Assert.assertEquals(1, limiter.getPartition("batch").getInflight()); Assert.assertEquals(1, limiter.getInflight()); } + + @Test + public void testBypassPartitionedLimiter() { + + AbstractPartitionedLimiter limiter = (AbstractPartitionedLimiter) TestPartitionedLimiter.newBuilder() + .partitionResolver(Function.identity()) + .partition("batch", 0.1) + .partition("live", 0.9) + .limit(FixedLimit.of(10)) + .bypassLimitResolver(new ShouldBypassPredicate()) + .build(); + + Assert.assertTrue(limiter.acquire("batch").isPresent()); + Assert.assertEquals(1, limiter.getPartition("batch").getInflight()); + Assert.assertTrue(limiter.acquire("admin").isPresent()); + + for (int i = 0; i < 9; i++) { + Assert.assertTrue(limiter.acquire("live").isPresent()); + Assert.assertEquals(i+1, limiter.getPartition("live").getInflight()); + Assert.assertTrue(limiter.acquire("admin").isPresent()); + } + + // Verify that bypassed requests are able to proceed even when the limiter is full + Assert.assertFalse(limiter.acquire("batch").isPresent()); + Assert.assertEquals(1, limiter.getPartition("batch").getInflight()); + Assert.assertFalse(limiter.acquire("live").isPresent()); + Assert.assertEquals(9, limiter.getPartition("live").getInflight()); + Assert.assertEquals(10, limiter.getInflight()); + Assert.assertTrue(limiter.acquire("admin").isPresent()); + } + + @Test + public void testBypassSimpleLimiter() { + + SimpleLimiter limiter = (SimpleLimiter) TestPartitionedLimiter.newBuilder() + .limit(FixedLimit.of(10)) + .bypassLimitResolver(new ShouldBypassPredicate()) + .build(); + + int inflightCount = 0; + for (int i = 0; i < 5; i++) { + Assert.assertTrue(limiter.acquire("request").isPresent()); + Assert.assertEquals(i+1, limiter.getInflight()); + inflightCount++; + } + + for (int i = 0; i < 15; i++) { + Assert.assertTrue(limiter.acquire("admin").isPresent()); + Assert.assertEquals(inflightCount, limiter.getInflight()); + } + + for (int i = 0; i < 5; i++) { + Assert.assertTrue(limiter.acquire("request").isPresent()); + Assert.assertEquals(inflightCount+i+1, limiter.getInflight()); + } + + // Calls with passing bypass condition will return a token + // whereas remaining calls will be throttled since inflight count is greater than the limit + for (int i = 0; i < 10; i++) { + Assert.assertFalse(limiter.acquire("request").isPresent()); + Assert.assertTrue(limiter.acquire("admin").isPresent()); + } + } } diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/SimpleLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/SimpleLimiterTest.java new file mode 100644 index 00000000..ebc80f83 --- /dev/null +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/SimpleLimiterTest.java @@ -0,0 +1,85 @@ +package com.netflix.concurrency.limits.limiter; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limit.FixedLimit; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; + +public class SimpleLimiterTest { + + @Test + public void useLimiterCapacityUntilTotalLimit() { + SimpleLimiter limiter = SimpleLimiter.newBuilder() + .limit(FixedLimit.of(10)) + .build(); + + for (int i = 0; i < 10; i++) { + Assert.assertTrue(limiter.acquire("live").isPresent()); + } + + // Rejected call after total limit is utilized + Assert.assertFalse(limiter.acquire("live").isPresent()); + Assert.assertEquals(10, limiter.getInflight()); + } + + @Test + public void testReleaseLimit() { + SimpleLimiter limiter = SimpleLimiter.newBuilder() + .limit(FixedLimit.of(10)) + .build(); + + Optional completion = limiter.acquire("live"); + for (int i = 1; i < 10; i++) { + Assert.assertTrue(limiter.acquire("live").isPresent()); + } + + Assert.assertEquals(10, limiter.getInflight()); + Assert.assertFalse(limiter.acquire("live").isPresent()); + + // Release token + completion.get().onSuccess(); + Assert.assertEquals(9, limiter.getInflight()); + + Assert.assertTrue(limiter.acquire("live").isPresent()); + Assert.assertEquals(10, limiter.getInflight()); + } + + @Test + public void testSimpleBypassLimiter() { + SimpleLimiter limiter = SimpleLimiter.newBypassLimiterBuilder() + .limit(FixedLimit.of(10)) + .bypassLimitResolver((context) -> context.equals("admin")) + .build(); + + for (int i = 0; i < 10; i++) { + Assert.assertTrue(limiter.acquire("live").isPresent()); + Assert.assertEquals(i+1, limiter.getInflight()); + } + + // Verify calls with passing bypass condition will return a token + // whereas remaining calls will be throttled since inflight count is greater than the limit + for (int i = 0; i < 10; i++) { + Assert.assertFalse(limiter.acquire("live").isPresent()); + Assert.assertTrue(limiter.acquire("admin").isPresent()); + } + } + + @Test + public void testSimpleBypassLimiterDefault() { + SimpleLimiter limiter = SimpleLimiter.newBypassLimiterBuilder() + .limit(FixedLimit.of(10)) + .build(); + + for (int i = 0; i < 10; i++) { + Assert.assertTrue(limiter.acquire("live").isPresent()); + Assert.assertEquals(i+1, limiter.getInflight()); + } + + // Verify that no calls are bypassed by default + Assert.assertFalse(limiter.acquire("live").isPresent()); + Assert.assertFalse(limiter.acquire("admin").isPresent()); + } + +} diff --git a/concurrency-limits-grpc/dependencies.lock b/concurrency-limits-grpc/dependencies.lock index 0d9336c8..54d3e6b2 100644 --- a/concurrency-limits-grpc/dependencies.lock +++ b/concurrency-limits-grpc/dependencies.lock @@ -32,7 +32,7 @@ "firstLevelTransitive": [ "com.netflix.concurrency-limits:concurrency-limits-spectator" ], - "locked": "1.3.6" + "locked": "1.7.9" }, "io.grpc:grpc-netty": { "locked": "1.9.0" @@ -47,7 +47,7 @@ "locked": "3.6.1" }, "org.mockito:mockito-core": { - "locked": "4.7.0" + "locked": "4.11.0" }, "org.slf4j:slf4j-api": { "locked": "1.7.36" @@ -70,7 +70,7 @@ "firstLevelTransitive": [ "com.netflix.concurrency-limits:concurrency-limits-spectator" ], - "locked": "1.3.6" + "locked": "1.7.9" }, "io.grpc:grpc-netty": { "locked": "1.9.0" @@ -82,13 +82,13 @@ "locked": "3.6.1" }, "org.junit.jupiter:junit-jupiter-engine": { - "locked": "5.9.0" + "locked": "5.10.2" }, "org.junit.vintage:junit-vintage-engine": { - "locked": "5.9.0" + "locked": "5.10.2" }, "org.mockito:mockito-core": { - "locked": "4.7.0" + "locked": "4.11.0" }, "org.slf4j:slf4j-api": { "firstLevelTransitive": [ diff --git a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java index 672ffe88..da084cbc 100644 --- a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java +++ b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java @@ -18,7 +18,6 @@ import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.limiter.AbstractPartitionedLimiter; import com.netflix.concurrency.limits.limiter.BlockingLimiter; -import com.netflix.concurrency.limits.limiter.SimpleLimiter; import io.grpc.CallOptions; /** @@ -34,6 +33,28 @@ public GrpcClientLimiterBuilder partitionByMethod() { public GrpcClientLimiterBuilder partitionByCallOption(CallOptions.Key option) { return partitionResolver(context -> context.getCallOptions().getOption(option)); } + + /** + * Bypass limit if the request's full method name matches the specified gRPC method's full name. + * @param fullMethodName The full method name to check against the {@link GrpcClientRequestContext}'s method. + * If the request's method matches this fullMethodName, the limit will be bypassed. + * @return Chainable builder + */ + public GrpcClientLimiterBuilder bypassLimitByMethod(String fullMethodName) { + return bypassLimitResolver(context -> fullMethodName.equals(context.getMethod().getFullMethodName())); + } + + /** + * Bypass limit if the value of the specified call option matches the provided value. + * @param option The call option key to check against the {@link GrpcClientRequestContext}'s call options. + * @param value The value to compare against the value of the specified call option in the request. + * If they match, the limit will be bypassed. + * @param The type of the call option value. + * @return Chainable builder + */ + public GrpcClientLimiterBuilder bypassLimitByCallOption(CallOptions.Key option, T value) { + return bypassLimitResolver(context -> value.equals(context.getCallOptions().getOption(option))); + } /** * When set to true new calls to the channel will block when the limit has been reached instead diff --git a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java index 03cf9f64..21dad243 100644 --- a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java +++ b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java @@ -44,6 +44,40 @@ public GrpcServerLimiterBuilder partitionByAttribute(Attributes.Key attr return partitionResolver(context -> context.getCall().getAttributes().get(attribute)); } + /** + * Bypass limit if the request's full method name matches the specified gRPC method's full name. + * @param fullMethodName The full method name to check against the {@link GrpcServerRequestContext}'s method. + * If the request's method matches this fullMethodName, the limit will be bypassed. + * @return Chainable builder + */ + public GrpcServerLimiterBuilder bypassLimitByMethod(String fullMethodName) { + return bypassLimitResolver(context -> fullMethodName.equals(context.getCall().getMethodDescriptor().getFullMethodName())); + } + + /** + * Bypass limit if the value of the specified header matches the provided value. + * @param header The header key to check against the {@link GrpcServerRequestContext}'s headers. + * @param value The value to compare against the value of the specified header in the request. + * If they match, the limit will be bypassed. + * @param The type of the header value. + * @return Chainable builder + */ + public GrpcServerLimiterBuilder bypassLimitByHeader(Metadata.Key header, T value) { + return bypassLimitResolver(context -> value.equals(context.getHeaders().get(header))); + } + + /** + * Bypass limit if the value of the specified attribute matches the provided value. + * @param attribute The attribute key to check against the {@link GrpcServerRequestContext}'s attributes. + * @param value The value to compare against the value of the specified attribute in the request. + * If they match, the limit will be bypassed. + * @param The type of the attribute value. + * @return Chainable builder + */ + public GrpcServerLimiterBuilder bypassLimitByAttribute(Attributes.Key attribute, T value) { + return bypassLimitResolver(context -> value.equals(context.getCall().getAttributes().get(attribute))); + } + @Override protected GrpcServerLimiterBuilder self() { return this; diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java index 30fbbab6..6772e357 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/client/ConcurrencyLimitClientInterceptorTest.java @@ -1,16 +1,18 @@ package com.netflix.concurrency.limits.grpc.client; import com.netflix.concurrency.limits.Limiter; -import com.netflix.concurrency.limits.grpc.StringMarshaller; +import com.netflix.concurrency.limits.grpc.util.OptionalResultCaptor; +import com.netflix.concurrency.limits.limiter.SimpleLimiter; +import com.netflix.concurrency.limits.spectator.SpectatorMetricRegistry; +import com.netflix.spectator.api.DefaultRegistry; import io.grpc.CallOptions; import io.grpc.Channel; -import io.grpc.MethodDescriptor; -import io.grpc.MethodDescriptor.MethodType; -import io.grpc.Server; -import io.grpc.ServerServiceDefinition; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; +import io.grpc.Server; +import io.grpc.ServerInterceptors; +import io.grpc.ServerServiceDefinition; import io.grpc.stub.ClientCalls; import io.grpc.stub.ServerCalls; @@ -20,16 +22,138 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.junit.Before; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.Mockito; + +import static com.netflix.concurrency.limits.grpc.util.InterceptorTestUtil.BYPASS_METHOD_DESCRIPTOR; +import static com.netflix.concurrency.limits.grpc.util.InterceptorTestUtil.METHOD_DESCRIPTOR; +import static com.netflix.concurrency.limits.grpc.util.InterceptorTestUtil.TEST_METRIC_NAME; +import static com.netflix.concurrency.limits.grpc.util.InterceptorTestUtil.verifyCounts; public class ConcurrencyLimitClientInterceptorTest { - private static final MethodDescriptor METHOD_DESCRIPTOR = MethodDescriptor.newBuilder() - .setType(MethodType.UNARY) - .setFullMethodName("service/method") - .setRequestMarshaller(StringMarshaller.INSTANCE) - .setResponseMarshaller(StringMarshaller.INSTANCE) - .build(); + + @Rule + public TestName testName = new TestName(); + + Limiter limiter; + Limiter bypassEnabledLimiter; + OptionalResultCaptor listener; + DefaultRegistry registry = new DefaultRegistry(); + SpectatorMetricRegistry spectatorMetricRegistry = new SpectatorMetricRegistry(registry, registry.createId(TEST_METRIC_NAME)); + private Server server; + private Channel channel; + + @Before + public void beforeEachTest() { + limiter = Mockito.spy(SimpleLimiter.newBuilder() + .named(testName.getMethodName()) + .metricRegistry(spectatorMetricRegistry) + .build()); + + bypassEnabledLimiter = Mockito.spy(new GrpcClientLimiterBuilder() + .bypassLimitByMethod("service/bypass") + .named(testName.getMethodName()) + .metricRegistry(spectatorMetricRegistry) + .build()); + + listener = OptionalResultCaptor.forClass(Limiter.Listener.class); + Mockito.doAnswer(listener).when(limiter).acquire(Mockito.any()); + } + + private void startServer(Limiter limiter) { + + ServerCalls.UnaryMethod method = (request, responseObserver) -> { + responseObserver.onNext("response"); + responseObserver.onCompleted(); + }; + + try { + server = NettyServerBuilder.forPort(0) + .addService(ServerInterceptors.intercept( + ServerServiceDefinition.builder("service") + .addMethod(METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall(method)) + .addMethod(BYPASS_METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall(method)) + .build()) + ) + .build() + .start(); + + channel = NettyChannelBuilder.forAddress("localhost", server.getPort()) + .usePlaintext(true) + .intercept(new ConcurrencyLimitClientInterceptor(limiter)) + .build(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void verifySuccessCountOnRelease() { + // Setup server + startServer(limiter); + + ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + + Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcClientRequestContext.class)); + Mockito.verify(listener.getResult().get(), Mockito.timeout(1000).times(1)).onSuccess(); + + verifyCounts(0, 0, 1, 0, 0, registry, testName.getMethodName()); + } + + @Test + public void verifyBypassCountWhenBypassConditionAdded() { + // Setup server with a bypass condition enabled limiter + startServer(bypassEnabledLimiter); + + // Calling a method for which the predicate condition passes + ClientCalls.blockingUnaryCall(channel, BYPASS_METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + Mockito.verify(bypassEnabledLimiter, Mockito.times(1)).acquire(Mockito.isA(GrpcClientRequestContext.class)); + + verifyCounts(0, 0, 0, 0, 1, registry, testName.getMethodName()); + } + + @Test + public void verifyBypassCountWhenBypassConditionFailed() { + // Setup server with a bypass condition enabled limiter + startServer(bypassEnabledLimiter); + + // Calling a method for which the predicate condition fails + ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + Mockito.verify(bypassEnabledLimiter, Mockito.times(1)).acquire(Mockito.isA(GrpcClientRequestContext.class)); + + verifyCounts(0, 0, 1, 0, 0, registry, testName.getMethodName()); + } + + @Test + public void verifyBypassCountWhenNoBypassConditionAdded() { + // Setup server with no bypass condition enabled + startServer(limiter); + + ClientCalls.blockingUnaryCall(channel, BYPASS_METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcClientRequestContext.class)); + + verifyCounts(0, 0, 1, 0, 0, registry, testName.getMethodName()); + } + + @Test + public void testMultipleCalls() { + // Setup server with a bypass condition enabled limiter + startServer(bypassEnabledLimiter); + + // Calling both method types + ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + ClientCalls.blockingUnaryCall(channel, BYPASS_METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + ClientCalls.blockingUnaryCall(channel, BYPASS_METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + + Mockito.verify(bypassEnabledLimiter, Mockito.times(4)).acquire(Mockito.isA(GrpcClientRequestContext.class)); + + verifyCounts(0, 0, 2, 0, 2, registry, testName.getMethodName()); + } @Test @Ignore @@ -72,4 +196,5 @@ public void simulation() throws IOException { ClientCalls.futureUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT), "request"); } } + } diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java index a15aa88a..be87389f 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java @@ -3,19 +3,12 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.netflix.concurrency.limits.Limiter; -import com.netflix.concurrency.limits.grpc.StringMarshaller; -import com.netflix.concurrency.limits.grpc.mockito.OptionalResultCaptor; +import com.netflix.concurrency.limits.grpc.util.OptionalResultCaptor; import com.netflix.concurrency.limits.limiter.SimpleLimiter; import com.netflix.concurrency.limits.spectator.SpectatorMetricRegistry; import com.netflix.spectator.api.DefaultRegistry; -import com.netflix.spectator.api.Meter; -import com.netflix.spectator.api.Timer; import io.grpc.CallOptions; import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.MethodDescriptor.MethodType; import io.grpc.Server; import io.grpc.ServerInterceptors; import io.grpc.ServerServiceDefinition; @@ -34,37 +27,39 @@ import org.mockito.Mockito; import java.io.IOException; -import java.util.Comparator; import java.util.concurrent.TimeUnit; +import static com.netflix.concurrency.limits.grpc.util.InterceptorTestUtil.BYPASS_METHOD_DESCRIPTOR; +import static com.netflix.concurrency.limits.grpc.util.InterceptorTestUtil.METHOD_DESCRIPTOR; +import static com.netflix.concurrency.limits.grpc.util.InterceptorTestUtil.TEST_METRIC_NAME; +import static com.netflix.concurrency.limits.grpc.util.InterceptorTestUtil.verifyCounts; + public class ConcurrencyLimitServerInterceptorTest { @Rule public TestName testName = new TestName(); - private static final MethodDescriptor METHOD_DESCRIPTOR = MethodDescriptor.newBuilder() - .setType(MethodType.UNARY) - .setFullMethodName("service/method") - .setRequestMarshaller(StringMarshaller.INSTANCE) - .setResponseMarshaller(StringMarshaller.INSTANCE) - .build(); - - private DefaultRegistry registry = new DefaultRegistry(); - - private Server server; - private Channel channel; - Limiter limiter; + Limiter bypassEnabledLimiter; OptionalResultCaptor listener; + DefaultRegistry registry = new DefaultRegistry(); + SpectatorMetricRegistry spectatorMetricRegistry = new SpectatorMetricRegistry(registry, registry.createId(TEST_METRIC_NAME)); + private Server server; + private Channel channel; @Before public void beforeEachTest() { limiter = Mockito.spy(SimpleLimiter.newBuilder() .named(testName.getMethodName()) - .metricRegistry(new SpectatorMetricRegistry(registry, registry.createId("unit.test.limiter"))) + .metricRegistry(spectatorMetricRegistry) .build()); - listener = OptionalResultCaptor.forClass(Limiter.Listener.class); + bypassEnabledLimiter = Mockito.spy(new GrpcServerLimiterBuilder() + .bypassLimitByMethod("service/bypass") + .named(testName.getMethodName()) + .metricRegistry(spectatorMetricRegistry) + .build()); + listener = OptionalResultCaptor.forClass(Limiter.Listener.class); Mockito.doAnswer(listener).when(limiter).acquire(Mockito.any()); } @@ -80,12 +75,13 @@ public void afterEachTest() { registry.distributionSummaries().forEach(t -> System.out.println(" " + t.id() + " " + t.count() + " " + t.totalAmount())); } - private void startServer(ServerCalls.UnaryMethod method) { + private void startServer(ServerCalls.UnaryMethod method, Limiter limiter) { try { server = NettyServerBuilder.forPort(0) .addService(ServerInterceptors.intercept( ServerServiceDefinition.builder("service") .addMethod(METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall(method)) + .addMethod(BYPASS_METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall(method)) .build(), ConcurrencyLimitServerInterceptor.newBuilder(limiter) .build()) @@ -107,13 +103,13 @@ public void releaseOnSuccess() { startServer((req, observer) -> { observer.onNext("response"); observer.onCompleted(); - }); + }, limiter); ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); Mockito.verify(listener.getResult().get(), Mockito.timeout(1000).times(1)).onSuccess(); - verifyCounts(0, 0, 1, 0); + verifyCounts(0, 0, 1, 0, 0, registry, testName.getMethodName()); } @Test @@ -121,7 +117,7 @@ public void releaseOnError() { // Setup server startServer((req, observer) -> { observer.onError(Status.INVALID_ARGUMENT.asRuntimeException()); - }); + }, limiter); try { ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); @@ -132,7 +128,7 @@ public void releaseOnError() { // Verify Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); - verifyCounts(0, 0, 1, 0); + verifyCounts(0, 0, 1, 0, 0, registry, testName.getMethodName()); } @Test @@ -140,7 +136,7 @@ public void releaseOnUncaughtException() throws IOException { // Setup server startServer((req, observer) -> { throw new RuntimeException("failure"); - }); + }, limiter); try { ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); @@ -152,7 +148,7 @@ public void releaseOnUncaughtException() throws IOException { Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); Mockito.verify(listener.getResult().get(), Mockito.timeout(1000).times(1)).onIgnore(); - verifyCounts(0, 1, 0, 0); + verifyCounts(0, 1, 0, 0, 0, registry, testName.getMethodName()); } @Test @@ -162,7 +158,7 @@ public void releaseOnCancellation() { Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); observer.onNext("delayed_response"); observer.onCompleted(); - }); + }, limiter); ListenableFuture future = ClientCalls.futureUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT), "foo"); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); @@ -174,7 +170,7 @@ public void releaseOnCancellation() { Mockito.verify(listener.getResult().get(), Mockito.timeout(2000).times(1)).onSuccess(); - verifyCounts(0, 0, 1, 0); + verifyCounts(0, 0, 1, 0, 0, registry, testName.getMethodName()); } @Test @@ -184,7 +180,7 @@ public void releaseOnDeadlineExceeded() { Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); observer.onNext("delayed_response"); observer.onCompleted(); - }); + }, limiter); try { ClientCalls.blockingUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS)), "foo"); @@ -197,17 +193,51 @@ public void releaseOnDeadlineExceeded() { Mockito.verify(listener.getResult().get(), Mockito.timeout(2000).times(1)).onSuccess(); - verifyCounts(0, 0, 1, 0); + verifyCounts(0, 0, 1, 0, 0, registry, testName.getMethodName()); } - public void verifyCounts(int dropped, int ignored, int success, int rejected) { - try { - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - } - Assert.assertEquals(dropped, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "dropped").count()); - Assert.assertEquals(ignored, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "ignored").count()); - Assert.assertEquals(success, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "success").count()); - Assert.assertEquals(rejected, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "rejected").count()); + @Test + public void verifyBypassCountWhenBypassConditionAdded() { + // Setup server with a bypass condition enabled limiter + startServer((req, observer) -> { + observer.onNext("response"); + observer.onCompleted(); + }, bypassEnabledLimiter); + + // Calling a method for which the predicate condition passes + ClientCalls.blockingUnaryCall(channel, BYPASS_METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + Mockito.verify(bypassEnabledLimiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); + + verifyCounts(0, 0, 0, 0, 1, registry, testName.getMethodName()); } + + @Test + public void verifyBypassCountWhenBypassConditionFailed() { + // Setup server with a bypass condition enabled limiter + startServer((req, observer) -> { + observer.onNext("response"); + observer.onCompleted(); + }, bypassEnabledLimiter); + + // Calling a method for which the predicate condition fails + ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + Mockito.verify(bypassEnabledLimiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); + + verifyCounts(0, 0, 1, 0, 0, registry, testName.getMethodName()); + } + + @Test + public void verifyBypassCountWhenNoBypassConditionAdded() { + // Setup server with no bypass condition enabled + startServer((req, observer) -> { + observer.onNext("response"); + observer.onCompleted(); + }, limiter); + + ClientCalls.blockingUnaryCall(channel, BYPASS_METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); + + verifyCounts(0, 0, 1, 0, 0, registry, testName.getMethodName()); + } + } diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Driver.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Driver.java index 8862b206..ab00500a 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Driver.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Driver.java @@ -1,9 +1,6 @@ package com.netflix.concurrency.limits.grpc.server.example; import com.google.common.util.concurrent.Uninterruptibles; -import com.netflix.concurrency.limits.grpc.client.ConcurrencyLimitClientInterceptor; -import com.netflix.concurrency.limits.limit.FixedLimit; -import com.netflix.concurrency.limits.limiter.SimpleLimiter; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientInterceptors; @@ -25,6 +22,8 @@ import java.util.function.Consumer; import java.util.function.Supplier; +import static com.netflix.concurrency.limits.grpc.util.InterceptorTestUtil.METHOD_DESCRIPTOR; + public class Driver { public static final Metadata.Key ID_HEADER = Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER); @@ -158,7 +157,7 @@ public void run() { long startTime = System.nanoTime(); Uninterruptibles.sleepUninterruptibly(Math.max(0, segment.nextDelay()), TimeUnit.MILLISECONDS); - ClientCalls.asyncUnaryCall(channel.newCall(TestServer.METHOD_DESCRIPTOR, CallOptions.DEFAULT.withWaitForReady()), "request", + ClientCalls.asyncUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withWaitForReady()), "request", new StreamObserver() { @Override public void onNext(String value) { diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/TestServer.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/TestServer.java index 21a52948..3d43a124 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/TestServer.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/TestServer.java @@ -13,12 +13,9 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.netflix.concurrency.limits.Limiter; -import com.netflix.concurrency.limits.grpc.StringMarshaller; import com.netflix.concurrency.limits.grpc.server.ConcurrencyLimitServerInterceptor; import com.netflix.concurrency.limits.grpc.server.GrpcServerRequestContext; -import io.grpc.MethodDescriptor; -import io.grpc.MethodDescriptor.MethodType; import io.grpc.Server; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptors; @@ -31,16 +28,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.netflix.concurrency.limits.grpc.util.InterceptorTestUtil.METHOD_DESCRIPTOR; + public class TestServer { private static final Logger LOG = LoggerFactory.getLogger(TestServer.class); - public static final MethodDescriptor METHOD_DESCRIPTOR = MethodDescriptor.newBuilder() - .setType(MethodType.UNARY) - .setFullMethodName("service/method") - .setRequestMarshaller(StringMarshaller.INSTANCE) - .setResponseMarshaller(StringMarshaller.INSTANCE) - .build(); - private interface Segment { long duration(); long latency(); diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/util/InterceptorTestUtil.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/util/InterceptorTestUtil.java new file mode 100644 index 00000000..92e83647 --- /dev/null +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/util/InterceptorTestUtil.java @@ -0,0 +1,41 @@ +package com.netflix.concurrency.limits.grpc.util; + +import com.netflix.spectator.api.DefaultRegistry; +import com.netflix.spectator.api.Registry; +import io.grpc.MethodDescriptor; +import org.junit.Assert; + +import java.util.concurrent.TimeUnit; + +public class InterceptorTestUtil { + + public static final MethodDescriptor METHOD_DESCRIPTOR = MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(StringMarshaller.INSTANCE) + .setResponseMarshaller(StringMarshaller.INSTANCE) + .build(); + + public static final MethodDescriptor BYPASS_METHOD_DESCRIPTOR = MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("service/bypass") + .setRequestMarshaller(StringMarshaller.INSTANCE) + .setResponseMarshaller(StringMarshaller.INSTANCE) + .build(); + + public static final String TEST_METRIC_NAME = "unit.test.limiter"; + + public static void verifyCounts(int dropped, int ignored, int success, int rejected, int bypassed, Registry registry, String idTagValue) { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + Assert.assertEquals(dropped, registry.counter(TEST_METRIC_NAME + ".call", "id", idTagValue, "status", "dropped").count()); + Assert.assertEquals(ignored, registry.counter(TEST_METRIC_NAME + ".call", "id", idTagValue, "status", "ignored").count()); + Assert.assertEquals(success, registry.counter(TEST_METRIC_NAME + ".call", "id", idTagValue, "status", "success").count()); + Assert.assertEquals(rejected, registry.counter(TEST_METRIC_NAME + ".call", "id", idTagValue, "status", "rejected").count()); + Assert.assertEquals(bypassed, registry.counter(TEST_METRIC_NAME + ".call", "id", idTagValue, "status", "bypassed").count()); + } + + +} diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/mockito/OptionalResultCaptor.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/util/OptionalResultCaptor.java similarity index 93% rename from concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/mockito/OptionalResultCaptor.java rename to concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/util/OptionalResultCaptor.java index 230217fd..b2f33d79 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/mockito/OptionalResultCaptor.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/util/OptionalResultCaptor.java @@ -1,4 +1,4 @@ -package com.netflix.concurrency.limits.grpc.mockito; +package com.netflix.concurrency.limits.grpc.util; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/StringMarshaller.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/util/StringMarshaller.java similarity index 94% rename from concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/StringMarshaller.java rename to concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/util/StringMarshaller.java index 44e6376e..f9583423 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/StringMarshaller.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/util/StringMarshaller.java @@ -1,4 +1,4 @@ -package com.netflix.concurrency.limits.grpc; +package com.netflix.concurrency.limits.grpc.util; import java.io.ByteArrayInputStream; import java.io.IOException; diff --git a/concurrency-limits-servlet-jakarta/build.gradle b/concurrency-limits-servlet-jakarta/build.gradle index 5a0d35b1..731adebb 100644 --- a/concurrency-limits-servlet-jakarta/build.gradle +++ b/concurrency-limits-servlet-jakarta/build.gradle @@ -30,13 +30,13 @@ dependencies { compileOnly "jakarta.servlet:jakarta.servlet-api:6.0.0" implementation "org.slf4j:slf4j-api:${slf4jVersion}" + testImplementation project(":concurrency-limits-spectator") testImplementation "org.mockito:mockito-core:${mockitoVersion}" testImplementation "org.mockito:mockito-junit-jupiter:${mockitoVersion}" testImplementation "org.slf4j:slf4j-log4j12:${slf4jVersion}" testImplementation "org.eclipse.jetty:jetty-server:11.+" testImplementation "org.eclipse.jetty:jetty-servlet:11.+" testCompileOnly "junit:junit:${jUnitLegacyVersion}" - testImplementation "org.junit.jupiter:junit-jupiter-api:${jUnitVersion}" testImplementation "org.springframework:spring-test:6.+" testRuntimeOnly "org.junit.vintage:junit-vintage-engine:${jUnitVersion}" testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${jUnitVersion}" diff --git a/concurrency-limits-servlet-jakarta/dependencies.lock b/concurrency-limits-servlet-jakarta/dependencies.lock index e72313af..873ec968 100644 --- a/concurrency-limits-servlet-jakarta/dependencies.lock +++ b/concurrency-limits-servlet-jakarta/dependencies.lock @@ -25,6 +25,15 @@ "com.netflix.concurrency-limits:concurrency-limits-core": { "project": true }, + "com.netflix.concurrency-limits:concurrency-limits-spectator": { + "project": true + }, + "com.netflix.spectator:spectator-api": { + "firstLevelTransitive": [ + "com.netflix.concurrency-limits:concurrency-limits-spectator" + ], + "locked": "1.7.9" + }, "jakarta.servlet:jakarta.servlet-api": { "locked": "6.0.0" }, @@ -32,13 +41,10 @@ "locked": "4.13.2" }, "org.eclipse.jetty:jetty-server": { - "locked": "11.0.13" + "locked": "11.0.20" }, "org.eclipse.jetty:jetty-servlet": { - "locked": "11.0.13" - }, - "org.junit.jupiter:junit-jupiter-api": { - "locked": "5.9.2" + "locked": "11.0.20" }, "org.mockito:mockito-core": { "locked": "4.11.0" @@ -47,36 +53,45 @@ "locked": "4.11.0" }, "org.slf4j:slf4j-api": { - "locked": "2.0.5" + "locked": "2.0.9" }, "org.slf4j:slf4j-log4j12": { "locked": "1.7.36" }, "org.springframework:spring-test": { - "locked": "6.0.5" + "locked": "6.1.5" } }, "testRuntimeClasspath": { "com.netflix.concurrency-limits:concurrency-limits-core": { + "firstLevelTransitive": [ + "com.netflix.concurrency-limits:concurrency-limits-spectator" + ], "project": true }, + "com.netflix.concurrency-limits:concurrency-limits-spectator": { + "project": true + }, + "com.netflix.spectator:spectator-api": { + "firstLevelTransitive": [ + "com.netflix.concurrency-limits:concurrency-limits-spectator" + ], + "locked": "1.7.9" + }, "jakarta.servlet:jakarta.servlet-api": { "locked": "6.0.0" }, "org.eclipse.jetty:jetty-server": { - "locked": "11.0.13" + "locked": "11.0.20" }, "org.eclipse.jetty:jetty-servlet": { - "locked": "11.0.13" - }, - "org.junit.jupiter:junit-jupiter-api": { - "locked": "5.9.2" + "locked": "11.0.20" }, "org.junit.jupiter:junit-jupiter-engine": { - "locked": "5.9.2" + "locked": "5.10.2" }, "org.junit.vintage:junit-vintage-engine": { - "locked": "5.9.2" + "locked": "5.10.2" }, "org.mockito:mockito-core": { "locked": "4.11.0" @@ -86,15 +101,16 @@ }, "org.slf4j:slf4j-api": { "firstLevelTransitive": [ - "com.netflix.concurrency-limits:concurrency-limits-core" + "com.netflix.concurrency-limits:concurrency-limits-core", + "com.netflix.concurrency-limits:concurrency-limits-spectator" ], - "locked": "2.0.5" + "locked": "2.0.9" }, "org.slf4j:slf4j-log4j12": { "locked": "1.7.36" }, "org.springframework:spring-test": { - "locked": "6.0.5" + "locked": "6.1.5" } } } \ No newline at end of file diff --git a/concurrency-limits-servlet-jakarta/src/main/java/com/netflix/concurrency/limits/servlet/jakarta/ServletLimiterBuilder.java b/concurrency-limits-servlet-jakarta/src/main/java/com/netflix/concurrency/limits/servlet/jakarta/ServletLimiterBuilder.java index e44484c2..c9d6fe3d 100644 --- a/concurrency-limits-servlet-jakarta/src/main/java/com/netflix/concurrency/limits/servlet/jakarta/ServletLimiterBuilder.java +++ b/concurrency-limits-servlet-jakarta/src/main/java/com/netflix/concurrency/limits/servlet/jakarta/ServletLimiterBuilder.java @@ -73,6 +73,62 @@ public ServletLimiterBuilder partitionByPathInfo(Function pathTo return partitionResolver(request -> Optional.ofNullable(request.getPathInfo()).map(pathToGroup).orElse(null)); } + /** + * Bypass limit if the value of the provided header name matches the specified value. + * @param name The name of the header to check. + * This should match exactly with the header name in the {@link HttpServletRequest } context. + * @param value The value to compare against. + * If the value of the header in the context matches this value, the limit will be bypassed. + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitByHeader(String name, String value) { + return bypassLimitResolver((context) -> value.equals(context.getHeader(name))); + } + + /** + * Bypass limit if the value of the provided attribute name matches the specified value. + * @param name The name of the attribute to check. + * This should match exactly with the attribute name in the {@link HttpServletRequest } context. + * @param value The value to compare against. + * If the value of the attribute in the context matches this value, the limit will be bypassed. + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitByAttribute(String name, String value) { + return bypassLimitResolver((context) -> value.equals(context.getAttribute(name).toString())); + } + + /** + * Bypass limit if the value of the provided parameter name matches the specified value. + * @param name The name of the parameter to check. + * This should match exactly with the parameter name in the {@link HttpServletRequest } context. + * @param value The value to compare against. + * If the value of the parameter in the context matches this value, the limit will be bypassed. + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitByParameter(String name, String value) { + return bypassLimitResolver((context) -> value.equals(context.getParameter(name))); + } + + /** + * Bypass limit if the request path info matches the specified path. + * @param pathInfo The path info to check against the {@link HttpServletRequest } pathInfo. + * If the request's pathInfo matches this, the limit will be bypassed. + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitByPathInfo(String pathInfo) { + return bypassLimitResolver((context) -> pathInfo.equals(context.getPathInfo())); + } + + /** + * Bypass limit if the request method matches the specified method. + * @param method The HTTP method (e.g. GET, POST, or PUT) to check against the {@link HttpServletRequest } method. + * If the request's method matches this method, the limit will be bypassed. + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitByMethod(String method) { + return bypassLimitResolver((context) -> method.equals(context.getMethod())); + } + @Override protected ServletLimiterBuilder self() { return this; diff --git a/concurrency-limits-servlet-jakarta/src/test/java/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java b/concurrency-limits-servlet-jakarta/src/test/java/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java index 6d4dcb72..701a9a9e 100644 --- a/concurrency-limits-servlet-jakarta/src/test/java/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java +++ b/concurrency-limits-servlet-jakarta/src/test/java/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java @@ -1,11 +1,14 @@ package com.netflix.concurrency.limits; - import com.netflix.concurrency.limits.servlet.jakarta.ConcurrencyLimitServletFilter; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; +import com.netflix.concurrency.limits.servlet.jakarta.ServletLimiterBuilder; +import com.netflix.concurrency.limits.spectator.SpectatorMetricRegistry; +import com.netflix.spectator.api.DefaultRegistry; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.Mockito; import org.springframework.mock.web.MockFilterChain; import org.springframework.mock.web.MockHttpServletRequest; import org.springframework.mock.web.MockHttpServletResponse; @@ -13,40 +16,49 @@ import jakarta.servlet.ServletException; import jakarta.servlet.http.HttpServletRequest; import java.io.IOException; +import java.util.concurrent.TimeUnit; import java.util.Optional; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doReturn; -@ExtendWith(MockitoExtension.class) public class ConcurrencyLimitServletFilterTest { - @Mock - Limiter limiter; + @Rule + public TestName testName = new TestName(); - @Mock - Limiter.Listener listener; + Limiter limiter; + DefaultRegistry registry = new DefaultRegistry(); + SpectatorMetricRegistry spectatorMetricRegistry = new SpectatorMetricRegistry(registry, registry.createId("unit.test.limiter")); + + @Before + public void beforeEachTest() { + + // Will bypass GET calls or calls with /admin path or both + limiter = Mockito.spy(new ServletLimiterBuilder() + .bypassLimitByMethod("GET") + .bypassLimitByPathInfo("/admin/health") + .named(testName.getMethodName()) + .metricRegistry(spectatorMetricRegistry) + .build()); + } @Test public void testDoFilterAllowed() throws ServletException, IOException { ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter); - when(limiter.acquire(any())).thenReturn(Optional.of(listener)); - MockHttpServletRequest request = new MockHttpServletRequest(); MockHttpServletResponse response = new MockHttpServletResponse(); MockFilterChain filterChain = new MockFilterChain(); - filter.doFilter(request, response, filterChain); - assertEquals(request, filterChain.getRequest(), "Request should be passed to the downstream chain"); - assertEquals(response, filterChain.getResponse(), "Response should be passed to the downstream chain"); + assertEquals("Request should be passed to the downstream chain", request, filterChain.getRequest()); + assertEquals("Response should be passed to the downstream chain", response, filterChain.getResponse()); - verify(listener).onSuccess(); + verifyCounts(0, 0, 1, 0, 0); } @Test @@ -54,17 +66,15 @@ public void testDoFilterThrottled() throws ServletException, IOException { ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter); //Empty means to throttle this request - when(limiter.acquire(any())).thenReturn(Optional.empty()); + doReturn(Optional.empty()).when(limiter).acquire(any()); MockHttpServletResponse response = new MockHttpServletResponse(); MockFilterChain filterChain = new MockFilterChain(); - filter.doFilter(new MockHttpServletRequest(), response, filterChain); - assertNull(filterChain.getRequest(), "doFilter should not be called on the filterchain"); - - assertEquals(429, response.getStatus(), "Status should be 429 - too many requests"); + assertNull("doFilter should not be called on the filterchain", filterChain.getRequest()); + assertEquals("Status should be 429 - too many requests", 429, response.getStatus()); } @Test @@ -73,12 +83,80 @@ public void testDoFilterThrottledCustomStatus() throws ServletException, IOExcep ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter, customThrottleStatus); //Empty means to throttle this request - when(limiter.acquire(any())).thenReturn(Optional.empty()); + doReturn(Optional.empty()).when(limiter).acquire(any()); MockHttpServletResponse response = new MockHttpServletResponse(); filter.doFilter(new MockHttpServletRequest(), response, new MockFilterChain()); - assertEquals(customThrottleStatus, response.getStatus(), "custom status should be respected"); + assertEquals("custom status should be respected", customThrottleStatus, response.getStatus()); + + } + + @Test + public void testDoFilterBypassCheckPassedForMethod() throws ServletException, IOException { + + ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter); + + MockHttpServletRequest request = new MockHttpServletRequest(); + request.setMethod("GET"); + request.setPathInfo("/live/path"); + MockHttpServletResponse response = new MockHttpServletResponse(); + MockFilterChain filterChain = new MockFilterChain(); + + filter.doFilter(request, response, filterChain); + + assertEquals("Request should be passed to the downstream chain", request, filterChain.getRequest()); + assertEquals("Response should be passed to the downstream chain", response, filterChain.getResponse()); + verifyCounts(0, 0, 0, 0, 1); + + } + + @Test + public void testDoFilterBypassCheckPassedForPath() throws ServletException, IOException { + + ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter); + + MockHttpServletRequest request = new MockHttpServletRequest(); + request.setMethod("POST"); + request.setPathInfo("/admin/health"); + MockHttpServletResponse response = new MockHttpServletResponse(); + MockFilterChain filterChain = new MockFilterChain(); + + filter.doFilter(request, response, filterChain); + + assertEquals("Request should be passed to the downstream chain", request, filterChain.getRequest()); + assertEquals("Response should be passed to the downstream chain", response, filterChain.getResponse()); + verifyCounts(0, 0, 0, 0, 1); + } + + @Test + public void testDoFilterBypassCheckFailed() throws ServletException, IOException { + + ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter); + + MockHttpServletRequest request = new MockHttpServletRequest(); + request.setMethod("POST"); + request.setPathInfo("/live/path"); + MockHttpServletResponse response = new MockHttpServletResponse(); + MockFilterChain filterChain = new MockFilterChain(); + + filter.doFilter(request, response, filterChain); + + assertEquals("Request should be passed to the downstream chain", request, filterChain.getRequest()); + assertEquals("Response should be passed to the downstream chain", response, filterChain.getResponse()); + verifyCounts(0, 0, 1, 0, 0); + } + + public void verifyCounts(int dropped, int ignored, int success, int rejected, int bypassed) { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + assertEquals(dropped, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "dropped").count()); + assertEquals(ignored, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "ignored").count()); + assertEquals(success, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "success").count()); + assertEquals(rejected, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "rejected").count()); + assertEquals(bypassed, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "bypassed").count()); } } diff --git a/concurrency-limits-servlet/build.gradle b/concurrency-limits-servlet/build.gradle index 92be696c..370a30fc 100644 --- a/concurrency-limits-servlet/build.gradle +++ b/concurrency-limits-servlet/build.gradle @@ -9,13 +9,13 @@ dependencies { compileOnly "javax.servlet:javax.servlet-api:3.1.0" implementation "org.slf4j:slf4j-api:${slf4jVersion}" + testImplementation project(":concurrency-limits-spectator") testImplementation "org.mockito:mockito-core:${mockitoVersion}" testImplementation "org.mockito:mockito-junit-jupiter:${mockitoVersion}" testImplementation "org.slf4j:slf4j-log4j12:${slf4jVersion}" testImplementation "org.eclipse.jetty:jetty-server:9.4.+" testImplementation "org.eclipse.jetty:jetty-servlet:9.4.+" testCompileOnly "junit:junit:${jUnitLegacyVersion}" - testImplementation "org.junit.jupiter:junit-jupiter-api:${jUnitVersion}" testImplementation "org.springframework:spring-test:${springVersion}" testRuntimeOnly "org.junit.vintage:junit-vintage-engine:${jUnitVersion}" testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${jUnitVersion}" diff --git a/concurrency-limits-servlet/dependencies.lock b/concurrency-limits-servlet/dependencies.lock index 011e4aa6..a84fbe39 100644 --- a/concurrency-limits-servlet/dependencies.lock +++ b/concurrency-limits-servlet/dependencies.lock @@ -25,23 +25,29 @@ "com.netflix.concurrency-limits:concurrency-limits-core": { "project": true }, + "com.netflix.concurrency-limits:concurrency-limits-spectator": { + "project": true + }, + "com.netflix.spectator:spectator-api": { + "firstLevelTransitive": [ + "com.netflix.concurrency-limits:concurrency-limits-spectator" + ], + "locked": "1.7.9" + }, "junit:junit": { "locked": "4.13.2" }, "org.eclipse.jetty:jetty-server": { - "locked": "9.4.48.v20220622" + "locked": "9.4.54.v20240208" }, "org.eclipse.jetty:jetty-servlet": { - "locked": "9.4.48.v20220622" - }, - "org.junit.jupiter:junit-jupiter-api": { - "locked": "5.9.0" + "locked": "9.4.54.v20240208" }, "org.mockito:mockito-core": { - "locked": "4.7.0" + "locked": "4.11.0" }, "org.mockito:mockito-junit-jupiter": { - "locked": "4.7.0" + "locked": "4.11.0" }, "org.slf4j:slf4j-api": { "locked": "1.7.36" @@ -50,37 +56,47 @@ "locked": "1.7.36" }, "org.springframework:spring-test": { - "locked": "5.3.22" + "locked": "5.3.33" } }, "testRuntimeClasspath": { "com.netflix.concurrency-limits:concurrency-limits-core": { + "firstLevelTransitive": [ + "com.netflix.concurrency-limits:concurrency-limits-spectator" + ], "project": true }, + "com.netflix.concurrency-limits:concurrency-limits-spectator": { + "project": true + }, + "com.netflix.spectator:spectator-api": { + "firstLevelTransitive": [ + "com.netflix.concurrency-limits:concurrency-limits-spectator" + ], + "locked": "1.7.9" + }, "org.eclipse.jetty:jetty-server": { - "locked": "9.4.48.v20220622" + "locked": "9.4.54.v20240208" }, "org.eclipse.jetty:jetty-servlet": { - "locked": "9.4.48.v20220622" - }, - "org.junit.jupiter:junit-jupiter-api": { - "locked": "5.9.0" + "locked": "9.4.54.v20240208" }, "org.junit.jupiter:junit-jupiter-engine": { - "locked": "5.9.0" + "locked": "5.10.2" }, "org.junit.vintage:junit-vintage-engine": { - "locked": "5.9.0" + "locked": "5.10.2" }, "org.mockito:mockito-core": { - "locked": "4.7.0" + "locked": "4.11.0" }, "org.mockito:mockito-junit-jupiter": { - "locked": "4.7.0" + "locked": "4.11.0" }, "org.slf4j:slf4j-api": { "firstLevelTransitive": [ - "com.netflix.concurrency-limits:concurrency-limits-core" + "com.netflix.concurrency-limits:concurrency-limits-core", + "com.netflix.concurrency-limits:concurrency-limits-spectator" ], "locked": "1.7.36" }, @@ -88,7 +104,7 @@ "locked": "1.7.36" }, "org.springframework:spring-test": { - "locked": "5.3.22" + "locked": "5.3.33" } } } \ No newline at end of file diff --git a/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java b/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java index dcc78b76..6bc07d15 100644 --- a/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java +++ b/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java @@ -72,7 +72,63 @@ public ServletLimiterBuilder partitionByParameter(String name) { public ServletLimiterBuilder partitionByPathInfo(Function pathToGroup) { return partitionResolver(request -> Optional.ofNullable(request.getPathInfo()).map(pathToGroup).orElse(null)); } - + + /** + * Bypass the limit if the value of the provided header name matches the specified value. + * @param name The name of the header to check. + * This should match exactly with the header name in the {@link HttpServletRequest } context. + * @param value The value to compare against. + * If the value of the header in the context matches this value, the limit will be bypassed. + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitByHeader(String name, String value) { + return bypassLimitResolver((context) -> value.equals(context.getHeader(name))); + } + + /** + * Bypass limit if the value of the provided attribute name matches the specified value. + * @param name The name of the attribute to check. + * This should match exactly with the attribute name in the {@link HttpServletRequest } context. + * @param value The value to compare against. + * If the value of the attribute in the context matches this value, the limit will be bypassed. + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitByAttribute(String name, String value) { + return bypassLimitResolver((context) -> value.equals(context.getAttribute(name).toString())); + } + + /** + * Bypass limit if the value of the provided parameter name matches the specified value. + * @param name The name of the parameter to check. + * This should match exactly with the parameter name in the {@link HttpServletRequest } context. + * @param value The value to compare against. + * If the value of the parameter in the context matches this value, the limit will be bypassed. + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitByParameter(String name, String value) { + return bypassLimitResolver((context) -> value.equals(context.getParameter(name))); + } + + /** + * Bypass limit if the request path info matches the specified path. + * @param pathInfo The path info to check against the {@link HttpServletRequest } pathInfo. + * If the request's pathInfo matches this, the limit will be bypassed. + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitByPathInfo(String pathInfo) { + return bypassLimitResolver((context) -> pathInfo.equals(context.getPathInfo())); + } + + /** + * Bypass limit if the request method matches the specified method. + * @param method The HTTP method (e.g. GET, POST, or PUT) to check against the {@link HttpServletRequest } method. + * If the request's method matches this method, the limit will be bypassed. + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitByMethod(String method) { + return bypassLimitResolver((context) -> method.equals(context.getMethod())); + } + @Override protected ServletLimiterBuilder self() { return this; diff --git a/concurrency-limits-servlet/src/test/java/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java b/concurrency-limits-servlet/src/test/java/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java index 8eecc953..bbcd50ac 100644 --- a/concurrency-limits-servlet/src/test/java/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java +++ b/concurrency-limits-servlet/src/test/java/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java @@ -1,54 +1,64 @@ package com.netflix.concurrency.limits; - import com.netflix.concurrency.limits.servlet.ConcurrencyLimitServletFilter; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; +import com.netflix.concurrency.limits.servlet.ServletLimiterBuilder; +import com.netflix.concurrency.limits.spectator.SpectatorMetricRegistry; +import com.netflix.spectator.api.DefaultRegistry; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.mock.web.MockFilterChain; import org.springframework.mock.web.MockHttpServletRequest; import org.springframework.mock.web.MockHttpServletResponse; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; - import java.io.IOException; +import java.util.concurrent.TimeUnit; import java.util.Optional; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doReturn; -@ExtendWith(MockitoExtension.class) public class ConcurrencyLimitServletFilterTest { - @Mock - Limiter limiter; + @Rule + public TestName testName = new TestName(); - @Mock - Limiter.Listener listener; + Limiter limiter; + DefaultRegistry registry = new DefaultRegistry(); + SpectatorMetricRegistry spectatorMetricRegistry = new SpectatorMetricRegistry(registry, registry.createId("unit.test.limiter")); + + @Before + public void beforeEachTest() { + + // Will bypass GET calls or calls with /admin path or both + limiter = Mockito.spy(new ServletLimiterBuilder() + .bypassLimitByMethod("GET") + .bypassLimitByPathInfo("/admin/health") + .named(testName.getMethodName()) + .metricRegistry(spectatorMetricRegistry) + .build()); + } @Test public void testDoFilterAllowed() throws ServletException, IOException { ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter); - when(limiter.acquire(any())).thenReturn(Optional.of(listener)); - MockHttpServletRequest request = new MockHttpServletRequest(); MockHttpServletResponse response = new MockHttpServletResponse(); MockFilterChain filterChain = new MockFilterChain(); - filter.doFilter(request, response, filterChain); - assertEquals(request, filterChain.getRequest(), "Request should be passed to the downstream chain"); - assertEquals(response, filterChain.getResponse(), "Response should be passed to the downstream chain"); + assertEquals("Request should be passed to the downstream chain", request, filterChain.getRequest()); + assertEquals("Response should be passed to the downstream chain", response, filterChain.getResponse()); - verify(listener).onSuccess(); + verifyCounts(0, 0, 1, 0, 0); } @Test @@ -56,17 +66,15 @@ public void testDoFilterThrottled() throws ServletException, IOException { ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter); //Empty means to throttle this request - when(limiter.acquire(any())).thenReturn(Optional.empty()); + doReturn(Optional.empty()).when(limiter).acquire(any()); MockHttpServletResponse response = new MockHttpServletResponse(); MockFilterChain filterChain = new MockFilterChain(); - filter.doFilter(new MockHttpServletRequest(), response, filterChain); - assertNull(filterChain.getRequest(), "doFilter should not be called on the filterchain"); - - assertEquals(429, response.getStatus(), "Status should be 429 - too many requests"); + assertNull("doFilter should not be called on the filterchain", filterChain.getRequest()); + assertEquals("Status should be 429 - too many requests", 429, response.getStatus()); } @Test @@ -75,12 +83,80 @@ public void testDoFilterThrottledCustomStatus() throws ServletException, IOExcep ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter, customThrottleStatus); //Empty means to throttle this request - when(limiter.acquire(any())).thenReturn(Optional.empty()); + doReturn(Optional.empty()).when(limiter).acquire(any()); MockHttpServletResponse response = new MockHttpServletResponse(); - filter.doFilter(new MockHttpServletRequest(), response, new MockFilterChain()); + filter.doFilter(new MockHttpServletRequest(), response, new MockFilterChain()); + + assertEquals("custom status should be respected", customThrottleStatus, response.getStatus()); + + } + + @Test + public void testDoFilterBypassCheckPassedForMethod() throws ServletException, IOException { + + ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter); + + MockHttpServletRequest request = new MockHttpServletRequest(); + request.setMethod("GET"); + request.setPathInfo("/live/path"); + MockHttpServletResponse response = new MockHttpServletResponse(); + MockFilterChain filterChain = new MockFilterChain(); + + filter.doFilter(request, response, filterChain); + + assertEquals("Request should be passed to the downstream chain", request, filterChain.getRequest()); + assertEquals("Response should be passed to the downstream chain", response, filterChain.getResponse()); + verifyCounts(0, 0, 0, 0, 1); + + } + + @Test + public void testDoFilterBypassCheckPassedForPath() throws ServletException, IOException { + + ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter); + + MockHttpServletRequest request = new MockHttpServletRequest(); + request.setMethod("POST"); + request.setPathInfo("/admin/health"); + MockHttpServletResponse response = new MockHttpServletResponse(); + MockFilterChain filterChain = new MockFilterChain(); + + filter.doFilter(request, response, filterChain); + + assertEquals("Request should be passed to the downstream chain", request, filterChain.getRequest()); + assertEquals("Response should be passed to the downstream chain", response, filterChain.getResponse()); + verifyCounts(0, 0, 0, 0, 1); + } + + @Test + public void testDoFilterBypassCheckFailed() throws ServletException, IOException { + + ConcurrencyLimitServletFilter filter = new ConcurrencyLimitServletFilter(limiter); + + MockHttpServletRequest request = new MockHttpServletRequest(); + request.setMethod("POST"); + request.setPathInfo("/live/path"); + MockHttpServletResponse response = new MockHttpServletResponse(); + MockFilterChain filterChain = new MockFilterChain(); + + filter.doFilter(request, response, filterChain); + + assertEquals("Request should be passed to the downstream chain", request, filterChain.getRequest()); + assertEquals("Response should be passed to the downstream chain", response, filterChain.getResponse()); + verifyCounts(0, 0, 1, 0, 0); + } - assertEquals(customThrottleStatus, response.getStatus(), "custom status should be respected"); + public void verifyCounts(int dropped, int ignored, int success, int rejected, int bypassed) { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + assertEquals(dropped, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "dropped").count()); + assertEquals(ignored, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "ignored").count()); + assertEquals(success, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "success").count()); + assertEquals(rejected, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "rejected").count()); + assertEquals(bypassed, registry.counter("unit.test.limiter.call", "id", testName.getMethodName(), "status", "bypassed").count()); } } diff --git a/concurrency-limits-spectator/dependencies.lock b/concurrency-limits-spectator/dependencies.lock index 16e03891..2dfd05e2 100644 --- a/concurrency-limits-spectator/dependencies.lock +++ b/concurrency-limits-spectator/dependencies.lock @@ -4,7 +4,7 @@ "project": true }, "com.netflix.spectator:spectator-api": { - "locked": "1.3.6" + "locked": "1.7.9" }, "org.slf4j:slf4j-api": { "locked": "1.7.36" @@ -15,7 +15,7 @@ "project": true }, "com.netflix.spectator:spectator-api": { - "locked": "1.3.6" + "locked": "1.7.9" }, "org.slf4j:slf4j-api": { "firstLevelTransitive": [ @@ -29,7 +29,7 @@ "project": true }, "com.netflix.spectator:spectator-api": { - "locked": "1.3.6" + "locked": "1.7.9" }, "junit:junit": { "locked": "4.13.2" @@ -43,13 +43,13 @@ "project": true }, "com.netflix.spectator:spectator-api": { - "locked": "1.3.6" + "locked": "1.7.9" }, "org.junit.jupiter:junit-jupiter-engine": { - "locked": "5.9.0" + "locked": "5.10.2" }, "org.junit.vintage:junit-vintage-engine": { - "locked": "5.9.0" + "locked": "5.10.2" }, "org.slf4j:slf4j-api": { "firstLevelTransitive": [