Skip to content

Commit

Permalink
[querier] support TopK and Any
Browse files Browse the repository at this point in the history
  • Loading branch information
WJxuan committed Sep 22, 2023
1 parent 3ceeabb commit dfc74f0
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 26 deletions.
21 changes: 21 additions & 0 deletions server/querier/engine/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,27 @@ var (
db: "flow_metrics",
input: "select pod_ns, any(pod, pod_cluster_id, service_id) from `vtap_app_port.1h` WHERE time>=1694069050 AND time<=1694990640 group by pod_ns limit 10",
output: "SELECT dictGet(flow_tag.pod_ns_map, 'name', (toUInt64(pod_ns_id))) AS `pod_ns`, topK(1)((dictGet(flow_tag.pod_map, 'name', (toUInt64(pod_id))),pod_cluster_id,service_id)) FROM flow_metrics.`vtap_app_port.1h` WHERE `time` >= 1694069050 AND `time` <= 1694990640 AND (pod_ns_id!=0) GROUP BY dictGet(flow_tag.pod_ns_map, 'name', (toUInt64(pod_ns_id))) AS `pod_ns` LIMIT 10",
}, {
index: "TopK_1",
input: "select TopK(ip_0, 10) as top_10_ip_0 from l4_flow_log limit 1",
output: "SELECT topKIf(10)([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
}, {
index: "TopK_2",
input: "select TopK(ip_0, pod_0, 10) as top_10_ip_0 from l4_flow_log limit 1",
output: "SELECT topKIf(10)(([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], pod_id_0), NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
}, {
index: "Any_1",
input: "select Any(ip_0) as top_10_ip_0 from l4_flow_log limit 1",
output: "SELECT topKIf(1)([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
}, {
index: "Any_2",
input: "select Any(ip_0, pod_0) as top_10_ip_0 from l4_flow_log limit 1",
output: "SELECT topKIf(1)(([toString(ip4_0), toString(subnet_id_0), toString(is_ipv4), toString(ip6_0)], pod_id_0), NOT (((is_ipv4 = 1) OR (ip6_0 = toIPv6('::'))) AND ((is_ipv4 = 0) OR (ip4_0 = toIPv4('0.0.0.0'))))) AS `top_10_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1",
}, {
index: "layered_0",
input: "select Avg(`byte_tx`) AS `Avg(byte_tx)`, region_0 from vtap_flow_edge_port group by region_0 limit 1",
output: "SELECT region_0, AVG(`_sum_byte_tx`) AS `Avg(byte_tx)` FROM (SELECT dictGet(flow_tag.region_map, 'name', (toUInt64(region_id_0))) AS `region_0`, SUM(byte_tx) AS `_sum_byte_tx` FROM flow_metrics.`vtap_flow_edge_port` GROUP BY dictGet(flow_tag.region_map, 'name', (toUInt64(region_id_0))) AS `region_0`) GROUP BY `region_0` LIMIT 1",
db: "flow_metrics",
}}
)

Expand Down
69 changes: 46 additions & 23 deletions server/querier/engine/clickhouse/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,38 +79,61 @@ func GetTagFunction(name string, args []string, alias, db, table string) (Statem
}

func GetAggFunc(name string, args []string, alias string, db string, table string, ctx context.Context) (Statement, int, string, error) {
var levelFlag int
field := args[0]
field = strings.Trim(field, "`")

if name == view.FUNCTION_COUNT && field != metrics.COUNT_METRICS_NAME {
return nil, 0, "", fmt.Errorf("function [%s] not support metric [%s]",
view.FUNCTION_COUNT, metrics.COUNT_METRICS_NAME)
}

function, ok := metrics.METRICS_FUNCTIONS_MAP[name]
if !ok {
return nil, 0, "", nil
}
metricStruct, ok := metrics.GetAggMetrics(field, db, table, ctx)
if !ok {
return nil, 0, "", nil

fields := args[:1]
if name == view.FUNCTION_TOPK {
fields = args[:len(args)-1]
} else if name == view.FUNCTION_ANY {
fields = args
}

var metricStruct *metrics.Metrics
levelFlag := view.MODEL_METRICS_LEVEL_FLAG_UNLAY

var metricStructs []*metrics.Metrics
for _, field := range fields {
field = strings.Trim(field, "`")
if name == view.FUNCTION_COUNT && field != metrics.COUNT_METRICS_NAME {
return nil, 0, "", fmt.Errorf("function [%s] not support metric [%s]",
view.FUNCTION_COUNT, metrics.COUNT_METRICS_NAME)
}

metricStruct, ok := metrics.GetAggMetrics(field, db, table, ctx)
if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
return nil, 0, "", nil
}
metricStructs = append(metricStructs, metricStruct)
}
if metricStruct.Type == metrics.METRICS_TYPE_ARRAY {

metricslength := len(metricStructs)
if metricslength == 0 {
return nil, 0, "", nil
}
unit := strings.ReplaceAll(function.UnitOverwrite, "$unit", metricStruct.Unit)
// 判断算子是否支持单层
if db == "flow_metrics" {
unlayFuns := metrics.METRICS_TYPE_UNLAY_FUNCTIONS[metricStruct.Type]
if common.IsValueInSliceString(name, unlayFuns) {
levelFlag = view.MODEL_METRICS_LEVEL_FLAG_UNLAY
} else {
levelFlag = view.MODEL_METRICS_LEVEL_FLAG_LAYERED

metricStruct = metricStructs[0]
dbFields := make([]string, metricslength)
for i, metric := range metricStructs {
dbFields[i] = metric.DBField

// 判断算子是否支持单层
if levelFlag == view.MODEL_METRICS_LEVEL_FLAG_UNLAY && db == "flow_metrics" {
unlayFuns := metrics.METRICS_TYPE_UNLAY_FUNCTIONS[metric.Type]
if !common.IsValueInSliceString(name, unlayFuns) {
levelFlag = view.MODEL_METRICS_LEVEL_FLAG_LAYERED
}
}
} else {
levelFlag = view.MODEL_METRICS_LEVEL_FLAG_UNLAY
}
if metricslength > 1 {
metricStructCopy := *metricStruct
metricStructCopy.DBField = "(" + strings.Join(dbFields, ", ") + ")"
metricStruct = &metricStructCopy
}
unit := strings.ReplaceAll(function.UnitOverwrite, "$unit", metricStruct.Unit)

return &AggFunction{
Metrics: metricStruct,
Name: name,
Expand Down
3 changes: 3 additions & 0 deletions server/querier/engine/clickhouse/metrics/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var METRICS_FUNCTIONS = []string{
view.FUNCTION_RSPREAD, view.FUNCTION_STDDEV, view.FUNCTION_APDEX,
view.FUNCTION_UNIQ, view.FUNCTION_UNIQ_EXACT, view.FUNCTION_PERCENTAG,
view.FUNCTION_PERSECOND, view.FUNCTION_HISTOGRAM, view.FUNCTION_LAST, view.FUNCTION_COUNT,
view.FUNCTION_TOPK, view.FUNCTION_ANY,
}

var METRICS_FUNCTIONS_MAP = map[string]*Function{
Expand All @@ -65,6 +66,8 @@ var METRICS_FUNCTIONS_MAP = map[string]*Function{
view.FUNCTION_PERSECOND: NewFunction(view.FUNCTION_PERSECOND, FUNCTION_TYPE_MATH, nil, "$unit/s", 0),
view.FUNCTION_HISTOGRAM: NewFunction(view.FUNCTION_HISTOGRAM, FUNCTION_TYPE_MATH, nil, "", 1),
view.FUNCTION_LAST: NewFunction(view.FUNCTION_LAST, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_COUNTER, METRICS_TYPE_GAUGE, METRICS_TYPE_DELAY, METRICS_TYPE_PERCENTAGE, METRICS_TYPE_QUOTIENT}, "", 0),
view.FUNCTION_TOPK: NewFunction(view.FUNCTION_TOPK, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_TAG}, "$unit", 0),
view.FUNCTION_ANY: NewFunction(view.FUNCTION_ANY, FUNCTION_TYPE_AGG, []int{METRICS_TYPE_TAG}, "$unit", 0),
}

func GetFunctionDescriptions() (*common.Result, error) {
Expand Down
16 changes: 13 additions & 3 deletions server/querier/engine/clickhouse/view/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
FUNCTION_PERCENTAG = "Percentage"
FUNCTION_HISTOGRAM = "Histogram"
FUNCTION_LAST = "Last"
FUNCTION_TOPK = "TopK"
FUNCTION_ANY = "Any"
)

// 对外提供的算子与数据库实际算子转换
Expand All @@ -68,6 +70,8 @@ var FUNC_NAME_MAP map[string]string = map[string]string{
FUNCTION_UNIQ: "uniq",
FUNCTION_UNIQ_EXACT: "uniqExact",
FUNCTION_LAST: "last_value",
FUNCTION_TOPK: "topK",
FUNCTION_ANY: "topK",
}

var MATH_FUNCTIONS = []string{
Expand Down Expand Up @@ -207,11 +211,17 @@ func (f *DefaultFunction) WriteTo(buf *bytes.Buffer) {
buf.WriteString("If")
}

if len(f.Args) > 0 {
args := f.Args
if f.Name == FUNCTION_TOPK {
args = f.Args[len(f.Args)-1:]
} else if f.Name == FUNCTION_ANY {
args = []string{"1"}
}
if len(args) > 0 {
buf.WriteString("(")
for i, arg := range f.Args {
for i, arg := range args {
buf.WriteString(arg)
if i < len(f.Args)-1 {
if i < len(args)-1 {
buf.WriteString(", ")
}
}
Expand Down

0 comments on commit dfc74f0

Please sign in to comment.