Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix:Resolve service disconnection and configuration invalidations #2717

Merged
merged 5 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
Expand Down Expand Up @@ -102,6 +103,20 @@ func (cli *Client) DialWithInfo(interfaceName string, info *ClientInfo, opts ...
return cli.dial(interfaceName, info, opts...)
}

func (cli *Client) DialWithDefinition(interfaceName string, definition *ClientDefinition, opts ...ReferenceOption) (*Connection, error) {
// TODO(finalt) Temporarily solve the config_center configuration does not work
refName := common.GetReference(definition.Svc)
if refConfig, ok := cli.cliOpts.Consumer.References[refName]; ok {
ref := cli.cliOpts.overallReference.Clone()
for _, opt := range refConfig.GetOptions() {
opt(ref)
}
opts = append(opts, setReference(ref))
}

return cli.dial(interfaceName, definition.Info, opts...)
}

func (cli *Client) dial(interfaceName string, info *ClientInfo, opts ...ReferenceOption) (*Connection, error) {
newRefOpts := defaultReferenceOptions()
finalOpts := []ReferenceOption{
Expand Down
13 changes: 10 additions & 3 deletions cluster/cluster/failover/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package failover

import (
"context"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
"fmt"
"strconv"
)

import (
Expand All @@ -35,6 +35,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
)

type failoverClusterInvoker struct {
Expand All @@ -61,7 +62,7 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
}

methodName := invocation.ActualMethodName()
retries := getRetries(invokers, methodName)
retries := getRetries(invokers, methodName, invocation)
loadBalance := base.GetLoadBalance(invokers[0], methodName)

for i := 0; i <= retries; i++ {
Expand Down Expand Up @@ -113,7 +114,13 @@ func isBizError(err error) bool {
return triple_protocol.IsWireError(err) && triple_protocol.CodeOf(err) == triple_protocol.CodeBizError
}

func getRetries(invokers []protocol.Invoker, methodName string) int {
func getRetries(invokers []protocol.Invoker, methodName string, invocation protocol.Invocation) int {
// Todo(finalt) Temporarily solve the problem that the retries is not valid
if retries, ok := invocation.GetAttachment(constant.RetriesKey); ok {
if rInt, err := strconv.Atoi(retries); err == nil {
return rInt
}
}
if len(invokers) <= 0 {
return constant.DefaultRetriesInt
}
Expand Down
5 changes: 2 additions & 3 deletions cluster/router/script/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package script

import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
)

func init() {
Expand All @@ -29,7 +27,8 @@ func init() {
and cause warning if config center is empty.
User can import this package and config config center to use Script router.
*/
extension.SetRouterFactory(constant.ScriptRouterFactoryKey, NewScriptRouterFactory)
// TODO(finalt) Temporarily removed until fixed (https://github.com/apache/dubbo-go/pull/2716)
//extension.SetRouterFactory(constant.ScriptRouterFactoryKey, NewScriptRouterFactory)
}

// ScriptRouteFactory router factory
Expand Down
54 changes: 54 additions & 0 deletions compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,13 +644,67 @@ func compatGlobalConsumerConfig(c *config.ConsumerConfig) *global.ConsumerConfig
ProxyFactory: c.ProxyFactory,
Check: c.Check,
AdaptiveService: c.AdaptiveService,
References: compatGlobalReferences(c.References),
TracingKey: c.TracingKey,
FilterConf: c.FilterConf,
MaxWaitTimeForServiceDiscovery: c.MaxWaitTimeForServiceDiscovery,
MeshEnabled: c.MeshEnabled,
}
}

func compatGlobalReferences(c map[string]*config.ReferenceConfig) map[string]*global.ReferenceConfig {
refs := make(map[string]*global.ReferenceConfig, len(c))
for name, ref := range c {
refs[name] = &global.ReferenceConfig{
InterfaceName: ref.InterfaceName,
Check: ref.Check,
URL: ref.URL,
Filter: ref.Filter,
Protocol: ref.Protocol,
RegistryIDs: ref.RegistryIDs,
Cluster: ref.Cluster,
Loadbalance: ref.Loadbalance,
Retries: ref.Retries,
Group: ref.Group,
Version: ref.Version,
Serialization: ref.Serialization,
ProvidedBy: ref.ProvidedBy,
Methods: compatGlobalMethod(ref.Methods),
Async: ref.Async,
Params: ref.Params,
Generic: ref.Generic,
Sticky: ref.Sticky,
RequestTimeout: ref.RequestTimeout,
ForceTag: ref.ForceTag,
TracingKey: ref.TracingKey,
MeshProviderPort: ref.MeshProviderPort,
}
}
return refs
}

func compatGlobalMethod(m []*config.MethodConfig) []*global.MethodConfig {
methods := make([]*global.MethodConfig, 0, len(m))
for _, method := range m {
methods = append(methods, &global.MethodConfig{
InterfaceId: method.InterfaceId,
InterfaceName: method.InterfaceName,
Name: method.Name,
Retries: method.Retries,
LoadBalance: method.LoadBalance,
Weight: method.Weight,
TpsLimitInterval: method.TpsLimitInterval,
TpsLimitRate: method.TpsLimitRate,
TpsLimitStrategy: method.TpsLimitStrategy,
ExecuteLimit: method.ExecuteLimit,
ExecuteLimitRejectedHandler: method.ExecuteLimitRejectedHandler,
Sticky: method.Sticky,
RequestTimeout: method.RequestTimeout,
})
}
return methods
}

func compatGlobalMetricConfig(c *config.MetricsConfig) *global.MetricsConfig {
if c == nil {
return nil
Expand Down
2 changes: 1 addition & 1 deletion dubbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (ins *Instance) loadConsumer() error {
conLock.RLock()
defer conLock.RUnlock()
for intfName, definition := range consumerServices {
conn, dialErr := cli.DialWithInfo(intfName, definition.Info)
conn, dialErr := cli.DialWithDefinition(intfName, definition)
if dialErr != nil {
return dialErr
}
Expand Down
68 changes: 68 additions & 0 deletions global/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,74 @@ func DefaultReferenceConfig() *ReferenceConfig {
}
}

func (c *ReferenceConfig) GetOptions() []ReferenceOption {
var refOpts []ReferenceOption
if c.InterfaceName != "" {
refOpts = append(refOpts, WithReference_InterfaceName(c.InterfaceName))
}
if c.Check != nil {
refOpts = append(refOpts, WithReference_Check(*c.Check))
}
if c.URL != "" {
refOpts = append(refOpts, WithReference_URL(c.URL))
}
if c.Filter != "" {
refOpts = append(refOpts, WithReference_Filter(c.Filter))
}
if c.Protocol != "" {
refOpts = append(refOpts, WithReference_Protocol(c.Protocol))
}
if c.RegistryIDs != nil && len(c.RegistryIDs) > 0 {
refOpts = append(refOpts, WithReference_RegistryIDs(c.RegistryIDs))
}
if c.Cluster != "" {
refOpts = append(refOpts, WithReference_Cluster(c.Cluster))
}
if c.Loadbalance != "" {
refOpts = append(refOpts, WithReference_LoadBalance(c.Loadbalance))
}
if c.Retries != "" {
if rInt, err := strconv.Atoi(c.Retries); err == nil {
refOpts = append(refOpts, WithReference_Retries(rInt))
}
}
if c.Group != "" {
refOpts = append(refOpts, WithReference_Group(c.Group))
}
if c.Version != "" {
refOpts = append(refOpts, WithReference_Version(c.Version))
}
if c.Serialization != "" {
refOpts = append(refOpts, WithReference_Serialization(c.Serialization))
}
if c.ProvidedBy != "" {
refOpts = append(refOpts, WithReference_ProviderBy(c.ProvidedBy))
}
if c.Params != nil && len(c.Params) > 0 {
newParams := make(map[string]string, len(c.Params))
for k, v := range c.Params {
newParams[k] = v
}
refOpts = append(refOpts, WithReference_Params(newParams))
}
if c.Generic != "" {
refOpts = append(refOpts, WithReference_Generic(c.Generic))
}
if c.Sticky {
refOpts = append(refOpts, WithReference_Sticky(c.Sticky))
}
if c.RequestTimeout != "" {
refOpts = append(refOpts, WithReference_RequestTimeout(c.RequestTimeout))
}
if c.TracingKey != "" {
refOpts = append(refOpts, WithReference_TracingKey(c.TracingKey))
}
if c.MeshProviderPort != 0 {
refOpts = append(refOpts, WithReference_MeshProviderPort(c.MeshProviderPort))
}
return refOpts
}

// Clone a new ReferenceConfig
func (c *ReferenceConfig) Clone() *ReferenceConfig {
if c == nil {
Expand Down
1 change: 0 additions & 1 deletion protocol/triple/internal/server/cmd_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func main() {
protocol.WithTriple(),
protocol.WithPort(20000),
),
server.WithServerVersion("1.0.0"),
)

if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions protocol/triple/triple_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@
}

func mergeAttachmentToOutgoing(ctx context.Context, inv protocol.Invocation) (context.Context, error) {
// Todo(finalt) Temporarily solve the problem that the timeout time is not valid
if timeout, ok := inv.GetAttachment(constant.TimeoutKey); ok {
ctx = context.WithValue(ctx, "dubbo.timeout.key", timeout)

Check failure on line 148 in protocol/triple/triple_invoker.go

View workflow job for this annotation

GitHub Actions / lint (1.20)

SA1029: should not use built-in type string as key for value; define your own type to avoid collisions (staticcheck)
}
for key, valRaw := range inv.Attachments() {
if str, ok := valRaw.(string); ok {
ctx = tri.AppendToOutgoingContext(ctx, key, str)
Expand Down
19 changes: 17 additions & 2 deletions protocol/triple/triple_protocol/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,27 @@ func parseRequestURL(rawURL string) (*url.URL, *Error) {
func applyDefaultTimeout(ctx context.Context, timeout time.Duration) (context.Context, bool, context.CancelFunc) {
var cancel context.CancelFunc
var applyFlag bool

_, ok := ctx.Deadline()

// Todo(finalt) Temporarily solve the problem that the timeout time is not valid
if !ok {
timeoutVal := ctx.Value("dubbo.timeout.key")
if timeoutVal != nil {
if s, exist := timeoutVal.(string); exist && s != "" {
if newTimeout, err := time.ParseDuration(s); err == nil {
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(newTimeout))
applyFlag = true
return ctx, applyFlag, cancel
}
}
}
}

if !ok && timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, timeout)
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(timeout))
applyFlag = true
}

return ctx, applyFlag, cancel
}

Expand Down
7 changes: 6 additions & 1 deletion protocol/triple/triple_protocol/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package triple_protocol

import (
"context"
"errors"
"fmt"
"net/http"
)
Expand Down Expand Up @@ -363,7 +364,11 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re
svcGroup := request.Header.Get(tripleServiceGroup)
svcVersion := request.Header.Get(tripleServiceVersion)
// todo(DMwangnima): inspect ok
implementation := h.implementations[getIdentifier(svcGroup, svcVersion)]
implementation, ok := h.implementations[getIdentifier(svcGroup, svcVersion)]
if !ok {
_ = connCloser.Close(errors.New("no implementation for " + svcVersion))
return
}
_ = connCloser.Close(implementation(ctx, connCloser))
}

Expand Down
Loading