Skip to content

Commit

Permalink
Migrate to new runtime APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
warunalakshitha committed Oct 30, 2024
1 parent 9382632 commit feedc1b
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 192 deletions.
6 changes: 3 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ org.gradle.caching=true
group=io.ballerina.stdlib
version=1.12.2-SNAPSHOT
#dependency versions
ballerinaLangVersion=2201.10.0-SNAPSHOT
ballerinaLangVersion=2201.10.0-20241025-103700-5c9e6a27
ballerinaTomlParserVersion=1.2.2
checkstylePluginVersion=10.12.0
commonsLang3Version=3.8.1
Expand Down Expand Up @@ -45,9 +45,9 @@ stdlibUuidVersion=1.8.1-20241009-134600-a05012b

stdlibAuthVersion=2.12.1-20241010-130800-733dbef
stdlibJwtVersion=2.13.1-20241010-123600-5ea6a94
stdlibOAuth2Version=2.12.1-20241010-123600-0e0cfcc
stdlibOAuth2Version=2.12.1-20241029-084800-d7ba9e5

stdlibHttpVersion=2.12.1-20241018-081800-bb91312
stdlibHttpVersion=2.13.0-20241029-110700-30ed05b

# Ballerinax Observer
observeVersion=1.3.1-20241007-161000-645452d
Expand Down
11 changes: 6 additions & 5 deletions native/src/main/java/io/ballerina/stdlib/grpc/DataContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
package io.ballerina.stdlib.grpc;

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;

import java.util.concurrent.CompletableFuture;

/**
* {@code DataContext} is the wrapper to hold {@code Strand} and {@code Future}.
*/
public class DataContext {
private Environment environment;
private Future future;
private final Environment environment;
private final CompletableFuture<Object> future;

public DataContext(Environment env, Future future) {
public DataContext(Environment env, CompletableFuture<Object> future) {
this.environment = env;
this.future = future;
}
Expand All @@ -37,7 +38,7 @@ public Environment getEnvironment() {
return environment;
}

public Future getFuture() {
public CompletableFuture<Object> getFuture() {
return future;
}
}
13 changes: 13 additions & 0 deletions native/src/main/java/io/ballerina/stdlib/grpc/GrpcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package io.ballerina.stdlib.grpc;

import io.ballerina.runtime.api.TypeTags;
import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.types.Type;
import io.ballerina.runtime.api.values.BArray;
import io.ballerina.runtime.api.values.BDecimal;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.stdlib.grpc.exception.StatusRuntimeException;
Expand All @@ -40,6 +42,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static io.ballerina.runtime.api.constants.RuntimeConstants.BALLERINA_VERSION;
Expand Down Expand Up @@ -473,4 +476,14 @@ public static String getTypeName(Type type) {
}
return type.getName();
}

public static Object getResult(CompletableFuture<Object> balFuture) {
try {
return balFuture.get();
} catch (BError error) {
throw error;
} catch (Throwable throwable) {
throw ErrorCreator.createError(throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ private static com.google.protobuf.Descriptors.FileDescriptor getDescriptor(Obje
if (descriptorData == null) {
descriptorData = annotationMap.getStringValue(StringUtils.fromString("descriptor"));
}
BMap<BString, BString> descMap = (BMap<BString, BString>) annotationMap.getMapValue(
BMap<BString, Object> descMap = (BMap<BString, Object>) annotationMap.getMapValue(
StringUtils.fromString("descMap"));
return getFileDescriptor(descriptorData, descMap);
} catch (IOException | Descriptors.DescriptorValidationException e) {
Expand All @@ -258,9 +258,9 @@ private static com.google.protobuf.Descriptors.FileDescriptor getDescriptorFromS
} else if (type.getFields().containsKey("value")) {
descriptorData = service.getStringValue(StringUtils.fromString("value"));
}
BMap<BString, BString> descMap = null;
BMap<BString, Object> descMap = null;
if (type.getFields().containsKey("descMap")) {
descMap = (BMap<BString, BString>) service.getMapValue(StringUtils.fromString("descMap"));
descMap = service.getMapValue(StringUtils.fromString("descMap"));
}
if (descriptorData == null || descMap == null) {
return null;
Expand All @@ -273,8 +273,7 @@ private static com.google.protobuf.Descriptors.FileDescriptor getDescriptorFromS

}

private static Descriptors.FileDescriptor getFileDescriptor(
BString descriptorData, BMap<BString, BString> descMap)
private static Descriptors.FileDescriptor getFileDescriptor(BString descriptorData, BMap<BString, Object> descMap)
throws InvalidProtocolBufferException, Descriptors.DescriptorValidationException, GrpcServerException {

byte[] descriptor = hexStringToByteArray(descriptorData.getValue());
Expand All @@ -291,14 +290,15 @@ private static Descriptors.FileDescriptor getFileDescriptor(
List<Descriptors.FileDescriptor> fileDescriptors = new ArrayList<>();
for (ByteString dependency : descriptorProto.getDependencyList().asByteStringList()) {
String dependencyKey = dependency.toStringUtf8();
if (descMap == null || descMap.size() == 0) {
if (descMap == null || descMap.isEmpty()) {
Descriptors.FileDescriptor dependentDescriptor = StandardDescriptorBuilder.getFileDescriptor
(dependencyKey);
if (dependentDescriptor != null) {
fileDescriptors.add(dependentDescriptor);
}
} else if (descMap.containsKey(StringUtils.fromString(dependencyKey))) {
fileDescriptors.add(getFileDescriptor(descMap.get(StringUtils.fromString(dependencyKey)), descMap));
fileDescriptors.add(getFileDescriptor((BString) descMap.get(StringUtils.fromString(dependencyKey)),
descMap));
}
}
return Descriptors.FileDescriptor.buildFrom(descriptorProto,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,19 @@

package io.ballerina.stdlib.grpc.callback;

import io.ballerina.runtime.api.async.Callback;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.stdlib.grpc.GrpcConstants;
import io.ballerina.stdlib.grpc.Message;
import io.ballerina.stdlib.grpc.Status;
import io.ballerina.stdlib.grpc.StreamObserver;
import io.ballerina.stdlib.grpc.exception.StatusRuntimeException;

import java.util.concurrent.Semaphore;

/**
* Abstract call back class registered for gRPC service in B7a executor.
*
* @since 0.995.0
*/
public class AbstractCallableUnitCallBack implements Callback {

public final Semaphore available = new Semaphore(1, true);

@Override
public void notifySuccess(Object o) {
available.release();
}

@Override
public void notifyFailure(io.ballerina.runtime.api.values.BError error) {
available.release();
}
public class AbstractCallableUnitCallBack {

/**
* Handles failures in GRPC callable unit callback.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.ballerina.stdlib.grpc.callback;

import com.google.protobuf.Descriptors;
import io.ballerina.runtime.api.PredefinedTypes;
import io.ballerina.runtime.api.Runtime;
import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.types.ObjectType;
Expand Down Expand Up @@ -56,25 +55,22 @@ public class StreamingCallableUnitCallBack extends AbstractCallableUnitCallBack

private static final Logger LOG = LoggerFactory.getLogger(StreamingCallableUnitCallBack.class);

private Runtime runtime;
private StreamObserver responseSender;
private boolean emptyResponse;
private Descriptors.Descriptor outputType;
private ObserverContext observerContext;
private final Runtime runtime;
private final StreamObserver responseSender;
private final boolean emptyResponse;
private final Descriptors.Descriptor outputType;
private final ObserverContext observerContext;

public StreamingCallableUnitCallBack(Runtime runtime, StreamObserver responseSender, boolean isEmptyResponse,
Descriptors.Descriptor outputType, ObserverContext context) {
available.acquireUninterruptibly();
this.runtime = runtime;
this.responseSender = responseSender;
this.emptyResponse = isEmptyResponse;
this.observerContext = context;
this.outputType = outputType;
}

@Override
public void notifySuccess(Object response) {
super.notifySuccess(response);
// check whether connection is closed.
if (responseSender instanceof ServerCallHandler.ServerCallStreamObserver) {
ServerCallHandler.ServerCallStreamObserver serverCallStreamObserver = (ServerCallHandler
Expand Down Expand Up @@ -112,17 +108,24 @@ public void notifySuccess(Object response) {
ReturnStreamUnitCallBack returnStreamUnitCallBack = new ReturnStreamUnitCallBack(
runtime, responseSender, outputType, bObject, headers);
ObjectType serviceObjectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(bObject));
if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) {
runtime.invokeMethodAsyncConcurrently(bObject, STREAMING_NEXT_FUNCTION, null, null,
returnStreamUnitCallBack, null, PredefinedTypes.TYPE_NULL);
} else {
runtime.invokeMethodAsyncSequentially(bObject, STREAMING_NEXT_FUNCTION, null, null,
returnStreamUnitCallBack, null, PredefinedTypes.TYPE_NULL);
}
Thread.startVirtualThread(() -> {
try {
Object result;
if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) {
result = runtime.startIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null).get();
} else {
result = runtime.startNonIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null)
.get();
}
returnStreamUnitCallBack.notifySuccess(result);
} catch (BError error) {
returnStreamUnitCallBack.notifyFailure(error);
}
});
} else {
// If content is null and remote function doesn't return empty response means. response is already sent
// to client via caller object, but connection is not closed already by calling complete function.
// Hence closing the connection.
// Hence, closing the connection.
if (content == null) {
if (this.emptyResponse) {
Message responseMessage = new Message(GrpcConstants.EMPTY_DATATYPE_NAME, null);
Expand All @@ -138,7 +141,6 @@ public void notifySuccess(Object response) {
}
}

@Override
public void notifyFailure(io.ballerina.runtime.api.values.BError error) {
if (responseSender instanceof ServerCallHandler.ServerCallStreamObserver) {
ServerCallHandler.ServerCallStreamObserver serverCallStreamObserver = (ServerCallHandler
Expand All @@ -165,7 +167,6 @@ public void notifyFailure(io.ballerina.runtime.api.values.BError error) {
if (observerContext != null) {
observerContext.addProperty(PROPERTY_KEY_HTTP_STATUS_CODE, HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
}
super.notifyFailure(error);
if (isPanic) {
System.exit(1);
}
Expand All @@ -175,11 +176,11 @@ public void notifyFailure(io.ballerina.runtime.api.values.BError error) {
* Call back class registered to send returned stream from a remote function.
*
*/
public class ReturnStreamUnitCallBack extends AbstractCallableUnitCallBack {
private StreamObserver requestSender;
private Descriptors.Descriptor outputType;
private Runtime runtime;
private BObject bObject;
public static class ReturnStreamUnitCallBack extends AbstractCallableUnitCallBack {
private final StreamObserver requestSender;
private final Descriptors.Descriptor outputType;
private final Runtime runtime;
private final BObject bObject;
private HttpHeaders headers;

public ReturnStreamUnitCallBack(Runtime runtime, StreamObserver requestSender,
Expand All @@ -191,7 +192,6 @@ public ReturnStreamUnitCallBack(Runtime runtime, StreamObserver requestSender,
this.headers = headers;
}

@Override
public void notifySuccess(Object response) {
if (response != null) {
Message msg;
Expand All @@ -207,22 +207,28 @@ public void notifySuccess(Object response) {
}
requestSender.onNext(msg);
ObjectType serviceObjectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(bObject));
if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) {
runtime.invokeMethodAsyncConcurrently(bObject, STREAMING_NEXT_FUNCTION, null,
null, this, null, PredefinedTypes.TYPE_NULL);
} else {
runtime.invokeMethodAsyncSequentially(bObject, STREAMING_NEXT_FUNCTION, null,
null, this, null, PredefinedTypes.TYPE_NULL);
}
Thread.startVirtualThread(() -> {
try {
Object result;
if (serviceObjectType.isIsolated() && serviceObjectType.isIsolated(STREAMING_NEXT_FUNCTION)) {
result = runtime.startIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null)
.get();
} else {
result = runtime.startNonIsolatedWorker(bObject, STREAMING_NEXT_FUNCTION, null, null, null)
.get();
}
this.notifySuccess(result);
} catch (BError error) {
this.notifyFailure(error);
}
});
} else {
requestSender.onCompleted();
}

}

@Override
public void notifyFailure(BError error) {
super.notifyFailure(error);

}
}
}
Loading

0 comments on commit feedc1b

Please sign in to comment.