Skip to content

Commit

Permalink
Fix NDE caused by removing Workflow.getVersion with a succeeding Work… (
Browse files Browse the repository at this point in the history
#2370)

* Fix NDE caused by removing Workflow.getVersion with a succeeding Workflow.sideEffect

* Add comment
  • Loading branch information
Quinn-With-Two-Ns authored Jan 16, 2025
1 parent b593b35 commit 90e5125
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

package io.temporal.internal.history;

import io.temporal.api.command.v1.Command;
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.enums.v1.CommandType;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
Expand All @@ -42,6 +45,19 @@ public static boolean verifyMarkerName(HistoryEvent event, String markerName) {
return markerName.equals(attributes.getMarkerName());
}

/**
* @param command {@code Command} to inspect
* @param markerName expected marker name
* @return true if the command has a correct structure for a marker and an expected marker name
*/
public static boolean verifyMarkerName(Command command, String markerName) {
if (!CommandType.COMMAND_TYPE_RECORD_MARKER.equals(command.getCommandType())) {
return false;
}
RecordMarkerCommandAttributes attributes = command.getRecordMarkerCommandAttributes();
return markerName.equals(attributes.getMarkerName());
}

/**
* This method should be used to extract values from the marker persisted by the SDK itself. These
* values are converted using standard data converter to be always accessible by the SDK.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.history;

import com.google.common.base.Preconditions;
import io.temporal.api.command.v1.Command;
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.history.v1.HistoryEvent;
Expand Down Expand Up @@ -56,6 +57,14 @@ public static boolean hasVersionMarkerStructure(HistoryEvent event) {
return MarkerUtils.verifyMarkerName(event, MARKER_NAME);
}

/**
* @param command {@code Command} to inspect
* @return true if the command has a correct structure for a version marker
*/
public static boolean hasVersionMarkerStructure(Command command) {
return MarkerUtils.verifyMarkerName(command, MARKER_NAME);
}

@Nullable
public static String getChangeId(MarkerRecordedEventAttributes markerAttributes) {
return MarkerUtils.getValueFromMarker(markerAttributes, MARKER_CHANGE_ID_KEY, String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,9 @@ public static Command createRecordMarker(RecordMarkerCommandAttributes attribute
.setRecordMarkerCommandAttributes(attributes)
.build();
}

public static Command createFakeMarkerCommand(String markerName) {
return createRecordMarker(
RecordMarkerCommandAttributes.newBuilder().setMarkerName(markerName).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.internal.statemachines;

import static io.temporal.internal.statemachines.StateMachineCommandUtils.createFakeMarkerCommand;
import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -200,7 +201,7 @@ public void handleWorkflowTaskStarted() {
}

void createFakeCommand() {
addCommand(StateMachineCommandUtils.RECORD_MARKER_FAKE_COMMAND);
addCommand(createFakeMarkerCommand(VersionMarkerUtils.MARKER_NAME));
}

private void validateVersionAndThrow(boolean preloaded) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,26 @@ private void handleCommandEvent(HistoryEvent event) {
continue;
}

// This checks if the next event is a version marker, but the next command is not a version
// marker. This can happen if a getVersion call was removed.
if (VersionMarkerUtils.hasVersionMarkerStructure(event)
&& !VersionMarkerUtils.hasVersionMarkerStructure(command.getCommand())) {
if (handleNonMatchingVersionMarker(event)) {
// this event is a version marker for removed getVersion call.
// Handle the version marker as unmatched and return even if there is no commands to match
// it against.
return;
} else {
throw new NonDeterministicException(
"Event "
+ event.getEventId()
+ " of type "
+ event.getEventType()
+ " does not"
+ " match command type "
+ command.getCommandType());
}
}
// Note that handleEvent can cause a command cancellation in case of
// 1. MutableSideEffect
// 2. Version State Machine during replay cancels the command and enters SKIPPED state
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.versionTests;

import static org.junit.Assert.*;

import io.temporal.activity.LocalActivityOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import io.temporal.workflow.unsafe.WorkflowUnsafe;
import java.time.Duration;
import org.junit.Rule;
import org.junit.Test;

public class GetVersionRemovalBeforeMarkerTest {
private static boolean hasReplayed;

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestGetVersionRemovalWorkflowImpl.class)
.setActivityImplementations(new TestActivitiesImpl())
// Forcing a replay. Full history arrived from a normal queue causing a replay.
.setWorkerOptions(
WorkerOptions.newBuilder()
.setStickyQueueScheduleToStartTimeout(Duration.ZERO)
.build())
.build();

@Test
public void testSideEffectAfterGetVersion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
String result = workflowStub.execute("SideEffect");
assertTrue(hasReplayed);
assertEquals("side effect", result);
}

@Test
public void testMutableSideEffectAfterGetVersion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
String result = workflowStub.execute("MutableSideEffect");
assertTrue(hasReplayed);
assertEquals("mutable side effect", result);
}

@Test
public void testGetVersionAfterGetVersion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
String result = workflowStub.execute("GetVersion");
assertTrue(hasReplayed);
assertEquals("6", result);
}

@Test
public void testLocalActivityAfterGetVersion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
String result = workflowStub.execute("LocalActivity");
assertTrue(hasReplayed);
assertEquals("activity", result);
}

public static class TestGetVersionRemovalWorkflowImpl implements TestWorkflow1 {
private final TestActivities.VariousTestActivities activities =
Workflow.newLocalActivityStub(
TestActivities.VariousTestActivities.class,
LocalActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(5))
.build());

@Override
public String execute(String action) {
// Test removing a version check in replaying code with an additional thread running.
if (!WorkflowUnsafe.isReplaying()) {
int version = Workflow.getVersion("changeId", 1, 2);
assertEquals(version, 2);
} else {
hasReplayed = true;
}
String result = "";
if (action.equals("SideEffect")) {
result = Workflow.sideEffect(String.class, () -> "side effect");
} else if (action.equals("MutableSideEffect")) {
result =
Workflow.mutableSideEffect(
"mutable-side-effect-i",
String.class,
(a, b) -> !a.equals(b),
() -> "mutable side effect");
} else if (action.equals("GetVersion")) {
int v = Workflow.getVersion("otherChangeId", 5, 6);
result = String.valueOf(v);
} else if (action.equals("LocalActivity")) {
result = activities.activity();
}
// Sleep to trigger at lest one more workflow task
Workflow.sleep(Duration.ofSeconds(1));
return result;
}
}
}

0 comments on commit 90e5125

Please sign in to comment.