Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement actor client metadata. #1165

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.grpc.ManagedChannelBuilder;
import reactor.core.publisher.Mono;

import java.util.Collections;
import java.util.Map;

/**
* Holds a client for Dapr sidecar communication. ActorClient should be reused.
*/
Expand Down Expand Up @@ -59,7 +62,7 @@ public ActorClient(ResiliencyOptions resiliencyOptions) {
* @param overrideProperties Override properties.
*/
public ActorClient(Properties overrideProperties) {
this(buildManagedChannel(overrideProperties), null, overrideProperties.getValue(Properties.API_TOKEN));
this(overrideProperties, null);
}

/**
Expand All @@ -69,21 +72,38 @@ public ActorClient(Properties overrideProperties) {
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) {
this(buildManagedChannel(overrideProperties), resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN));
this(overrideProperties, null, resiliencyOptions);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param overrideProperties Override properties.
* @param metadata gRPC metadata or HTTP headers for actor invocation.
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(Properties overrideProperties, Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
this(buildManagedChannel(overrideProperties),
metadata,
resiliencyOptions,
overrideProperties.getValue(Properties.API_TOKEN));
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param grpcManagedChannel gRPC channel.
* @param metadata gRPC metadata or HTTP headers for actor invocation.
* @param resiliencyOptions Client resiliency options.
* @param daprApiToken Dapr API token.
*/
private ActorClient(
ManagedChannel grpcManagedChannel,
Map<String, String> metadata,
ResiliencyOptions resiliencyOptions,
String daprApiToken) {
this.grpcManagedChannel = grpcManagedChannel;
this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions, daprApiToken);
this.daprClient = buildDaprClient(grpcManagedChannel, metadata, resiliencyOptions, daprApiToken);
}

/**
Expand Down Expand Up @@ -137,10 +157,12 @@ private static ManagedChannel buildManagedChannel(Properties overrideProperties)
*/
private static DaprClient buildDaprClient(
Channel grpcManagedChannel,
Map<String, String> metadata,
ResiliencyOptions resiliencyOptions,
String daprApiToken) {
return new DaprClientImpl(
DaprGrpc.newStub(grpcManagedChannel),
metadata == null ? null : Collections.unmodifiableMap(metadata),
resiliencyOptions,
daprApiToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import reactor.core.publisher.MonoSink;
import reactor.util.context.ContextView;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

Expand All @@ -57,19 +58,30 @@ class DaprClientImpl implements DaprClient {
*/
private final DaprClientGrpcInterceptors grpcInterceptors;

/**
* Metadata for actor invocation requests.
*/
private final Map<String, String> metadata;

/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
* @param metadata gRPC metadata or HTTP headers for actor server to receive.
* @param resiliencyOptions Client resiliency options (optional).
* @param daprApiToken Dapr API token (optional).
*/
DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions, String daprApiToken) {
DaprClientImpl(
DaprGrpc.DaprStub grpcClient,
Map<String, String> metadata,
ResiliencyOptions resiliencyOptions,
String daprApiToken) {
this.client = grpcClient;
this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken,
new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()));
this.retryPolicy = new RetryPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
this.metadata = metadata == null ? Map.of() : metadata;
}

/**
Expand All @@ -82,6 +94,7 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
.setActorType(actorType)
.setActorId(actorId)
.setMethod(methodName)
.putAllMetadata(this.metadata)
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
.build();
return Mono.deferContextual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void setup() throws IOException {
InProcessChannelBuilder.forName(serverName).directExecutor().build());

// Create a HelloWorldClient using the in-process channel;
client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null);
client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null, null);
}

@Test
Expand Down
12 changes: 10 additions & 2 deletions sdk-tests/src/test/java/io/dapr/it/DaprRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,19 @@ public DaprClientBuilder newDaprClientBuilder() {
}

public ActorClient newActorClient() {
return this.newActorClient(null);
return this.newActorClient(null, null);
}

public ActorClient newActorClient(Map<String, String> metadata) {
return this.newActorClient(metadata, null);
}

public ActorClient newActorClient(ResiliencyOptions resiliencyOptions) {
return new ActorClient(new Properties(this.getPropertyOverrides()), resiliencyOptions);
return this.newActorClient(null, resiliencyOptions);
}

public ActorClient newActorClient(Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
return new ActorClient(new Properties(this.getPropertyOverrides()), metadata, resiliencyOptions);
}

public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedException {
Expand Down
38 changes: 30 additions & 8 deletions sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.actors.app.MyActor;
import io.dapr.it.actors.app.MyActorService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.TestUtils.assertThrowsDaprExceptionSubstring;

Expand All @@ -30,23 +35,24 @@ public class ActorExceptionIT extends BaseIT {

private static Logger logger = LoggerFactory.getLogger(ActorExceptionIT.class);

@Test
public void exceptionTest() throws Exception {
private static DaprRun run;

@BeforeAll
public static void start() throws Exception {
// The call below will fail if service cannot start successfully.
var run = startDaprApp(
run = startDaprApp(
ActorExceptionIT.class.getSimpleName(),
MyActorService.SUCCESS_MESSAGE,
MyActorService.class,
true,
60000);
}

logger.debug("Creating proxy builder");
@Test
public void exceptionTest() throws Exception {
ActorProxyBuilder<MyActor> proxyBuilder =
new ActorProxyBuilder("MyActorTest", MyActor.class, deferClose(run.newActorClient()));
logger.debug("Creating actorId");
ActorId actorId1 = new ActorId("1");
logger.debug("Building proxy");
MyActor proxy = proxyBuilder.build(actorId1);
MyActor proxy = proxyBuilder.build(new ActorId("1"));

callWithRetry(() -> {
assertThrowsDaprExceptionSubstring(
Expand All @@ -55,4 +61,20 @@ public void exceptionTest() throws Exception {
() -> proxy.throwException());
}, 10000);
}

@Test
public void exceptionDueToMetadataTest() throws Exception {
// Setting this HTTP header via actor metadata will cause the Actor HTTP server to error.
Map<String, String> metadata = Map.of("Content-Length", "9999");
ActorProxyBuilder<MyActor> proxyBuilderMetadataOverride =
new ActorProxyBuilder("MyActorTest", MyActor.class, deferClose(run.newActorClient(metadata)));

MyActor proxyWithMetadata = proxyBuilderMetadataOverride.build(new ActorId("2"));
callWithRetry(() -> {
assertThrowsDaprExceptionSubstring(
"INTERNAL",
"ContentLength=9999 with Body length 13",
() -> proxyWithMetadata.say("hello world"));
}, 10000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import io.dapr.internal.grpc.interceptors.DaprApiTokenInterceptor;
import io.dapr.internal.grpc.interceptors.DaprAppIdInterceptor;
import io.dapr.internal.grpc.interceptors.DaprMetadataInterceptor;
import io.dapr.internal.grpc.interceptors.DaprMetadataReceiverInterceptor;
import io.dapr.internal.grpc.interceptors.DaprTimeoutInterceptor;
import io.dapr.internal.grpc.interceptors.DaprTracingInterceptor;
import io.dapr.internal.resiliency.TimeoutPolicy;
Expand All @@ -35,10 +35,18 @@ public class DaprClientGrpcInterceptors {

private final TimeoutPolicy timeoutPolicy;

/**
* Instantiates a holder of all gRPC interceptors.
*/
public DaprClientGrpcInterceptors() {
this(null, null);
}

/**
* Instantiates a holder of all gRPC interceptors.
* @param daprApiToken Dapr API token.
* @param timeoutPolicy Timeout Policy.
*/
public DaprClientGrpcInterceptors(String daprApiToken, TimeoutPolicy timeoutPolicy) {
this.daprApiToken = daprApiToken;
this.timeoutPolicy = timeoutPolicy;
Expand Down Expand Up @@ -118,7 +126,7 @@ public <T extends AbstractStub<T>> T intercept(
new DaprApiTokenInterceptor(this.daprApiToken),
new DaprTimeoutInterceptor(this.timeoutPolicy),
new DaprTracingInterceptor(context),
new DaprMetadataInterceptor(metadataConsumer));
new DaprMetadataReceiverInterceptor(metadataConsumer));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.dapr.internal.grpc.interceptors;

import io.dapr.client.Headers;
import io.dapr.config.Properties;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
/**
* Consumes gRPC metadata.
*/
public class DaprMetadataInterceptor implements ClientInterceptor {
public class DaprMetadataReceiverInterceptor implements ClientInterceptor {

private final Consumer<Metadata> metadataConsumer;

/**
* Creates an instance of the consumer for gRPC metadata.
* @param metadataConsumer gRPC metadata consumer
*/
public DaprMetadataInterceptor(Consumer<Metadata> metadataConsumer) {
public DaprMetadataReceiverInterceptor(Consumer<Metadata> metadataConsumer) {
this.metadataConsumer = metadataConsumer;
}

Expand Down
Loading