Skip to content

Commit

Permalink
feat: support degradation config for Kitex client (#25)
Browse files Browse the repository at this point in the history
* add new

* feat: support degradation nacos-config for Kitex client

* feat: support nacos degradation

* feat: support nacos degradation

* feat: support nacos degradation

* feat: support nacos degradation

* feat: support nacos degradation

* feat: support nacos degradation

* rollback go.mod/sum files

* feat: retrun DeepCooy of defaultConfig

* feat: update README file
  • Loading branch information
Madxf authored Jun 7, 2024
1 parent 698e368 commit 7c63f1d
Show file tree
Hide file tree
Showing 12 changed files with 463 additions and 2 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,27 @@ The echo method uses the following configuration (0.3, 100) and other methods us
}
}
```

##### Degradation: Category=degradation

[JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/#L30)

| Variable |Introduction|
|------------|----|
| enable | Whether to enable degradation|
| percentage | The percentage of dropped requests|

Example:

The all requests uses the following configuration (true, 50)
> configDataId: `ClientName.ServiecName.degradation`
```json
{
"enable": true,
"percentage": 50
}
```
### More Info

Refer to [example](https://github.com/kitex-contrib/config-nacos/tree/main/example) for more usage.
Expand Down
20 changes: 20 additions & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,26 @@ echo 方法使用下面的配置(0.3、100),其他方法使用全局默认
}
```

##### 降级: Category=degradation

[JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/#L30)

| 参数 | 说明 |
|------------|--------|
| enable | 是否开启降级 |
| percentage | 请求丢弃比例 |
例子:

客户端所有请求使用以下限流配置 (true, 50)
> configDataId: `ClientName.ServiecName.degradation`
```json
{
"enable": true,
"percentage": 50
}
```

### 更多信息

更多示例请参考 [example](https://github.com/kitex-contrib/config-nacos/tree/main/example)
Expand Down
78 changes: 78 additions & 0 deletions client/degradation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/kitex-contrib/config-nacos/pkg/degradation"
"github.com/nacos-group/nacos-sdk-go/vo"

"github.com/kitex-contrib/config-nacos/nacos"
"github.com/kitex-contrib/config-nacos/utils"
)

// WithDegradation sets the degradation policy from nacos configuration center.
func WithDegradation(dest, src string, nacosClient nacos.Client, opts utils.Options) []client.Option {
param, err := nacosClient.ClientConfigParam(&nacos.ConfigParamConfig{
Category: degradationName,
ServerServiceName: dest,
ClientServiceName: src,
})
if err != nil {
panic(err)
}

for _, f := range opts.NacosCustomFunctions {
f(&param)
}

uniqueID := nacos.GetUniqueID()

degradationContainer := initDegradation(param, dest, src, nacosClient, uniqueID)

return []client.Option{
client.WithACLRules(degradationContainer.GetACLRule()),
client.WithCloseCallbacks(func() error {
err := nacosClient.DeregisterConfig(param, uniqueID)
if err != nil {
return err
}
// cancel the configuration listener when client is closed.
return nil
}),
}
}

func initDegradation(param vo.ConfigParam, dest, src string,
nacosClient nacos.Client, uniqueID int64,
) *degradation.Container {
degradationContainer := degradation.NewDegradationContainer()

onChangeCallback := func(data string, parser nacos.ConfigParser) {
config := &degradation.Config{}
err := parser.Decode(param.Type, data, config)
if err != nil {
klog.Warnf("[nacos] %s client nacos rpc degradation: unmarshal data %s failed: %s, skip...", dest, data, err)
return
}
// update degradation config
degradationContainer.NotifyPolicyChange(config)
}

nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID)

return degradationContainer
}
2 changes: 2 additions & 0 deletions client/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
retryConfigName = "retry"
rpcTimeoutConfigName = "rpc_timeout"
circuitBreakerConfigName = "circuit_break"
degradationName = "degradation"
)

// NacosClientSuite nacos client config suite, configure retry timeout limit and circuitbreak dynamically from nacos.
Expand Down Expand Up @@ -53,5 +54,6 @@ func (s *NacosClientSuite) Options() []client.Option {
opts = append(opts, WithRetryPolicy(s.service, s.client, s.nacosClient, s.opts)...)
opts = append(opts, WithRPCTimeout(s.service, s.client, s.nacosClient, s.opts)...)
opts = append(opts, WithCircuitBreaker(s.service, s.client, s.nacosClient, s.opts)...)
opts = append(opts, WithDegradation(s.service, s.client, s.nacosClient, s.opts)...)
return opts
}
90 changes: 90 additions & 0 deletions pkg/degradation/degradation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package degradation

import (
"context"
"sync"
"sync/atomic"

"github.com/bytedance/gopkg/lang/fastrand"
"github.com/cloudwego/kitex/pkg/acl"
"github.com/pkg/errors"
)

var errorDegradation = errors.New("rejected by client degradation config")

// DegradationConfig is policy config of degradator.
// DON'T FORGET to update DeepCopy() and Equals() if you add new fields.
type Config struct {
Enable bool `json:"enable"`
Percentage int `json:"percentage"`
}

type Container struct {
sync.RWMutex
config atomic.Value
}

var defaultConfig = &Config{Enable: false, Percentage: 0}

// GetDefaultDegradationConfig return defaultConfig of degradation.
func GetDefaultDegradationConfig() *Config {
return defaultConfig.DeepCopy()
}

func NewDegradationContainer() *Container {
degradationContainer := &Container{}
degradationContainer.config.Store(GetDefaultDegradationConfig())
return degradationContainer
}

func (s *Container) NotifyPolicyChange(cfg *Config) {
s.config.Store(cfg)
}

func (s *Container) GetACLRule() acl.RejectFunc {
return func(ctx context.Context, request interface{}) (reason error) {
config := s.config.Load().(*Config)
if !config.Enable {
return nil
}
if fastrand.Intn(100) < config.Percentage {
return errorDegradation
}
return nil
}
}

// DeepCopy returns a full copy of DegradationConfig.
func (c *Config) DeepCopy() *Config {
if c == nil {
return nil
}
return &Config{
Enable: c.Enable,
Percentage: c.Percentage,
}
}

func (c *Config) Equals(other *Config) bool {
if c == nil && other == nil {
return true
}
if c == nil || other == nil {
return false
}
return c.Enable == other.Enable && c.Percentage == other.Percentage
}
40 changes: 40 additions & 0 deletions pkg/degradation/degradation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package degradation

import (
"context"
"errors"
"testing"

"github.com/cloudwego/kitex/pkg/acl"
"github.com/cloudwego/thriftgo/pkg/test"
)

var errFake = errors.New("fake error")

func invoke(ctx context.Context, request, response interface{}) error {
return errFake
}

func TestNewContainer(t *testing.T) {
container := NewDegradationContainer()
aclMiddleware := acl.NewACLMiddleware([]acl.RejectFunc{container.GetACLRule()})
test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake))
container.NotifyPolicyChange(&Config{Enable: false, Percentage: 100})
test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake))
container.NotifyPolicyChange(&Config{Enable: true, Percentage: 100})
test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errorDegradation))
}
78 changes: 78 additions & 0 deletions v2/client/degradation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/kitex-contrib/config-nacos/v2/pkg/degradation"
"github.com/nacos-group/nacos-sdk-go/v2/vo"

"github.com/kitex-contrib/config-nacos/v2/nacos"
"github.com/kitex-contrib/config-nacos/v2/utils"
)

// WithDegradation sets the degradation policy from nacos configuration center.
func WithDegradation(dest, src string, nacosClient nacos.Client, opts utils.Options) []client.Option {
param, err := nacosClient.ClientConfigParam(&nacos.ConfigParamConfig{
Category: degradationName,
ServerServiceName: dest,
ClientServiceName: src,
})
if err != nil {
panic(err)
}

for _, f := range opts.NacosCustomFunctions {
f(&param)
}

uniqueID := nacos.GetUniqueID()

degradationContainer := initDegradation(param, dest, src, nacosClient, uniqueID)

return []client.Option{
client.WithACLRules(degradationContainer.GetACLRule()),
client.WithCloseCallbacks(func() error {
err := nacosClient.DeregisterConfig(param, uniqueID)
if err != nil {
return err
}
// cancel the configuration listener when client is closed.
return nil
}),
}
}

func initDegradation(param vo.ConfigParam, dest, src string,
nacosClient nacos.Client, uniqueID int64,
) *degradation.Container {
degradationContainer := degradation.NewDegradationContainer()

onChangeCallback := func(data string, parser nacos.ConfigParser) {
config := &degradation.Config{}
err := parser.Decode(param.Type, data, config)
if err != nil {
klog.Warnf("[nacos] %s client nacos rpc degradation: unmarshal data %s failed: %s, skip...", dest, data, err)
return
}
// update degradation config
degradationContainer.NotifyPolicyChange(config)
}

nacosClient.RegisterConfigCallback(param, onChangeCallback, uniqueID)

return degradationContainer
}
2 changes: 2 additions & 0 deletions v2/client/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
retryConfigName = "retry"
rpcTimeoutConfigName = "rpc_timeout"
circuitBreakerConfigName = "circuit_break"
degradationName = "degradation"
)

// NacosClientSuite nacos client config suite, configure retry timeout limit and circuitbreak dynamically from nacos.
Expand Down Expand Up @@ -53,5 +54,6 @@ func (s *NacosClientSuite) Options() []client.Option {
opts = append(opts, WithRetryPolicy(s.service, s.client, s.nacosClient, s.opts)...)
opts = append(opts, WithRPCTimeout(s.service, s.client, s.nacosClient, s.opts)...)
opts = append(opts, WithCircuitBreaker(s.service, s.client, s.nacosClient, s.opts)...)
opts = append(opts, WithDegradation(s.service, s.client, s.nacosClient, s.opts)...)
return opts
}
2 changes: 1 addition & 1 deletion v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,4 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/apache/thrift => github.com/apache/thrift v0.13.0
replace github.com/apache/thrift => github.com/apache/thrift v0.13.0
2 changes: 1 addition & 1 deletion v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -788,4 +788,4 @@ rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
Loading

0 comments on commit 7c63f1d

Please sign in to comment.