diff --git a/CHANGES.md b/CHANGES.md index 22c7ee358..d08ca4fa6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -26,6 +26,7 @@ Release Notes. - Add liaison, remote queue, storage(rotation), time-series tables, metadata cache and scheduler metrics. - Add HTTP health check endpoint for the data node. - Add slow query log for the distributed query and local query. +- Support applying the index rule to the tag belonging to the entity. ### Bugs diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index 074e30c21..11ee28b3e 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -40,7 +40,7 @@ func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher, secondaryQuery } qs := make([]bluge.Query, len(seriesMatchers)) - primaryNode := newShouldNode() + nodes := make([]node, len(seriesMatchers)) for i := range seriesMatchers { switch seriesMatchers[i].Type { case index.SeriesMatcherTypeExact: @@ -48,36 +48,42 @@ func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher, secondaryQuery q := bluge.NewTermQuery(match) q.SetField(entityField) qs[i] = q - primaryNode.Append(newTermNode(match, nil)) + nodes = append(nodes, newTermNode(match, nil)) case index.SeriesMatcherTypePrefix: match := convert.BytesToString(seriesMatchers[i].Match) q := bluge.NewPrefixQuery(match) q.SetField(entityField) qs[i] = q - primaryNode.Append(newPrefixNode(match)) + nodes = append(nodes, newPrefixNode(match)) case index.SeriesMatcherTypeWildcard: match := convert.BytesToString(seriesMatchers[i].Match) q := bluge.NewWildcardQuery(match) q.SetField(entityField) qs[i] = q - primaryNode.Append(newWildcardNode(match)) + nodes = append(nodes, newWildcardNode(match)) default: return nil, errors.Errorf("unsupported series matcher type: %v", seriesMatchers[i].Type) } } var primaryQuery bluge.Query + var n node if len(qs) > 1 { bq := bluge.NewBooleanQuery() bq.AddShould(qs...) bq.SetMinShould(1) primaryQuery = bq + n = newShouldNode() + for i := range nodes { + n.(*shouldNode).Append(nodes[i]) + } } else { primaryQuery = qs[0] + n = nodes[0] } query := bluge.NewBooleanQuery().AddMust(primaryQuery) node := newMustNode() - node.Append(primaryNode) + node.Append(n) if secondaryQuery != nil && secondaryQuery.(*queryNode).query != nil { query.AddMust(secondaryQuery.(*queryNode).query) node.Append(secondaryQuery.(*queryNode).node) diff --git a/pkg/query/logical/parser.go b/pkg/query/logical/parser.go index 68f8ad47c..b4769e8cb 100644 --- a/pkg/query/logical/parser.go +++ b/pkg/query/logical/parser.go @@ -28,7 +28,7 @@ import ( func ParseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue, cond *modelv1.Condition) (LiteralExpr, [][]*modelv1.TagValue, error) { entityIdx, ok := entityDict[cond.Name] if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != modelv1.Condition_BINARY_OP_IN { - return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag belongs to the entity only supports EQ or IN operation in condition(%v)", cond) + ok = false } switch v := cond.Value.Value.(type) { case *modelv1.TagValue_Str: diff --git a/pkg/test/measure/testdata/index_rule_bindings/endpoint_traffic.json b/pkg/test/measure/testdata/index_rule_bindings/endpoint_traffic.json new file mode 100644 index 000000000..2633d4a54 --- /dev/null +++ b/pkg/test/measure/testdata/index_rule_bindings/endpoint_traffic.json @@ -0,0 +1,16 @@ +{ + "metadata": { + "name": "endpoint_traffic", + "group": "sw_metric" + }, + "rules": [ + "endpoint_name" + ], + "subject": { + "catalog": "CATALOG_MEASURE", + "name": "endpoint_traffic" + }, + "begin_at": "2021-04-15T01:30:15.01Z", + "expire_at": "2121-04-15T01:30:15.01Z", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/measure/testdata/index_rules/endpoint_name.json b/pkg/test/measure/testdata/index_rules/endpoint_name.json new file mode 100644 index 000000000..10738558b --- /dev/null +++ b/pkg/test/measure/testdata/index_rules/endpoint_name.json @@ -0,0 +1,13 @@ +{ + "metadata": { + "id": 5, + "name": "endpoint_name", + "group": "sw_metric" + }, + "tags": [ + "endpoint_name" + ], + "type": "TYPE_INVERTED", + "analyzer": "ANALYZER_SIMPLE", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/measure/testdata/measures/endpoint_traffic.json b/pkg/test/measure/testdata/measures/endpoint_traffic.json new file mode 100644 index 000000000..2c240557b --- /dev/null +++ b/pkg/test/measure/testdata/measures/endpoint_traffic.json @@ -0,0 +1,28 @@ +{ + "metadata": { + "name": "endpoint_traffic", + "group": "sw_metric" + }, + "tag_families": [ + { + "name": "default", + "tags": [ + { + "name": "service_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "endpoint_name", + "type": "TAG_TYPE_STRING" + } + ] + } + ], + "entity": { + "tag_names": [ + "service_id", + "endpoint_name" + ] + }, + "updated_at": "2024-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/test/cases/init.go b/test/cases/init.go index 7341a96d3..7f32b753a 100644 --- a/test/cases/init.go +++ b/test/cases/init.go @@ -54,5 +54,6 @@ func Initialize(addr string, now time.Time) { casesmeasuredata.Write(conn, "service_latency_minute", "sw_metric", "service_latency_minute_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_latency_minute", "sw_metric", "service_instance_latency_minute_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_latency_minute", "sw_metric", "service_instance_latency_minute_data1.json", now.Add(1*time.Minute), interval) + casesmeasuredata.Write(conn, "endpoint_traffic", "sw_metric", "endpoint_traffic.json", now, interval) casesmeasuredata.Write(conn, "duplicated", "exception", "duplicated.json", now, 0) } diff --git a/test/cases/measure/data/input/entity_match.yaml b/test/cases/measure/data/input/entity_match.yaml new file mode 100644 index 000000000..ce37d3cc3 --- /dev/null +++ b/test/cases/measure/data/input/entity_match.yaml @@ -0,0 +1,41 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "endpoint_traffic" +groups: ["sw_metric"] +tagProjection: + tagFamilies: + - name: "default" + tags: ["service_id", "endpoint_name"] +criteria: + le: + op: "LOGICAL_OP_AND" + right: + condition: + name: "endpoint_name" + op: "BINARY_OP_MATCH" + value: + str: + value: "foo" + left: + condition: + name: "service_id" + op: "BINARY_OP_EQ" + value: + str: + value: "service_1" + diff --git a/test/cases/measure/data/testdata/endpoint_traffic.json b/test/cases/measure/data/testdata/endpoint_traffic.json new file mode 100644 index 000000000..bf43fb9d3 --- /dev/null +++ b/test/cases/measure/data/testdata/endpoint_traffic.json @@ -0,0 +1,74 @@ +[ + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "service_1" + } + }, + { + "str": { + "value": "/api/v1/foo" + } + } + ] + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "service_1" + } + }, + { + "str": { + "value": "/api/v1/bar" + } + } + ] + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "service_2" + } + }, + { + "str": { + "value": "/api/v1/foo" + } + } + ] + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "service_2" + } + }, + { + "str": { + "value": "/api/v1/bar" + } + } + ] + } + ] + } +] \ No newline at end of file diff --git a/test/cases/measure/data/want/entity_match.yaml b/test/cases/measure/data/want/entity_match.yaml new file mode 100644 index 000000000..ccd793069 --- /dev/null +++ b/test/cases/measure/data/want/entity_match.yaml @@ -0,0 +1,29 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +dataPoints: + - tagFamilies: + - name: default + tags: + - key: service_id + value: + str: + value: service_1 + - key: endpoint_name + value: + str: + value: /api/v1/foo diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go index 06b24498a..8f843105b 100644 --- a/test/cases/measure/measure.go +++ b/test/cases/measure/measure.go @@ -71,4 +71,5 @@ var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("match a tag belongs to the entity", helpers.Args{Input: "entity_match", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), )