Skip to content

Commit

Permalink
Adding RPC.
Browse files Browse the repository at this point in the history
  • Loading branch information
jlahoda committed Jan 13, 2024
1 parent dec2840 commit f0ba412
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -58,7 +61,7 @@ public Sender(InputStream in, OutputStream out) {
while (true) {
int id = Utils.readInt(in);
int size = Utils.readInt(in);
byte[] data = in.readNBytes(size);
byte[] data = in.readNBytes(size + 1);
PendingRequest<Object> request;

synchronized (id2PendingRequest) {
Expand All @@ -68,8 +71,13 @@ public Sender(InputStream in, OutputStream out) {
if (request == null) {
LOG.log(Level.SEVERE, "No pending request number {0}", id);
} else {
Object value = Utils.gson.fromJson(new String(data, StandardCharsets.UTF_8), request.responseType);
request.pending.complete(value);
String dataText = new String(data, 1, data.length - 1, StandardCharsets.UTF_8);
if (data[0] == 'D') {
Object value = Utils.gson.fromJson(dataText, request.responseType);
request.pending.complete(value);
} else {
request.pending.completeExceptionally(new RuntimeException(dataText));
}
}
}
} catch (EndOfInput ex) {
Expand Down Expand Up @@ -103,19 +111,23 @@ private void write(int id, Enum<?> messageKind, Object message) throws IOExcepti
}

public <R> CompletableFuture<R> sendAndReceive(Enum<?> task, Object request, Class<R> responseType) throws IOException {
return sendAndReceive(task, request, (Type) responseType);
}

public <R> CompletableFuture<R> sendAndReceive(Enum<?> task, Object request, Type responseType) throws IOException {
int id = nextId.getAndIncrement();
CompletableFuture<R> result = new CompletableFuture<>();

synchronized (id2PendingRequest) {
id2PendingRequest.put(id, new PendingRequest<>((CompletableFuture<Object>) result, (Class<Object>) responseType));
id2PendingRequest.put(id, new PendingRequest<>((CompletableFuture<Object>) result, responseType));
}

write(id, task, request);

return result;
}

private record PendingRequest<R>(CompletableFuture<R> pending, Class<R> responseType) {}
private record PendingRequest<R>(CompletableFuture<R> pending, Type responseType) {}
}

public static final class ReceiverBuilder<E extends Enum<E>> {
Expand Down Expand Up @@ -156,12 +168,23 @@ public Task startReceiver() {
E kind = Enum.valueOf(messageTypeClass, kindName);
Handler<?> handler = messageType2Handler.get(kind);
Object dataValue = Utils.gson.fromJson(new String(dataBytes, StandardCharsets.UTF_8), handler.messageTypeDataClass);
handler.run(dataValue).thenAccept(r -> {
byte[] messageBytes = Utils.gson.toJson(r).getBytes(StandardCharsets.UTF_8);
byte[] output = new byte[messageBytes.length + 8];
handler.run(dataValue).handle((result, exception) -> {
byte tagChar;
byte[] messageBytes;
if (exception == null) {
tagChar = 'D';
messageBytes = Utils.gson.toJson(result).getBytes(StandardCharsets.UTF_8);
} else {
StringWriter data = new StringWriter();
exception.printStackTrace(new PrintWriter(data));
tagChar = 'E';
messageBytes = data.toString().getBytes(StandardCharsets.UTF_8);
}
byte[] output = new byte[messageBytes.length + 8 + 1];
Utils.writeInt(output, 0, id);
Utils.writeInt(output, 4, messageBytes.length);
System.arraycopy(messageBytes, 0, output, 8, messageBytes.length);
output[8] = tagChar;
System.arraycopy(messageBytes, 0, output, 9, messageBytes.length);
synchronized (out) {
try {
out.write(output);
Expand All @@ -170,6 +193,7 @@ public Task startReceiver() {
Exceptions.printStackTrace(ex);
}
}
return null;
});
}
} catch (IOException ex) {
Expand Down
124 changes: 124 additions & 0 deletions ide/ide.remote/src/org/netbeans/modules/remote/RemoteInvocation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.netbeans.modules.remote;

import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.netbeans.modules.remote.AsynchronousConnection.ReceiverBuilder;
import org.netbeans.modules.remote.AsynchronousConnection.Sender;

/**
*
*/
public class RemoteInvocation {

public static <T> T caller(InputStream in, OutputStream out, Class<T> intf) {
Sender sender = new Sender(in, out);
return (T) Proxy.newProxyInstance(RemoteInvocation.class.getClassLoader(), new Class<?>[] {intf}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String[] encodedParams;

if (args == null) {
encodedParams = new String[0];
} else {
encodedParams = Arrays.stream(args)
.map(Utils.gson::toJson)
.toArray(s -> new String[s]);
}

InvocationData data = new InvocationData(encodeMethod(method), encodedParams);
Type returnType = method.getGenericReturnType();
if (returnType == void.class) {
returnType = Void.class;
}
return sender.sendAndReceive(Task.INVOKE, data, returnType).get();
}
});
}

public static void receiver(InputStream in, OutputStream out, Object delegate) {
Map<String, Method> encodedMethod2Method = Arrays.stream(delegate.getClass().getMethods()).collect(Collectors.toMap(m -> encodeMethod(m), m -> m));

new ReceiverBuilder<>(in, out, Task.class)
.addHandler(Task.INVOKE, InvocationData.class, data -> {
CompletableFuture<Object> result = new CompletableFuture<>();
try {
Object[] args = new Object[data.parameters.length];
Method toInvoke = encodedMethod2Method.get(data.methodNameAndSignature);
Type[] parameterTypes = toInvoke.getGenericParameterTypes();
for (int i = 0; i < parameterTypes.length; i++) {
args[i] = Utils.gson.fromJson(data.parameters[i], parameterTypes[i]);
}
toInvoke.setAccessible(true);
result.complete(toInvoke.invoke(delegate, args));
} catch (Throwable t) {
result.completeExceptionally(t);
}
return result;
})
.startReceiver();
}

public static RuntimeException sneakyThrows(Throwable t) {
return doSneakyThrows(t);
}

@SuppressWarnings("unchecked")
private static <T extends Throwable> RuntimeException doSneakyThrows(Throwable t) throws T {
throw (T) t;
}

public enum Task {
INVOKE;
}

public static final class InvocationData {
public String methodNameAndSignature;
public String[] parameters;

public InvocationData() {
}

public InvocationData(String methodNameAndSignature, String[] parameters) {
this.methodNameAndSignature = methodNameAndSignature;
this.parameters = parameters;
}

}

private static String encodeMethod(Method m) {
StringBuilder result = new StringBuilder();
result.append(m.getName());
result.append("(");
for (Class<?> p : m.getParameterTypes()) {
result.append(p.getCanonicalName()).append(";");
}
result.append(")");
return result.toString();
}
}
Loading

0 comments on commit f0ba412

Please sign in to comment.