Skip to content

Commit b86ed6d

Browse files
author
王家璇
committed
[querier] support TopK and Any
- run automation test (querier) pass
1 parent 089ad53 commit b86ed6d

File tree

6 files changed

+90
-29
lines changed

6 files changed

+90
-29
lines changed

server/querier/engine/clickhouse/clickhouse_test.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var (
5555
output: "SELECT SUM(1) AS `sum_log_count` FROM flow_log.`l4_flow_log` ORDER BY `sum_log_count` desc LIMIT 1",
5656
}, {
5757
input: "select Uniq(ip_0) as uniq_ip_0 from l4_flow_log limit 1",
58-
output: "SELECT uniqIf([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `uniq_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
58+
output: "SELECT uniqIf([toString(ip4_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `uniq_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
5959
}, {
6060
input: "select Max(byte) as max_byte, Sum(log_count) as sum_log_count from l4_flow_log having Sum(byte)>=0 limit 1",
6161
output: "SELECT MAX(byte_tx+byte_rx) AS `max_byte`, SUM(1) AS `sum_log_count` FROM flow_log.`l4_flow_log` HAVING SUM(byte_tx+byte_rx) >= 0 LIMIT 1",
@@ -291,6 +291,27 @@ var (
291291
db: "flow_metrics",
292292
input: "select pod_ns, any(pod, pod_cluster_id, service_id) from `vtap_app_port.1h` WHERE time>=1694069050 AND time<=1694990640 group by pod_ns limit 10",
293293
output: "SELECT dictGet(flow_tag.pod_ns_map, 'name', (toUInt64(pod_ns_id))) AS `pod_ns`, topK(1)((dictGet(flow_tag.pod_map, 'name', (toUInt64(pod_id))),pod_cluster_id,service_id)) FROM flow_metrics.`vtap_app_port.1h` WHERE `time` >= 1694069050 AND `time` <= 1694990640 AND (pod_ns_id!=0) GROUP BY dictGet(flow_tag.pod_ns_map, 'name', (toUInt64(pod_ns_id))) AS `pod_ns` LIMIT 10",
294+
}, {
295+
index: "TopK_1",
296+
input: "select TopK(ip_0, 10) as top_10_ip_0 from l4_flow_log limit 1",
297+
output: "SELECT topKIf(10)([toString(ip4_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
298+
}, {
299+
index: "TopK_2",
300+
input: "select TopK(ip_0, pod_0, 10) as top_10_ip_0 from l4_flow_log limit 1",
301+
output: "SELECT topKIf(10)(([toString(ip4_0), toString(is_ipv4), toString(ip6_0)], pod_id_0), (NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0')))) AND NOT (pod_id_0 = 0))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
302+
}, {
303+
index: "Any_1",
304+
input: "select Any(ip_0) as top_10_ip_0 from l4_flow_log limit 1",
305+
output: "SELECT topKIf(1)([toString(ip4_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
306+
}, {
307+
index: "Any_2",
308+
input: "select Any(ip_0, pod_0) as top_10_ip_0 from l4_flow_log limit 1",
309+
output: "SELECT topKIf(1)(([toString(ip4_0), toString(is_ipv4), toString(ip6_0)], pod_id_0), (NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0')))) AND NOT (pod_id_0 = 0))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
310+
}, {
311+
index: "layered_0",
312+
input: "select Avg(`byte_tx`) AS `Avg(byte_tx)`, region_0 from vtap_flow_edge_port group by region_0 limit 1",
313+
output: "SELECT region_0, AVG(`_sum_byte_tx`) AS `Avg(byte_tx)` FROM (SELECT dictGet(flow_tag.region_map, 'name', (toUInt64(region_id_0))) AS `region_0`, SUM(byte_tx) AS `_sum_byte_tx` FROM flow_metrics.`vtap_flow_edge_port` GROUP BY dictGet(flow_tag.region_map, 'name', (toUInt64(region_id_0))) AS `region_0`) GROUP BY `region_0` LIMIT 1",
314+
db: "flow_metrics",
294315
}}
295316
)
296317

server/querier/engine/clickhouse/function.go

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -79,38 +79,65 @@ func GetTagFunction(name string, args []string, alias, db, table string) (Statem
7979
}
8080

8181
func GetAggFunc(name string, args []string, alias string, db string, table string, ctx context.Context) (Statement, int, string, error) {
82-
var levelFlag int
83-
field := args[0]
84-
field = strings.Trim(field, "`")
85-
86-
if name == view.FUNCTION_COUNT && field != metrics.COUNT_METRICS_NAME {
87-
return nil, 0, "", fmt.Errorf("function [%s] not support metric [%s]",
88-
view.FUNCTION_COUNT, metrics.COUNT_METRICS_NAME)
89-
}
90-
9182
function, ok := metrics.METRICS_FUNCTIONS_MAP[name]
9283
if !ok {
9384
return nil, 0, "", nil
9485
}
95-
metricStruct, ok := metrics.GetAggMetrics(field, db, table, ctx)
96-
if !ok {
97-
return nil, 0, "", nil
86+
87+
fields := args[:1]
88+
if name == view.FUNCTION_TOPK {
89+
fields = args[:len(args)-1]
90+
} else if name == view.FUNCTION_ANY {
91+
fields = args
92+
}
93+
94+
var metricStruct *metrics.Metrics
95+
levelFlag := view.MODEL_METRICS_LEVEL_FLAG_UNLAY
96+
97+
var metricStructs []*metrics.Metrics
98+
for _, field := range fields {
99+
field = strings.Trim(field, "`")
100+
if name == view.FUNCTION_COUNT && field != metrics.COUNT_METRICS_NAME {
101+
return nil, 0, "", fmt.Errorf("function [%s] not support metric [%s]",
102+
view.FUNCTION_COUNT, metrics.COUNT_METRICS_NAME)
103+
}
104+
105+
metricStruct, ok := metrics.GetAggMetrics(field, db, table, ctx)
106+
if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
107+
return nil, 0, "", nil
108+
}
109+
metricStructs = append(metricStructs, metricStruct)
98110
}
99-
if metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
111+
112+
metricslength := len(metricStructs)
113+
if metricslength == 0 {
100114
return nil, 0, "", nil
101115
}
102-
unit := strings.ReplaceAll(function.UnitOverwrite, "$unit", metricStruct.Unit)
103-
// 判断算子是否支持单层
104-
if db == "flow_metrics" {
105-
unlayFuns := metrics.METRICS_TYPE_UNLAY_FUNCTIONS[metricStruct.Type]
106-
if common.IsValueInSliceString(name, unlayFuns) {
107-
levelFlag = view.MODEL_METRICS_LEVEL_FLAG_UNLAY
108-
} else {
109-
levelFlag = view.MODEL_METRICS_LEVEL_FLAG_LAYERED
116+
117+
metricStruct = metricStructs[0]
118+
dbFields := make([]string, metricslength)
119+
conditions := make([]string, 0, metricslength)
120+
for i, metric := range metricStructs {
121+
dbFields[i] = metric.DBField
122+
if metric.Condition != "" {
123+
conditions = append(conditions, metric.Condition)
110124
}
111-
} else {
112-
levelFlag = view.MODEL_METRICS_LEVEL_FLAG_UNLAY
125+
// Check whether the operator supports single-layer (unlayered) computation
126+
if levelFlag == view.MODEL_METRICS_LEVEL_FLAG_UNLAY && db == "flow_metrics" {
127+
unlayFuns := metrics.METRICS_TYPE_UNLAY_FUNCTIONS[metric.Type]
128+
if !common.IsValueInSliceString(name, unlayFuns) {
129+
levelFlag = view.MODEL_METRICS_LEVEL_FLAG_LAYERED
130+
}
131+
}
132+
}
133+
if metricslength > 1 {
134+
metricStructCopy := *metricStruct
135+
metricStructCopy.DBField = "(" + strings.Join(dbFields, ", ") + ")"
136+
metricStructCopy.Condition = "(" + strings.Join(conditions, " AND ") + ")"
137+
metricStruct = &metricStructCopy
113138
}
139+
unit := strings.ReplaceAll(function.UnitOverwrite, "$unit", metricStruct.Unit)
140+
114141
return &AggFunction{
115142
Metrics: metricStruct,
116143
Name: name,

server/querier/engine/clickhouse/metrics/function.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ var METRICS_FUNCTIONS = []string{
4545
view.FUNCTION_RSPREAD, view.FUNCTION_STDDEV, view.FUNCTION_APDEX,
4646
view.FUNCTION_UNIQ, view.FUNCTION_UNIQ_EXACT, view.FUNCTION_PERCENTAG,
4747
view.FUNCTION_PERSECOND, view.FUNCTION_HISTOGRAM, view.FUNCTION_LAST, view.FUNCTION_COUNT,
48+
view.FUNCTION_TOPK, view.FUNCTION_ANY,
4849
}
4950

5051
var METRICS_FUNCTIONS_MAP = map[string]*Function{
@@ -65,6 +66,8 @@ var METRICS_FUNCTIONS_MAP = map[string]*Function{
6566
view.FUNCTION_PERSECOND: NewFunction(view.FUNCTION_PERSECOND, FUNCTION_TYPE_MATH, nil, "$unit/s", 0),
6667
view.FUNCTION_HISTOGRAM: NewFunction(view.FUNCTION_HISTOGRAM, FUNCTION_TYPE_MATH, nil, "", 1),
6768
view.FUNCTION_LAST: NewFunction(view.FUNCTION_LAST, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_COUNTER, METRICS_TYPE_GAUGE, METRICS_TYPE_DELAY, METRICS_TYPE_PERCENTAGE, METRICS_TYPE_QUOTIENT}, "", 0),
69+
view.FUNCTION_TOPK: NewFunction(view.FUNCTION_TOPK, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_TAG}, "$unit", 0),
70+
view.FUNCTION_ANY: NewFunction(view.FUNCTION_ANY, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_TAG}, "$unit", 0),
6871
}
6972

7073
func GetFunctionDescriptions() (*common.Result, error) {

server/querier/engine/clickhouse/metrics/l4_flow_log.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ var L4_FLOW_LOG_METRICS_REPLACE = map[string]*Metrics{
142142

143143
"vpc_0": NewReplaceMetrics("l3_epc_id_0", "NOT (l3_epc_id_0 = -2)"),
144144
"subnet_0": NewReplaceMetrics("subnet_id_0", "NOT (subnet_id_0 = 0)"),
145-
"ip_0": NewReplaceMetrics("[toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
145+
"ip_0": NewReplaceMetrics("[toString(ip4_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
146146
"pod_cluster_0": NewReplaceMetrics("pod_cluster_id_0", "NOT (pod_cluster_id_0 = 0)"),
147147
"pod_node_0": NewReplaceMetrics("pod_node_id_0", "NOT (pod_node_id_0 = 0)"),
148148
"pod_ns_0": NewReplaceMetrics("pod_ns_id_0", "NOT (pod_ns_id_0 = 0)"),

server/querier/engine/clickhouse/metrics/vtap_flow_edge_port.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ var VTAP_FLOW_EDGE_PORT_METRICS_REPLACE = map[string]*Metrics{
5454

5555
"vpc_0": NewReplaceMetrics("l3_epc_id_0", "NOT (l3_epc_id_0 = -2)"),
5656
"subnet_0": NewReplaceMetrics("subnet_id_0", "NOT (subnet_id_0 = 0)"),
57-
"ip_0": NewReplaceMetrics("[toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
57+
"ip_0": NewReplaceMetrics("[toString(ip4_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
5858
"pod_cluster_0": NewReplaceMetrics("pod_cluster_id_0", "NOT (pod_cluster_id_0 = 0)"),
5959
"pod_node_0": NewReplaceMetrics("pod_node_id_0", "NOT (pod_node_id_0 = 0)"),
6060
"pod_ns_0": NewReplaceMetrics("pod_ns_id_0", "NOT (pod_ns_id_0 = 0)"),

server/querier/engine/clickhouse/view/function.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ const (
4848
FUNCTION_PERCENTAG = "Percentage"
4949
FUNCTION_HISTOGRAM = "Histogram"
5050
FUNCTION_LAST = "Last"
51+
FUNCTION_TOPK = "TopK"
52+
FUNCTION_ANY = "Any"
5153
)
5254

5355
// Mapping from externally exposed operators to the actual database operators
@@ -68,6 +70,8 @@ var FUNC_NAME_MAP map[string]string = map[string]string{
6870
FUNCTION_UNIQ: "uniq",
6971
FUNCTION_UNIQ_EXACT: "uniqExact",
7072
FUNCTION_LAST: "last_value",
73+
FUNCTION_TOPK: "topK",
74+
FUNCTION_ANY: "topK", // Any is rendered as topK(1); the "(1)" is written by (*DefaultFunction).WriteTo so that it follows the optional "If" suffix, e.g. topKIf(1)
7175
}
7276

7377
var MATH_FUNCTIONS = []string{
@@ -207,11 +211,17 @@ func (f *DefaultFunction) WriteTo(buf *bytes.Buffer) {
207211
buf.WriteString("If")
208212
}
209213

210-
if len(f.Args) > 0 {
214+
args := f.Args
215+
if f.Name == FUNCTION_TOPK {
216+
args = f.Args[len(f.Args)-1:]
217+
} else if f.Name == FUNCTION_ANY {
218+
args = []string{"1"}
219+
}
220+
if len(args) > 0 {
211221
buf.WriteString("(")
212-
for i, arg := range f.Args {
222+
for i, arg := range args {
213223
buf.WriteString(arg)
214-
if i < len(f.Args)-1 {
224+
if i < len(args)-1 {
215225
buf.WriteString(", ")
216226
}
217227
}

0 commit comments

Comments
 (0)