From feedc1b1d07f6ce7b4c8e95a7679284c52f946c2 Mon Sep 17 00:00:00 2001 From: warunalakshitha Date: Wed, 30 Oct 2024 14:16:07 +0530 Subject: [PATCH] Migrate to new runtime APIs --- gradle.properties | 6 +- .../io/ballerina/stdlib/grpc/DataContext.java | 11 +-- .../io/ballerina/stdlib/grpc/GrpcUtil.java | 13 ++++ .../stdlib/grpc/ServicesBuilderUtils.java | 14 ++-- .../AbstractCallableUnitCallBack.java | 17 +---- .../StreamingCallableUnitCallBack.java | 76 ++++++++++--------- .../callback/UnaryCallableUnitCallBack.java | 73 ++++++++++-------- .../grpc/listener/ServerCallHandler.java | 63 +++++++-------- .../listener/StreamingServerCallHandler.java | 30 +++++--- .../grpc/nativeimpl/client/FunctionUtils.java | 46 ++++++----- .../serviceendpoint/FunctionUtils.java | 40 +++++----- .../streamingclient/FunctionUtils.java | 25 +++--- .../grpc/testutils/NativeTestUtils.java | 4 +- 13 files changed, 226 insertions(+), 192 deletions(-) diff --git a/gradle.properties b/gradle.properties index b9d490a94..96362b899 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,7 @@ org.gradle.caching=true group=io.ballerina.stdlib version=1.12.2-SNAPSHOT #dependency versions -ballerinaLangVersion=2201.10.0-SNAPSHOT +ballerinaLangVersion=2201.10.0-20241025-103700-5c9e6a27 ballerinaTomlParserVersion=1.2.2 checkstylePluginVersion=10.12.0 commonsLang3Version=3.8.1 @@ -45,9 +45,9 @@ stdlibUuidVersion=1.8.1-20241009-134600-a05012b stdlibAuthVersion=2.12.1-20241010-130800-733dbef stdlibJwtVersion=2.13.1-20241010-123600-5ea6a94 -stdlibOAuth2Version=2.12.1-20241010-123600-0e0cfcc +stdlibOAuth2Version=2.12.1-20241029-084800-d7ba9e5 -stdlibHttpVersion=2.12.1-20241018-081800-bb91312 +stdlibHttpVersion=2.13.0-20241029-110700-30ed05b # Ballerinax Observer observeVersion=1.3.1-20241007-161000-645452d diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/DataContext.java b/native/src/main/java/io/ballerina/stdlib/grpc/DataContext.java index 65247b4f8..48dcb9d3b 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/DataContext.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/DataContext.java @@ -19,16 +19,17 @@ package io.ballerina.stdlib.grpc; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; + +import java.util.concurrent.CompletableFuture; /** * {@code DataContext} is the wrapper to hold {@code Strand} and {@code Future}. */ public class DataContext { - private Environment environment; - private Future future; + private final Environment environment; + private final CompletableFuture future; - public DataContext(Environment env, Future future) { + public DataContext(Environment env, CompletableFuture future) { this.environment = env; this.future = future; } @@ -37,7 +38,7 @@ public Environment getEnvironment() { return environment; } - public Future getFuture() { + public CompletableFuture getFuture() { return future; } } diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/GrpcUtil.java b/native/src/main/java/io/ballerina/stdlib/grpc/GrpcUtil.java index 2452c0ac2..45225a573 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/GrpcUtil.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/GrpcUtil.java @@ -19,9 +19,11 @@ package io.ballerina.stdlib.grpc; import io.ballerina.runtime.api.TypeTags; +import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.values.BArray; import io.ballerina.runtime.api.values.BDecimal; +import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BString; import io.ballerina.stdlib.grpc.exception.StatusRuntimeException; @@ -40,6 +42,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import static io.ballerina.runtime.api.constants.RuntimeConstants.BALLERINA_VERSION; @@ -473,4 +476,14 @@ public static String getTypeName(Type type) { } return type.getName(); } + + public static Object getResult(CompletableFuture balFuture) { + try { + return balFuture.get(); + } catch (BError error) { + throw error; + } catch (Throwable throwable) { + throw ErrorCreator.createError(throwable); + } + } } diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/ServicesBuilderUtils.java b/native/src/main/java/io/ballerina/stdlib/grpc/ServicesBuilderUtils.java index bfa14ad86..0c9abfed1 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/ServicesBuilderUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/ServicesBuilderUtils.java @@ -239,7 +239,7 @@ private static com.google.protobuf.Descriptors.FileDescriptor getDescriptor(Obje if (descriptorData == null) { descriptorData = annotationMap.getStringValue(StringUtils.fromString("descriptor")); } - BMap descMap = (BMap) annotationMap.getMapValue( + BMap descMap = (BMap) annotationMap.getMapValue( StringUtils.fromString("descMap")); return getFileDescriptor(descriptorData, descMap); } catch (IOException | Descriptors.DescriptorValidationException e) { @@ -258,9 +258,9 @@ private static com.google.protobuf.Descriptors.FileDescriptor getDescriptorFromS } else if (type.getFields().containsKey("value")) { descriptorData = service.getStringValue(StringUtils.fromString("value")); } - BMap descMap = null; + BMap descMap = null; if (type.getFields().containsKey("descMap")) { - descMap = (BMap) service.getMapValue(StringUtils.fromString("descMap")); + descMap = service.getMapValue(StringUtils.fromString("descMap")); } if (descriptorData == null || descMap == null) { return null; @@ -273,8 +273,7 @@ private static com.google.protobuf.Descriptors.FileDescriptor getDescriptorFromS } - private static Descriptors.FileDescriptor getFileDescriptor( - BString descriptorData, BMap descMap) + private static Descriptors.FileDescriptor getFileDescriptor(BString descriptorData, BMap descMap) throws InvalidProtocolBufferException, Descriptors.DescriptorValidationException, GrpcServerException { byte[] descriptor = hexStringToByteArray(descriptorData.getValue()); @@ -291,14 +290,15 @@ private static Descriptors.FileDescriptor getFileDescriptor( List fileDescriptors = new ArrayList<>(); for (ByteString dependency : descriptorProto.getDependencyList().asByteStringList()) { String dependencyKey = dependency.toStringUtf8(); - if (descMap == null || descMap.size() == 0) { + if (descMap == null || descMap.isEmpty()) { Descriptors.FileDescriptor dependentDescriptor = StandardDescriptorBuilder.getFileDescriptor (dependencyKey); if (dependentDescriptor != null) { fileDescriptors.add(dependentDescriptor); } } else if (descMap.containsKey(StringUtils.fromString(dependencyKey))) { - fileDescriptors.add(getFileDescriptor(descMap.get(StringUtils.fromString(dependencyKey)), descMap)); + fileDescriptors.add(getFileDescriptor((BString) descMap.get(StringUtils.fromString(dependencyKey)), + descMap)); } } return Descriptors.FileDescriptor.buildFrom(descriptorProto, diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/callback/AbstractCallableUnitCallBack.java b/native/src/main/java/io/ballerina/stdlib/grpc/callback/AbstractCallableUnitCallBack.java index 22c920304..f38e7330a 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/callback/AbstractCallableUnitCallBack.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/callback/AbstractCallableUnitCallBack.java @@ -18,7 +18,6 @@ package io.ballerina.stdlib.grpc.callback; -import io.ballerina.runtime.api.async.Callback; import io.ballerina.runtime.api.values.BError; import io.ballerina.stdlib.grpc.GrpcConstants; import io.ballerina.stdlib.grpc.Message; @@ -26,26 +25,12 @@ import io.ballerina.stdlib.grpc.StreamObserver; import io.ballerina.stdlib.grpc.exception.StatusRuntimeException; -import java.util.concurrent.Semaphore; - /** * Abstract call back class registered for gRPC service in B7a executor. * * @since 0.995.0 */ -public class AbstractCallableUnitCallBack implements Callback { - - public final Semaphore available = new Semaphore(1, true); - - @Override - public void notifySuccess(Object o) { - available.release(); - } - - @Override - public void notifyFailure(io.ballerina.runtime.api.values.BError error) { - available.release(); - } +public class AbstractCallableUnitCallBack { /** * Handles failures in GRPC callable unit callback. diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/callback/StreamingCallableUnitCallBack.java b/native/src/main/java/io/ballerina/stdlib/grpc/callback/StreamingCallableUnitCallBack.java index 7099f1283..b74265fb7 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/callback/StreamingCallableUnitCallBack.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/callback/StreamingCallableUnitCallBack.java @@ -18,7 +18,6 @@ package io.ballerina.stdlib.grpc.callback; import com.google.protobuf.Descriptors; -import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.Runtime; import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.types.ObjectType; @@ -56,15 +55,14 @@ public class StreamingCallableUnitCallBack extends AbstractCallableUnitCallBack private static final Logger LOG = LoggerFactory.getLogger(StreamingCallableUnitCallBack.class); - private Runtime runtime; - private StreamObserver responseSender; - private boolean emptyResponse; - private Descriptors.Descriptor outputType; - private ObserverContext observerContext; + private final Runtime runtime; + private final StreamObserver responseSender; + private final boolean emptyResponse; + private final Descriptors.Descriptor outputType; + private final ObserverContext observerContext; public StreamingCallableUnitCallBack(Runtime runtime, StreamObserver responseSender, boolean isEmptyResponse, Descriptors.Descriptor outputType, ObserverContext context) { - available.acquireUninterruptibly(); this.runtime = runtime; this.responseSender = responseSender; this.emptyResponse = isEmptyResponse; @@ -72,9 +70,7 @@ public StreamingCallableUnitCallBack(Runtime runtime, StreamObserver responseSen this.outputType = outputType; } - @Override public void notifySuccess(Object response) { - super.notifySuccess(response); // check whether connection is closed. if (responseSender instanceof ServerCallHandler.ServerCallStreamObserver) { ServerCallHandler.ServerCallStreamObserver serverCallStreamObserver = (ServerCallHandler @@ -112,17 +108,24 @@ public void notifySuccess(Object response) { ReturnStreamUnitCallBack returnStreamUnitCallBack = new ReturnStreamUnitCallBack( runtime, responseSender, outputType, bObject, headers); ObjectType serviceObjectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(bObject)); - if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) { - runtime.invokeMethodAsyncConcurrently(bObject, STREAMING_NEXT_FUNCTION, null, null, - returnStreamUnitCallBack, null, PredefinedTypes.TYPE_NULL); - } else { - runtime.invokeMethodAsyncSequentially(bObject, STREAMING_NEXT_FUNCTION, null, null, - returnStreamUnitCallBack, null, PredefinedTypes.TYPE_NULL); - } + Thread.startVirtualThread(() -> { + try { + Object result; + if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) { + result = runtime.startIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null).get(); + } else { + result = runtime.startNonIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null) + .get(); + } + returnStreamUnitCallBack.notifySuccess(result); + } catch (BError error) { + returnStreamUnitCallBack.notifyFailure(error); + } + }); } else { // If content is null and remote function doesn't return empty response means. response is already sent // to client via caller object, but connection is not closed already by calling complete function. - // Hence closing the connection. + // Hence, closing the connection. if (content == null) { if (this.emptyResponse) { Message responseMessage = new Message(GrpcConstants.EMPTY_DATATYPE_NAME, null); @@ -138,7 +141,6 @@ public void notifySuccess(Object response) { } } - @Override public void notifyFailure(io.ballerina.runtime.api.values.BError error) { if (responseSender instanceof ServerCallHandler.ServerCallStreamObserver) { ServerCallHandler.ServerCallStreamObserver serverCallStreamObserver = (ServerCallHandler @@ -165,7 +167,6 @@ public void notifyFailure(io.ballerina.runtime.api.values.BError error) { if (observerContext != null) { observerContext.addProperty(PROPERTY_KEY_HTTP_STATUS_CODE, HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); } - super.notifyFailure(error); if (isPanic) { System.exit(1); } @@ -175,11 +176,11 @@ public void notifyFailure(io.ballerina.runtime.api.values.BError error) { * Call back class registered to send returned stream from a remote function. * */ - public class ReturnStreamUnitCallBack extends AbstractCallableUnitCallBack { - private StreamObserver requestSender; - private Descriptors.Descriptor outputType; - private Runtime runtime; - private BObject bObject; + public static class ReturnStreamUnitCallBack extends AbstractCallableUnitCallBack { + private final StreamObserver requestSender; + private final Descriptors.Descriptor outputType; + private final Runtime runtime; + private final BObject bObject; private HttpHeaders headers; public ReturnStreamUnitCallBack(Runtime runtime, StreamObserver requestSender, @@ -191,7 +192,6 @@ public ReturnStreamUnitCallBack(Runtime runtime, StreamObserver requestSender, this.headers = headers; } - @Override public void notifySuccess(Object response) { if (response != null) { Message msg; @@ -207,22 +207,28 @@ public void notifySuccess(Object response) { } requestSender.onNext(msg); ObjectType serviceObjectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(bObject)); - if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) { - runtime.invokeMethodAsyncConcurrently(bObject, STREAMING_NEXT_FUNCTION, null, - null, this, null, PredefinedTypes.TYPE_NULL); - } else { - runtime.invokeMethodAsyncSequentially(bObject, STREAMING_NEXT_FUNCTION, null, - null, this, null, PredefinedTypes.TYPE_NULL); - } + Thread.startVirtualThread(() -> { + try { + Object result; + if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) { + result = runtime.startIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null) + .get(); + } else { + result = runtime.startNonIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null) + .get(); + } + this.notifySuccess(result); + } catch (BError error) { + this.notifyFailure(error); + } + }); } else { requestSender.onCompleted(); } - } - @Override public void notifyFailure(BError error) { - super.notifyFailure(error); + } } } diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/callback/UnaryCallableUnitCallBack.java b/native/src/main/java/io/ballerina/stdlib/grpc/callback/UnaryCallableUnitCallBack.java index 403288795..637673645 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/callback/UnaryCallableUnitCallBack.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/callback/UnaryCallableUnitCallBack.java @@ -18,7 +18,6 @@ package io.ballerina.stdlib.grpc.callback; import com.google.protobuf.Descriptors; -import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.Runtime; import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.types.ObjectType; @@ -56,11 +55,11 @@ public class UnaryCallableUnitCallBack extends AbstractCallableUnitCallBack { private static final Logger LOG = LoggerFactory.getLogger(UnaryCallableUnitCallBack.class); - private Runtime runtime; - private StreamObserver requestSender; - private boolean emptyResponse; - private Descriptors.Descriptor outputType; - private ObserverContext observerContext; + private final Runtime runtime; + private final StreamObserver requestSender; + private final boolean emptyResponse; + private final Descriptors.Descriptor outputType; + private final ObserverContext observerContext; public UnaryCallableUnitCallBack(Runtime runtime, StreamObserver requestSender, boolean isEmptyResponse, @@ -72,9 +71,7 @@ public UnaryCallableUnitCallBack(Runtime runtime, StreamObserver requestSender, this.observerContext = context; } - @Override public void notifySuccess(Object response) { - super.notifySuccess(response); // check whether connection is closed. if (requestSender instanceof ServerCallHandler.ServerCallStreamObserver) { ServerCallHandler.ServerCallStreamObserver serverCallStreamObserver = (ServerCallHandler @@ -120,17 +117,25 @@ public void notifySuccess(Object response) { ReturnStreamUnitCallBack returnStreamUnitCallBack = new ReturnStreamUnitCallBack( runtime, requestSender, outputType, bObject, headers); ObjectType serviceObjectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(bObject)); - if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) { - runtime.invokeMethodAsyncConcurrently(bObject, STREAMING_NEXT_FUNCTION, null, null, - returnStreamUnitCallBack, null, PredefinedTypes.TYPE_NULL); - } else { - runtime.invokeMethodAsyncSequentially(bObject, STREAMING_NEXT_FUNCTION, null, null, - returnStreamUnitCallBack, null, PredefinedTypes.TYPE_NULL); - } + Thread.startVirtualThread(() -> { + try { + Object result; + if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) { + result = runtime.startIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null) + .get(); + } else { + result = runtime.startNonIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, + null).get(); + } + returnStreamUnitCallBack.notifySuccess(result); + } catch (BError error) { + returnStreamUnitCallBack.notifyFailure(error); + } + }); } else { // If content is null and remote function doesn't return empty response means. response is already sent // to client via caller object, but connection is not closed already by calling complete function. - // Hence closing the connection. + // Hence, closing the connection. if (content == null) { requestSender.onCompleted(); } else { @@ -151,7 +156,6 @@ public void notifySuccess(Object response) { } } - @Override public void notifyFailure(BError error) { if (requestSender instanceof ServerCallHandler.ServerCallStreamObserver) { ServerCallHandler.ServerCallStreamObserver serverCallStreamObserver = (ServerCallHandler @@ -176,7 +180,6 @@ public void notifyFailure(BError error) { if (observerContext != null) { observerContext.addProperty(PROPERTY_KEY_HTTP_STATUS_CODE, HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); } - super.notifyFailure(error); if (isPanic) { System.exit(1); } @@ -187,10 +190,10 @@ public void notifyFailure(BError error) { * */ public class ReturnStreamUnitCallBack extends AbstractCallableUnitCallBack { - private StreamObserver requestSender; - private Descriptors.Descriptor outputType; - private Runtime runtime; - private BObject bObject; + private final StreamObserver requestSender; + private final Descriptors.Descriptor outputType; + private final Runtime runtime; + private final BObject bObject; private HttpHeaders headers; public ReturnStreamUnitCallBack(Runtime runtime, StreamObserver requestSender, @@ -202,7 +205,6 @@ public ReturnStreamUnitCallBack(Runtime runtime, StreamObserver requestSender, this.headers = headers; } - @Override public void notifySuccess(Object response) { if (response != null) { Message msg; @@ -218,22 +220,27 @@ public void notifySuccess(Object response) { } requestSender.onNext(msg); ObjectType serviceObjectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(bObject)); - if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) { - runtime.invokeMethodAsyncConcurrently(bObject, STREAMING_NEXT_FUNCTION, null, null, - this, null, PredefinedTypes.TYPE_NULL); - } else { - runtime.invokeMethodAsyncSequentially(bObject, STREAMING_NEXT_FUNCTION, null, null, - this, null, PredefinedTypes.TYPE_NULL); - } + Thread.startVirtualThread(() -> { + try { + Object result; + if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) { + result = runtime.startIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null) + .get(); + } else { + result = runtime.startNonIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null) + .get(); + } + this.notifySuccess(result); + } catch (BError error) { + this.notifyFailure(error); + } + }); } else { requestSender.onCompleted(); } - } - @Override public void notifyFailure(BError error) { - super.notifyFailure(error); } } } diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/listener/ServerCallHandler.java b/native/src/main/java/io/ballerina/stdlib/grpc/listener/ServerCallHandler.java index 857639558..940604478 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/listener/ServerCallHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/listener/ServerCallHandler.java @@ -20,12 +20,12 @@ import com.google.protobuf.Descriptors; import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.TypeTags; -import io.ballerina.runtime.api.async.Callback; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.ArrayType; import io.ballerina.runtime.api.types.ObjectType; import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.utils.TypeUtils; +import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.observability.ObservabilityConstants; @@ -173,8 +173,9 @@ private boolean isEmptyResponse() { void onMessageInvoke(ServiceResource resource, Message request, StreamObserver responseObserver, ObserverContext context) { - Callback callback = new UnaryCallableUnitCallBack(resource.getRuntime(), responseObserver, isEmptyResponse(), - this.methodDescriptor.getOutputType(), context); + UnaryCallableUnitCallBack callback = + new UnaryCallableUnitCallBack(resource.getRuntime(), responseObserver, isEmptyResponse(), + this.methodDescriptor.getOutputType(), context); Object requestParam = request != null ? request.getbMessage() : null; HttpHeaders headers = request != null ? request.getHeaders() : null; Object[] requestParams = computeResourceParams(resource, requestParam, headers, responseObserver); @@ -186,29 +187,32 @@ void onMessageInvoke(ServiceResource resource, Message request, StreamObserver r String functionName = resource.getFunctionName(); ObjectType serviceObjectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(resource.getService())); - - boolean isEmpty = isEmpty(requestParams); - if (isEmpty) { - if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(functionName)) { - resource.getRuntime().invokeMethodAsyncConcurrently(resource.getService(), functionName, null, - GrpcConstants.ON_MESSAGE_METADATA, callback, properties, - resource.getReturnType()); - } else { - resource.getRuntime().invokeMethodAsyncSequentially(resource.getService(), functionName, null, - GrpcConstants.ON_MESSAGE_METADATA, callback, properties, - resource.getReturnType()); - } - } else { - if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(functionName)) { - resource.getRuntime().invokeMethodAsyncConcurrently(resource.getService(), functionName, null, - GrpcConstants.ON_MESSAGE_METADATA, callback, properties, - resource.getReturnType(), requestParams); - } else { - resource.getRuntime().invokeMethodAsyncSequentially(resource.getService(), functionName, null, - GrpcConstants.ON_MESSAGE_METADATA, callback, properties, - resource.getReturnType(), requestParams); + Thread.startVirtualThread(() -> { + try { + boolean isEmpty = isEmpty(requestParams); + Object result; + if (isEmpty) { + if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(functionName)) { + result = resource.getRuntime().startIsolatedWorker(resource.getService(), functionName, null, + GrpcConstants.ON_MESSAGE_METADATA, properties).get(); + } else { + result = resource.getRuntime().startNonIsolatedWorker(resource.getService(), functionName, null, + GrpcConstants.ON_MESSAGE_METADATA, properties).get(); + } + } else { + if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(functionName)) { + result = resource.getRuntime().startIsolatedWorker(resource.getService(), functionName, null, + GrpcConstants.ON_MESSAGE_METADATA, properties, requestParams).get(); + } else { + result = resource.getRuntime().startNonIsolatedWorker(resource.getService(), functionName, null, + GrpcConstants.ON_MESSAGE_METADATA, properties, requestParams).get(); + } + } + callback.notifySuccess(result); + } catch (BError error) { + callback.notifyFailure(error); } - } + }); } Boolean isEmpty(Object[] requestParams) { @@ -230,12 +234,11 @@ Object[] computeResourceParams(ServiceResource resource, Object requestParam, Ht int i = 0; if ((signatureParamSize >= 1) && (signatureParams.get(0).getTag() == TypeTags.OBJECT_TYPE_TAG) && signatureParams.get(0).getName().contains(CALLER_TYPE)) { - paramValues = new Object[signatureParams.size() * 2]; + paramValues = new Object[signatureParams.size()]; paramValues[i] = getConnectionParameter(resource, responseObserver); - paramValues[i + 1] = true; - i = i + 2; + i = i + 1; } else { - paramValues = new Object[2]; + paramValues = new Object[1]; } if (resource.isHeaderRequired()) { BMap headerValues = MessageUtils.createHeaderMap(headers); @@ -268,11 +271,9 @@ Object[] computeResourceParams(ServiceResource resource, Object requestParam, Ht MessageUtils.getContextTypeName(resource.getRpcInputType()), valueMap); } paramValues[i] = contentContext; - paramValues[i + 1] = true; } } else if (requestParam != null) { paramValues[i] = requestParam; - paramValues[i + 1] = true; } return paramValues; } diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/listener/StreamingServerCallHandler.java b/native/src/main/java/io/ballerina/stdlib/grpc/listener/StreamingServerCallHandler.java index e63d8093b..f932c478e 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/listener/StreamingServerCallHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/listener/StreamingServerCallHandler.java @@ -24,6 +24,7 @@ import io.ballerina.runtime.api.types.ObjectType; import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.utils.TypeUtils; +import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BStream; import io.ballerina.runtime.observability.ObservabilityConstants; @@ -123,7 +124,7 @@ private static final class StreamingServerCallListener implements Listener { private final ServerCallStreamObserver responseObserver; private boolean halfClosed = false; - // Non private to avoid synthetic class + // Non-private to avoid synthetic class StreamingServerCallListener( StreamObserver requestObserver, ServerCallStreamObserver responseObserver) { @@ -185,15 +186,22 @@ void onStreamInvoke(ServiceResource resource, BStream requestStream, HttpHeaders responseObserver, isEmptyResponse(), this.methodDescriptor.getOutputType(), context); String functionName = resource.getFunctionName(); - ObjectType serviceObjectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(resource.getService())); - if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(functionName)) { - resource.getRuntime().invokeMethodAsyncConcurrently(resource.getService(), resource.getFunctionName(), null, - GrpcConstants.ON_MESSAGE_METADATA, callback, properties, - resource.getReturnType(), requestParams); - } else { - resource.getRuntime().invokeMethodAsyncSequentially(resource.getService(), resource.getFunctionName(), null, - GrpcConstants.ON_MESSAGE_METADATA, callback, properties, - resource.getReturnType(), requestParams); - } + BObject service = resource.getService(); + ObjectType serviceObjectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(service)); + Thread.startVirtualThread(() -> { + try { + Object result; + if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(functionName)) { + result = resource.getRuntime().startIsolatedWorker(service, functionName, null, + GrpcConstants.ON_MESSAGE_METADATA, properties, requestParams).get(); + } else { + result = resource.getRuntime().startNonIsolatedWorker(service, functionName, null, + GrpcConstants.ON_MESSAGE_METADATA, properties, requestParams).get(); + } + callback.notifySuccess(result); + } catch (BError error) { + callback.notifyFailure(error); + } + }); } } diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/client/FunctionUtils.java b/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/client/FunctionUtils.java index c00f1bf3d..a1f58d638 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/client/FunctionUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/client/FunctionUtils.java @@ -52,6 +52,7 @@ import java.net.URL; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static io.ballerina.stdlib.grpc.GrpcConstants.CLIENT_CONNECTOR; import static io.ballerina.stdlib.grpc.GrpcConstants.CONFIG; @@ -60,6 +61,7 @@ import static io.ballerina.stdlib.grpc.GrpcConstants.METHOD_DESCRIPTORS; import static io.ballerina.stdlib.grpc.GrpcConstants.SERVICE_STUB; import static io.ballerina.stdlib.grpc.GrpcUtil.getConnectionManager; +import static io.ballerina.stdlib.grpc.GrpcUtil.getResult; import static io.ballerina.stdlib.grpc.GrpcUtil.populatePoolingConfig; import static io.ballerina.stdlib.grpc.GrpcUtil.populateSenderConfigurations; import static io.ballerina.stdlib.grpc.MessageUtils.convertToHttpHeaders; @@ -230,7 +232,6 @@ public static Object externExecuteSimpleRPC(Environment env, BObject clientEndpo HttpHeaders headers = convertToHttpHeaders(headerValues); requestMsg.setHeaders(headers); Stub stub = (Stub) connectionStub; - DataContext dataContext = null; Map messageSizeMap = new HashMap<>(); messageSizeMap.put(MAX_INBOUND_MESSAGE_SIZE, (Long) clientEndpoint.getMapValue(CONFIG) @@ -238,24 +239,24 @@ public static Object externExecuteSimpleRPC(Environment env, BObject clientEndpo try { MethodDescriptor.MethodType methodType = getMethodType(methodDescriptor); if (methodType.equals(MethodDescriptor.MethodType.UNARY)) { - - dataContext = new DataContext(env, env.markAsync()); - stub.executeUnary(requestMsg, methodDescriptors.get(methodName.getValue()), - dataContext, messageSizeMap); + return env.yieldAndRun(() -> { + try { + CompletableFuture future = new CompletableFuture<>(); + DataContext dataContext = new DataContext(env, future); + stub.executeUnary(requestMsg, methodDescriptors.get(methodName.getValue()), + dataContext, messageSizeMap); + return getResult(future); + } catch (Exception e) { + return notifyErrorReply(INTERNAL, "gRPC Client Connector Error :" + e.getMessage()); + } + }); } else { return notifyErrorReply(INTERNAL, "Error while executing the client call. Method type " + methodType.name() + " not supported"); } } catch (Exception e) { - try { - if (dataContext != null) { - dataContext.getFuture().complete(e); - } - } finally { - return notifyErrorReply(INTERNAL, "gRPC Client Connector Error :" + e.getMessage()); - } + return notifyErrorReply(INTERNAL, "gRPC Client Connector Error :" + e.getMessage()); } - return null; } /** @@ -307,22 +308,25 @@ public static Object externExecuteServerStreaming(Environment env, BObject clien HttpHeaders headers = convertToHttpHeaders(headerValues); requestMsg.setHeaders(headers); Stub stub = (Stub) connectionStub; - DataContext dataContext = null; Map messageSizeMap = new HashMap<>(); messageSizeMap.put(MAX_INBOUND_MESSAGE_SIZE, (Long) clientEndpoint.getMapValue(CONFIG) .get(StringUtils.fromString((MAX_INBOUND_MESSAGE_SIZE)))); try { - dataContext = new DataContext(env, env.markAsync()); - stub.executeServerStreaming(requestMsg, methodDescriptors.get(methodName.getValue()), - dataContext, messageSizeMap); + return env.yieldAndRun(() -> { + try { + CompletableFuture future = new CompletableFuture<>(); + DataContext dataContext = new DataContext(env, future); + stub.executeServerStreaming(requestMsg, methodDescriptors.get(methodName.getValue()), + dataContext, messageSizeMap); + return getResult(future); + } catch (Exception e) { + return notifyErrorReply(INTERNAL, "gRPC Client Connector Error :" + e.getMessage()); + } + }); } catch (Exception e) { - if (dataContext != null) { - dataContext.getFuture().complete(e); - } return notifyErrorReply(INTERNAL, "gRPC Client Connector Error :" + e.getMessage()); } - return null; } /** diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/serviceendpoint/FunctionUtils.java b/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/serviceendpoint/FunctionUtils.java index e7201b8af..241ed4af0 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/serviceendpoint/FunctionUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/serviceendpoint/FunctionUtils.java @@ -209,26 +209,28 @@ public static Object immediateStop(BObject serverEndpoint) { return null; } - public static Object nextResult(BObject streamIterator) { - - BlockingQueue messageQueue = (BlockingQueue) streamIterator.getNativeData(GrpcConstants.MESSAGE_QUEUE); - try { - Message nextMessage = (Message) messageQueue.take(); - if (nextMessage.getHeaders() != null) { - streamIterator.addNativeData(GrpcConstants.HEADERS, - MessageUtils.createHeaderMap(nextMessage.getHeaders())); - } - if (nextMessage.isError()) { - return MessageUtils.getConnectorError(nextMessage.getError()); - } else { - return nextMessage.getbMessage(); + public static Object nextResult(Environment env, BObject streamIterator) { + return env.yieldAndRun(() -> { + BlockingQueue messageQueue = + (BlockingQueue) streamIterator.getNativeData(GrpcConstants.MESSAGE_QUEUE); + try { + Message nextMessage = (Message) messageQueue.take(); + if (nextMessage.getHeaders() != null) { + streamIterator.addNativeData(GrpcConstants.HEADERS, + MessageUtils.createHeaderMap(nextMessage.getHeaders())); + } + if (nextMessage.isError()) { + return MessageUtils.getConnectorError(nextMessage.getError()); + } else { + return nextMessage.getbMessage(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + String message = "Internal error occurred. The current thread got interrupted"; + throw MessageUtils.getConnectorError(new StatusRuntimeException(Status + .fromCode(Status.Code.INTERNAL.toStatus().getCode()).withDescription(message))); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - String message = "Internal error occurred. The current thread got interrupted"; - throw MessageUtils.getConnectorError(new StatusRuntimeException(Status - .fromCode(Status.Code.INTERNAL.toStatus().getCode()).withDescription(message))); - } + }); } public static Object closeStream(Environment env, BObject streamIterator) { diff --git a/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/streamingclient/FunctionUtils.java b/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/streamingclient/FunctionUtils.java index 6ccda385f..d82af0c16 100644 --- a/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/streamingclient/FunctionUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/grpc/nativeimpl/streamingclient/FunctionUtils.java @@ -175,7 +175,7 @@ public static Object externIsBidirectional(BObject streamingConnection) { * @param streamingConnection streaming connection instance. * @return In streaming scenarios, return an `anydata`. */ - public static Object externReceive(BObject streamingConnection) { + public static Object externReceive(Environment env, BObject streamingConnection) { Boolean isStreamCancelled = (Boolean) streamingConnection.getNativeData(GrpcConstants.IS_STREAM_CANCELLED); if (isStreamCancelled != null && isStreamCancelled) { @@ -200,14 +200,21 @@ public static Object externReceive(BObject streamingConnection) { return ValueCreator.createStreamValue(TypeCreator.createStreamType(PredefinedTypes.TYPE_ANYDATA), streamIterator); } else { - Message nextMessage = (Message) messageQueue.take(); - streamingConnection.addNativeData(GrpcConstants.HEADERS, - MessageUtils.createHeaderMap(nextMessage.getHeaders())); - if (nextMessage.isError()) { - return MessageUtils.getConnectorError(nextMessage.getError()); - } else { - return nextMessage.getbMessage(); - } + return env.yieldAndRun(() -> { + try { + Message nextMessage = (Message) messageQueue.take(); + streamingConnection.addNativeData(GrpcConstants.HEADERS, + MessageUtils.createHeaderMap(nextMessage.getHeaders())); + if (nextMessage.isError()) { + return MessageUtils.getConnectorError(nextMessage.getError()); + } else { + return nextMessage.getbMessage(); + } + } catch (Exception e) { + LOG.error("Error while sending request message to server.", e); + return MessageUtils.getConnectorError(e); + } + }); } } catch (Exception e) { LOG.error("Error while sending request message to server.", e); diff --git a/test-utils/src/main/java/io/ballerina/stdlib/grpc/testutils/NativeTestUtils.java b/test-utils/src/main/java/io/ballerina/stdlib/grpc/testutils/NativeTestUtils.java index c33f0cccd..bc5b056ea 100644 --- a/test-utils/src/main/java/io/ballerina/stdlib/grpc/testutils/NativeTestUtils.java +++ b/test-utils/src/main/java/io/ballerina/stdlib/grpc/testutils/NativeTestUtils.java @@ -204,7 +204,7 @@ public static Object externInvokeRegisterErrorCase(BObject serviceEndpoint) { return externRegister(null, serviceEndpoint, null, null); } - public static Object externInvokeNextResultErrorCase(BObject streamIterator) { + public static Object externInvokeNextResultErrorCase(Environment env, BObject streamIterator) { BlockingQueue b = new BlockingQueue() { @Override public boolean add(Object o) { @@ -305,7 +305,7 @@ public void clear() {} }; streamIterator.addNativeData(GrpcConstants.MESSAGE_QUEUE, b); try { - nextResult(streamIterator); + nextResult(env, streamIterator); } catch (Exception e) { return e; }