diff --git a/server/querier/engine/clickhouse/clickhouse_test.go b/server/querier/engine/clickhouse/clickhouse_test.go
index be6d6d53681..6fe5126b68a 100644
--- a/server/querier/engine/clickhouse/clickhouse_test.go
+++ b/server/querier/engine/clickhouse/clickhouse_test.go
@@ -55,7 +55,7 @@ var (
 		output: "SELECT SUM(1) AS `sum_log_count` FROM flow_log.`l4_flow_log` ORDER BY `sum_log_count` desc LIMIT 1",
 	}, {
 		input:  "select Uniq(ip_0) as uniq_ip_0 from l4_flow_log limit 1",
-		output: "SELECT uniqIf([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `uniq_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
+		output: "SELECT uniqIf([toString(ip4_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `uniq_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
 	}, {
 		input:  "select Max(byte) as max_byte, Sum(log_count) as sum_log_count from l4_flow_log having Sum(byte)>=0 limit 1",
 		output: "SELECT MAX(byte_tx+byte_rx) AS `max_byte`, SUM(1) AS `sum_log_count` FROM flow_log.`l4_flow_log` HAVING SUM(byte_tx+byte_rx) >= 0 LIMIT 1",
@@ -294,6 +294,27 @@ var (
 	}, {
 		input:  "SELECT is_internet_0, is_internet_1 FROM l4_flow_log GROUP BY is_internet_0, is_internet_1 limit 1",
 		output: "SELECT if(l3_epc_id_0=-2,1,0) AS `is_internet_0`, if(l3_epc_id_1=-2,1,0) AS `is_internet_1` FROM flow_log.`l4_flow_log` GROUP BY if(l3_epc_id_0=-2,1,0) AS `is_internet_0`, if(l3_epc_id_1=-2,1,0) AS `is_internet_1` LIMIT 1",
+	}, {
+		index:  "TopK_1",
+		input:  "select TopK(ip_0, 10) as top_10_ip_0 from l4_flow_log limit 1",
+		output: "SELECT topKIf(10)(if(is_ipv4=1, IPv4NumToString(ip4_0), IPv6NumToString(ip6_0)), NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
+	}, {
+		index:  "TopK_2",
+		input:  "select TopK(ip_0, pod_0, 10) as top_10_ip_pod_0 from l4_flow_log limit 1",
+		output: "SELECT topKIf(10)((if(is_ipv4=1, IPv4NumToString(ip4_0), IPv6NumToString(ip6_0)), dictGet(flow_tag.pod_map, 'name', (toUInt64(pod_id_0)))), (NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0')))) AND NOT (pod_id_0 = 0))) AS `top_10_ip_pod_0` FROM flow_log.`l4_flow_log` LIMIT 1",
+	}, {
+		index:  "Any_1",
+		input:  "select Any(ip_0) as any_ip_0 from l4_flow_log limit 1",
+		output: "SELECT topKIf(1)(if(is_ipv4=1, IPv4NumToString(ip4_0), IPv6NumToString(ip6_0)), NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `any_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
+	}, {
+		index:  "Any_2",
+		input:  "select Any(ip_0, pod_0) as any_ip_pod_0 from l4_flow_log limit 1",
+		output: "SELECT topKIf(1)((if(is_ipv4=1, IPv4NumToString(ip4_0), IPv6NumToString(ip6_0)), dictGet(flow_tag.pod_map, 'name', (toUInt64(pod_id_0)))), (NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0')))) AND NOT (pod_id_0 = 0))) AS `any_ip_pod_0` FROM flow_log.`l4_flow_log` LIMIT 1",
+	}, {
+		index:  "layered_0",
+		input:  "select Avg(`byte_tx`) AS `Avg(byte_tx)`, region_0 from vtap_flow_edge_port group by region_0 limit 1",
+		output: "SELECT region_0, AVG(`_sum_byte_tx`) AS `Avg(byte_tx)` FROM (SELECT dictGet(flow_tag.region_map, 'name', (toUInt64(region_id_0))) AS `region_0`, SUM(byte_tx) AS `_sum_byte_tx` FROM flow_metrics.`vtap_flow_edge_port` GROUP BY dictGet(flow_tag.region_map, 'name', (toUInt64(region_id_0))) AS `region_0`) GROUP BY `region_0` LIMIT 1",
+		db:     "flow_metrics",
 	}}
 )
diff --git a/server/querier/engine/clickhouse/function.go b/server/querier/engine/clickhouse/function.go
index 6c908fcc5f4..79dbb11cf5d 100644
--- a/server/querier/engine/clickhouse/function.go
+++ b/server/querier/engine/clickhouse/function.go
@@ -79,6 +79,10 @@ func GetTagFunction(name string, args []string, alias, db, table string) (Statem
 }
 
 func GetAggFunc(name string, args []string, alias string, db string, table string, ctx context.Context) (Statement, int, string, error) {
+	if name == view.FUNCTION_TOPK || name == view.FUNCTION_ANY {
+		return GetTopKTrans(name, args, alias, db, table, ctx)
+	}
+
 	var levelFlag int
 	field := args[0]
 	field = strings.Trim(field, "`")
@@ -119,6 +123,77 @@ func GetAggFunc(name string, args []string, alias string, db string, table strin
 	}, levelFlag, unit, nil
 }
 
+func GetTopKTrans(name string, args []string, alias string, db string, table string, ctx context.Context) (Statement, int, string, error) {
+	function, ok := metrics.METRICS_FUNCTIONS_MAP[name]
+	if !ok {
+		return nil, 0, "", nil
+	}
+
+	var fields []string
+	if name == view.FUNCTION_TOPK {
+		fields = args[:len(args)-1]
+	} else if name == view.FUNCTION_ANY {
+		fields = args
+	}
+
+	levelFlag := view.MODEL_METRICS_LEVEL_FLAG_UNLAY
+
+	fieldsLen := len(fields)
+	dbFields := make([]string, fieldsLen)
+	conditions := make([]string, 0, fieldsLen)
+
+	var metricStruct *metrics.Metrics
+	for i, field := range fields {
+		var condition string
+
+		field = strings.Trim(field, "`")
+		metricStruct, ok = metrics.GetAggMetrics(field, db, table, ctx)
+		if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
+			return nil, 0, "", nil
+		}
+
+		condition = metricStruct.Condition
+		tag, ok := tag.GetTag(field, db, table, "default")
+		if ok {
+			dbFields[i] = tag.TagTranslator
+			if condition == "" {
+				condition = tag.NotNullFilter
+			}
+		} else {
+			dbFields[i] = metricStruct.DBField
+		}
+		if condition != "" {
+			conditions = append(conditions, condition)
+		}
+
+		// Check whether the operator supports a single-layer query
+		if levelFlag == view.MODEL_METRICS_LEVEL_FLAG_UNLAY && db == "flow_metrics" {
+			unlayFuns := metrics.METRICS_TYPE_UNLAY_FUNCTIONS[metricStruct.Type]
+			if !common.IsValueInSliceString(name, unlayFuns) {
+				levelFlag = view.MODEL_METRICS_LEVEL_FLAG_LAYERED
+			}
+		}
+	}
+
+	metricStructCopy := *metricStruct
+	if fieldsLen > 1 {
+		metricStructCopy.DBField = "(" + strings.Join(dbFields, ", ") + ")"
+		metricStructCopy.Condition = "(" + strings.Join(conditions, " AND ") + ")"
+	} else {
+		metricStructCopy.DBField = strings.Join(dbFields, ", ")
+		metricStructCopy.Condition = strings.Join(conditions, " AND ")
+	}
+
+	unit := strings.ReplaceAll(function.UnitOverwrite, "$unit", metricStruct.Unit)
+
+	return &AggFunction{
+		Metrics: &metricStructCopy,
+		Name:    name,
+		Args:    args,
+		Alias:   alias,
+	}, levelFlag, unit, nil
+}
+
 func GetBinaryFunc(name string, args []Function) (*BinaryFunction, error) {
 	return &BinaryFunction{
 		Name: name,
diff --git a/server/querier/engine/clickhouse/metrics/function.go b/server/querier/engine/clickhouse/metrics/function.go
index 1653f4cec55..20cb681cd75 100644
--- a/server/querier/engine/clickhouse/metrics/function.go
+++ b/server/querier/engine/clickhouse/metrics/function.go
@@ -65,6 +65,8 @@ var METRICS_FUNCTIONS_MAP = map[string]*Function{
 	view.FUNCTION_PERSECOND: NewFunction(view.FUNCTION_PERSECOND, FUNCTION_TYPE_MATH, nil, "$unit/s", 0),
 	view.FUNCTION_HISTOGRAM: NewFunction(view.FUNCTION_HISTOGRAM, FUNCTION_TYPE_MATH, nil, "", 1),
 	view.FUNCTION_LAST:      NewFunction(view.FUNCTION_LAST, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_COUNTER, METRICS_TYPE_GAUGE, METRICS_TYPE_DELAY, METRICS_TYPE_PERCENTAGE, METRICS_TYPE_QUOTIENT}, "", 0),
+	view.FUNCTION_TOPK:      NewFunction(view.FUNCTION_TOPK, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_TAG}, "$unit", 0),
+	view.FUNCTION_ANY:       NewFunction(view.FUNCTION_ANY, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_TAG}, "$unit", 0),
 }
 
 func GetFunctionDescriptions() (*common.Result, error) {
diff --git a/server/querier/engine/clickhouse/metrics/l4_flow_log.go b/server/querier/engine/clickhouse/metrics/l4_flow_log.go
index 796940fb4e2..07fa5774e6e 100644
--- a/server/querier/engine/clickhouse/metrics/l4_flow_log.go
+++ b/server/querier/engine/clickhouse/metrics/l4_flow_log.go
@@ -142,7 +142,7 @@ var L4_FLOW_LOG_METRICS_REPLACE = map[string]*Metrics{
 	"vpc_0":         NewReplaceMetrics("l3_epc_id_0", "NOT (l3_epc_id_0 = -2)"),
 	"subnet_0":      NewReplaceMetrics("subnet_id_0", "NOT (subnet_id_0 = 0)"),
-	"ip_0":          NewReplaceMetrics("[toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
+	"ip_0":          NewReplaceMetrics("[toString(ip4_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
 	"pod_cluster_0": NewReplaceMetrics("pod_cluster_id_0", "NOT (pod_cluster_id_0 = 0)"),
 	"pod_node_0":    NewReplaceMetrics("pod_node_id_0", "NOT (pod_node_id_0 = 0)"),
 	"pod_ns_0":      NewReplaceMetrics("pod_ns_id_0", "NOT (pod_ns_id_0 = 0)"),
diff --git a/server/querier/engine/clickhouse/metrics/vtap_flow_edge_port.go b/server/querier/engine/clickhouse/metrics/vtap_flow_edge_port.go
index de14e4740e4..b9bd869881b 100644
--- a/server/querier/engine/clickhouse/metrics/vtap_flow_edge_port.go
+++ b/server/querier/engine/clickhouse/metrics/vtap_flow_edge_port.go
@@ -54,7 +54,7 @@ var VTAP_FLOW_EDGE_PORT_METRICS_REPLACE = map[string]*Metrics{
 	"vpc_0":         NewReplaceMetrics("l3_epc_id_0", "NOT (l3_epc_id_0 = -2)"),
 	"subnet_0":      NewReplaceMetrics("subnet_id_0", "NOT (subnet_id_0 = 0)"),
-	"ip_0":          NewReplaceMetrics("[toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
+	"ip_0":          NewReplaceMetrics("[toString(ip4_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
 	"pod_cluster_0": NewReplaceMetrics("pod_cluster_id_0", "NOT (pod_cluster_id_0 = 0)"),
 	"pod_node_0":    NewReplaceMetrics("pod_node_id_0", "NOT (pod_node_id_0 = 0)"),
 	"pod_ns_0":      NewReplaceMetrics("pod_ns_id_0", "NOT (pod_ns_id_0 = 0)"),
diff --git a/server/querier/engine/clickhouse/view/function.go b/server/querier/engine/clickhouse/view/function.go
index bd9bbf71f61..c4c54409e99 100644
--- a/server/querier/engine/clickhouse/view/function.go
+++ b/server/querier/engine/clickhouse/view/function.go
@@ -48,6 +48,8 @@ const (
 	FUNCTION_PERCENTAG  = "Percentage"
 	FUNCTION_HISTOGRAM  = "Histogram"
 	FUNCTION_LAST       = "Last"
+	FUNCTION_TOPK       = "TopK"
+	FUNCTION_ANY        = "Any"
 )
 
 // Mapping from the externally exposed operators to the actual database operators
@@ -68,6 +70,8 @@ var FUNC_NAME_MAP map[string]string = map[string]string{
 	FUNCTION_UNIQ:       "uniq",
 	FUNCTION_UNIQ_EXACT: "uniqExact",
 	FUNCTION_LAST:       "last_value",
+	FUNCTION_TOPK:       "topK",
+	FUNCTION_ANY:        "topK", // Any is rewritten to topK(1); the '(1)' must be appended after the 'If' suffix, which is handled in func (f *DefaultFunction) WriteTo(buf *bytes.Buffer)
 }
 
 var MATH_FUNCTIONS = []string{
@@ -207,11 +211,17 @@ func (f *DefaultFunction) WriteTo(buf *bytes.Buffer) {
 		buf.WriteString("If")
 	}
 
-	if len(f.Args) > 0 {
+	args := f.Args
+	if f.Name == FUNCTION_TOPK {
+		args = f.Args[len(f.Args)-1:]
+	} else if f.Name == FUNCTION_ANY {
+		args = []string{"1"}
+	}
+	if len(args) > 0 {
 		buf.WriteString("(")
-		for i, arg := range f.Args {
+		for i, arg := range args {
 			buf.WriteString(arg)
-			if i < len(f.Args)-1 {
+			if i < len(args)-1 {
 				buf.WriteString(", ")
 			}
 		}
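
Reviewer note: below is a minimal, hypothetical harness (not part of this patch) showing how the new operators reach GetAggFunc. The import paths and the main function are assumptions for illustration; GetAggFunc, view.FUNCTION_TOPK / view.FUNCTION_ANY, and the argument convention (the trailing TopK argument is k) come from the diff above. In the real querier these calls are driven by the SQL parser, e.g. "select TopK(ip_0, pod_0, 10) as top_10_ip_pod_0 from l4_flow_log", and the tag/metrics maps must already be initialized for a meaningful result; the sketch only illustrates the call shape.

package main

import (
	"context"
	"fmt"

	// Assumed module path; adjust to the repository's go.mod if it differs.
	"github.com/deepflowys/deepflow/server/querier/engine/clickhouse"
	"github.com/deepflowys/deepflow/server/querier/engine/clickhouse/view"
)

func main() {
	ctx := context.Background()

	// TopK: the trailing argument is k, the leading arguments are tag fields.
	// GetAggFunc routes this to GetTopKTrans, which builds a topKIf(10)(...) expression.
	_, levelFlag, unit, err := clickhouse.GetAggFunc(
		view.FUNCTION_TOPK,
		[]string{"ip_0", "pod_0", "10"},
		"top_10_ip_pod_0",
		"flow_log", "l4_flow_log", ctx,
	)
	fmt.Println(levelFlag, unit, err)

	// Any: every argument is a tag field; the SQL writer later rewrites it to topK(1).
	_, levelFlag, unit, err = clickhouse.GetAggFunc(
		view.FUNCTION_ANY,
		[]string{"ip_0"},
		"any_ip_0",
		"flow_log", "l4_flow_log", ctx,
	)
	fmt.Println(levelFlag, unit, err)
}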