From 19fa88a53963aa691be07f59713c020cdb8f6c9b Mon Sep 17 00:00:00 2001 From: elandau Date: Mon, 15 Jul 2019 21:36:10 -0700 Subject: [PATCH] Don't onIgnore in gRPC cancel The current behavior is to onIgnore the acquired token when the requests is cancelled. This is not the correct behavior as it would artificially make it look like the limit has dropped while the server thread is till busy processing requests. Instead the limit should remain acquired until the endpoint handler either calls onCompleted or onError. Note that for async endpoint implementations that forget or fail to call onCompleted or onError the limit will never be reduced. --- .../ConcurrencyLimitServerInterceptor.java | 21 ---------- ...ConcurrencyLimitServerInterceptorTest.java | 42 ++++++++----------- 2 files changed, 18 insertions(+), 45 deletions(-) diff --git a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptor.java b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptor.java index 61057775..d82b0800 100644 --- a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptor.java +++ b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptor.java @@ -15,10 +15,8 @@ */ package com.netflix.concurrency.limits.grpc.server; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.internal.Preconditions; -import io.grpc.Context; import io.grpc.ForwardingServerCall; import io.grpc.ForwardingServerCallListener; import io.grpc.Metadata; @@ -30,8 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; @@ -51,12 +47,6 @@ public class ConcurrencyLimitServerInterceptor implements ServerInterceptor { private Supplier trailerSupplier; - private static final Executor executor = Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("concurrency-limit-cleanup-%d") - .build()); - public static class Builder { private Supplier statusSupplier = () -> LIMIT_EXCEEDED_STATUS; private Supplier trailerSupplier = Metadata::new; @@ -158,8 +148,6 @@ void safeComplete(Runnable action) { @Override public Listener apply(Limiter.Listener listener) { - Context.current().addListener(context -> safeComplete(listener::onIgnore), executor); - final Listener delegate; try { @@ -214,15 +202,6 @@ public void onHalfClose() { throw t; } } - - @Override - public void onCancel() { - try { - super.onCancel(); - } finally { - safeComplete(listener::onIgnore); - } - } }; } }) diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java index 9cb8a7b9..a15aa88a 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java @@ -158,22 +158,33 @@ public void releaseOnUncaughtException() throws IOException { @Test public void releaseOnCancellation() { // Setup server - startServer((req, observer) -> {}); + startServer((req, observer) -> { + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + observer.onNext("delayed_response"); + observer.onCompleted(); + }); ListenableFuture future = ClientCalls.futureUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT), "foo"); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); future.cancel(true); + // Verify Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); - Mockito.verify(listener.getResult().get(), Mockito.timeout(1000).times(1)).onIgnore(); + Mockito.verify(listener.getResult().get(), Mockito.times(0)).onIgnore(); - verifyCounts(0, 1, 0, 0); + Mockito.verify(listener.getResult().get(), Mockito.timeout(2000).times(1)).onSuccess(); + + verifyCounts(0, 0, 1, 0); } @Test public void releaseOnDeadlineExceeded() { // Setup server - startServer((req, observer) -> {}); + startServer((req, observer) -> { + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + observer.onNext("delayed_response"); + observer.onCompleted(); + }); try { ClientCalls.blockingUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS)), "foo"); @@ -182,28 +193,11 @@ public void releaseOnDeadlineExceeded() { } // Verify Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); - Mockito.verify(listener.getResult().get(), Mockito.timeout(1000).times(1)).onIgnore(); - - verifyCounts(0, 1, 0, 0); - } + Mockito.verify(listener.getResult().get(), Mockito.times(0)).onIgnore(); - @Test - public void releaseOnMissingHalfClose() { - // Setup server - startServer((req, observer) -> {}); - - ClientCall call = channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withDeadlineAfter(500, TimeUnit.MILLISECONDS)); - call.start(new ClientCall.Listener() {}, new Metadata()); - call.request(2); - call.sendMessage("foo"); - - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + Mockito.verify(listener.getResult().get(), Mockito.timeout(2000).times(1)).onSuccess(); - // Verify - Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); - Mockito.verify(listener.getResult().get(), Mockito.timeout(1000).times(1)).onIgnore(); - - verifyCounts(0, 1, 0, 0); + verifyCounts(0, 0, 1, 0); } public void verifyCounts(int dropped, int ignored, int success, int rejected) {