Skip to content

Commit

Permalink
feat: support ref alias in select (#2101)
Browse files Browse the repository at this point in the history
* support alias

Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer authored Jul 20, 2023
1 parent cbc5996 commit 9087cae
Show file tree
Hide file tree
Showing 7 changed files with 431 additions and 32 deletions.
132 changes: 125 additions & 7 deletions internal/topo/planner/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package planner

import (
"fmt"
"sort"
"strings"

"github.com/lf-edge/ekuiper/internal/binder/function"
Expand Down Expand Up @@ -50,7 +51,9 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
isSchemaless = true
}
}

if !isSchemaless {
aliasFieldTopoSort(s, streamStmts)
}
dsn := ast.DefaultStream
if len(streamsFromStmt) == 1 {
dsn = streamStmts[0].stmt.Name
Expand Down Expand Up @@ -83,6 +86,7 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
}
if f.AName != "" {
aliasFields = append(aliasFields, &s.Fields[i])
fieldsMap.bindAlias(f.AName)
}
}
// bind alias field expressions
Expand All @@ -97,6 +101,19 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
AliasRef: ar,
}
walkErr = fieldsMap.save(f.AName, ast.AliasStream, ar)
for _, subF := range aliasFields {
ast.WalkFunc(subF, func(node ast.Node) bool {
switch fr := node.(type) {
case *ast.FieldRef:
if fr.Name == f.AName {
fr.StreamName = ast.AliasStream
fr.AliasRef = ar
}
return false
}
return true
})
}
}
}
// Bind field ref for alias AND set StreamName for all field ref
Expand Down Expand Up @@ -179,6 +196,99 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
return streamStmts, analyticFuncs, walkErr
}

// aliasTopoDegree records the dependency depth computed for one aliased
// select field while the fields are being topologically ordered.
type aliasTopoDegree struct {
// alias is the alias name of the field (its AName).
alias string
// degree is the depth of the field in the alias dependency chain.
// aliasFieldTopoSort initializes it to -1 (not resolved yet) and assigns a
// value >= 0 once every name the expression refers to is known.
degree int
// field is a copy of the select field that carries the alias.
field ast.Field
}

// aliasTopoDegrees is a list of alias degree records. Its Len, Less and Swap
// methods let it be passed to sort.Sort, which orders the records by
// ascending degree.
type aliasTopoDegrees []*aliasTopoDegree

// Len reports how many records the list holds.
func (ds aliasTopoDegrees) Len() int {
	count := len(ds)
	return count
}

// Less reports whether the record at index i has a strictly smaller degree
// than the record at index j.
func (ds aliasTopoDegrees) Less(i, j int) bool {
	left, right := ds[i], ds[j]
	return left.degree < right.degree
}

// Swap exchanges the records stored at indexes i and j.
func (ds aliasTopoDegrees) Swap(i, j int) {
	tmp := ds[i]
	ds[i] = ds[j]
	ds[j] = tmp
}

// aliasFieldTopoSort reorders s.Fields so that every aliased field comes
// after the aliased fields its expression refers to, followed by the fields
// that have no alias.
//
// Each aliased field gets a degree: 0 when its expression only refers to
// schema columns of streamStmts, otherwise one more than the highest degree
// among the aliases it refers to. Aliased fields are emitted by ascending
// degree; fields with the same degree keep their declaration order, so the
// result is deterministic.
//
// An alias whose expression refers to a name that is neither a schema column
// nor a resolvable alias (an unknown column, or a circular/self reference)
// can never be assigned a degree. Such fields are placed after the resolved
// aliases, in declaration order, and left for the subsequent binding phase
// to deal with. Fields that share an alias name are all kept.
func aliasFieldTopoSort(s *ast.SelectStatement, streamStmts []*streamInfo) {
	nonAliasFields := make([]ast.Field, 0, len(s.Fields))
	// ordered holds one record per aliased field in declaration order. Using
	// a slice instead of ranging over a map keeps duplicates and makes the
	// final order reproducible.
	ordered := make(aliasTopoDegrees, 0, len(s.Fields))
	// byAlias resolves a referenced name to the record of the alias with
	// that name (the last one declared when names are duplicated).
	byAlias := make(map[string]*aliasTopoDegree, len(s.Fields))
	for _, field := range s.Fields {
		if field.AName == "" {
			nonAliasFields = append(nonAliasFields, field)
			continue
		}
		d := &aliasTopoDegree{alias: field.AName, degree: -1, field: field}
		ordered = append(ordered, d)
		byAlias[field.AName] = d
	}

	// Keep sweeping over the unresolved aliases until a full pass resolves
	// nothing more. Stopping on "no progress" (rather than on "everything
	// resolved") guarantees termination for unresolvable aliases.
	for progressed := true; progressed; {
		progressed = false
		for _, d := range ordered {
			if d.degree >= 0 {
				continue
			}
			skip := false
			degree := 0
			ast.WalkFunc(d.field.Expr, func(node ast.Node) bool {
				ref, isRef := node.(*ast.FieldRef)
				if !isRef {
					return true
				}
				if dep, found := byAlias[ref.Name]; found && dep.degree >= 0 {
					if degree < dep.degree+1 {
						degree = dep.degree + 1
					}
					return true
				}
				// Not a resolved alias: the name must be a schema column,
				// otherwise this field has to wait for a later pass.
				if !isFieldRefNameExists(ref.Name, streamStmts) {
					skip = true
					return false
				}
				return true
			})
			if !skip {
				d.degree = degree
				progressed = true
			}
		}
	}

	// A stable sort preserves declaration order among equal degrees.
	sort.Stable(ordered)
	fields := make([]ast.Field, 0, len(s.Fields))
	var unresolved []ast.Field
	for _, d := range ordered {
		if d.degree < 0 {
			unresolved = append(unresolved, d.field)
			continue
		}
		fields = append(fields, d.field)
	}
	fields = append(fields, unresolved...)
	s.Fields = append(fields, nonAliasFields...)
}

// isFieldRefNameExists reports whether any of the given streams declares a
// schema column whose name is exactly name (the comparison is case-sensitive).
func isFieldRefNameExists(name string, streamStmts []*streamInfo) bool {
	for idx := range streamStmts {
		for _, column := range streamStmts[idx].schema {
			if column.Name != name {
				continue
			}
			return true
		}
	}
	return false
}

// isAliasFieldTopoSortFinish reports whether every alias in aliasDegrees has
// been given a non-negative degree. An empty map counts as finished.
func isAliasFieldTopoSortFinish(aliasDegrees map[string]*aliasTopoDegree) bool {
	finished := true
	for name := range aliasDegrees {
		if aliasDegrees[name].degree < 0 {
			finished = false
			break
		}
	}
	return finished
}

func validate(s *ast.SelectStatement) (err error) {
isAggStmt := false
if xsql.IsAggregate(s.Condition) {
Expand Down Expand Up @@ -265,12 +375,13 @@ func convertStreamInfo(streamStmt *ast.StreamStmt) (*streamInfo, error) {

// fieldsMap tracks the field and alias names known while the field
// references of a select statement are being bound.
type fieldsMap struct {
// content holds one streamFieldStore per field name; bind looks entries up
// by the lower-cased field name.
content map[string]streamFieldStore
// aliasNames is the set of alias names registered through bindAlias; bind
// accepts a reference whose lower-cased name is in this set even when
// content has no entry for it.
aliasNames map[string]struct{}
// isSchemaless, when true, makes bind create a store for an unknown,
// non-empty field name instead of returning an "unknown field" error.
isSchemaless bool
// defaultStream is passed to newStreamFieldStore when bind creates a store
// on demand.
defaultStream ast.StreamName
}

// newFieldsMap returns an empty fieldsMap for the given schema mode and
// default stream. Both the content map and the alias-name set are allocated
// up front so that save/bind and bindAlias can write to them immediately;
// writing to a nil map would panic.
func newFieldsMap(isSchemaless bool, defaultStream ast.StreamName) *fieldsMap {
	return &fieldsMap{
		content:       make(map[string]streamFieldStore),
		aliasNames:    make(map[string]struct{}),
		isSchemaless:  isSchemaless,
		defaultStream: defaultStream,
	}
}

func (f *fieldsMap) reserve(fieldName string, streamName ast.StreamName) {
Expand Down Expand Up @@ -302,20 +413,27 @@ func (f *fieldsMap) save(fieldName string, streamName ast.StreamName, field *ast
return nil
}

// bindAlias registers aliasName as a known alias so that bind accepts field
// references to it.
//
// The name is stored lower-cased because bind looks names up by their
// lower-cased form; storing the original spelling would make a mixed-case
// alias impossible to find. The set is created on first use so that a
// fieldsMap whose aliasNames was never initialized does not panic.
func (f *fieldsMap) bindAlias(aliasName string) {
	if f.aliasNames == nil {
		f.aliasNames = make(map[string]struct{})
	}
	f.aliasNames[strings.ToLower(aliasName)] = struct{}{}
}

func (f *fieldsMap) bind(fr *ast.FieldRef) error {
lname := strings.ToLower(fr.Name)
fm, ok := f.content[lname]
if !ok {
fm, ok1 := f.content[lname]
_, ok2 := f.aliasNames[lname]
if !ok1 && !ok2 {
if f.isSchemaless && fr.Name != "" {
fm = newStreamFieldStore(f.isSchemaless, f.defaultStream)
f.content[lname] = fm
} else {
return fmt.Errorf("unknown field %s", fr.Name)
}
}
err := fm.bindRef(fr)
if err != nil {
return fmt.Errorf("%s%s", err, fr.Name)
if fm != nil {
err := fm.bindRef(fr)
if err != nil {
return fmt.Errorf("%s%s", err, fr.Name)
}
}
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions internal/topo/planner/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ var tests = []struct {
r: newErrorStruct(""),
},
{ // 10
sql: `SELECT sum(temp) as temp, count(temp) as temp FROM src1`,
r: newErrorStruct("duplicate alias temp"),
sql: `SELECT sum(temp) as temp1, count(temp) as temp FROM src1`,
r: newErrorStruct("invalid argument for func count: aggregate argument is not allowed"),
},
{ // 11
sql: `SELECT sum(temp) as temp, count(temp) as ct FROM src1`,
sql: `SELECT sum(temp) as temp1, count(temp) as ct FROM src1`,
r: newErrorStruct(""),
},
{ // 12
Expand All @@ -116,7 +116,7 @@ var tests = []struct {
},
{ // 13
sql: `SELECT sin(temp) as temp1, cos(temp1) FROM src1`,
r: newErrorStructWithS("unknown field temp1", ""),
r: newErrorStruct(""),
},
{ // 14
sql: `SELECT collect(*)[-1] as current FROM src1 GROUP BY COUNTWINDOW(2, 1) HAVING isNull(current->name) = false`,
Expand Down
172 changes: 172 additions & 0 deletions internal/topo/planner/planner_alias_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package planner

import (
"encoding/json"
"reflect"
"strings"
"testing"

"github.com/gdexlab/go-render/render"

"github.com/lf-edge/ekuiper/internal/pkg/store"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/ast"
)

// TestPlannerAlias checks the logical plan built for select statements in
// which one alias refers to another alias of the same statement.
//
// It registers two schemaless streams and one table in the "stream" KV store,
// then compares the plan produced by createLogicalPlan with the expected plan
// of each test case. Setup failures abort the test immediately, because every
// later step depends on the stored stream definitions. A planning error is
// compared with the case's expected error text instead of being discarded.
func TestPlannerAlias(t *testing.T) {
	kv, err := store.GetKV("stream")
	if err != nil {
		t.Fatal(err)
	}
	streamSqls := map[string]string{
		"src1": `CREATE STREAM src1 (
) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
		"src2": `CREATE STREAM src2 (
) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts");`,
		"tableInPlanner": `CREATE TABLE tableInPlanner (
id BIGINT,
name STRING,
value STRING,
hum BIGINT
) WITH (TYPE="file");`,
	}
	types := map[string]ast.StreamType{
		"src1":           ast.TypeStream,
		"src2":           ast.TypeStream,
		"tableInPlanner": ast.TypeTable,
	}
	for name, sql := range streamSqls {
		s, err := json.Marshal(&xsql.StreamInfo{
			StreamType: types[name],
			Statement:  sql,
		})
		if err != nil {
			t.Fatal(err)
		}
		if err = kv.Set(name, string(s)); err != nil {
			t.Fatal(err)
		}
	}
	streams := make(map[string]*ast.StreamStmt)
	for n := range streamSqls {
		streamStmt, err := xsql.GetDataSource(kv, n)
		if err != nil {
			t.Fatalf("fail to get stream %s, please check if stream is created", n)
		}
		streams[n] = streamStmt
	}
	// aliasRef1 is the expected binding of "a + b as sum".
	aliasRef1 := &ast.AliasRef{
		Expression: &ast.BinaryExpr{
			OP: ast.ADD,
			LHS: &ast.FieldRef{
				StreamName: "src1",
				Name:       "a",
			},
			RHS: &ast.FieldRef{
				StreamName: "src1",
				Name:       "b",
			},
		},
	}
	aliasRef1.SetRefSource([]string{"src1"})
	// aliasRef2 is the expected binding of "sum + 1 as sum2"; its left operand
	// refers to the alias "sum" rather than to a stream column.
	aliasRef2 := &ast.AliasRef{
		Expression: &ast.BinaryExpr{
			OP: ast.ADD,
			LHS: &ast.FieldRef{
				StreamName: ast.AliasStream,
				Name:       "sum",
				AliasRef:   aliasRef1,
			},
			RHS: &ast.IntegerLiteral{
				Val: 1,
			},
		},
	}
	aliasRef2.SetRefSource([]string{"src1"})

	testcases := []struct {
		sql string
		p   LogicalPlan
		err string
	}{
		{
			sql: "select a + b as sum, sum + 1 as sum2 from src1",
			p: ProjectPlan{
				baseLogicalPlan: baseLogicalPlan{
					children: []LogicalPlan{
						DataSourcePlan{
							baseLogicalPlan: baseLogicalPlan{},
							name:            "src1",
							streamFields: map[string]*ast.JsonStreamField{
								"a": nil,
								"b": nil,
							},
							streamStmt:   streams["src1"],
							pruneFields:  []string{},
							isSchemaless: true,
							metaFields:   []string{},
						}.Init(),
					},
				},
				fields: []ast.Field{
					{
						AName: "sum",
						Expr: &ast.FieldRef{
							StreamName: ast.AliasStream,
							Name:       "sum",
							AliasRef:   aliasRef1,
						},
					},
					{
						AName: "sum2",
						Expr: &ast.FieldRef{
							StreamName: ast.AliasStream,
							Name:       "sum2",
							AliasRef:   aliasRef2,
						},
					},
				},
			}.Init(),
		},
	}
	for i, tt := range testcases {
		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
		if err != nil {
			t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err)
			continue
		}
		p, err := createLogicalPlan(stmt, &api.RuleOption{
			IsEventTime:        false,
			LateTol:            0,
			Concurrency:        0,
			BufferLength:       0,
			SendMetaToSink:     false,
			Qos:                0,
			CheckpointInterval: 0,
			SendError:          true,
		}, kv)
		if err != nil {
			// A planning failure is only acceptable when the case expects
			// exactly this error text.
			if tt.err != err.Error() {
				t.Errorf("%d. %q: error mismatch:\n  exp=%q\n  got=%q\n\n", i, tt.sql, tt.err, err.Error())
			}
			continue
		}
		if tt.err != "" {
			t.Errorf("%d. %q: expected error %q but planning succeeded", i, tt.sql, tt.err)
			continue
		}
		if !reflect.DeepEqual(tt.p, p) {
			t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
		}
	}
}
Loading

0 comments on commit 9087cae

Please sign in to comment.