diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 832f5a22..0d572a81 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -88,7 +88,7 @@ func init() { applyCmd.Flags().StringVar( &applyConfig.retentionDropStepDurationStr, "retention-drop-step-duration", - os.Getenv("TOPICCTL_APPLY_RETENTION_DROP_STEP_DURATION"), + "", "Amount of time to use for retention drop steps", ) applyCmd.Flags().BoolVar( diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 76ddf3a5..88906a94 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -356,9 +356,20 @@ func (t *TopicApplier) updateSettings( return err } + var retentionDropStepDuration time.Duration + if t.config.RetentionDropStepDuration != 0 { + retentionDropStepDuration = t.config.RetentionDropStepDuration + } else { + var err error + retentionDropStepDuration, err = t.config.ClusterConfig.GetDefaultRetentionDropStepDuration() + if err != nil { + return err + } + } + reduced, err := topicSettings.ReduceRetentionDrop( topicInfo.Config, - t.config.RetentionDropStepDuration, + retentionDropStepDuration, ) if err != nil { return err diff --git a/pkg/config/cluster.go b/pkg/config/cluster.go index 64348d81..a80be41d 100644 --- a/pkg/config/cluster.go +++ b/pkg/config/cluster.go @@ -3,6 +3,8 @@ package config import ( "context" "errors" + "fmt" + "time" "github.com/aws/aws-sdk-go/aws/session" "github.com/hashicorp/go-multierror" @@ -68,6 +70,10 @@ type ClusterSpec struct { // DefaultThrottleMB is the default broker throttle used for migrations in this // cluster. If unset, then a reasonable default is used instead. DefaultThrottleMB int64 `json:"defaultThrottleMB"` + + // DefaultRetentionDropStepDuration is the default amount of time that retention drops will be + // limited by. If unset, no retention drop limiting will be applied. + DefaultRetentionDropStepDurationStr string `json:"defaultRetentionDropStepDuration"` } // Validate evaluates whether the cluster config is valid. @@ -98,9 +104,25 @@ func (c ClusterConfig) Validate() error { multierror.Append(err, errors.New("MajorVersion must be v0.10 or v2")) } + _, parseErr := c.GetDefaultRetentionDropStepDuration() + if parseErr != nil { + err = multierror.Append( + err, + fmt.Errorf("Error parsing retention drop step retention: %+v", parseErr), + ) + } + return err } +func (c ClusterConfig) GetDefaultRetentionDropStepDuration() (time.Duration, error) { + if c.Spec.DefaultRetentionDropStepDurationStr == "" { + return 0, nil + } + + return time.ParseDuration(c.Spec.DefaultRetentionDropStepDurationStr) +} + // NewAdminClient returns a new admin client using the parameters in the current cluster config. func (c ClusterConfig) NewAdminClient( ctx context.Context, diff --git a/pkg/config/cluster_test.go b/pkg/config/cluster_test.go index 34dbb374..9e0b3a40 100644 --- a/pkg/config/cluster_test.go +++ b/pkg/config/cluster_test.go @@ -24,9 +24,10 @@ func TestClusterValidate(t *testing.T) { Description: "test-description", }, Spec: ClusterSpec{ - BootstrapAddrs: []string{"broker-addr"}, - ZKAddrs: []string{"zk-addr"}, - VersionMajor: "v2", + BootstrapAddrs: []string{"broker-addr"}, + ZKAddrs: []string{"zk-addr"}, + VersionMajor: "v2", + DefaultRetentionDropStepDurationStr: "5m", }, }, expError: false, @@ -78,6 +79,24 @@ func TestClusterValidate(t *testing.T) { }, expError: true, }, + { + description: "bad retention drop format", + clusterConfig: ClusterConfig{ + Meta: ClusterMeta{ + Name: "test-cluster", + Region: "test-region", + Environment: "test-environment", + Description: "test-description", + }, + Spec: ClusterSpec{ + BootstrapAddrs: []string{"broker-addr"}, + ZKAddrs: []string{"zk-addr"}, + VersionMajor: "v2", + DefaultRetentionDropStepDurationStr: "10xxx", + }, + }, + expError: true, + }, } for _, testCase := range testCases {