Skip to content

Commit

Permalink
[querier] support TopK and Any
Browse files Browse the repository at this point in the history
- run automation test (querier) pass
  • Loading branch information
WJxuan authored and xiaochaoren1 committed Sep 28, 2023
1 parent e885005 commit a2c0002
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 6 deletions.
23 changes: 22 additions & 1 deletion server/querier/engine/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var (
output: "SELECT SUM(1) AS `sum_log_count` FROM flow_log.`l4_flow_log` ORDER BY `sum_log_count` desc LIMIT 1",
}, {
input: "select Uniq(ip_0) as uniq_ip_0 from l4_flow_log limit 1",
output: "SELECT uniqIf([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `uniq_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
output: "SELECT uniqIf([toString(ip4_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `uniq_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
}, {
input: "select Max(byte) as max_byte, Sum(log_count) as sum_log_count from l4_flow_log having Sum(byte)>=0 limit 1",
output: "SELECT MAX(byte_tx+byte_rx) AS `max_byte`, SUM(1) AS `sum_log_count` FROM flow_log.`l4_flow_log` HAVING SUM(byte_tx+byte_rx) >= 0 LIMIT 1",
Expand Down Expand Up @@ -294,6 +294,27 @@ var (
}, {
input: "SELECT is_internet_0, is_internet_1 FROM l4_flow_log GROUP BY is_internet_0, is_internet_1 limit 1",
output: "SELECT if(l3_epc_id_0=-2,1,0) AS `is_internet_0`, if(l3_epc_id_1=-2,1,0) AS `is_internet_1` FROM flow_log.`l4_flow_log` GROUP BY if(l3_epc_id_0=-2,1,0) AS `is_internet_0`, if(l3_epc_id_1=-2,1,0) AS `is_internet_1` LIMIT 1",
}, {
index: "TopK_1",
input: "select TopK(ip_0, 10) as top_10_ip_0 from l4_flow_log limit 1",
output: "SELECT topKIf(10)(if(is_ipv4=1, IPv4NumToString(ip4_0), IPv6NumToString(ip6_0)), NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
}, {
index: "TopK_2",
input: "select TopK(ip_0, pod_0, 10) as top_10_ip_pod_0 from l4_flow_log limit 1",
output: "SELECT topKIf(10)((if(is_ipv4=1, IPv4NumToString(ip4_0), IPv6NumToString(ip6_0)), dictGet(flow_tag.pod_map, 'name', (toUInt64(pod_id_0)))), (NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0')))) AND NOT (pod_id_0 = 0))) AS `top_10_ip_pod_0` FROM flow_log.`l4_flow_log` LIMIT 1",
}, {
index: "Any_1",
input: "select Any(ip_0) as any_ip_0 from l4_flow_log limit 1",
output: "SELECT topKIf(1)(if(is_ipv4=1, IPv4NumToString(ip4_0), IPv6NumToString(ip6_0)), NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `any_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
}, {
index: "Any_2",
input: "select Any(ip_0, pod_0) as any_ip_pod_0 from l4_flow_log limit 1",
output: "SELECT topKIf(1)((if(is_ipv4=1, IPv4NumToString(ip4_0), IPv6NumToString(ip6_0)), dictGet(flow_tag.pod_map, 'name', (toUInt64(pod_id_0)))), (NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0')))) AND NOT (pod_id_0 = 0))) AS `any_ip_pod_0` FROM flow_log.`l4_flow_log` LIMIT 1",
}, {
index: "layered_0",
input: "select Avg(`byte_tx`) AS `Avg(byte_tx)`, region_0 from vtap_flow_edge_port group by region_0 limit 1",
output: "SELECT region_0, AVG(`_sum_byte_tx`) AS `Avg(byte_tx)` FROM (SELECT dictGet(flow_tag.region_map, 'name', (toUInt64(region_id_0))) AS `region_0`, SUM(byte_tx) AS `_sum_byte_tx` FROM flow_metrics.`vtap_flow_edge_port` GROUP BY dictGet(flow_tag.region_map, 'name', (toUInt64(region_id_0))) AS `region_0`) GROUP BY `region_0` LIMIT 1",
db: "flow_metrics",
}}
)

Expand Down
75 changes: 75 additions & 0 deletions server/querier/engine/clickhouse/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func GetTagFunction(name string, args []string, alias, db, table string) (Statem
}

func GetAggFunc(name string, args []string, alias string, db string, table string, ctx context.Context) (Statement, int, string, error) {
if name == view.FUNCTION_TOPK || name == view.FUNCTION_ANY {
return GetTopKTrans(name, args, alias, db, table, ctx)
}

var levelFlag int
field := args[0]
field = strings.Trim(field, "`")
Expand Down Expand Up @@ -119,6 +123,77 @@ func GetAggFunc(name string, args []string, alias string, db string, table strin
}, levelFlag, unit, nil
}

// GetTopKTrans builds the aggregate statement for the TopK and Any operators.
//
// For TopK the last element of args is treated as the K parameter and all
// preceding elements are the fields to aggregate; for Any every element of
// args is a field. Each field is resolved through metrics.GetAggMetrics and,
// when tag.GetTag also knows it, the tag's TagTranslator is used as the
// database expression (falling back to the metric's DBField otherwise).
// Multiple fields are combined into a single parenthesized tuple and their
// filter conditions are AND-ed together.
//
// It returns the statement, the layering flag, the unit string and an error.
// (nil, 0, "", nil) is returned when the operator is not registered, when no
// field is supplied, or when a field cannot be resolved or is an array-type
// metric.
func GetTopKTrans(name string, args []string, alias string, db string, table string, ctx context.Context) (Statement, int, string, error) {
	function, ok := metrics.METRICS_FUNCTIONS_MAP[name]
	if !ok {
		return nil, 0, "", nil
	}

	var fields []string
	switch name {
	case view.FUNCTION_TOPK:
		// The trailing argument is K; guard against an empty argument list so
		// the slice expression cannot go out of range.
		if len(args) > 0 {
			fields = args[:len(args)-1]
		}
	case view.FUNCTION_ANY:
		fields = args
	}
	// Without at least one field there is nothing to aggregate, and the code
	// below would dereference a nil metric. Report "not supported" the same
	// way the other early exits of this function do.
	if len(fields) == 0 {
		return nil, 0, "", nil
	}

	levelFlag := view.MODEL_METRICS_LEVEL_FLAG_UNLAY

	dbFields := make([]string, len(fields))
	conditions := make([]string, 0, len(fields))

	var metricStruct *metrics.Metrics
	for i, field := range fields {
		field = strings.Trim(field, "`")
		metricStruct, ok = metrics.GetAggMetrics(field, db, table, ctx)
		if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
			return nil, 0, "", nil
		}

		condition := metricStruct.Condition
		// Prefer the tag translation when the field is a known tag; the tag's
		// not-null filter is only used if the metric defines no condition.
		if tagItem, found := tag.GetTag(field, db, table, "default"); found {
			dbFields[i] = tagItem.TagTranslator
			if condition == "" {
				condition = tagItem.NotNullFilter
			}
		} else {
			dbFields[i] = metricStruct.DBField
		}
		if condition != "" {
			conditions = append(conditions, condition)
		}

		// Check whether the operator supports single-layer (unlayered)
		// evaluation for this metric type; if not, switch to the layered flag.
		if levelFlag == view.MODEL_METRICS_LEVEL_FLAG_UNLAY && db == "flow_metrics" {
			unlayFuns := metrics.METRICS_TYPE_UNLAY_FUNCTIONS[metricStruct.Type]
			if !common.IsValueInSliceString(name, unlayFuns) {
				levelFlag = view.MODEL_METRICS_LEVEL_FLAG_LAYERED
			}
		}
	}

	dbField := strings.Join(dbFields, ", ")
	condition := strings.Join(conditions, " AND ")
	if len(fields) > 1 {
		// Several fields are passed to the database function as one tuple.
		dbField = "(" + dbField + ")"
		// Only wrap a non-empty condition: a bare "()" is a non-empty string
		// and would presumably be emitted as a filter expression downstream.
		if condition != "" {
			condition = "(" + condition + ")"
		}
	}

	// Work on a copy so the shared metric definition is not mutated. The copy
	// is based on the last resolved field, whose Unit also drives the result
	// unit below.
	metricStructCopy := *metricStruct
	metricStructCopy.DBField = dbField
	metricStructCopy.Condition = condition

	unit := strings.ReplaceAll(function.UnitOverwrite, "$unit", metricStruct.Unit)

	return &AggFunction{
		Metrics: &metricStructCopy,
		Name:    name,
		Args:    args,
		Alias:   alias,
	}, levelFlag, unit, nil
}

func GetBinaryFunc(name string, args []Function) (*BinaryFunction, error) {
return &BinaryFunction{
Name: name,
Expand Down
2 changes: 2 additions & 0 deletions server/querier/engine/clickhouse/metrics/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ var METRICS_FUNCTIONS_MAP = map[string]*Function{
view.FUNCTION_PERSECOND: NewFunction(view.FUNCTION_PERSECOND, FUNCTION_TYPE_MATH, nil, "$unit/s", 0),
view.FUNCTION_HISTOGRAM: NewFunction(view.FUNCTION_HISTOGRAM, FUNCTION_TYPE_MATH, nil, "", 1),
view.FUNCTION_LAST: NewFunction(view.FUNCTION_LAST, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_COUNTER, METRICS_TYPE_GAUGE, METRICS_TYPE_DELAY, METRICS_TYPE_PERCENTAGE, METRICS_TYPE_QUOTIENT}, "", 0),
view.FUNCTION_TOPK: NewFunction(view.FUNCTION_TOPK, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_TAG}, "$unit", 0),
view.FUNCTION_ANY: NewFunction(view.FUNCTION_ANY, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_TAG}, "$unit", 0),
}

func GetFunctionDescriptions() (*common.Result, error) {
Expand Down
2 changes: 1 addition & 1 deletion server/querier/engine/clickhouse/metrics/l4_flow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ var L4_FLOW_LOG_METRICS_REPLACE = map[string]*Metrics{

"vpc_0": NewReplaceMetrics("l3_epc_id_0", "NOT (l3_epc_id_0 = -2)"),
"subnet_0": NewReplaceMetrics("subnet_id_0", "NOT (subnet_id_0 = 0)"),
"ip_0": NewReplaceMetrics("[toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
"ip_0": NewReplaceMetrics("[toString(ip4_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
"pod_cluster_0": NewReplaceMetrics("pod_cluster_id_0", "NOT (pod_cluster_id_0 = 0)"),
"pod_node_0": NewReplaceMetrics("pod_node_id_0", "NOT (pod_node_id_0 = 0)"),
"pod_ns_0": NewReplaceMetrics("pod_ns_id_0", "NOT (pod_ns_id_0 = 0)"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var VTAP_FLOW_EDGE_PORT_METRICS_REPLACE = map[string]*Metrics{

"vpc_0": NewReplaceMetrics("l3_epc_id_0", "NOT (l3_epc_id_0 = -2)"),
"subnet_0": NewReplaceMetrics("subnet_id_0", "NOT (subnet_id_0 = 0)"),
"ip_0": NewReplaceMetrics("[toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
"ip_0": NewReplaceMetrics("[toString(ip4_0), toString(is_ipv4), toString(ip6_0)]", "NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))"),
"pod_cluster_0": NewReplaceMetrics("pod_cluster_id_0", "NOT (pod_cluster_id_0 = 0)"),
"pod_node_0": NewReplaceMetrics("pod_node_id_0", "NOT (pod_node_id_0 = 0)"),
"pod_ns_0": NewReplaceMetrics("pod_ns_id_0", "NOT (pod_ns_id_0 = 0)"),
Expand Down
16 changes: 13 additions & 3 deletions server/querier/engine/clickhouse/view/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
FUNCTION_PERCENTAG = "Percentage"
FUNCTION_HISTOGRAM = "Histogram"
FUNCTION_LAST = "Last"
FUNCTION_TOPK = "TopK"
FUNCTION_ANY = "Any"
)

// Maps externally exposed operator names to the actual database function names
Expand All @@ -68,6 +70,8 @@ var FUNC_NAME_MAP map[string]string = map[string]string{
FUNCTION_UNIQ: "uniq",
FUNCTION_UNIQ_EXACT: "uniqExact",
FUNCTION_LAST: "last_value",
FUNCTION_TOPK: "topK",
FUNCTION_ANY: "topK", // because need to set any to topK(1), and '(1)' may be appended after 'If' in func (f *DefaultFunction) WriteTo(buf *bytes.Buffer)
}

var MATH_FUNCTIONS = []string{
Expand Down Expand Up @@ -207,11 +211,17 @@ func (f *DefaultFunction) WriteTo(buf *bytes.Buffer) {
buf.WriteString("If")
}

if len(f.Args) > 0 {
args := f.Args
if f.Name == FUNCTION_TOPK {
args = f.Args[len(f.Args)-1:]
} else if f.Name == FUNCTION_ANY {
args = []string{"1"}
}
if len(args) > 0 {
buf.WriteString("(")
for i, arg := range f.Args {
for i, arg := range args {
buf.WriteString(arg)
if i < len(f.Args)-1 {
if i < len(args)-1 {
buf.WriteString(", ")
}
}
Expand Down

0 comments on commit a2c0002

Please sign in to comment.