diff --git a/README.md b/README.md index cc4a62d..214f31e 100644 --- a/README.md +++ b/README.md @@ -262,6 +262,27 @@ The echo method uses the following configuration (0.3, 100) and other methods us } } ``` + +##### Degradation: Category=degradation + +[JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/#L30) + +| Variable |Introduction| +|------------|----| +| enable | Whether to enable degradation| +| percentage | The percentage of dropped requests| + +Example: + +The all requests uses the following configuration (true, 50) +> configDataId: `ClientName.ServiecName.degradation` + +```json +{ + "enable": true, + "percentage": 50 +} +``` ### More Info Refer to [example](https://github.com/kitex-contrib/config-nacos/tree/main/example) for more usage. diff --git a/README_CN.md b/README_CN.md index 0f34f38..ccc82e7 100644 --- a/README_CN.md +++ b/README_CN.md @@ -262,6 +262,26 @@ echo 方法使用下面的配置(0.3、100),其他方法使用全局默认 } ``` +##### 降级: Category=degradation + +[JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/#L30) + +| 参数 | 说明 | +|------------|--------| +| enable | 是否开启降级 | +| percentage | 请求丢弃比例 | +例子: + +客户端所有请求使用以下限流配置 (true, 50) +> configDataId: `ClientName.ServiecName.degradation` + +```json +{ + "enable": true, + "percentage": 50 +} +``` + ### 更多信息 更多示例请参考 [example](https://github.com/kitex-contrib/config-nacos/tree/main/example) diff --git a/client/degradation.go b/client/degradation.go new file mode 100644 index 0000000..8cb8dc6 --- /dev/null +++ b/client/degradation.go @@ -0,0 +1,78 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/pkg/klog" + "github.com/kitex-contrib/config-nacos/pkg/degradation" + "github.com/nacos-group/nacos-sdk-go/vo" + + "github.com/kitex-contrib/config-nacos/nacos" + "github.com/kitex-contrib/config-nacos/utils" +) + +// WithDegradation sets the degradation policy from nacos configuration center. +func WithDegradation(dest, src string, nacosClient nacos.Client, opts utils.Options) []client.Option { + param, err := nacosClient.ClientConfigParam(&nacos.ConfigParamConfig{ + Category: degradationName, + ServerServiceName: dest, + ClientServiceName: src, + }) + if err != nil { + panic(err) + } + + for _, f := range opts.NacosCustomFunctions { + f(¶m) + } + + uniqueID := nacos.GetUniqueID() + + degradationContainer := initDegradation(param, dest, src, nacosClient, uniqueID) + + return []client.Option{ + client.WithACLRules(degradationContainer.GetACLRule()), + client.WithCloseCallbacks(func() error { + err := nacosClient.DeregisterConfig(param, uniqueID) + if err != nil { + return err + } + // cancel the configuration listener when client is closed. + return nil + }), + } +} + +func initDegradation(param vo.ConfigParam, dest, src string, + nacosClient nacos.Client, uniqueID int64, +) *degradation.Container { + degradationContainer := degradation.NewDegradationContainer() + + onChangeCallback := func(data string, parser nacos.ConfigParser) { + config := °radation.Config{} + err := parser.Decode(param.Type, data, config) + if err != nil { + klog.Warnf("[nacos] %s client nacos rpc degradation: unmarshal data %s failed: %s, skip...", dest, data, err) + return + } + // update degradation config + degradationContainer.NotifyPolicyChange(config) + } + + nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID) + + return degradationContainer +} diff --git a/client/suite.go b/client/suite.go index 0779137..9de116c 100644 --- a/client/suite.go +++ b/client/suite.go @@ -24,6 +24,7 @@ const ( retryConfigName = "retry" rpcTimeoutConfigName = "rpc_timeout" circuitBreakerConfigName = "circuit_break" + degradationName = "degradation" ) // NacosClientSuite nacos client config suite, configure retry timeout limit and circuitbreak dynamically from nacos. @@ -53,5 +54,6 @@ func (s *NacosClientSuite) Options() []client.Option { opts = append(opts, WithRetryPolicy(s.service, s.client, s.nacosClient, s.opts)...) opts = append(opts, WithRPCTimeout(s.service, s.client, s.nacosClient, s.opts)...) opts = append(opts, WithCircuitBreaker(s.service, s.client, s.nacosClient, s.opts)...) + opts = append(opts, WithDegradation(s.service, s.client, s.nacosClient, s.opts)...) return opts } diff --git a/pkg/degradation/degradation.go b/pkg/degradation/degradation.go new file mode 100644 index 0000000..68cf10a --- /dev/null +++ b/pkg/degradation/degradation.go @@ -0,0 +1,90 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package degradation + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/bytedance/gopkg/lang/fastrand" + "github.com/cloudwego/kitex/pkg/acl" + "github.com/pkg/errors" +) + +var errorDegradation = errors.New("rejected by client degradation config") + +// DegradationConfig is policy config of degradator. +// DON'T FORGET to update DeepCopy() and Equals() if you add new fields. +type Config struct { + Enable bool `json:"enable"` + Percentage int `json:"percentage"` +} + +type Container struct { + sync.RWMutex + config atomic.Value +} + +var defaultConfig = &Config{Enable: false, Percentage: 0} + +// GetDefaultDegradationConfig return defaultConfig of degradation. +func GetDefaultDegradationConfig() *Config { + return defaultConfig.DeepCopy() +} + +func NewDegradationContainer() *Container { + degradationContainer := &Container{} + degradationContainer.config.Store(GetDefaultDegradationConfig()) + return degradationContainer +} + +func (s *Container) NotifyPolicyChange(cfg *Config) { + s.config.Store(cfg) +} + +func (s *Container) GetACLRule() acl.RejectFunc { + return func(ctx context.Context, request interface{}) (reason error) { + config := s.config.Load().(*Config) + if !config.Enable { + return nil + } + if fastrand.Intn(100) < config.Percentage { + return errorDegradation + } + return nil + } +} + +// DeepCopy returns a full copy of DegradationConfig. +func (c *Config) DeepCopy() *Config { + if c == nil { + return nil + } + return &Config{ + Enable: c.Enable, + Percentage: c.Percentage, + } +} + +func (c *Config) Equals(other *Config) bool { + if c == nil && other == nil { + return true + } + if c == nil || other == nil { + return false + } + return c.Enable == other.Enable && c.Percentage == other.Percentage +} diff --git a/pkg/degradation/degradation_test.go b/pkg/degradation/degradation_test.go new file mode 100644 index 0000000..de16c7f --- /dev/null +++ b/pkg/degradation/degradation_test.go @@ -0,0 +1,40 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package degradation + +import ( + "context" + "errors" + "testing" + + "github.com/cloudwego/kitex/pkg/acl" + "github.com/cloudwego/thriftgo/pkg/test" +) + +var errFake = errors.New("fake error") + +func invoke(ctx context.Context, request, response interface{}) error { + return errFake +} + +func TestNewContainer(t *testing.T) { + container := NewDegradationContainer() + aclMiddleware := acl.NewACLMiddleware([]acl.RejectFunc{container.GetACLRule()}) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake)) + container.NotifyPolicyChange(&Config{Enable: false, Percentage: 100}) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake)) + container.NotifyPolicyChange(&Config{Enable: true, Percentage: 100}) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errorDegradation)) +} diff --git a/v2/client/degradation.go b/v2/client/degradation.go new file mode 100644 index 0000000..cd750a8 --- /dev/null +++ b/v2/client/degradation.go @@ -0,0 +1,78 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/pkg/klog" + "github.com/kitex-contrib/config-nacos/v2/pkg/degradation" + "github.com/nacos-group/nacos-sdk-go/v2/vo" + + "github.com/kitex-contrib/config-nacos/v2/nacos" + "github.com/kitex-contrib/config-nacos/v2/utils" +) + +// WithDegradation sets the degradation policy from nacos configuration center. +func WithDegradation(dest, src string, nacosClient nacos.Client, opts utils.Options) []client.Option { + param, err := nacosClient.ClientConfigParam(&nacos.ConfigParamConfig{ + Category: degradationName, + ServerServiceName: dest, + ClientServiceName: src, + }) + if err != nil { + panic(err) + } + + for _, f := range opts.NacosCustomFunctions { + f(¶m) + } + + uniqueID := nacos.GetUniqueID() + + degradationContainer := initDegradation(param, dest, src, nacosClient, uniqueID) + + return []client.Option{ + client.WithACLRules(degradationContainer.GetACLRule()), + client.WithCloseCallbacks(func() error { + err := nacosClient.DeregisterConfig(param, uniqueID) + if err != nil { + return err + } + // cancel the configuration listener when client is closed. + return nil + }), + } +} + +func initDegradation(param vo.ConfigParam, dest, src string, + nacosClient nacos.Client, uniqueID int64, +) *degradation.Container { + degradationContainer := degradation.NewDegradationContainer() + + onChangeCallback := func(data string, parser nacos.ConfigParser) { + config := °radation.Config{} + err := parser.Decode(param.Type, data, config) + if err != nil { + klog.Warnf("[nacos] %s client nacos rpc degradation: unmarshal data %s failed: %s, skip...", dest, data, err) + return + } + // update degradation config + degradationContainer.NotifyPolicyChange(config) + } + + nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID) + + return degradationContainer +} diff --git a/v2/client/suite.go b/v2/client/suite.go index 2a2f8af..9f60504 100644 --- a/v2/client/suite.go +++ b/v2/client/suite.go @@ -24,6 +24,7 @@ const ( retryConfigName = "retry" rpcTimeoutConfigName = "rpc_timeout" circuitBreakerConfigName = "circuit_break" + degradationName = "degradation" ) // NacosClientSuite nacos client config suite, configure retry timeout limit and circuitbreak dynamically from nacos. @@ -53,5 +54,6 @@ func (s *NacosClientSuite) Options() []client.Option { opts = append(opts, WithRetryPolicy(s.service, s.client, s.nacosClient, s.opts)...) opts = append(opts, WithRPCTimeout(s.service, s.client, s.nacosClient, s.opts)...) opts = append(opts, WithCircuitBreaker(s.service, s.client, s.nacosClient, s.opts)...) + opts = append(opts, WithDegradation(s.service, s.client, s.nacosClient, s.opts)...) return opts } diff --git a/v2/go.mod b/v2/go.mod index 78eeb47..d5d660c 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -77,4 +77,4 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/apache/thrift => github.com/apache/thrift v0.13.0 +replace github.com/apache/thrift => github.com/apache/thrift v0.13.0 \ No newline at end of file diff --git a/v2/go.sum b/v2/go.sum index 70e6635..7892d37 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -788,4 +788,4 @@ rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= -sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= \ No newline at end of file diff --git a/v2/pkg/degradation/degradation.go b/v2/pkg/degradation/degradation.go new file mode 100644 index 0000000..68cf10a --- /dev/null +++ b/v2/pkg/degradation/degradation.go @@ -0,0 +1,90 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package degradation + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/bytedance/gopkg/lang/fastrand" + "github.com/cloudwego/kitex/pkg/acl" + "github.com/pkg/errors" +) + +var errorDegradation = errors.New("rejected by client degradation config") + +// DegradationConfig is policy config of degradator. +// DON'T FORGET to update DeepCopy() and Equals() if you add new fields. +type Config struct { + Enable bool `json:"enable"` + Percentage int `json:"percentage"` +} + +type Container struct { + sync.RWMutex + config atomic.Value +} + +var defaultConfig = &Config{Enable: false, Percentage: 0} + +// GetDefaultDegradationConfig return defaultConfig of degradation. +func GetDefaultDegradationConfig() *Config { + return defaultConfig.DeepCopy() +} + +func NewDegradationContainer() *Container { + degradationContainer := &Container{} + degradationContainer.config.Store(GetDefaultDegradationConfig()) + return degradationContainer +} + +func (s *Container) NotifyPolicyChange(cfg *Config) { + s.config.Store(cfg) +} + +func (s *Container) GetACLRule() acl.RejectFunc { + return func(ctx context.Context, request interface{}) (reason error) { + config := s.config.Load().(*Config) + if !config.Enable { + return nil + } + if fastrand.Intn(100) < config.Percentage { + return errorDegradation + } + return nil + } +} + +// DeepCopy returns a full copy of DegradationConfig. +func (c *Config) DeepCopy() *Config { + if c == nil { + return nil + } + return &Config{ + Enable: c.Enable, + Percentage: c.Percentage, + } +} + +func (c *Config) Equals(other *Config) bool { + if c == nil && other == nil { + return true + } + if c == nil || other == nil { + return false + } + return c.Enable == other.Enable && c.Percentage == other.Percentage +} diff --git a/v2/pkg/degradation/degradation_test.go b/v2/pkg/degradation/degradation_test.go new file mode 100644 index 0000000..de16c7f --- /dev/null +++ b/v2/pkg/degradation/degradation_test.go @@ -0,0 +1,40 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package degradation + +import ( + "context" + "errors" + "testing" + + "github.com/cloudwego/kitex/pkg/acl" + "github.com/cloudwego/thriftgo/pkg/test" +) + +var errFake = errors.New("fake error") + +func invoke(ctx context.Context, request, response interface{}) error { + return errFake +} + +func TestNewContainer(t *testing.T) { + container := NewDegradationContainer() + aclMiddleware := acl.NewACLMiddleware([]acl.RejectFunc{container.GetACLRule()}) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake)) + container.NotifyPolicyChange(&Config{Enable: false, Percentage: 100}) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake)) + container.NotifyPolicyChange(&Config{Enable: true, Percentage: 100}) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errorDegradation)) +}