diff --git a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java index 843992619..d242fbace 100644 --- a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java +++ b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java @@ -26,6 +26,7 @@ import io.dapr.client.domain.InvokeBindingRequest; import io.dapr.client.domain.InvokeMethodRequest; import io.dapr.client.domain.PublishEventRequest; +import io.dapr.client.domain.QueryMethodResponse; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; import io.dapr.client.domain.SaveStateRequest; @@ -296,7 +297,7 @@ public Mono> getState(String storeName, String key, Class clazz) */ @Override public Mono> getState( - String storeName, String key, StateOptions options, TypeRef type) { + String storeName, String key, StateOptions options, TypeRef type) { GetStateRequest request = new GetStateRequest(storeName, key) .setStateOptions(options); return this.getState(request, type); @@ -388,6 +389,14 @@ public Mono> queryState(QueryStateRequest request, Cla return this.queryState(request, TypeRef.get(clazz)); } + /** + * {@inheritDoc} + */ + @Override + public Mono> queryMethod(InvokeMethodRequest request, Class clazz) { + return this.queryMethod(request, TypeRef.get(clazz)); + } + /** * {@inheritDoc} */ @@ -542,6 +551,7 @@ public Mono> getConfiguration( /** * {@inheritDoc} */ + @Override public Flux> subscribeToConfiguration(String storeName, String... keys) { List listOfKeys = filterEmptyKeys(keys); SubscribeConfigurationRequest request = new SubscribeConfigurationRequest(storeName, listOfKeys); @@ -551,6 +561,7 @@ public Flux> subscribeToConfiguration(String storeName, /** * {@inheritDoc} */ + @Override public Flux> subscribeToConfiguration( String storeName, List keys, diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 39f0d227b..d268c4ca5 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -29,6 +29,7 @@ import io.dapr.client.domain.InvokeBindingRequest; import io.dapr.client.domain.InvokeMethodRequest; import io.dapr.client.domain.PublishEventRequest; +import io.dapr.client.domain.QueryMethodResponse; import io.dapr.client.domain.QueryStateItem; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; @@ -54,6 +55,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.stub.StreamObserver; +import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; @@ -62,7 +64,10 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -186,34 +191,17 @@ public Mono publishEvent(PublishEventRequest request) { @Override public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) { try { - String appId = invokeMethodRequest.getAppId(); - String method = invokeMethodRequest.getMethod(); - Object body = invokeMethodRequest.getBody(); - HttpExtension httpExtension = invokeMethodRequest.getHttpExtension(); - DaprProtos.InvokeServiceRequest envelope = buildInvokeServiceRequest( - httpExtension, - appId, - method, - body); - // Regarding missing metadata in method invocation for gRPC: - // gRPC to gRPC does not handle metadata in Dapr runtime proto. - // gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342 - - return Mono.subscriberContext().flatMap( - context -> this.createMono( - it -> intercept(context, asyncStub).invokeService(envelope, it) - ) - ).flatMap( + return queryMethod(invokeMethodRequest).flatMap( it -> { try { return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type)); - } catch (IOException e) { - throw DaprException.propagate(e); + } catch (IOException e) { + return DaprException.wrapMono(e); } } ); - } catch (Exception ex) { - return DaprException.wrapMono(ex); + } catch (Exception e) { + return DaprException.wrapMono(e); } } @@ -717,6 +705,64 @@ public Mono> queryState(QueryStateRequest request, Typ } } + @Override + public Mono> queryMethod(InvokeMethodRequest request, TypeRef type) { + try { + return queryMethod(request).flatMap(r -> { + try { + return Mono.justOrEmpty(buildQueryMethodResponse(r, type)); + } catch (Exception e) { + return DaprException.wrapMono(e); + } + }); + } catch (Exception e) { + return DaprException.wrapMono(e); + } + } + + @NotNull + private Mono queryMethod(InvokeMethodRequest invokeMethodRequest) throws IOException { + String appId = invokeMethodRequest.getAppId(); + String method = invokeMethodRequest.getMethod(); + Object body = invokeMethodRequest.getBody(); + HttpExtension httpExtension = invokeMethodRequest.getHttpExtension(); + DaprProtos.InvokeServiceRequest envelope = buildInvokeServiceRequest( + httpExtension, + appId, + method, + body); + // Regarding missing metadata in method invocation for gRPC: + // gRPC to gRPC does not handle metadata in Dapr runtime proto. + // gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342 + + Mono invokeResponseMono = Mono.subscriberContext().flatMap( + context -> this.createMono( + it -> intercept(context, asyncStub).invokeService(envelope, it) + ) + ); + return invokeResponseMono; + } + + private QueryMethodResponse buildQueryMethodResponse(CommonProtos.InvokeResponse response, + TypeRef type) throws IOException { + Map headers = new HashMap<>(); + headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE,response.getContentType()); + byte[] respBody = response.getData().getValue().toByteArray(); + if (respBody.length > 1 && respBody[0] == 34) { + respBody = Arrays.copyOfRange(respBody, 1, respBody.length - 1); + } + Object data; + if (type.getType() == String.class) { + data = new String(respBody, StandardCharsets.UTF_8); + } else if (type.getType() == byte[].class) { + data = respBody; + } else { + data = objectSerializer.deserialize(respBody, type); + } + + return new QueryMethodResponse(200, headers, (T) data); + } + private QueryStateItem buildQueryStateKeyValue( DaprProtos.QueryStateItem item, TypeRef type) throws IOException { diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index f325c0e03..cad2295c5 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -27,6 +27,7 @@ import io.dapr.client.domain.InvokeBindingRequest; import io.dapr.client.domain.InvokeMethodRequest; import io.dapr.client.domain.PublishEventRequest; +import io.dapr.client.domain.QueryMethodResponse; import io.dapr.client.domain.QueryStateItem; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; @@ -46,6 +47,9 @@ import reactor.core.publisher.Mono; import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -53,6 +57,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -180,40 +185,16 @@ public Mono publishEvent(PublishEventRequest request) { @Override public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) { try { - final String appId = invokeMethodRequest.getAppId(); - final String method = invokeMethodRequest.getMethod(); - final Object request = invokeMethodRequest.getBody(); - final HttpExtension httpExtension = invokeMethodRequest.getHttpExtension(); - final String contentType = invokeMethodRequest.getContentType(); - if (httpExtension == null) { - throw new IllegalArgumentException("HttpExtension cannot be null. Use HttpExtension.NONE instead."); - } - // If the httpExtension is not null, then the method will not be null based on checks in constructor - final String httpMethod = httpExtension.getMethod().toString(); - if (appId == null || appId.trim().isEmpty()) { - throw new IllegalArgumentException("App Id cannot be null or empty."); - } - if (method == null || method.trim().isEmpty()) { - throw new IllegalArgumentException("Method name cannot be null or empty."); - } - - - String[] methodSegments = method.split("/"); - - List pathSegments = new ArrayList<>(Arrays.asList(DaprHttp.API_VERSION, "invoke", appId, "method")); - pathSegments.addAll(Arrays.asList(methodSegments)); - - byte[] serializedRequestBody = objectSerializer.serialize(request); - final Map headers = new HashMap<>(); - if (contentType != null && !contentType.isEmpty()) { - headers.put("content-type", contentType); + if (!(type instanceof ParameterizedType) || ((ParameterizedType)type).getRawType() != QueryMethodResponse.class) { + return queryMethod(invokeMethodRequest).flatMap(r -> getMono(type, r)); + } else { + TypeRef resultType = type; + Type[] actualTypeArguments = ((ParameterizedType) type.getType()).getActualTypeArguments(); + if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { + resultType = TypeRef.get(actualTypeArguments[0]); + } + return this.queryMethod(invokeMethodRequest, resultType); } - headers.putAll(httpExtension.getHeaders()); - Mono response = Mono.subscriberContext().flatMap( - context -> this.client.invokeApi(httpMethod, pathSegments.toArray(new String[0]), - httpExtension.getQueryParams(), serializedRequestBody, headers, context) - ); - return response.flatMap(r -> getMono(type, r)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -697,6 +678,61 @@ public Mono> queryState(QueryStateRequest request, Typ } } + /** + * {@inheritDoc} + */ + @Override + public Mono> queryMethod(InvokeMethodRequest request, TypeRef type) { + try { + return queryMethod(request).flatMap(r -> { + try { + return Mono.justOrEmpty(buildQueryMethodResponse(r, type)); + } catch (Exception e) { + return DaprException.wrapMono(e); + } + }); + } catch (Exception e) { + return DaprException.wrapMono(e); + } + } + + private Mono queryMethod(InvokeMethodRequest invokeMethodRequest) throws IOException { + final String appId = invokeMethodRequest.getAppId(); + final String method = invokeMethodRequest.getMethod(); + final Object request = invokeMethodRequest.getBody(); + final HttpExtension httpExtension = invokeMethodRequest.getHttpExtension(); + final String contentType = invokeMethodRequest.getContentType(); + if (httpExtension == null) { + throw new IllegalArgumentException("HttpExtension cannot be null. Use HttpExtension.NONE instead."); + } + // If the httpExtension is not null, then the method will not be null based on checks in constructor + final String httpMethod = httpExtension.getMethod().toString(); + if (appId == null || appId.trim().isEmpty()) { + throw new IllegalArgumentException("App Id cannot be null or empty."); + } + if (method == null || method.trim().isEmpty()) { + throw new IllegalArgumentException("Method name cannot be null or empty."); + } + + + String[] methodSegments = method.split("/"); + + List pathSegments = new ArrayList<>(Arrays.asList(DaprHttp.API_VERSION, "invoke", appId, "method")); + pathSegments.addAll(Arrays.asList(methodSegments)); + + byte[] serializedRequestBody = objectSerializer.serialize(request); + final Map headers = new HashMap<>(); + if (contentType != null && !contentType.isEmpty()) { + headers.put("content-type", contentType); + } + headers.putAll(httpExtension.getHeaders()); + Mono response = Mono.subscriberContext().flatMap( + context -> this.client.invokeApi(httpMethod, pathSegments.toArray(new String[0]), + httpExtension.getQueryParams(), serializedRequestBody, headers, context) + ); + return response; + } + /** * {@inheritDoc} */ @@ -758,6 +794,22 @@ private QueryStateResponse buildQueryStateResponse(DaprHttp.Response resp return new QueryStateResponse<>(result, token).setMetadata(metadata); } + private QueryMethodResponse buildQueryMethodResponse(DaprHttp.Response response, + TypeRef type) throws IOException { + byte[] respBody = response.getBody(); + Object data = null; + if (Objects.nonNull(type)) { + if (type.getType() == String.class) { + data = new String(respBody, StandardCharsets.UTF_8); + } else if (type.getType() == byte[].class) { + data = respBody; + } else { + data = objectSerializer.deserialize(respBody, type); + } + } + return new QueryMethodResponse(response.getStatusCode(), response.getHeaders(), (T) data); + } + /** * {@inheritDoc} */ diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 2928ba747..682ccda3f 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -15,6 +15,8 @@ import io.dapr.client.domain.ConfigurationItem; import io.dapr.client.domain.GetConfigurationRequest; +import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.QueryMethodResponse; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; import io.dapr.client.domain.SubscribeConfigurationRequest; @@ -223,4 +225,24 @@ Mono> queryState(String storeName, Query query, * @return A Mono of QueryStateResponse of type T. */ Mono> queryState(QueryStateRequest request, TypeRef type); + + /** + * Query for method using a query request. + * + * @param request Query request object. + * @param clazz The type needed as return for the call. + * @param The Type of the return, use byte[] to skip serialization. + * @return A Mono of QueryStateResponse of type T. + */ + Mono> queryMethod(InvokeMethodRequest request, Class clazz); + + /** + * Query for method using a query request. + * + * @param request Query request object. + * @param type The type needed as return for the call. + * @param The Type of the return, use byte[] to skip serialization. + * @return A Mono of QueryStateResponse of type T. + */ + Mono> queryMethod(InvokeMethodRequest request, TypeRef type); } diff --git a/sdk/src/main/java/io/dapr/client/domain/QueryMethodResponse.java b/sdk/src/main/java/io/dapr/client/domain/QueryMethodResponse.java new file mode 100644 index 000000000..bde0db781 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/QueryMethodResponse.java @@ -0,0 +1,66 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +import java.util.Map; + +/** + * HttpDaprResponse. + */ +public class QueryMethodResponse { + + private final int code; + + private final Map headers; + + private final T result; + + /** + * build query method response. + * @param code http response code. + * @param headers http headers. + * @param result the type of the return. + */ + public QueryMethodResponse(int code, Map headers, T result) { + this.code = code; + this.headers = headers; + this.result = result; + } + + + /** + * get response code. + * @return response code + */ + public int getCode() { + return code; + } + + /** + * get response result. + * @return response result + */ + public T getResult() { + return result; + } + + /** + * get response header. + * @return response header + */ + public Map getHeaders() { + return headers; + } + +} diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 0cdf2a82e..494566863 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -417,7 +417,7 @@ public void invokeServiceWithContext() { } @Test - public void invokeBinding() { + public void invokeBindingReturnResponse() { Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index 8219ac8f5..b6f10674e 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -16,15 +16,20 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import io.dapr.client.domain.ConfigurationItem; import io.dapr.client.domain.GetConfigurationRequest; +import io.dapr.client.domain.HttpExtension; +import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.QueryMethodResponse; import io.dapr.client.domain.QueryStateItem; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; import io.dapr.client.domain.SubscribeConfigurationRequest; import io.dapr.client.domain.query.Query; import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.utils.TypeRef; import io.dapr.v1.CommonProtos; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; @@ -33,6 +38,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.stubbing.Answer; +import reactor.core.publisher.Mono; import java.io.Closeable; import java.io.IOException; @@ -388,4 +394,53 @@ private DaprProtos.QueryStateItem buildQueryStateItem(QueryStateItem item) th } return it.build(); } + + + + private ByteString serialize(Object value) throws IOException { + byte[] byteValue = new ObjectSerializer().serialize(value); + return ByteString.copyFrom(byteValue); + } + + private Any getAny(Object value) throws IOException { + return Any.newBuilder().setValue(serialize(value)).build(); + } + + + @Test + public void invokeServiceTestReturnResponse() throws IOException { + String expected = "Value"; + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); + observer.onCompleted(); + return null; + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + + InvokeMethodRequest req = new InvokeMethodRequest("appId", "method") + .setBody("request") + .setHttpExtension(HttpExtension.NONE); + Mono> result = previewClient.queryMethod(req, new TypeRef() {}); + QueryMethodResponse res = result.block(); + + assertEquals(expected, res.getResult()); + } + + @Test + public void invokeServiceTestReturnResponseWithBytes() throws IOException { + String expected = "Value"; + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); + observer.onCompleted(); + return null; + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + + InvokeMethodRequest req = new InvokeMethodRequest("appId", "method") + .setBody("request") + .setHttpExtension(HttpExtension.NONE); + Mono> result = previewClient.queryMethod(req, new TypeRef() {}); + QueryMethodResponse res = result.block(); + assertEquals(expected, new String(res.getResult())); + } } diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java index 121db19b4..390765c2e 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java @@ -13,6 +13,9 @@ package io.dapr.client; +import io.dapr.client.domain.HttpExtension; +import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.QueryMethodResponse; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; import io.dapr.client.domain.query.Query; @@ -24,6 +27,12 @@ import okhttp3.mock.MockInterceptor; import org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -120,4 +129,49 @@ public void queryStateTest() { assertEquals("result must be same", "testData", response.getResults().get(0).getValue()); assertEquals("result must be same", "6f54ad94-dfb9-46f0-a371-e42d550adb7d", response.getResults().get(0).getEtag()); } + + + @Test + public void invokeServiceReturnResponse() throws IOException { + String resultString = "request success"; + String resultHeaderName = "test-header"; + String resultHeaderValue = "1"; + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") + .respond(resultString) + .addHeader(resultHeaderName,resultHeaderValue); + + InvokeMethodRequest req = new InvokeMethodRequest("41", "neworder") + .setBody("request") + .setHttpExtension(HttpExtension.POST); + Mono> result = daprPreviewClientHttp.queryMethod(req, new TypeRef() {}); + QueryMethodResponse response = result.block(); + Assertions.assertNotNull(response); + Assertions.assertEquals(200, response.getCode()); + Assertions.assertEquals(resultString,response.getResult()); + Assertions.assertEquals(resultHeaderValue,response.getHeaders().get(resultHeaderName)); + } + + @Test + public void invokeServiceReturnResponseBytes() throws IOException { + String resultString = "request success"; + String resultHeaderName = "test-header"; + String resultHeaderValue = "1"; + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") + .respond(resultString) + .addHeader(resultHeaderName,resultHeaderValue); + + InvokeMethodRequest req = new InvokeMethodRequest("41", "neworder") + .setBody("request") + .setHttpExtension(HttpExtension.POST); + + Mono> result = daprPreviewClientHttp.queryMethod(req, new TypeRef() {}); + QueryMethodResponse response = result.block(); + Assertions.assertNotNull(response); + Assertions.assertEquals(200, response.getCode()); + Assertions.assertEquals(resultString,new String(response.getResult())); + Assertions.assertEquals(resultHeaderValue,response.getHeaders().get(resultHeaderName)); + } + } diff --git a/settings.xml b/settings.xml deleted file mode 100644 index cad39afb1..000000000 --- a/settings.xml +++ /dev/null @@ -1,26 +0,0 @@ - - - - - - ossrh - ${env.OSSRH_USER_TOKEN} - ${env.OSSRH_PWD_TOKEN} - - - - - - true - - - ${env.GPG_KEY} - ${env.GPG_PWD} - - - - \ No newline at end of file