Skip to content

Commit

Permalink
Test and fix Deployment SAs (#7293)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
This change makes sure the following search attributes (SAs) are populated
correctly for Versioning v3.1:
- TemporalWorkerDeployment 
- TemporalWorkerDeploymentVersion 
- TemporalWorkflowVersioningBehavior
- BuildIds

## Why?
<!-- Tell your future self why have you made these changes -->

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
ShahabT authored Feb 7, 2025
1 parent fdc39ca commit 850101f
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 73 deletions.
23 changes: 6 additions & 17 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,12 @@ func escapeChar(s, escape, delimiter string) string {
// will. The values used will be the override deployment_series and build_id if set, or versioningInfo.Deployment.
//
// If the workflow becomes unpinned or unversioned, this entry will be removed from that list.
func PinnedBuildIdSearchAttribute(deployment *deploymentpb.Deployment, version string) string {
if version != "" {
return fmt.Sprintf("%s%s%s",
BuildIdSearchAttributePrefixPinned,
BuildIdSearchAttributeDelimiter,
escapeChar(version, BuildIdSearchAttributeEscape, BuildIdSearchAttributeDelimiter),
)
} else if deployment != nil {
return fmt.Sprintf("%s%s%s%s%s",
BuildIdSearchAttributePrefixPinned,
BuildIdSearchAttributeDelimiter,
escapeChar(deployment.GetSeriesName(), BuildIdSearchAttributeEscape, BuildIdSearchAttributeDelimiter),
BuildIdSearchAttributeDelimiter,
escapeChar(deployment.GetBuildId(), BuildIdSearchAttributeEscape, BuildIdSearchAttributeDelimiter),
)
}
return ""
func PinnedBuildIdSearchAttribute(version string) string {
return fmt.Sprintf("%s%s%s",
BuildIdSearchAttributePrefixPinned,
BuildIdSearchAttributeDelimiter,
version,
)
}

// AssignedBuildIdSearchAttribute returns the search attribute value for the currently assigned build ID
Expand Down
11 changes: 8 additions & 3 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,12 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
}

behavior := request.GetVersioningBehavior()
if behavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED && request.GetDeployment() == nil {
//nolint:staticcheck // SA1019 deprecated Deployment will clean up later
if behavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED && request.GetDeployment() == nil &&
(request.GetDeploymentOptions() == nil || request.GetDeploymentOptions().GetWorkerVersioningMode() != enumspb.WORKER_VERSIONING_MODE_VERSIONED) {
// Mutable state wasn't changed yet and doesn't have to be cleared.
releaseLeaseWithError = false
return nil, serviceerror.NewInvalidArgument("deployment must be set when versioning behavior specified")
return nil, serviceerror.NewInvalidArgument("versioning behavior cannot be specified without deployment options being set with versioned mode")
}

assignedBuildId := ms.GetAssignedBuildId()
Expand Down Expand Up @@ -252,7 +254,10 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
}()

// It's an error if the workflow has used versioning in the past but this task has no versioning info.
if ms.GetMostRecentWorkerVersionStamp().GetUseVersioning() && !request.GetWorkerVersionStamp().GetUseVersioning() {
if ms.GetMostRecentWorkerVersionStamp().GetUseVersioning() &&
//nolint:staticcheck // SA1019 deprecated stamp will clean up later
!request.GetWorkerVersionStamp().GetUseVersioning() &&
request.GetDeploymentOptions().GetWorkerVersioningMode() != enumspb.WORKER_VERSIONING_MODE_VERSIONED {
// Mutable state wasn't changed yet and doesn't have to be cleared.
releaseLeaseWithError = false
return nil, serviceerror.NewInvalidArgument("Workflow using versioning must continue to use versioning.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,9 @@ func (handler *workflowTaskCompletedHandler) handleCommandScheduleActivity(

// TODO: versioning 3 allows eager activity dispatch for both pinned and unpinned workflows, so no
// special consideration is needed. Remove the versioning logic from here. [cleanup-old-wv]
oldVersioningUsed := handler.mutableState.GetMostRecentWorkerVersionStamp().GetUseVersioning()
oldVersioningUsed := handler.mutableState.GetMostRecentWorkerVersionStamp().GetUseVersioning() &&
// for V3 versioning it's ok to dispatch eager activities
handler.mutableState.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED
newVersioningUsed := handler.mutableState.GetExecutionInfo().GetAssignedBuildId() != ""
versioningUsed := oldVersioningUsed || newVersioningUsed

Expand Down
27 changes: 12 additions & 15 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2957,22 +2957,19 @@ func (ms *MutableStateImpl) addBuildIdToLoadedSearchAttribute(
) []string {
var newValues []string
var buildId string
effectiveBehavior := ms.GetEffectiveVersioningBehavior()

behavior := ms.GetWorkflowVersioningBehaviorSA()

// set up the unversioned or assigned:x sentinels (versioning v2)
if !stamp.GetUseVersioning() && effectiveBehavior == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { // unversioned workflows may still have non-nil deployment, so we don't check deployment
if !stamp.GetUseVersioning() && behavior == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { // unversioned workflows may still have non-nil deployment, so we don't check deployment
newValues = append(newValues, worker_versioning.UnversionedSearchAttribute)
} else if ms.GetAssignedBuildId() != "" {
} else if ms.GetAssignedBuildId() != "" && behavior == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
newValues = append(newValues, worker_versioning.AssignedBuildIdSearchAttribute(ms.GetAssignedBuildId()))
}

// get the most up-to-date pinned entry and put it at the front (v3 reachability and v3.1 drainage)
if effectiveBehavior == enumspb.VERSIONING_BEHAVIOR_PINNED {
if ms.GetExecutionInfo().GetVersioningInfo().GetVersion() == "" { // deployments
newValues = append(newValues, worker_versioning.PinnedBuildIdSearchAttribute(ms.getPinnedDeployment(), ""))
} else { // worker deployments
newValues = append(newValues, worker_versioning.PinnedBuildIdSearchAttribute(nil, ms.getPinnedVersion()))
}
if behavior == enumspb.VERSIONING_BEHAVIOR_PINNED {
newValues = append(newValues, worker_versioning.PinnedBuildIdSearchAttribute(ms.GetWorkerDeploymentVersionSA()))
}

// get the build id entry (all versions of versioning)
Expand Down Expand Up @@ -3084,7 +3081,10 @@ func (ms *MutableStateImpl) addBuildIdAndDeploymentInfoToSearchAttributesWithNoV
modifiedBuildIds := ms.addBuildIdToLoadedSearchAttribute(existingBuildIds, stamp)
modifiedDeployment := ms.GetWorkerDeploymentSA()
modifiedVersion := ms.GetWorkerDeploymentVersionSA()
modifiedBehavior := ms.GetWorkflowVersioningBehaviorSA()
modifiedBehavior := ""
if b := ms.GetWorkflowVersioningBehaviorSA(); b != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
modifiedBehavior = b.String()
}

// check equality
if slices.Equal(existingBuildIds, modifiedBuildIds) &&
Expand Down Expand Up @@ -7667,15 +7667,12 @@ func (ms *MutableStateImpl) GetWorkerDeploymentVersionSA() string {
return versioningInfo.GetVersion()
}

func (ms *MutableStateImpl) GetWorkflowVersioningBehaviorSA() string {
func (ms *MutableStateImpl) GetWorkflowVersioningBehaviorSA() enumspb.VersioningBehavior {
b := ms.executionInfo.GetVersioningInfo().GetBehavior()
if override := ms.executionInfo.GetVersioningInfo().GetVersioningOverride(); override != nil {
b = override.GetBehavior()
}
if b == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
return ""
}
return b.String()
return b
}

func (ms *MutableStateImpl) GetDeploymentTransition() *workflowpb.DeploymentTransition {
Expand Down
11 changes: 6 additions & 5 deletions service/worker/deployment/deployment_reachability.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"fmt"
"time"

deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
deploymentspb "go.temporal.io/server/api/deployment/v1"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -99,10 +99,11 @@ func makeCountRequest(

func makeDeploymentQuery(seriesName, buildID string, open bool) string {
var statusFilter string
deploymentFilter := fmt.Sprintf("= '%s'", worker_versioning.PinnedBuildIdSearchAttribute(&deploymentpb.Deployment{
SeriesName: seriesName,
BuildId: buildID,
}, ""))
deploymentFilter := fmt.Sprintf("= '%s'", worker_versioning.PinnedBuildIdSearchAttribute(
worker_versioning.WorkerDeploymentVersionToString(&deploymentspb.WorkerDeploymentVersion{
DeploymentName: seriesName,
BuildId: buildID,
})))
if open {
statusFilter = "= 'Running'"
} else {
Expand Down
4 changes: 2 additions & 2 deletions service/worker/deployment/deployment_reachability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func TestMakeDeploymentQuery(t *testing.T) {
buildId := "A"

query := makeDeploymentQuery(seriesName, buildId, true)
expectedQuery := "BuildIds = 'pinned:test-deployment:A' AND ExecutionStatus = 'Running'"
expectedQuery := "BuildIds = 'pinned:test-deployment.A' AND ExecutionStatus = 'Running'"
assert.Equal(t, expectedQuery, query)

query = makeDeploymentQuery(seriesName, buildId, false)
expectedQuery = "BuildIds = 'pinned:test-deployment:A' AND ExecutionStatus != 'Running'"
expectedQuery = "BuildIds = 'pinned:test-deployment.A' AND ExecutionStatus != 'Running'"
assert.Equal(t, expectedQuery, query)
}

Expand Down
2 changes: 1 addition & 1 deletion service/worker/workerdeployment/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ func (d *ClientImpl) GetVersionDrainageStatus(

func makeDeploymentQuery(version string) string {
var statusFilter string
deploymentFilter := fmt.Sprintf("= '%s'", worker_versioning.PinnedBuildIdSearchAttribute(nil, version))
deploymentFilter := fmt.Sprintf("= '%s'", worker_versioning.PinnedBuildIdSearchAttribute(version))
statusFilter = "= 'Running'"
return fmt.Sprintf("%s %s AND %s %s", searchattribute.BuildIds, deploymentFilter, searchattribute.ExecutionStatus, statusFilter)
}
Expand Down
7 changes: 4 additions & 3 deletions service/worker/workerdeployment/version_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (d *VersionWorkflowRunner) run(ctx workflow.Context) error {

// if we were draining and just continued-as-new, restart drainage child wf
if d.VersionState.GetDrainageInfo().GetStatus() == enumspb.VERSION_DRAINAGE_STATUS_DRAINING {
d.startDrainage(ctx, false)
d.startDrainage(ctx, true)
}

// Set up Query Handlers here:
Expand Down Expand Up @@ -232,12 +232,13 @@ func (d *VersionWorkflowRunner) handleUpdateVersionMetadata(ctx workflow.Context
}, nil
}

func (d *VersionWorkflowRunner) startDrainage(ctx workflow.Context, first bool) {
func (d *VersionWorkflowRunner) startDrainage(ctx workflow.Context, isCan bool) {
childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_TERMINATE,
})
fut := workflow.ExecuteChildWorkflow(childCtx, WorkerDeploymentDrainageWorkflowType, &deploymentspb.DrainageWorkflowArgs{
Version: d.VersionState.Version,
IsCan: isCan,
})
d.drainageWorkflowFuture = &fut
}
Expand Down Expand Up @@ -580,7 +581,7 @@ func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *depl

// stopped accepting new workflows --> start drainage child wf
if wasAcceptingNewWorkflows && !isAcceptingNewWorkflows {
d.startDrainage(ctx, true)
d.startDrainage(ctx, false)
}

// started accepting new workflows --> stop drainage child wf if it exists
Expand Down
82 changes: 56 additions & 26 deletions tests/versioning_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/testing/taskpoller"
"go.temporal.io/server/common/testing/testvars"
"go.temporal.io/server/common/tqid"
Expand Down Expand Up @@ -185,6 +186,7 @@ func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) {
s.pollWftAndHandle(tv, false, wftCompleted,
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
s.NotNil(task)
s.verifyVersioningSAs(tv, vbPinned)
return respondWftWithActivities(tv, tv, sticky, vbUnpinned, "5"), nil
})

Expand All @@ -199,6 +201,7 @@ func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) {

<-wftCompleted
s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), tv.VersioningOverridePinned(), nil)
s.verifyVersioningSAs(tv, vbPinned, tv)
if sticky {
s.verifyWorkflowStickyQueue(tv.WithRunID(runID))
}
Expand Down Expand Up @@ -425,6 +428,7 @@ func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) {

<-wftCompleted
s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil)
s.verifyVersioningSAs(tv, vbUnpinned, tv)
if sticky {
s.verifyWorkflowStickyQueue(tv.WithRunID(runID))
}
Expand Down Expand Up @@ -1666,8 +1670,17 @@ func respondWftWithActivities(
Commands: commands,
StickyAttributes: stickyAttr,
ForceCreateNewWorkflowTask: false,
Deployment: tvWf.Deployment(),
VersioningBehavior: behavior,
DeploymentOptions: &deploymentpb.WorkerDeploymentOptions{
BuildId: tvWf.BuildID(),
DeploymentName: tvWf.DeploymentSeries(),
WorkerVersioningMode: enumspb.WORKER_VERSIONING_MODE_VERSIONED,
},
// TODO (shahab): remove stamp once build ID is added to wftc event
WorkerVersionStamp: &commonpb.WorkerVersionStamp{
BuildId: tvWf.BuildID(),
UseVersioning: true,
},
}
}

Expand Down Expand Up @@ -1695,33 +1708,12 @@ func respondCompleteWorkflow(
},
},
ForceCreateNewWorkflowTask: false,
Deployment: tv.Deployment(),
VersioningBehavior: behavior,
}
}

func respondScheduleNexusOperation(
tv *testvars.TestVars,
behavior enumspb.VersioningBehavior,
endpointName string,
) *workflowservice.RespondWorkflowTaskCompletedRequest {
return &workflowservice.RespondWorkflowTaskCompletedRequest{
Commands: []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION,
Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{
ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{
Endpoint: endpointName,
Service: tv.Service(),
Operation: tv.Operation(),
Input: tv.Any().Payload(),
},
},
},
DeploymentOptions: &deploymentpb.WorkerDeploymentOptions{
BuildId: tv.BuildID(),
DeploymentName: tv.DeploymentSeries(),
WorkerVersioningMode: enumspb.WORKER_VERSIONING_MODE_VERSIONED,
},
ForceCreateNewWorkflowTask: false,
Deployment: tv.Deployment(),
VersioningBehavior: behavior,
}
}

Expand Down Expand Up @@ -2088,3 +2080,41 @@ func (s *Versioning3Suite) validateBacklogCount(
a.Equal(expectedCount, typeInfo.Stats.GetApproximateBacklogCount())
}, 6*time.Second, 100*time.Millisecond)
}

// verifyVersioningSAs asserts that the workflow identified by tv eventually shows up in
// visibility with the expected versioning search attributes.
//
// It lists workflow executions filtered on WorkflowId, TemporalWorkerDeployment,
// TemporalWorkerDeploymentVersion and TemporalWorkflowVersioningBehavior (taken from tv and
// behavior), then inspects the BuildIds keyword list of the first match:
//   - when behavior is VERSIONING_BEHAVIOR_PINNED, the list must contain
//     "pinned:<tv.DeploymentVersionString()>";
//   - for every entry of usedBuilds, the list must contain "versioned:<entry.BuildID()>".
//
// The whole check is retried every 50ms for up to 5 seconds; all list calls share one
// context that expires after 10 seconds.
func (s *Versioning3Suite) verifyVersioningSAs(
	tv *testvars.TestVars,
	behavior enumspb.VersioningBehavior,
	usedBuilds ...*testvars.TestVars,
) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// The query does not change between retries, so build it once.
	query := fmt.Sprintf("WorkflowId = '%s' AND TemporalWorkerDeployment = '%s' AND TemporalWorkerDeploymentVersion= '%s' AND TemporalWorkflowVersioningBehavior = '%s'",
		tv.WorkflowID(), tv.DeploymentSeries(), tv.DeploymentVersionString(), behavior.String())

	s.EventuallyWithT(func(t *assert.CollectT) {
		a := assert.New(t)

		resp, err := s.FrontendClient().ListWorkflowExecutions(ctx, &workflowservice.ListWorkflowExecutionsRequest{
			Namespace: s.Namespace().String(),
			Query:     query,
		})
		if !a.NoError(err) || !a.NotEmpty(resp.GetExecutions()) {
			return
		}

		execution := resp.GetExecutions()[0]
		payload, found := execution.GetSearchAttributes().GetIndexedFields()["BuildIds"]
		if !a.True(found, "BuildIds search attribute is missing") {
			return
		}

		decoded, err := searchattribute.DecodeValue(payload, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, true)
		if !a.NoError(err) {
			return
		}

		// Use a checked assertion so an unexpected decoded type is reported as a test
		// failure for this attempt rather than panicking inside the retry closure.
		var buildIDs []string
		if decoded != nil {
			var isList bool
			buildIDs, isList = decoded.([]string)
			if !a.True(isList, "BuildIds decoded to unexpected type %T", decoded) {
				return
			}
		}

		if behavior == enumspb.VERSIONING_BEHAVIOR_PINNED {
			a.Contains(buildIDs, "pinned:"+tv.DeploymentVersionString())
		}
		for _, used := range usedBuilds {
			a.Contains(buildIDs, "versioned:"+used.BuildID())
		}
	}, 5*time.Second, 50*time.Millisecond)
}
3 changes: 3 additions & 0 deletions tests/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,9 @@ func (s *WorkerDeploymentSuite) TestSetCurrentVersion_Unversioned_NoRamp() {

// Should see that the current version of the task queue becomes unversioned, and the unversioned ramping version of the task queue is removed
func (s *WorkerDeploymentSuite) TestSetCurrentVersion_Unversioned_PromoteUnversionedRamp() {
// TODO (Carly): fix flakiness and unskip
s.T().Skip()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
tv := testvars.New(s)
Expand Down

0 comments on commit 850101f

Please sign in to comment.