From 237001092e735dd5d73705332ce6946eb28a72a1 Mon Sep 17 00:00:00 2001 From: caoyanan Date: Mon, 22 Apr 2024 19:11:52 +0800 Subject: [PATCH 1/3] Fix triple reactor request hung when result is Mono Empty --- .../reactive/calls/ReactorServerCalls.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java index 58ec934c42e..4c1db9b1fe8 100644 --- a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java +++ b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java @@ -43,16 +43,18 @@ private ReactorServerCalls() {} * @param func service implementation */ public static void oneToOne(T request, StreamObserver responseObserver, Function, Mono> func) { - func.apply(Mono.just(request)).subscribe(res -> { - CompletableFuture.completedFuture(res).whenComplete((r, t) -> { - if (t != null) { - responseObserver.onError(t); - } else { - responseObserver.onNext(r); - responseObserver.onCompleted(); - } - }); - }); + try { + func.apply(Mono.just(request)).subscribe( + res -> { + responseObserver.onNext(res); + responseObserver.onCompleted(); + }, + throwable -> doOnResponseHasException(throwable, responseObserver), + () -> doOnResponseHasException(TriRpcStatus.NOT_FOUND.asException(), responseObserver) + ); + } catch (Throwable throwable) { + doOnResponseHasException(throwable, responseObserver); + } } /** @@ -131,4 +133,9 @@ public static StreamObserver manyToMany( return serverPublisher; } + + private static void doOnResponseHasException(Throwable throwable, StreamObserver responseObserver) { + StatusRpcException statusRpcException = TriRpcStatus.getStatus(throwable).asException(); + responseObserver.onError(statusRpcException); + } } From 223e108e7f3dc0fbfa85c777b7b69d070bbfaac8 Mon Sep 17 00:00:00 2001 From: caoyanan Date: Mon, 22 Apr 2024 19:21:33 +0800 Subject: [PATCH 2/3] code format --- .../reactive/calls/ReactorServerCalls.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java index 4c1db9b1fe8..1c8e9bd7592 100644 --- a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java +++ b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java @@ -22,7 +22,6 @@ import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver; import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import reactor.core.publisher.Flux; @@ -44,17 +43,17 @@ private ReactorServerCalls() {} */ public static void oneToOne(T request, StreamObserver responseObserver, Function, Mono> func) { try { - func.apply(Mono.just(request)).subscribe( - res -> { - responseObserver.onNext(res); - responseObserver.onCompleted(); - }, - throwable -> doOnResponseHasException(throwable, responseObserver), - () -> doOnResponseHasException(TriRpcStatus.NOT_FOUND.asException(), responseObserver) - ); + func.apply(Mono.just(request)) + .subscribe( + res -> { + responseObserver.onNext(res); + responseObserver.onCompleted(); + }, + throwable -> doOnResponseHasException(throwable, responseObserver), + () -> doOnResponseHasException(TriRpcStatus.NOT_FOUND.asException(), responseObserver)); } catch (Throwable throwable) { - doOnResponseHasException(throwable, responseObserver); - } + doOnResponseHasException(throwable, responseObserver); + } } /** @@ -135,7 +134,8 @@ public static StreamObserver manyToMany( } private static void doOnResponseHasException(Throwable throwable, StreamObserver responseObserver) { - StatusRpcException statusRpcException = TriRpcStatus.getStatus(throwable).asException(); - responseObserver.onError(statusRpcException); - } + StatusRpcException statusRpcException = + TriRpcStatus.getStatus(throwable).asException(); + responseObserver.onError(statusRpcException); + } } From ea930f19bd1aba0487b0502717d856ef85e48570 Mon Sep 17 00:00:00 2001 From: caoyanan Date: Mon, 22 Apr 2024 22:01:35 +0800 Subject: [PATCH 3/3] fix compile --- .../org/apache/dubbo/reactive/calls/ReactorServerCalls.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java index 1c8e9bd7592..f6a39c944c4 100644 --- a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java +++ b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java @@ -19,6 +19,8 @@ import org.apache.dubbo.common.stream.StreamObserver; import org.apache.dubbo.reactive.ServerTripleReactorPublisher; import org.apache.dubbo.reactive.ServerTripleReactorSubscriber; +import org.apache.dubbo.rpc.StatusRpcException; +import org.apache.dubbo.rpc.TriRpcStatus; import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver; import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;