Skip to content

Commit

Permalink
fix(3.2): The oneToOne method of the ReactorServerCalls class will ca…
Browse files Browse the repository at this point in the history
…use the request to hang when the result is Mono Empty (#14121)

* Fix triple reactor request hung when result is Mono Empty

* code format

* fix compile

---------

Co-authored-by: caoyanan <[email protected]>
  • Loading branch information
caoyanan666 and caoyanan authored May 8, 2024
1 parent dfd135e commit 1abe9f8
Showing 1 changed file with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.ServerTripleReactorPublisher;
import org.apache.dubbo.reactive.ServerTripleReactorSubscriber;
import org.apache.dubbo.rpc.StatusRpcException;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import reactor.core.publisher.Flux;
Expand All @@ -43,16 +44,18 @@ private ReactorServerCalls() {}
* @param func service implementation
*/
public static <T, R> void oneToOne(T request, StreamObserver<R> responseObserver, Function<Mono<T>, Mono<R>> func) {
func.apply(Mono.just(request)).subscribe(res -> {
CompletableFuture.completedFuture(res).whenComplete((r, t) -> {
if (t != null) {
responseObserver.onError(t);
} else {
responseObserver.onNext(r);
responseObserver.onCompleted();
}
});
});
try {
func.apply(Mono.just(request))
.subscribe(
res -> {
responseObserver.onNext(res);
responseObserver.onCompleted();
},
throwable -> doOnResponseHasException(throwable, responseObserver),
() -> doOnResponseHasException(TriRpcStatus.NOT_FOUND.asException(), responseObserver));
} catch (Throwable throwable) {
doOnResponseHasException(throwable, responseObserver);
}
}

/**
Expand Down Expand Up @@ -131,4 +134,10 @@ public static <T, R> StreamObserver<T> manyToMany(

return serverPublisher;
}

private static void doOnResponseHasException(Throwable throwable, StreamObserver<?> responseObserver) {
StatusRpcException statusRpcException =
TriRpcStatus.getStatus(throwable).asException();
responseObserver.onError(statusRpcException);
}
}

0 comments on commit 1abe9f8

Please sign in to comment.