Skip to content

Commit

Permalink
Merge pull request #112 from ZhengHe-MD/dev/zhenghe3119/trace-traffic…
Browse files Browse the repository at this point in the history
…-patch-2

trace traffic patch 2
  • Loading branch information
ZhengHe-MD authored Nov 4, 2019
2 parents 245e379 + b4eb310 commit 161a342
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 35 deletions.
17 changes: 11 additions & 6 deletions cache/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,23 @@ func (m *ApolloConfig) Init(ctx context.Context) error {
return err
}

type simpleContextController struct {
type simpleContextControlRouter struct {
group string
}

func (s simpleContextController) GetGroup() string {
return s.group
func (s simpleContextControlRouter) GetControlRouteGroup() (string, bool) {
return s.group, true
}

func (s simpleContextControlRouter) SetControlRouteGroup(group string) error {
s.group = group
return nil
}

func (m *ApolloConfig) getConfigStringItemWithFallback(ctx context.Context, namespace, name string) (string, bool) {
val, ok := m.center.GetStringWithNamespace(ctx, center.DefaultApolloCacheNamespace, m.buildKey(ctx, namespace, name))
if !ok {
defaultCtx := context.WithValue(ctx, scontext.ContextKeyControl, simpleContextController{defaultGroup})
defaultCtx := context.WithValue(ctx, scontext.ContextKeyControl, simpleContextControlRouter{defaultGroup})
val, ok = m.center.GetStringWithNamespace(defaultCtx, center.DefaultApolloCacheNamespace, m.buildKey(defaultCtx, namespace, name))
}
return val, ok
Expand All @@ -192,7 +197,7 @@ func (m *ApolloConfig) getConfigStringItemWithFallback(ctx context.Context, name
func (m *ApolloConfig) getConfigIntItemWithFallback(ctx context.Context, namespace, name string) (int, bool) {
val, ok := m.center.GetIntWithNamespace(ctx, center.DefaultApolloCacheNamespace, m.buildKey(ctx, namespace, name))
if !ok {
defaultCtx := context.WithValue(ctx, scontext.ContextKeyControl, simpleContextController{defaultGroup})
defaultCtx := context.WithValue(ctx, scontext.ContextKeyControl, simpleContextControlRouter{defaultGroup})
val, ok = m.center.GetIntWithNamespace(defaultCtx, center.DefaultApolloCacheNamespace, m.buildKey(defaultCtx, namespace, name))
}
return val, ok
Expand Down Expand Up @@ -281,7 +286,7 @@ func (m *ApolloConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent {
func (m *ApolloConfig) buildKey(ctx context.Context, namespace, item string) string {
return strings.Join([]string{
namespace,
scontext.GetGroupWithDefault(ctx, defaultGroup),
scontext.GetControlRouteGroupWithDefault(ctx, defaultGroup),
fmt.Sprint(cache.CacheTypeRedis),
item,
}, apolloConfigSep)
Expand Down
2 changes: 1 addition & 1 deletion cache/redisext/redisext.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (m *RedisExt) getRedisInstance(ctx context.Context) (client *redis.Client,

func (m *RedisExt) getInstanceConf(ctx context.Context) *redis.InstanceConf {
return &redis.InstanceConf{
Group: scontext.GetGroupWithDefault(ctx, cache.DefaultRouteGroup),
Group: scontext.GetControlRouteGroupWithDefault(ctx, cache.DefaultRouteGroup),
Namespace: m.namespace,
Wrapper: cache.WrapperTypeRedisExt,
}
Expand Down
2 changes: 1 addition & 1 deletion cache/value/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewCache(namespace, prefix string, expire time.Duration, load LoadFunc) *Ca

func (m *Cache) getInstanceConf(ctx context.Context) *redis.InstanceConf {
return &redis.InstanceConf{
Group: scontext.GetGroupWithDefault(ctx, cache.DefaultRouteGroup),
Group: scontext.GetControlRouteGroupWithDefault(ctx, cache.DefaultRouteGroup),
Namespace: m.namespace,
Wrapper: cache.WrapperTypeCache,
}
Expand Down
4 changes: 2 additions & 2 deletions dbrouter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewSimpleConfiger(data []byte) (Configer, error) {
}

func (m *SimpleConfig) GetConfig(ctx context.Context, instance string) *Config {
group := scontext.GetGroup(ctx)
group := scontext.GetControlRouteGroupWithDefault(ctx, DefaultGroup)
info := m.parser.GetConfig(instance, group)
return &Config{
DBType: info.DBType,
Expand Down Expand Up @@ -179,7 +179,7 @@ func (m *EtcdConfig) setParser(ctx context.Context, parser *Parser) {
}

func (m *EtcdConfig) GetConfig(ctx context.Context, instance string) *Config {
group := scontext.GetGroup(ctx)
group := scontext.GetControlRouteGroupWithDefault(ctx, DefaultGroup)
parser := m.getParser(ctx)
info := parser.GetConfig(instance, group)
return &Config{
Expand Down
2 changes: 1 addition & 1 deletion dbrouter/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (m *InstanceManager) Get(ctx context.Context, instance string) Instancer {

var err error
var in Instancer
group := scontext.GetGroup(ctx)
group := scontext.GetControlRouteGroupWithDefault(ctx, DefaultGroup)

if group != DefaultGroup {
if !m.isInGroup(group) {
Expand Down
15 changes: 10 additions & 5 deletions mq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,23 @@ func (m *ApolloConfig) Init(ctx context.Context) (err error) {
return
}

type simpleContextController struct {
type simpleContextControlRouter struct {
group string
}

func (s simpleContextController) GetGroup() string {
return s.group
func (s simpleContextControlRouter) GetControlRouteGroup() (string, bool) {
return s.group, true
}

func (s simpleContextControlRouter) SetControlRouteGroup(group string) error {
s.group = group
return nil
}

func (m *ApolloConfig) getConfigItemWithFallback(ctx context.Context, topic string, name string) (string, bool) {
val, ok := m.center.GetStringWithNamespace(ctx, center.DefaultApolloMQNamespace, m.buildKey(ctx, topic, name))
if !ok {
defaultCtx := context.WithValue(ctx, scontext.ContextKeyControl, simpleContextController{defaultRouteGroup})
defaultCtx := context.WithValue(ctx, scontext.ContextKeyControl, simpleContextControlRouter{defaultRouteGroup})
val, ok = m.center.GetStringWithNamespace(defaultCtx, center.DefaultApolloMQNamespace, m.buildKey(defaultCtx, topic, name))
}
return val, ok
Expand Down Expand Up @@ -302,7 +307,7 @@ func (m *ApolloConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent {
func (m *ApolloConfig) buildKey(ctx context.Context, topic, item string) string {
return strings.Join([]string{
topic,
scontext.GetGroupWithDefault(ctx, defaultRouteGroup),
scontext.GetControlRouteGroupWithDefault(ctx, defaultRouteGroup),
fmt.Sprint(MQTypeKafka),
item,
}, apolloConfigSep)
Expand Down
4 changes: 2 additions & 2 deletions mq/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestApolloConfig_getConfigItemWithFallback(t *testing.T) {

t.Run("ctx with unknown group should get default value", func(t *testing.T) {
ctx := context.TODO()
ctx = context.WithValue(ctx, scontext.ContextKeyControl, simpleContextController{"unknown"})
ctx = context.WithValue(ctx, scontext.ContextKeyControl, simpleContextControlRouter{"unknown"})

conf := NewApolloConfiger()

Expand All @@ -136,7 +136,7 @@ func TestApolloConfig_getConfigItemWithFallback(t *testing.T) {

t.Run("ctx with known group should get its value", func(t *testing.T) {
ctx := context.TODO()
ctx = context.WithValue(ctx, scontext.ContextKeyControl, simpleContextController{"testgroup"})
ctx = context.WithValue(ctx, scontext.ContextKeyControl, simpleContextControlRouter{"testgroup"})

conf := NewApolloConfiger()

Expand Down
10 changes: 5 additions & 5 deletions mq/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func WriteMsg(ctx context.Context, topic string, key string, value interface{})
log.String(spanLogKeyKey, key))

conf := &instanceConf{
group: scontext.GetGroupWithDefault(ctx, defaultRouteGroup),
group: scontext.GetControlRouteGroupWithDefault(ctx, defaultRouteGroup),
role: RoleTypeWriter,
topic: topic,
groupId: "",
Expand Down Expand Up @@ -81,7 +81,7 @@ func WriteMsgs(ctx context.Context, topic string, msgs ...Message) error {
span.LogFields(log.String(spanLogKeyTopic, topic))

conf := &instanceConf{
group: scontext.GetGroupWithDefault(ctx, defaultRouteGroup),
group: scontext.GetControlRouteGroupWithDefault(ctx, defaultRouteGroup),
role: RoleTypeWriter,
topic: topic,
groupId: "",
Expand Down Expand Up @@ -120,7 +120,7 @@ func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{
log.String(spanLogKeyTopic, topic))

conf := &instanceConf{
group: scontext.GetGroupWithDefault(ctx, defaultRouteGroup),
group: scontext.GetControlRouteGroupWithDefault(ctx, defaultRouteGroup),
role: RoleTypeReader,
topic: topic,
groupId: groupId,
Expand Down Expand Up @@ -171,7 +171,7 @@ func ReadMsgByPartition(ctx context.Context, topic string, partition int, value
log.String(spanLogKeyTopic, topic))

conf := &instanceConf{
group: scontext.GetGroupWithDefault(ctx, defaultRouteGroup),
group: scontext.GetControlRouteGroupWithDefault(ctx, defaultRouteGroup),
role: RoleTypeReader,
topic: topic,
groupId: "",
Expand Down Expand Up @@ -222,7 +222,7 @@ func FetchMsgByGroup(ctx context.Context, topic, groupId string, value interface
log.String(spanLogKeyTopic, topic))

conf := &instanceConf{
group: scontext.GetGroupWithDefault(ctx, defaultRouteGroup),
group: scontext.GetControlRouteGroupWithDefault(ctx, defaultRouteGroup),
role: RoleTypeReader,
topic: topic,
groupId: groupId,
Expand Down
126 changes: 114 additions & 12 deletions scontext/scontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package scontext

import (
"context"
"errors"
)

// 由于请求的上下文信息的 thrift 定义在 util 项目中,本模块主要为了避免循环依赖
const (
ContextKeyTraceID = "traceID"
ContextKeyControl = "Control"

ContextKeyHead = "Head"
ContextKeyHeadUid = "uid"
Expand All @@ -16,32 +16,64 @@ const (
ContextKeyHeadRegion = "region"
ContextKeyHeadDt = "dt"
ContextKeyHeadUnionId = "unionid"

ContextKeyControl = "Control"
)

const DefaultGroup = ""

var ErrInvalidContext = errors.New("invalid context")

type ContextHeader interface {
ToKV() map[string]interface{}
}

type ContextController interface {
GetGroup() string
type ContextControlRouter interface {
GetControlRouteGroup() (string, bool)
SetControlRouteGroup(string) error
}

func GetGroup(ctx context.Context) string {
type ContextControlCaller interface {
GetControlCallerServerName() (string, bool)
SetControlCallerServerName(string) error
GetControlCallerServerId() (string, bool)
SetControlCallerServerId(string) error
GetControlCallerMethod() (string, bool)
SetControlCallerMethod(string) error
}

func GetControlRouteGroup(ctx context.Context) (group string, ok bool) {
value := ctx.Value(ContextKeyControl)
if value == nil {
return ""
ok = false
return
}

control, ok := value.(ContextController)
control, ok := value.(ContextControlRouter)
if ok == false {
return ""
return
}
return control.GetControlRouteGroup()
}

func SetControlRouteGroup(ctx context.Context, group string) (context.Context, error) {
value := ctx.Value(ContextKeyControl)
if value == nil {
return ctx, ErrInvalidContext
}
control, ok := value.(ContextControlRouter)
if !ok {
return ctx, ErrInvalidContext
}

return control.GetGroup()
err := control.SetControlRouteGroup(group)
if err != nil {
return ctx, err
}
return context.WithValue(ctx, ContextKeyControl, control), nil
}

func GetGroupWithDefault(ctx context.Context, dv string) string {
if group := GetGroup(ctx); group != "" {
func GetControlRouteGroupWithDefault(ctx context.Context, dv string) string {
if group, ok := GetControlRouteGroup(ctx); ok {
return group
}
return dv
Expand Down Expand Up @@ -107,4 +139,74 @@ func GetUnionId(ctx context.Context) (unionId string, ok bool) {
unionId, ok = val.(string)
}
return
}
}

func getControlCaller(ctx context.Context) (ContextControlCaller, error) {
value := ctx.Value(ContextKeyControl)
if value == nil {
return nil, ErrInvalidContext
}
caller, ok := value.(ContextControlCaller)
if !ok {
return nil, ErrInvalidContext
}
return caller, nil
}

func GetControlCallerServerName(ctx context.Context) (serverName string, ok bool) {
caller, ok := ctx.Value(ContextKeyControl).(ContextControlCaller)
if !ok {
return
}
return caller.GetControlCallerServerName()
}

func SetControlCallerServerName(ctx context.Context, serverName string) (context.Context, error) {
caller, err := getControlCaller(ctx)
if err != nil {
return ctx, err
}
err = caller.SetControlCallerServerName(serverName)
if err != nil {
return ctx, err
}
return context.WithValue(ctx, ContextKeyControl, caller), nil
}

func GetControlCallerServerId(ctx context.Context) (serverId string, ok bool) {
caller, ok := ctx.Value(ContextKeyControl).(ContextControlCaller)
if !ok {
return
}
return caller.GetControlCallerServerId()
}

func SetControlCallerServerId(ctx context.Context, serverId string) (context.Context, error) {
caller, err := getControlCaller(ctx)
if err != nil {
return ctx, err
}
err = caller.SetControlCallerServerId(serverId)
return context.WithValue(ctx, ContextKeyControl, caller), nil
}

func GetControlCallerMethod(ctx context.Context) (method string, ok bool) {
caller, ok := ctx.Value(ContextKeyControl).(ContextControlCaller)
if !ok {
return
}
return caller.GetControlCallerMethod()
}

func SetControlCallerMethod(ctx context.Context, method string) (context.Context, error) {
caller, err := getControlCaller(ctx)
if err != nil {
return ctx, err
}
err = caller.SetControlCallerMethod(method)
if err != nil {
return ctx, err
}
return context.WithValue(ctx, ContextKeyControl, caller), nil
}

Loading

0 comments on commit 161a342

Please sign in to comment.