Skip to content

Commit

Permalink
taskmatcher rewrite wip
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Jan 31, 2025
1 parent f195d5d commit fe8d0d7
Show file tree
Hide file tree
Showing 17 changed files with 947 additions and 721 deletions.
7 changes: 1 addition & 6 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,11 +1070,6 @@ See DynamicRateLimitingParams comments for more details.`,
time.Minute,
`MatchingLongPollExpirationInterval is the long poll expiration interval in the matching service`,
)
MatchingSyncMatchWaitDuration = NewTaskQueueDurationSetting(
"matching.syncMatchWaitDuration",
200*time.Millisecond,
`MatchingSyncMatchWaitDuration is to wait time for sync match`,
)
MatchingHistoryMaxPageSize = NewNamespaceIntSetting(
"matching.historyMaxPageSize",
primitives.GetHistoryMaxPageSize,
Expand Down Expand Up @@ -1175,7 +1170,7 @@ for VERSIONED queues.`,
1,
`MatchingForwarderMaxOutstandingTasks is the max number of inflight addTask/queryTask from the forwarder`,
)
MatchingForwarderMaxRatePerSecond = NewTaskQueueIntSetting(
MatchingForwarderMaxRatePerSecond = NewTaskQueueFloatSetting(
"matching.forwarderMaxRatePerSecond",
10,
`MatchingForwarderMaxRatePerSecond is the max rate at which add/query can be forwarded`,
Expand Down
8 changes: 0 additions & 8 deletions service/matching/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package matching

import (
"sync"
"sync/atomic"

"github.com/emirpasic/gods/maps/treemap"
godsutils "github.com/emirpasic/gods/utils"
Expand All @@ -41,7 +40,6 @@ type ackManager struct {
outstandingTasks *treemap.Map // TaskID->acked
readLevel int64 // Maximum TaskID inserted into outstandingTasks
ackLevel int64 // Maximum TaskID below which all tasks are acked
backlogCountHint atomic.Int64
logger log.Logger
}

Expand All @@ -68,7 +66,6 @@ func (m *ackManager) addTask(taskID int64) {
m.logger.Fatal("Already present in outstanding tasks", tag.TaskID(taskID))
}
m.outstandingTasks.Put(taskID, false)
m.backlogCountHint.Add(1)
}

func (m *ackManager) getReadLevel() int64 {
Expand Down Expand Up @@ -135,7 +132,6 @@ func (m *ackManager) completeTask(taskID int64) int64 {
// TODO the ack level management should be done by a dedicated coroutine
// this is only a temporarily solution
m.outstandingTasks.Put(taskID, true)
m.backlogCountHint.Add(-1)

// Adjust the ack level as far as we can
var numberOfAckedTasks int64
Expand All @@ -153,7 +149,3 @@ func (m *ackManager) completeTask(taskID int64) int64 {
}
return m.ackLevel
}

func (m *ackManager) getBacklogCountHint() int64 {
return m.backlogCountHint.Load()
}
49 changes: 49 additions & 0 deletions service/matching/backlog_age_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package matching

import (
"time"

"github.com/emirpasic/gods/maps/treemap"
godsutils "github.com/emirpasic/gods/utils"
"google.golang.org/protobuf/types/known/timestamppb"
)

const emptyBacklogAge time.Duration = -1

// backlogAgeTracker is not safe for concurrent use
type backlogAgeTracker struct {
tree treemap.Map // unix nano as int64 -> int (count)
}

func newBacklogAgeTracker() backlogAgeTracker {
return backlogAgeTracker{tree: *treemap.NewWith(godsutils.Int64Comparator)}
}

// record adds or removes a task from the tracker.
func (b backlogAgeTracker) record(ts *timestamppb.Timestamp, delta int) {
if ts == nil {
return
}

createTime := ts.AsTime().UnixNano()
count := delta
if prev, ok := b.tree.Get(createTime); ok {
count += prev.(int)
}
if count = max(0, count); count == 0 {
b.tree.Remove(createTime)
} else {
b.tree.Put(createTime, count)
}
}

// getBacklogAge returns the largest age in this backlog (age of oldest task),
// or emptyBacklogAge if empty.
func (b backlogAgeTracker) getAge() time.Duration {
if b.tree.Empty() {
return emptyBacklogAge
}
k, _ := b.tree.Min()
oldest := k.(int64)
return time.Since(time.Unix(0, oldest))
}
9 changes: 3 additions & 6 deletions service/matching/backlog_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,12 @@ func (c *backlogManagerImpl) SpoolTask(taskInfo *persistencespb.TaskInfo) error
return err
}

func (c *backlogManagerImpl) processSpooledTask(
ctx context.Context,
task *internalTask,
) error {
return c.pqMgr.ProcessSpooledTask(ctx, task)
func (c *backlogManagerImpl) addSpooledTask(ctx context.Context, task *internalTask) error {
return c.pqMgr.AddSpooledTask(ctx, task)
}

func (c *backlogManagerImpl) BacklogCountHint() int64 {
return c.taskAckManager.getBacklogCountHint()
return c.taskReader.getLoadedTasks()
}

func (c *backlogManagerImpl) BacklogStatus() *taskqueuepb.TaskQueueStatus {
Expand Down
17 changes: 7 additions & 10 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/tqid"
)
Expand All @@ -46,7 +47,6 @@ type (
PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter
PersistenceDynamicRateLimitingParams dynamicconfig.TypedPropertyFn[dynamicconfig.DynamicRateLimitingParams]
PersistenceQPSBurstRatio dynamicconfig.FloatPropertyFn
SyncMatchWaitDuration dynamicconfig.DurationPropertyFnWithTaskQueueFilter
RPS dynamicconfig.IntPropertyFn
OperatorRPSRatio dynamicconfig.FloatPropertyFn
AlignMembershipChange dynamicconfig.DurationPropertyFn
Expand All @@ -70,7 +70,7 @@ type (
BreakdownMetricsByBuildID dynamicconfig.BoolPropertyFnWithTaskQueueFilter
ForwarderMaxOutstandingPolls dynamicconfig.IntPropertyFnWithTaskQueueFilter
ForwarderMaxOutstandingTasks dynamicconfig.IntPropertyFnWithTaskQueueFilter
ForwarderMaxRatePerSecond dynamicconfig.IntPropertyFnWithTaskQueueFilter
ForwarderMaxRatePerSecond dynamicconfig.FloatPropertyFnWithTaskQueueFilter
ForwarderMaxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskQueueFilter
VersionCompatibleSetLimitPerQueue dynamicconfig.IntPropertyFnWithNamespaceFilter
VersionBuildIdLimitPerQueue dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -124,13 +124,13 @@ type (
forwarderConfig struct {
ForwarderMaxOutstandingPolls func() int
ForwarderMaxOutstandingTasks func() int
ForwarderMaxRatePerSecond func() int
ForwarderMaxRatePerSecond func() float64
ForwarderMaxChildrenPerNode func() int
}

taskQueueConfig struct {
forwarderConfig
SyncMatchWaitDuration func() time.Duration
CallerInfo headers.CallerInfo
BacklogNegligibleAge func() time.Duration
MaxWaitForPollerBeforeFwd func() time.Duration
QueryPollerUnavailableWindow func() time.Duration
Expand Down Expand Up @@ -213,7 +213,6 @@ func NewConfig(
PersistencePerShardNamespaceMaxQPS: dynamicconfig.DefaultPerShardNamespaceRPSMax,
PersistenceDynamicRateLimitingParams: dynamicconfig.MatchingPersistenceDynamicRateLimitingParams.Get(dc),
PersistenceQPSBurstRatio: dynamicconfig.PersistenceQPSBurstRatio.Get(dc),
SyncMatchWaitDuration: dynamicconfig.MatchingSyncMatchWaitDuration.Get(dc),
LoadUserData: dynamicconfig.MatchingLoadUserData.Get(dc),
HistoryMaxPageSize: dynamicconfig.MatchingHistoryMaxPageSize.Get(dc),
EnableDeployments: dynamicconfig.EnableDeployments.Get(dc),
Expand Down Expand Up @@ -285,7 +284,8 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
taskType := tq.TaskType()

return &taskQueueConfig{
RangeSize: config.RangeSize,
CallerInfo: headers.NewBackgroundCallerInfo(ns.String()),
RangeSize: config.RangeSize,
GetTasksBatchSize: func() int {
return config.GetTasksBatchSize(ns.String(), taskQueueName, taskType)
},
Expand All @@ -298,9 +298,6 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
MinTaskThrottlingBurstSize: func() int {
return config.MinTaskThrottlingBurstSize(ns.String(), taskQueueName, taskType)
},
SyncMatchWaitDuration: func() time.Duration {
return config.SyncMatchWaitDuration(ns.String(), taskQueueName, taskType)
},
BacklogNegligibleAge: func() time.Duration {
return config.BacklogNegligibleAge(ns.String(), taskQueueName, taskType)
},
Expand Down Expand Up @@ -353,7 +350,7 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
ForwarderMaxOutstandingTasks: func() int {
return config.ForwarderMaxOutstandingTasks(ns.String(), taskQueueName, taskType)
},
ForwarderMaxRatePerSecond: func() int {
ForwarderMaxRatePerSecond: func() float64 {
return config.ForwarderMaxRatePerSecond(ns.String(), taskQueueName, taskType)
},
ForwarderMaxChildrenPerNode: func() int {
Expand Down
Loading

0 comments on commit fe8d0d7

Please sign in to comment.