Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Nexus sync client handler #2403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.nexus.Nexus;
import io.temporal.nexus.WorkflowClientOperationHandlers;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerFactoryOptions;
Expand Down Expand Up @@ -83,10 +84,14 @@ public class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<String, String> operation() {
return WorkflowClientOperationHandlers.fromWorkflowMethod(
(context, details, client, input) ->
client.newWorkflowStub(
TestOtherWorkflow.class,
WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build())
(context, details, input) ->
Nexus.getOperationContext()
.getWorkflowClient()
.newWorkflowStub(
TestOtherWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(details.getRequestId())
.build())
::workflow);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
* Intercepts inbound calls to a Nexus operation on the worker side.
*
* <p>An instance should be created in {@link
* WorkerInterceptor#interceptNexusOperation(NexusOperationInboundCallsInterceptor)}.
* WorkerInterceptor#interceptNexusOperation(OperationContext,
* NexusOperationInboundCallsInterceptor)}.
*
* <p>Prefer extending {@link NexusOperationInboundCallsInterceptorBase} and overriding only the
* methods you need instead of implementing this interface directly. {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.common.interceptors;

import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;

/**
Expand All @@ -41,4 +42,7 @@
public interface NexusOperationOutboundCallsInterceptor {
/** Intercepts call to get the metric scope in a Nexus operation. */
Scope getMetricsScope();

/** Intercepts call to get the workflow client in a Nexus operation. */
WorkflowClient getWorkflowClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.common.interceptors;

import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;

/** Convenience base class for {@link NexusOperationOutboundCallsInterceptor} implementations. */
Expand All @@ -37,4 +38,9 @@ public NexusOperationOutboundCallsInterceptorBase(NexusOperationOutboundCallsInt
public Scope getMetricsScope() {
return next.getMetricsScope();
}

@Override
public WorkflowClient getWorkflowClient() {
return next.getWorkflowClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
* used directly.
*/
public final class CurrentNexusOperationContext {
private static final ThreadLocal<NexusOperationContextImpl> CURRENT = new ThreadLocal<>();
private static final ThreadLocal<InternalNexusOperationContext> CURRENT = new ThreadLocal<>();

public static NexusOperationContextImpl get() {
NexusOperationContextImpl result = CURRENT.get();
public static InternalNexusOperationContext get() {
InternalNexusOperationContext result = CURRENT.get();
if (result == null) {
throw new IllegalStateException(
"NexusOperationContext can be used only inside of nexus operation handler "
Expand All @@ -37,7 +37,7 @@ public static NexusOperationContextImpl get() {
return CURRENT.get();
}

public static void set(NexusOperationContextImpl context) {
public static void set(InternalNexusOperationContext context) {
if (context == null) {
throw new IllegalArgumentException("null context");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,23 @@
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
import io.temporal.nexus.NexusOperationContext;

public class NexusOperationContextImpl implements NexusOperationContext {
public class InternalNexusOperationContext {
private final String namespace;
private final String taskQueue;
private final Scope metricScope;
private final WorkflowClient client;
NexusOperationOutboundCallsInterceptor outboundCalls;

public NexusOperationContextImpl(
String namespace,
String taskQueue,
WorkflowClient client,
NexusOperationOutboundCallsInterceptor outboundCalls) {
public InternalNexusOperationContext(
String namespace, String taskQueue, Scope metricScope, WorkflowClient client) {
this.namespace = namespace;
this.taskQueue = taskQueue;
this.metricScope = metricScope;
this.client = client;
this.outboundCalls = outboundCalls;
}

@Override
public Scope getMetricsScope() {
return outboundCalls.getMetricsScope();
return metricScope;
}

public WorkflowClient getWorkflowClient() {
Expand All @@ -62,4 +59,23 @@ public String getNamespace() {
public void setOutboundInterceptor(NexusOperationOutboundCallsInterceptor outboundCalls) {
this.outboundCalls = outboundCalls;
}

public NexusOperationContext getUserFacingContext() {
if (outboundCalls == null) {
throw new IllegalStateException("Outbound interceptor is not set");
}
return new NexusOperationContextImpl();
}

private class NexusOperationContextImpl implements NexusOperationContext {
@Override
public Scope getMetricsScope() {
return outboundCalls.getMetricsScope();
}

@Override
public WorkflowClient getWorkflowClient() {
return outboundCalls.getWorkflowClient();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public final class NexusInternal {
private NexusInternal() {}

public static NexusOperationContext getOperationContext() {
return CurrentNexusOperationContext.get();
return CurrentNexusOperationContext.get().getUserFacingContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,7 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
}

CurrentNexusOperationContext.set(
new NexusOperationContextImpl(
namespace,
taskQueue,
client,
new RootNexusOperationOutboundCallsInterceptor(metricsScope)));
new InternalNexusOperationContext(namespace, taskQueue, metricsScope, client));

switch (request.getVariantCase()) {
case START_OPERATION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,26 @@
package io.temporal.internal.nexus;

import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;

public class RootNexusOperationOutboundCallsInterceptor
implements NexusOperationOutboundCallsInterceptor {
private final Scope scope;
private final WorkflowClient client;

RootNexusOperationOutboundCallsInterceptor(Scope scope) {
RootNexusOperationOutboundCallsInterceptor(Scope scope, WorkflowClient client) {
this.scope = scope;
this.client = client;
}

@Override
public Scope getMetricsScope() {
return scope;
}

@Override
public WorkflowClient getWorkflowClient() {
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ public OperationHandler<Object, Object> intercept(
interceptor.interceptNexusOperation(context, inboundCallsInterceptor);
}

InternalNexusOperationContext temporalNexusContext = CurrentNexusOperationContext.get();
inboundCallsInterceptor.init(
new RootNexusOperationOutboundCallsInterceptor(
CurrentNexusOperationContext.get().getMetricsScope()));
temporalNexusContext.getMetricsScope(), temporalNexusContext.getWorkflowClient()));
return new OperationInterceptorConverter(inboundCallsInterceptor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.nexus;

import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;

Expand All @@ -39,4 +40,10 @@ public interface NexusOperationContext {
* WorkflowServiceStubsOptions.Builder#setMetricsScope(Scope)} when a worker starts up.
*/
Scope getMetricsScope();

/**
* Get a {@link WorkflowClient} that can be used to start interact with the Temporal service from
* a Nexus handler.
*/
WorkflowClient getWorkflowClient();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a method to get the nexus context too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a use case comes up yes, at least internally.

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,11 @@

import io.nexusrpc.handler.*;
import io.nexusrpc.handler.OperationHandler;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.internal.nexus.NexusOperationContextImpl;

/** WorkflowClientOperationHandlers can be used to create Temporal specific OperationHandlers */
@Experimental
public final class WorkflowClientOperationHandlers {
/**
* Helper to create {@link io.nexusrpc.handler.OperationHandler} instances that take a {@link
* io.temporal.client.WorkflowClient}.
*/
public static <T, R> OperationHandler<T, R> sync(
SynchronousWorkflowClientOperationFunction<T, R> func) {
return io.nexusrpc.handler.OperationHandler.sync(
(OperationContext ctx, OperationStartDetails details, T input) -> {
NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get();
return func.apply(ctx, details, nexusCtx.getWorkflowClient(), input);
});
}

/**
* Maps a workflow method to an {@link io.nexusrpc.handler.OperationHandler}.
*
Expand All @@ -52,9 +36,8 @@ public static <T, R> OperationHandler<T, R> sync(
public static <T, R> OperationHandler<T, R> fromWorkflowMethod(
WorkflowMethodFactory<T, R> startMethod) {
return new RunWorkflowOperation<>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't have to be in this PR but I would to call this WorkflowRunOperation across all SDKs.

(OperationContext context, OperationStartDetails details, WorkflowClient client, T input) ->
WorkflowHandle.fromWorkflowMethod(
startMethod.apply(context, details, client, input), input));
(OperationContext context, OperationStartDetails details, T input) ->
WorkflowHandle.fromWorkflowMethod(startMethod.apply(context, details, input), input));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,5 @@ public interface WorkflowHandleFactory<T, R> {
* through the provided {@link WorkflowClient}.
*/
@Nullable
WorkflowHandle<R> apply(
OperationContext context, OperationStartDetails details, WorkflowClient client, T input);
WorkflowHandle<R> apply(OperationContext context, OperationStartDetails details, T input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,5 @@ public interface WorkflowMethodFactory<T, R> {
* provided {@link WorkflowClient}.
*/
@Nullable
Functions.Func1<T, R> apply(
OperationContext context, OperationStartDetails details, WorkflowClient client, T input);
Functions.Func1<T, R> apply(OperationContext context, OperationStartDetails details, T input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.client.WorkflowClientInternal;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.internal.nexus.NexusOperationContextImpl;
import io.temporal.internal.nexus.InternalNexusOperationContext;
import io.temporal.workflow.Functions;

class WorkflowMethodMethodInvoker implements WorkflowHandleInvoker {
Expand All @@ -36,7 +36,7 @@ public WorkflowMethodMethodInvoker(Functions.Proc workflow) {

@Override
public WorkflowExecution invoke(NexusStartWorkflowRequest request) {
NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get();
InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();
return ((WorkflowClientInternal) nexusCtx.getWorkflowClient().getInternal())
.startNexus(request, workflow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.temporal.client.WorkflowClient;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.internal.nexus.NexusOperationContextImpl;
import io.temporal.internal.nexus.InternalNexusOperationContext;
import java.net.URISyntaxException;

class RunWorkflowOperation<T, R> implements OperationHandler<T, R> {
Expand All @@ -45,10 +45,9 @@ class RunWorkflowOperation<T, R> implements OperationHandler<T, R> {
@Override
public OperationStartResult<R> start(
OperationContext ctx, OperationStartDetails operationStartDetails, T input) {
NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get();
InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();

WorkflowHandle handle =
handleFactory.apply(ctx, operationStartDetails, nexusCtx.getWorkflowClient(), input);
WorkflowHandle handle = handleFactory.apply(ctx, operationStartDetails, input);

NexusStartWorkflowRequest nexusRequest =
new NexusStartWorkflowRequest(
Expand All @@ -73,7 +72,7 @@ public OperationStartResult<R> start(
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
try {
OperationStartResult.Builder<R> result =
OperationStartResult.<R>newAsyncBuilder(workflowExec.getWorkflowId());
OperationStartResult.newAsyncBuilder(workflowExec.getWorkflowId());
if (nexusLink != null) {
result.addLink(nexusProtoLinkToLink(nexusLink));
}
Expand Down
Loading
Loading