From e4aad71345c39853945fd97b2ea1f85feaf2fb3e Mon Sep 17 00:00:00 2001 From: ryosuke-hasebe Date: Tue, 23 May 2023 15:56:45 +0900 Subject: [PATCH 01/11] Fix bug related with context propagation (CoroutineServerInterceptor) --- grpc-kotlin/build.gradle.kts | 1 + .../grpc/kotlin/CoroutineServerInterceptor.kt | 38 ++++--- .../kotlin/CoroutineServerInterceptorTest.kt | 101 ++++++++++++++++-- 3 files changed, 116 insertions(+), 24 deletions(-) diff --git a/grpc-kotlin/build.gradle.kts b/grpc-kotlin/build.gradle.kts index bdfd375c5d1..d43aa525b05 100644 --- a/grpc-kotlin/build.gradle.kts +++ b/grpc-kotlin/build.gradle.kts @@ -3,6 +3,7 @@ dependencies { implementation(project(":grpc")) implementation(libs.grpc.kotlin) + implementation(libs.kotlin.reflect) implementation(libs.kotlin.coroutines.jdk8) testImplementation(libs.kotlin.coroutines.test) diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt index 6952725efa5..f8d5bcc6d39 100644 --- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt +++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt @@ -17,18 +17,21 @@ package com.linecorp.armeria.server.grpc.kotlin import com.linecorp.armeria.common.annotation.UnstableApi -import com.linecorp.armeria.internal.common.kotlin.ArmeriaRequestCoroutineContext -import com.linecorp.armeria.internal.server.grpc.AbstractServerCall import com.linecorp.armeria.server.grpc.AsyncServerInterceptor +import io.grpc.Context import io.grpc.Metadata import io.grpc.ServerCall import io.grpc.ServerCallHandler import io.grpc.ServerInterceptor -import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.asCoroutineDispatcher +import io.grpc.kotlin.CoroutineContextServerInterceptor +import io.grpc.kotlin.GrpcContextElement +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.future.future import java.util.concurrent.CompletableFuture +import kotlin.coroutines.CoroutineContext +import kotlin.reflect.full.companionObject +import kotlin.reflect.full.companionObjectInstance +import kotlin.reflect.full.memberProperties /** * A [ServerInterceptor] that is able to suspend the interceptor without blocking the @@ -54,20 +57,17 @@ import java.util.concurrent.CompletableFuture @UnstableApi interface CoroutineServerInterceptor : AsyncServerInterceptor { - @OptIn(DelicateCoroutinesApi::class) override fun asyncInterceptCall( call: ServerCall, headers: Metadata, next: ServerCallHandler ): CompletableFuture> { - check(call is AbstractServerCall) { - throw IllegalArgumentException( - "Cannot use ${AsyncServerInterceptor::class.java.name} with a non-Armeria gRPC server" - ) - } - val executor = call.blockingExecutor() ?: call.eventLoop() - - return GlobalScope.future(executor.asCoroutineDispatcher() + ArmeriaRequestCoroutineContext(call.ctx())) { + // COROUTINE_CONTEXT_KEY.get(): + // It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor. + // (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor) + // GrpcContextElement.current(): + // In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context. + return CoroutineScope(COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future { suspendedInterceptCall(call, headers, next) } } @@ -87,4 +87,14 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor { headers: Metadata, next: ServerCallHandler ): ServerCall.Listener + + companion object { + @Suppress("UNCHECKED_CAST") + internal val COROUTINE_CONTEXT_KEY: Context.Key = + CoroutineContextServerInterceptor::class.let { kclass -> + val companionObject = checkNotNull(kclass.companionObject) + val property = companionObject.memberProperties.single { it.name == "COROUTINE_CONTEXT_KEY" } + checkNotNull(property.getter.call(kclass.companionObjectInstance)) as Context.Key + } + } } diff --git a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt index 08faa79fe10..a05514ae440 100644 --- a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt +++ b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt @@ -16,6 +16,7 @@ package com.linecorp.armeria.server.grpc.kotlin +import com.google.common.util.concurrent.ThreadFactoryBuilder import com.google.protobuf.ByteString import com.linecorp.armeria.client.grpc.GrpcClients import com.linecorp.armeria.common.RequestContext @@ -35,12 +36,19 @@ import com.linecorp.armeria.server.ServiceRequestContext import com.linecorp.armeria.server.auth.Authorizer import com.linecorp.armeria.server.grpc.GrpcService import com.linecorp.armeria.testing.junit5.server.ServerExtension +import io.grpc.Context +import io.grpc.Contexts import io.grpc.Metadata import io.grpc.ServerCall import io.grpc.ServerCallHandler import io.grpc.Status import io.grpc.StatusException +import io.grpc.kotlin.CoroutineContextServerInterceptor +import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.asContextElement +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow @@ -51,13 +59,16 @@ import kotlinx.coroutines.flow.toList import kotlinx.coroutines.future.await import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit +import kotlin.coroutines.CoroutineContext internal class CoroutineServerInterceptorTest { @@ -205,19 +216,23 @@ internal class CoroutineServerInterceptorTest { @RegisterExtension val server: ServerExtension = object : ServerExtension() { override fun configure(sb: ServerBuilder) { - val statusFunction = GrpcStatusFunction { _: RequestContext, throwable: Throwable, _: Metadata -> - if (throwable is AnticipatedException && throwable.message == "Invalid access") { - return@GrpcStatusFunction Status.UNAUTHENTICATED + val statusFunction = + GrpcStatusFunction { _: RequestContext, throwable: Throwable, _: Metadata -> + if (throwable is AnticipatedException && throwable.message == "Invalid access") { + return@GrpcStatusFunction Status.UNAUTHENTICATED + } + // Fallback to the default. + null } - // Fallback to the default. - null - } + val threadLocalInterceptor = ThreadLocalInterceptor() val authInterceptor = AuthInterceptor() + val coroutineNameInterceptor = CoroutineNameInterceptor() sb.serviceUnder( "/non-blocking", GrpcService.builder() .exceptionMapping(statusFunction) - .intercept(authInterceptor) + // applying order is coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor + .intercept(threadLocalInterceptor, authInterceptor, coroutineNameInterceptor) .addService(TestService()) .build() ) @@ -226,7 +241,8 @@ internal class CoroutineServerInterceptorTest { GrpcService.builder() .addService(TestService()) .exceptionMapping(statusFunction) - .intercept(authInterceptor) + // applying order is coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor + .intercept(threadLocalInterceptor, authInterceptor, coroutineNameInterceptor) .useBlockingTaskExecutor(true) .build() ) @@ -236,6 +252,10 @@ internal class CoroutineServerInterceptorTest { private const val username = "Armeria" private const val token = "token-1234" + private val executorDispatcher = Executors.newSingleThreadExecutor( + ThreadFactoryBuilder().setNameFormat("my-executor").build() + ).asCoroutineDispatcher() + private class AuthInterceptor : CoroutineServerInterceptor { private val authorizer = Authorizer { ctx: ServiceRequestContext, _: Metadata -> val future = CompletableFuture() @@ -254,21 +274,70 @@ internal class CoroutineServerInterceptorTest { headers: Metadata, next: ServerCallHandler ): ServerCall.Listener { + assertContextPropagation() + + delay(100) + assertContextPropagation() // OK even if resume from suspend. + + withContext(executorDispatcher) { + // OK even if the dispatcher is switched + assertContextPropagation() + assertThat(Thread.currentThread().name).contains("my-executor") + } + val result = authorizer.authorize(ServiceRequestContext.current(), headers).await() + if (result) { - return next.startCall(call, headers) + val ctx = Context.current().withValue(AUTHORIZATION_RESULT_GRPC_CONTEXT_KEY, "OK") + return Contexts.interceptCall(ctx, call, headers, next) } else { throw AnticipatedException("Invalid access") } } + + private suspend fun assertContextPropagation() { + assertThat(ServiceRequestContext.currentOrNull()).isNotNull() + assertThat(currentCoroutineContext()[CoroutineName]?.name).isEqualTo("my-coroutine-name") + } + + companion object { + val AUTHORIZATION_RESULT_GRPC_CONTEXT_KEY: Context.Key = + Context.key("authorization-result") + } + } + + private class CoroutineNameInterceptor : CoroutineContextServerInterceptor() { + override fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext { + return CoroutineName("my-coroutine-name") + } + } + + private class ThreadLocalInterceptor : CoroutineContextServerInterceptor() { + override fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext { + return THREAD_LOCAL.asContextElement(value = "thread-local-value") + } + + companion object { + val THREAD_LOCAL = ThreadLocal() + } } private class TestService : TestServiceGrpcKt.TestServiceCoroutineImplBase() { override suspend fun unaryCall(request: SimpleRequest): SimpleResponse { + assertContextPropagation() + + delay(100) + assertContextPropagation() // OK even if resume from suspend. + + withContext(executorDispatcher) { + // OK even if the dispatcher is switched + assertContextPropagation() + assertThat(Thread.currentThread().name).contains("my-executor") + } + if (request.fillUsername) { return SimpleResponse.newBuilder().setUsername(username).build() } - return SimpleResponse.getDefaultInstance() } @@ -276,6 +345,7 @@ internal class CoroutineServerInterceptorTest { return flow { for (i in 1..5) { delay(500) + assertContextPropagation() emit(buildReply(username)) } } @@ -284,16 +354,27 @@ internal class CoroutineServerInterceptorTest { override suspend fun streamingInputCall(requests: Flow): StreamingInputCallResponse { val names = requests.map { it.payload.body.toString() }.toList() + assertContextPropagation() + return buildReply(names) } override fun fullDuplexCall(requests: Flow): Flow { return flow { requests.collect { + delay(500) + assertContextPropagation() emit(buildReply(username)) } } } + + private suspend fun assertContextPropagation() { + assertThat(ServiceRequestContext.currentOrNull()).isNotNull() + assertThat(currentCoroutineContext()[CoroutineName]?.name).isEqualTo("my-coroutine-name") + assertThat(ThreadLocalInterceptor.THREAD_LOCAL.get()).isEqualTo("thread-local-value") + assertThat(AuthInterceptor.AUTHORIZATION_RESULT_GRPC_CONTEXT_KEY.get()).isEqualTo("OK") + } } private fun buildReply(message: String): StreamingOutputCallResponse = From 2f18c5f5333c3d615aaebb7d5de27a18d1099a58 Mon Sep 17 00:00:00 2001 From: ryosuke-hasebe Date: Wed, 31 May 2023 15:33:14 +0900 Subject: [PATCH 02/11] apply review feedback --- .../grpc/kotlin/CoroutineServerInterceptor.kt | 17 ++++++++- .../kotlin/CoroutineServerInterceptorTest.kt | 35 ++++++++++++++++--- .../server/grpc/GrpcServiceBuilder.java | 10 +++--- 3 files changed, 52 insertions(+), 10 deletions(-) diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt index f8d5bcc6d39..219baf41e29 100644 --- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt +++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt @@ -17,6 +17,8 @@ package com.linecorp.armeria.server.grpc.kotlin import com.linecorp.armeria.common.annotation.UnstableApi +import com.linecorp.armeria.internal.common.kotlin.ArmeriaRequestCoroutineContext +import com.linecorp.armeria.internal.server.grpc.AbstractServerCall import com.linecorp.armeria.server.grpc.AsyncServerInterceptor import io.grpc.Context import io.grpc.Metadata @@ -26,6 +28,7 @@ import io.grpc.ServerInterceptor import io.grpc.kotlin.CoroutineContextServerInterceptor import io.grpc.kotlin.GrpcContextElement import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.future.future import java.util.concurrent.CompletableFuture import kotlin.coroutines.CoroutineContext @@ -62,12 +65,24 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor { headers: Metadata, next: ServerCallHandler ): CompletableFuture> { + check(call is AbstractServerCall) { + throw IllegalArgumentException( + "Cannot use ${AsyncServerInterceptor::class.java.name} with a non-Armeria gRPC server" + ) + } + val executor = call.blockingExecutor() ?: call.eventLoop() + // COROUTINE_CONTEXT_KEY.get(): // It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor. // (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor) // GrpcContextElement.current(): // In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context. - return CoroutineScope(COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future { + return CoroutineScope( + executor.asCoroutineDispatcher() + + ArmeriaRequestCoroutineContext(call.ctx()) + + COROUTINE_CONTEXT_KEY.get() + + GrpcContextElement.current() + ).future { suspendedInterceptCall(call, headers, next) } } diff --git a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt index a05514ae440..c8ca61e49ac 100644 --- a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt +++ b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt @@ -34,6 +34,7 @@ import com.linecorp.armeria.internal.testing.AnticipatedException import com.linecorp.armeria.server.ServerBuilder import com.linecorp.armeria.server.ServiceRequestContext import com.linecorp.armeria.server.auth.Authorizer +import com.linecorp.armeria.server.grpc.AsyncServerInterceptor import com.linecorp.armeria.server.grpc.GrpcService import com.linecorp.armeria.testing.junit5.server.ServerExtension import io.grpc.Context @@ -231,8 +232,13 @@ internal class CoroutineServerInterceptorTest { "/non-blocking", GrpcService.builder() .exceptionMapping(statusFunction) - // applying order is coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor - .intercept(threadLocalInterceptor, authInterceptor, coroutineNameInterceptor) + // applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor + .intercept( + threadLocalInterceptor, + authInterceptor, + coroutineNameInterceptor, + MyAsyncInterceptor(), + ) .addService(TestService()) .build() ) @@ -241,8 +247,13 @@ internal class CoroutineServerInterceptorTest { GrpcService.builder() .addService(TestService()) .exceptionMapping(statusFunction) - // applying order is coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor - .intercept(threadLocalInterceptor, authInterceptor, coroutineNameInterceptor) + // applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor + .intercept( + threadLocalInterceptor, + authInterceptor, + coroutineNameInterceptor, + MyAsyncInterceptor(), + ) .useBlockingTaskExecutor(true) .build() ) @@ -322,6 +333,22 @@ internal class CoroutineServerInterceptorTest { } } + private class MyAsyncInterceptor : AsyncServerInterceptor { + override fun asyncInterceptCall( + call: ServerCall, + headers: Metadata, + next: ServerCallHandler + ): CompletableFuture> { + return CompletableFuture.supplyAsync({ + next.startCall(call, headers) + }, EXECUTOR) + } + + companion object { + private val EXECUTOR = Executors.newSingleThreadExecutor() + } + } + private class TestService : TestServiceGrpcKt.TestServiceCoroutineImplBase() { override suspend fun unaryCall(request: SimpleRequest): SimpleResponse { assertContextPropagation() diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java index 38b66d97664..cea97c6a5f2 100644 --- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java +++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java @@ -948,6 +948,11 @@ static GrpcStatusFunction toGrpcStatusFunction( private ImmutableList.Builder interceptors() { if (interceptors == null) { interceptors = ImmutableList.builder(); + if (USE_COROUTINE_CONTEXT_INTERCEPTOR) { + final ServerInterceptor coroutineContextInterceptor = + new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor); + interceptors.add(coroutineContextInterceptor); + } } return interceptors; } @@ -961,11 +966,6 @@ private ImmutableList.Builder interceptors() { */ public GrpcService build() { final HandlerRegistry handlerRegistry; - if (USE_COROUTINE_CONTEXT_INTERCEPTOR) { - final ServerInterceptor coroutineContextInterceptor = - new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor); - interceptors().add(coroutineContextInterceptor); - } if (!enableUnframedRequests && unframedGrpcErrorHandler != null) { throw new IllegalStateException( "'unframedGrpcErrorHandler' can only be set if unframed requests are enabled"); From e2af251f019629e80e16a158bf43bbf4525d322a Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Fri, 9 Jun 2023 12:35:39 +0900 Subject: [PATCH 03/11] Fix ArmeriaCoroutineContextInterceptor --- .../grpc/kotlin/CoroutineServerInterceptorTest.kt | 14 +++++++++++--- .../grpc/ArmeriaCoroutineContextInterceptor.java | 8 +++++++- .../server/grpc/AsyncServerInterceptor.java | 6 ++++-- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt index c8ca61e49ac..89a65a9e330 100644 --- a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt +++ b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt @@ -232,8 +232,10 @@ internal class CoroutineServerInterceptorTest { "/non-blocking", GrpcService.builder() .exceptionMapping(statusFunction) - // applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor + // applying order is "MyAsyncInterceptor -> coroutineNameInterceptor -> + // authInterceptor -> threadLocalInterceptor -> MyAsyncInterceptor" .intercept( + MyAsyncInterceptor(), threadLocalInterceptor, authInterceptor, coroutineNameInterceptor, @@ -247,8 +249,10 @@ internal class CoroutineServerInterceptorTest { GrpcService.builder() .addService(TestService()) .exceptionMapping(statusFunction) - // applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor + // applying order is "MyAsyncInterceptor -> coroutineNameInterceptor -> + // authInterceptor -> threadLocalInterceptor -> MyAsyncInterceptor" .intercept( + MyAsyncInterceptor(), threadLocalInterceptor, authInterceptor, coroutineNameInterceptor, @@ -339,8 +343,12 @@ internal class CoroutineServerInterceptorTest { headers: Metadata, next: ServerCallHandler ): CompletableFuture> { + val context = Context.current(); return CompletableFuture.supplyAsync({ - next.startCall(call, headers) + // NB: When the current thread invoking `startCall` is different from the thread which + // started `asyncInterceptCall`, `next.startCall()` should be wrapped with `context.call()` + // to propagate the context to the next interceptor. + context.call { next.startCall(call, headers) } }, EXECUTOR) } diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java index 223afad4894..05add2d4ccb 100644 --- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java +++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java @@ -16,8 +16,11 @@ package com.linecorp.armeria.server.grpc; +import static com.google.common.base.Preconditions.checkState; + import java.util.concurrent.ScheduledExecutorService; +import com.linecorp.armeria.internal.server.grpc.AbstractServerCall; import com.linecorp.armeria.server.ServiceRequestContext; import io.grpc.Metadata; @@ -36,7 +39,10 @@ final class ArmeriaCoroutineContextInterceptor extends CoroutineContextServerInt @Override public CoroutineContext coroutineContext(ServerCall serverCall, Metadata metadata) { - final ServiceRequestContext ctx = ServiceRequestContext.current(); + checkState(serverCall instanceof AbstractServerCall, + "Cannot use %s with a non-Armeria gRPC server", + ArmeriaCoroutineContextInterceptor.class.getName()); + final ServiceRequestContext ctx = ((AbstractServerCall) serverCall).ctx(); final ArmeriaRequestCoroutineContext coroutineContext = new ArmeriaRequestCoroutineContext(ctx); final ScheduledExecutorService executor; if (useBlockingTaskExecutor) { diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/AsyncServerInterceptor.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/AsyncServerInterceptor.java index c796655ae0e..3975d8fac62 100644 --- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/AsyncServerInterceptor.java +++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/AsyncServerInterceptor.java @@ -36,10 +36,12 @@ * @Override * CompletableFuture> asyncInterceptCall( * ServerCall call, Metadata headers, ServerCallHandler next) { - * + * Context grpcContext = Context.current(); * return authorizer.authorize(headers).thenApply(result -> { * if (result) { - * return next.startCall(call, headers); + * // `next.startCall()` should be wrapped with `grpcContext.call()` if you want to propagate + * // the context to the next interceptor. + * return grpcContext.call(() -> next.startCall(call, headers)); * } else { * throw new AuthenticationException("Invalid access"); * } From eb1ac004efc9f37fb770a5c39f9e352b82c884e5 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Fri, 9 Jun 2023 13:03:35 +0900 Subject: [PATCH 04/11] lint --- .../server/grpc/kotlin/CoroutineServerInterceptor.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt index 219baf41e29..5af2461cd3c 100644 --- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt +++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt @@ -78,10 +78,8 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor { // GrpcContextElement.current(): // In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context. return CoroutineScope( - executor.asCoroutineDispatcher() + - ArmeriaRequestCoroutineContext(call.ctx()) + - COROUTINE_CONTEXT_KEY.get() + - GrpcContextElement.current() + executor.asCoroutineDispatcher() + ArmeriaRequestCoroutineContext(call.ctx()) + + COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current() ).future { suspendedInterceptCall(call, headers, next) } From c4cc245236f0d600c6af288e253118e8c6e82266 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Fri, 9 Jun 2023 13:27:44 +0900 Subject: [PATCH 05/11] add a workaround for a wrapped 'ServerCall' --- .../ArmeriaCoroutineContextInterceptor.java | 8 +-- .../armeria/server/grpc/DeferredListener.java | 8 +-- .../armeria/server/grpc/ServerCallUtil.java | 72 +++++++++++++++++++ 3 files changed, 78 insertions(+), 10 deletions(-) create mode 100644 grpc/src/main/java/com/linecorp/armeria/server/grpc/ServerCallUtil.java diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java index 05add2d4ccb..94f4a5562e7 100644 --- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java +++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java @@ -20,7 +20,6 @@ import java.util.concurrent.ScheduledExecutorService; -import com.linecorp.armeria.internal.server.grpc.AbstractServerCall; import com.linecorp.armeria.server.ServiceRequestContext; import io.grpc.Metadata; @@ -39,10 +38,9 @@ final class ArmeriaCoroutineContextInterceptor extends CoroutineContextServerInt @Override public CoroutineContext coroutineContext(ServerCall serverCall, Metadata metadata) { - checkState(serverCall instanceof AbstractServerCall, - "Cannot use %s with a non-Armeria gRPC server", - ArmeriaCoroutineContextInterceptor.class.getName()); - final ServiceRequestContext ctx = ((AbstractServerCall) serverCall).ctx(); + final ServiceRequestContext ctx = ServerCallUtil.findRequestContext(serverCall); + checkState(ctx != null, "Failed to find the current %s from %s", + ServiceRequestContext.class.getSimpleName(), serverCall); final ArmeriaRequestCoroutineContext coroutineContext = new ArmeriaRequestCoroutineContext(ctx); final ScheduledExecutorService executor; if (useBlockingTaskExecutor) { diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/DeferredListener.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/DeferredListener.java index 2a5147dec45..3f24a173430 100644 --- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/DeferredListener.java +++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/DeferredListener.java @@ -48,11 +48,9 @@ final class DeferredListener extends ServerCall.Listener { private boolean callClosed; DeferredListener(ServerCall serverCall, CompletableFuture> listenerFuture) { - checkState(serverCall instanceof AbstractServerCall, "Cannot use %s with a non-Armeria gRPC server", - AsyncServerInterceptor.class.getName()); - @SuppressWarnings("unchecked") - final AbstractServerCall armeriaServerCall = (AbstractServerCall) serverCall; - + final AbstractServerCall armeriaServerCall = ServerCallUtil.findArmeriaServerCall(serverCall); + checkState(armeriaServerCall != null, "Cannot use %s with a non-Armeria gRPC server. ServerCall: %s", + AsyncServerInterceptor.class.getName(), serverCall); // As per `ServerCall.Listener`'s Javadoc, the caller should call one simultaneously. `blockingExecutor` // is a sequential executor which is wrapped by `MoreExecutors.newSequentialExecutor()`. So both // `blockingExecutor` and `eventLoop` guarantees the execution order. diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/ServerCallUtil.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/ServerCallUtil.java new file mode 100644 index 00000000000..25a1d0605ba --- /dev/null +++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/ServerCallUtil.java @@ -0,0 +1,72 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server.grpc; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; + +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.server.grpc.AbstractServerCall; +import com.linecorp.armeria.server.ServiceRequestContext; + +import io.grpc.ForwardingServerCall; +import io.grpc.ServerCall; + +class ServerCallUtil { + + @Nullable + private static MethodHandle delegateMH; + + static { + try { + delegateMH = MethodHandles.lookup().findVirtual(ForwardingServerCall.class, "delegate", + MethodType.methodType(ServerCall.class)); + } catch (NoSuchMethodException | IllegalAccessException e) { + delegateMH = null; + } + } + + @Nullable + static ServiceRequestContext findRequestContext(ServerCall serverCall) { + final AbstractServerCall armeriaServerCall = findArmeriaServerCall(serverCall); + if (armeriaServerCall != null) { + return armeriaServerCall.ctx(); + } + + return ServiceRequestContext.currentOrNull(); + } + + @Nullable + static AbstractServerCall findArmeriaServerCall(ServerCall serverCall) { + if (delegateMH != null) { + while (serverCall instanceof ForwardingServerCall) { + try { + //noinspection unchecked + serverCall = (ServerCall) delegateMH.invoke(serverCall); + } catch (Throwable e) { + break; + } + } + } + if (serverCall instanceof AbstractServerCall) { + return (AbstractServerCall) serverCall; + } else { + return null; + } + } +} From def7f497bce8b3dfa70ea9a304982b1823688eb8 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Fri, 9 Jun 2023 13:28:22 +0900 Subject: [PATCH 06/11] clean up --- .../java/com/linecorp/armeria/server/grpc/ServerCallUtil.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/ServerCallUtil.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/ServerCallUtil.java index 25a1d0605ba..66965d7c038 100644 --- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/ServerCallUtil.java +++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/ServerCallUtil.java @@ -27,7 +27,7 @@ import io.grpc.ForwardingServerCall; import io.grpc.ServerCall; -class ServerCallUtil { +final class ServerCallUtil { @Nullable private static MethodHandle delegateMH; @@ -69,4 +69,6 @@ static AbstractServerCall findArmeriaServerCall(ServerCall se return null; } } + + private ServerCallUtil() {} } From 7396066a559cb1b65537ed0920925b141c6dbc78 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Fri, 9 Jun 2023 14:13:32 +0900 Subject: [PATCH 07/11] fix broken tests --- .../server/grpc/kotlin/CoroutineServerInterceptorTest.kt | 6 +++--- .../linecorp/armeria/server/grpc/DeferredListenerTest.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt index 89a65a9e330..0df23891f5e 100644 --- a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt +++ b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt @@ -239,7 +239,7 @@ internal class CoroutineServerInterceptorTest { threadLocalInterceptor, authInterceptor, coroutineNameInterceptor, - MyAsyncInterceptor(), + MyAsyncInterceptor() ) .addService(TestService()) .build() @@ -256,7 +256,7 @@ internal class CoroutineServerInterceptorTest { threadLocalInterceptor, authInterceptor, coroutineNameInterceptor, - MyAsyncInterceptor(), + MyAsyncInterceptor() ) .useBlockingTaskExecutor(true) .build() @@ -343,7 +343,7 @@ internal class CoroutineServerInterceptorTest { headers: Metadata, next: ServerCallHandler ): CompletableFuture> { - val context = Context.current(); + val context = Context.current() return CompletableFuture.supplyAsync({ // NB: When the current thread invoking `startCall` is different from the thread which // started `asyncInterceptCall`, `next.startCall()` should be wrapped with `context.call()` diff --git a/grpc/src/test/java/com/linecorp/armeria/server/grpc/DeferredListenerTest.java b/grpc/src/test/java/com/linecorp/armeria/server/grpc/DeferredListenerTest.java index 4a8d3d76069..4233b9c47e9 100644 --- a/grpc/src/test/java/com/linecorp/armeria/server/grpc/DeferredListenerTest.java +++ b/grpc/src/test/java/com/linecorp/armeria/server/grpc/DeferredListenerTest.java @@ -54,7 +54,7 @@ class DeferredListenerTest { void shouldHaveRequestContextInThread() { assertThatThrownBy(() -> new DeferredListener<>(mock(ServerCall.class), null)) .isInstanceOf(IllegalStateException.class) - .hasMessage("Cannot use %s with a non-Armeria gRPC server", + .hasMessageContaining("Cannot use %s with a non-Armeria gRPC server", AsyncServerInterceptor.class.getName()); } From c2d1dd3ddab4db75c5bce941a0cdc2a46f6ed90e Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Fri, 9 Jun 2023 14:51:05 +0900 Subject: [PATCH 08/11] use getter instead of field access --- .../com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java index cea97c6a5f2..5500506b5d4 100644 --- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java +++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java @@ -981,9 +981,9 @@ public GrpcService build() { if (grpcHealthCheckService != null) { registryBuilder.addService(grpcHealthCheckService.bindService(), null, ImmutableList.of()); } - if (interceptors != null) { + final ImmutableList interceptors = interceptors().build(); + if (!interceptors.isEmpty()) { final HandlerRegistry.Builder newRegistryBuilder = new HandlerRegistry.Builder(); - final ImmutableList interceptors = this.interceptors.build(); for (Entry entry : registryBuilder.entries()) { final MethodDescriptor methodDescriptor = entry.method(); final ServerServiceDefinition intercepted = From 89e9b1f71865468c338de062335efc1d79bea829 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Mon, 12 Jun 2023 15:01:20 +0900 Subject: [PATCH 09/11] Address comments by @be-hase --- .../grpc/kotlin/CoroutineServerInterceptor.kt | 8 -------- .../armeria/server/grpc/GrpcServiceBuilder.java | 14 +++++++------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt index 5af2461cd3c..04b6eb7e0a7 100644 --- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt +++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt @@ -65,20 +65,12 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor { headers: Metadata, next: ServerCallHandler ): CompletableFuture> { - check(call is AbstractServerCall) { - throw IllegalArgumentException( - "Cannot use ${AsyncServerInterceptor::class.java.name} with a non-Armeria gRPC server" - ) - } - val executor = call.blockingExecutor() ?: call.eventLoop() - // COROUTINE_CONTEXT_KEY.get(): // It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor. // (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor) // GrpcContextElement.current(): // In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context. return CoroutineScope( - executor.asCoroutineDispatcher() + ArmeriaRequestCoroutineContext(call.ctx()) + COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current() ).future { suspendedInterceptCall(call, headers, next) diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java index 5500506b5d4..38b66d97664 100644 --- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java +++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java @@ -948,11 +948,6 @@ static GrpcStatusFunction toGrpcStatusFunction( private ImmutableList.Builder interceptors() { if (interceptors == null) { interceptors = ImmutableList.builder(); - if (USE_COROUTINE_CONTEXT_INTERCEPTOR) { - final ServerInterceptor coroutineContextInterceptor = - new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor); - interceptors.add(coroutineContextInterceptor); - } } return interceptors; } @@ -966,6 +961,11 @@ private ImmutableList.Builder interceptors() { */ public GrpcService build() { final HandlerRegistry handlerRegistry; + if (USE_COROUTINE_CONTEXT_INTERCEPTOR) { + final ServerInterceptor coroutineContextInterceptor = + new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor); + interceptors().add(coroutineContextInterceptor); + } if (!enableUnframedRequests && unframedGrpcErrorHandler != null) { throw new IllegalStateException( "'unframedGrpcErrorHandler' can only be set if unframed requests are enabled"); @@ -981,9 +981,9 @@ public GrpcService build() { if (grpcHealthCheckService != null) { registryBuilder.addService(grpcHealthCheckService.bindService(), null, ImmutableList.of()); } - final ImmutableList interceptors = interceptors().build(); - if (!interceptors.isEmpty()) { + if (interceptors != null) { final HandlerRegistry.Builder newRegistryBuilder = new HandlerRegistry.Builder(); + final ImmutableList interceptors = this.interceptors.build(); for (Entry entry : registryBuilder.entries()) { final MethodDescriptor methodDescriptor = entry.method(); final ServerServiceDefinition intercepted = From d8c3d4f8d43770fc869ac56cb660614c994f3d9a Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Mon, 12 Jun 2023 15:37:59 +0900 Subject: [PATCH 10/11] lint --- .../armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt | 3 --- 1 file changed, 3 deletions(-) diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt index 04b6eb7e0a7..0eae8413a98 100644 --- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt +++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt @@ -17,8 +17,6 @@ package com.linecorp.armeria.server.grpc.kotlin import com.linecorp.armeria.common.annotation.UnstableApi -import com.linecorp.armeria.internal.common.kotlin.ArmeriaRequestCoroutineContext -import com.linecorp.armeria.internal.server.grpc.AbstractServerCall import com.linecorp.armeria.server.grpc.AsyncServerInterceptor import io.grpc.Context import io.grpc.Metadata @@ -28,7 +26,6 @@ import io.grpc.ServerInterceptor import io.grpc.kotlin.CoroutineContextServerInterceptor import io.grpc.kotlin.GrpcContextElement import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.future.future import java.util.concurrent.CompletableFuture import kotlin.coroutines.CoroutineContext From 2d2fa46812e472f571aaa5a8439d2fc434775f2e Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Mon, 12 Jun 2023 16:57:17 +0900 Subject: [PATCH 11/11] lint --- .../armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt index 0eae8413a98..4e2496d6731 100644 --- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt +++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt @@ -68,7 +68,7 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor { // GrpcContextElement.current(): // In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context. return CoroutineScope( - COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current() + COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current() ).future { suspendedInterceptCall(call, headers, next) }