From 455fe8638b14897e78aad2b5d6b8b122a6b8a0cb Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 14 Jan 2025 09:18:00 -0600 Subject: [PATCH 1/7] Cherry-picked the engine primitive for values join Signed-off-by: Florent Poinsard --- go/vt/proto/query/query.pb.go | 11 ++- go/vt/vtgate/engine/cached_size.go | 43 +++++++++++ go/vt/vtgate/engine/join_values.go | 119 +++++++++++++++++++++++++++++ proto/query.proto | 4 + web/vtadmin/src/proto/vtadmin.d.ts | 3 +- web/vtadmin/src/proto/vtadmin.js | 32 ++++++++ 6 files changed, 209 insertions(+), 3 deletions(-) create mode 100644 go/vt/vtgate/engine/join_values.go diff --git a/go/vt/proto/query/query.pb.go b/go/vt/proto/query/query.pb.go index a03e5fa9025..a168cc2d363 100644 --- a/go/vt/proto/query/query.pb.go +++ b/go/vt/proto/query/query.pb.go @@ -319,7 +319,11 @@ const ( // Properties: 35, IsQuoted. Type_VECTOR Type = 2083 // RAW specifies a type which won't be quoted but the value used as-is while encoding. + // Properties: 36, None. Type_RAW Type = 2084 + // ROW_TUPLE represents multiple rows. + // Properties: 37, None. + Type_ROW_TUPLE Type = 2085 ) // Enum value maps for Type. @@ -362,6 +366,7 @@ var ( 4130: "BITNUM", 2083: "VECTOR", 2084: "RAW", + 2085: "ROW_TUPLE", } Type_value = map[string]int32{ "NULL_TYPE": 0, @@ -401,6 +406,7 @@ var ( "BITNUM": 4130, "VECTOR": 2083, "RAW": 2084, + "ROW_TUPLE": 2085, } ) @@ -6751,7 +6757,7 @@ var file_query_proto_rawDesc = []byte{ 0x41, 0x54, 0x10, 0x80, 0x08, 0x12, 0x0d, 0x0a, 0x08, 0x49, 0x53, 0x51, 0x55, 0x4f, 0x54, 0x45, 0x44, 0x10, 0x80, 0x10, 0x12, 0x0b, 0x0a, 0x06, 0x49, 0x53, 0x54, 0x45, 0x58, 0x54, 0x10, 0x80, 0x20, 0x12, 0x0d, 0x0a, 0x08, 0x49, 0x53, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, 0x80, 0x40, - 0x2a, 0xd7, 0x03, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0d, 0x0a, 0x09, 0x4e, 0x55, 0x4c, + 0x2a, 0xe7, 0x03, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0d, 0x0a, 0x09, 0x4e, 0x55, 0x4c, 0x4c, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x04, 0x49, 0x4e, 0x54, 0x38, 0x10, 0x81, 0x02, 0x12, 0x0a, 0x0a, 0x05, 0x55, 0x49, 0x4e, 0x54, 0x38, 0x10, 0x82, 0x06, 0x12, 0x0a, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x31, 0x36, 0x10, 0x83, 0x02, 0x12, 0x0b, 0x0a, 0x06, 0x55, @@ -6780,7 +6786,8 @@ var file_query_proto_rawDesc = []byte{ 0x48, 0x45, 0x58, 0x4e, 0x55, 0x4d, 0x10, 0xa0, 0x20, 0x12, 0x0b, 0x0a, 0x06, 0x48, 0x45, 0x58, 0x56, 0x41, 0x4c, 0x10, 0xa1, 0x20, 0x12, 0x0b, 0x0a, 0x06, 0x42, 0x49, 0x54, 0x4e, 0x55, 0x4d, 0x10, 0xa2, 0x20, 0x12, 0x0b, 0x0a, 0x06, 0x56, 0x45, 0x43, 0x54, 0x4f, 0x52, 0x10, 0xa3, 0x10, - 0x12, 0x08, 0x0a, 0x03, 0x52, 0x41, 0x57, 0x10, 0xa4, 0x10, 0x2a, 0x36, 0x0a, 0x10, 0x53, 0x74, + 0x12, 0x08, 0x0a, 0x03, 0x52, 0x41, 0x57, 0x10, 0xa4, 0x10, 0x12, 0x0e, 0x0a, 0x09, 0x52, 0x4f, + 0x57, 0x5f, 0x54, 0x55, 0x50, 0x4c, 0x45, 0x10, 0xa5, 0x10, 0x2a, 0x36, 0x0a, 0x10, 0x53, 0x74, 0x61, 0x72, 0x74, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x46, 0x61, 0x69, 0x6c, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 50d3a4b6bbf..d176b99a839 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -1475,6 +1475,49 @@ func (cached *VStream) CachedSize(alloc bool) int64 { size += hack.RuntimeAllocSize(int64(len(cached.Position))) return size } + +//go:nocheckptr +func (cached *ValuesJoin) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(80) + } + // field Left vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Left.(cachedObject); ok { + size += cc.CachedSize(true) + } + // field Right vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Right.(cachedObject); ok { + size += cc.CachedSize(true) + } + // field Vars map[string]int + if cached.Vars != nil { + size += int64(48) + hmap := reflect.ValueOf(cached.Vars) + numBuckets := int(math.Pow(2, float64((*(*uint8)(unsafe.Pointer(hmap.Pointer() + uintptr(9))))))) + numOldBuckets := (*(*uint16)(unsafe.Pointer(hmap.Pointer() + uintptr(10)))) + size += hack.RuntimeAllocSize(int64(numOldBuckets * 208)) + if len(cached.Vars) > 0 || numBuckets > 1 { + size += hack.RuntimeAllocSize(int64(numBuckets * 208)) + } + for k := range cached.Vars { + size += hack.RuntimeAllocSize(int64(len(k))) + } + } + // field Columns []string + { + size += hack.RuntimeAllocSize(int64(cap(cached.Columns)) * int64(16)) + for _, elem := range cached.Columns { + size += hack.RuntimeAllocSize(int64(len(elem))) + } + } + // field RowConstructorArg string + size += hack.RuntimeAllocSize(int64(len(cached.RowConstructorArg))) + return size +} func (cached *Verify) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) diff --git a/go/vt/vtgate/engine/join_values.go b/go/vt/vtgate/engine/join_values.go new file mode 100644 index 00000000000..e35addd8c09 --- /dev/null +++ b/go/vt/vtgate/engine/join_values.go @@ -0,0 +1,119 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +var _ Primitive = (*ValuesJoin)(nil) + +// ValuesJoin is a primitive that joins two primitives by constructing a table from the rows of the LHS primitive. +// The table is passed in as a bind variable to the RHS primitive. +// It's called ValuesJoin because the LHS of the join is sent to the RHS as a VALUES clause. +type ValuesJoin struct { + // Left and Right are the LHS and RHS primitives + // of the Join. They can be any primitive. + Left, Right Primitive + + Vars map[string]int + Columns []string + RowConstructorArg string +} + +// TryExecute performs a non-streaming exec. +func (jv *ValuesJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + lresult, err := vcursor.ExecutePrimitive(ctx, jv.Left, bindVars, wantfields) + if err != nil { + return nil, err + } + bv := &querypb.BindVariable{ + Type: querypb.Type_TUPLE, + } + if len(lresult.Rows) == 0 && wantfields { + // If there are no rows, we still need to construct a single row + // to send down to RHS for Values Table to execute correctly. + // It will be used to execute the field query to provide the output fields. + var vals []sqltypes.Value + for _, field := range lresult.Fields { + val, _ := sqltypes.NewValue(field.Type, nil) + vals = append(vals, val) + } + bv.Values = append(bv.Values, sqltypes.TupleToProto(vals)) + + bindVars[jv.RowConstructorArg] = bv + return jv.Right.GetFields(ctx, vcursor, bindVars) + } + + for _, row := range lresult.Rows { + bv.Values = append(bv.Values, sqltypes.TupleToProto(row)) + } + bindVars[jv.RowConstructorArg] = bv + return vcursor.ExecutePrimitive(ctx, jv.Right, bindVars, wantfields) +} + +// TryStreamExecute performs a streaming exec. +func (jv *ValuesJoin) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + panic("implement me") +} + +// GetFields fetches the field info. +func (jv *ValuesJoin) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return jv.Right.GetFields(ctx, vcursor, bindVars) +} + +// Inputs returns the input primitives for this join +func (jv *ValuesJoin) Inputs() ([]Primitive, []map[string]any) { + return []Primitive{jv.Left, jv.Right}, nil +} + +// RouteType returns a description of the query routing type used by the primitive +func (jv *ValuesJoin) RouteType() string { + return "ValuesJoin" +} + +// GetKeyspaceName specifies the Keyspace that this primitive routes to. +func (jv *ValuesJoin) GetKeyspaceName() string { + if jv.Left.GetKeyspaceName() == jv.Right.GetKeyspaceName() { + return jv.Left.GetKeyspaceName() + } + return jv.Left.GetKeyspaceName() + "_" + jv.Right.GetKeyspaceName() +} + +// GetTableName specifies the table that this primitive routes to. +func (jv *ValuesJoin) GetTableName() string { + return jv.Left.GetTableName() + "_" + jv.Right.GetTableName() +} + +// NeedsTransaction implements the Primitive interface +func (jv *ValuesJoin) NeedsTransaction() bool { + return jv.Right.NeedsTransaction() || jv.Left.NeedsTransaction() +} + +func (jv *ValuesJoin) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "Join", + Variant: "Values", + Other: map[string]any{ + "ValuesArg": jv.RowConstructorArg, + "Vars": jv.Vars, + }, + } +} diff --git a/proto/query.proto b/proto/query.proto index 27a04d77f0a..6c088566fc1 100644 --- a/proto/query.proto +++ b/proto/query.proto @@ -219,7 +219,11 @@ enum Type { // Properties: 35, IsQuoted. VECTOR = 2083; // RAW specifies a type which won't be quoted but the value used as-is while encoding. + // Properties: 36, None. RAW = 2084; + // ROW_TUPLE represents multiple rows. + // Properties: 37, None. + ROW_TUPLE = 2085; } // Value represents a typed value. diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index adb02034c92..234c7e0f323 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -40380,7 +40380,8 @@ export namespace query { HEXVAL = 4129, BITNUM = 4130, VECTOR = 2083, - RAW = 2084 + RAW = 2084, + ROW_TUPLE = 2085 } /** Properties of a Value. */ diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index d71d3e08fe5..f1935b2a9b3 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -95915,6 +95915,7 @@ export const query = $root.query = (() => { * @property {number} BITNUM=4130 BITNUM value * @property {number} VECTOR=2083 VECTOR value * @property {number} RAW=2084 RAW value + * @property {number} ROW_TUPLE=2085 ROW_TUPLE value */ query.Type = (function() { const valuesById = {}, values = Object.create(valuesById); @@ -95955,6 +95956,7 @@ export const query = $root.query = (() => { values[valuesById[4130] = "BITNUM"] = 4130; values[valuesById[2083] = "VECTOR"] = 2083; values[valuesById[2084] = "RAW"] = 2084; + values[valuesById[2085] = "ROW_TUPLE"] = 2085; return values; })(); @@ -96145,6 +96147,7 @@ export const query = $root.query = (() => { case 4130: case 2083: case 2084: + case 2085: break; } if (message.value != null && message.hasOwnProperty("value")) @@ -96320,6 +96323,10 @@ export const query = $root.query = (() => { case 2084: message.type = 2084; break; + case "ROW_TUPLE": + case 2085: + message.type = 2085; + break; } if (object.value != null) if (typeof object.value === "string") @@ -96594,6 +96601,7 @@ export const query = $root.query = (() => { case 4130: case 2083: case 2084: + case 2085: break; } if (message.value != null && message.hasOwnProperty("value")) @@ -96778,6 +96786,10 @@ export const query = $root.query = (() => { case 2084: message.type = 2084; break; + case "ROW_TUPLE": + case 2085: + message.type = 2085; + break; } if (object.value != null) if (typeof object.value === "string") @@ -98345,6 +98357,7 @@ export const query = $root.query = (() => { case 4130: case 2083: case 2084: + case 2085: break; } if (message.table != null && message.hasOwnProperty("table")) @@ -98546,6 +98559,10 @@ export const query = $root.query = (() => { case 2084: message.type = 2084; break; + case "ROW_TUPLE": + case 2085: + message.type = 2085; + break; } if (object.table != null) message.table = String(object.table); @@ -115806,6 +115823,7 @@ export const query = $root.query = (() => { case 4130: case 2083: case 2084: + case 2085: break; } return null; @@ -115982,6 +116000,10 @@ export const query = $root.query = (() => { case 2084: message.return_type = 2084; break; + case "ROW_TUPLE": + case 2085: + message.return_type = 2085; + break; } return message; }; @@ -119616,6 +119638,7 @@ export const vschema = $root.vschema = (() => { case 4130: case 2083: case 2084: + case 2085: break; } return null; @@ -119790,6 +119813,10 @@ export const vschema = $root.vschema = (() => { case 2084: message.tenant_id_column_type = 2084; break; + case "ROW_TUPLE": + case 2085: + message.tenant_id_column_type = 2085; + break; } return message; }; @@ -121332,6 +121359,7 @@ export const vschema = $root.vschema = (() => { case 4130: case 2083: case 2084: + case 2085: break; } if (message.invisible != null && message.hasOwnProperty("invisible")) @@ -121533,6 +121561,10 @@ export const vschema = $root.vschema = (() => { case 2084: message.type = 2084; break; + case "ROW_TUPLE": + case 2085: + message.type = 2085; + break; } if (object.invisible != null) message.invisible = Boolean(object.invisible); From bd6d67f870a5b74c8e80c0df03485a663c61df8a Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Wed, 15 Jan 2025 09:35:59 -0600 Subject: [PATCH 2/7] Enhance ValuesJoin engine and improve unit test Signed-off-by: Florent Poinsard --- go/vt/vtgate/engine/cached_size.go | 34 +++---- go/vt/vtgate/engine/fake_primitive_test.go | 13 ++- go/vt/vtgate/engine/fake_vcursor_test.go | 47 +++++++++- go/vt/vtgate/engine/join.go | 4 +- go/vt/vtgate/engine/join_values.go | 41 ++++++++- go/vt/vtgate/engine/join_values_test.go | 101 +++++++++++++++++++++ 6 files changed, 205 insertions(+), 35 deletions(-) create mode 100644 go/vt/vtgate/engine/join_values_test.go diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index d176b99a839..4aec4b70ecc 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -1475,15 +1475,13 @@ func (cached *VStream) CachedSize(alloc bool) int64 { size += hack.RuntimeAllocSize(int64(len(cached.Position))) return size } - -//go:nocheckptr func (cached *ValuesJoin) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) } size := int64(0) if alloc { - size += int64(80) + size += int64(128) } // field Left vitess.io/vitess/go/vt/vtgate/engine.Primitive if cc, ok := cached.Left.(cachedObject); ok { @@ -1493,29 +1491,23 @@ func (cached *ValuesJoin) CachedSize(alloc bool) int64 { if cc, ok := cached.Right.(cachedObject); ok { size += cc.CachedSize(true) } - // field Vars map[string]int - if cached.Vars != nil { - size += int64(48) - hmap := reflect.ValueOf(cached.Vars) - numBuckets := int(math.Pow(2, float64((*(*uint8)(unsafe.Pointer(hmap.Pointer() + uintptr(9))))))) - numOldBuckets := (*(*uint16)(unsafe.Pointer(hmap.Pointer() + uintptr(10)))) - size += hack.RuntimeAllocSize(int64(numOldBuckets * 208)) - if len(cached.Vars) > 0 || numBuckets > 1 { - size += hack.RuntimeAllocSize(int64(numBuckets * 208)) - } - for k := range cached.Vars { - size += hack.RuntimeAllocSize(int64(len(k))) - } + // field Vars []int + { + size += hack.RuntimeAllocSize(int64(cap(cached.Vars)) * int64(8)) + } + // field RowConstructorArg string + size += hack.RuntimeAllocSize(int64(len(cached.RowConstructorArg))) + // field Cols []int + { + size += hack.RuntimeAllocSize(int64(cap(cached.Cols)) * int64(8)) } - // field Columns []string + // field ColNames []string { - size += hack.RuntimeAllocSize(int64(cap(cached.Columns)) * int64(16)) - for _, elem := range cached.Columns { + size += hack.RuntimeAllocSize(int64(cap(cached.ColNames)) * int64(16)) + for _, elem := range cached.ColNames { size += hack.RuntimeAllocSize(int64(len(elem))) } } - // field RowConstructorArg string - size += hack.RuntimeAllocSize(int64(len(cached.RowConstructorArg))) return size } func (cached *Verify) CachedSize(alloc bool) int64 { diff --git a/go/vt/vtgate/engine/fake_primitive_test.go b/go/vt/vtgate/engine/fake_primitive_test.go index f3ab5ad5336..bddbca87664 100644 --- a/go/vt/vtgate/engine/fake_primitive_test.go +++ b/go/vt/vtgate/engine/fake_primitive_test.go @@ -46,6 +46,8 @@ type fakePrimitive struct { allResultsInOneCall bool async bool + + useNewPrintBindVars bool } func (f *fakePrimitive) Inputs() ([]Primitive, []map[string]any) { @@ -72,7 +74,12 @@ func (f *fakePrimitive) GetTableName() string { } func (f *fakePrimitive) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { - f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields)) + if f.useNewPrintBindVars { + f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields)) + } else { + f.log = append(f.log, fmt.Sprintf("Execute %v %v", deprecatedPrintBindVars(bindVars), wantfields)) + } + if f.results == nil { return nil, f.sendErr } @@ -87,7 +94,7 @@ func (f *fakePrimitive) TryExecute(ctx context.Context, vcursor VCursor, bindVar func (f *fakePrimitive) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { if !f.noLog { - f.log = append(f.log, fmt.Sprintf("StreamExecute %v %v", printBindVars(bindVars), wantfields)) + f.log = append(f.log, fmt.Sprintf("StreamExecute %v %v", deprecatedPrintBindVars(bindVars), wantfields)) } if f.results == nil { return f.sendErr @@ -171,7 +178,7 @@ func (f *fakePrimitive) asyncCall(callback func(*sqltypes.Result) error) error { } func (f *fakePrimitive) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { - f.log = append(f.log, fmt.Sprintf("GetFields %v", printBindVars(bindVars))) + f.log = append(f.log, fmt.Sprintf("GetFields %v", deprecatedPrintBindVars(bindVars))) return f.TryExecute(ctx, vcursor, bindVars, true /* wantfields */) } diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index aac3e9b584c..3ac62ddffd9 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -597,7 +597,7 @@ func (f *loggingVCursor) Execute(ctx context.Context, method string, query strin case vtgatepb.CommitOrder_AUTOCOMMIT: name = "ExecuteAutocommit" } - f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, printBindVars(bindvars), rollbackOnError)) + f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, deprecatedPrintBindVars(bindvars), rollbackOnError)) return f.nextResult() } @@ -621,7 +621,7 @@ func (f *loggingVCursor) AutocommitApproval() bool { } func (f *loggingVCursor) ExecuteStandalone(ctx context.Context, _ Primitive, query string, bindvars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard, fetchLastInsertID bool) (*sqltypes.Result, error) { - f.log = append(f.log, fmt.Sprintf("ExecuteStandalone %s %v %s %s", query, printBindVars(bindvars), rs.Target.Keyspace, rs.Target.Shard)) + f.log = append(f.log, fmt.Sprintf("ExecuteStandalone %s %v %s %s", query, deprecatedPrintBindVars(bindvars), rs.Target.Keyspace, rs.Target.Shard)) return f.nextResult() } @@ -943,6 +943,24 @@ func expectResultAnyOrder(t *testing.T, result, want *sqltypes.Result) { } } +// deprecatedPrintBindVars does not print bind variables, specifically tuples, correctly. +// We should use printBindVars instead. +func deprecatedPrintBindVars(bindvars map[string]*querypb.BindVariable) string { + var keys []string + for k := range bindvars { + keys = append(keys, k) + } + sort.Strings(keys) + buf := &bytes.Buffer{} + for i, k := range keys { + if i > 0 { + fmt.Fprintf(buf, " ") + } + fmt.Fprintf(buf, "%s: %v", k, bindvars[k]) + } + return buf.String() +} + func printBindVars(bindvars map[string]*querypb.BindVariable) string { var keys []string for k := range bindvars { @@ -954,6 +972,27 @@ func printBindVars(bindvars map[string]*querypb.BindVariable) string { if i > 0 { fmt.Fprintf(buf, " ") } + + if bindvars[k].Type == querypb.Type_TUPLE { + fmt.Fprintf(buf, "%s: [", k) + for _, val := range bindvars[k].Values { + if val.Type != querypb.Type_TUPLE { + fmt.Fprintf(buf, "[%s]", val.String()) + continue + } + var s []string + v := sqltypes.ProtoToValue(val) + err := v.ForEachValue(func(bv sqltypes.Value) { + s = append(s, bv.String()) + }) + if err != nil { + panic(err) + } + fmt.Fprintf(buf, "[%s]", strings.Join(s, " ")) + } + fmt.Fprintf(buf, "]") + continue + } fmt.Fprintf(buf, "%s: %v", k, bindvars[k]) } return buf.String() @@ -962,7 +1001,7 @@ func printBindVars(bindvars map[string]*querypb.BindVariable) string { func printResolvedShardQueries(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery) string { buf := &bytes.Buffer{} for i, rs := range rss { - fmt.Fprintf(buf, "%s.%s: %s {%s} ", rs.Target.Keyspace, rs.Target.Shard, queries[i].Sql, printBindVars(queries[i].BindVariables)) + fmt.Fprintf(buf, "%s.%s: %s {%s} ", rs.Target.Keyspace, rs.Target.Shard, queries[i].Sql, deprecatedPrintBindVars(queries[i].BindVariables)) } return buf.String() } @@ -970,7 +1009,7 @@ func printResolvedShardQueries(rss []*srvtopo.ResolvedShard, queries []*querypb. func printResolvedShardsBindVars(rss []*srvtopo.ResolvedShard, bvs []map[string]*querypb.BindVariable) string { buf := &bytes.Buffer{} for i, rs := range rss { - fmt.Fprintf(buf, "%s.%s: {%v} ", rs.Target.Keyspace, rs.Target.Shard, printBindVars(bvs[i])) + fmt.Fprintf(buf, "%s.%s: {%v} ", rs.Target.Keyspace, rs.Target.Shard, deprecatedPrintBindVars(bvs[i])) } return buf.String() } diff --git a/go/vt/vtgate/engine/join.go b/go/vt/vtgate/engine/join.go index 51976396cba..8134d78ff4a 100644 --- a/go/vt/vtgate/engine/join.go +++ b/go/vt/vtgate/engine/join.go @@ -220,10 +220,10 @@ func joinFields(lfields, rfields []*querypb.Field, cols []int) []*querypb.Field fields := make([]*querypb.Field, len(cols)) for i, index := range cols { if index < 0 { - fields[i] = lfields[-index-1] + fields[i] = lfields[-index-1].CloneVT() continue } - fields[i] = rfields[index-1] + fields[i] = rfields[index-1].CloneVT() } return fields } diff --git a/go/vt/vtgate/engine/join_values.go b/go/vt/vtgate/engine/join_values.go index e35addd8c09..0d341c362df 100644 --- a/go/vt/vtgate/engine/join_values.go +++ b/go/vt/vtgate/engine/join_values.go @@ -21,6 +21,7 @@ import ( "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vterrors" ) var _ Primitive = (*ValuesJoin)(nil) @@ -33,9 +34,10 @@ type ValuesJoin struct { // of the Join. They can be any primitive. Left, Right Primitive - Vars map[string]int - Columns []string + Vars []int RowConstructorArg string + Cols []int + ColNames []string } // TryExecute performs a non-streaming exec. @@ -62,11 +64,40 @@ func (jv *ValuesJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars return jv.Right.GetFields(ctx, vcursor, bindVars) } - for _, row := range lresult.Rows { - bv.Values = append(bv.Values, sqltypes.TupleToProto(row)) + for i, row := range lresult.Rows { + newRow := make(sqltypes.Row, 0, len(jv.Vars)+1) // +1 since we always add the row ID + newRow = append(newRow, sqltypes.NewInt64(int64(i))) // Adding the LHS row ID + + for _, loffset := range jv.Vars { + newRow = append(newRow, row[loffset]) + } + + bv.Values = append(bv.Values, sqltypes.TupleToProto(newRow)) } + bindVars[jv.RowConstructorArg] = bv - return vcursor.ExecutePrimitive(ctx, jv.Right, bindVars, wantfields) + rresult, err := vcursor.ExecutePrimitive(ctx, jv.Right, bindVars, wantfields) + if err != nil { + return nil, err + } + + result := &sqltypes.Result{} + + result.Fields = joinFields(lresult.Fields, rresult.Fields, jv.Cols) + for i := range result.Fields { + result.Fields[i].Name = jv.ColNames[i] + } + + for _, rrow := range rresult.Rows { + lhsRowID, err := rrow[len(rrow)-1].ToCastInt64() + if err != nil { + return nil, vterrors.VT13001("values joins cannot fetch lhs row ID: " + err.Error()) + } + + result.Rows = append(result.Rows, joinRows(lresult.Rows[lhsRowID], rrow, jv.Cols)) + } + + return result, nil } // TryStreamExecute performs a streaming exec. diff --git a/go/vt/vtgate/engine/join_values_test.go b/go/vt/vtgate/engine/join_values_test.go new file mode 100644 index 00000000000..068259a4e3e --- /dev/null +++ b/go/vt/vtgate/engine/join_values_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +func TestJoinValuesExecute(t *testing.T) { + + /* + select col1, col2, col3, col4, col5, col6 from left join right on left.col1 = right.col4 + LHS: select col1, col2, col3 from left + RHS: select col5, col6, id from (values row(1,2), ...) left(id,col1) join right on left.col1 = right.col4 + */ + + leftPrim := &fakePrimitive{ + useNewPrintBindVars: true, + results: []*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "col1|col2|col3", + "int64|varchar|varchar", + ), + "1|a|aa", + "2|b|bb", + "3|c|cc", + "4|d|dd", + ), + }, + } + rightPrim := &fakePrimitive{ + useNewPrintBindVars: true, + results: []*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "col5|col6|id", + "varchar|varchar|int64", + ), + "d|dd|0", + "e|ee|1", + "f|ff|2", + "g|gg|3", + ), + }, + } + + bv := map[string]*querypb.BindVariable{ + "a": sqltypes.Int64BindVariable(10), + } + + vjn := &ValuesJoin{ + Left: leftPrim, + Right: rightPrim, + Vars: []int{0}, + RowConstructorArg: "v", + Cols: []int{-1, -2, -3, -1, 1, 2}, + ColNames: []string{"col1", "col2", "col3", "col4", "col5", "col6"}, + } + + r, err := vjn.TryExecute(context.Background(), &noopVCursor{}, bv, true) + require.NoError(t, err) + leftPrim.ExpectLog(t, []string{ + `Execute a: type:INT64 value:"10" true`, + }) + rightPrim.ExpectLog(t, []string{ + `Execute a: type:INT64 value:"10" v: [[INT64(0) INT64(1)][INT64(1) INT64(2)][INT64(2) INT64(3)][INT64(3) INT64(4)]] true`, + }) + + result := sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "col1|col2|col3|col4|col5|col6", + "int64|varchar|varchar|int64|varchar|varchar", + ), + "1|a|aa|1|d|dd", + "2|b|bb|2|e|ee", + "3|c|cc|3|f|ff", + "4|d|dd|4|g|gg", + ) + expectResult(t, r, result) +} From b4f62e34b5feb3c3c4727919e5133c705a15613e Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Wed, 22 Jan 2025 11:46:51 -0600 Subject: [PATCH 3/7] wip - dirty implementation of findRoute for Values Signed-off-by: Florent Poinsard --- go/vt/vtgate/engine/delete_test.go | 36 ++++---- go/vt/vtgate/engine/dml_with_input_test.go | 8 +- go/vt/vtgate/engine/fk_cascade_test.go | 6 +- go/vt/vtgate/engine/fk_verify_test.go | 6 +- go/vt/vtgate/engine/insert_test.go | 54 ++++++------ go/vt/vtgate/engine/routing.go | 84 ++++++++++++++++++- go/vt/vtgate/engine/routing_parameter_test.go | 65 ++++++++++++++ go/vt/vtgate/engine/update_test.go | 50 +++++------ go/vt/vtgate/evalengine/eval.go | 10 +++ go/vt/vtgate/evalengine/eval_tuple.go | 10 ++- 10 files changed, 247 insertions(+), 82 deletions(-) create mode 100644 go/vt/vtgate/engine/routing_parameter_test.go diff --git a/go/vt/vtgate/engine/delete_test.go b/go/vt/vtgate/engine/delete_test.go index 18dcef5cbe4..56d3467aac3 100644 --- a/go/vt/vtgate/engine/delete_test.go +++ b/go/vt/vtgate/engine/delete_test.go @@ -45,7 +45,7 @@ func TestDeleteUnsharded(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -80,7 +80,7 @@ func TestDeleteEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -112,7 +112,7 @@ func TestDeleteEqualMultiCol(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -148,7 +148,7 @@ func TestDeleteEqualNoRoute(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -181,7 +181,7 @@ func TestDeleteEqualNoScatter(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.EqualError(t, err, "cannot map vindex to unique keyspace id: DestinationKeyRange(-)") } @@ -213,7 +213,7 @@ func TestDeleteOwnedVindex(t *testing.T) { "1|4|5|6", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -231,7 +231,7 @@ func TestDeleteOwnedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -252,7 +252,7 @@ func TestDeleteOwnedVindex(t *testing.T) { "1|4|5|6", "1|7|8|9", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -300,7 +300,7 @@ func TestDeleteOwnedVindexMultiCol(t *testing.T) { "1|2|4", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -371,7 +371,7 @@ func TestDeleteSharded(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -399,7 +399,7 @@ func TestDeleteShardedStreaming(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") err := del.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { return nil }) @@ -435,7 +435,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) { "1|4|5|6", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -453,7 +453,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -475,7 +475,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) { "1|4|5|6", "1|7|8|9", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -528,7 +528,7 @@ func TestDeleteInChangedVindexMultiCol(t *testing.T) { "1|3|6", "2|3|7", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -565,7 +565,7 @@ func TestDeleteEqualSubshard(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -602,7 +602,7 @@ func TestDeleteMultiEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -635,7 +635,7 @@ func TestDeleteInUnique(t *testing.T) { Type: querypb.Type_TUPLE, Values: append([]*querypb.Value{sqltypes.ValueToProto(sqltypes.NewInt64(1))}, sqltypes.ValueToProto(sqltypes.NewInt64(2)), sqltypes.ValueToProto(sqltypes.NewInt64(4))), } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{"__vals": tupleBV}, false) require.NoError(t, err) diff --git a/go/vt/vtgate/engine/dml_with_input_test.go b/go/vt/vtgate/engine/dml_with_input_test.go index 6fcf2040dfc..38d9068b433 100644 --- a/go/vt/vtgate/engine/dml_with_input_test.go +++ b/go/vt/vtgate/engine/dml_with_input_test.go @@ -51,7 +51,7 @@ func TestDeleteWithInputSingleOffset(t *testing.T) { OutputCols: [][]int{{0}}, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -95,7 +95,7 @@ func TestDeleteWithInputMultiOffset(t *testing.T) { OutputCols: [][]int{{1, 0}}, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -160,7 +160,7 @@ func TestDeleteWithMultiTarget(t *testing.T) { OutputCols: [][]int{{0}, {1, 2}}, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -210,7 +210,7 @@ func TestUpdateWithInputNonLiteral(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = []*sqltypes.Result{ {RowsAffected: 1}, {RowsAffected: 1}, {RowsAffected: 1}, } diff --git a/go/vt/vtgate/engine/fk_cascade_test.go b/go/vt/vtgate/engine/fk_cascade_test.go index 942fe44a709..c93e487067b 100644 --- a/go/vt/vtgate/engine/fk_cascade_test.go +++ b/go/vt/vtgate/engine/fk_cascade_test.go @@ -62,7 +62,7 @@ func TestDeleteCascade(t *testing.T) { Parent: parentP, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) @@ -123,7 +123,7 @@ func TestUpdateCascade(t *testing.T) { Parent: parentP, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) @@ -195,7 +195,7 @@ func TestNonLiteralUpdateCascade(t *testing.T) { Parent: parentP, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) diff --git a/go/vt/vtgate/engine/fk_verify_test.go b/go/vt/vtgate/engine/fk_verify_test.go index 5c9ff83c2ec..465dd81d3b2 100644 --- a/go/vt/vtgate/engine/fk_verify_test.go +++ b/go/vt/vtgate/engine/fk_verify_test.go @@ -58,7 +58,7 @@ func TestFKVerifyUpdate(t *testing.T) { t.Run("foreign key verification success", func(t *testing.T) { fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64")) - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) @@ -83,7 +83,7 @@ func TestFKVerifyUpdate(t *testing.T) { t.Run("parent foreign key verification failure", func(t *testing.T) { // No results from select, should cause the foreign key verification to fail. fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1", "1", "1") - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.ErrorContains(t, err, "Cannot add or update a child row: a foreign key constraint fails") @@ -105,7 +105,7 @@ func TestFKVerifyUpdate(t *testing.T) { t.Run("child foreign key verification failure", func(t *testing.T) { // No results from select, should cause the foreign key verification to fail. fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1", "1", "1") - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.ErrorContains(t, err, "Cannot delete or update a parent row: a foreign key constraint fails") diff --git a/go/vt/vtgate/engine/insert_test.go b/go/vt/vtgate/engine/insert_test.go index 2de95e5d186..5e66649f82e 100644 --- a/go/vt/vtgate/engine/insert_test.go +++ b/go/vt/vtgate/engine/insert_test.go @@ -42,7 +42,7 @@ func TestInsertUnsharded(t *testing.T) { "dummy_insert", ) - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{{ InsertID: 4, }} @@ -91,7 +91,7 @@ func TestInsertUnshardedGenerate(t *testing.T) { ), } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -144,7 +144,7 @@ func TestInsertUnshardedGenerate_Zeros(t *testing.T) { ), } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -215,7 +215,7 @@ func TestInsertShardedSimple(t *testing.T) { }, nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -254,7 +254,7 @@ func TestInsertShardedSimple(t *testing.T) { }, nil, ) - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -297,7 +297,7 @@ func TestInsertShardedSimple(t *testing.T) { ) ins.MultiShardAutocommit = true - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -366,7 +366,7 @@ func TestInsertShardWithONDuplicateKey(t *testing.T) { }, }}}, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{ @@ -412,7 +412,7 @@ func TestInsertShardWithONDuplicateKey(t *testing.T) { &sqlparser.UpdateExpr{Name: sqlparser.NewColName("suffix"), Expr: &sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, }, ) - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -457,7 +457,7 @@ func TestInsertShardWithONDuplicateKey(t *testing.T) { ) ins.MultiShardAutocommit = true - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -590,7 +590,7 @@ func TestInsertShardedGenerate(t *testing.T) { ), } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( @@ -715,7 +715,7 @@ func TestInsertShardedOwned(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -807,7 +807,7 @@ func TestInsertShardedOwnedWithNull(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -893,7 +893,7 @@ func TestInsertShardedGeo(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -1029,7 +1029,7 @@ func TestInsertShardedIgnoreOwned(t *testing.T) { "\x00", ) noresult := &sqltypes.Result{} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20"} vc.results = []*sqltypes.Result{ // primary vindex lookups: fail row 2. @@ -1147,7 +1147,7 @@ func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) { ), "\x00", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} vc.results = []*sqltypes.Result{ ksid0, @@ -1267,7 +1267,7 @@ func TestInsertShardedUnownedVerify(t *testing.T) { "1", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ nonemptyResult, @@ -1381,7 +1381,7 @@ func TestInsertShardedIgnoreUnownedVerify(t *testing.T) { "1", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20"} vc.results = []*sqltypes.Result{ nonemptyResult, @@ -1472,7 +1472,7 @@ func TestInsertShardedIgnoreUnownedVerifyFail(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.EqualError(t, err, `values [[INT64(2)]] for column [c3] does not map to keyspace ids`) @@ -1578,7 +1578,7 @@ func TestInsertShardedUnownedReverseMap(t *testing.T) { "1", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ nonemptyResult, @@ -1663,7 +1663,7 @@ func TestInsertShardedUnownedReverseMapSuccess(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -1694,7 +1694,7 @@ func TestInsertSelectSimple(t *testing.T) { Keyspace: ks.Keyspace}} ins := newInsertSelect(false, ks.Keyspace, ks.Tables["t1"], "prefix ", nil, [][]int{{1}}, rb) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( @@ -1787,7 +1787,7 @@ func TestInsertSelectOwned(t *testing.T) { rb, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( @@ -1894,7 +1894,7 @@ func TestInsertSelectGenerate(t *testing.T) { Offset: 1, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -1987,7 +1987,7 @@ func TestStreamingInsertSelectGenerate(t *testing.T) { Offset: 1, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -2082,7 +2082,7 @@ func TestInsertSelectGenerateNotProvided(t *testing.T) { Offset: 2, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -2169,7 +2169,7 @@ func TestStreamingInsertSelectGenerateNotProvided(t *testing.T) { Offset: 2, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -2258,7 +2258,7 @@ func TestInsertSelectUnowned(t *testing.T) { rb, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult(sqltypes.MakeTestFields("id", "int64"), "1", "3", "2"), diff --git a/go/vt/vtgate/engine/routing.go b/go/vt/vtgate/engine/routing.go index 067278c1a93..47d4cea04a6 100644 --- a/go/vt/vtgate/engine/routing.go +++ b/go/vt/vtgate/engine/routing.go @@ -21,6 +21,9 @@ import ( "encoding/json" "strconv" + "golang.org/x/exp/maps" + + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" @@ -74,6 +77,8 @@ const ( // Is used when the query explicitly sets a target destination: // in the clause e.g: UPDATE `keyspace[-]`.x1 SET foo=1 ByDestination + // Values // TODO + Values ) var opName = map[Opcode]string{ @@ -90,6 +95,7 @@ var opName = map[Opcode]string{ None: "None", ByDestination: "ByDestination", SubShard: "SubShard", + Values: "Values", } // MarshalJSON serializes the Opcode as a JSON string. @@ -176,6 +182,76 @@ func (rp *RoutingParameters) findRoute(ctx context.Context, vcursor VCursor, bin default: return rp.multiEqual(ctx, vcursor, bindVars) } + case Values: + switch rp.Vindex.(type) { + case vindexes.MultiColumn: + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported multi column vindex for values") + default: + if len(rp.Values) < 2 { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "values slice must at least be of length two for a values") + } + env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor) + value, err := env.Evaluate(rp.Values[0]) + if err != nil { + return nil, nil, err + } + + rval, ok := rp.Values[0].(*evalengine.BindVariable) + if !ok { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "cannot transform evalengine expr to bind variable for values") + } + + tuple := value.TupleValues() + + type rssValue struct { + rss *srvtopo.ResolvedShard + vals []sqltypes.Value + } + r := map[string]rssValue{} + for _, row := range tuple { + env.Row = nil + err = row.ForEachValue(func(bv sqltypes.Value) { + env.Row = append(env.Row, bv) + }) + if err != nil { + return nil, nil, err + } + val, err := env.Evaluate(rp.Values[1]) + if err != nil { + return nil, nil, err + } + + rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, []sqltypes.Value{val.Value(vcursor.ConnCollation())}) + if err != nil { + return nil, nil, err + } + if len(rss) > 1 { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "andres is confused") + } + r[rss[0].Target.String()] = rssValue{ + rss: rss[0], + vals: append(r[rss[0].Target.String()].vals, val.Value(collations.Unknown)), + } + } + var resultRss []*srvtopo.ResolvedShard + var resultBvs []map[string]*querypb.BindVariable + for _, rssVals := range r { + resultRss = append(resultRss, rssVals.rss) + + clonedBindVars := maps.Clone(bindVars) + + newBv := &querypb.BindVariable{ + Type: querypb.Type_TUPLE, + } + for _, s := range rssVals.vals { + newBv.Values = append(newBv.Values, sqltypes.ValueToProto(s)) + } + + clonedBindVars[rval.Key] = newBv + resultBvs = append(resultBvs, clonedBindVars) + } + return resultRss, resultBvs, nil + } default: // Unreachable. return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported opcode: %v", rp.Opcode) @@ -480,7 +556,13 @@ func setReplaceSchemaName(bindVars map[string]*querypb.BindVariable) { bindVars[sqltypes.BvReplaceSchemaName] = sqltypes.Int64BindVariable(1) } -func resolveShards(ctx context.Context, vcursor VCursor, vindex vindexes.SingleColumn, keyspace *vindexes.Keyspace, vindexKeys []sqltypes.Value) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) { +func resolveShards( + ctx context.Context, + vcursor VCursor, + vindex vindexes.SingleColumn, + keyspace *vindexes.Keyspace, + vindexKeys []sqltypes.Value, +) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) { // Convert vindexKeys to []*querypb.Value ids := make([]*querypb.Value, len(vindexKeys)) for i, vik := range vindexKeys { diff --git a/go/vt/vtgate/engine/routing_parameter_test.go b/go/vt/vtgate/engine/routing_parameter_test.go new file mode 100644 index 00000000000..17f6b9d4eca --- /dev/null +++ b/go/vt/vtgate/engine/routing_parameter_test.go @@ -0,0 +1,65 @@ +package engine + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +func TestFindRouteValuesJoin(t *testing.T) { + vindex, err := vindexes.CreateVindex("hash", "", nil) + require.NoError(t, err) + + const valueBvName = "v" + rp := &RoutingParameters{ + Opcode: Values, + + Keyspace: &vindexes.Keyspace{ + Name: "ks", + Sharded: true, + }, + + Vindex: vindex, + + Values: []evalengine.Expr{ + evalengine.NewBindVar(valueBvName, evalengine.NewType(sqltypes.Tuple, collations.Unknown)), + evalengine.NewColumn(0, evalengine.NewType(sqltypes.Int64, collations.Unknown), nil), + }, + } + + bv := &querypb.BindVariable{ + Type: querypb.Type_TUPLE, + } + bv.Values = append( + bv.Values, + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewVarBinary("hello")}), + ) + bv.Values = append( + bv.Values, + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(2), sqltypes.NewVarBinary("good morning")}), + ) + + vc := newTestVCursor("0") + rss, bvs, err := rp.findRoute(context.Background(), vc, map[string]*querypb.BindVariable{ + valueBvName: bv, + }) + require.NoError(t, err) + require.Len(t, rss, 1) + require.Len(t, bvs, 1) + var s []int64 + for _, value := range bvs[0][valueBvName].Values { + v := sqltypes.ProtoToValue(value) + require.Equal(t, sqltypes.Int64, v.Type()) + i, err := v.ToInt64() + require.NoError(t, err) + s = append(s, i) + } + require.Equal(t, []int64{1, 2}, s) +} diff --git a/go/vt/vtgate/engine/update_test.go b/go/vt/vtgate/engine/update_test.go index eb6af5a5299..e29ffeccd6f 100644 --- a/go/vt/vtgate/engine/update_test.go +++ b/go/vt/vtgate/engine/update_test.go @@ -50,7 +50,7 @@ func TestUpdateUnsharded(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -85,7 +85,7 @@ func TestUpdateEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -116,7 +116,7 @@ func TestUpdateEqualMultiCol(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -142,7 +142,7 @@ func TestUpdateScatter(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestUpdateScatter(t *testing.T) { }, } - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -199,7 +199,7 @@ func TestUpdateEqualNoRoute(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -250,7 +250,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { ), "1|4|5|6|0|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -272,7 +272,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -294,7 +294,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { "1|4|5|6|0|0", "1|7|8|9|0|0", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -330,7 +330,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { "1|4|5|6|0|1", // twocol changes "1|7|8|9|1|0", // onecol changes )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -387,7 +387,7 @@ func TestUpdateEqualMultiColChangedVindex(t *testing.T) { ), "1|2|4|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -514,7 +514,7 @@ func TestUpdateScatterChangedVindex(t *testing.T) { ), "1|4|5|6|0|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -534,7 +534,7 @@ func TestUpdateScatterChangedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) if err != nil { @@ -558,7 +558,7 @@ func TestUpdateScatterChangedVindex(t *testing.T) { "1|4|5|6|0|0", "1|7|8|9|0|0", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -604,7 +604,7 @@ func TestUpdateIn(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -628,7 +628,7 @@ func TestUpdateInStreamExecute(t *testing.T) { Query: "dummy_update", }} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") err := upd.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { return nil }) @@ -655,7 +655,7 @@ func TestUpdateInMultiCol(t *testing.T) { Query: "dummy_update", }} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -710,7 +710,7 @@ func TestUpdateInChangedVindex(t *testing.T) { "1|4|5|6|0|0", "2|21|22|23|0|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -738,7 +738,7 @@ func TestUpdateInChangedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -761,7 +761,7 @@ func TestUpdateInChangedVindex(t *testing.T) { "1|7|8|9|0|0", "2|21|22|23|0|0", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -835,7 +835,7 @@ func TestUpdateInChangedVindexMultiCol(t *testing.T) { "1|3|6|0", "2|3|7|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -874,7 +874,7 @@ func TestUpdateEqualSubshard(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -911,7 +911,7 @@ func TestUpdateMultiEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -944,7 +944,7 @@ func TestUpdateInUnique(t *testing.T) { Type: querypb.Type_TUPLE, Values: append([]*querypb.Value{sqltypes.ValueToProto(sqltypes.NewInt64(1))}, sqltypes.ValueToProto(sqltypes.NewInt64(2)), sqltypes.ValueToProto(sqltypes.NewInt64(4))), } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{"__vals": tupleBV}, false) require.NoError(t, err) @@ -1033,6 +1033,6 @@ func buildTestVSchema() *vindexes.VSchema { return vs } -func newDMLTestVCursor(shards ...string) *loggingVCursor { +func newTestVCursor(shards ...string) *loggingVCursor { return &loggingVCursor{shards: shards, resolvedTargetTabletType: topodatapb.TabletType_PRIMARY} } diff --git a/go/vt/vtgate/evalengine/eval.go b/go/vt/vtgate/evalengine/eval.go index 916c5e200f4..f75ac0f8202 100644 --- a/go/vt/vtgate/evalengine/eval.go +++ b/go/vt/vtgate/evalengine/eval.go @@ -378,6 +378,16 @@ func valueToEval(value sqltypes.Value, collation collations.TypedCollation, valu } switch tt := value.Type(); { + case tt == sqltypes.Tuple: + t := &evalTuple{} + err := value.ForEachValue(func(bv sqltypes.Value) { + e, err := valueToEval(bv, collation, values) + if err != nil { + return + } + t.t = append(t.t, e) + }) + return t, wrap(err) case sqltypes.IsSigned(tt): ival, err := value.ToInt64() return newEvalInt64(ival), wrap(err) diff --git a/go/vt/vtgate/evalengine/eval_tuple.go b/go/vt/vtgate/evalengine/eval_tuple.go index 81fa3317977..1faff68e155 100644 --- a/go/vt/vtgate/evalengine/eval_tuple.go +++ b/go/vt/vtgate/evalengine/eval_tuple.go @@ -27,7 +27,15 @@ type evalTuple struct { var _ eval = (*evalTuple)(nil) func (e *evalTuple) ToRawBytes() []byte { - return nil + var vals []sqltypes.Value + for _, e2 := range e.t { + v, err := sqltypes.NewValue(e2.SQLType(), e2.ToRawBytes()) + if err != nil { + panic(err) + } + vals = append(vals, v) + } + return sqltypes.TupleToProto(vals).Value } func (e *evalTuple) SQLType() sqltypes.Type { From 9dda778d7ea261ce4962485906659e091897ca85 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Thu, 23 Jan 2025 08:47:46 -0600 Subject: [PATCH 4/7] Finalize the Values OpCode and TestFindRouteValuesJoin Signed-off-by: Florent Poinsard --- go/vt/vtgate/engine/routing.go | 128 ++++++++++-------- go/vt/vtgate/engine/routing_parameter_test.go | 45 +++--- 2 files changed, 94 insertions(+), 79 deletions(-) diff --git a/go/vt/vtgate/engine/routing.go b/go/vt/vtgate/engine/routing.go index 47d4cea04a6..5c77d8b03a5 100644 --- a/go/vt/vtgate/engine/routing.go +++ b/go/vt/vtgate/engine/routing.go @@ -187,75 +187,83 @@ func (rp *RoutingParameters) findRoute(ctx context.Context, vcursor VCursor, bin case vindexes.MultiColumn: return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported multi column vindex for values") default: - if len(rp.Values) < 2 { - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "values slice must at least be of length two for a values") - } - env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor) - value, err := env.Evaluate(rp.Values[0]) - if err != nil { - return nil, nil, err - } + return rp.values(ctx, vcursor, bindVars) + } + default: + // Unreachable. + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported opcode: %v", rp.Opcode) + } +} - rval, ok := rp.Values[0].(*evalengine.BindVariable) - if !ok { - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "cannot transform evalengine expr to bind variable for values") - } +// values is used by the "Values" OpCode. It takes a tuple of tuple in the bindVars (from a VALUES JOIN), and +// will split all the rows from the tuple to their own shards. Minimizing the amount of bindVars we send to each shard. +// rp.Values has to be formatted a certain way by the planner: The first index has to be the expression that returns a +// tuple of tuples. The second index has to be the offset where the vindex values can be found in every row of the outer tuple. +func (rp *RoutingParameters) values(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) { + if len(rp.Values) < 2 { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "values slice must at least be of length two for a values") + } + env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor) + value, err := env.Evaluate(rp.Values[0]) + if err != nil { + return nil, nil, err + } - tuple := value.TupleValues() + rval, ok := rp.Values[0].(*evalengine.BindVariable) + if !ok { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "cannot transform evalengine expr to bind variable for values") + } - type rssValue struct { - rss *srvtopo.ResolvedShard - vals []sqltypes.Value - } - r := map[string]rssValue{} - for _, row := range tuple { - env.Row = nil - err = row.ForEachValue(func(bv sqltypes.Value) { - env.Row = append(env.Row, bv) - }) - if err != nil { - return nil, nil, err - } - val, err := env.Evaluate(rp.Values[1]) - if err != nil { - return nil, nil, err - } + tuple := value.TupleValues() - rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, []sqltypes.Value{val.Value(vcursor.ConnCollation())}) - if err != nil { - return nil, nil, err - } - if len(rss) > 1 { - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "andres is confused") - } - r[rss[0].Target.String()] = rssValue{ - rss: rss[0], - vals: append(r[rss[0].Target.String()].vals, val.Value(collations.Unknown)), - } - } - var resultRss []*srvtopo.ResolvedShard - var resultBvs []map[string]*querypb.BindVariable - for _, rssVals := range r { - resultRss = append(resultRss, rssVals.rss) + type rssValue struct { + rss *srvtopo.ResolvedShard + vals []sqltypes.Value + } + r := map[string]rssValue{} + for _, row := range tuple { + env.Row = nil + err = row.ForEachValue(func(bv sqltypes.Value) { + env.Row = append(env.Row, bv) + }) + if err != nil { + return nil, nil, err + } + val, err := env.Evaluate(rp.Values[1]) + if err != nil { + return nil, nil, err + } - clonedBindVars := maps.Clone(bindVars) + rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, []sqltypes.Value{val.Value(vcursor.ConnCollation())}) + if err != nil { + return nil, nil, err + } + if len(rss) > 1 { + return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "andres is confused") + } + r[rss[0].Target.String()] = rssValue{ + rss: rss[0], + vals: append(r[rss[0].Target.String()].vals, val.Value(collations.Unknown)), + } + } + var resultRss []*srvtopo.ResolvedShard + var resultBvs []map[string]*querypb.BindVariable + for _, rssVals := range r { + resultRss = append(resultRss, rssVals.rss) - newBv := &querypb.BindVariable{ - Type: querypb.Type_TUPLE, - } - for _, s := range rssVals.vals { - newBv.Values = append(newBv.Values, sqltypes.ValueToProto(s)) - } + clonedBindVars := maps.Clone(bindVars) - clonedBindVars[rval.Key] = newBv - resultBvs = append(resultBvs, clonedBindVars) - } - return resultRss, resultBvs, nil + newBv := &querypb.BindVariable{ + Type: querypb.Type_TUPLE, } - default: - // Unreachable. - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported opcode: %v", rp.Opcode) + for _, s := range rssVals.vals { + newBv.Values = append(newBv.Values, sqltypes.ValueToProto(s)) + } + + clonedBindVars[rval.Key] = newBv + resultBvs = append(resultBvs, clonedBindVars) } + return resultRss, resultBvs, nil } func (rp *RoutingParameters) systemQuery(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) { diff --git a/go/vt/vtgate/engine/routing_parameter_test.go b/go/vt/vtgate/engine/routing_parameter_test.go index 17f6b9d4eca..863e27d874f 100644 --- a/go/vt/vtgate/engine/routing_parameter_test.go +++ b/go/vt/vtgate/engine/routing_parameter_test.go @@ -36,30 +36,37 @@ func TestFindRouteValuesJoin(t *testing.T) { bv := &querypb.BindVariable{ Type: querypb.Type_TUPLE, + Values: []*querypb.Value{ + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewVarBinary("hello")}), + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(2), sqltypes.NewVarBinary("good morning")}), + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(3), sqltypes.NewVarBinary("bonjour")}), + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(4), sqltypes.NewVarBinary("bonjour")}), + }, } - bv.Values = append( - bv.Values, - sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewVarBinary("hello")}), - ) - bv.Values = append( - bv.Values, - sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(2), sqltypes.NewVarBinary("good morning")}), - ) - vc := newTestVCursor("0") + vc := newTestVCursor("-20", "20-") + vc.shardForKsid = []string{"-20", "-20", "20-", "20-"} rss, bvs, err := rp.findRoute(context.Background(), vc, map[string]*querypb.BindVariable{ valueBvName: bv, }) + require.NoError(t, err) - require.Len(t, rss, 1) - require.Len(t, bvs, 1) - var s []int64 - for _, value := range bvs[0][valueBvName].Values { - v := sqltypes.ProtoToValue(value) - require.Equal(t, sqltypes.Int64, v.Type()) - i, err := v.ToInt64() - require.NoError(t, err) - s = append(s, i) + require.Len(t, rss, 2) + require.Len(t, bvs, 2) + + expectedIdsPerShard := [][]int64{ + {1, 2}, + {3, 4}, + } + for i, ids := range expectedIdsPerShard { + var s []int64 + for _, value := range bvs[i][valueBvName].Values { + v := sqltypes.ProtoToValue(value) + require.Equal(t, sqltypes.Int64, v.Type()) + i, err := v.ToInt64() + require.NoError(t, err) + s = append(s, i) + } + require.Equal(t, ids, s) } - require.Equal(t, []int64{1, 2}, s) } From b265101ad36305ce93bb280897e8386d9faab13f Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Thu, 23 Jan 2025 09:51:43 -0600 Subject: [PATCH 5/7] Use MultiEqual instead of Values OpCode Signed-off-by: Florent Poinsard --- go/vt/vtgate/engine/routing.go | 114 +++++------------- go/vt/vtgate/engine/routing_parameter_test.go | 5 +- go/vt/vtgate/evalengine/expr_tuple_bvar.go | 1 - .../planbuilder/operators/sharded_routing.go | 1 - 4 files changed, 29 insertions(+), 92 deletions(-) diff --git a/go/vt/vtgate/engine/routing.go b/go/vt/vtgate/engine/routing.go index 5c77d8b03a5..aba0bccc999 100644 --- a/go/vt/vtgate/engine/routing.go +++ b/go/vt/vtgate/engine/routing.go @@ -23,7 +23,6 @@ import ( "golang.org/x/exp/maps" - "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" @@ -77,8 +76,6 @@ const ( // Is used when the query explicitly sets a target destination: // in the clause e.g: UPDATE `keyspace[-]`.x1 SET foo=1 ByDestination - // Values // TODO - Values ) var opName = map[Opcode]string{ @@ -95,7 +92,6 @@ var opName = map[Opcode]string{ None: "None", ByDestination: "ByDestination", SubShard: "SubShard", - Values: "Values", } // MarshalJSON serializes the Opcode as a JSON string. @@ -182,90 +178,12 @@ func (rp *RoutingParameters) findRoute(ctx context.Context, vcursor VCursor, bin default: return rp.multiEqual(ctx, vcursor, bindVars) } - case Values: - switch rp.Vindex.(type) { - case vindexes.MultiColumn: - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported multi column vindex for values") - default: - return rp.values(ctx, vcursor, bindVars) - } default: // Unreachable. return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported opcode: %v", rp.Opcode) } } -// values is used by the "Values" OpCode. It takes a tuple of tuple in the bindVars (from a VALUES JOIN), and -// will split all the rows from the tuple to their own shards. Minimizing the amount of bindVars we send to each shard. -// rp.Values has to be formatted a certain way by the planner: The first index has to be the expression that returns a -// tuple of tuples. The second index has to be the offset where the vindex values can be found in every row of the outer tuple. -func (rp *RoutingParameters) values(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) { - if len(rp.Values) < 2 { - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "values slice must at least be of length two for a values") - } - env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor) - value, err := env.Evaluate(rp.Values[0]) - if err != nil { - return nil, nil, err - } - - rval, ok := rp.Values[0].(*evalengine.BindVariable) - if !ok { - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "cannot transform evalengine expr to bind variable for values") - } - - tuple := value.TupleValues() - - type rssValue struct { - rss *srvtopo.ResolvedShard - vals []sqltypes.Value - } - r := map[string]rssValue{} - for _, row := range tuple { - env.Row = nil - err = row.ForEachValue(func(bv sqltypes.Value) { - env.Row = append(env.Row, bv) - }) - if err != nil { - return nil, nil, err - } - val, err := env.Evaluate(rp.Values[1]) - if err != nil { - return nil, nil, err - } - - rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, []sqltypes.Value{val.Value(vcursor.ConnCollation())}) - if err != nil { - return nil, nil, err - } - if len(rss) > 1 { - return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "andres is confused") - } - r[rss[0].Target.String()] = rssValue{ - rss: rss[0], - vals: append(r[rss[0].Target.String()].vals, val.Value(collations.Unknown)), - } - } - var resultRss []*srvtopo.ResolvedShard - var resultBvs []map[string]*querypb.BindVariable - for _, rssVals := range r { - resultRss = append(resultRss, rssVals.rss) - - clonedBindVars := maps.Clone(bindVars) - - newBv := &querypb.BindVariable{ - Type: querypb.Type_TUPLE, - } - for _, s := range rssVals.vals { - newBv.Values = append(newBv.Values, sqltypes.ValueToProto(s)) - } - - clonedBindVars[rval.Key] = newBv - resultBvs = append(resultBvs, clonedBindVars) - } - return resultRss, resultBvs, nil -} - func (rp *RoutingParameters) systemQuery(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) { destinations, err := rp.routeInfoSchemaQuery(ctx, vcursor, bindVars) if err != nil { @@ -511,15 +429,37 @@ func (rp *RoutingParameters) multiEqual(ctx context.Context, vcursor VCursor, bi if err != nil { return nil, nil, err } - rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, value.TupleValues()) + rss, bvs, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, value.TupleValues()) if err != nil { return nil, nil, err } - multiBindVars := make([]map[string]*querypb.BindVariable, len(rss)) - for i := range multiBindVars { - multiBindVars[i] = bindVars + + tbv, ok := rp.Values[0].(*evalengine.TupleBindVariable) + if !ok { + multiBindVars := make([]map[string]*querypb.BindVariable, len(rss)) + for i := range multiBindVars { + multiBindVars[i] = bindVars + } + return rss, multiBindVars, nil } - return rss, multiBindVars, nil + + var resultRss []*srvtopo.ResolvedShard + var resultBvs []map[string]*querypb.BindVariable + for i, rssVals := range rss { + resultRss = append(resultRss, rssVals) + + clonedBindVars := maps.Clone(bindVars) + + newBv := &querypb.BindVariable{ + Type: querypb.Type_TUPLE, + Values: bvs[i], + } + + clonedBindVars[tbv.Key] = newBv + + resultBvs = append(resultBvs, clonedBindVars) + } + return resultRss, resultBvs, nil } func (rp *RoutingParameters) multiEqualMultiCol(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) { diff --git a/go/vt/vtgate/engine/routing_parameter_test.go b/go/vt/vtgate/engine/routing_parameter_test.go index 863e27d874f..9c5ad167a70 100644 --- a/go/vt/vtgate/engine/routing_parameter_test.go +++ b/go/vt/vtgate/engine/routing_parameter_test.go @@ -19,7 +19,7 @@ func TestFindRouteValuesJoin(t *testing.T) { const valueBvName = "v" rp := &RoutingParameters{ - Opcode: Values, + Opcode: MultiEqual, Keyspace: &vindexes.Keyspace{ Name: "ks", @@ -29,8 +29,7 @@ func TestFindRouteValuesJoin(t *testing.T) { Vindex: vindex, Values: []evalengine.Expr{ - evalengine.NewBindVar(valueBvName, evalengine.NewType(sqltypes.Tuple, collations.Unknown)), - evalengine.NewColumn(0, evalengine.NewType(sqltypes.Int64, collations.Unknown), nil), + &evalengine.TupleBindVariable{Key: valueBvName, Index: 0, Collation: collations.Unknown}, }, } diff --git a/go/vt/vtgate/evalengine/expr_tuple_bvar.go b/go/vt/vtgate/evalengine/expr_tuple_bvar.go index 14cfbd95a8b..754ed8cf4f8 100644 --- a/go/vt/vtgate/evalengine/expr_tuple_bvar.go +++ b/go/vt/vtgate/evalengine/expr_tuple_bvar.go @@ -30,7 +30,6 @@ type ( Key string Index int - Type sqltypes.Type Collation collations.ID } ) diff --git a/go/vt/vtgate/planbuilder/operators/sharded_routing.go b/go/vt/vtgate/planbuilder/operators/sharded_routing.go index 2c8873dee07..2737f74fcd7 100644 --- a/go/vt/vtgate/planbuilder/operators/sharded_routing.go +++ b/go/vt/vtgate/planbuilder/operators/sharded_routing.go @@ -613,7 +613,6 @@ func (tr *ShardedRouting) planCompositeInOpArg( Index: idx, } if typ, found := ctx.TypeForExpr(col); found { - value.Type = typ.Type() value.Collation = typ.Collation() } From 16a30f3f91127e530331f2a88fc4fb8c9e303ad0 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 28 Jan 2025 07:24:41 -0600 Subject: [PATCH 6/7] Add missing headers Signed-off-by: Florent Poinsard --- go/vt/vtgate/engine/join_values.go | 2 +- go/vt/vtgate/engine/routing_parameter_test.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/engine/join_values.go b/go/vt/vtgate/engine/join_values.go index 0d341c362df..7b4fc19e908 100644 --- a/go/vt/vtgate/engine/join_values.go +++ b/go/vt/vtgate/engine/join_values.go @@ -1,5 +1,5 @@ /* -Copyright 2024 The Vitess Authors. +Copyright 2025 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/go/vt/vtgate/engine/routing_parameter_test.go b/go/vt/vtgate/engine/routing_parameter_test.go index 9c5ad167a70..b292f57e643 100644 --- a/go/vt/vtgate/engine/routing_parameter_test.go +++ b/go/vt/vtgate/engine/routing_parameter_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package engine import ( From e9fb2c7a04c5f30fd38ae11d047b9d440014543b Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 28 Jan 2025 08:20:06 -0600 Subject: [PATCH 7/7] Remove routing multi equal optimization Signed-off-by: Florent Poinsard --- go/vt/vtgate/engine/routing.go | 33 +++---------------- go/vt/vtgate/engine/routing_parameter_test.go | 16 --------- 2 files changed, 5 insertions(+), 44 deletions(-) diff --git a/go/vt/vtgate/engine/routing.go b/go/vt/vtgate/engine/routing.go index aba0bccc999..dd6143f6aa4 100644 --- a/go/vt/vtgate/engine/routing.go +++ b/go/vt/vtgate/engine/routing.go @@ -21,8 +21,6 @@ import ( "encoding/json" "strconv" - "golang.org/x/exp/maps" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" @@ -429,37 +427,16 @@ func (rp *RoutingParameters) multiEqual(ctx context.Context, vcursor VCursor, bi if err != nil { return nil, nil, err } - rss, bvs, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, value.TupleValues()) + rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, value.TupleValues()) if err != nil { return nil, nil, err } - tbv, ok := rp.Values[0].(*evalengine.TupleBindVariable) - if !ok { - multiBindVars := make([]map[string]*querypb.BindVariable, len(rss)) - for i := range multiBindVars { - multiBindVars[i] = bindVars - } - return rss, multiBindVars, nil - } - - var resultRss []*srvtopo.ResolvedShard - var resultBvs []map[string]*querypb.BindVariable - for i, rssVals := range rss { - resultRss = append(resultRss, rssVals) - - clonedBindVars := maps.Clone(bindVars) - - newBv := &querypb.BindVariable{ - Type: querypb.Type_TUPLE, - Values: bvs[i], - } - - clonedBindVars[tbv.Key] = newBv - - resultBvs = append(resultBvs, clonedBindVars) + multiBindVars := make([]map[string]*querypb.BindVariable, len(rss)) + for i := range multiBindVars { + multiBindVars[i] = bindVars } - return resultRss, resultBvs, nil + return rss, multiBindVars, nil } func (rp *RoutingParameters) multiEqualMultiCol(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) { diff --git a/go/vt/vtgate/engine/routing_parameter_test.go b/go/vt/vtgate/engine/routing_parameter_test.go index b292f57e643..596a2f7f424 100644 --- a/go/vt/vtgate/engine/routing_parameter_test.go +++ b/go/vt/vtgate/engine/routing_parameter_test.go @@ -68,20 +68,4 @@ func TestFindRouteValuesJoin(t *testing.T) { require.NoError(t, err) require.Len(t, rss, 2) require.Len(t, bvs, 2) - - expectedIdsPerShard := [][]int64{ - {1, 2}, - {3, 4}, - } - for i, ids := range expectedIdsPerShard { - var s []int64 - for _, value := range bvs[i][valueBvName].Values { - v := sqltypes.ProtoToValue(value) - require.Equal(t, sqltypes.Int64, v.Type()) - i, err := v.ToInt64() - require.NoError(t, err) - s = append(s, i) - } - require.Equal(t, ids, s) - } }