Skip to content

Commit

Permalink
Move mutable state cache to host level (temporalio#4833)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Move mutable state cache to host level. 

<!-- Tell your future self why have you made these changes -->
**Why?**
For better utilization. This will be better to handle hot shard problem
with full pinned items in cache.

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Local server + unit tests

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**
N/A

<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
No
  • Loading branch information
yux0 authored Sep 6, 2023
1 parent db25e46 commit c9d9a93
Show file tree
Hide file tree
Showing 70 changed files with 994 additions and 765 deletions.
8 changes: 5 additions & 3 deletions service/history/api/consistency_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(

wfContext, release, err := c.workflowCache.GetOrCreateWorkflowExecution(
ctx,
c.shardContext,
namespace.ID(workflowKey.NamespaceID),
commonpb.WorkflowExecution{
WorkflowId: workflowKey.WorkflowID,
Expand All @@ -182,7 +183,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(
return nil, err
}

mutableState, err := wfContext.LoadMutableState(ctx)
mutableState, err := wfContext.LoadMutableState(ctx, c.shardContext)
if err != nil {
release(err)
return nil, err
Expand All @@ -205,6 +206,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck(

wfContext, release, err := c.workflowCache.GetOrCreateWorkflowExecution(
ctx,
c.shardContext,
namespace.ID(workflowKey.NamespaceID),
commonpb.WorkflowExecution{
WorkflowId: workflowKey.WorkflowID,
Expand All @@ -216,15 +218,15 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck(
return nil, err
}

mutableState, err := wfContext.LoadMutableState(ctx)
mutableState, err := wfContext.LoadMutableState(ctx, c.shardContext)
switch err.(type) {
case nil:
if consistencyPredicate(mutableState) {
return NewWorkflowContext(wfContext, release, mutableState), nil
}
wfContext.Clear()

mutableState, err := wfContext.LoadMutableState(ctx)
mutableState, err := wfContext.LoadMutableState(ctx, c.shardContext)
if err != nil {
release(err)
return nil, err
Expand Down
17 changes: 11 additions & 6 deletions service/history/api/consistency_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,15 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck

s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(
ctx,
s.shardContext,
namespace.ID(s.namespaceID),
commonpb.WorkflowExecution{
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState, nil)
wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(mutableState, nil)

workflowContext, err := s.checker.getWorkflowContextValidatedByCheck(
ctx,
Expand All @@ -143,6 +144,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck

s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(
ctx,
s.shardContext,
namespace.ID(s.namespaceID),
commonpb.WorkflowExecution{
WorkflowId: s.workflowID,
Expand All @@ -151,9 +153,9 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
gomock.InOrder(
wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState1, nil),
wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(mutableState1, nil),
wfContext.EXPECT().Clear(),
wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState2, nil),
wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(mutableState2, nil),
)

workflowContext, err := s.checker.getWorkflowContextValidatedByCheck(
Expand All @@ -178,14 +180,15 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck

s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(
ctx,
s.shardContext,
namespace.ID(s.namespaceID),
commonpb.WorkflowExecution{
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound(""))
wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(nil, serviceerror.NewNotFound(""))

s.shardContext.EXPECT().AssertOwnership(ctx).Return(nil)

Expand All @@ -211,14 +214,15 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck

s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(
ctx,
s.shardContext,
namespace.ID(s.namespaceID),
commonpb.WorkflowExecution{
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound(""))
wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(nil, serviceerror.NewNotFound(""))

s.shardContext.EXPECT().AssertOwnership(ctx).Return(&persistence.ShardOwnershipLostError{})

Expand All @@ -244,14 +248,15 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck

s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(
ctx,
s.shardContext,
namespace.ID(s.namespaceID),
commonpb.WorkflowExecution{
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewUnavailable(""))
wfContext.EXPECT().LoadMutableState(ctx, s.shardContext).Return(nil, serviceerror.NewUnavailable(""))

workflowContext, err := s.checker.getWorkflowContextValidatedByCheck(
ctx,
Expand Down
4 changes: 3 additions & 1 deletion service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,15 @@ func NewWorkflowWithSignal(
}

newWorkflowContext := workflow.NewContext(
shard,
shard.GetConfig(),
definition.NewWorkflowKey(
namespaceEntry.ID().String(),
workflowID,
runID,
),
shard.GetLogger(),
shard.GetThrottledLogger(),
shard.GetMetricsHandler(),
)
return NewWorkflowContext(newWorkflowContext, wcache.NoopReleaseFn, newMutableState), nil
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/api/describemutablestate/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
func Invoke(
ctx context.Context,
req *historyservice.DescribeMutableStateRequest,
shard shard.Context,
shardContext shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (_ *historyservice.DescribeMutableStateResponse, retError error) {
namespaceID := namespace.ID(req.GetNamespaceId())
Expand Down Expand Up @@ -71,7 +71,7 @@ func Invoke(

// clear mutable state to force reload from persistence. This API returns both cached and persisted version.
weCtx.GetContext().Clear()
mutableState, err := weCtx.GetContext().LoadMutableState(ctx)
mutableState, err := weCtx.GetContext().LoadMutableState(ctx, shardContext)
if err != nil {
return nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions service/history/api/get_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ import (

func GetOrPollMutableState(
ctx context.Context,
shardContext shard.Context,
request *historyservice.GetMutableStateRequest,
shard shard.Context,
workflowConsistencyChecker WorkflowConsistencyChecker,
eventNotifier events.Notifier,
) (*historyservice.GetMutableStateResponse, error) {
Expand All @@ -75,7 +75,7 @@ func GetOrPollMutableState(
request.Execution.WorkflowId,
request.Execution.RunId,
)
response, err := GetMutableState(ctx, workflowKey, workflowConsistencyChecker)
response, err := GetMutableState(ctx, shardContext, workflowKey, workflowConsistencyChecker)
if err != nil {
return nil, err
}
Expand All @@ -101,7 +101,7 @@ func GetOrPollMutableState(
}
defer func() { _ = eventNotifier.UnwatchHistoryEvent(workflowKey, subscriberID) }()
// check again in case the next event ID is updated
response, err = GetMutableState(ctx, workflowKey, workflowConsistencyChecker)
response, err = GetMutableState(ctx, shardContext, workflowKey, workflowConsistencyChecker)
if err != nil {
return nil, err
}
Expand All @@ -113,11 +113,11 @@ func GetOrPollMutableState(
return response, nil
}

namespaceRegistry, err := shard.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
namespaceRegistry, err := shardContext.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
if err != nil {
return nil, err
}
timer := time.NewTimer(shard.GetConfig().LongPollExpirationInterval(namespaceRegistry.Name().String()))
timer := time.NewTimer(shardContext.GetConfig().LongPollExpirationInterval(namespaceRegistry.Name().String()))
defer timer.Stop()
for {
select {
Expand Down Expand Up @@ -150,6 +150,7 @@ func GetOrPollMutableState(

func GetMutableState(
ctx context.Context,
shardContext shard.Context,
workflowKey definition.WorkflowKey,
workflowConsistencyChecker WorkflowConsistencyChecker,
) (_ *historyservice.GetMutableStateResponse, retError error) {
Expand All @@ -172,7 +173,7 @@ func GetMutableState(
}
defer func() { weCtx.GetReleaseFn()(retError) }()

mutableState, err := weCtx.GetContext().LoadMutableState(ctx)
mutableState, err := weCtx.GetContext().LoadMutableState(ctx, shardContext)
if err != nil {
return nil, err
}
Expand Down
18 changes: 9 additions & 9 deletions service/history/api/getworkflowexecutionhistory/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (

func Invoke(
ctx context.Context,
shard shard.Context,
shardContext shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
versionChecker headers.VersionChecker,
eventNotifier events.Notifier,
Expand Down Expand Up @@ -78,13 +78,13 @@ func Invoke(
) ([]byte, string, int64, int64, bool, error) {
response, err := api.GetOrPollMutableState(
ctx,
shardContext,
&historyservice.GetMutableStateRequest{
NamespaceId: namespaceUUID.String(),
Execution: execution,
ExpectedNextEventId: expectedNextEventID,
CurrentBranchToken: currentBranchToken,
},
shard,
workflowConsistencyChecker,
eventNotifier,
)
Expand Down Expand Up @@ -166,7 +166,7 @@ func Invoke(
if _, ok := retError.(*serviceerror.DataLoss); ok {
api.TrimHistoryNode(
ctx,
shard,
shardContext,
workflowConsistencyChecker,
eventNotifier,
namespaceID.String(),
Expand All @@ -181,10 +181,10 @@ func Invoke(
var historyBlob []*commonpb.DataBlob
if isCloseEventOnly {
if !isWorkflowRunning {
if shard.GetConfig().SendRawWorkflowHistory(request.Request.GetNamespace()) {
if shardContext.GetConfig().SendRawWorkflowHistory(request.Request.GetNamespace()) {
historyBlob, _, err = api.GetRawHistory(
ctx,
shard,
shardContext,
namespaceID,
*execution,
lastFirstEventID,
Expand All @@ -203,7 +203,7 @@ func Invoke(
} else {
history, _, err = api.GetHistory(
ctx,
shard,
shardContext,
namespaceID,
*execution,
lastFirstEventID,
Expand Down Expand Up @@ -236,10 +236,10 @@ func Invoke(
continuationToken = nil
}
} else {
if shard.GetConfig().SendRawWorkflowHistory(request.Request.GetNamespace()) {
if shardContext.GetConfig().SendRawWorkflowHistory(request.Request.GetNamespace()) {
historyBlob, continuationToken.PersistenceToken, err = api.GetRawHistory(
ctx,
shard,
shardContext,
namespaceID,
*execution,
continuationToken.FirstEventId,
Expand All @@ -252,7 +252,7 @@ func Invoke(
} else {
history, continuationToken.PersistenceToken, err = api.GetHistory(
ctx,
shard,
shardContext,
namespaceID,
*execution,
continuationToken.FirstEventId,
Expand Down
9 changes: 5 additions & 4 deletions service/history/api/getworkflowexecutionhistoryreverse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/api/historyservice/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/common"
Expand All @@ -44,7 +45,7 @@ import (

func Invoke(
ctx context.Context,
shard shard.Context,
shardContext shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
eventNotifier events.Notifier,
request *historyservice.GetWorkflowExecutionHistoryReverseRequest,
Expand All @@ -64,13 +65,13 @@ func Invoke(
) ([]byte, string, int64, error) {
response, err := api.GetOrPollMutableState(
ctx,
shardContext,
&historyservice.GetMutableStateRequest{
NamespaceId: namespaceUUID.String(),
Execution: execution,
ExpectedNextEventId: expectedNextEventID,
CurrentBranchToken: currentBranchToken,
},
shard,
workflowConsistencyChecker,
eventNotifier,
)
Expand Down Expand Up @@ -124,7 +125,7 @@ func Invoke(
if _, ok := retError.(*serviceerror.DataLoss); ok {
api.TrimHistoryNode(
ctx,
shard,
shardContext,
workflowConsistencyChecker,
eventNotifier,
namespaceID.String(),
Expand All @@ -139,7 +140,7 @@ func Invoke(
// return all events
history, continuationToken.PersistenceToken, continuationToken.NextEventId, err = api.GetHistoryReverse(
ctx,
shard,
shardContext,
namespaceID,
*execution,
continuationToken.NextEventId,
Expand Down
Loading

0 comments on commit c9d9a93

Please sign in to comment.