Skip to content

Commit

Permalink
Allow setting default step drop duration in cluster config
Browse files Browse the repository at this point in the history
  • Loading branch information
yolken-segment committed Sep 8, 2020
1 parent f615dd3 commit b31ee95
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 5 deletions.
2 changes: 1 addition & 1 deletion cmd/topicctl/subcmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func init() {
applyCmd.Flags().StringVar(
&applyConfig.retentionDropStepDurationStr,
"retention-drop-step-duration",
os.Getenv("TOPICCTL_APPLY_RETENTION_DROP_STEP_DURATION"),
"",
"Amount of time to use for retention drop steps",
)
applyCmd.Flags().BoolVar(
Expand Down
13 changes: 12 additions & 1 deletion pkg/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,20 @@ func (t *TopicApplier) updateSettings(
return err
}

var retentionDropStepDuration time.Duration
if t.config.RetentionDropStepDuration != 0 {
retentionDropStepDuration = t.config.RetentionDropStepDuration
} else {
var err error
retentionDropStepDuration, err = t.config.ClusterConfig.GetDefaultRetentionDropStepDuration()
if err != nil {
return err
}
}

reduced, err := topicSettings.ReduceRetentionDrop(
topicInfo.Config,
t.config.RetentionDropStepDuration,
retentionDropStepDuration,
)
if err != nil {
return err
Expand Down
22 changes: 22 additions & 0 deletions pkg/config/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package config
import (
"context"
"errors"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -68,6 +70,10 @@ type ClusterSpec struct {
// DefaultThrottleMB is the default broker throttle used for migrations in this
// cluster. If unset, then a reasonable default is used instead.
DefaultThrottleMB int64 `json:"defaultThrottleMB"`

// DefaultRetentionDropStepDuration is the default amount of time that retention drops will be
// limited by. If unset, no retention drop limiting will be applied.
DefaultRetentionDropStepDurationStr string `json:"defaultRetentionDropStepDuration"`
}

// Validate evaluates whether the cluster config is valid.
Expand Down Expand Up @@ -98,9 +104,25 @@ func (c ClusterConfig) Validate() error {
multierror.Append(err, errors.New("MajorVersion must be v0.10 or v2"))
}

_, parseErr := c.GetDefaultRetentionDropStepDuration()
if parseErr != nil {
err = multierror.Append(
err,
fmt.Errorf("Error parsing retention drop step retention: %+v", parseErr),
)
}

return err
}

func (c ClusterConfig) GetDefaultRetentionDropStepDuration() (time.Duration, error) {
if c.Spec.DefaultRetentionDropStepDurationStr == "" {
return 0, nil
}

return time.ParseDuration(c.Spec.DefaultRetentionDropStepDurationStr)
}

// NewAdminClient returns a new admin client using the parameters in the current cluster config.
func (c ClusterConfig) NewAdminClient(
ctx context.Context,
Expand Down
25 changes: 22 additions & 3 deletions pkg/config/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ func TestClusterValidate(t *testing.T) {
Description: "test-description",
},
Spec: ClusterSpec{
BootstrapAddrs: []string{"broker-addr"},
ZKAddrs: []string{"zk-addr"},
VersionMajor: "v2",
BootstrapAddrs: []string{"broker-addr"},
ZKAddrs: []string{"zk-addr"},
VersionMajor: "v2",
DefaultRetentionDropStepDurationStr: "5m",
},
},
expError: false,
Expand Down Expand Up @@ -78,6 +79,24 @@ func TestClusterValidate(t *testing.T) {
},
expError: true,
},
{
description: "bad retention drop format",
clusterConfig: ClusterConfig{
Meta: ClusterMeta{
Name: "test-cluster",
Region: "test-region",
Environment: "test-environment",
Description: "test-description",
},
Spec: ClusterSpec{
BootstrapAddrs: []string{"broker-addr"},
ZKAddrs: []string{"zk-addr"},
VersionMajor: "v2",
DefaultRetentionDropStepDurationStr: "10xxx",
},
},
expError: true,
},
}

for _, testCase := range testCases {
Expand Down

0 comments on commit b31ee95

Please sign in to comment.