Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: querier uniq function supports multi params #8218

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions server/querier/engine/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ var (
}, {
input: "select Uniq(ip_0) as uniq_ip_0 from l4_flow_log limit 1",
output: []string{"SELECT uniq(if(is_ipv4=1, IPv4NumToString(ip4_0), IPv6NumToString(ip6_0))) AS `uniq_ip_0` FROM flow_log.`l4_flow_log` LIMIT 1"},
}, {
input: "select Uniq(ip_0, region_0, region_id_0) as uniq_0 from l4_flow_log limit 1",
output: []string{"SELECT uniq((if(is_ipv4=1, IPv4NumToString(ip4_0), IPv6NumToString(ip6_0)), dictGet('flow_tag.region_map', 'name', (toUInt64(region_id_0))), region_id_0)) AS `uniq_0` FROM flow_log.`l4_flow_log` LIMIT 1"},
}, {
input: "select Max(byte) as max_byte, Sum(log_count) as sum_log_count from l4_flow_log having Sum(byte)>=0 limit 1",
output: []string{"SELECT MAX(byte_tx+byte_rx) AS `max_byte`, SUM(1) AS `sum_log_count` FROM flow_log.`l4_flow_log` HAVING SUM(byte_tx+byte_rx) >= 0 LIMIT 1"},
Expand Down Expand Up @@ -339,7 +342,7 @@ var (
name: "topk_2",
db: "flow_metrics",
input: "select pod_ns, topK(pod, pod_cluster_id, service_id, 10) from `vtap_app_port.1h` WHERE time>=1694069050 AND time<=1694990640 group by pod_ns limit 10",
output: []string{"SELECT dictGet('flow_tag.pod_ns_map', 'name', (toUInt64(pod_ns_id))) AS `pod_ns`, topK(10)((dictGet('flow_tag.pod_map', 'name', (toUInt64(pod_id))),pod_cluster_id,service_id)) FROM flow_metrics.`application.1h` WHERE `time` >= 1694069050 AND `time` <= 1694990640 AND (pod_ns_id!=0) GROUP BY dictGet('flow_tag.pod_ns_map', 'name', (toUInt64(pod_ns_id))) AS `pod_ns` LIMIT 10"},
output: []string{"SELECT dictGet('flow_tag.pod_ns_map', 'name', (toUInt64(pod_ns_id))) AS `pod_ns`, topK(10)(dictGet('flow_tag.pod_map', 'name', (toUInt64(pod_id))),pod_cluster_id,service_id) FROM flow_metrics.`application.1h` WHERE `time` >= 1694069050 AND `time` <= 1694990640 AND (pod_ns_id!=0) GROUP BY dictGet('flow_tag.pod_ns_map', 'name', (toUInt64(pod_ns_id))) AS `pod_ns` LIMIT 10"},
}, {
name: "topk_enum",
db: "flow_log",
Expand All @@ -359,7 +362,7 @@ var (
name: "any_2",
db: "flow_metrics",
input: "select pod_ns, any(pod, pod_cluster_id, service_id) from `vtap_app_port.1h` WHERE time>=1694069050 AND time<=1694990640 group by pod_ns limit 10",
output: []string{"SELECT dictGet('flow_tag.pod_ns_map', 'name', (toUInt64(pod_ns_id))) AS `pod_ns`, any((dictGet('flow_tag.pod_map', 'name', (toUInt64(pod_id))),pod_cluster_id,service_id)) FROM flow_metrics.`application.1h` WHERE `time` >= 1694069050 AND `time` <= 1694990640 AND (pod_ns_id!=0) GROUP BY dictGet('flow_tag.pod_ns_map', 'name', (toUInt64(pod_ns_id))) AS `pod_ns` LIMIT 10"},
output: []string{"SELECT dictGet('flow_tag.pod_ns_map', 'name', (toUInt64(pod_ns_id))) AS `pod_ns`, any(dictGet('flow_tag.pod_map', 'name', (toUInt64(pod_id))),pod_cluster_id,service_id) FROM flow_metrics.`application.1h` WHERE `time` >= 1694069050 AND `time` <= 1694990640 AND (pod_ns_id!=0) GROUP BY dictGet('flow_tag.pod_ns_map', 'name', (toUInt64(pod_ns_id))) AS `pod_ns` LIMIT 10"},
}, {
input: "SELECT is_internet_0, is_internet_1 FROM l4_flow_log GROUP BY is_internet_0, is_internet_1 limit 1",
output: []string{"SELECT if(l3_epc_id_0=-2,1,0) AS `is_internet_0`, if(l3_epc_id_1=-2,1,0) AS `is_internet_1` FROM flow_log.`l4_flow_log` GROUP BY if(l3_epc_id_0=-2,1,0) AS `is_internet_0`, if(l3_epc_id_1=-2,1,0) AS `is_internet_1` LIMIT 1"},
Expand Down
53 changes: 51 additions & 2 deletions server/querier/engine/clickhouse/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func GetAggFunc(name string, args []string, alias string, derivativeArgs []strin
derivativeGroupBy := e.DerivativeGroupBy
if name == view.FUNCTION_TOPK || name == view.FUNCTION_ANY {
return GetTopKTrans(name, args, alias, e)
} else if name == view.FUNCTION_UNIQ || name == view.FUNCTION_UNIQ_EXACT {
return GetUniqTrans(name, args, alias, e)
}

var levelFlag int
Expand Down Expand Up @@ -242,6 +244,53 @@ func GetTopKTrans(name string, args []string, alias string, e *CHEngine) (Statem
}, levelFlag, unit, nil
}

// GetUniqTrans builds the aggregate statement for the uniq / uniqExact
// functions when they are called with one or more fields.
//
// Every entry of args is stripped of surrounding backticks and resolved
// through metrics.GetAggMetrics for the engine's DB, table and ORG ID. The
// resolved DBField expressions are joined with ", " and, when more than one
// field was given, wrapped in a pair of parentheses, so the generated SQL
// looks like uniq((a, b, c)).
//
// It returns the statement, the metrics level flag, the unit string and an
// error. The result is (nil, 0, "", nil) when:
//   - name is not present in metrics.METRICS_FUNCTIONS_MAP,
//   - args is empty,
//   - any field cannot be resolved, or resolves to an array-typed metric.
//
// The unit is function.UnitOverwrite with "$unit" replaced by the Unit of the
// last resolved field.
func GetUniqTrans(name string, args []string, alias string, e *CHEngine) (Statement, int, string, error) {
	db := e.DB
	fields := args

	function, ok := metrics.METRICS_FUNCTIONS_MAP[name]
	if !ok {
		return nil, 0, "", nil
	}

	// Without at least one field the loop below never assigns metricStruct,
	// and dereferencing it afterwards would panic. Report "not handled" using
	// the same result as the other unsupported cases.
	fieldsLen := len(fields)
	if fieldsLen == 0 {
		return nil, 0, "", nil
	}

	levelFlag := view.MODEL_METRICS_LEVEL_FLAG_UNLAY
	dbFields := make([]string, fieldsLen)

	var metricStruct *metrics.Metrics
	for i, field := range fields {
		field = strings.Trim(field, "`")
		metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID)
		if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
			return nil, 0, "", nil
		}
		dbFields[i] = metricStruct.DBField

		// Judge whether the operator supports single layer. Once any field
		// forces the layered model the flag stays layered, so the check is
		// skipped for the remaining fields.
		if levelFlag == view.MODEL_METRICS_LEVEL_FLAG_UNLAY && db != chCommon.DB_NAME_FLOW_LOG {
			unlayFuns := metrics.METRICS_TYPE_UNLAY_FUNCTIONS[metricStruct.Type]
			if !common.IsValueInSliceString(name, unlayFuns) {
				levelFlag = view.MODEL_METRICS_LEVEL_FLAG_LAYERED
			}
		}
	}

	// Work on a copy so the struct returned by GetAggMetrics is not modified
	// when DBField is replaced with the combined expression.
	metricStructCopy := *metricStruct
	metricStructCopy.DBField = strings.Join(dbFields, ", ")
	if fieldsLen > 1 {
		metricStructCopy.DBField = "(" + metricStructCopy.DBField + ")"
	}

	unit := strings.ReplaceAll(function.UnitOverwrite, "$unit", metricStruct.Unit)

	return &AggFunction{
		Metrics: &metricStructCopy,
		Name:    name,
		Args:    args,
		Alias:   alias,
	}, levelFlag, unit, nil
}

func GetBinaryFunc(name string, args []Function) (*BinaryFunction, error) {
return &BinaryFunction{
Name: name,
Expand Down Expand Up @@ -915,8 +964,8 @@ func (f *TagFunction) Trans(m *view.Model) view.Node {
if len(fields) > 1 {
if f.Name == "if" {
withValue = fmt.Sprintf("%s(%s)", f.Name, strings.Join(values, ","))
} else if strings.HasPrefix(f.Name, "topK") || strings.HasPrefix(f.Name, "any") {
withValue = fmt.Sprintf("%s((%s))", f.Name, strings.Join(values, ","))
} else if strings.HasPrefix(f.Name, "topK") || strings.HasPrefix(f.Name, "any") || strings.HasPrefix(f.Name, "uniq") {
withValue = fmt.Sprintf("%s(%s)", f.Name, strings.Join(values, ","))
} else {
withValue = fmt.Sprintf("%s([%s])", f.Name, strings.Join(values, ","))
}
Expand Down
2 changes: 1 addition & 1 deletion server/querier/engine/clickhouse/view/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (f *DefaultFunction) WriteTo(buf *bytes.Buffer) {
args := f.Args
if f.Name == FUNCTION_TOPK {
args = f.Args[len(f.Args)-1:]
} else if f.Name == FUNCTION_ANY {
} else if f.Name == FUNCTION_ANY || f.Name == FUNCTION_UNIQ || f.Name == FUNCTION_UNIQ_EXACT {
args = nil
}
if len(args) > 0 {
Expand Down
Loading