Skip to content

Commit

Permalink
fix: fix load variable from env (#2787)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer authored and ngjaying committed Apr 22, 2024
1 parent f126a29 commit 030f1f4
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 35 deletions.
26 changes: 13 additions & 13 deletions internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func InitConf() {
BufferLength: 1024,
CheckpointInterval: 300000, // 5 minutes
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 0,
Delay: 1000,
Multiplier: 2,
Expand Down Expand Up @@ -422,29 +422,29 @@ func ValidateRuleOption(option *api.RuleOption) error {
Log.Warnf("lateTol is negative, set to 1000")
errs = errors.Join(errs, errors.New("invalidLateTol:lateTol must be greater than 0"))
}
if option.Restart != nil {
if option.Restart.Multiplier <= 0 {
option.Restart.Multiplier = 2
if option.RestartStrategy != nil {
if option.RestartStrategy.Multiplier <= 0 {
option.RestartStrategy.Multiplier = 2
Log.Warnf("restart multiplier is negative, set to 2")
errs = errors.Join(errs, errors.New("invalidRestartMultiplier:restart multiplier must be greater than 0"))
}
if option.Restart.Attempts < 0 {
option.Restart.Attempts = 0
if option.RestartStrategy.Attempts < 0 {
option.RestartStrategy.Attempts = 0
Log.Warnf("restart attempts is negative, set to 0")
errs = errors.Join(errs, errors.New("invalidRestartAttempts:restart attempts must be greater than 0"))
}
if option.Restart.Delay <= 0 {
option.Restart.Delay = 1000
if option.RestartStrategy.Delay <= 0 {
option.RestartStrategy.Delay = 1000
Log.Warnf("restart delay is negative, set to 1000")
errs = errors.Join(errs, errors.New("invalidRestartDelay:restart delay must be greater than 0"))
}
if option.Restart.MaxDelay <= 0 {
option.Restart.MaxDelay = option.Restart.Delay
Log.Warnf("restart maxDelay is negative, set to %d", option.Restart.Delay)
if option.RestartStrategy.MaxDelay <= 0 {
option.RestartStrategy.MaxDelay = option.RestartStrategy.Delay
Log.Warnf("restart maxDelay is negative, set to %d", option.RestartStrategy.Delay)
errs = errors.Join(errs, errors.New("invalidRestartMaxDelay:restart maxDelay must be greater than 0"))
}
if option.Restart.JitterFactor <= 0 || option.Restart.JitterFactor >= 1 {
option.Restart.JitterFactor = 0.1
if option.RestartStrategy.JitterFactor <= 0 || option.RestartStrategy.JitterFactor >= 1 {
option.RestartStrategy.JitterFactor = 0.1
Log.Warnf("restart jitterFactor must between 0 and 1, set to 0.1")
errs = errors.Join(errs, errors.New("invalidRestartJitterFactor:restart jitterFactor must between [0, 1)"))
}
Expand Down
28 changes: 20 additions & 8 deletions internal/conf/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ package conf
import (
"errors"
"fmt"
"os"
"path"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/pkg/api"
)
Expand Down Expand Up @@ -102,7 +105,7 @@ func TestRuleOptionValidate(t *testing.T) {
BufferLength: 1024,
CheckpointInterval: 300000, // 5 minutes
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 0,
Delay: 1000,
Multiplier: 1,
Expand All @@ -116,7 +119,7 @@ func TestRuleOptionValidate(t *testing.T) {
BufferLength: 1024,
CheckpointInterval: 300000, // 5 minutes
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 0,
Delay: 1000,
Multiplier: 1,
Expand All @@ -132,7 +135,7 @@ func TestRuleOptionValidate(t *testing.T) {
BufferLength: 1024,
CheckpointInterval: 300000, // 5 minutes
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 3,
Delay: 1000,
Multiplier: 1,
Expand All @@ -146,7 +149,7 @@ func TestRuleOptionValidate(t *testing.T) {
BufferLength: 1024,
CheckpointInterval: 300000, // 5 minutes
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 3,
Delay: 1000,
Multiplier: 1,
Expand All @@ -162,7 +165,7 @@ func TestRuleOptionValidate(t *testing.T) {
BufferLength: 1024,
CheckpointInterval: 300000, // 5 minutes
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 3,
Delay: 1000,
Multiplier: 1.5,
Expand All @@ -176,7 +179,7 @@ func TestRuleOptionValidate(t *testing.T) {
BufferLength: 1024,
CheckpointInterval: 300000, // 5 minutes
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 3,
Delay: 1000,
Multiplier: 1.5,
Expand All @@ -192,7 +195,7 @@ func TestRuleOptionValidate(t *testing.T) {
BufferLength: 1024,
CheckpointInterval: 300000, // 5 minutes
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: -2,
Delay: 0,
Multiplier: 0,
Expand All @@ -206,7 +209,7 @@ func TestRuleOptionValidate(t *testing.T) {
BufferLength: 1024,
CheckpointInterval: 300000, // 5 minutes
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 0,
Delay: 1000,
Multiplier: 2,
Expand Down Expand Up @@ -422,3 +425,12 @@ func TestSyslogConf_Validate(t *testing.T) {
})
}
}

func TestLoad(t *testing.T) {
require.NoError(t, os.Setenv("KUIPER__RULE__RESTARTSTRATEGY__ATTEMPTS", "10"))
InitConf()
cpath, err := GetConfLoc()
require.NoError(t, err)
LoadConfigFromPath(path.Join(cpath, ConfFileName), &Config)
require.Equal(t, 10, Config.Rule.RestartStrategy.Attempts)
}
12 changes: 6 additions & 6 deletions internal/processor/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ func clone(opt api.RuleOption) *api.RuleOption {
SendError: opt.SendError,
Qos: opt.Qos,
CheckpointInterval: opt.CheckpointInterval,
Restart: &api.RestartStrategy{
Attempts: opt.Restart.Attempts,
Delay: opt.Restart.Delay,
Multiplier: opt.Restart.Multiplier,
MaxDelay: opt.Restart.MaxDelay,
JitterFactor: opt.Restart.JitterFactor,
RestartStrategy: &api.RestartStrategy{
Attempts: opt.RestartStrategy.Attempts,
Delay: opt.RestartStrategy.Delay,
Multiplier: opt.RestartStrategy.Multiplier,
MaxDelay: opt.RestartStrategy.MaxDelay,
JitterFactor: opt.RestartStrategy.JitterFactor,
},
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/processor/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestRuleActionParse_Apply(t *testing.T) {
Qos: api.AtMostOnce,
CheckpointInterval: 300000,
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 20,
Delay: 1000,
Multiplier: 2,
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestRuleActionParse_Apply(t *testing.T) {
Qos: api.ExactlyOnce,
CheckpointInterval: 60000,
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 0,
Delay: 1000,
Multiplier: 2,
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestRuleActionParse_Apply(t *testing.T) {
Qos: api.AtMostOnce,
CheckpointInterval: 300000,
SendError: true,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 0,
Delay: 1000,
Multiplier: 2,
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var defaultOption = &api.RuleOption{
SendError: true,
Qos: api.AtMostOnce,
CheckpointInterval: 300000,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 0,
Delay: 1000,
Multiplier: 2,
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/rule/ruleState.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (rs *RuleState) runTopo(ctx context.Context) {
// Load the changeable states once
rs.Lock()
tp := rs.Topology
option := rs.Rule.Options.Restart
option := rs.Rule.Options.RestartStrategy
rs.Unlock()
if tp == nil {
conf.Log.Warnf("rule %s is not initialized or just stopped", rs.RuleId)
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/rule/ruleState_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var defaultOption = &api.RuleOption{
SendError: true,
Qos: api.AtMostOnce,
CheckpointInterval: 300000,
Restart: &api.RestartStrategy{
RestartStrategy: &api.RestartStrategy{
Attempts: 0,
Delay: 1000,
Multiplier: 2,
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ type RuleOption struct {
SendError bool `json:"sendError" yaml:"sendError"`
Qos Qos `json:"qos" yaml:"qos"`
CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"`
Restart *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"`
RestartStrategy *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"`
Cron string `json:"cron" yaml:"cron"`
Duration string `json:"duration" yaml:"duration"`
CronDatetimeRange []DatetimeRange `json:"cronDatetimeRange" yaml:"cronDatetimeRange"`
Expand Down Expand Up @@ -265,7 +265,7 @@ func GetDefaultRule(name, sql string) *Rule {
SendError: true,
Qos: AtMostOnce,
CheckpointInterval: 300000,
Restart: &RestartStrategy{
RestartStrategy: &RestartStrategy{
Attempts: 0,
Delay: 1000,
Multiplier: 2,
Expand Down

0 comments on commit 030f1f4

Please sign in to comment.