Skip to content

Commit

Permalink
Support applying the index rule to the tag belonging to the entity (#539
Browse files Browse the repository at this point in the history
)
  • Loading branch information
hanahmily committed Sep 20, 2024
1 parent b7d397b commit a1d882a
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Release Notes.
- Add liaison, remote queue, storage(rotation), time-series tables, metadata cache and scheduler metrics.
- Add HTTP health check endpoint for the data node.
- Add slow query log for the distributed query and local query.
- Support applying the index rule to the tag belonging to the entity.

### Bugs

Expand Down
16 changes: 11 additions & 5 deletions pkg/index/inverted/inverted_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,44 +40,50 @@ func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher, secondaryQuery
}

qs := make([]bluge.Query, len(seriesMatchers))
primaryNode := newShouldNode()
nodes := make([]node, len(seriesMatchers))
for i := range seriesMatchers {
switch seriesMatchers[i].Type {
case index.SeriesMatcherTypeExact:
match := convert.BytesToString(seriesMatchers[i].Match)
q := bluge.NewTermQuery(match)
q.SetField(entityField)
qs[i] = q
primaryNode.Append(newTermNode(match, nil))
nodes = append(nodes, newTermNode(match, nil))
case index.SeriesMatcherTypePrefix:
match := convert.BytesToString(seriesMatchers[i].Match)
q := bluge.NewPrefixQuery(match)
q.SetField(entityField)
qs[i] = q
primaryNode.Append(newPrefixNode(match))
nodes = append(nodes, newPrefixNode(match))
case index.SeriesMatcherTypeWildcard:
match := convert.BytesToString(seriesMatchers[i].Match)
q := bluge.NewWildcardQuery(match)
q.SetField(entityField)
qs[i] = q
primaryNode.Append(newWildcardNode(match))
nodes = append(nodes, newWildcardNode(match))
default:
return nil, errors.Errorf("unsupported series matcher type: %v", seriesMatchers[i].Type)
}
}
var primaryQuery bluge.Query
var n node
if len(qs) > 1 {
bq := bluge.NewBooleanQuery()
bq.AddShould(qs...)
bq.SetMinShould(1)
primaryQuery = bq
n = newShouldNode()
for i := range nodes {
n.(*shouldNode).Append(nodes[i])
}
} else {
primaryQuery = qs[0]
n = nodes[0]
}

query := bluge.NewBooleanQuery().AddMust(primaryQuery)
node := newMustNode()
node.Append(primaryNode)
node.Append(n)
if secondaryQuery != nil && secondaryQuery.(*queryNode).query != nil {
query.AddMust(secondaryQuery.(*queryNode).query)
node.Append(secondaryQuery.(*queryNode).node)
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/logical/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
func ParseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue, cond *modelv1.Condition) (LiteralExpr, [][]*modelv1.TagValue, error) {
entityIdx, ok := entityDict[cond.Name]
if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != modelv1.Condition_BINARY_OP_IN {
return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag belongs to the entity only supports EQ or IN operation in condition(%v)", cond)
ok = false
}
switch v := cond.Value.Value.(type) {
case *modelv1.TagValue_Str:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"metadata": {
"name": "endpoint_traffic",
"group": "sw_metric"
},
"rules": [
"endpoint_name"
],
"subject": {
"catalog": "CATALOG_MEASURE",
"name": "endpoint_traffic"
},
"begin_at": "2021-04-15T01:30:15.01Z",
"expire_at": "2121-04-15T01:30:15.01Z",
"updated_at": "2021-04-15T01:30:15.01Z"
}
13 changes: 13 additions & 0 deletions pkg/test/measure/testdata/index_rules/endpoint_name.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"metadata": {
"id": 5,
"name": "endpoint_name",
"group": "sw_metric"
},
"tags": [
"endpoint_name"
],
"type": "TYPE_INVERTED",
"analyzer": "ANALYZER_SIMPLE",
"updated_at": "2021-04-15T01:30:15.01Z"
}
28 changes: 28 additions & 0 deletions pkg/test/measure/testdata/measures/endpoint_traffic.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"metadata": {
"name": "endpoint_traffic",
"group": "sw_metric"
},
"tag_families": [
{
"name": "default",
"tags": [
{
"name": "service_id",
"type": "TAG_TYPE_STRING"
},
{
"name": "endpoint_name",
"type": "TAG_TYPE_STRING"
}
]
}
],
"entity": {
"tag_names": [
"service_id",
"endpoint_name"
]
},
"updated_at": "2024-04-15T01:30:15.01Z"
}
1 change: 1 addition & 0 deletions test/cases/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ func Initialize(addr string, now time.Time) {
casesmeasuredata.Write(conn, "service_latency_minute", "sw_metric", "service_latency_minute_data.json", now, interval)
casesmeasuredata.Write(conn, "service_instance_latency_minute", "sw_metric", "service_instance_latency_minute_data.json", now, interval)
casesmeasuredata.Write(conn, "service_instance_latency_minute", "sw_metric", "service_instance_latency_minute_data1.json", now.Add(1*time.Minute), interval)
casesmeasuredata.Write(conn, "endpoint_traffic", "sw_metric", "endpoint_traffic.json", now, interval)
casesmeasuredata.Write(conn, "duplicated", "exception", "duplicated.json", now, 0)
}
41 changes: 41 additions & 0 deletions test/cases/measure/data/input/entity_match.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to Apache Software Foundation (ASF) under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Apache Software Foundation (ASF) licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: "endpoint_traffic"
groups: ["sw_metric"]
tagProjection:
tagFamilies:
- name: "default"
tags: ["service_id", "endpoint_name"]
criteria:
le:
op: "LOGICAL_OP_AND"
right:
condition:
name: "endpoint_name"
op: "BINARY_OP_MATCH"
value:
str:
value: "foo"
left:
condition:
name: "service_id"
op: "BINARY_OP_EQ"
value:
str:
value: "service_1"

74 changes: 74 additions & 0 deletions test/cases/measure/data/testdata/endpoint_traffic.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
[
{
"tag_families": [
{
"tags": [
{
"str": {
"value": "service_1"
}
},
{
"str": {
"value": "/api/v1/foo"
}
}
]
}
]
},
{
"tag_families": [
{
"tags": [
{
"str": {
"value": "service_1"
}
},
{
"str": {
"value": "/api/v1/bar"
}
}
]
}
]
},
{
"tag_families": [
{
"tags": [
{
"str": {
"value": "service_2"
}
},
{
"str": {
"value": "/api/v1/foo"
}
}
]
}
]
},
{
"tag_families": [
{
"tags": [
{
"str": {
"value": "service_2"
}
},
{
"str": {
"value": "/api/v1/bar"
}
}
]
}
]
}
]
29 changes: 29 additions & 0 deletions test/cases/measure/data/want/entity_match.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to Apache Software Foundation (ASF) under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Apache Software Foundation (ASF) licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

dataPoints:
- tagFamilies:
- name: default
tags:
- key: service_id
value:
str:
value: service_1
- key: endpoint_name
value:
str:
value: /api/v1/foo
1 change: 1 addition & 0 deletions test/cases/measure/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,5 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("match a tag belongs to the entity", helpers.Args{Input: "entity_match", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
)

0 comments on commit a1d882a

Please sign in to comment.