From 0c8fbc320abcdb1597d0999367508d18cdeba966 Mon Sep 17 00:00:00 2001 From: Shahab Tajik Date: Thu, 23 Jan 2025 19:10:15 +0100 Subject: [PATCH 1/3] Fix flaky TestTransitionFromActivity (#7132) ## What changed? See title. ## Why? Flaky tests no good! ## How did you test it? ## Potential risks ## Documentation ## Is hotfix candidate? --- tests/versioning_3_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 473152212d9..7cfd78bd01c 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -588,7 +588,7 @@ func (s *Versioning3Suite) TestTransitionFromActivity_NoSticky() { } func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { - // Wf runs one TWF on d1 and schedules four activities, then: + // The workflow runs one WFT on d1 which schedules four activities, then: // 1. The first and second activities starts on d1 // 2. Current deployment becomes d2 // 3. The third activity is redirected to d2 and starts a transition in the wf, without being @@ -664,7 +664,7 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { // tasks might still be waiting behind the old deployment's poll channel. Partition manage should // immediately react to the deployment data changes, but there still is a race possible and the // only way to safeguard against it is to wait a little while before proceeding. - time.Sleep(time.Millisecond * 100) //nolint:forbidigo + time.Sleep(time.Millisecond * 200) //nolint:forbidigo // Pollers of d1 are there, but should not get any task go s.idlePollActivity(tv1, true, ver3MinPollTime, "activities should not go to the old deployment") @@ -878,8 +878,8 @@ func respondWftWithActivities( // TODO (shahab): tests with forced task forward take multiple seconds. Need to know why? ScheduleToCloseTimeout: durationpb.New(10 * time.Second), ScheduleToStartTimeout: durationpb.New(10 * time.Second), - StartToCloseTimeout: durationpb.New(1 * time.Second), - HeartbeatTimeout: durationpb.New(1 * time.Second), + StartToCloseTimeout: durationpb.New(3 * time.Second), + HeartbeatTimeout: durationpb.New(3 * time.Second), RequestEagerExecution: false, }, }, From 21503b73322e0a22502797ff1119ba7ed7b7e232 Mon Sep 17 00:00:00 2001 From: Hai Zhao <164949006+hai719@users.noreply.github.com> Date: Thu, 23 Jan 2025 11:23:14 -0800 Subject: [PATCH 2/3] fix flaky xdc test TestCronWorkflowCompleteAndFailover (#7139) ## What changed? Wait for clusters to be synced instead of using Sleep. Make sure second run is started before failover. ## Why? Current implementation depends on the timing which is not reliable. ## How did you test it? Repeatedly run the test locally and no failure found. ## Potential risks ## Documentation ## Is hotfix candidate? 
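A minimal sketch of the wait-for-condition pattern this change switches to, assuming testify's `require.Eventually`; `isClusterSyncedSketch` and `waitForClusterSyncedSketch` are illustrative stand-ins for the suite helpers (`s.waitForClusterSynced`, `s.Eventually`) used in the diff below:

```go
package xdc

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// isClusterSyncedSketch is a hypothetical probe standing in for the suite's
// real replication check; the test proceeds as soon as it reports true.
func isClusterSyncedSketch() bool {
	return true
}

// waitForClusterSyncedSketch polls the condition instead of sleeping a fixed
// duration, so the test is neither flaky nor slower than necessary.
func waitForClusterSyncedSketch(t *testing.T) {
	require.Eventually(t, isClusterSyncedSketch, 30*time.Second, 100*time.Millisecond,
		"clusters did not sync in time")
}
```

The same pattern backs the new `waitForNewRunToStart` helper, which polls the last history event of the first cron run until its `NewExecutionRunId` is populated, so the second run is guaranteed to exist before failover is triggered.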
--- tests/xdc/base.go | 2 +- tests/xdc/failover_test.go | 93 +++++++++++++++++++++++++++++++++++--- 2 files changed, 87 insertions(+), 8 deletions(-) diff --git a/tests/xdc/base.go b/tests/xdc/base.go index aaac17925d5..c28eb565775 100644 --- a/tests/xdc/base.go +++ b/tests/xdc/base.go @@ -255,7 +255,7 @@ func (s *xdcBaseSuite) failover( client workflowservice.WorkflowServiceClient, ) { // wait for replication task propagation - time.Sleep(4 * time.Second) + s.waitForClusterSynced() // update namespace to fail over updateReq := &workflowservice.UpdateNamespaceRequest{ diff --git a/tests/xdc/failover_test.go b/tests/xdc/failover_test.go index fd71c652cbf..da0aa63bf7e 100644 --- a/tests/xdc/failover_test.go +++ b/tests/xdc/failover_test.go @@ -844,8 +844,44 @@ func (s *FunctionalClustersTestSuite) TestTerminateFailover() { s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) s.NoError(err) + // check terminate done + getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{ + Namespace: namespace, + Execution: &commonpb.WorkflowExecution{ + WorkflowId: id, + }, + } + + s.WaitForHistory(` + 1 v1 WorkflowExecutionStarted + 2 v1 WorkflowTaskScheduled + 3 v1 WorkflowTaskStarted + 4 v1 WorkflowTaskCompleted + 5 v1 ActivityTaskScheduled`, + func() *historypb.History { + historyResponse, err := client1.GetWorkflowExecutionHistory(testcore.NewContext(), getHistoryReq) + s.NoError(err) + return historyResponse.History + }, 1*time.Second, 100*time.Millisecond, + ) + s.failover(namespace, s.clusterNames[1], int64(2), client1) + s.WaitForHistory(` + 1 v1 WorkflowExecutionStarted + 2 v1 WorkflowTaskScheduled + 3 v1 WorkflowTaskStarted + 4 v1 WorkflowTaskCompleted + 5 v1 ActivityTaskScheduled + 6 v2 ActivityTaskTimedOut + 7 v2 WorkflowTaskScheduled`, + func() *historypb.History { + historyResponse, err := client2.GetWorkflowExecutionHistory(testcore.NewContext(), getHistoryReq) + s.NoError(err) + return historyResponse.History + }, 5*time.Second, 100*time.Millisecond, + ) + // terminate workflow at cluster 2 terminateReason := "terminate reason" terminateDetails := payloads.EncodeString("terminate details") @@ -861,13 +897,6 @@ func (s *FunctionalClustersTestSuite) TestTerminateFailover() { s.NoError(err) // check terminate done - getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{ - Namespace: namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: id, - }, - } - s.WaitForHistory(` 1 v1 WorkflowExecutionStarted 2 v1 WorkflowTaskScheduled @@ -1896,6 +1925,54 @@ func (s *FunctionalClustersTestSuite) TestCronWorkflowStartAndFailover() { s.NoError(err) } +func (s *FunctionalClustersTestSuite) getLastEvent( + client workflowservice.WorkflowServiceClient, + namespace string, + execution *commonpb.WorkflowExecution, +) *historypb.HistoryEvent { + + resp, err := client.GetWorkflowExecutionHistory(testcore.NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{ + Namespace: namespace, + Execution: execution, + }) + s.NoError(err) + s.NotNil(resp.History) + s.NotEmpty(resp.History.Events) + + return resp.History.Events[len(resp.History.Events)-1] +} + +func (s *FunctionalClustersTestSuite) getNewExecutionRunIdFromLastEvent( + client workflowservice.WorkflowServiceClient, + namespace string, + execution *commonpb.WorkflowExecution, +) string { + lastEvent := s.getLastEvent(client, namespace, execution) + s.NotNil(lastEvent) + + if lastEvent.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED { + attrs := 
lastEvent.GetWorkflowExecutionCompletedEventAttributes() + s.NotNil(attrs) + return attrs.GetNewExecutionRunId() + } + return "" +} + +func (s *FunctionalClustersTestSuite) waitForNewRunToStart( + client workflowservice.WorkflowServiceClient, + namespace string, + execution *commonpb.WorkflowExecution, +) string { + var newRunID string + s.Eventually(func() bool { + newRunID = s.getNewExecutionRunIdFromLastEvent(client, namespace, execution) + return newRunID != "" + }, 10*time.Second, 100*time.Millisecond) + + s.NotEmpty(newRunID, "New run should have started") + return newRunID +} + func (s *FunctionalClustersTestSuite) TestCronWorkflowCompleteAndFailover() { namespace := "test-cron-workflow-complete-and-failover-" + common.GenerateRandomString(5) client1 := s.cluster1.FrontendClient() // active @@ -1988,6 +2065,8 @@ func (s *FunctionalClustersTestSuite) TestCronWorkflowCompleteAndFailover() { 4 v1 WorkflowTaskCompleted 5 v1 WorkflowExecutionCompleted`, events) + _ = s.waitForNewRunToStart(client1, namespace, executions[0]) + s.failover(namespace, s.clusterNames[1], int64(2), client1) _, err = poller2.PollAndProcessWorkflowTask() From a2d0e4cc3659965536539a10dc424506b8a3e421 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Thu, 30 Jan 2025 17:32:01 -0800 Subject: [PATCH 3/3] taskmatcher rewrite wip --- common/dynamicconfig/constants.go | 7 +- service/matching/ack_manager.go | 8 - service/matching/backlog_age_tracker.go | 71 ++ service/matching/backlog_manager.go | 9 +- service/matching/config.go | 17 +- service/matching/forwarder.go | 112 +-- service/matching/matcher.go | 759 ++++++++---------- service/matching/matcher_data.go | 516 ++++++++++++ service/matching/matcher_test.go | 4 + service/matching/matching_engine.go | 23 +- .../matching/physical_task_queue_manager.go | 38 +- .../physical_task_queue_manager_interface.go | 7 +- .../physical_task_queue_manager_mock.go | 66 +- .../physical_task_queue_manager_test.go | 2 +- service/matching/task.go | 63 +- .../matching/task_queue_partition_manager.go | 92 ++- .../task_queue_partition_manager_interface.go | 8 +- .../task_queue_partition_manager_mock.go | 28 +- service/matching/task_reader.go | 220 +++-- service/matching/task_validation.go | 5 +- service/matching/task_writer.go | 1 + service/matching/user_data_manager.go | 18 +- service/matching/user_data_manager_test.go | 2 +- 23 files changed, 1281 insertions(+), 795 deletions(-) create mode 100644 service/matching/backlog_age_tracker.go create mode 100644 service/matching/matcher_data.go diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 388a53b4732..6d34c9422c8 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1070,11 +1070,6 @@ See DynamicRateLimitingParams comments for more details.`, time.Minute, `MatchingLongPollExpirationInterval is the long poll expiration interval in the matching service`, ) - MatchingSyncMatchWaitDuration = NewTaskQueueDurationSetting( - "matching.syncMatchWaitDuration", - 200*time.Millisecond, - `MatchingSyncMatchWaitDuration is to wait time for sync match`, - ) MatchingHistoryMaxPageSize = NewNamespaceIntSetting( "matching.historyMaxPageSize", primitives.GetHistoryMaxPageSize, @@ -1175,7 +1170,7 @@ for VERSIONED queues.`, 1, `MatchingForwarderMaxOutstandingTasks is the max number of inflight addTask/queryTask from the forwarder`, ) - MatchingForwarderMaxRatePerSecond = NewTaskQueueIntSetting( + MatchingForwarderMaxRatePerSecond = NewTaskQueueFloatSetting( 
"matching.forwarderMaxRatePerSecond", 10, `MatchingForwarderMaxRatePerSecond is the max rate at which add/query can be forwarded`, diff --git a/service/matching/ack_manager.go b/service/matching/ack_manager.go index a3562f174ab..73727133910 100644 --- a/service/matching/ack_manager.go +++ b/service/matching/ack_manager.go @@ -26,7 +26,6 @@ package matching import ( "sync" - "sync/atomic" "github.com/emirpasic/gods/maps/treemap" godsutils "github.com/emirpasic/gods/utils" @@ -41,7 +40,6 @@ type ackManager struct { outstandingTasks *treemap.Map // TaskID->acked readLevel int64 // Maximum TaskID inserted into outstandingTasks ackLevel int64 // Maximum TaskID below which all tasks are acked - backlogCountHint atomic.Int64 logger log.Logger } @@ -68,7 +66,6 @@ func (m *ackManager) addTask(taskID int64) { m.logger.Fatal("Already present in outstanding tasks", tag.TaskID(taskID)) } m.outstandingTasks.Put(taskID, false) - m.backlogCountHint.Add(1) } func (m *ackManager) getReadLevel() int64 { @@ -135,7 +132,6 @@ func (m *ackManager) completeTask(taskID int64) int64 { // TODO the ack level management should be done by a dedicated coroutine // this is only a temporarily solution m.outstandingTasks.Put(taskID, true) - m.backlogCountHint.Add(-1) // Adjust the ack level as far as we can var numberOfAckedTasks int64 @@ -153,7 +149,3 @@ func (m *ackManager) completeTask(taskID int64) int64 { } return m.ackLevel } - -func (m *ackManager) getBacklogCountHint() int64 { - return m.backlogCountHint.Load() -} diff --git a/service/matching/backlog_age_tracker.go b/service/matching/backlog_age_tracker.go new file mode 100644 index 00000000000..b1cbb926de4 --- /dev/null +++ b/service/matching/backlog_age_tracker.go @@ -0,0 +1,71 @@ +// The MIT License +// +// Copyright (c) 2025 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "time" + + "github.com/emirpasic/gods/maps/treemap" + godsutils "github.com/emirpasic/gods/utils" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const emptyBacklogAge time.Duration = -1 + +// backlogAgeTracker is not safe for concurrent use +type backlogAgeTracker struct { + tree treemap.Map // unix nano as int64 -> int (count) +} + +func newBacklogAgeTracker() backlogAgeTracker { + return backlogAgeTracker{tree: *treemap.NewWith(godsutils.Int64Comparator)} +} + +// record adds or removes a task from the tracker. 
+func (b backlogAgeTracker) record(ts *timestamppb.Timestamp, delta int) { + if ts == nil { + return + } + + createTime := ts.AsTime().UnixNano() + count := delta + if prev, ok := b.tree.Get(createTime); ok { + count += prev.(int) + } + if count = max(0, count); count == 0 { + b.tree.Remove(createTime) + } else { + b.tree.Put(createTime, count) + } +} + +// getBacklogAge returns the largest age in this backlog (age of oldest task), +// or emptyBacklogAge if empty. +func (b backlogAgeTracker) getAge() time.Duration { + if b.tree.Empty() { + return emptyBacklogAge + } + k, _ := b.tree.Min() + oldest := k.(int64) + return time.Since(time.Unix(0, oldest)) +} diff --git a/service/matching/backlog_manager.go b/service/matching/backlog_manager.go index ea6477acee0..558b207c227 100644 --- a/service/matching/backlog_manager.go +++ b/service/matching/backlog_manager.go @@ -181,15 +181,12 @@ func (c *backlogManagerImpl) SpoolTask(taskInfo *persistencespb.TaskInfo) error return err } -func (c *backlogManagerImpl) processSpooledTask( - ctx context.Context, - task *internalTask, -) error { - return c.pqMgr.ProcessSpooledTask(ctx, task) +func (c *backlogManagerImpl) addSpooledTask(ctx context.Context, task *internalTask) error { + return c.pqMgr.AddSpooledTask(ctx, task) } func (c *backlogManagerImpl) BacklogCountHint() int64 { - return c.taskAckManager.getBacklogCountHint() + return c.taskReader.getLoadedTasks() } func (c *backlogManagerImpl) BacklogStatus() *taskqueuepb.TaskQueueStatus { diff --git a/service/matching/config.go b/service/matching/config.go index ec852c79c0e..41135e3a7d9 100644 --- a/service/matching/config.go +++ b/service/matching/config.go @@ -32,6 +32,7 @@ import ( "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/tqid" ) @@ -46,7 +47,6 @@ type ( PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter PersistenceDynamicRateLimitingParams dynamicconfig.TypedPropertyFn[dynamicconfig.DynamicRateLimitingParams] PersistenceQPSBurstRatio dynamicconfig.FloatPropertyFn - SyncMatchWaitDuration dynamicconfig.DurationPropertyFnWithTaskQueueFilter RPS dynamicconfig.IntPropertyFn OperatorRPSRatio dynamicconfig.FloatPropertyFn AlignMembershipChange dynamicconfig.DurationPropertyFn @@ -70,7 +70,7 @@ type ( BreakdownMetricsByBuildID dynamicconfig.BoolPropertyFnWithTaskQueueFilter ForwarderMaxOutstandingPolls dynamicconfig.IntPropertyFnWithTaskQueueFilter ForwarderMaxOutstandingTasks dynamicconfig.IntPropertyFnWithTaskQueueFilter - ForwarderMaxRatePerSecond dynamicconfig.IntPropertyFnWithTaskQueueFilter + ForwarderMaxRatePerSecond dynamicconfig.FloatPropertyFnWithTaskQueueFilter ForwarderMaxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskQueueFilter VersionCompatibleSetLimitPerQueue dynamicconfig.IntPropertyFnWithNamespaceFilter VersionBuildIdLimitPerQueue dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -124,13 +124,13 @@ type ( forwarderConfig struct { ForwarderMaxOutstandingPolls func() int ForwarderMaxOutstandingTasks func() int - ForwarderMaxRatePerSecond func() int + ForwarderMaxRatePerSecond func() float64 ForwarderMaxChildrenPerNode func() int } taskQueueConfig struct { forwarderConfig - SyncMatchWaitDuration func() time.Duration + CallerInfo headers.CallerInfo BacklogNegligibleAge func() time.Duration MaxWaitForPollerBeforeFwd func() time.Duration QueryPollerUnavailableWindow func() time.Duration @@ -213,7 +213,6 @@ 
func NewConfig( PersistencePerShardNamespaceMaxQPS: dynamicconfig.DefaultPerShardNamespaceRPSMax, PersistenceDynamicRateLimitingParams: dynamicconfig.MatchingPersistenceDynamicRateLimitingParams.Get(dc), PersistenceQPSBurstRatio: dynamicconfig.PersistenceQPSBurstRatio.Get(dc), - SyncMatchWaitDuration: dynamicconfig.MatchingSyncMatchWaitDuration.Get(dc), LoadUserData: dynamicconfig.MatchingLoadUserData.Get(dc), HistoryMaxPageSize: dynamicconfig.MatchingHistoryMaxPageSize.Get(dc), EnableDeployments: dynamicconfig.EnableDeployments.Get(dc), @@ -285,7 +284,8 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) * taskType := tq.TaskType() return &taskQueueConfig{ - RangeSize: config.RangeSize, + CallerInfo: headers.NewBackgroundCallerInfo(ns.String()), + RangeSize: config.RangeSize, GetTasksBatchSize: func() int { return config.GetTasksBatchSize(ns.String(), taskQueueName, taskType) }, @@ -298,9 +298,6 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) * MinTaskThrottlingBurstSize: func() int { return config.MinTaskThrottlingBurstSize(ns.String(), taskQueueName, taskType) }, - SyncMatchWaitDuration: func() time.Duration { - return config.SyncMatchWaitDuration(ns.String(), taskQueueName, taskType) - }, BacklogNegligibleAge: func() time.Duration { return config.BacklogNegligibleAge(ns.String(), taskQueueName, taskType) }, @@ -353,7 +350,7 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) * ForwarderMaxOutstandingTasks: func() int { return config.ForwarderMaxOutstandingTasks(ns.String(), taskQueueName, taskType) }, - ForwarderMaxRatePerSecond: func() int { + ForwarderMaxRatePerSecond: func() float64 { return config.ForwarderMaxRatePerSecond(ns.String(), taskQueueName, taskType) }, ForwarderMaxChildrenPerNode: func() int { diff --git a/service/matching/forwarder.go b/service/matching/forwarder.go index 92bd35d63ed..6a442055798 100644 --- a/service/matching/forwarder.go +++ b/service/matching/forwarder.go @@ -27,7 +27,6 @@ package matching import ( "context" "errors" - "sync/atomic" "time" enumspb "go.temporal.io/api/enums/v1" @@ -37,7 +36,6 @@ import ( "go.temporal.io/server/api/matchingservice/v1" taskqueuespb "go.temporal.io/server/api/taskqueue/v1" "go.temporal.io/server/common" - "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/tqid" "google.golang.org/protobuf/types/known/durationpb" ) @@ -50,36 +48,11 @@ type ( queue *PhysicalTaskQueueKey partition *tqid.NormalPartition client matchingservice.MatchingServiceClient - - // token channels that vend tokens necessary to make - // API calls exposed by forwarder. Tokens are used - // to enforce maxOutstanding forwarded calls from this - // instance. And channels are used so that the caller - // can use them in a select{} block along with other - // conditions - addReqToken atomic.Value - pollReqToken atomic.Value - - // cached values of maxOutstanding dynamic config values. - // these are used to detect changes - outstandingTasksLimit int32 - outstandingPollsLimit int32 - - // todo: implement a rate limiter that automatically - // adjusts rate based on ServiceBusy errors from API calls - limiter *quotas.DynamicRateLimiterImpl - } - // ForwarderReqToken is the token that must be acquired before - // making forwarder API calls. 
This type contains the state - // for the token itself - ForwarderReqToken struct { - ch chan *ForwarderReqToken } ) var ( errInvalidTaskQueueType = errors.New("unrecognized task queue type") - errForwarderSlowDown = errors.New("limit exceeded") ) // newForwarder returns an instance of Forwarder object which @@ -89,7 +62,6 @@ var ( // methods can return the following errors: // Returns following errors: // - taskqueue.ErrNoParent, taskqueue.ErrInvalidDegree: If this task queue doesn't have a parent to forward to -// - errForwarderSlowDown: When the rate limit is exceeded func newForwarder( cfg *forwarderConfig, queue *PhysicalTaskQueueKey, @@ -99,21 +71,12 @@ func newForwarder( if !ok { return nil, serviceerror.NewInvalidArgument("physical queue of normal partition expected") } - - fwdr := &Forwarder{ - cfg: cfg, - client: client, - partition: partition, - queue: queue, - outstandingTasksLimit: int32(cfg.ForwarderMaxOutstandingTasks()), - outstandingPollsLimit: int32(cfg.ForwarderMaxOutstandingPolls()), - limiter: quotas.NewDefaultOutgoingRateLimiter( - func() float64 { return float64(cfg.ForwarderMaxRatePerSecond()) }, - ), - } - fwdr.addReqToken.Store(newForwarderReqToken(cfg.ForwarderMaxOutstandingTasks())) - fwdr.pollReqToken.Store(newForwarderReqToken(cfg.ForwarderMaxOutstandingPolls())) - return fwdr, nil + return &Forwarder{ + cfg: cfg, + client: client, + partition: partition, + queue: queue, + }, nil } // ForwardTask forwards an activity or workflow task to the parent task queue partition if it exists @@ -124,10 +87,6 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro return err } - if !fwdr.limiter.Allow() { - return errForwarderSlowDown - } - var expirationDuration *durationpb.Duration var expirationTime time.Time if task.event.Data.ExpiryTime != nil { @@ -179,7 +138,7 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro return errInvalidTaskQueueType } - return fwdr.handleErr(err) + return err } func (fwdr *Forwarder) getForwardInfo(task *internalTask) *taskqueuespb.TaskForwardInfo { @@ -190,14 +149,13 @@ func (fwdr *Forwarder) getForwardInfo(task *internalTask) *taskqueuespb.TaskForw return clone } // task is forwarded for the first time - forwardInfo := &taskqueuespb.TaskForwardInfo{ + return &taskqueuespb.TaskForwardInfo{ TaskSource: task.source, SourcePartition: fwdr.partition.RpcName(), DispatchBuildId: fwdr.queue.Version().BuildId(), DispatchVersionSet: fwdr.queue.Version().VersionSet(), RedirectInfo: task.redirectInfo, } - return forwardInfo } // ForwardQueryTask forwards a query task to parent task queue partition, if it exists @@ -222,7 +180,7 @@ func (fwdr *Forwarder) ForwardQueryTask( ForwardInfo: fwdr.getForwardInfo(task), }) - return resp, fwdr.handleErr(err) + return resp, err } // ForwardNexusTask forwards a nexus task to parent task queue partition, if it exists. 
@@ -243,7 +201,7 @@ func (fwdr *Forwarder) ForwardNexusTask(ctx context.Context, task *internalTask) ForwardInfo: fwdr.getForwardInfo(task), }) - return resp, fwdr.handleErr(err) + return resp, err } // ForwardPoll forwards a poll request to parent task queue partition if it exist @@ -273,7 +231,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada ForwardedSource: fwdr.partition.RpcName(), }) if err != nil { - return nil, fwdr.handleErr(err) + return nil, err } return newInternalStartedTask(&startedTaskInfo{workflowTaskInfo: resp}), nil case enumspb.TASK_QUEUE_TYPE_ACTIVITY: @@ -291,7 +249,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada ForwardedSource: fwdr.partition.RpcName(), }) if err != nil { - return nil, fwdr.handleErr(err) + return nil, err } return newInternalStartedTask(&startedTaskInfo{activityTaskInfo: resp}), nil case enumspb.TASK_QUEUE_TYPE_NEXUS: @@ -310,54 +268,10 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada ForwardedSource: fwdr.partition.RpcName(), }) if err != nil { - return nil, fwdr.handleErr(err) + return nil, err } return newInternalStartedTask(&startedTaskInfo{nexusTaskInfo: resp}), nil default: return nil, errInvalidTaskQueueType } } - -// AddReqTokenC returns a channel that can be used to wait for a token -// that's necessary before making a ForwardTask or ForwardQueryTask API call. -// After the API call is invoked, token.release() must be invoked -func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken { - fwdr.refreshTokenC(&fwdr.addReqToken, &fwdr.outstandingTasksLimit, int32(fwdr.cfg.ForwarderMaxOutstandingTasks())) - return fwdr.addReqToken.Load().(*ForwarderReqToken).ch -} - -// PollReqTokenC returns a channel that can be used to wait for a token -// that's necessary before making a ForwardPoll API call. 
After the API -// call is invoked, token.release() must be invoked -func (fwdr *Forwarder) PollReqTokenC() <-chan *ForwarderReqToken { - fwdr.refreshTokenC(&fwdr.pollReqToken, &fwdr.outstandingPollsLimit, int32(fwdr.cfg.ForwarderMaxOutstandingPolls())) - return fwdr.pollReqToken.Load().(*ForwarderReqToken).ch -} - -func (fwdr *Forwarder) refreshTokenC(value *atomic.Value, curr *int32, maxLimit int32) { - currLimit := atomic.LoadInt32(curr) - if currLimit != maxLimit { - if atomic.CompareAndSwapInt32(curr, currLimit, maxLimit) { - value.Store(newForwarderReqToken(int(maxLimit))) - } - } -} - -func (fwdr *Forwarder) handleErr(err error) error { - if _, ok := err.(*serviceerror.ResourceExhausted); ok { - return errForwarderSlowDown - } - return err -} - -func newForwarderReqToken(maxOutstanding int) *ForwarderReqToken { - reqToken := &ForwarderReqToken{ch: make(chan *ForwarderReqToken, maxOutstanding)} - for i := 0; i < maxOutstanding; i++ { - reqToken.ch <- reqToken - } - return reqToken -} - -func (token *ForwarderReqToken) release() { - token.ch <- token -} diff --git a/service/matching/matcher.go b/service/matching/matcher.go index 8ea28fa64e6..59652dc7fa9 100644 --- a/service/matching/matcher.go +++ b/service/matching/matcher.go @@ -26,18 +26,22 @@ package matching import ( "context" - "errors" "math" "strconv" "sync" - "sync/atomic" "time" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/common" + "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/quotas" + "go.temporal.io/server/common/tqid" + "go.temporal.io/server/common/util" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -48,15 +52,12 @@ import ( type TaskMatcher struct { config *taskQueueConfig - // synchronous task channel to match producer/consumer - taskC chan *internalTask - // synchronous task channel to match query task - the reason to have a - // separate channel for this is that there are cases where consumers - // are interested in queryTasks but not others. One example is when a - // namespace is not active in a cluster. - queryTaskC chan *internalTask - // channel closed when task queue is closed, to interrupt pollers - closeC chan struct{} + // holds waiting polls and tasks + data matcherData + + // Background context used for forwarding tasks. Closed when task queue is closed. 
+ matcherCtx context.Context + matcherCtxCancel context.CancelFunc // dynamicRate is the dynamic rate & burst for rate limiter dynamicRateBurst quotas.MutableRateBurst @@ -64,32 +65,46 @@ type TaskMatcher struct { dynamicRateLimiter *quotas.DynamicRateLimiterImpl // forceRefreshRateOnce is used to force refresh rate limit for first time forceRefreshRateOnce sync.Once - // rateLimiter that limits the rate at which tasks can be dispatched to consumers - rateLimiter quotas.RateLimiter - - fwdr *Forwarder - metricsHandler metrics.Handler // namespace metric scope - numPartitions func() int // number of task queue partitions - backlogTasksCreateTime map[int64]int // task creation time (unix nanos) -> number of tasks with that time - backlogTasksLock sync.Mutex - lastPoller atomic.Int64 // unix nanos of most recent poll start time + + partition tqid.Partition + fwdr *Forwarder + validator taskValidator + metricsHandler metrics.Handler // namespace metric scope + numPartitions func() int // number of task queue partitions +} + +type waitingPoller struct { + waitableMatchResult + startTime time.Time + forwardCtx context.Context // non-nil iff poll can be forwarded + pollMetadata *pollMetadata // non-nil iff poll can be forwarded + queryOnly bool // if true, poller can be given only query task, otherwise any task + isTaskForwarder bool + isTaskValidator bool +} + +type matchResult struct { + task *internalTask + poller *waitingPoller + ctxErr error // set if context timed out/canceled + ctxErrIdx int // index of context that closed first } const ( - defaultTaskDispatchRPS = 100000.0 - defaultTaskDispatchRPSTTL = time.Minute - emptyBacklogAge time.Duration = -1 + defaultTaskDispatchRPS = 100000.0 + defaultTaskDispatchRPSTTL = time.Minute + + // TODO(pri): make dynamic config + backlogTaskForwardTimeout = 60 * time.Second ) var ( - // Sentinel error to redirect while blocked in matcher. - errInterrupted = errors.New("interrupted offer") errNoRecentPoller = status.Error(codes.FailedPrecondition, "no poller seen for task queue recently, worker may be down") ) // newTaskMatcher returns a task matcher instance. The returned instance can be used by task producers and consumers to // find a match. 
Both sync matches and non-sync matches should use this implementation -func newTaskMatcher(config *taskQueueConfig, fwdr *Forwarder, metricsHandler metrics.Handler) *TaskMatcher { +func newTaskMatcher(config *taskQueueConfig, partition tqid.Partition, fwdr *Forwarder, validator taskValidator, metricsHandler metrics.Handler) *TaskMatcher { dynamicRateBurst := quotas.NewMutableRateBurst( defaultTaskDispatchRPS, int(defaultTaskDispatchRPS), @@ -107,23 +122,183 @@ func newTaskMatcher(config *taskQueueConfig, fwdr *Forwarder, metricsHandler met config.AdminNamespaceToPartitionDispatchRate, ), }) + + matcherCtx := headers.SetCallerInfo(context.Background(), config.CallerInfo) + matcherCtx, matcherCtxCancel := context.WithCancel(matcherCtx) + return &TaskMatcher{ - config: config, - dynamicRateBurst: dynamicRateBurst, - dynamicRateLimiter: dynamicRateLimiter, - rateLimiter: limiter, - metricsHandler: metricsHandler, - fwdr: fwdr, - taskC: make(chan *internalTask), - queryTaskC: make(chan *internalTask), - closeC: make(chan struct{}), - numPartitions: config.NumReadPartitions, - backlogTasksCreateTime: make(map[int64]int), + config: config, + data: matcherData{ + config: config, + rateLimiter: limiter, + backlogAge: newBacklogAgeTracker(), + }, + dynamicRateBurst: dynamicRateBurst, + dynamicRateLimiter: dynamicRateLimiter, + metricsHandler: metricsHandler, + partition: partition, + fwdr: fwdr, + validator: validator, + matcherCtx: matcherCtx, + matcherCtxCancel: matcherCtxCancel, + numPartitions: config.NumReadPartitions, + } +} + +func (tm *TaskMatcher) Start() { + policy := backoff.NewExponentialRetryPolicy(time.Second). + WithMaximumInterval(backlogTaskForwardTimeout). + WithExpirationInterval(backoff.NoInterval) + retrier := backoff.NewRetrier(policy, clock.NewRealTimeSource()) + lim := quotas.NewDefaultOutgoingRateLimiter(tm.config.ForwarderMaxRatePerSecond) + + if tm.fwdr == nil { + // Root doesn't forward. But it does need something to validate tasks. + go tm.validateTasksOnRoot(lim, retrier) + return + } + + // Child partitions: + for range tm.config.ForwarderMaxOutstandingTasks() { + go tm.forwardTasks(lim, retrier) + } + for range tm.config.ForwarderMaxOutstandingPolls() { + go tm.forwardPolls() } } func (tm *TaskMatcher) Stop() { - close(tm.closeC) + tm.matcherCtxCancel() +} + +func (tm *TaskMatcher) forwardTasks(lim quotas.RateLimiter, retrier backoff.Retrier) { + ctxs := []context.Context{tm.matcherCtx} + poller := waitingPoller{isTaskForwarder: true} + for { + if lim.Wait(tm.matcherCtx) != nil { + return + } + + res := tm.data.EnqueuePollerAndWait(ctxs, &poller) + if res.ctxErr != nil { + return // task queue closing + } + bugIf(res.task == nil, "bug: bad match result in forwardTasks") + + err := tm.forwardTask(res.task) + + // backoff on resource exhausted errors + if common.IsResourceExhausted(err) { + util.InterruptibleSleep(tm.matcherCtx, retrier.NextBackOff(err)) + } else { + retrier.Reset() + } + } +} + +func (tm *TaskMatcher) forwardTask(task *internalTask) error { + var ctx context.Context + var cancel context.CancelFunc + if task.forwardCtx != nil { + // Use sync match context if we have it (for deadline, headers, etc.) + // TODO(pri): does it make sense to subtract 1s from the context deadline here? + ctx = task.forwardCtx + } else { + // Task is from local backlog. + + // Before we forward, ask task validator. This will happen every backlogTaskForwardTimeout + // to the head of the backlog, which is what taskValidator expects. 
+ maybeValid := tm.validator.maybeValidate(task.event.AllocatedTaskInfo, tm.fwdr.partition.TaskType()) + if !maybeValid { + task.finish(nil, false) + tm.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1) + return nil + } + + // Add a timeout for forwarding. + // Note that this does not block local match of other local backlog tasks. + ctx, cancel = context.WithTimeout(tm.matcherCtx, backlogTaskForwardTimeout) + defer cancel() + } + + if task.isQuery() { + res, err := tm.fwdr.ForwardQueryTask(ctx, task) + task.finishForward(res, err, true) + return err + } + + if task.isNexus() { + res, err := tm.fwdr.ForwardNexusTask(ctx, task) + task.finishForward(res, err, true) + return err + } + + // normal wf/activity task + err := tm.fwdr.ForwardTask(ctx, task) + + // If task was from our backlog, and forwarding timed out (as opposed to explicitly + // failed), then we should re-enqueue the task and let it match or forward again. + // Parent may explicitly return errRemoteSyncMatchFailed which is a Canceled error. + if task.forwardCtx == nil && (common.IsContextDeadlineExceededErr(err) || common.IsContextCanceledErr(err)) { + // Ask the task to redirect itself again. If not, add it back ourselves. + tm.data.RedirectOrEnqueue(task) + } else { + task.finishForward(nil, err, true) + } + return err +} + +func (tm *TaskMatcher) validateTasksOnRoot(lim quotas.RateLimiter, retrier backoff.Retrier) { + ctxs := []context.Context{tm.matcherCtx} + poller := &waitingPoller{isTaskForwarder: true, isTaskValidator: true} + for { + if lim.Wait(tm.matcherCtx) != nil { + return + } + + res := tm.data.EnqueuePollerAndWait(ctxs, poller) + if res.ctxErr != nil { + return // task queue closing + } + bugIf(res.task == nil, "bug: bad match result in validateTasksOnRoot") + + task := res.task + maybeValid := tm.validator.maybeValidate(task.event.AllocatedTaskInfo, tm.partition.TaskType()) + if !maybeValid { + // we found an invalid one, complete it and go back for another immediately + task.finish(nil, false) + tm.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1) + retrier.Reset() + } else { + // Task was valid, put it back and slow down checking. + tm.data.RedirectOrEnqueue(task) + util.InterruptibleSleep(tm.matcherCtx, retrier.NextBackOff(nil)) + } + } +} + +func (tm *TaskMatcher) forwardPolls() { + forwarderTask := &internalTask{isPollForwarder: true} + ctxs := []context.Context{tm.matcherCtx} + for { + res := tm.data.EnqueueTaskAndWait(ctxs, forwarderTask) + if res.ctxErr != nil { + return // task queue closing + } + bugIf(res.poller == nil, "bug: bad match result in forwardPolls") + + poller := res.poller + // We need to use the real source poller context since it has the poller id and + // identity, plus the right deadline. + task, err := tm.fwdr.ForwardPoll(poller.forwardCtx, poller.pollMetadata) + if err == nil { + tm.data.finishMatchAfterPollForward(poller, task) + } else { + // Re-enqueue to let it match again, if it hasn't gotten a context timeout already. 
+ poller.forwardCtx = nil // disable forwarding next time + tm.data.ReenqueuePollerIfNotMatched(poller) + } + } } // Offer offers a task to a potential consumer (poller) @@ -156,267 +331,120 @@ func (tm *TaskMatcher) Stop() { // - context deadline is exceeded // - task is matched and consumer returns error in response channel func (tm *TaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, error) { - if !tm.isBacklogNegligible() { - // To ensure better dispatch ordering, we block sync match when a significant backlog is present. - // Note that this check does not make a noticeable difference for history tasks, as they do not wait for a - // poller to become available. In presence of a backlog the chance of a poller being available when sync match - // request comes is almost zero. - // This check is mostly effective for the sync match requests that come from child partitions for spooled tasks. - return false, nil - } - - if !task.isForwarded() { - if err := tm.rateLimiter.Wait(ctx); err != nil { - metrics.SyncThrottlePerTaskQueueCounter.With(tm.metricsHandler).Record(1) - return false, err - } - // because we waited on the rate limiter to offer this task, - // attach the rate limiter's RecycleToken func to the task - // so that if the task is later determined to be invalid, - // we can recycle the token it used. - task.recycleToken = tm.rateLimiter.RecycleToken - } - - select { - case tm.taskC <- task: // poller picked up the task - if task.responseC != nil { - // if there is a response channel, block until resp is received - // and return error if the response contains error - err := <-task.responseC - - if err == nil && !task.isForwarded() { - tm.emitDispatchLatency(task, false) - } - return true, err - } - return false, nil - default: - // no poller waiting for tasks, try forwarding this task to the - // root partition if possible - select { - case token := <-tm.fwdrAddReqTokenC(): - if err := tm.fwdr.ForwardTask(ctx, task); err == nil { + finish := func() (bool, error) { + res, ok := task.getResponse() + bugIf(!ok, "Offer must be given a sync match task") + if res.forwarded { + if res.forwardErr == nil { // task was remotely sync matched on the parent partition - token.release() tm.emitDispatchLatency(task, true) return true, nil } - token.release() - default: - if !tm.isForwardingAllowed() && // we are the root partition and forwarding is not possible - task.source == enumsspb.TASK_SOURCE_DB_BACKLOG && // task was from backlog (stored in db) - task.isForwarded() { // task came from a child partition - // a forwarded backlog task from a child partition, block trying - // to match with a poller until ctx timeout - return tm.offerOrTimeout(ctx, task) - } + return false, nil // forward error, give up here + } + // TODO(pri): can we just always do this on the parent and simplify this to: + // if res.startErr == nil { tm.emitDispatchLatency(task, task.isForwarded) } + // and get rid of the call above so there's only one? + if res.startErr == nil && !task.isForwarded() { + tm.emitDispatchLatency(task, false) } + return true, res.startErr + } + // Fast path if we have a waiting poller (or forwarder). + // Forwarding happens here if we match with the task forwarding poller. 
+ task.forwardCtx = ctx + if canMatch, gotMatch := tm.data.MatchNextPoller(task); gotMatch { + return finish() + } else if !canMatch { return false, nil } -} -func (tm *TaskMatcher) offerOrTimeout(ctx context.Context, task *internalTask) (bool, error) { - select { - case tm.taskC <- task: // poller picked up the task - if task.responseC != nil { - select { - case err := <-task.responseC: - return true, err - case <-ctx.Done(): - return false, nil - } - } + // We only block if we are the root and the task is forwarded from a backlog. + // Otherwise, stop here. + if tm.isForwardingAllowed() || + task.source != enumsspb.TASK_SOURCE_DB_BACKLOG || + !task.isForwarded() { return false, nil - case <-ctx.Done(): + } + + res := tm.data.EnqueueTaskAndWait([]context.Context{ctx, tm.matcherCtx}, task) + + if res.ctxErr != nil { return false, nil } + bugIf(res.poller == nil, "bug: bad match result in Offer") + return finish() } -func syncOfferTask[T any]( +func (tm *TaskMatcher) syncOfferTask( ctx context.Context, - tm *TaskMatcher, task *internalTask, - taskChan chan *internalTask, - forwardFunc func(context.Context, *internalTask) (T, error), returnNoPollerErr bool, -) (T, error) { - var t T - select { - case taskChan <- task: - <-task.responseC - return t, nil - default: - } - - fwdrTokenC := tm.fwdrAddReqTokenC() - var noPollerC <-chan time.Time - - for { - if returnNoPollerErr { - returnNoPollerErr = false // only do this once - if deadline, ok := ctx.Deadline(); ok && fwdrTokenC == nil { - // Reserving 1sec to customize the timeout error if user is querying a workflow - // without having started the workers. - noPollerTimeout := time.Until(deadline) - returnEmptyTaskTimeBudget - t := time.NewTimer(noPollerTimeout) - noPollerC = t.C - defer t.Stop() - } +) (any, error) { + ctxs := []context.Context{ctx, tm.matcherCtx} + + if returnNoPollerErr { + if deadline, ok := ctx.Deadline(); ok && tm.fwdr == nil { + // Reserving 1sec to customize the timeout error if user is querying a workflow + // without having started the workers. + noPollerDeadline := deadline.Add(-returnEmptyTaskTimeBudget) + noPollerCtx, cancel := context.WithDeadline(context.Background(), noPollerDeadline) + defer cancel() + ctxs = append(ctxs, noPollerCtx) } + } - select { - case taskChan <- task: - <-task.responseC - return t, nil - case token := <-fwdrTokenC: - resp, err := forwardFunc(ctx, task) - token.release() - if err == nil { - return resp, nil - } - if errors.Is(err, errForwarderSlowDown) { - // if we are rate limited, try only local match for the remainder of the context timeout - // left - fwdrTokenC = nil - continue + task.forwardCtx = ctx +again: + res := tm.data.EnqueueTaskAndWait(ctxs, task) + + if res.ctxErr != nil { + if res.ctxErrIdx == 2 { + // Index 2 is the noPollerCtx. Only error if there has not been a recent poller. + // Otherwise, let it wait for the remaining time hopping for a match, or ultimately + // returning the default CDE error. + if tm.data.TimeSinceLastPoll() > tm.config.QueryPollerUnavailableWindow() { + return nil, errNoRecentPoller } - return t, err - case <-noPollerC: - // only error if there has not been a recent poller. Otherwise, let it wait for the remaining time - // hopping for a match, or ultimately returning the default CDE error. 
- if tm.timeSinceLastPoll() > tm.config.QueryPollerUnavailableWindow() { - return t, errNoRecentPoller - } - continue - case <-ctx.Done(): - return t, ctx.Err() + ctxs = ctxs[:2] // remove noPollerCtx otherwise we'll fail immediately again + goto again } + return nil, res.ctxErr } + bugIf(res.poller == nil, "bug: bad match result in syncOfferTask") + response, ok := task.getResponse() + bugIf(!ok, "OfferQuery/OfferNexusTask must be given a sync match task") + // Note: if task was not forwarded, this will just be the zero value and nil. + // That's intended: the query/nexus handler in matchingEngine will wait for the real + // result separately. + return response.forwardRes, response.forwardErr } // OfferQuery will either match task to local poller or will forward query task. // Local match is always attempted before forwarding is attempted. If local match occurs // response and error are both nil, if forwarding occurs then response or error is returned. func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *internalTask) (*matchingservice.QueryWorkflowResponse, error) { - return syncOfferTask(ctx, tm, task, tm.queryTaskC, tm.fwdr.ForwardQueryTask, true) + res, err := tm.syncOfferTask(ctx, task, true) + if res != nil { // note res may be non-nil "any" containing nil pointer + return res.(*matchingservice.QueryWorkflowResponse), err + } + return nil, err } // OfferNexusTask either matchs a task to a local poller or forwards it if no local pollers available. // Local match is always attempted before forwarding. If local match occurs response and error are both nil, if // forwarding occurs then response or error is returned. func (tm *TaskMatcher) OfferNexusTask(ctx context.Context, task *internalTask) (*matchingservice.DispatchNexusTaskResponse, error) { - return syncOfferTask(ctx, tm, task, tm.taskC, tm.fwdr.ForwardNexusTask, false) -} - -// MustOffer blocks until a consumer is found to handle this task -// Returns error only when context is canceled or the ratelimit is set to zero (allow nothing) -// The passed in context MUST NOT have a deadline associated with it -// Note that calling MustOffer is the only way that matcher knows there are spooled tasks in the -// backlog, in absence of a pending MustOffer call, the forwarding logic assumes that backlog is empty. -func (tm *TaskMatcher) MustOffer(ctx context.Context, task *internalTask, interruptCh <-chan struct{}) error { - tm.registerBacklogTask(task) - defer tm.unregisterBacklogTask(task) - - if err := tm.rateLimiter.Wait(ctx); err != nil { - return err - } - - // because we waited on the rate limiter to offer this task, - // attach the rate limiter's RecycleToken func to the task - // so that if the task is later determined to be invalid, - // we can recycle the token it used. - task.recycleToken = tm.rateLimiter.RecycleToken - - // attempt a match with local poller first. 
When that - // doesn't succeed, try both local match and remote match - select { - case tm.taskC <- task: - tm.emitDispatchLatency(task, false) - return nil - case <-ctx.Done(): - return ctx.Err() - default: + res, err := tm.syncOfferTask(ctx, task, true) + if res != nil { // note res may be non-nil "any" containing nil pointer + return res.(*matchingservice.DispatchNexusTaskResponse), err } + return nil, err +} - var reconsiderFwdTimer *time.Timer - defer func() { - if reconsiderFwdTimer != nil { - reconsiderFwdTimer.Stop() - } - }() - -forLoop: - for { - fwdTokenC := tm.fwdrAddReqTokenC() - reconsiderFwdTimer = nil - var reconsiderFwdTimerC <-chan time.Time - if fwdTokenC != nil && !tm.isBacklogNegligible() { - // If there is a non-negligible backlog, we stop forwarding to make sure - // root and leaf partitions are treated equally and can process their - // backlog at the same rate. Stopping task forwarding, prevent poll - // forwarding as well (in presence of a backlog). This ensures all partitions - // receive polls and tasks at the same rate. - - // Exception: we allow forward if this partition has not got any polls - // recently. This is helpful when there are very few pollers and they - // and they are all stuck in the wrong (root) partition. (Note that since - // frontend balanced the number of pending pollers per partition this only - // becomes an issue when the pollers are fewer than the partitions) - lp := tm.timeSinceLastPoll() - maxWaitForLocalPoller := tm.config.MaxWaitForPollerBeforeFwd() - if lp < maxWaitForLocalPoller { - fwdTokenC = nil - reconsiderFwdTimer = time.NewTimer(maxWaitForLocalPoller - lp) - reconsiderFwdTimerC = reconsiderFwdTimer.C - } - } - - select { - case tm.taskC <- task: - tm.emitDispatchLatency(task, false) - return nil - case token := <-fwdTokenC: - childCtx, cancel := context.WithTimeout(ctx, time.Second*2) - err := tm.fwdr.ForwardTask(childCtx, task) - token.release() - if err != nil { - metrics.ForwardTaskErrorsPerTaskQueue.With(tm.metricsHandler).Record(1) - // forwarder returns error only when the call is rate limited. To - // avoid a busy loop on such rate limiting events, we only attempt to make - // the next forwarded call after this childCtx expires. Till then, we block - // hoping for a local poller match - select { - case tm.taskC <- task: - cancel() - tm.emitDispatchLatency(task, false) - return nil - case <-childCtx.Done(): - case <-ctx.Done(): - cancel() - return ctx.Err() - case <-interruptCh: - cancel() - return errInterrupted - } - cancel() - continue forLoop - } - cancel() - // at this point, we forwarded the task to a parent partition which - // in turn dispatched the task to a poller, because there was no error. - // Make sure we delete the task from the database. 
- task.finish(nil, true) - tm.emitDispatchLatency(task, true) - return nil - case <-ctx.Done(): - return ctx.Err() - case <-reconsiderFwdTimerC: - continue forLoop - case <-interruptCh: - return errInterrupted - } - } +func (tm *TaskMatcher) AddTask(task *internalTask) { + tm.data.EnqueueTaskNoWait(task) } func (tm *TaskMatcher) emitDispatchLatency(task *internalTask, forwarded bool) { @@ -435,15 +463,17 @@ func (tm *TaskMatcher) emitDispatchLatency(task *internalTask, forwarded bool) { // On success, the returned task could be a query task or a regular task // Returns errNoTasks when context deadline is exceeded func (tm *TaskMatcher) Poll(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error) { - task, _, err := tm.poll(ctx, pollMetadata, false) - return task, err + return tm.poll(ctx, pollMetadata, false) } // PollForQuery blocks until a *query* task is found or context deadline is exceeded // Returns errNoTasks when context deadline is exceeded func (tm *TaskMatcher) PollForQuery(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error) { - task, _, err := tm.poll(ctx, pollMetadata, true) - return task, err + return tm.poll(ctx, pollMetadata, true) +} + +func (tm *TaskMatcher) RecheckAllRedirects() { + tm.data.RecheckAllRedirects() } // UpdateRatelimit updates the task dispatch rate @@ -477,190 +507,62 @@ func (tm *TaskMatcher) UpdateRatelimit(rpsPtr *float64) { // Rate returns the current rate at which tasks are dispatched func (tm *TaskMatcher) Rate() float64 { - return tm.rateLimiter.Rate() + return tm.data.rateLimiter.Rate() } func (tm *TaskMatcher) poll( ctx context.Context, pollMetadata *pollMetadata, queryOnly bool, -) (task *internalTask, forwardedPoll bool, err error) { - taskC, queryTaskC := tm.taskC, tm.queryTaskC - if queryOnly { - taskC = nil - } - +) (*internalTask, error) { start := time.Now() - tm.lastPoller.Store(start.UnixNano()) + pollWasForwarded := false defer func() { + // TODO(pri): can we consolidate all the metrics code below? if pollMetadata.forwardedFrom == "" { // Only recording for original polls metrics.PollLatencyPerTaskQueue.With(tm.metricsHandler).Record( - time.Since(start), metrics.StringTag("forwarded", strconv.FormatBool(forwardedPoll))) - } - - if err == nil { - tm.emitForwardedSourceStats(task.isForwarded(), pollMetadata.forwardedFrom, forwardedPoll) + time.Since(start), metrics.StringTag("forwarded", strconv.FormatBool(pollWasForwarded))) } }() - // We want to effectively do a prioritized select, but Go select is random - // if multiple cases are ready, so split into multiple selects. - // The priority order is: - // 1. ctx.Done or tm.closeC - // 2. taskC and queryTaskC - // 3. forwarding - // 4. block looking locally for remainder of context lifetime - // To correctly handle priorities and allow any case to succeed, all select - // statements except for the last one must be non-blocking, and the last one - // must include all the previous cases. - - // 1. ctx.Done - select { - case <-ctx.Done(): - metrics.PollTimeoutPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - return nil, false, errNoTasks - case <-tm.closeC: - return nil, false, errNoTasks - default: - } - - // 2. 
taskC and queryTaskC - select { - case task := <-taskC: - if task.responseC != nil { - metrics.PollSuccessWithSyncPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - } - metrics.PollSuccessPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - return task, false, nil - case task := <-queryTaskC: - metrics.PollSuccessWithSyncPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - metrics.PollSuccessPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - return task, false, nil - default: + ctxs := []context.Context{ctx, tm.matcherCtx} + poller := &waitingPoller{ + startTime: start, + queryOnly: queryOnly, + forwardCtx: ctx, + pollMetadata: pollMetadata, } + res := tm.data.EnqueuePollerAndWait(ctxs, poller) - if tm.isBacklogNegligible() { - // 3. forwarding (and all other clauses repeated) - // We don't forward pollers if there is a non-negligible backlog in this partition. - select { - case <-ctx.Done(): + if res.ctxErr != nil { + if res.ctxErrIdx == 0 { metrics.PollTimeoutPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - return nil, false, errNoTasks - case <-tm.closeC: - return nil, false, errNoTasks - case task := <-taskC: - if task.responseC != nil { - metrics.PollSuccessWithSyncPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - } - metrics.PollSuccessPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - return task, false, nil - case task := <-queryTaskC: - metrics.PollSuccessWithSyncPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - metrics.PollSuccessPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - return task, false, nil - case token := <-tm.fwdrPollReqTokenC(): - // Arrange to cancel this request if closeC is closed - fwdCtx, cancel := contextWithCancelOnChannelClose(ctx, tm.closeC) - task, err := tm.fwdr.ForwardPoll(fwdCtx, pollMetadata) - cancel() - token.release() - if err == nil { - return task, true, nil - } } + return nil, errNoTasks } + bugIf(res.task == nil, "bug: bad match result in poll") - // 4. blocking local poll - select { - case <-ctx.Done(): - metrics.PollTimeoutPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - return nil, false, errNoTasks - case <-tm.closeC: - return nil, false, errNoTasks - case task := <-taskC: - if task.responseC != nil { + task := res.task + pollWasForwarded = task.isStarted() + + if !task.isQuery() { + if task.isSyncMatchTask() { metrics.PollSuccessWithSyncPerTaskQueueCounter.With(tm.metricsHandler).Record(1) } metrics.PollSuccessPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - return task, false, nil - case task := <-queryTaskC: + } else { metrics.PollSuccessWithSyncPerTaskQueueCounter.With(tm.metricsHandler).Record(1) metrics.PollSuccessPerTaskQueueCounter.With(tm.metricsHandler).Record(1) - return task, false, nil } -} + tm.emitForwardedSourceStats(task.isForwarded(), pollMetadata.forwardedFrom, pollWasForwarded) -func (tm *TaskMatcher) fwdrPollReqTokenC() <-chan *ForwarderReqToken { - if tm.fwdr == nil { - return nil - } - return tm.fwdr.PollReqTokenC() -} - -func (tm *TaskMatcher) fwdrAddReqTokenC() <-chan *ForwarderReqToken { - if tm.fwdr == nil { - return nil - } - return tm.fwdr.AddReqTokenC() + return task, nil } func (tm *TaskMatcher) isForwardingAllowed() bool { return tm.fwdr != nil } -// isBacklogNegligible returns true of the age of backlog is less than the threshold. Note that this relies on -// MustOffer being called when there is a backlog, otherwise we'd not know. 
-func (tm *TaskMatcher) isBacklogNegligible() bool { - return tm.getBacklogAge() < tm.config.BacklogNegligibleAge() -} - -func (tm *TaskMatcher) registerBacklogTask(task *internalTask) { - if task.event.Data.CreateTime == nil { - return // should not happen but for safety - } - - tm.backlogTasksLock.Lock() - defer tm.backlogTasksLock.Unlock() - - ts := timestamp.TimeValue(task.event.Data.CreateTime).UnixNano() - tm.backlogTasksCreateTime[ts] += 1 -} - -func (tm *TaskMatcher) unregisterBacklogTask(task *internalTask) { - if task.event.Data.CreateTime == nil { - return // should not happen but for safety - } - - tm.backlogTasksLock.Lock() - defer tm.backlogTasksLock.Unlock() - - ts := timestamp.TimeValue(task.event.Data.CreateTime).UnixNano() - counter := tm.backlogTasksCreateTime[ts] - if counter == 1 { - delete(tm.backlogTasksCreateTime, ts) - } else { - tm.backlogTasksCreateTime[ts] = counter - 1 - } -} - -// getBacklogAge is the latest age across all backlogs re-directing to this matcher; may momentarily -// be 0 cause of race conditions when no reader pushes a task into the matcher at this moment -func (tm *TaskMatcher) getBacklogAge() time.Duration { - tm.backlogTasksLock.Lock() - defer tm.backlogTasksLock.Unlock() - - if len(tm.backlogTasksCreateTime) == 0 { - return emptyBacklogAge - } - - oldest := int64(math.MaxInt64) - for createTime := range tm.backlogTasksCreateTime { - oldest = min(oldest, createTime) - } - - return time.Since(time.Unix(0, oldest)) -} - func (tm *TaskMatcher) emitForwardedSourceStats( isTaskForwarded bool, pollForwardedSource string, @@ -684,22 +586,3 @@ func (tm *TaskMatcher) emitForwardedSourceStats( metrics.LocalToLocalMatchPerTaskQueueCounter.With(tm.metricsHandler).Record(1) } } - -func (tm *TaskMatcher) timeSinceLastPoll() time.Duration { - return time.Since(time.Unix(0, tm.lastPoller.Load())) -} - -// contextWithCancelOnChannelClose returns a child Context and CancelFunc just like -// context.WithCancel, but additionally propagates cancellation from another channel (besides -// the parent's cancellation channel). -func contextWithCancelOnChannelClose(parent context.Context, closeC <-chan struct{}) (context.Context, context.CancelFunc) { - ctx, cancel := context.WithCancel(parent) - go func() { - select { - case <-closeC: - cancel() - case <-ctx.Done(): - } - }() - return ctx, cancel -} diff --git a/service/matching/matcher_data.go b/service/matching/matcher_data.go new file mode 100644 index 00000000000..cc7adee9610 --- /dev/null +++ b/service/matching/matcher_data.go @@ -0,0 +1,516 @@ +// The MIT License +// +// Copyright (c) 2025 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "container/heap" + "context" + "slices" + "sync" + "time" + + enumspb "go.temporal.io/server/api/enums/v1" + "go.temporal.io/server/common/quotas" + "go.temporal.io/server/common/util" +) + +type matcherData struct { + config *taskQueueConfig + rateLimiter quotas.RateLimiter // whole-queue rate limiter + + lock sync.Mutex // covers everything below, and all fields in any waitableMatchResult + + rateLimitTimer resettableTimer + reconsiderForwardTimer resettableTimer + + // waiting pollers and tasks + // invariant: all pollers and tasks in these data structures have matchResult == nil + pollers pollerPQ + tasks taskPQ + + // backlogAge holds task create time for tasks from merged local backlogs (not forwarded). + // note that matcherData may get tasks from multiple versioned backlogs due to + // versioning redirection. + backlogAge backlogAgeTracker + lastPoller time.Time // most recent poll start time +} + +type pollerPQ struct { + heap []*waitingPoller +} + +func (p *pollerPQ) Len() int { + return len(p.heap) +} + +func (p *pollerPQ) Less(i int, j int) bool { + a, b := p.heap[i], p.heap[j] + if !(a.isTaskForwarder || a.isTaskValidator) && (b.isTaskForwarder || b.isTaskValidator) { + return true + } else if a.startTime.Before(b.startTime) { + return true + } + return false +} + +func (p *pollerPQ) Swap(i int, j int) { + p.heap[i], p.heap[j] = p.heap[j], p.heap[i] + p.heap[i].matchHeapIndex = i + p.heap[j].matchHeapIndex = j +} + +func (p *pollerPQ) Push(x any) { + poller := x.(*waitingPoller) + poller.matchHeapIndex = len(p.heap) + p.heap = append(p.heap, poller) +} + +func (p *pollerPQ) Pop() any { + last := len(p.heap) - 1 + poller := p.heap[last] + p.heap = p.heap[:last] + poller.matchHeapIndex = -11 + return poller +} + +type taskPQ struct { + heap []*internalTask +} + +func (t *taskPQ) Len() int { + return len(t.heap) +} + +func (t *taskPQ) Less(i int, j int) bool { + a, b := t.heap[i], t.heap[j] + if !a.isPollForwarder && b.isPollForwarder { + return true + } + // TODO(pri): use priority, task id, etc. 
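+	// A rough sketch of what that comparison might look like (priority is a
+	// hypothetical field here, not part of this change):
+	//
+	//	if a.priority != b.priority {
+	//		return a.priority < b.priority
+	//	}
+	//	// then fall back to task id for FIFO ordering within a priority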
+ return false +} + +func (t *taskPQ) Swap(i int, j int) { + t.heap[i], t.heap[j] = t.heap[j], t.heap[i] + t.heap[i].matchHeapIndex = i + t.heap[j].matchHeapIndex = j +} + +func (t *taskPQ) Push(x any) { + task := x.(*internalTask) + task.matchHeapIndex = len(t.heap) + t.heap = append(t.heap, task) +} + +func (t *taskPQ) Pop() any { + last := len(t.heap) - 1 + task := t.heap[last] + t.heap = t.heap[:last] + task.matchHeapIndex = -13 + return task +} + +func (d *matcherData) EnqueueTaskNoWait(task *internalTask) { + d.lock.Lock() + defer d.lock.Unlock() + + if task.source == enumspb.TASK_SOURCE_DB_BACKLOG { + d.backlogAge.record(task.event.Data.CreateTime, 1) + } + + task.initMatch(d) + heap.Push(&d.tasks, task) + d.match() +} + +func (d *matcherData) EnqueueTaskAndWait(ctxs []context.Context, task *internalTask) *matchResult { + d.lock.Lock() + defer d.lock.Unlock() + + // add and look for match + task.initMatch(d) + heap.Push(&d.tasks, task) + d.match() + + // if already matched, return + if task.matchResult != nil { + return task.matchResult + } + + // arrange to wake up on context close + for i, ctx := range ctxs { + stop := context.AfterFunc(ctx, func() { + d.lock.Lock() + defer d.lock.Unlock() + + if task.matchResult == nil { + heap.Remove(&d.tasks, task.matchHeapIndex) + task.wake(&matchResult{ctxErr: ctx.Err(), ctxErrIdx: i}) + } + }) + defer stop() + } + + return task.waitForMatch() +} + +func (d *matcherData) ReenqueuePollerIfNotMatched(poller *waitingPoller) { + d.lock.Lock() + defer d.lock.Unlock() + + if poller.matchResult == nil { + heap.Push(&d.pollers, poller) + d.match() + } +} + +func (d *matcherData) EnqueuePollerAndWait(ctxs []context.Context, poller *waitingPoller) *matchResult { + d.lock.Lock() + defer d.lock.Unlock() + + // update this for timeSinceLastPoll + d.lastPoller = util.MaxTime(d.lastPoller, poller.startTime) + + // add and look for match + poller.initMatch(d) + heap.Push(&d.pollers, poller) + d.match() + + // if already matched, return + if poller.matchResult != nil { + return poller.matchResult + } + + // arrange to wake up on context close + for i, ctx := range ctxs { + stop := context.AfterFunc(ctx, func() { + d.lock.Lock() + defer d.lock.Unlock() + + if poller.matchResult == nil { + // if poll was being forwarded, it would be absent from heap even though + // matchResult == nil + if poller.matchHeapIndex >= 0 { + heap.Remove(&d.pollers, poller.matchHeapIndex) + } + poller.wake(&matchResult{ctxErr: ctx.Err(), ctxErrIdx: i}) + } + }) + defer stop() + } + + return poller.waitForMatch() +} + +func (d *matcherData) MatchNextPoller(task *internalTask) (canSyncMatch, gotSyncMatch bool) { + d.lock.Lock() + defer d.lock.Unlock() + + if !d.isBacklogNegligible() { + // To ensure better dispatch ordering, we block sync match when a significant backlog is present. + // Note that this check does not make a noticeable difference for history tasks, as they do not wait for a + // poller to become available. In presence of a backlog the chance of a poller being available when sync match + // request comes is almost zero. + // This check is mostly effective for the sync match requests that come from child partitions for spooled tasks. 
+		return false, false
+	}
+
+	task.initMatch(d)
+	heap.Push(&d.tasks, task)
+	d.match()
+	// don't wait, check if match() picked this one already
+	if task.matchResult != nil {
+		return true, true
+	}
+	heap.Remove(&d.tasks, task.matchHeapIndex)
+	return true, false
+}
+
+func (d *matcherData) RecheckAllRedirects() {
+	d.lock.Lock()
+
+	// TODO(pri): do we have to do _all_ backlog tasks or can we determine somehow which are
+	// potentially redirected?
+	redirectable := make([]*internalTask, 0, len(d.tasks.heap))
+	d.tasks.heap = slices.DeleteFunc(d.tasks.heap, func(task *internalTask) bool {
+		if task.forwardInfo.GetTaskSource() == enumspb.TASK_SOURCE_DB_BACKLOG {
+			// forwarded from a backlog, kick this back to the child so they can redirect it,
+			// by faking a context cancel
+			task.matchHeapIndex = -13
+			task.wake(&matchResult{ctxErr: context.Canceled})
+			return true
+		} else if task.checkRedirect != nil {
+			redirectable = append(redirectable, task)
+			if task.source == enumspb.TASK_SOURCE_DB_BACKLOG {
+				d.backlogAge.record(task.event.Data.CreateTime, -1)
+			}
+			task.matchHeapIndex = -13
+			return true
+		}
+		return false
+	})
+
+	// fix indexes and re-establish heap
+	for i, task := range d.tasks.heap {
+		task.matchHeapIndex = i
+	}
+	heap.Init(&d.tasks)
+
+	d.lock.Unlock()
+
+	// re-redirect them all (or put back if redirect fails)
+	for _, task := range redirectable {
+		d.RedirectOrEnqueue(task)
+	}
+}
+
+func (d *matcherData) RedirectOrEnqueue(task *internalTask) {
+	if task.checkRedirect == nil || !task.checkRedirect() {
+		d.EnqueueTaskNoWait(task)
+	}
+}
+
+// findMatch should return the highest priority task+poller match even if the per-task rate
+// limit doesn't allow the task to be matched yet.
+// call with lock held
+func (d *matcherData) findMatch(allowForwarding bool) (*internalTask, *waitingPoller) {
+	// FIXME: optimize so it's not O(d*n) worst case
+	// FIXME: this isn't actually correct
+	for _, task := range d.tasks.heap {
+		if !allowForwarding && task.isPollForwarder {
+			continue
+		}
+
+		for _, poller := range d.pollers.heap {
+			// can't match cases:
+			if poller.queryOnly && !(task.isQuery() || task.isPollForwarder) {
+				continue
+			} else if task.isPollForwarder && poller.forwardCtx == nil {
+				continue
+			} else if poller.isTaskForwarder && !allowForwarding {
+				continue
+			} else if poller.isTaskValidator && task.forwardCtx != nil {
+				continue
+			}
+
+			return task, poller
+		}
+	}
+	return nil, nil
+}
+
+// call with lock held
+func (d *matcherData) allowForwarding() (allowForwarding bool) {
+	// If there is a non-negligible backlog, we pause forwarding to make sure
+	// root and leaf partitions are treated equally and can process their
+	// backlog at the same rate. Stopping task forwarding prevents poll
+	// forwarding as well (in the presence of a backlog). This ensures all partitions
+	// receive polls and tasks at the same rate.
+	//
+	// Exception: we allow forwarding if this partition has not received any polls
+	// recently. This is helpful when there are very few pollers and they
+	// are all stuck in the wrong (root) partition. (Note that since the
+	// frontend balances the number of pending pollers per partition, this only
+	// becomes an issue when there are fewer pollers than partitions.)
+	//
+	// If allowForwarding was false and changes to true due solely to the passage
+	// of time, then we should ensure that match() is called again so that
+	// pending tasks/polls can now be forwarded. When does that happen?
if + // isBacklogNegligible changes from false to true, or if we no longer have + // recent polls. + // + // With time, backlog age gets larger, so isBacklogNegligible can go from + // true to false and not the other way, so that's safe. But it is possible + // that we no longer have recent polls. So we need to ensure that match() is + // called again in that case, using reconsiderForwardTimer. + defer func() { + if allowForwarding { + d.reconsiderForwardTimer.unset() + } + }() + if d.isBacklogNegligible() { + return true + } + delayToForwardingAllowed := d.config.MaxWaitForPollerBeforeFwd() - time.Since(d.lastPoller) + d.reconsiderForwardTimer.set(d.rematch, delayToForwardingAllowed) + return delayToForwardingAllowed <= 0 +} + +// call with lock held +func (d *matcherData) match() { + allowForwarding := d.allowForwarding() + + for { + // search for highest priority match + task, poller := d.findMatch(allowForwarding) + if task == nil || poller == nil { + // no more current matches, stop timers if they were running + d.rateLimitTimer.unset() + d.reconsiderForwardTimer.unset() + return + } + + // got task, check rate limit + if !d.rateLimitTask(task) { + return // not ready yet, timer will call match later + } + + // ready to signal match + heap.Remove(&d.tasks, task.matchHeapIndex) + heap.Remove(&d.pollers, poller.matchHeapIndex) + + res := &matchResult{task: task, poller: poller} + + task.wake(res) + // for poll forwarder: skip waking poller, forwarder will call finishMatchAfterPollForward + if !task.isPollForwarder { + poller.wake(res) + } + // TODO(pri): consider having task forwarding work the same way, with a half-match, + // instead of full match and then pass forward result on response channel? + // TODO(pri): maybe consider leaving tasks and polls in the heap while forwarding and + // allow them to be matched locally while forwarded (and then cancel the forward)? + + if task.source == enumspb.TASK_SOURCE_DB_BACKLOG { + d.backlogAge.record(task.event.Data.CreateTime, -1) + } + } +} + +// call with lock held +// returns true if task can go now +func (d *matcherData) rateLimitTask(task *internalTask) bool { + if task.recycleToken != nil { + // we use task.recycleToken as a signal that we've already got a token for this task, + // so the next time we see it we'll skip the wait. + return true + } else if task.isForwarded() { + // don't count rate limit for forwarded tasks, it was counted on the child + return true + } + + delay := d.rateLimiter.Reserve().Delay() + task.recycleToken = d.rateLimiter.RecycleToken + + // TODO: account for per-priority/fairness key rate limits also, e.g. + // delay = max(delay, perTaskDelay) + + d.rateLimitTimer.set(d.rematch, delay) + return delay <= 0 +} + +// called from timer +func (d *matcherData) rematch() { + d.lock.Lock() + defer d.lock.Unlock() + d.match() +} + +func (d *matcherData) finishMatchAfterPollForward(poller *waitingPoller, task *internalTask) { + d.lock.Lock() + defer d.lock.Unlock() + + if poller.matchResult == nil { + poller.wake(&matchResult{task: task, poller: poller}) + } +} + +// isBacklogNegligible returns true if the age of the task backlog is less than the threshold. +// call with lock held. 
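+// For example, if BacklogNegligibleAge() returned 5s, a backlog whose oldest tracked
+// task was created 2s ago would still count as negligible, so forwarding and sync match
+// stay enabled.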
+func (d *matcherData) isBacklogNegligible() bool { + return d.backlogAge.getAge() < d.config.BacklogNegligibleAge() +} + +func (d *matcherData) TimeSinceLastPoll() time.Duration { + d.lock.Lock() + defer d.lock.Unlock() + return time.Since(d.lastPoller) +} + +// waitable match result: + +type waitableMatchResult struct { + // these fields are under matcherData.lock even though they're embedded in other structs + matchCond sync.Cond + matchResult *matchResult + matchHeapIndex int // current heap index for easy removal +} + +func (w *waitableMatchResult) initMatch(d *matcherData) { + w.matchCond.L = &d.lock + w.matchResult = nil +} + +// call with matcherData.lock held. +// w.matchResult must be nil (can't call wake twice). +// w must not be in queues anymore. +func (w *waitableMatchResult) wake(res *matchResult) { + bugIf(w.matchResult != nil, "bug: wake called twice") + bugIf(w.matchHeapIndex >= 0, "bug: wake called but still in heap") + w.matchResult = res + w.matchCond.Signal() +} + +// call with matcherData.lock held +func (w *waitableMatchResult) waitForMatch() *matchResult { + for w.matchResult == nil { + w.matchCond.Wait() + } + return w.matchResult +} + +// resettable timer: + +type resettableTimer struct { + timer *time.Timer // AfterFunc timer + target time.Time // target time of timer +} + +// set sets rt to call f after delay. If it was already set to a future time, it's reset +// to only the sooner time. If it was already set to a sooner time, it's unchanged. +// set to <= 0 stops the timer. +func (rt *resettableTimer) set(f func(), delay time.Duration) { + if delay <= 0 { + rt.unset() + } else if rt.timer == nil { + rt.target = time.Now().Add(delay) + rt.timer = time.AfterFunc(delay, f) + } else if target := time.Now().Add(delay); target.Before(rt.target) { + rt.target = target + rt.timer.Reset(delay) + } +} + +// unset stops the timer. 
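+// Illustrative use of the set/unset pair, mirroring how matcherData drives rematching
+// (a sketch, not part of this change):
+//
+//	rt.set(d.rematch, delay) // schedule a rematch; the sooner of two pending deadlines wins
+//	rt.unset()               // drop the pending callback once it is no longer needed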
+func (rt *resettableTimer) unset() { + if rt.timer != nil { + rt.timer.Stop() + rt.timer = nil + } +} + +func bugIf(cond bool, msg string) { + if cond { + panic(msg) + } +} diff --git a/service/matching/matcher_test.go b/service/matching/matcher_test.go index 8641f9a18e2..5371aa692a2 100644 --- a/service/matching/matcher_test.go +++ b/service/matching/matcher_test.go @@ -96,12 +96,16 @@ func (t *MatcherTestSuite) SetupTest() { t.fwdr, err = newForwarder(&t.childConfig.forwarderConfig, t.queue, t.client) t.Assert().NoError(err) t.childMatcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopMetricsHandler) + t.childMatcher.Start() t.rootConfig = newTaskQueueConfig(prtn.TaskQueue(), cfg, "test-namespace") t.rootMatcher = newTaskMatcher(t.rootConfig, nil, metrics.NoopMetricsHandler) + t.rootMatcher.Start() } func (t *MatcherTestSuite) TearDownTest() { + t.childMatcher.Stop() + t.rootMatcher.Stop() t.controller.Finish() } diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index f54bf0e9bf2..31da5ee53dd 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -413,8 +413,27 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager( logger, throttledLogger, metricsHandler := e.loggerAndMetricsForPartition(nsName, partition, tqConfig) var newPM *taskQueuePartitionManagerImpl onFatalErr := func(cause unloadCause) { newPM.unloadFromEngine(cause) } - userDataManager := newUserDataManager(e.taskManager, e.matchingRawClient, onFatalErr, partition, tqConfig, logger, e.namespaceRegistry) - newPM, err = newTaskQueuePartitionManager(e, namespaceEntry, partition, tqConfig, logger, throttledLogger, metricsHandler, userDataManager) + onUserDataChanged := func() { newPM.userDataChanged() } + userDataManager := newUserDataManager( + e.taskManager, + e.matchingRawClient, + onFatalErr, + onUserDataChanged, + partition, + tqConfig, + logger, + e.namespaceRegistry, + ) + newPM, err = newTaskQueuePartitionManager( + e, + namespaceEntry, + partition, + tqConfig, + logger, + throttledLogger, + metricsHandler, + userDataManager, + ) if err != nil { return nil, false, err } diff --git a/service/matching/physical_task_queue_manager.go b/service/matching/physical_task_queue_manager.go index a628bfb6654..5630924ed2b 100644 --- a/service/matching/physical_task_queue_manager.go +++ b/service/matching/physical_task_queue_manager.go @@ -275,7 +275,7 @@ func newPhysicalTaskQueueManager( return nil, err } } - pqMgr.matcher = newTaskMatcher(config, fwdr, pqMgr.metricsHandler) + pqMgr.matcher = newTaskMatcher(config, queue.partition, fwdr, pqMgr.taskValidator, pqMgr.metricsHandler) for _, opt := range opts { opt(pqMgr) } @@ -292,6 +292,7 @@ func (c *physicalTaskQueueManagerImpl) Start() { } c.liveness.Start() c.backlogMgr.Start() + c.matcher.Start() c.logger.Info("Started physicalTaskQueueManager", tag.LifeCycleStarted, tag.Cause(c.config.loadCause.String())) c.metricsHandler.Counter(metrics.TaskQueueStartedCounter.Name()).Record(1) c.partitionMgr.engine.updatePhysicalTaskQueueGauge(c, 1) @@ -391,28 +392,16 @@ func (c *physicalTaskQueueManagerImpl) MarkAlive() { c.liveness.markAlive() } -// DispatchSpooledTask dispatches a task to a poller. When there are no pollers to pick -// up the task or if rate limit is exceeded, this method will return error. 
Task -// *will not* be persisted to db -func (c *physicalTaskQueueManagerImpl) DispatchSpooledTask( - ctx context.Context, - task *internalTask, - userDataChanged <-chan struct{}, -) error { - return c.matcher.MustOffer(ctx, task, userDataChanged) +func (c *physicalTaskQueueManagerImpl) AddSpooledTask(ctx context.Context, task *internalTask) error { + return c.partitionMgr.AddSpooledTask(ctx, task, c.queue) } -func (c *physicalTaskQueueManagerImpl) ProcessSpooledTask( - ctx context.Context, - task *internalTask, -) error { - if !c.taskValidator.maybeValidate(task.event.AllocatedTaskInfo, c.queue.TaskType()) { - task.finish(nil, false) - c.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1) - // Don't try to set read level here because it may have been advanced already. - return nil - } - return c.partitionMgr.ProcessSpooledTask(ctx, task, c.queue) +func (c *physicalTaskQueueManagerImpl) AddSpooledTaskToMatcher(task *internalTask) { + c.matcher.AddTask(task) +} + +func (c *physicalTaskQueueManagerImpl) UserDataChanged() { + c.matcher.RecheckAllRedirects() } // DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned, @@ -509,7 +498,7 @@ func (c *physicalTaskQueueManagerImpl) GetInternalTaskQueueStatus() *taskqueuesp ReadLevel: c.backlogMgr.taskAckManager.getReadLevel(), AckLevel: c.backlogMgr.taskAckManager.getAckLevel(), TaskIdBlock: &taskqueuepb.TaskIdBlock{StartId: c.backlogMgr.taskWriter.taskIDBlock.start, EndId: c.backlogMgr.taskWriter.taskIDBlock.end}, - ReadBufferLength: int64(len(c.backlogMgr.taskReader.taskBuffer)), + ReadBufferLength: c.backlogMgr.taskReader.loadedTasks.Load(), } } @@ -534,10 +523,7 @@ func (c *physicalTaskQueueManagerImpl) TrySyncMatch(ctx context.Context, task *i return false, nil } } - childCtx, cancel := newChildContext(ctx, c.config.SyncMatchWaitDuration(), time.Second) - defer cancel() - - return c.matcher.Offer(childCtx, task) + return c.matcher.Offer(ctx, task) } func (c *physicalTaskQueueManagerImpl) ensureRegisteredInDeployment( diff --git a/service/matching/physical_task_queue_manager_interface.go b/service/matching/physical_task_queue_manager_interface.go index ac914525f66..743a719e2a4 100644 --- a/service/matching/physical_task_queue_manager_interface.go +++ b/service/matching/physical_task_queue_manager_interface.go @@ -51,10 +51,9 @@ type ( TrySyncMatch(ctx context.Context, task *internalTask) (bool, error) // SpoolTask spools a task to persistence to be matched asynchronously when a poller is available. SpoolTask(taskInfo *persistencespb.TaskInfo) error - ProcessSpooledTask(ctx context.Context, task *internalTask) error - // DispatchSpooledTask dispatches a task to a poller. When there are no pollers to pick - // up the task, this method will return error. Task will not be persisted to db - DispatchSpooledTask(ctx context.Context, task *internalTask, userDataChanged <-chan struct{}) error + AddSpooledTask(ctx context.Context, task *internalTask) error + AddSpooledTaskToMatcher(task *internalTask) + UserDataChanged() // DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned, // if dispatched to local poller then nil and nil is returned. 
DispatchQueryTask(ctx context.Context, taskId string, request *matchingservice.QueryWorkflowRequest) (*matchingservice.QueryWorkflowResponse, error) diff --git a/service/matching/physical_task_queue_manager_mock.go b/service/matching/physical_task_queue_manager_mock.go index 3721bbb9835..1e97488da87 100644 --- a/service/matching/physical_task_queue_manager_mock.go +++ b/service/matching/physical_task_queue_manager_mock.go @@ -68,6 +68,32 @@ func (m *MockphysicalTaskQueueManager) EXPECT() *MockphysicalTaskQueueManagerMoc return m.recorder } +// AddSpooledTask mocks base method. +func (m *MockphysicalTaskQueueManager) AddSpooledTask(ctx context.Context, task *internalTask) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddSpooledTask", ctx, task) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddSpooledTask indicates an expected call of AddSpooledTask. +func (mr *MockphysicalTaskQueueManagerMockRecorder) AddSpooledTask(ctx, task any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSpooledTask", reflect.TypeOf((*MockphysicalTaskQueueManager)(nil).AddSpooledTask), ctx, task) +} + +// AddSpooledTaskToMatcher mocks base method. +func (m *MockphysicalTaskQueueManager) AddSpooledTaskToMatcher(task *internalTask) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddSpooledTaskToMatcher", task) +} + +// AddSpooledTaskToMatcher indicates an expected call of AddSpooledTaskToMatcher. +func (mr *MockphysicalTaskQueueManagerMockRecorder) AddSpooledTaskToMatcher(task any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSpooledTaskToMatcher", reflect.TypeOf((*MockphysicalTaskQueueManager)(nil).AddSpooledTaskToMatcher), task) +} + // DispatchNexusTask mocks base method. func (m *MockphysicalTaskQueueManager) DispatchNexusTask(ctx context.Context, taskId string, request *matchingservice.DispatchNexusTaskRequest) (*matchingservice.DispatchNexusTaskResponse, error) { m.ctrl.T.Helper() @@ -98,20 +124,6 @@ func (mr *MockphysicalTaskQueueManagerMockRecorder) DispatchQueryTask(ctx, taskI return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DispatchQueryTask", reflect.TypeOf((*MockphysicalTaskQueueManager)(nil).DispatchQueryTask), ctx, taskId, request) } -// DispatchSpooledTask mocks base method. -func (m *MockphysicalTaskQueueManager) DispatchSpooledTask(ctx context.Context, task *internalTask, userDataChanged <-chan struct{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DispatchSpooledTask", ctx, task, userDataChanged) - ret0, _ := ret[0].(error) - return ret0 -} - -// DispatchSpooledTask indicates an expected call of DispatchSpooledTask. -func (mr *MockphysicalTaskQueueManagerMockRecorder) DispatchSpooledTask(ctx, task, userDataChanged any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DispatchSpooledTask", reflect.TypeOf((*MockphysicalTaskQueueManager)(nil).DispatchSpooledTask), ctx, task, userDataChanged) -} - // GetAllPollerInfo mocks base method. func (m *MockphysicalTaskQueueManager) GetAllPollerInfo() []*taskqueue.PollerInfo { m.ctrl.T.Helper() @@ -209,20 +221,6 @@ func (mr *MockphysicalTaskQueueManagerMockRecorder) PollTask(ctx, pollMetadata a return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PollTask", reflect.TypeOf((*MockphysicalTaskQueueManager)(nil).PollTask), ctx, pollMetadata) } -// ProcessSpooledTask mocks base method. 
-func (m *MockphysicalTaskQueueManager) ProcessSpooledTask(ctx context.Context, task *internalTask) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessSpooledTask", ctx, task) - ret0, _ := ret[0].(error) - return ret0 -} - -// ProcessSpooledTask indicates an expected call of ProcessSpooledTask. -func (mr *MockphysicalTaskQueueManagerMockRecorder) ProcessSpooledTask(ctx, task any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessSpooledTask", reflect.TypeOf((*MockphysicalTaskQueueManager)(nil).ProcessSpooledTask), ctx, task) -} - // QueueKey mocks base method. func (m *MockphysicalTaskQueueManager) QueueKey() *PhysicalTaskQueueKey { m.ctrl.T.Helper() @@ -342,6 +340,18 @@ func (mr *MockphysicalTaskQueueManagerMockRecorder) UpdatePollerInfo(arg0, arg1 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePollerInfo", reflect.TypeOf((*MockphysicalTaskQueueManager)(nil).UpdatePollerInfo), arg0, arg1) } +// UserDataChanged mocks base method. +func (m *MockphysicalTaskQueueManager) UserDataChanged() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UserDataChanged") +} + +// UserDataChanged indicates an expected call of UserDataChanged. +func (mr *MockphysicalTaskQueueManagerMockRecorder) UserDataChanged() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UserDataChanged", reflect.TypeOf((*MockphysicalTaskQueueManager)(nil).UserDataChanged)) +} + // WaitUntilInitialized mocks base method. func (m *MockphysicalTaskQueueManager) WaitUntilInitialized(arg0 context.Context) error { m.ctrl.T.Helper() diff --git a/service/matching/physical_task_queue_manager_test.go b/service/matching/physical_task_queue_manager_test.go index ae27a7d70c9..e0637e86f22 100644 --- a/service/matching/physical_task_queue_manager_test.go +++ b/service/matching/physical_task_queue_manager_test.go @@ -283,7 +283,7 @@ func createTestTaskQueueManagerWithConfig( partition := testOpts.dbq.Partition() tqConfig := newTaskQueueConfig(partition.TaskQueue(), me.config, nsName) onFatalErr := func(unloadCause) { t.Fatal("user data manager called onFatalErr") } - userDataManager := newUserDataManager(me.taskManager, me.matchingRawClient, onFatalErr, partition, tqConfig, me.logger, me.namespaceRegistry) + userDataManager := newUserDataManager(me.taskManager, me.matchingRawClient, onFatalErr, nil, partition, tqConfig, me.logger, me.namespaceRegistry) pm := createTestTaskQueuePartitionManager(ns, partition, tqConfig, me, userDataManager) tlMgr, err := newPhysicalTaskQueueManager(pm, testOpts.dbq, opts...) 
pm.defaultQueue = tlMgr diff --git a/service/matching/task.go b/service/matching/task.go index 05f52b37359..b1172be89a7 100644 --- a/service/matching/task.go +++ b/service/matching/task.go @@ -25,6 +25,7 @@ package matching import ( + "context" "time" commonpb "go.temporal.io/api/common/v1" @@ -70,7 +71,7 @@ type ( started *startedTaskInfo // non-nil for a task received from a parent partition which is already started namespace namespace.Name source enumsspb.TaskSource - responseC chan error // non-nil only where there is a caller waiting for response (sync-match) + responseC chan syncMatchResponse // non-nil only where there is a caller waiting for response (sync-match) backlogCountHint func() int64 // forwardInfo contains information about forward source partition and versioning decisions made by it // a parent partition receiving forwarded tasks makes no versioning decisions and only follows what the source @@ -79,7 +80,27 @@ type ( // redirectInfo is only set when redirect rule is applied on the task. for forwarded tasks, this is populated // based on forwardInfo. redirectInfo *taskqueuespb.BuildIdRedirectInfo + + // These fields are for use by matcherData: + waitableMatchResult + forwardCtx context.Context // non-nil for sync match task only recycleToken func() + // If checkRedirect is set, matcher may call it periodically. If it returns true, + // checkRedirect has handled the task and matcher should drop it. Otherwise matcher + // should hold on to it. + checkRedirect func() (handled bool) + isPollForwarder bool + } + + // syncMatchResponse is used to report the result of either a match with a local poller, + // or forwarding a task, query, or nexus task. + syncMatchResponse struct { + // If forwarded is true, then forwardRes and forwardErr have the result of forwarding. + // If it's false, then startErr has the result of RecordTaskStarted. 
+ forwarded bool + forwardRes any // note this may be a non-nil "any" containing a nil pointer + forwardErr error + startErr error } ) @@ -95,7 +116,7 @@ func newInternalTaskForSyncMatch( source = forwardInfo.TaskSource redirectInfo = forwardInfo.GetRedirectInfo() } - task := &internalTask{ + return &internalTask{ event: &genericTaskInfo{ AllocatedTaskInfo: &persistencespb.AllocatedTaskInfo{ Data: info, @@ -105,23 +126,21 @@ func newInternalTaskForSyncMatch( forwardInfo: forwardInfo, source: source, redirectInfo: redirectInfo, - responseC: make(chan error, 1), + responseC: make(chan syncMatchResponse, 1), } - return task } func newInternalTaskFromBacklog( info *persistencespb.AllocatedTaskInfo, completionFunc func(*persistencespb.AllocatedTaskInfo, error), ) *internalTask { - task := &internalTask{ + return &internalTask{ event: &genericTaskInfo{ AllocatedTaskInfo: info, completionFunc: completionFunc, }, source: enumsspb.TASK_SOURCE_DB_BACKLOG, } - return task } func newInternalQueryTask( @@ -134,7 +153,7 @@ func newInternalQueryTask( request: request, }, forwardInfo: request.GetForwardInfo(), - responseC: make(chan error, 1), + responseC: make(chan syncMatchResponse, 1), source: enumsspb.TASK_SOURCE_HISTORY, } } @@ -153,7 +172,7 @@ func newInternalNexusTask( request: request, }, forwardInfo: request.GetForwardInfo(), - responseC: make(chan error, 1), + responseC: make(chan syncMatchResponse, 1), source: enumsspb.TASK_SOURCE_HISTORY, } } @@ -179,6 +198,11 @@ func (task *internalTask) isQuery() bool { return task.query != nil } +// isNexus returns true if the underlying task is a nexus task +func (task *internalTask) isNexus() bool { + return task.nexus != nil +} + // isStarted is true when this task is already marked as started func (task *internalTask) isStarted() bool { return task.started != nil @@ -235,6 +259,14 @@ func (task *internalTask) pollNexusTaskQueueResponse() *matchingservice.PollNexu return nil } +// getResponse waits for a response on the task's response channel. +func (task *internalTask) getResponse() (syncMatchResponse, bool) { + if task.responseC == nil { + return syncMatchResponse{}, false + } + return <-task.responseC, true +} + // finish marks a task as finished. Should be called after a poller picks up a task // and marks it as started. If the task is unable to marked as started, then this // method should be called with a non-nil error argument. @@ -244,13 +276,26 @@ func (task *internalTask) pollNexusTaskQueueResponse() *matchingservice.PollNexu // so finish will call the rate limiter's RecycleToken to give the unused token back to any process // that is waiting on the token, if one exists. func (task *internalTask) finish(err error, wasValid bool) { + task.finishInternal(false, nil, err, wasValid) +} + +// finishForward should be called after forwarding a task. 
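+// It delivers the forward result (or error) on responseC as a syncMatchResponse with
+// forwarded set, which the waiting sync-match caller later consumes via getResponse.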
+func (task *internalTask) finishForward(forwardRes any, forwardErr error, wasValid bool) { + task.finishInternal(true, forwardRes, forwardErr, wasValid) +} + +func (task *internalTask) finishInternal(forwarded bool, forwardResult any, err error, wasValid bool) { if !wasValid && task.recycleToken != nil { task.recycleToken() } switch { case task.responseC != nil: - task.responseC <- err + if forwarded { + task.responseC <- syncMatchResponse{forwarded: true, forwardRes: forwardResult, forwardErr: err} + } else { + task.responseC <- syncMatchResponse{startErr: err} + } case task.event.completionFunc != nil: // TODO: this probably should not be done synchronously in PollWorkflow/ActivityTaskQueue task.event.completionFunc(task.event.AllocatedTaskInfo, err) diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index 5904037a041..3918bb8740b 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -329,7 +329,7 @@ func (pm *taskQueuePartitionManagerImpl) PollTask( return task, versionSetUsed, err } -func (pm *taskQueuePartitionManagerImpl) ProcessSpooledTask( +func (pm *taskQueuePartitionManagerImpl) AddSpooledTask( ctx context.Context, task *internalTask, backlogQueue *PhysicalTaskQueueKey, @@ -342,47 +342,53 @@ func (pm *taskQueuePartitionManagerImpl) ProcessSpooledTask( // construct directive based on the build ID of the spool queue directive = worker_versioning.MakeBuildIdDirective(assignedBuildId) } - // Redirect and re-resolve if we're blocked in matcher and user data changes. - for { - newBacklogQueue, syncMatchQueue, userDataChanged, err := pm.getPhysicalQueuesForAdd( - ctx, - directive, - nil, - taskInfo.GetRunId(), - ) + newBacklogQueue, syncMatchQueue, userDataChanged, err := pm.getPhysicalQueuesForAdd( + ctx, + directive, + nil, + taskInfo.GetRunId(), + ) + if err != nil { + return err + } + // set redirect info if spoolQueue and syncMatchQueue build ids are different + if assignedBuildId != syncMatchQueue.QueueKey().Version().BuildId() { + task.redirectInfo = &taskqueuespb.BuildIdRedirectInfo{ + AssignedBuildId: assignedBuildId, + } + } else { + // make sure to reset redirectInfo in case it was set in a previous loop cycle + task.redirectInfo = nil + } + if !backlogQueue.version.Deployment().Equal(newBacklogQueue.QueueKey().version.Deployment()) { + // Backlog queue has changed, spool to the new queue. This should happen rarely: when + // activity of pinned workflow was determined independent and sent to the default queue + // but now at dispatch time, the determination is different because the activity pollers + // on the pinned deployment have reached server. + // TODO: before spooling, try to sync-match the task on the new queue + err = newBacklogQueue.SpoolTask(taskInfo) if err != nil { + // return the error so task_reader retries the outer call return err } - // set redirect info if spoolQueue and syncMatchQueue build ids are different - if assignedBuildId != syncMatchQueue.QueueKey().Version().BuildId() { - task.redirectInfo = &taskqueuespb.BuildIdRedirectInfo{ - AssignedBuildId: assignedBuildId, - } - } else { - // make sure to reset redirectInfo in case it was set in a previous loop cycle - task.redirectInfo = nil - } - if !backlogQueue.version.Deployment().Equal(newBacklogQueue.QueueKey().version.Deployment()) { - // Backlog queue has changed, spool to the new queue. 
This should happen rarely: when - // activity of pinned workflow was determined independent and sent to the default queue - // but now at dispatch time, the determination is different because the activity pollers - // on the pinned deployment have reached server. - // TODO: before spooling, try to sync-match the task on the new queue - err = newBacklogQueue.SpoolTask(taskInfo) - if err != nil { - // return the error so task_reader retries the outer call - return err - } - // Finish the task because now it is copied to the other backlog. It should be considered - // invalid because a poller did not receive the task. - task.finish(nil, false) - return nil - } - err = syncMatchQueue.DispatchSpooledTask(ctx, task, userDataChanged) - if err != errInterrupted { - return err + // Finish the task because now it is copied to the other backlog. It should be considered + // invalid because a poller did not receive the task. + task.finish(nil, false) + return nil + } + // TODO(pri): Is this always necessary or can we determine that some can't be re-redirected? + // TODO(pri): Can we avoid allocating a closure when re-redirection is necessary? + task.checkRedirect = func() (handled bool) { + select { + case <-userDataChanged: + // user data has changed, call again to re-evaluate redirections + return pm.AddSpooledTask(ctx, task, backlogQueue) == nil + default: + return false // unchanged, leave task in matcher } } + syncMatchQueue.AddSpooledTaskToMatcher(task) + return nil } func (pm *taskQueuePartitionManagerImpl) DispatchQueryTask( @@ -1015,3 +1021,15 @@ func (pm *taskQueuePartitionManagerImpl) getPerTypeUserData() (*persistencespb.T perType := userData.GetData().GetPerType()[int32(pm.Partition().TaskType())] return perType, userDataChanged, nil } + +func (pm *taskQueuePartitionManagerImpl) userDataChanged() { + // Notify all queues so they can re-evaluate their backlog. + pm.versionedQueuesLock.RLock() + for _, vq := range pm.versionedQueues { + go vq.UserDataChanged() + } + pm.versionedQueuesLock.RUnlock() + + // Do this one in this goroutine. + pm.defaultQueue.UserDataChanged() +} diff --git a/service/matching/task_queue_partition_manager_interface.go b/service/matching/task_queue_partition_manager_interface.go index 28ac5a894bf..4f5a9c94d9e 100644 --- a/service/matching/task_queue_partition_manager_interface.go +++ b/service/matching/task_queue_partition_manager_interface.go @@ -55,13 +55,7 @@ type ( // maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched // from this task queue to pollers PollTask(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, bool, error) - // ProcessSpooledTask dispatches a task to a poller. When there are no pollers to pick - // up the task, this method will return error. Task will not be persisted to db - ProcessSpooledTask( - ctx context.Context, - task *internalTask, - backlogQueue *PhysicalTaskQueueKey, - ) error + AddSpooledTask(ctx context.Context, task *internalTask, backlogQueue *PhysicalTaskQueueKey) error // DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned, // if dispatched to local poller then nil and nil is returned. 
DispatchQueryTask(ctx context.Context, taskId string, request *matchingservice.QueryWorkflowRequest) (*matchingservice.QueryWorkflowResponse, error) diff --git a/service/matching/task_queue_partition_manager_mock.go b/service/matching/task_queue_partition_manager_mock.go index 405e3c0ab61..1c09893dd5f 100644 --- a/service/matching/task_queue_partition_manager_mock.go +++ b/service/matching/task_queue_partition_manager_mock.go @@ -70,6 +70,20 @@ func (m *MocktaskQueuePartitionManager) EXPECT() *MocktaskQueuePartitionManagerM return m.recorder } +// AddSpooledTask mocks base method. +func (m *MocktaskQueuePartitionManager) AddSpooledTask(ctx context.Context, task *internalTask, backlogQueue *PhysicalTaskQueueKey) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddSpooledTask", ctx, task, backlogQueue) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddSpooledTask indicates an expected call of AddSpooledTask. +func (mr *MocktaskQueuePartitionManagerMockRecorder) AddSpooledTask(ctx, task, backlogQueue any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSpooledTask", reflect.TypeOf((*MocktaskQueuePartitionManager)(nil).AddSpooledTask), ctx, task, backlogQueue) +} + // AddTask mocks base method. func (m *MocktaskQueuePartitionManager) AddTask(ctx context.Context, params addTaskParams) (string, bool, error) { m.ctrl.T.Helper() @@ -285,20 +299,6 @@ func (mr *MocktaskQueuePartitionManagerMockRecorder) PollTask(ctx, pollMetadata return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PollTask", reflect.TypeOf((*MocktaskQueuePartitionManager)(nil).PollTask), ctx, pollMetadata) } -// ProcessSpooledTask mocks base method. -func (m *MocktaskQueuePartitionManager) ProcessSpooledTask(ctx context.Context, task *internalTask, backlogQueue *PhysicalTaskQueueKey) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessSpooledTask", ctx, task, backlogQueue) - ret0, _ := ret[0].(error) - return ret0 -} - -// ProcessSpooledTask indicates an expected call of ProcessSpooledTask. -func (mr *MocktaskQueuePartitionManagerMockRecorder) ProcessSpooledTask(ctx, task, backlogQueue any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessSpooledTask", reflect.TypeOf((*MocktaskQueuePartitionManager)(nil).ProcessSpooledTask), ctx, task, backlogQueue) -} - // Start mocks base method. func (m *MocktaskQueuePartitionManager) Start() { m.ctrl.T.Helper() diff --git a/service/matching/task_reader.go b/service/matching/task_reader.go index 7ae5c805f09..dbb1768471c 100644 --- a/service/matching/task_reader.go +++ b/service/matching/task_reader.go @@ -31,6 +31,7 @@ import ( "sync/atomic" "time" + "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" @@ -38,46 +39,57 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" - "go.temporal.io/server/common/primitives/timestamp" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/util" "go.temporal.io/server/internal/goro" + "golang.org/x/sync/semaphore" ) const ( - taskReaderOfferThrottleWait = time.Second taskReaderThrottleRetryDelay = 3 * time.Second + + // Load more tasks when loaded count is <= MaxBatchSize/reloadFraction. + // E.g. if MaxBatchSize is 1000, then we'll load 1000, dispatch down to 200, + // load another batch to make 1200, down to 200, etc. 
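+	// In other words, completeTask signals the pump again once loadedTasks drops to
+	// GetTasksBatchSize()/reloadFraction, and getTasksPump ignores new-task signals while
+	// the loaded count is above that threshold.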
+ reloadFraction = 5 // TODO(pri): make dynamic config + + concurrentAddRetries = 10 ) type ( taskReader struct { status int32 - taskBuffer chan *persistencespb.AllocatedTaskInfo // tasks loaded from persistence - notifyC chan struct{} // Used as signal to notify pump of new tasks + notifyC chan struct{} // Used as signal to notify pump of new tasks backlogMgr *backlogManagerImpl gorogrp goro.Group - backoffTimerLock sync.Mutex - backoffTimer *time.Timer - retrier backoff.Retrier - backlogHeadCreateTime atomic.Int64 + backoffTimerLock sync.Mutex + backoffTimer *time.Timer + retrier backoff.Retrier + loadedTasks atomic.Int64 + + backlogAgeLock sync.Mutex + backlogAge backlogAgeTracker + + addRetries *semaphore.Weighted } ) +var addErrorRetryPolicy = backoff.NewExponentialRetryPolicy(2 * time.Second). + WithExpirationInterval(backoff.NoInterval) + func newTaskReader(backlogMgr *backlogManagerImpl) *taskReader { tr := &taskReader{ status: common.DaemonStatusInitialized, backlogMgr: backlogMgr, notifyC: make(chan struct{}, 1), - // we always dequeue the head of the buffer and try to dispatch it to a poller - // so allocate one less than desired target buffer size - taskBuffer: make(chan *persistencespb.AllocatedTaskInfo, backlogMgr.config.GetTasksBatchSize()-1), retrier: backoff.NewRetrier( common.CreateReadTaskRetryPolicy(), clock.NewRealTimeSource(), ), + backlogAge: newBacklogAgeTracker(), + addRetries: semaphore.NewWeighted(concurrentAddRetries), } - tr.backlogHeadCreateTime.Store(-1) return tr } @@ -91,12 +103,11 @@ func (tr *taskReader) Start() { return } - tr.gorogrp.Go(tr.dispatchBufferedTasks) tr.gorogrp.Go(tr.getTasksPump) } // Stop taskReader goroutines. -// Note that this does not wait until +// Note that this does not wait until they stop before returning. 
func (tr *taskReader) Stop() { if !atomic.CompareAndSwapInt32( &tr.status, @@ -110,68 +121,30 @@ func (tr *taskReader) Stop() { } func (tr *taskReader) Signal() { - var event struct{} select { - case tr.notifyC <- event: + case tr.notifyC <- struct{}{}: default: // channel already has an event, don't block } } -func (tr *taskReader) updateBacklogAge(task *internalTask) { - if task.event.Data.CreateTime == nil { - return // should not happen but for safety - } - ts := timestamp.TimeValue(task.event.Data.CreateTime).UnixNano() - tr.backlogHeadCreateTime.Store(ts) -} - func (tr *taskReader) getBacklogHeadAge() time.Duration { - if tr.backlogHeadCreateTime.Load() == -1 { - return time.Duration(0) - } - return time.Since(time.Unix(0, tr.backlogHeadCreateTime.Load())) + tr.backlogAgeLock.Lock() + defer tr.backlogAgeLock.Unlock() + return max(0, tr.backlogAge.getAge()) // return 0 instead of -1 } -func (tr *taskReader) dispatchBufferedTasks(ctx context.Context) error { - ctx = tr.backlogMgr.contextInfoProvider(ctx) +func (tr *taskReader) completeTask(task *persistencespb.AllocatedTaskInfo, err error) { + loaded := tr.loadedTasks.Add(-1) -dispatchLoop: - for ctx.Err() == nil { - if len(tr.taskBuffer) == 0 { - // reset the atomic since we have no tasks from the backlog - tr.backlogHeadCreateTime.Store(-1) - } - select { - case taskInfo, ok := <-tr.taskBuffer: - if !ok { // Task queue getTasks pump is shutdown - break dispatchLoop - } - task := newInternalTaskFromBacklog(taskInfo, tr.backlogMgr.completeTask) - for ctx.Err() == nil { - tr.updateBacklogAge(task) - taskCtx, cancel := context.WithTimeout(ctx, taskReaderOfferTimeout) - err := tr.backlogMgr.processSpooledTask(taskCtx, task) - cancel() - if err == nil { - continue dispatchLoop - } + tr.backlogAgeLock.Lock() + tr.backlogAge.record(task.Data.CreateTime, -1) + tr.backlogAgeLock.Unlock() - var stickyUnavailable *serviceerrors.StickyWorkerUnavailable - // if task is still valid (truly valid or unable to verify if task is valid) - metrics.BufferThrottlePerTaskQueueCounter.With(tr.taggedMetricsHandler()).Record(1) - if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) && - // StickyWorkerUnavailable is expected for versioned sticky queues - !errors.As(err, &stickyUnavailable) { - tr.throttledLogger().Error("taskReader: unexpected error dispatching task", tag.Error(err)) - } - util.InterruptibleSleep(ctx, taskReaderOfferThrottleWait) - } - return ctx.Err() - case <-ctx.Done(): - return ctx.Err() - } + // use == so we just signal once when we cross this threshold + if int(loaded) == tr.backlogMgr.config.GetTasksBatchSize()/reloadFraction { + tr.Signal() } - return ctx.Err() + tr.backlogMgr.completeTask(task, err) } func (tr *taskReader) getTasksPump(ctx context.Context) error { @@ -181,8 +154,7 @@ func (tr *taskReader) getTasksPump(ctx context.Context) error { return err } - updateAckTimer := time.NewTimer(tr.backlogMgr.config.UpdateAckInterval()) - defer updateAckTimer.Stop() + updateAckTicker := time.NewTicker(tr.backlogMgr.config.UpdateAckInterval()) tr.Signal() // prime pump Loop: @@ -199,14 +171,20 @@ Loop: return nil case <-tr.notifyC: + if int(tr.loadedTasks.Load()) > tr.backlogMgr.config.GetTasksBatchSize()/reloadFraction { + // Too many loaded already, ignore this signal. We'll get another signal when + // loadedTasks drops low enough. + continue Loop + } + batch, err := tr.getTaskBatch(ctx) tr.backlogMgr.signalIfFatal(err) if err != nil { // TODO: Should we ever stop retrying on db errors? 
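+				// Current behavior: resource-exhausted errors back off for a fixed
+				// taskReaderThrottleRetryDelay; all other errors use the retrier's
+				// exponential backoff.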
if common.IsResourceExhausted(err) { - tr.reEnqueueAfterDelay(taskReaderThrottleRetryDelay) + tr.backoffSignal(taskReaderThrottleRetryDelay) } else { - tr.reEnqueueAfterDelay(tr.retrier.NextBackOff(err)) + tr.backoffSignal(tr.retrier.NextBackOff(err)) } continue Loop } @@ -220,13 +198,11 @@ Loop: continue Loop } - // only error here is due to context cancellation which we also - // handle above - _ = tr.addTasksToBuffer(ctx, batch.tasks) - // There maybe more tasks. We yield now, but signal pump to check again later. + tr.addTasksToMatcher(ctx, batch.tasks) + // There may be more tasks. tr.Signal() - case <-updateAckTimer.C: + case <-updateAckTicker.C: err := tr.persistAckBacklogCountLevel(ctx) isConditionFailed := tr.backlogMgr.signalIfFatal(err) if err != nil && !isConditionFailed { @@ -235,8 +211,8 @@ Loop: tag.Error(err)) // keep going as saving ack is not critical } + // TODO(pri): don't do this, or else prove that it's needed tr.Signal() // periodically signal pump to check persistence for tasks - updateAckTimer = time.NewTimer(tr.backlogMgr.config.UpdateAckInterval()) } } } @@ -294,7 +270,7 @@ func (tr *taskReader) getTaskBatch(ctx context.Context) (*getTasksBatchResponse, }, nil // caller will update readLevel when no task grabbed } -func (tr *taskReader) addTasksToBuffer( +func (tr *taskReader) addTasksToMatcher( ctx context.Context, tasks []*persistencespb.AllocatedTaskInfo, ) error { @@ -306,24 +282,86 @@ func (tr *taskReader) addTasksToBuffer( tr.backlogMgr.taskAckManager.setReadLevel(t.GetTaskId()) continue } - if err := tr.addSingleTaskToBuffer(ctx, t); err != nil { - return err + + tr.backlogMgr.taskAckManager.addTask(t.GetTaskId()) + tr.loadedTasks.Add(1) + tr.backlogAgeLock.Lock() + tr.backlogAge.record(t.Data.CreateTime, 1) + tr.backlogAgeLock.Unlock() + task := newInternalTaskFromBacklog(t, tr.completeTask) + + // After we get to this point, we must eventually call task.finish or + // task.finishForwarded, which will call tr.completeTask. + + err := tr.backlogMgr.addSpooledTask(ctx, task) + + if err != nil { + if drop, retry := tr.addErrorBehavior(err); drop { + task.finish(nil, false) + } else if retry { + // This should only be due to persistence problems. Retry in a new goroutine + // to not block other tasks, up to some concurrency limit. 
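+				// At most concurrentAddRetries of these goroutines run at once. Acquire
+				// only fails if ctx is canceled (typically because the task queue is
+				// unloading), in which case we stop adding the remaining tasks of this batch.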
+ if tr.addRetries.Acquire(ctx, 1) != nil { + return nil + } + go tr.retryAddAfterError(ctx, task) + } } } return nil } -func (tr *taskReader) addSingleTaskToBuffer( - ctx context.Context, - task *persistencespb.AllocatedTaskInfo, -) error { - tr.backlogMgr.taskAckManager.addTask(task.GetTaskId()) - select { - case tr.taskBuffer <- task: - return nil - case <-ctx.Done(): - return ctx.Err() +func (tr *taskReader) addErrorBehavior(err error) (drop, retry bool) { + // addSpooledTask can only fail due to: + // - the task queue is closed (errTaskQueueClosed or context.Canceled) + // - ValidateDeployment failed (InvalidArgument) + // - versioning wants to get a versioned queue and it can't be initialized + // - versioning wants to re-spool the task on a different queue and that failed + // - versioning says StickyWorkerUnavailable + if errors.Is(err, errTaskQueueClosed) || common.IsContextCanceledErr(err) { + return false, false + } + var stickyUnavailable *serviceerrors.StickyWorkerUnavailable + if errors.As(err, &stickyUnavailable) { + return true, false // drop the task } + var invalid *serviceerror.InvalidArgument + var internal *serviceerror.Internal + if errors.As(err, &invalid) || errors.As(err, &internal) { + tr.throttledLogger().Error("nonretryable error processing spooled task", tag.Error(err)) + return true, false // drop the task + } + // For any other error (this should be very rare), we can retry. + tr.throttledLogger().Error("retryable error processing spooled task", tag.Error(err)) + return false, true +} + +func (tr *taskReader) retryAddAfterError(ctx context.Context, task *internalTask) { + defer tr.addRetries.Release(1) + metrics.BufferThrottlePerTaskQueueCounter.With(tr.taggedMetricsHandler()).Record(1) + + // initial sleep since we just tried once + util.InterruptibleSleep(ctx, time.Second) + + backoff.ThrottleRetryContext( + ctx, + func(ctx context.Context) error { + if IsTaskExpired(task.event.AllocatedTaskInfo) { + task.finish(nil, false) + return nil + } + err := tr.backlogMgr.addSpooledTask(ctx, task) + if drop, retry := tr.addErrorBehavior(err); drop { + task.finish(nil, false) + } else if retry { + metrics.BufferThrottlePerTaskQueueCounter.With(tr.taggedMetricsHandler()).Record(1) + return err + } + return nil + }, + addErrorRetryPolicy, + nil, + ) } func (tr *taskReader) persistAckBacklogCountLevel(ctx context.Context) error { @@ -343,7 +381,7 @@ func (tr *taskReader) taggedMetricsHandler() metrics.Handler { return tr.backlogMgr.metricsHandler } -func (tr *taskReader) reEnqueueAfterDelay(duration time.Duration) { +func (tr *taskReader) backoffSignal(duration time.Duration) { tr.backoffTimerLock.Lock() defer tr.backoffTimerLock.Unlock() @@ -357,3 +395,7 @@ func (tr *taskReader) reEnqueueAfterDelay(duration time.Duration) { }) } } + +func (tr *taskReader) getLoadedTasks() int64 { + return tr.loadedTasks.Load() +} diff --git a/service/matching/task_validation.go b/service/matching/task_validation.go index 48ddbc95fe7..cd62b921d8c 100644 --- a/service/matching/task_validation.go +++ b/service/matching/task_validation.go @@ -38,10 +38,7 @@ import ( "go.temporal.io/server/common/primitives/timestamp" ) -const ( - taskReaderOfferTimeout = 60 * time.Second - taskReaderValidationThreshold = 600 * time.Second -) +const taskReaderValidationThreshold = 600 * time.Second type ( taskValidator interface { diff --git a/service/matching/task_writer.go b/service/matching/task_writer.go index f61b6c9fb30..3a5de308b64 100644 --- a/service/matching/task_writer.go +++ 
b/service/matching/task_writer.go @@ -214,6 +214,7 @@ func (w *taskWriter) appendTasks( tag.WorkflowTaskQueueType(w.backlogMgr.queueKey().TaskType())) return nil, err } + // TODO(pri): we should signal taskreader here, not in the callers of appendTask return resp, nil } diff --git a/service/matching/user_data_manager.go b/service/matching/user_data_manager.go index 661c474dad1..1012ede4d3b 100644 --- a/service/matching/user_data_manager.go +++ b/service/matching/user_data_manager.go @@ -94,12 +94,13 @@ type ( // to/from the persistence layer passes through userDataManager of the owning partition. // All other partitions long-poll the latest user data from the owning partition. userDataManagerImpl struct { - lock sync.Mutex - onFatalErr func(unloadCause) - partition tqid.Partition - userData *persistencespb.VersionedTaskQueueUserData - userDataChanged chan struct{} - userDataState userDataState + lock sync.Mutex + onFatalErr func(unloadCause) + onUserDataChanged func() // if set, call this in new goroutine when user data changes + partition tqid.Partition + userData *persistencespb.VersionedTaskQueueUserData + userDataChanged chan struct{} + userDataState userDataState // only set if this partition owns user data of its task queue store persistence.TaskManager config *taskQueueConfig @@ -128,6 +129,7 @@ func newUserDataManager( store persistence.TaskManager, matchingClient matchingservice.MatchingServiceClient, onFatalErr func(unloadCause), + onUserDataChanged func(), partition tqid.Partition, config *taskQueueConfig, logger log.Logger, @@ -135,6 +137,7 @@ func newUserDataManager( ) *userDataManagerImpl { m := &userDataManagerImpl{ onFatalErr: onFatalErr, + onUserDataChanged: onUserDataChanged, partition: partition, userDataChanged: make(chan struct{}), config: config, @@ -195,6 +198,9 @@ func (m *userDataManagerImpl) setUserDataLocked(userData *persistencespb.Version m.userData = userData close(m.userDataChanged) m.userDataChanged = make(chan struct{}) + if m.onUserDataChanged != nil { + go m.onUserDataChanged() + } } // Sets user data enabled/disabled and marks the future ready (if it's not ready yet). diff --git a/service/matching/user_data_manager_test.go b/service/matching/user_data_manager_test.go index 83e07bc4e43..1c3b234ac59 100644 --- a/service/matching/user_data_manager_test.go +++ b/service/matching/user_data_manager_test.go @@ -82,7 +82,7 @@ func createUserDataManager( onFatalErr = func(unloadCause) { t.Fatal("user data manager called onFatalErr") } } - return newUserDataManager(tm, testOpts.matchingClientMock, onFatalErr, testOpts.dbq.Partition(), newTaskQueueConfig(testOpts.dbq.Partition().TaskQueue(), testOpts.config, ns), logger, mockNamespaceCache) + return newUserDataManager(tm, testOpts.matchingClientMock, onFatalErr, nil, testOpts.dbq.Partition(), newTaskQueueConfig(testOpts.dbq.Partition().TaskQueue(), testOpts.config, ns), logger, mockNamespaceCache) } func TestUserData_LoadOnInit(t *testing.T) {