diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 751104cdb..4896f5329 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -20,6 +20,7 @@ package io.temporal.internal.testservice; +import static io.temporal.api.enums.v1.EventType.*; import static io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.*; import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; import static io.temporal.internal.testservice.CronUtils.getBackoffInterval; @@ -60,17 +61,6 @@ import io.temporal.internal.common.ProtoEnumNameUtils; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.WorkflowExecutionUtils; -import io.temporal.internal.testservice.StateMachines.Action; -import io.temporal.internal.testservice.StateMachines.ActivityTaskData; -import io.temporal.internal.testservice.StateMachines.CancelExternalData; -import io.temporal.internal.testservice.StateMachines.ChildWorkflowData; -import io.temporal.internal.testservice.StateMachines.NexusOperationData; -import io.temporal.internal.testservice.StateMachines.SignalExternalData; -import io.temporal.internal.testservice.StateMachines.State; -import io.temporal.internal.testservice.StateMachines.TimerData; -import io.temporal.internal.testservice.StateMachines.UpdateWorkflowExecutionData; -import io.temporal.internal.testservice.StateMachines.WorkflowData; -import io.temporal.internal.testservice.StateMachines.WorkflowTaskData; import io.temporal.serviceclient.StatusUtils; import java.time.Duration; import java.util.*; @@ -310,7 +300,7 @@ private void validateLinks(List links) { .withDescription("workflow event link must not have an empty run ID field") .asRuntimeException(); } - if (l.getWorkflowEvent().getEventRef().getEventType() == EventType.EVENT_TYPE_UNSPECIFIED + if (l.getWorkflowEvent().getEventRef().getEventType() == EVENT_TYPE_UNSPECIFIED && l.getWorkflowEvent().getEventRef().getEventId() != 0) { throw Status.INVALID_ARGUMENT .withDescription( @@ -3372,9 +3362,9 @@ private static Optional getStartEvent(List history) // This is true today (see StateMachines.startWorkflow), even in the signalWithStartCase (signal // is the _second_ event). But if it becomes untrue in the future, we'd rather fail than lie. Preconditions.checkState( - firstEvent.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + firstEvent.getEventType() == EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, "The first event in a workflow's history should be %s, but was %s", - EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED.name(), + EVENT_TYPE_WORKFLOW_EXECUTION_STARTED.name(), firstEvent.getEventType().name()); return Optional.of(firstEvent); @@ -3399,12 +3389,17 @@ private void addExecutionSignaledEvent( .setIdentity(signalRequest.getIdentity()) .setInput(signalRequest.getInput()) .setSignalName(signalRequest.getSignalName()); - HistoryEvent executionSignaled = + + HistoryEvent.Builder event = HistoryEvent.newBuilder() - .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED) - .setWorkflowExecutionSignaledEventAttributes(a) - .build(); - ctx.addEvent(executionSignaled); + .setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED) + .setWorkflowExecutionSignaledEventAttributes(a); + + if (signalRequest.getLinksCount() > 0) { + event.addAllLinks(signalRequest.getLinksList()); + } + + ctx.addEvent(event.build()); } private void addExecutionSignaledByExternalEvent( diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index d1670d2c4..48cb601f7 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -1325,6 +1325,7 @@ public void signalWithStartWorkflowExecution( .setControl(r.getControl()) .setNamespace(r.getNamespace()) .setIdentity(r.getIdentity()) + .addAllLinks(r.getLinksList()) .build(); if (mutableState != null && !mutableState.isTerminalState()) { mutableState.signal(signalRequest); @@ -1364,6 +1365,9 @@ public void signalWithStartWorkflowExecution( if (r.hasWorkflowStartDelay()) { startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay()); } + if (!r.getLinksList().isEmpty()) { + startRequest.addAllLinks(r.getLinksList()); + } StartWorkflowExecutionResponse startResult = startWorkflowExecutionImpl( diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/SignalLinksTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/SignalLinksTest.java new file mode 100644 index 000000000..61255b33a --- /dev/null +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/SignalLinksTest.java @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.testserver.functional; + +import static java.util.UUID.randomUUID; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import io.temporal.api.common.v1.Link; +import io.temporal.api.common.v1.Payloads; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.common.v1.WorkflowType; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.history.v1.WorkflowExecutionSignaledEventAttributes; +import io.temporal.api.taskqueue.v1.TaskQueue; +import io.temporal.api.workflowservice.v1.*; +import io.temporal.client.WorkflowStub; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import org.junit.Rule; +import org.junit.Test; + +public class SignalLinksTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestWorkflowImpl.class).build(); + + @Test + public void testSignalWithLinks() { + WorkflowStub stub = testWorkflowRule.newUntypedWorkflowStubTimeoutOptions("TestWorkflow"); + WorkflowExecution execution = stub.start(); + + Link testLink = createTestLink(execution.getRunId()); + SignalWorkflowExecutionRequest signalRequest = + SignalWorkflowExecutionRequest.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setWorkflowExecution(execution) + .setSignalName("test-signal") + .setInput(Payloads.newBuilder().build()) + .addLinks(testLink) + .build(); + + testWorkflowRule + .getWorkflowServiceStubs() + .blockingStub() + .signalWorkflowExecution(signalRequest); + + stub.getResult(Void.class); + + verifySignalLink(execution, testLink); + } + + @Test + public void testSignalWithStartLinks() { + String workflowId = "test-workflow-id"; + Link testLink = createTestLink("some-run-id"); + + SignalWithStartWorkflowExecutionRequest signalWithStartRequest = + SignalWithStartWorkflowExecutionRequest.newBuilder() + .setTaskQueue(TaskQueue.newBuilder().setName(testWorkflowRule.getTaskQueue()).build()) + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setWorkflowType(WorkflowType.newBuilder().setName("TestWorkflow").build()) + .setSignalInput(Payloads.newBuilder().build()) + .setRequestId(randomUUID().toString()) + .setSignalName("test-signal") + .setWorkflowId(workflowId) + .addLinks(testLink) + .build(); + + SignalWithStartWorkflowExecutionResponse response = + testWorkflowRule + .getWorkflowServiceStubs() + .blockingStub() + .signalWithStartWorkflowExecution(signalWithStartRequest); + + WorkflowExecution execution = + WorkflowExecution.newBuilder() + .setWorkflowId(workflowId) + .setRunId(response.getRunId()) + .build(); + + verifySignalLink(execution, testLink); + verifyStartEventLink(execution, testLink); + } + + private Link createTestLink(String runId) { + return Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setWorkflowId("someWorkflow") + .setNamespace("default") + .setRunId(runId) + .build()) + .build(); + } + + private void verifySignalLink(WorkflowExecution execution, Link expectedLink) { + GetWorkflowExecutionHistoryResponse history = getHistory(execution); + boolean foundSignalWithLink = false; + + for (HistoryEvent event : history.getHistory().getEventsList()) { + if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED) { + WorkflowExecutionSignaledEventAttributes attrs = + event.getWorkflowExecutionSignaledEventAttributes(); + if ("test-signal".equals(attrs.getSignalName())) { + assertEquals(1, event.getLinksCount()); + assertEquals(expectedLink, event.getLinks(0)); + foundSignalWithLink = true; + break; + } + } + } + + assertTrue("Should have found signal event with link", foundSignalWithLink); + } + + private void verifyStartEventLink(WorkflowExecution execution, Link expectedLink) { + GetWorkflowExecutionHistoryResponse history = getHistory(execution); + boolean foundStartWithLink = false; + + for (HistoryEvent event : history.getHistory().getEventsList()) { + if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) { + assertEquals("Link should be present on start event", 1, event.getLinksCount()); + assertEquals("Link in start event should match", expectedLink, event.getLinks(0)); + foundStartWithLink = true; + break; + } + } + + assertTrue("Should have found start event with link", foundStartWithLink); + } + + private GetWorkflowExecutionHistoryResponse getHistory(WorkflowExecution execution) { + return testWorkflowRule + .getWorkflowServiceStubs() + .blockingStub() + .getWorkflowExecutionHistory( + GetWorkflowExecutionHistoryRequest.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setExecution(execution) + .build()); + } + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod(name = "TestWorkflow") + void run(); + } + + public static class TestWorkflowImpl implements TestWorkflow { + @Override + public void run() { + // Empty workflow that completes quickly + } + } +}