Skip to content

Commit

Permalink
Delete Namespace: read cache refresh interval from DC (#7215)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Delete Namespace: read cache refresh interval from dynamic config
instead of hardcoded const.

## Why?
<!-- Tell your future self why have you made these changes -->
`cacheRefreshInterval` used to be a const, and it was kinda ok to
duplicate it in delete namespace WF code. But now it is coming from
dynamic config, and WF should read it from there too.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
Modified existing unit tests.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
No risks.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
No.

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No.
  • Loading branch information
alexshtin authored Feb 3, 2025
1 parent 020195d commit 01a9c41
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 23 deletions.
4 changes: 3 additions & 1 deletion service/worker/deletenamespace/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type (
allowDeleteNamespaceIfNexusEndpointTarget dynamicconfig.BoolPropertyFn
nexusEndpointListDefaultPageSize dynamicconfig.IntPropertyFn
deleteActivityRPS dynamicconfig.TypedSubscribable[int]
namespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn
}
componentParams struct {
fx.In
Expand Down Expand Up @@ -92,6 +93,7 @@ func newComponent(
allowDeleteNamespaceIfNexusEndpointTarget: dynamicconfig.AllowDeleteNamespaceIfNexusEndpointTarget.Get(params.DynamicCollection),
nexusEndpointListDefaultPageSize: dynamicconfig.NexusEndpointListDefaultPageSize.Get(params.DynamicCollection),
deleteActivityRPS: dynamicconfig.DeleteNamespaceDeleteActivityRPS.Subscribe(params.DynamicCollection),
namespaceCacheRefreshInterval: dynamicconfig.NamespaceCacheRefreshInterval.Get(params.DynamicCollection),
}
}

Expand Down Expand Up @@ -145,7 +147,7 @@ func (wc *deleteNamespaceComponent) reclaimResourcesActivities() *reclaimresourc
}

func (wc *deleteNamespaceComponent) reclaimResourcesLocalActivities() *reclaimresources.LocalActivities {
return reclaimresources.NewLocalActivities(wc.visibilityManager, wc.metadataManager, wc.logger)
return reclaimresources.NewLocalActivities(wc.visibilityManager, wc.metadataManager, wc.namespaceCacheRefreshInterval, wc.logger)
}

func (wc *deleteNamespaceComponent) deleteExecutionsActivities() *deleteexecutions.Activities {
Expand Down
17 changes: 13 additions & 4 deletions service/worker/deletenamespace/reclaimresources/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ package reclaimresources

import (
"context"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/visibility/manager"
Expand All @@ -42,15 +43,16 @@ import (
type (
Activities struct {
visibilityManager manager.VisibilityManager
metricsHandler metrics.Handler
logger log.Logger
}

LocalActivities struct {
visibilityManager manager.VisibilityManager
metadataManager persistence.MetadataManager
metricsHandler metrics.Handler
logger log.Logger

namespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn

logger log.Logger
}
)

Expand All @@ -67,12 +69,15 @@ func NewActivities(
func NewLocalActivities(
visibilityManager manager.VisibilityManager,
metadataManager persistence.MetadataManager,
namespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn,
logger log.Logger,
) *LocalActivities {
return &LocalActivities{
visibilityManager: visibilityManager,
metadataManager: metadataManager,
logger: logger,

namespaceCacheRefreshInterval: namespaceCacheRefreshInterval,
}
}

Expand Down Expand Up @@ -176,3 +181,7 @@ func (a *LocalActivities) DeleteNamespaceActivity(ctx context.Context, nsID name
logger.Info("Namespace is deleted.")
return nil
}

func (a *LocalActivities) GetNamespaceCacheRefreshInterval(_ context.Context) (time.Duration, error) {
return a.namespaceCacheRefreshInterval(), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/searchattribute"
Expand All @@ -53,7 +52,6 @@ func Test_EnsureNoExecutionsAdvVisibilityActivity_NoExecutions(t *testing.T) {

a := &Activities{
visibilityManager: visibilityManager,
metricsHandler: metrics.NoopMetricsHandler,
logger: log.NewTestLogger(),
}

Expand All @@ -78,7 +76,6 @@ func Test_EnsureNoExecutionsAdvVisibilityActivity_ExecutionsExist(t *testing.T)

a := &Activities{
visibilityManager: visibilityManager,
metricsHandler: metrics.NoopMetricsHandler,
logger: log.NewTestLogger(),
}
env.RegisterActivity(a.EnsureNoExecutionsAdvVisibilityActivity)
Expand Down Expand Up @@ -107,7 +104,6 @@ func Test_EnsureNoExecutionsAdvVisibilityActivity_NotDeletedExecutionsExist(t *t

a := &Activities{
visibilityManager: visibilityManager,
metricsHandler: metrics.NoopMetricsHandler,
logger: log.NewTestLogger(),
}
env.RegisterActivity(a.EnsureNoExecutionsAdvVisibilityActivity)
Expand Down
13 changes: 9 additions & 4 deletions service/worker/deletenamespace/reclaimresources/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ import (

const (
WorkflowName = "temporal-sys-reclaim-namespace-resources-workflow"

namespaceCacheRefreshDelay = 11 * time.Second
)

type (
Expand Down Expand Up @@ -180,8 +178,15 @@ func ReclaimResourcesWorkflow(ctx workflow.Context, params ReclaimResourcesParam
var la *LocalActivities

// Step 0. This workflow is started right after the namespace is marked as DELETED and renamed.
// Wait for namespace cache refresh to make sure no new executions are created.
err = workflow.Sleep(ctx, namespaceCacheRefreshDelay)
// Wait for namespace cache refresh to make sure no new executions are created. 2 secodnds is a random buffer.
ctx0 := workflow.WithLocalActivityOptions(ctx, localActivityOptions)
var namespaceCacheRefreshDelay time.Duration
err = workflow.ExecuteLocalActivity(ctx0, la.GetNamespaceCacheRefreshInterval).Get(ctx, &namespaceCacheRefreshDelay)
if err != nil {
return result, err
}

err = workflow.Sleep(ctx, namespaceCacheRefreshDelay+2*time.Second)
if err != nil {
return result, err
}
Expand Down
25 changes: 15 additions & 10 deletions service/worker/deletenamespace/reclaimresources/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/visibility/manager"
Expand Down Expand Up @@ -70,6 +70,7 @@ func Test_ReclaimResourcesWorkflow_Success(t *testing.T) {
ErrorCount: 0,
}, nil).Once()

env.OnActivity(la.GetNamespaceCacheRefreshInterval, mock.Anything).Return(10*time.Second, nil).Once()
env.OnActivity(la.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once()
env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).Return(nil).Once()

Expand Down Expand Up @@ -120,6 +121,7 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_Error(t *testing.T
ErrorCount: 0,
}, nil).Once()

env.OnActivity(la.GetNamespaceCacheRefreshInterval, mock.Anything).Return(10*time.Second, nil).Once()
env.OnActivity(la.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once()
env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).
Return(stderrors.New("specific_error_from_activity")).
Expand Down Expand Up @@ -168,6 +170,7 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_ExecutionsStillExi
ErrorCount: 0,
}, nil).Once()

env.OnActivity(la.GetNamespaceCacheRefreshInterval, mock.Anything).Return(10*time.Second, nil).Once()
env.OnActivity(la.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once()
env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).
Return(errors.NewExecutionsStillExist(1)).
Expand Down Expand Up @@ -233,16 +236,17 @@ func Test_ReclaimResourcesWorkflow_NoActivityMocks_Success(t *testing.T) {

a := &Activities{
visibilityManager: visibilityManager,
metricsHandler: metrics.NoopMetricsHandler,
logger: log.NewTestLogger(),
}
la := &LocalActivities{
visibilityManager: visibilityManager,
metadataManager: metadataManager,
metricsHandler: metrics.NoopMetricsHandler,
logger: log.NewTestLogger(),
visibilityManager: visibilityManager,
metadataManager: metadataManager,
namespaceCacheRefreshInterval: dynamicconfig.GetDurationPropertyFn(10 * time.Second),

logger: log.NewTestLogger(),
}

env.RegisterActivity(la.GetNamespaceCacheRefreshInterval)
env.RegisterActivity(la.CountExecutionsAdvVisibilityActivity)
env.RegisterActivity(a.EnsureNoExecutionsAdvVisibilityActivity)
env.RegisterActivity(la.DeleteNamespaceActivity)
Expand Down Expand Up @@ -312,15 +316,15 @@ func Test_ReclaimResourcesWorkflow_NoActivityMocks_NoProgressMade(t *testing.T)

a := &Activities{
visibilityManager: visibilityManager,
metricsHandler: metrics.NoopMetricsHandler,
logger: log.NewTestLogger(),
}
la := &LocalActivities{
visibilityManager: visibilityManager,
metricsHandler: metrics.NoopMetricsHandler,
logger: log.NewTestLogger(),
visibilityManager: visibilityManager,
namespaceCacheRefreshInterval: dynamicconfig.GetDurationPropertyFn(10 * time.Second),
logger: log.NewTestLogger(),
}

env.RegisterActivity(la.GetNamespaceCacheRefreshInterval)
env.RegisterActivity(la.CountExecutionsAdvVisibilityActivity)
env.RegisterActivity(a.EnsureNoExecutionsAdvVisibilityActivity)

Expand Down Expand Up @@ -373,6 +377,7 @@ func Test_ReclaimResourcesWorkflow_UpdateDeleteDelay(t *testing.T) {
ErrorCount: 0,
}, nil).Once()

env.OnActivity(la.GetNamespaceCacheRefreshInterval, mock.Anything).Return(10*time.Second, nil).Once()
env.OnActivity(la.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once()
env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).Return(nil).Once()

Expand Down

0 comments on commit 01a9c41

Please sign in to comment.