diff --git a/server/querier/engine/clickhouse/clickhouse_test.go b/server/querier/engine/clickhouse/clickhouse_test.go index 6ac06450d22..ce75636cebe 100644 --- a/server/querier/engine/clickhouse/clickhouse_test.go +++ b/server/querier/engine/clickhouse/clickhouse_test.go @@ -291,6 +291,27 @@ var ( db: "flow_metrics", input: "select pod_ns, any(pod, pod_cluster_id, service_id) from `vtap_app_port.1h` WHERE time>=1694069050 AND time<=1694990640 group by pod_ns limit 10", output: "SELECT dictGet(flow_tag.pod_ns_map, 'name', (toUInt64(pod_ns_id))) AS `pod_ns`, topK(1)((dictGet(flow_tag.pod_map, 'name', (toUInt64(pod_id))),pod_cluster_id,service_id)) FROM flow_metrics.`vtap_app_port.1h` WHERE `time` >= 1694069050 AND `time` <= 1694990640 AND (pod_ns_id!=0) GROUP BY dictGet(flow_tag.pod_ns_map, 'name', (toUInt64(pod_ns_id))) AS `pod_ns` LIMIT 10", + }, { + index: "TopK_1", + input: "select TopK(ip_0, 10) as top_10_ip_0 from l4_flow_log limit 1", + output: "SELECT topKIf(10)([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1", + }, { + index: "TopK_2", + input: "select TopK(ip_0, pod_0, 10) as top_10_ip_0 from l4_flow_log limit 1", + output: "SELECT topKIf(10)(([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], pod_id_0), NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1", + }, { + index: "Any_1", + input: "select Any(ip_0) as top_10_ip_0 from l4_flow_log limit 1", + output: "SELECT topKIf(1)([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1", + }, { + index: "Any_2", + input: "select Any(ip_0, pod_0) as top_10_ip_0 from l4_flow_log limit 1", + output: "SELECT topKIf(1)(([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], pod_id_0), NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1", + }, { + index: "layered_0", + input: "select Avg(`byte_tx`) AS `Avg(byte_tx)`, region_0 from vtap_flow_edge_port group by region_0 limit 1", + output: "SELECT region_0, AVG(`_sum_byte_tx`) AS `Avg(byte_tx)` FROM (SELECT dictGet(flow_tag.region_map, 'name', (toUInt64(region_id_0))) AS `region_0`, SUM(byte_tx) AS `_sum_byte_tx` FROM flow_metrics.`vtap_flow_edge_port` GROUP BY dictGet(flow_tag.region_map, 'name', (toUInt64(region_id_0))) AS `region_0`) GROUP BY `region_0` LIMIT 1", + db: "flow_metrics", }} ) diff --git a/server/querier/engine/clickhouse/function.go b/server/querier/engine/clickhouse/function.go index 6c908fcc5f4..f6d92a5903b 100644 --- a/server/querier/engine/clickhouse/function.go +++ b/server/querier/engine/clickhouse/function.go @@ -79,38 +79,61 @@ func GetTagFunction(name string, args []string, alias, db, table string) (Statem } func GetAggFunc(name string, args []string, alias string, db string, table string, ctx context.Context) (Statement, int, string, error) { - var levelFlag int - field := args[0] - field = strings.Trim(field, "`") - - if name == view.FUNCTION_COUNT && field != metrics.COUNT_METRICS_NAME { - return nil, 0, "", fmt.Errorf("function [%s] not support metric [%s]", - view.FUNCTION_COUNT, metrics.COUNT_METRICS_NAME) - } - function, ok := metrics.METRICS_FUNCTIONS_MAP[name] if !ok { return nil, 0, "", nil } - metricStruct, ok := metrics.GetAggMetrics(field, db, table, ctx) - if !ok { - return nil, 0, "", nil + + fields := args[:1] + if name == view.FUNCTION_TOPK { + fields = args[:len(args)-1] + } else if name == view.FUNCTION_ANY { + fields = args + } + + var metricStruct *metrics.Metrics + levelFlag := view.MODEL_METRICS_LEVEL_FLAG_UNLAY + + var metricStructs []*metrics.Metrics + for _, field := range fields { + field = strings.Trim(field, "`") + if name == view.FUNCTION_COUNT && field != metrics.COUNT_METRICS_NAME { + return nil, 0, "", fmt.Errorf("function [%s] not support metric [%s]", + view.FUNCTION_COUNT, metrics.COUNT_METRICS_NAME) + } + + metricStruct, ok := metrics.GetAggMetrics(field, db, table, ctx) + if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY { + return nil, 0, "", nil + } + metricStructs = append(metricStructs, metricStruct) } - if metricStruct.Type == metrics.METRICS_TYPE_ARRAY { + + metricslength := len(metricStructs) + if metricslength == 0 { return nil, 0, "", nil } - unit := strings.ReplaceAll(function.UnitOverwrite, "$unit", metricStruct.Unit) - // 判断算子是否支持单层 - if db == "flow_metrics" { - unlayFuns := metrics.METRICS_TYPE_UNLAY_FUNCTIONS[metricStruct.Type] - if common.IsValueInSliceString(name, unlayFuns) { - levelFlag = view.MODEL_METRICS_LEVEL_FLAG_UNLAY - } else { - levelFlag = view.MODEL_METRICS_LEVEL_FLAG_LAYERED + + metricStruct = metricStructs[0] + dbFields := make([]string, metricslength) + for i, metric := range metricStructs { + dbFields[i] = metric.DBField + + // 判断算子是否支持单层 + if levelFlag == view.MODEL_METRICS_LEVEL_FLAG_UNLAY && db == "flow_metrics" { + unlayFuns := metrics.METRICS_TYPE_UNLAY_FUNCTIONS[metric.Type] + if !common.IsValueInSliceString(name, unlayFuns) { + levelFlag = view.MODEL_METRICS_LEVEL_FLAG_LAYERED + } } - } else { - levelFlag = view.MODEL_METRICS_LEVEL_FLAG_UNLAY } + if metricslength > 1 { + metricStructCopy := *metricStruct + metricStructCopy.DBField = "(" + strings.Join(dbFields, ", ") + ")" + metricStruct = &metricStructCopy + } + unit := strings.ReplaceAll(function.UnitOverwrite, "$unit", metricStruct.Unit) + return &AggFunction{ Metrics: metricStruct, Name: name, diff --git a/server/querier/engine/clickhouse/metrics/function.go b/server/querier/engine/clickhouse/metrics/function.go index 1653f4cec55..074cdb6c323 100644 --- a/server/querier/engine/clickhouse/metrics/function.go +++ b/server/querier/engine/clickhouse/metrics/function.go @@ -45,6 +45,7 @@ var METRICS_FUNCTIONS = []string{ view.FUNCTION_RSPREAD, view.FUNCTION_STDDEV, view.FUNCTION_APDEX, view.FUNCTION_UNIQ, view.FUNCTION_UNIQ_EXACT, view.FUNCTION_PERCENTAG, view.FUNCTION_PERSECOND, view.FUNCTION_HISTOGRAM, view.FUNCTION_LAST, view.FUNCTION_COUNT, + view.FUNCTION_TOPK, view.FUNCTION_ANY, } var METRICS_FUNCTIONS_MAP = map[string]*Function{ @@ -65,6 +66,8 @@ var METRICS_FUNCTIONS_MAP = map[string]*Function{ view.FUNCTION_PERSECOND: NewFunction(view.FUNCTION_PERSECOND, FUNCTION_TYPE_MATH, nil, "$unit/s", 0), view.FUNCTION_HISTOGRAM: NewFunction(view.FUNCTION_HISTOGRAM, FUNCTION_TYPE_MATH, nil, "", 1), view.FUNCTION_LAST: NewFunction(view.FUNCTION_LAST, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_COUNTER, METRICS_TYPE_GAUGE, METRICS_TYPE_DELAY, METRICS_TYPE_PERCENTAGE, METRICS_TYPE_QUOTIENT}, "", 0), + view.FUNCTION_TOPK: NewFunction(view.FUNCTION_TOPK, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_TAG}, "$unit", 0), + view.FUNCTION_ANY: NewFunction(view.FUNCTION_ANY, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_TAG}, "$unit", 0), } func GetFunctionDescriptions() (*common.Result, error) { diff --git a/server/querier/engine/clickhouse/view/function.go b/server/querier/engine/clickhouse/view/function.go index bd9bbf71f61..968b59e440c 100644 --- a/server/querier/engine/clickhouse/view/function.go +++ b/server/querier/engine/clickhouse/view/function.go @@ -48,6 +48,8 @@ const ( FUNCTION_PERCENTAG = "Percentage" FUNCTION_HISTOGRAM = "Histogram" FUNCTION_LAST = "Last" + FUNCTION_TOPK = "TopK" + FUNCTION_ANY = "Any" ) // 对外提供的算子与数据库实际算子转换 @@ -68,6 +70,8 @@ var FUNC_NAME_MAP map[string]string = map[string]string{ FUNCTION_UNIQ: "uniq", FUNCTION_UNIQ_EXACT: "uniqExact", FUNCTION_LAST: "last_value", + FUNCTION_TOPK: "topK", + FUNCTION_ANY: "topK", } var MATH_FUNCTIONS = []string{ @@ -207,11 +211,17 @@ func (f *DefaultFunction) WriteTo(buf *bytes.Buffer) { buf.WriteString("If") } - if len(f.Args) > 0 { + args := f.Args + if f.Name == FUNCTION_TOPK { + args = f.Args[len(f.Args)-1:] + } else if f.Name == FUNCTION_ANY { + args = []string{"1"} + } + if len(args) > 0 { buf.WriteString("(") - for i, arg := range f.Args { + for i, arg := range args { buf.WriteString(arg) - if i < len(f.Args)-1 { + if i < len(args)-1 { buf.WriteString(", ") } }