Skip to content

Commit

Permalink
feat: 扩展 collector processor servicediscovery 功能 (#403)
Browse files Browse the repository at this point in the history
1. 新增 RegexMatcher，支持使用正则匹配任意 span 字段信息
2. 新增固定值替换，支持配置 const_val 的值对 span 信息进行替换
3. 兼容 Windows 下提交中文 commit message
  • Loading branch information
WinterWaterWarm authored Jun 28, 2024
1 parent dd690de commit 4f79d2a
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/collector/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.58.x
0.59.x
40 changes: 39 additions & 1 deletion pkg/collector/processor/servicediscover/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// Match type identifiers. Rule.Match switches on a rule's MatchType using
// these values to decide which matcher to run.
const (
// MatchTypeAuto selects AutoMatched. Rule.Match also falls back to it for any
// match type that is neither "manual" nor "regex".
MatchTypeAuto = "auto"
// MatchTypeManual selects ManualMatched.
MatchTypeManual = "manual"
// MatchTypeRegex selects RegexMatched.
MatchTypeRegex = "regex"
)

type Config struct {
Expand All @@ -47,7 +48,7 @@ func (c *Config) Setup() {
}
}

// manual 优先于 auto
// 优先级
sort.Slice(c.Rules, func(i, j int) bool {
return c.Rules[i].Type > c.Rules[j].Type
})
Expand Down Expand Up @@ -77,6 +78,7 @@ type MatchConfig struct {
// MatchGroup describes a single source -> destination mapping emitted when a
// rule matches.
// NOTE(review): the field notes below reflect how RegexMatched consumes them;
// usage by the auto/manual matchers is not visible here — confirm.
type MatchGroup struct {
// Source is the name looked up among the regex's named capture groups.
Source string `config:"source" mapstructure:"source"`
// Destination is the key under which the resolved value is stored in the
// returned mappings.
Destination string `config:"destination" mapstructure:"destination"`
// ConstVal, when non-empty, is stored under Destination as-is and Source is
// not consulted.
ConstVal string `config:"const_val" mapstructure:"const_val"`
}

type RuleParam struct {
Expand Down Expand Up @@ -111,11 +113,22 @@ func (r *Rule) ResourceValue() string {
return ""
}

// MethodValue decodes r.MatchKey and returns its key part when the decoded
// dimension source is processor.DimensionFromMethod. For any other dimension
// source it returns the empty string.
func (r *Rule) MethodValue() string {
	if from, name := processor.DecodeDimensionFrom(r.MatchKey); from == processor.DimensionFromMethod {
		return name
	}
	return ""
}

func (r *Rule) Match(val string) (map[string]string, bool, string) {
switch r.MatchType {
case MatchTypeManual:
mappings, matched := r.ManualMatched(val)
return mappings, matched, MatchTypeManual
case MatchTypeRegex:
mappings, matched := r.RegexMatched(val)
return mappings, matched, MatchTypeRegex
default:
mappings, matched := r.AutoMatched(val)
return mappings, matched, MatchTypeAuto
Expand Down Expand Up @@ -194,6 +207,31 @@ func (r *Rule) AutoMatched(val string) (map[string]string, bool) {
return m, true
}

// RegexMatched applies the rule's compiled regular expression to val and
// builds the destination -> value mappings described by r.MatchGroups.
//
// For every match group:
//   - a non-empty ConstVal is written to Destination verbatim;
//   - otherwise the named capture group Source, if the expression defines it,
//     is written to Destination.
//
// It returns (nil, false) when the rule has no compiled expression or when val
// does not match the expression, so callers can move on to the next rule.
func (r *Rule) RegexMatched(val string) (map[string]string, bool) {
	if r.re == nil {
		return nil, false
	}

	match := r.re.FindStringSubmatch(val)
	if match == nil {
		// FindStringSubmatch yields nil for a non-matching input. Reporting a
		// match here would let const_val replacements fire on unrelated values.
		return nil, false
	}

	// Index captures by group name. On a successful match len(match) equals
	// len(SubexpNames()), so match[i] is always in range.
	captured := make(map[string]string, len(match))
	for i, name := range r.re.SubexpNames() {
		if i != 0 && name != "" {
			captured[name] = match[i]
		}
	}

	mappings := make(map[string]string, len(r.MatchGroups))
	for _, group := range r.MatchGroups {
		if group.ConstVal != "" {
			mappings[group.Destination] = group.ConstVal
			continue
		}
		if captureVal, ok := captured[group.Source]; ok {
			mappings[group.Destination] = captureVal
		}
	}
	return mappings, true
}

type ConfigHandler struct {
rules map[string][]*Rule
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/collector/processor/servicediscover/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,30 @@ func (p *serviceDiscover) processTraces(record *define.Record) {
continue
}

p.matcher.Match(span, mappings)
break loop
case processor.DimensionFromMethod:
// 1) 先判断是否有 predicateKey
if s := p.fetcher.FetchMethod(span, pk); s == "" {
continue
}
// 2)判断是否有 matchKey
mkey := rule.MethodValue()
if mkey == "" {
continue
}
// 3) 判断 matchValue 是否为空
val := p.fetcher.FetchMethod(span, mkey)
if val == "" {
continue
}

mappings, matched, matchType := rule.Match(val)
logger.Debugf("matcher: mappings=%v, matched=%v, matchType=%v", mappings, matched, matchType)
if !matched {
continue
}

p.matcher.Match(span, mappings)
break loop
}
Expand Down
125 changes: 123 additions & 2 deletions pkg/collector/processor/servicediscover/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ processor:
value: "mando"
host:
value: "https://doc.weixin.qq.com"
operator: eq
operator: eq
path:
value: "/api/v1/users"
operator: nq
- service: "None"
type: "http"
match_type: "auto"
Expand All @@ -67,6 +67,18 @@ processor:
destination: "span_name"
rule:
regex: "https://(?P<peer_service>[^/]+)/(?P<span_name>\\w+)/.+"
- service: "None"
type: "http"
match_type: "regex"
predicate_key: "span_name"
kind: "SPAN_KIND_SERVER"
match_key: "span_name"
match_groups:
- const_val: "GET:/benchmark/{uuid}"
destination: "span_name"
rule:
regex: "GET:/benchmark/(?P<uuid>[^/]+)"
`
mainConf := processor.MustLoadConfigs(content)[0].Config

Expand Down Expand Up @@ -171,11 +183,30 @@ processor:
},
re: regexp.MustCompile(`https://(?P<peer_service>[^/]+)/(?P<span_name>\w+)/.+`),
},
{
Type: "http",
Kind: "SPAN_KIND_SERVER",
Service: "None",
MatchType: "regex",
MatchKey: "span_name",
PredicateKey: "span_name",
MatchConfig: MatchConfig{
Regex: `GET:/benchmark/(?P<uuid>[^/]+)`,
},
MatchGroups: []MatchGroup{
{
ConstVal: "GET:/benchmark/{uuid}",
Destination: "span_name",
},
},
re: regexp.MustCompile(`GET:/benchmark/(?P<uuid>[^/]+)`),
},
}

ch := NewConfigHandler(c)
assert.Equal(t, rules[0], ch.Get("SPAN_KIND_CLIENT")[0])
assert.Equal(t, rules[1], ch.Get("SPAN_KIND_CLIENT")[1])
assert.Equal(t, rules[2], ch.Get("SPAN_KIND_SERVER")[0])

assert.Equal(t, define.ProcessorServiceDiscover, factory.Name())
assert.False(t, factory.IsDerived())
Expand Down Expand Up @@ -337,3 +368,93 @@ processor:
testkits.AssertAttrsFoundStringVal(t, span.Attributes(), "peer.service", "doc.weixin.qq.com")
})
}

// TestTracesRegexMatched runs a "regex" match_type rule through the processor.
// The rule matches on the http.url attribute, sets span_name from a const_val,
// and copies the peer_service capture group into the peer.service attribute.
func TestTracesRegexMatched(t *testing.T) {
content := `
processor:
- name: "service_discover/common"
config:
rules:
- service: "None"
type: "http"
match_type: "regex"
predicate_key: "attributes.http.method"
kind: "SPAN_KIND_SERVER"
match_key: "attributes.http.url"
match_groups:
- const_val: "GET:/benchmark/{uuid}"
destination: "span_name"
- source: "peer_service"
destination: "peer.service"
rule:
regex: http://(?P<peer_service>[^/]+)/benchmark/(?P<uuid>[^/]+)
`
factory := processor.MustCreateFactory(content, NewFactory)

// Generate server-kind trace data carrying both the predicate attribute
// (http.method) and the attribute the regex is applied to (http.url).
traces := generator.NewTracesGenerator(define.TracesOptions{
GeneratorOptions: define.GeneratorOptions{
Attributes: map[string]string{
"http.method": "GET",
"http.url": "http://example:19100/benchmark/015017af-85fb-4c45-9460-9e10f45b88a5",
},
},
SpanCount: 1,
SpanKind: int(ptrace.SpanKindServer),
})
data := traces.Generate()

record := &define.Record{
RecordType: define.RecordTraces,
Data: data,
}
_, err := factory.Process(record)
assert.NoError(t, err)

// Inspect the record after processing: peer.service must hold the captured
// host:port and the span name must be the configured constant.
data = record.Data.(ptrace.Traces)
foreach.Spans(data.ResourceSpans(), func(span ptrace.Span) {
testkits.AssertAttrsFoundStringVal(t, span.Attributes(), "peer.service", "example:19100")
assert.Equal(t, "GET:/benchmark/{uuid}", span.Name())
})
}

// TestTracesRegexMatchedWithSpanName runs a "regex" rule whose predicate_key
// and match_key are both "span_name". The regex captures the whole matched
// value as peer_service, so the peer.service attribute should equal the span's
// own name.
func TestTracesRegexMatchedWithSpanName(t *testing.T) {
content := `
processor:
- name: "service_discover/common"
config:
rules:
- service: "None"
type: "http"
match_type: "regex"
predicate_key: "span_name"
kind: "SPAN_KIND_CLIENT"
match_key: "span_name"
match_groups:
- source: "peer_service"
destination: "peer.service"
rule:
regex: (?P<peer_service>.+)
`
factory := processor.MustCreateFactory(content, NewFactory)

// No attributes are supplied: this rule reads the span name, not attributes.
traces := generator.NewTracesGenerator(define.TracesOptions{
GeneratorOptions: define.GeneratorOptions{
Attributes: map[string]string{},
},
SpanCount: 1,
SpanKind: int(ptrace.SpanKindClient),
})
data := traces.Generate()

record := &define.Record{
RecordType: define.RecordTraces,
Data: data,
}
_, err := factory.Process(record)
assert.NoError(t, err)

// The expected value is taken from the span itself, so the assertion does not
// depend on what name the generator assigns.
data = record.Data.(ptrace.Traces)
foreach.Spans(data.ResourceSpans(), func(span ptrace.Span) {
testkits.AssertAttrsFoundStringVal(t, span.Attributes(), "peer.service", span.Name())
})
}
2 changes: 1 addition & 1 deletion scripts/pre_commit/check_commit_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def get_commit_message():
print("Warning: The path of file `COMMIT_EDITMSG` not given, skipped!")
return 0
commit_message_filepath = args[1]
with open(commit_message_filepath, "r") as fd:
with open(commit_message_filepath, "r", encoding="utf-8") as fd:
content = fd.read()
return content.strip().lower()

Expand Down

0 comments on commit 4f79d2a

Please sign in to comment.