From 2e48ab900a64d42211f941bb5a04262f6b77acf4 Mon Sep 17 00:00:00 2001
From: Vilius Okockis
Date: Wed, 27 Mar 2024 15:59:49 +0200
Subject: [PATCH] add initial boost stream service

Signed-off-by: Vilius Okockis
---
 Makefile                                 |  10 +
 go/cmd/boost/boost.go                    | 124 ++++
 go/cmd/boost/boost/boost.go              | 278 ++++++++
 go/cmd/boost/boost/replicator_plan.go    | 404 +++++++++++
 go/cmd/boost/boost/schema.go             | 436 ++++++++++++
 go/cmd/boost/boost/table_plan_builder.go | 828 +++++++++++++++++++++++
 go/cmd/boost/boost/vplayer.go            |  84 +++
 7 files changed, 2164 insertions(+)
 create mode 100644 go/cmd/boost/boost.go
 create mode 100644 go/cmd/boost/boost/boost.go
 create mode 100644 go/cmd/boost/boost/replicator_plan.go
 create mode 100644 go/cmd/boost/boost/schema.go
 create mode 100644 go/cmd/boost/boost/table_plan_builder.go
 create mode 100644 go/cmd/boost/boost/vplayer.go

diff --git a/Makefile b/Makefile
index 91120cfa11b..499c9af7064 100644
--- a/Makefile
+++ b/Makefile
@@ -63,6 +63,15 @@ endif
 	go install $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) -ldflags "$(shell tools/build_version_flags.sh)" ./go/...
 	(cd go/cmd/vttablet && go run github.com/GeertJohan/go.rice/rice append --exec=../../../bin/vttablet)
 
+# build boost only
+boost:
+ifndef NOBANNER
+	echo $$(date): Building source tree
+endif
+	bash ./build.env
+	# build only the boost binary, with CGO disabled
+	CGO_ENABLED=0 go install $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) -ldflags "$(shell tools/build_version_flags.sh)" ./go/cmd/boost/...
+
 # build the vitess binaries statically
 build:
 ifndef NOBANNER
@@ -76,6 +85,7 @@ endif
 	# build vtorc with CGO, because it depends on sqlite
 	CGO_ENABLED=1 go install $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) -ldflags "$(shell tools/build_version_flags.sh)" ./go/cmd/vtorc/...
 
+
 # cross-build can be used to cross-compile Vitess client binaries
 # Outside of select client binaries (namely vtctlclient & vtexplain), cross-compiled Vitess Binaries are not recommended for production deployments
 # Usage: GOOS=darwin GOARCH=amd64 make cross-build
diff --git a/go/cmd/boost/boost.go b/go/cmd/boost/boost.go
new file mode 100644
index 00000000000..20127ddcc5e
--- /dev/null
+++ b/go/cmd/boost/boost.go
@@ -0,0 +1,124 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/signal"
+	"syscall"
+
+	"flag"
+
+	"vitess.io/vitess/go/cmd/boost/boost"
+	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/logutil"
+)
+
+var (
+	targetPort,
+	targetGrpcPort int
+	targetHost,
+	targetGtid,
+	targetFilter,
+	targetTabletType string
+
+	boostFlags = []string{
+		"host",
+		"port",
+		"grpcPort",
+		"gtid",
+		"filter",
+		"tabletType",
+	}
+)
+
+func usage() {
+	fmt.Printf("usage of boost:\n")
+	for _, name := range boostFlags {
+		f := flag.Lookup(name)
+		if f == nil {
+			panic("unknown flag " + name)
+		}
+		flagUsage(f)
+	}
+}
+
+// Cloned from the standard flag package source to print out the usage for a given flag
+func flagUsage(f *flag.Flag) {
+	s := fmt.Sprintf("  -%s", f.Name) // Two spaces before -; see next two comments.
+	name, usage := flag.UnquoteUsage(f)
+	if len(name) > 0 {
+		s += " " + name
+	}
+	// Boolean flags of one ASCII letter are so common we
+	// treat them specially, putting their usage on the same line.
+	if len(s) <= 4 { // space, space, '-', 'x'.
+		s += "\t"
+	} else {
+		// Four spaces before the tab triggers good alignment
+		// for both 4- and 8-space tab stops.
+		s += "\n    \t"
+	}
+	s += usage
+	if name == "string" {
+		// put quotes on the value
+		s += fmt.Sprintf(" (default %q)", f.DefValue)
+	} else {
+		s += fmt.Sprintf(" (default %v)", f.DefValue)
+	}
+	fmt.Println(s)
+}
+
+func init() {
+	flag.StringVar(&targetHost, "host", "127.0.0.1", "(defaults to 127.0.0.1)")
+	flag.IntVar(&targetPort, "port", 15306, "(defaults to 15306)")
+	flag.IntVar(&targetGrpcPort, "grpcPort", 15991, "(defaults to 15991)")
+	flag.StringVar(&targetGtid, "gtid", "{}", "(defaults to {})")
+	flag.StringVar(&targetFilter, "filter", "{}", "(defaults to {})")
+	flag.StringVar(&targetTabletType, "tabletType", "master", "(defaults to master)")
+	logger := logutil.NewConsoleLogger()
+	flag.CommandLine.SetOutput(logutil.NewLoggerWriter(logger))
+	flag.Usage = usage
+}
+
+func main() {
+	flag.Lookup("logtostderr").Value.Set("true")
+	flag.Parse()
+	defer logutil.Flush()
+
+	boost, err := boost.NewBoost(
+		targetPort,
+		targetGrpcPort,
+		targetHost,
+		targetGtid,
+		targetFilter,
+		targetTabletType,
+	)
+	if err != nil {
+		os.Exit(1)
+	}
+
+	err = boost.Init()
+	if err != nil {
+		boost.Close()
+		os.Exit(1)
+	}
+
+	// Catch SIGTERM and SIGINT so we get a chance to clean up.
+	ctx, cancel := context.WithCancel(context.Background())
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+	go func() {
+		sig := <-sigChan
+		log.Infof("Cancelling due to signal: %v", sig)
+		cancel()
+	}()
+	defer boost.Close()
+
+	err = boost.Play(ctx)
+	if err != nil {
+		os.Exit(1)
+	}
+
+	log.Info("Stopped")
+}
diff --git a/go/cmd/boost/boost/boost.go b/go/cmd/boost/boost/boost.go
new file mode 100644
index 00000000000..814d28e927b
--- /dev/null
+++ b/go/cmd/boost/boost/boost.go
@@ -0,0 +1,278 @@
+package boost
+
+import (
+	"context"
+	"fmt"
+	"io"
+
+	"strings"
+	"time"
+
+	"vitess.io/vitess/go/json2"
+	"vitess.io/vitess/go/mysql"
+	"vitess.io/vitess/go/vt/binlog/binlogplayer"
+	"vitess.io/vitess/go/vt/dbconfigs"
+	"vitess.io/vitess/go/vt/dbconnpool"
+	"vitess.io/vitess/go/vt/log"
+	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
+	"vitess.io/vitess/go/vt/vtgate/vtgateconn"
+
+	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+	_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
+	_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
+)
+
+type BoostConfig struct {
+	Host       string
+	Port       int
+	GrpcPort   int
+	User       string
+	Password   string
+	Gtid       *binlogdatapb.VGtid
+	Filter     *binlogdatapb.Filter
+	TabletType topodatapb.TabletType
+	Source     string
+	Flags      vtgatepb.VStreamFlags
+}
+
+type Boost struct {
+	config *BoostConfig
+	player *vplayer
+}
+
+func NewBoost(port, grpcPort int, host, gtid, filter, tabletType string) (*Boost, error) {
+	targetVgtid := &binlogdatapb.VGtid{}
+	targetFilter := &binlogdatapb.Filter{}
+	if err := json2.Unmarshal([]byte(gtid), &targetVgtid); err != nil {
+		log.Error(err)
+		return nil, err
+	}
+	if err := json2.Unmarshal([]byte(filter), &targetFilter); err != nil {
+		log.Error(err)
+		return nil, err
+	}
+
+	cfg := &BoostConfig{
+		Host:     host,
+		Port:     port,
+		GrpcPort: grpcPort,
+		Gtid:     targetVgtid,
+		Filter:   targetFilter,
+		Flags: vtgatepb.VStreamFlags{
+			//MinimizeSkew: false,
+			HeartbeatInterval: 60, // seconds
+		},
+	}
+	switch tabletType {
+	case "replica":
+		cfg.TabletType = topodatapb.TabletType_REPLICA
+	case "rdonly":
+		cfg.TabletType = topodatapb.TabletType_RDONLY
+	default:
+		cfg.TabletType = topodatapb.TabletType_MASTER
+	}
+
+	boost := &Boost{
+		config: cfg,
+	}
+	return boost, nil
+}
+
+func (b *Boost) Init() error {
+	// Create and open the connection pool for dba access.
+	dbPool := dbconnpool.NewConnectionPool("dbPool", 10, time.Minute, 0)
+	dbPool.Open(
+		dbconfigs.New(
+			&mysql.ConnParams{
+				Host: b.config.Host,
+				Port: b.config.Port,
+				// Uname : "main",
+				// Pass: ,
+			},
+		),
+	)
+
+	// build the column info map for the event applicator
+	colInfo, err := buildColInfoMap(context.Background(), dbPool)
+	if err != nil {
+		log.Error("---- buildColInfoMap ", err)
+		return err
+	}
+
+	replicatorPlan, err := buildReplicatorPlan(
+		b.config.Filter,
+		colInfo,
+		nil,
+		binlogplayer.NewStats(),
+	)
+	if err != nil {
+		log.Error("---- buildReplicatorPlan", err)
+		return err
+	}
+
+	b.player = &vplayer{
+		currentVgtid:   b.config.Gtid,
+		dbPool:         dbPool,
+		replicatorPlan: replicatorPlan,
+		tablePlans:     make(map[string]*TablePlan),
+	}
+	return nil
+}
+
+func (b *Boost) Close() {
+	// close db pool
+	b.player.dbPool.Close()
+}
+
+func (b *Boost) Play(ctx context.Context) error {
+	log.Info("Playing...")
+	conn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%v", b.config.Host, b.config.GrpcPort))
+	if err != nil {
+		log.Errorf("vtgateconn.Dial error: %s", err)
+		return err
+	}
+	defer conn.Close()
+
+	reader, err := conn.VStream(ctx, b.config.TabletType, b.player.currentVgtid, b.config.Filter, &b.config.Flags)
+	if err != nil {
+		log.Errorf("conn.VStream error: %s", err)
+		return err
+	}
+	connect := false
+	numErrors := 0
+	for {
+		if connect { // if vtgate returns a transient error try reconnecting from the last seen vgtid
+			log.Infof("Reconnecting... %v %v %v %v", b.config.TabletType, b.player.currentVgtid, b.config.Filter, &b.config.Flags)
+			reader, err = conn.VStream(ctx, b.config.TabletType, b.player.currentVgtid, b.config.Filter, &b.config.Flags)
+			if err != nil {
+				log.Errorf("Reconnecting error: %s", err)
+				time.Sleep(100 * time.Millisecond)
+				continue
+			}
+			connect = false
+		}
+		e, err := reader.Recv()
+		switch err {
+		case nil:
+			for _, ev := range e {
+				if err := b.player.applyVEvent(ctx, ev); err != nil {
+					if err != io.EOF {
+						log.Errorf("Error applying event: %s", err.Error())
+					}
+				}
+			}
+		case io.EOF:
+			log.Info("stream ended")
+			return nil
+		default:
+			log.Errorf("%s:: remote error: %v", time.Now(), err)
+			numErrors++
+			if numErrors > 10 { // give up if vtgate is continuously unavailable
+				return fmt.Errorf("too many remote errors")
+			}
+			if strings.Contains(strings.ToLower(err.Error()), "unavailable") {
+				// this should not happen, but maybe the buffering logic might return a transient
+				// error during resharding. So we retry to reduce future flakiness.
+				time.Sleep(100 * time.Millisecond)
+				connect = true
+			} else {
+				// non-transient error: stop streaming
+				return fmt.Errorf("non-transient remote error: %v", err)
+			}
+		}
+	}
+}
diff --git a/go/cmd/boost/boost/replicator_plan.go b/go/cmd/boost/boost/replicator_plan.go
new file mode 100644
index 00000000000..a1b3df19bf2
--- /dev/null
+++ b/go/cmd/boost/boost/replicator_plan.go
@@ -0,0 +1,404 @@
+package boost
+
+import (
+	"encoding/json"
+	"fmt"
+	"sort"
+	"strings"
+
+	"google.golang.org/protobuf/proto"
+
+	"vitess.io/vitess/go/bytes2"
+	"vitess.io/vitess/go/mysql"
+	"vitess.io/vitess/go/sqltypes"
+	"vitess.io/vitess/go/vt/binlog/binlogplayer"
+	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+	querypb "vitess.io/vitess/go/vt/proto/query"
+	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
+	"vitess.io/vitess/go/vt/sqlparser"
+	"vitess.io/vitess/go/vt/vterrors"
+	"vitess.io/vitess/go/vt/vtgate/evalengine"
+	"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
+)
+
+// ReplicatorPlan is the execution plan for the replicator. It contains
+// plans for all the tables it's replicating. Every phase of vreplication
+// builds its own instance of the ReplicatorPlan. This is because the plan
+// depends on copyState, which changes on every iteration.
+// The table plans within ReplicatorPlan will not be fully populated because
+// all the information is not available initially.
+// For simplicity, the ReplicatorPlan is immutable.
+// Once we get the field info for a table from the stream response,
+// we'll have all the necessary info to build the final plan.
+// At that time, buildExecutionPlan is invoked, which will make a copy
+// of the TablePlan from ReplicatorPlan, and fill the rest
+// of the members, leaving the original plan unchanged.
+// The constructor is buildReplicatorPlan in table_plan_builder.go
+type ReplicatorPlan struct {
+	VStreamFilter *binlogdatapb.Filter
+	TargetTables  map[string]*TablePlan
+	TablePlans    map[string]*TablePlan
+	ColInfoMap    map[string][]*vreplication.ColumnInfo
+	stats         *binlogplayer.Stats
+}
+
+// buildExecutionPlan uses the field info as input and the partially built
+// TablePlan for that table to build a full plan.
+func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent) (*TablePlan, error) {
+	// we have db name in front.
+	parts := strings.Split(fieldEvent.TableName, ".")
+	tableName := parts[len(parts)-1]
+
+	prelim := rp.TablePlans[tableName]
+	if prelim == nil {
+		// Unreachable code.
+		return nil, fmt.Errorf("plan not found for %s", tableName)
+	}
+	// If Insert is initialized, then it means that we knew the column
+	// names and have already built most of the plan.
+	if prelim.Insert != nil {
+		tplanv := *prelim
+		// We know that we sent only column names, but they may be backticked.
+		// If so, we have to strip them out to allow them to match the expected
+		// bind var names.
+		tplanv.Fields = make([]*querypb.Field, 0, len(fieldEvent.Fields))
+		for _, fld := range fieldEvent.Fields {
+			trimmed := proto.Clone(fld).(*querypb.Field)
+			trimmed.Name = strings.Trim(trimmed.Name, "`")
+			tplanv.Fields = append(tplanv.Fields, trimmed)
+		}
+		return &tplanv, nil
+	}
+	// select * construct was used. We need to use the field names.
+	tplan, err := rp.buildFromFields(prelim.TargetName, prelim.Lastpk, fieldEvent.Fields)
+	if err != nil {
+		return nil, err
+	}
+	tplan.Fields = fieldEvent.Fields
+	return tplan, nil
+}
+
+// buildFromFields builds a full TablePlan, but uses the field info as the
+// full column list. This happens when the query used was a 'select *', which
+// requires us to wait for the field info sent by the source.
+func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Result, fields []*querypb.Field) (*TablePlan, error) {
+	tpb := &tablePlanBuilder{
+		name:     sqlparser.NewTableIdent(tableName),
+		lastpk:   lastpk,
+		colInfos: rp.ColInfoMap[tableName],
+		stats:    rp.stats,
+	}
+	for _, field := range fields {
+		colName := sqlparser.NewColIdent(field.Name)
+		isGenerated := false
+		for _, colInfo := range tpb.colInfos {
+			if !strings.EqualFold(colInfo.Name, field.Name) {
+				continue
+			}
+			if colInfo.IsGenerated {
+				isGenerated = true
+			}
+			break
+		}
+		if isGenerated {
+			continue
+		}
+		cexpr := &colExpr{
+			colName: colName,
+			colType: field.Type,
+			expr: &sqlparser.ColName{
+				Name: colName,
+			},
+			references: map[string]bool{
+				field.Name: true,
+			},
+		}
+		tpb.colExprs = append(tpb.colExprs, cexpr)
+	}
+	// The following actions are a subset of buildTablePlan.
+	if err := tpb.analyzePK(rp.ColInfoMap); err != nil {
+		return nil, err
+	}
+	return tpb.generate(), nil
+}
+
+// MarshalJSON performs a custom JSON Marshalling.
+func (rp *ReplicatorPlan) MarshalJSON() ([]byte, error) {
+	var targets []string
+	for k := range rp.TargetTables {
+		targets = append(targets, k)
+	}
+	sort.Strings(targets)
+	v := struct {
+		VStreamFilter *binlogdatapb.Filter
+		TargetTables  []string
+		TablePlans    map[string]*TablePlan
+	}{
+		VStreamFilter: rp.VStreamFilter,
+		TargetTables:  targets,
+		TablePlans:    rp.TablePlans,
+	}
+	return json.Marshal(&v)
+}
+
+// TablePlan is the execution plan for a table within a replicator.
+// If the column names are not known at the time of plan building (like
+// select *), then only TargetName, SendRule and Lastpk are initialized.
+// When the stream returns the field info, those are used as column
+// names to build the final plan.
+// Lastpk comes from copyState. If it's set, then the generated plans
+// are significantly different because any events that fall beyond
+// Lastpk must be excluded.
+// If column names were known upfront, then all fields of TablePlan
+// are built except for Fields. This member is populated only after
+// the field info is received from the stream.
+// The ParsedQuery objects assume that a map of before and after values
+// will be built based on the streaming rows. Before image values will
+// be prefixed with a "b_", and after image values will be prefixed
+// with an "a_". The TablePlan structure is used during all the phases
+// of vreplication: catchup, copy, fastforward, or regular replication.
+type TablePlan struct {
+	// TargetName, SendRule will always be initialized.
+	TargetName string
+	SendRule   *binlogdatapb.Rule
+	// Lastpk will be initialized if it was specified, and
+	// will be used for building the final plan after field info
+	// is received.
+	Lastpk *sqltypes.Result
+	// BulkInsertFront, BulkInsertValues and BulkInsertOnDup are used
+	// by vcopier. These three parts are combined to build bulk insert
+	// statements. This is functionally equivalent to generating
+	// multiple statements using the "Insert" construct, but much more
+	// efficient for the copy phase.
+	BulkInsertFront  *sqlparser.ParsedQuery
+	BulkInsertValues *sqlparser.ParsedQuery
+	BulkInsertOnDup  *sqlparser.ParsedQuery
+	// Insert, Update and Delete are used by vplayer.
+	// If the plan is an insertIgnore type, then Insert
+	// and Update contain 'insert ignore' statements and
+	// Delete is nil.
+ Insert *sqlparser.ParsedQuery + Update *sqlparser.ParsedQuery + Delete *sqlparser.ParsedQuery + Fields []*querypb.Field + EnumValuesMap map[string](map[string]string) + // PKReferences is used to check if an event changed + // a primary key column (row move). + PKReferences []string + Stats *binlogplayer.Stats + FieldsToSkip map[string]bool + ConvertCharset map[string](*binlogdatapb.CharsetConversion) +} + +// MarshalJSON performs a custom JSON Marshalling. +func (tp *TablePlan) MarshalJSON() ([]byte, error) { + v := struct { + TargetName string + SendRule string + InsertFront *sqlparser.ParsedQuery `json:",omitempty"` + InsertValues *sqlparser.ParsedQuery `json:",omitempty"` + InsertOnDup *sqlparser.ParsedQuery `json:",omitempty"` + Insert *sqlparser.ParsedQuery `json:",omitempty"` + Update *sqlparser.ParsedQuery `json:",omitempty"` + Delete *sqlparser.ParsedQuery `json:",omitempty"` + PKReferences []string `json:",omitempty"` + }{ + TargetName: tp.TargetName, + SendRule: tp.SendRule.Match, + InsertFront: tp.BulkInsertFront, + InsertValues: tp.BulkInsertValues, + InsertOnDup: tp.BulkInsertOnDup, + Insert: tp.Insert, + Update: tp.Update, + Delete: tp.Delete, + PKReferences: tp.PKReferences, + } + return json.Marshal(&v) +} + +func (tp *TablePlan) applyBulkInsert(sqlbuffer *bytes2.Buffer, rows *binlogdatapb.VStreamRowsResponse, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) { + sqlbuffer.Reset() + sqlbuffer.WriteString(tp.BulkInsertFront.Query) + sqlbuffer.WriteString(" values ") + + for i, row := range rows.Rows { + if i > 0 { + sqlbuffer.WriteString(", ") + } + if err := tp.BulkInsertValues.AppendFromRow(sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil { + return nil, err + } + } + if tp.BulkInsertOnDup != nil { + sqlbuffer.WriteString(tp.BulkInsertOnDup.Query) + } + return executor(sqlbuffer.StringUnsafe()) +} + +// During the copy phase we run catchup and fastforward, which stream binlogs. While streaming we should only process +// rows whose PK has already been copied. Ideally we should compare the PKs before applying the change and never send +// such rows to the target mysql server. However reliably comparing primary keys in a manner compatible to MySQL will require a lot of +// coding: consider composite PKs, character sets, collations ... So we send these rows to the mysql server which then does the comparison +// in sql, through where clauses like "pk_val <= last_seen_pk". +// +// But this does generate a lot of unnecessary load of, effectively, no-ops since the where +// clauses are always false. This can create a significant cpu load on the target for high qps servers resulting in a +// much lower copy bandwidth (or provisioning more powerful servers). +// isOutsidePKRange currently checks for rows with single primary keys which are currently comparable in Vitess: +// (see NullsafeCompare() for types supported). It returns true if pk is not to be applied +// +// At this time we have decided to only perform this for Insert statements. Insert statements form a significant majority of +// the generated noop load during catchup and are easier to test for. Update and Delete statements are very difficult to +// unit test reliably and without flakiness with our current test framework. So as a pragmatic decision we support Insert +// now and punt on the others. 
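+// For example, with PKReferences = ["id"] and Lastpk = 5, an insert event
+// for id=7 lies beyond the copied range and is counted as a noop, while an
+// insert for id=3 falls inside the already-copied range and must be applied.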
+func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable, before, after bool, stmtType string) bool {
+	// added empty comments below, otherwise gofmt removes the spaces between the bitwise & and obfuscates this check!
+	// if *vreplicationExperimentalFlags /**/ & /**/ vreplicationExperimentalFlagOptimizeInserts == 0 {
+	// 	return false
+	// }
+	// Ensure there is one and only one value in lastpk and pkrefs.
+	if tp.Lastpk != nil && len(tp.Lastpk.Fields) == 1 && len(tp.Lastpk.Rows) == 1 && len(tp.Lastpk.Rows[0]) == 1 && len(tp.PKReferences) == 1 {
+		// check again that this is an insert
+		var bindvar *querypb.BindVariable
+		switch {
+		case !before && after:
+			bindvar = bindvars["a_"+tp.PKReferences[0]]
+		}
+		if bindvar == nil { // should never happen
+			return false
+		}
+
+		rowVal, _ := sqltypes.BindVariableToValue(bindvar)
+		result, err := evalengine.NullsafeCompare(rowVal, tp.Lastpk.Rows[0][0])
+		// If rowVal is > last pk, transaction will be a noop, so don't apply this statement
+		if err == nil && result > 0 {
+			tp.Stats.NoopQueryCount.Add(stmtType, 1)
+			return true
+		}
+	}
+	return false
+}
+
+// bindFieldVal returns a bind variable based on given field and value.
+// Most values will just bind directly. But some values may need manipulation:
+// - text values with charset conversion
+// - enum values converted to text via Online DDL
+// - ...any other future possible values
+func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) {
+	if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() {
+		// Non-null string value, for which we have a charset conversion instruction
+		valString := val.ToString()
+		fromEncoding, encodingOK := mysql.CharacterSetEncoding[conversion.FromCharset]
+		if !encodingOK {
+			return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", conversion.FromCharset, field.Name)
+		}
+		if fromEncoding != nil {
+			// As a reminder, encoding can be nil for trivial charsets, like utf8 or ascii.
+			// encoding will be non-nil for charsets like latin1, gbk, etc.
+			var err error
+			valString, err = fromEncoding.NewDecoder().String(valString)
+			if err != nil {
+				return nil, err
+			}
+		}
+		return sqltypes.StringBindVariable(valString), nil
+	}
+	if enumValues, ok := tp.EnumValuesMap[field.Name]; ok && !val.IsNull() {
+		// The fact that this field has an EnumValuesMap entry means we must
+		// use the enum's text value as opposed to the enum's numerical value.
+		// One known use case is with Online DDL, when a column is converted from
+		// ENUM to a VARCHAR/TEXT.
+		enumValue, enumValueOK := enumValues[val.ToString()]
+		if !enumValueOK {
+			return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Invalid enum value: %v for field %s", val, field.Name)
+		}
+		// get the enum text for this val
+		return sqltypes.StringBindVariable(enumValue), nil
+	}
+	return sqltypes.ValueBindVariable(*val), nil
+}
+
+// applyChange generates and executes the MySQL statement for a row change.
+func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
+	// MakeRowTrusted is needed here because Proto3ToResult is not convenient.
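+	// Before-image values are bound as "b_<col>" and after-image values as
+	// "a_<col>", matching the bindvar names referenced by the generated
+	// Insert, Update and Delete queries.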
+ var before, after bool + bindvars := make(map[string]*querypb.BindVariable, len(tp.Fields)) + if rowChange.Before != nil { + before = true + vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.Before) + for i, field := range tp.Fields { + bindVar, err := tp.bindFieldVal(field, &vals[i]) + if err != nil { + return nil, err + } + bindvars["b_"+field.Name] = bindVar + } + } + if rowChange.After != nil { + after = true + vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.After) + for i, field := range tp.Fields { + bindVar, err := tp.bindFieldVal(field, &vals[i]) + if err != nil { + return nil, err + } + bindvars["a_"+field.Name] = bindVar + } + } + switch { + case !before && after: + // only apply inserts for rows whose primary keys are within the range of rows already copied + if tp.isOutsidePKRange(bindvars, before, after, "insert") { + return nil, nil + } + return execParsedQuery(tp.Insert, bindvars, executor) + case before && !after: + if tp.Delete == nil { + return nil, nil + } + return execParsedQuery(tp.Delete, bindvars, executor) + case before && after: + if !tp.pkChanged(bindvars) { + return execParsedQuery(tp.Update, bindvars, executor) + } + if tp.Delete != nil { + if _, err := execParsedQuery(tp.Delete, bindvars, executor); err != nil { + return nil, err + } + } + return execParsedQuery(tp.Insert, bindvars, executor) + } + // Unreachable. + return nil, nil +} + +func execParsedQuery(pq *sqlparser.ParsedQuery, bindvars map[string]*querypb.BindVariable, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) { + sql, err := pq.GenerateQuery(bindvars, nil) + if err != nil { + return nil, err + } + return executor(sql) +} + +func (tp *TablePlan) pkChanged(bindvars map[string]*querypb.BindVariable) bool { + for _, pkref := range tp.PKReferences { + v1, _ := sqltypes.BindVariableToValue(bindvars["b_"+pkref]) + v2, _ := sqltypes.BindVariableToValue(bindvars["a_"+pkref]) + if !valsEqual(v1, v2) { + return true + } + } + return false +} + +func valsEqual(v1, v2 sqltypes.Value) bool { + if v1.IsNull() && v2.IsNull() { + return true + } + // If any one of them is null, something has changed. + if v1.IsNull() || v2.IsNull() { + return false + } + // Compare content only if none are null. + return v1.ToString() == v2.ToString() +} diff --git a/go/cmd/boost/boost/schema.go b/go/cmd/boost/boost/schema.go new file mode 100644 index 00000000000..b00b50f1f81 --- /dev/null +++ b/go/cmd/boost/boost/schema.go @@ -0,0 +1,436 @@ +package boost + +import ( + "context" + "fmt" + "regexp" + "sort" + "strings" + "sync" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqlescape" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/dbconnpool" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" +) + +// getPoolReconnect gets a connection from a pool, tests it, and reconnects if +// the connection is lost. +func getPoolReconnect(ctx context.Context, pool *dbconnpool.ConnectionPool) (*dbconnpool.PooledDBConnection, error) { + conn, err := pool.Get(ctx) + if err != nil { + return conn, err + } + // Run a test query to see if this connection is still good. 
+ if _, err := conn.ExecuteFetch("SELECT 1", 1, false); err != nil { + // If we get a connection error, try to reconnect. + if sqlErr, ok := err.(*mysql.SQLError); ok && (sqlErr.Number() == mysql.CRServerGone || sqlErr.Number() == mysql.CRServerLost) { + if err := conn.Reconnect(ctx); err != nil { + conn.Recycle() + return nil, err + } + return conn, nil + } + conn.Recycle() + return nil, err + } + return conn, nil +} + +func buildColInfoMap(ctx context.Context, dbPool *dbconnpool.ConnectionPool) (map[string][]*vreplication.ColumnInfo, error) { + conn, connErr := getPoolReconnect(ctx, dbPool) + if connErr != nil { + return nil, connErr + } + defer conn.Recycle() + + // we don't know dbname, because connecting through vtgate + dbName := "" + schema, err := GetSchema(ctx, dbName, []string{"/.*/"}, nil, false, dbPool) + if err != nil { + log.Errorf("GetSchema error: %s", err) + return nil, err + } + + // queryTemplate := "select character_set_name, collation_name, column_name, data_type, column_type, extra from information_schema.columns where table_schema=%s and table_name=%s;" + queryTemplate := "select character_set_name, collation_name, column_name, data_type, column_type, extra from information_schema.columns where table_schema like 'vt_%%' and table_name=%s;" + colInfoMap := make(map[string][]*vreplication.ColumnInfo) + for _, td := range schema.TableDefinitions { + // query := fmt.Sprintf(queryTemplate, encodeString(dbclient.DBName()), encodeString(td.Name)) + query := fmt.Sprintf(queryTemplate, encodeString(td.Name)) + qr, err := conn.ExecuteFetch(query, 10000, true) + if err != nil { + return nil, err + } + if len(qr.Rows) == 0 { + return nil, fmt.Errorf("no data returned from information_schema.columns") + } + + var pks []string + if len(td.PrimaryKeyColumns) != 0 { + pks = td.PrimaryKeyColumns + } else { + pks = td.Columns + } + var colInfo []*vreplication.ColumnInfo + for _, row := range qr.Rows { + charSet := "" + collation := "" + columnName := "" + isPK := false + isGenerated := false + var dataType, columnType string + columnName = row[2].ToString() + var currentField *querypb.Field + for _, field := range td.Fields { + if field.Name == columnName { + currentField = field + break + } + } + if currentField == nil { + continue + } + dataType = row[3].ToString() + columnType = row[4].ToString() + if sqltypes.IsText(currentField.Type) { + charSet = row[0].ToString() + collation = row[1].ToString() + } + if dataType == "" || columnType == "" { + return nil, fmt.Errorf("no dataType/columnType found in information_schema.columns for table %s, column %s", td.Name, columnName) + } + for _, pk := range pks { + if columnName == pk { + isPK = true + } + } + extra := strings.ToLower(row[5].ToString()) + if strings.Contains(extra, "stored generated") || strings.Contains(extra, "virtual generated") { + isGenerated = true + } + colInfo = append(colInfo, &vreplication.ColumnInfo{ + Name: columnName, + CharSet: charSet, + Collation: collation, + DataType: dataType, + ColumnType: columnType, + IsPK: isPK, + IsGenerated: isGenerated, + }) + } + colInfoMap[td.Name] = colInfo + } + return colInfoMap, nil +} + +func encodeString(in string) string { + var buf strings.Builder + sqltypes.NewVarChar(in).EncodeSQL(&buf) + return buf.String() +} + +// GetSchema returns the schema for database for tables listed in +// tables. If tables is empty, return the schema for all tables. 
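+// Names in tables/excludeTables may also be regular expressions wrapped in
+// '/' (buildColInfoMap above passes "/.*/" to match every table); plain
+// names are matched literally by the tmutils table filter.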
+func GetSchema(ctx context.Context, dbName string, tables, excludeTables []string, includeViews bool, dbPool *dbconnpool.ConnectionPool) (*tabletmanagerdatapb.SchemaDefinition, error) { + sd := &tabletmanagerdatapb.SchemaDefinition{} + // we don't know dbname, because connecting through vtgate + // sd.DatabaseSchema = sqlescape.EscapeID(dbName) + + tds, err := collectBasicTableData(ctx, dbName, tables, excludeTables, includeViews, dbPool) + if err != nil { + log.Error("collectBasicTableData") + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + allErrors := &concurrency.AllErrorRecorder{} + + // Get per-table schema concurrently. + tableNames := make([]string, 0, len(tds)) + for _, td := range tds { + tableNames = append(tableNames, td.Name) + wg.Add(1) + go func(td *tabletmanagerdatapb.TableDefinition) { + defer wg.Done() + + fields, columns, schema, err := collectSchema(ctx, dbName, td.Name, td.Type, dbPool) + if err != nil { + allErrors.RecordError(err) + cancel() + return + } + + td.Fields = fields + td.Columns = columns + td.Schema = schema + }(td) + } + + // Get primary columns concurrently. + colMap := map[string][]string{} + if len(tableNames) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + + var err error + colMap, err = getPrimaryKeyColumns(ctx, dbName, dbPool, tableNames...) + if err != nil { + allErrors.RecordError(err) + cancel() + return + } + }() + } + + wg.Wait() + if err := allErrors.AggrError(vterrors.Aggregate); err != nil { + return nil, err + } + + for _, td := range tds { + td.PrimaryKeyColumns = colMap[td.Name] + } + + sd.TableDefinitions = tds + + tmutils.GenerateSchemaVersion(sd) + return sd, nil +} + +func getPrimaryKeyColumns(ctx context.Context, dbName string, dbPool *dbconnpool.ConnectionPool, tables ...string) (map[string][]string, error) { + conn, err := getPoolReconnect(ctx, dbPool) + if err != nil { + return nil, err + } + defer conn.Recycle() + + tableList, err := tableListSQL(tables) + if err != nil { + return nil, err + } + // sql uses column name aliases to guarantee lower case sensitivity. + // we don't know dbname, because connecting through vtgate + // sql := fmt.Sprintf(` + // SELECT + // table_name AS table_name, + // ordinal_position AS ordinal_position, + // column_name AS column_name + // FROM information_schema.key_column_usage + // WHERE table_schema = '%s' + // AND table_name IN %s + // AND constraint_name='PRIMARY' + // ORDER BY table_name, ordinal_position`, dbName, tableList) + sql := fmt.Sprintf(` + SELECT + table_name AS table_name, + ordinal_position AS ordinal_position, + column_name AS column_name + FROM information_schema.key_column_usage + WHERE table_schema like 'vt_%%' + AND table_name IN %s + AND constraint_name='PRIMARY' + ORDER BY table_name, ordinal_position`, tableList) + qr, err := conn.ExecuteFetch(sql, len(tables)*100, true) + if err != nil { + return nil, err + } + + named := qr.Named() + colMap := map[string][]string{} + for _, row := range named.Rows { + tableName := row.AsString("table_name", "") + colMap[tableName] = append(colMap[tableName], row.AsString("column_name", "")) + } + return colMap, err +} + +func encodeTableName(tableName string) string { + var buf strings.Builder + sqltypes.NewVarChar(tableName).EncodeSQL(&buf) + return buf.String() +} + +// tableListSQL returns an IN clause "('t1', 't2'...) for a list of tables." 
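+// For example, tableListSQL([]string{"t1", "t2"}) returns "('t1', 't2')".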
+func tableListSQL(tables []string) (string, error) {
+	if len(tables) == 0 {
+		return "", vterrors.New(vtrpc.Code_INTERNAL, "no tables for tableListSQL")
+	}
+
+	encodedTables := make([]string, len(tables))
+	for i, tableName := range tables {
+		encodedTables[i] = encodeTableName(tableName)
+	}
+
+	return "(" + strings.Join(encodedTables, ", ") + ")", nil
+}
+
+// tableDefinitions is a sortable collection of table definitions
+type tableDefinitions []*tabletmanagerdatapb.TableDefinition
+
+func (t tableDefinitions) Len() int {
+	return len(t)
+}
+
+func (t tableDefinitions) Less(i, j int) bool {
+	return t[i].Name < t[j].Name
+}
+
+func (t tableDefinitions) Swap(i, j int) {
+	t[i], t[j] = t[j], t[i]
+}
+
+var _ sort.Interface = (tableDefinitions)(nil)
+
+func collectBasicTableData(ctx context.Context, dbName string, tables, excludeTables []string, includeViews bool, dbPool *dbconnpool.ConnectionPool) ([]*tabletmanagerdatapb.TableDefinition, error) {
+	conn, err := getPoolReconnect(ctx, dbPool)
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Recycle()
+	// get the list of tables we're interested in
+	// sql := "SELECT table_name, table_type, data_length, table_rows FROM information_schema.tables WHERE table_schema = '" + dbName + "'"
+	// we don't know dbname when connecting through vtgate
+	sql := "SELECT table_name, table_type, data_length, table_rows FROM information_schema.tables WHERE table_schema like 'vt_%'"
+	if !includeViews {
+		sql += " AND table_type = '" + tmutils.TableBaseTable + "'"
+	}
+	qr, err := conn.ExecuteFetch(sql, 10000, true)
+	if err != nil {
+		return nil, err
+	}
+	if len(qr.Rows) == 0 {
+		return nil, nil
+	}
+
+	filter, err := tmutils.NewTableFilter(tables, excludeTables, includeViews)
+	if err != nil {
+		return nil, err
+	}
+
+	tds := make(tableDefinitions, 0, len(qr.Rows))
+	for _, row := range qr.Rows {
+		tableName := row[0].ToString()
+		tableType := row[1].ToString()
+
+		if !filter.Includes(tableName, tableType) {
+			continue
+		}
+
+		// compute dataLength
+		var dataLength uint64
+		if !row[2].IsNull() {
+			// dataLength is NULL for views, then we use 0
+			dataLength, err = evalengine.ToUint64(row[2])
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		// get row count
+		var rowCount uint64
+		if !row[3].IsNull() {
+			rowCount, err = evalengine.ToUint64(row[3])
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		tds = append(tds, &tabletmanagerdatapb.TableDefinition{
+			Name:       tableName,
+			Type:       tableType,
+			DataLength: dataLength,
+			RowCount:   rowCount,
+		})
+	}
+
+	sort.Sort(tds)
+
+	return tds, nil
+}
+
+func collectSchema(ctx context.Context, dbName, tableName, tableType string, dbPool *dbconnpool.ConnectionPool) ([]*querypb.Field, []string, string, error) {
+	fields, columns, err := GetColumns(ctx, dbName, tableName, dbPool)
+	if err != nil {
+		return nil, nil, "", err
+	}
+
+	schema, err := normalizedSchema(ctx, dbName, tableName, tableType, dbPool)
+	if err != nil {
+		return nil, nil, "", err
+	}
+
+	return fields, columns, schema, nil
+}
+
+// GetColumns returns the columns of a table.
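+// It probes the table with "SELECT * FROM <table> WHERE 1=0", which returns
+// the field metadata without fetching any rows.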
+func GetColumns(ctx context.Context, dbName, table string, dbPool *dbconnpool.ConnectionPool) ([]*querypb.Field, []string, error) { + conn, err := getPoolReconnect(ctx, dbPool) + if err != nil { + return nil, nil, err + } + defer conn.Recycle() + + // we don't know dbname, because connecting through vtgate + // qr, err := conn.ExecuteFetch(fmt.Sprintf("SELECT * FROM %s.%s WHERE 1=0", sqlescape.EscapeID(dbName), sqlescape.EscapeID(table)), 0, true) + qr, err := conn.ExecuteFetch(fmt.Sprintf("SELECT * FROM %s WHERE 1=0", sqlescape.EscapeID(table)), 0, true) + if err != nil { + return nil, nil, err + } + + columns := make([]string, len(qr.Fields)) + for i, field := range qr.Fields { + columns[i] = field.Name + } + return qr.Fields, columns, nil + +} + +var autoIncr = regexp.MustCompile(` AUTO_INCREMENT=\d+`) + +// normalizedSchema returns a table schema with database names replaced, and auto_increment annotations removed. +func normalizedSchema(ctx context.Context, dbName, tableName, tableType string, dbPool *dbconnpool.ConnectionPool) (string, error) { + conn, err := getPoolReconnect(ctx, dbPool) + if err != nil { + return "", err + } + defer conn.Recycle() + + backtickDBName := sqlescape.EscapeID(dbName) + // we don't know dbname, because connecting through vtgate + // qr, fetchErr := conn.ExecuteFetch(fmt.Sprintf("SHOW CREATE TABLE %s.%s", backtickDBName, sqlescape.EscapeID(tableName)), 10000, true) + qr, fetchErr := conn.ExecuteFetch(fmt.Sprintf("SHOW CREATE TABLE %s", sqlescape.EscapeID(tableName)), 10000, true) + if fetchErr != nil { + return "", fetchErr + } + if len(qr.Rows) == 0 { + return "", fmt.Errorf("empty create table statement for %v", tableName) + } + + // FIXME(DeathBorn) Does not support views actually + // Normalize & remove auto_increment because it changes on every insert + // FIXME(alainjobart) find a way to share this with + // vt/tabletserver/table_info.go:162 + norm := qr.Rows[0][1].ToString() + norm = autoIncr.ReplaceAllLiteralString(norm, "") + if tableType == tmutils.TableView { + // Views will have the dbname in there, replace it + // with {{.DatabaseName}} + norm = strings.Replace(norm, backtickDBName, "{{.DatabaseName}}", -1) + } + + return norm, nil +} diff --git a/go/cmd/boost/boost/table_plan_builder.go b/go/cmd/boost/boost/table_plan_builder.go new file mode 100644 index 00000000000..e130ff0d3f9 --- /dev/null +++ b/go/cmd/boost/boost/table_plan_builder.go @@ -0,0 +1,828 @@ +package boost + +import ( + "fmt" + "regexp" + "sort" + "strings" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/key" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" +) + +// This file contains just the builders for ReplicatorPlan and TablePlan. +// ReplicatorPlan and TablePlan are in replicator_plan.go. +// TODO(sougou): reorganize this in a better fashion. + +// ExcludeStr is the filter value for excluding tables that match a rule. +// TODO(sougou): support this on vstreamer side also. +const ExcludeStr = "exclude" + +// tablePlanBuilder contains the metadata needed for building a TablePlan. +type tablePlanBuilder struct { + name sqlparser.TableIdent + sendSelect *sqlparser.Select + // selColumns keeps track of the columns we want to pull from source. 
+ // If Lastpk is set, we compare this list against the table's pk and + // add missing references. + selColumns map[string]bool + colExprs []*colExpr + onInsert insertType + pkCols []*colExpr + lastpk *sqltypes.Result + colInfos []*vreplication.ColumnInfo + stats *binlogplayer.Stats +} + +// colExpr describes the processing to be performed to +// compute the value of one column of the target table. +type colExpr struct { + colName sqlparser.ColIdent + colType querypb.Type + // operation==opExpr: full expression is set + // operation==opCount: nothing is set. + // operation==opSum: for 'sum(a)', expr is set to 'a'. + operation operation + // expr stores the expected field name from vstreamer and dictates + // the generated bindvar names, like a_col or b_col. + expr sqlparser.Expr + // references contains all the column names referenced in the expression. + references map[string]bool + + isGrouped bool + isPK bool + dataType string + columnType string +} + +// operation is the opcode for the colExpr. +type operation int + +// The following values are the various colExpr opcodes. +const ( + opExpr = operation(iota) + opCount + opSum +) + +// insertType describes the type of insert statement to generate. +// Please refer to TestBuildPlayerPlan for examples. +type insertType int + +// The following values are the various insert types. +const ( + // insertNormal is for normal selects without a group by, like + // "select a+b as c from t". + insertNormal = insertType(iota) + // insertOnDup is for the more traditional grouped expressions, like + // "select a, b, count(*) as c from t group by a". For statements + // like these, "insert.. on duplicate key" statements will be generated + // causing "b" to be updated to the latest value (last value wins). + insertOnDup + // insertIgnore is for special grouped expressions where all columns are + // in the group by, like "select a, b, c from t group by a, b, c". + // This generates "insert ignore" statements (first value wins). + insertIgnore +) + +// buildReplicatorPlan builds a ReplicatorPlan for the tables that match the filter. +// The filter is matched against the target schema. For every table matched, +// a table-specific rule is built to be sent to the source. We don't send the +// original rule to the source because it may not match the same tables as the +// target. +// colInfoMap specifies the list of primary key columns for each table. +// copyState is a map of tables that have not been fully copied yet. +// If a table is not present in copyState, then it has been fully copied. If so, +// all replication events are applied. The table still has to match a Filter.Rule. +// If it has a non-nil entry, then the value is the last primary key (lastpk) +// that was copied. If so, only replication events < lastpk are applied. +// If the entry is nil, then copying of the table has not started yet. If so, +// no events are applied. +// The TablePlan built is a partial plan. The full plan for a table is built +// when we receive field information from events or rows sent by the source. +// buildExecutionPlan is the function that builds the full plan. 
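+// For example, given copyState{"t1": nil, "t2": <lastpk>} and a table t3
+// absent from copyState: events for t1 are dropped (copy not started),
+// events for t2 are applied only below lastpk, and t3 replicates normally.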
+func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*vreplication.ColumnInfo, copyState map[string]*sqltypes.Result, stats *binlogplayer.Stats) (*ReplicatorPlan, error) { + plan := &ReplicatorPlan{ + VStreamFilter: &binlogdatapb.Filter{FieldEventMode: filter.FieldEventMode}, + TargetTables: make(map[string]*TablePlan), + TablePlans: make(map[string]*TablePlan), + ColInfoMap: colInfoMap, + stats: stats, + } + for tableName := range colInfoMap { + lastpk, ok := copyState[tableName] + if ok && lastpk == nil { + // Don't replicate uncopied tables. + continue + } + rule, err := MatchTable(tableName, filter) + if err != nil { + return nil, err + } + if rule == nil { + continue + } + tablePlan, err := buildTablePlan(tableName, rule, colInfoMap, lastpk, stats) + if err != nil { + return nil, err + } + if tablePlan == nil { + // Table was excluded. + continue + } + if dup, ok := plan.TablePlans[tablePlan.SendRule.Match]; ok { + return nil, fmt.Errorf("more than one target for source table %s: %s and %s", tablePlan.SendRule.Match, dup.TargetName, tableName) + } + plan.VStreamFilter.Rules = append(plan.VStreamFilter.Rules, tablePlan.SendRule) + plan.TargetTables[tableName] = tablePlan + plan.TablePlans[tablePlan.SendRule.Match] = tablePlan + } + return plan, nil +} + +// MatchTable is similar to tableMatches and buildPlan defined in vstreamer/planbuilder.go. +func MatchTable(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Rule, error) { + for _, rule := range filter.Rules { + switch { + case strings.HasPrefix(rule.Match, "/"): + expr := strings.Trim(rule.Match, "/") + result, err := regexp.MatchString(expr, tableName) + if err != nil { + return nil, err + } + if !result { + continue + } + return rule, nil + case tableName == rule.Match: + return rule, nil + } + } + return nil, nil +} + +func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[string][]*vreplication.ColumnInfo, lastpk *sqltypes.Result, stats *binlogplayer.Stats) (*TablePlan, error) { + filter := rule.Filter + query := filter + // generate equivalent select statement if filter is empty or a keyrange. + switch { + case filter == "": + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select * from %v", sqlparser.NewTableIdent(tableName)) + query = buf.String() + case key.IsKeyRange(filter): + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewTableIdent(tableName), sqlparser.NewStrLiteral(filter)) + query = buf.String() + case filter == ExcludeStr: + return nil, nil + } + sel, fromTable, err := analyzeSelectFrom(query) + if err != nil { + return nil, err + } + sendRule := &binlogdatapb.Rule{ + Match: fromTable, + } + + enumValuesMap := map[string](map[string]string){} + for k, v := range rule.ConvertEnumToText { + tokensMap := schema.ParseEnumTokensMap(v) + enumValuesMap[k] = tokensMap + } + + if expr, ok := sel.SelectExprs[0].(*sqlparser.StarExpr); ok { + // If it's a "select *", we return a partial plan, and complete + // it when we get back field info from the stream. 
+ if len(sel.SelectExprs) != 1 { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + } + if !expr.TableName.IsEmpty() { + return nil, fmt.Errorf("unsupported qualifier for '*' expression: %v", sqlparser.String(expr)) + } + sendRule.Filter = query + tablePlan := &TablePlan{ + TargetName: tableName, + SendRule: sendRule, + Lastpk: lastpk, + Stats: stats, + EnumValuesMap: enumValuesMap, + ConvertCharset: rule.ConvertCharset, + } + + return tablePlan, nil + } + + tpb := &tablePlanBuilder{ + name: sqlparser.NewTableIdent(tableName), + sendSelect: &sqlparser.Select{ + From: sel.From, + Where: sel.Where, + }, + selColumns: make(map[string]bool), + lastpk: lastpk, + colInfos: colInfoMap[tableName], + stats: stats, + } + + if err := tpb.analyzeExprs(sel.SelectExprs); err != nil { + return nil, err + } + // It's possible that the target table does not materialize all + // the primary keys of the source table. In such situations, + // we still have to be able to validate the incoming event + // against the current lastpk. For this, we have to request + // the missing columns so we can compare against those values. + // If there is no lastpk to validate against, then we don't + // care. + if tpb.lastpk != nil { + for _, f := range tpb.lastpk.Fields { + tpb.addCol(sqlparser.NewColIdent(f.Name)) + } + } + if err := tpb.analyzeGroupBy(sel.GroupBy); err != nil { + return nil, err + } + if err := tpb.analyzePK(colInfoMap); err != nil { + return nil, err + } + + // if there are no columns being selected the select expression can be empty, so we "select 1" so we have a valid + // select to get a row back + if len(tpb.sendSelect.SelectExprs) == 0 { + tpb.sendSelect.SelectExprs = sqlparser.SelectExprs([]sqlparser.SelectExpr{ + &sqlparser.AliasedExpr{ + Expr: sqlparser.NewIntLiteral("1"), + }, + }) + } + sendRule.Filter = sqlparser.String(tpb.sendSelect) + + tablePlan := tpb.generate() + tablePlan.SendRule = sendRule + tablePlan.EnumValuesMap = enumValuesMap + tablePlan.ConvertCharset = rule.ConvertCharset + return tablePlan, nil +} + +func (tpb *tablePlanBuilder) generate() *TablePlan { + refmap := make(map[string]bool) + for _, cexpr := range tpb.pkCols { + for k := range cexpr.references { + refmap[k] = true + } + } + if tpb.lastpk != nil { + for _, f := range tpb.lastpk.Fields { + refmap[f.Name] = true + } + } + pkrefs := make([]string, 0, len(refmap)) + for k := range refmap { + pkrefs = append(pkrefs, k) + } + sort.Strings(pkrefs) + + bvf := &bindvarFormatter{} + + fieldsToSkip := make(map[string]bool) + for _, colInfo := range tpb.colInfos { + if colInfo.IsGenerated { + fieldsToSkip[colInfo.Name] = true + } + } + + return &TablePlan{ + TargetName: tpb.name.String(), + Lastpk: tpb.lastpk, + BulkInsertFront: tpb.generateInsertPart(sqlparser.NewTrackedBuffer(bvf.formatter)), + BulkInsertValues: tpb.generateValuesPart(sqlparser.NewTrackedBuffer(bvf.formatter), bvf), + BulkInsertOnDup: tpb.generateOnDupPart(sqlparser.NewTrackedBuffer(bvf.formatter)), + Insert: tpb.generateInsertStatement(), + Update: tpb.generateUpdateStatement(), + Delete: tpb.generateDeleteStatement(), + PKReferences: pkrefs, + Stats: tpb.stats, + FieldsToSkip: fieldsToSkip, + } +} + +func analyzeSelectFrom(query string) (sel *sqlparser.Select, from string, err error) { + statement, err := sqlparser.Parse(query) + if err != nil { + return nil, "", err + } + sel, ok := statement.(*sqlparser.Select) + if !ok { + return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(statement)) + } + if sel.Distinct { + return nil, "", 
fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + } + if len(sel.From) > 1 { + return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + } + node, ok := sel.From[0].(*sqlparser.AliasedTableExpr) + if !ok { + return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + } + fromTable := sqlparser.GetTableName(node.Expr) + if fromTable.IsEmpty() { + return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + } + return sel, fromTable.String(), nil +} + +func (tpb *tablePlanBuilder) analyzeExprs(selExprs sqlparser.SelectExprs) error { + for _, selExpr := range selExprs { + cexpr, err := tpb.analyzeExpr(selExpr) + if err != nil { + return err + } + tpb.colExprs = append(tpb.colExprs, cexpr) + } + return nil +} + +func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr, error) { + aliased, ok := selExpr.(*sqlparser.AliasedExpr) + if !ok { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(selExpr)) + } + as := aliased.As + if as.IsEmpty() { + // Require all non-trivial expressions to have an alias. + if colAs, ok := aliased.Expr.(*sqlparser.ColName); ok && colAs.Qualifier.IsEmpty() { + as = colAs.Name + } else { + return nil, fmt.Errorf("expression needs an alias: %v", sqlparser.String(aliased)) + } + } + cexpr := &colExpr{ + colName: as, + references: make(map[string]bool), + } + if expr, ok := aliased.Expr.(*sqlparser.ConvertUsingExpr); ok { + selExpr := &sqlparser.ConvertUsingExpr{ + Type: "utf8mb4", + Expr: &sqlparser.ColName{Name: as}, + } + cexpr.expr = expr + cexpr.operation = opExpr + tpb.sendSelect.SelectExprs = append(tpb.sendSelect.SelectExprs, &sqlparser.AliasedExpr{Expr: selExpr, As: as}) + cexpr.references[as.Lowered()] = true + return cexpr, nil + } + if expr, ok := aliased.Expr.(*sqlparser.FuncExpr); ok { + if expr.Distinct { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + switch fname := expr.Name.Lowered(); fname { + case "count": + if _, ok := expr.Exprs[0].(*sqlparser.StarExpr); !ok { + return nil, fmt.Errorf("only count(*) is supported: %v", sqlparser.String(expr)) + } + cexpr.operation = opCount + return cexpr, nil + case "sum": + if len(expr.Exprs) != 1 { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + aInner, ok := expr.Exprs[0].(*sqlparser.AliasedExpr) + if !ok { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + innerCol, ok := aInner.Expr.(*sqlparser.ColName) + if !ok { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + if !innerCol.Qualifier.IsEmpty() { + return nil, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(innerCol)) + } + cexpr.operation = opSum + cexpr.expr = innerCol + tpb.addCol(innerCol.Name) + cexpr.references[innerCol.Name.Lowered()] = true + return cexpr, nil + case "keyspace_id": + if len(expr.Exprs) != 0 { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + tpb.sendSelect.SelectExprs = append(tpb.sendSelect.SelectExprs, &sqlparser.AliasedExpr{Expr: aliased.Expr}) + // The vstreamer responds with "keyspace_id" as the field name for this request. 
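+			// e.g. a rule filter like "select id, keyspace_id() as ksid from t"
+			// maps the streamed "keyspace_id" field onto the target column ksid.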
+			cexpr.expr = &sqlparser.ColName{Name: sqlparser.NewColIdent("keyspace_id")}
+			return cexpr, nil
+		}
+	}
+	err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
+		switch node := node.(type) {
+		case *sqlparser.ColName:
+			if !node.Qualifier.IsEmpty() {
+				return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node))
+			}
+			tpb.addCol(node.Name)
+			cexpr.references[node.Name.Lowered()] = true
+		case *sqlparser.Subquery:
+			return false, fmt.Errorf("unsupported subquery: %v", sqlparser.String(node))
+		case *sqlparser.FuncExpr:
+			// Other aggregates are not supported.
+			if node.IsAggregate() {
+				return false, fmt.Errorf("unexpected: %v", sqlparser.String(node))
+			}
+		}
+		return true, nil
+	}, aliased.Expr)
+	if err != nil {
+		return nil, err
+	}
+	cexpr.expr = aliased.Expr
+	return cexpr, nil
+}
+
+// addCol adds the specified column to the send query
+// if it's not already present.
+func (tpb *tablePlanBuilder) addCol(ident sqlparser.ColIdent) {
+	if tpb.selColumns[ident.Lowered()] {
+		return
+	}
+	tpb.selColumns[ident.Lowered()] = true
+	tpb.sendSelect.SelectExprs = append(tpb.sendSelect.SelectExprs, &sqlparser.AliasedExpr{
+		Expr: &sqlparser.ColName{Name: ident},
+	})
+}
+
+func (tpb *tablePlanBuilder) analyzeGroupBy(groupBy sqlparser.GroupBy) error {
+	if groupBy == nil {
+		// If there's no grouping, then it's an insertNormal.
+		return nil
+	}
+	for _, expr := range groupBy {
+		colname, ok := expr.(*sqlparser.ColName)
+		if !ok {
+			return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
+		}
+		cexpr := tpb.findCol(colname.Name)
+		if cexpr == nil {
+			return fmt.Errorf("group by expression does not reference an alias in the select list: %v", sqlparser.String(expr))
+		}
+		if cexpr.operation != opExpr {
+			return fmt.Errorf("group by expression is not allowed to reference an aggregate expression: %v", sqlparser.String(expr))
+		}
+		cexpr.isGrouped = true
+	}
+	// If all colExprs are grouped, then it's an insertIgnore.
+	tpb.onInsert = insertIgnore
+	for _, cExpr := range tpb.colExprs {
+		if !cExpr.isGrouped {
+			// If some colExprs are not grouped, then it's an insertOnDup.
+			tpb.onInsert = insertOnDup
+			break
+		}
+	}
+	return nil
+}
+
+// analyzePK builds tpb.pkCols.
+func (tpb *tablePlanBuilder) analyzePK(colInfoMap map[string][]*vreplication.ColumnInfo) error {
+	cols, ok := colInfoMap[tpb.name.String()]
+	if !ok {
+		return fmt.Errorf("table %s not found in schema", tpb.name)
+	}
+	for _, col := range cols {
+		if !col.IsPK {
+			continue
+		}
+		if col.IsGenerated {
+			// It's possible that a GENERATED column is part of the PRIMARY KEY. That's valid.
+			// But then, we also know that we don't actually SELECT a GENERATED column, we just skip
+			// it silently and let it re-materialize by MySQL itself on the target.
+			continue
+		}
+		cexpr := tpb.findCol(sqlparser.NewColIdent(col.Name))
+		if cexpr == nil {
+			// TODO(shlomi): at some point in the future we want to make this check stricter.
+			// We could be reading a generated column c1 which in turn selects some other column c2.
+			// We will want to ensure that `c2` is found in the select list...
+ return fmt.Errorf("primary key column %v not found in select list", col) + } + if cexpr.operation != opExpr { + return fmt.Errorf("primary key column %v is not allowed to reference an aggregate expression", col) + } + cexpr.isPK = true + cexpr.dataType = col.DataType + cexpr.columnType = col.ColumnType + tpb.pkCols = append(tpb.pkCols, cexpr) + } + return nil +} + +func (tpb *tablePlanBuilder) findCol(name sqlparser.ColIdent) *colExpr { + for _, cexpr := range tpb.colExprs { + if cexpr.colName.Equal(name) { + return cexpr + } + } + return nil +} + +func (tpb *tablePlanBuilder) generateInsertStatement() *sqlparser.ParsedQuery { + bvf := &bindvarFormatter{} + buf := sqlparser.NewTrackedBuffer(bvf.formatter) + + tpb.generateInsertPart(buf) + if tpb.lastpk == nil { + // If there's no lastpk, generate straight values. + buf.Myprintf(" values ", tpb.name) + tpb.generateValuesPart(buf, bvf) + } else { + // If there is a lastpk, generate values as a select from dual + // where the pks < lastpk + tpb.generateSelectPart(buf, bvf) + } + tpb.generateOnDupPart(buf) + + return buf.ParsedQuery() +} + +func (tpb *tablePlanBuilder) generateInsertPart(buf *sqlparser.TrackedBuffer) *sqlparser.ParsedQuery { + if tpb.onInsert == insertIgnore { + buf.Myprintf("insert ignore into %v(", tpb.name) + } else { + buf.Myprintf("insert into %v(", tpb.name) + } + separator := "" + for _, cexpr := range tpb.colExprs { + if tpb.isColumnGenerated(cexpr.colName) { + continue + } + buf.Myprintf("%s%v", separator, cexpr.colName) + separator = "," + } + buf.Myprintf(")", tpb.name) + return buf.ParsedQuery() +} + +func (tpb *tablePlanBuilder) generateValuesPart(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) *sqlparser.ParsedQuery { + bvf.mode = bvAfter + separator := "(" + for _, cexpr := range tpb.colExprs { + if tpb.isColumnGenerated(cexpr.colName) { + continue + } + buf.Myprintf("%s", separator) + separator = "," + switch cexpr.operation { + case opExpr: + if cexpr.colType == querypb.Type_JSON { + buf.Myprintf("convert(%v using utf8mb4)", cexpr.expr) + } else { + buf.Myprintf("%v", cexpr.expr) + } + case opCount: + buf.WriteString("1") + case opSum: + // NULL values must be treated as 0 for SUM. + buf.Myprintf("ifnull(%v, 0)", cexpr.expr) + } + } + buf.Myprintf(")") + return buf.ParsedQuery() +} + +func (tpb *tablePlanBuilder) generateSelectPart(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) *sqlparser.ParsedQuery { + bvf.mode = bvAfter + buf.WriteString(" select ") + separator := "" + for _, cexpr := range tpb.colExprs { + if tpb.isColumnGenerated(cexpr.colName) { + continue + } + buf.Myprintf("%s", separator) + separator = ", " + switch cexpr.operation { + case opExpr: + buf.Myprintf("%v", cexpr.expr) + case opCount: + buf.WriteString("1") + case opSum: + buf.Myprintf("ifnull(%v, 0)", cexpr.expr) + } + } + buf.WriteString(" from dual where ") + tpb.generatePKConstraint(buf, bvf) + return buf.ParsedQuery() +} + +func (tpb *tablePlanBuilder) generateOnDupPart(buf *sqlparser.TrackedBuffer) *sqlparser.ParsedQuery { + if tpb.onInsert != insertOnDup { + return nil + } + buf.Myprintf(" on duplicate key update ") + separator := "" + for _, cexpr := range tpb.colExprs { + // We don't know of a use case where the group by columns + // don't match the pk of a table. But we'll allow this, + // and won't update the pk column with the new value if + // this does happen. This can be revisited if there's + // a legitimate use case in the future that demands + // a different behavior. 
+ // for updates and deletes also.
+ if cexpr.isGrouped || cexpr.isPK {
+ continue
+ }
+ if tpb.isColumnGenerated(cexpr.colName) {
+ continue
+ }
+ buf.Myprintf("%s%v=", separator, cexpr.colName)
+ separator = ", "
+ switch cexpr.operation {
+ case opExpr:
+ buf.Myprintf("values(%v)", cexpr.colName)
+ case opCount:
+ buf.Myprintf("%v+1", cexpr.colName)
+ case opSum:
+ buf.Myprintf("%v", cexpr.colName)
+ buf.Myprintf("+ifnull(values(%v), 0)", cexpr.colName)
+ }
+ }
+ return buf.ParsedQuery()
+}
+
+func (tpb *tablePlanBuilder) generateUpdateStatement() *sqlparser.ParsedQuery {
+ if tpb.onInsert == insertIgnore {
+ return tpb.generateInsertStatement()
+ }
+ bvf := &bindvarFormatter{}
+ buf := sqlparser.NewTrackedBuffer(bvf.formatter)
+ buf.Myprintf("update %v set ", tpb.name)
+ separator := ""
+ for _, cexpr := range tpb.colExprs {
+ if cexpr.isGrouped || cexpr.isPK {
+ continue
+ }
+ if tpb.isColumnGenerated(cexpr.colName) {
+ continue
+ }
+ buf.Myprintf("%s%v=", separator, cexpr.colName)
+ separator = ", "
+ switch cexpr.operation {
+ case opExpr:
+ bvf.mode = bvAfter
+ if cexpr.colType == querypb.Type_JSON {
+ buf.Myprintf("convert(%v using utf8mb4)", cexpr.expr)
+ } else {
+ buf.Myprintf("%v", cexpr.expr)
+ }
+ case opCount:
+ buf.Myprintf("%v", cexpr.colName)
+ case opSum:
+ buf.Myprintf("%v", cexpr.colName)
+ bvf.mode = bvBefore
+ buf.Myprintf("-ifnull(%v, 0)", cexpr.expr)
+ bvf.mode = bvAfter
+ buf.Myprintf("+ifnull(%v, 0)", cexpr.expr)
+ }
+ }
+ tpb.generateWhere(buf, bvf)
+ return buf.ParsedQuery()
+}
+
+func (tpb *tablePlanBuilder) generateDeleteStatement() *sqlparser.ParsedQuery {
+ bvf := &bindvarFormatter{}
+ buf := sqlparser.NewTrackedBuffer(bvf.formatter)
+ switch tpb.onInsert {
+ case insertNormal:
+ buf.Myprintf("delete from %v", tpb.name)
+ tpb.generateWhere(buf, bvf)
+ case insertOnDup:
+ bvf.mode = bvBefore
+ buf.Myprintf("update %v set ", tpb.name)
+ separator := ""
+ for _, cexpr := range tpb.colExprs {
+ if cexpr.isGrouped || cexpr.isPK {
+ continue
+ }
+ // Skip generated columns here too, consistent with the insert and
+ // update paths: they cannot be SET.
+ if tpb.isColumnGenerated(cexpr.colName) {
+ continue
+ }
+ buf.Myprintf("%s%v=", separator, cexpr.colName)
+ separator = ", "
+ switch cexpr.operation {
+ case opExpr:
+ buf.WriteString("null")
+ case opCount:
+ buf.Myprintf("%v-1", cexpr.colName)
+ case opSum:
+ buf.Myprintf("%v-ifnull(%v, 0)", cexpr.colName, cexpr.expr)
+ }
+ }
+ tpb.generateWhere(buf, bvf)
+ case insertIgnore:
+ return nil
+ }
+ return buf.ParsedQuery()
+}
+
+func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) {
+ buf.WriteString(" where ")
+ bvf.mode = bvBefore
+ separator := ""
+ for _, cexpr := range tpb.pkCols {
+ if _, ok := cexpr.expr.(*sqlparser.ColName); ok {
+ buf.Myprintf("%s%v=", separator, cexpr.colName)
+ buf.Myprintf("%v", cexpr.expr)
+ } else {
+ // Parenthesize non-trivial expressions.
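+ // e.g. this renders " where c1=(:b_a + :b_b)" rather than the ambiguous
+ // " where c1=:b_a + :b_b" (the formatter is in bvBefore mode here, so
+ // referenced columns bind to the before image).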
+ buf.Myprintf("%s%v=(", separator, cexpr.colName) + buf.Myprintf("%v", cexpr.expr) + buf.Myprintf(")") + } + separator = " and " + } + if tpb.lastpk != nil { + buf.WriteString(" and ") + tpb.generatePKConstraint(buf, bvf) + } +} + +func (tpb *tablePlanBuilder) getCharsetAndCollation(pkname string) (charSet string, collation string) { + for _, colInfo := range tpb.colInfos { + if colInfo.IsPK && strings.EqualFold(colInfo.Name, pkname) { + if colInfo.CharSet != "" { + charSet = fmt.Sprintf(" _%s ", colInfo.CharSet) + } + if colInfo.Collation != "" { + collation = fmt.Sprintf(" COLLATE %s ", colInfo.Collation) + } + } + } + return charSet, collation +} + +func (tpb *tablePlanBuilder) generatePKConstraint(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) { + type charSetCollation struct { + charSet string + collation string + } + var charSetCollations []*charSetCollation + separator := "(" + for _, pkname := range tpb.lastpk.Fields { + charSet, collation := tpb.getCharsetAndCollation(pkname.Name) + charSetCollations = append(charSetCollations, &charSetCollation{charSet: charSet, collation: collation}) + buf.Myprintf("%s%s%v%s", separator, charSet, &sqlparser.ColName{Name: sqlparser.NewColIdent(pkname.Name)}, collation) + separator = "," + } + separator = ") <= (" + for i, val := range tpb.lastpk.Rows[0] { + buf.WriteString(separator) + buf.WriteString(charSetCollations[i].charSet) + separator = "," + val.EncodeSQL(buf) + buf.WriteString(charSetCollations[i].collation) + } + buf.WriteString(")") +} + +func (tpb *tablePlanBuilder) isColumnGenerated(col sqlparser.ColIdent) bool { + for _, colInfo := range tpb.colInfos { + if col.EqualString(colInfo.Name) && colInfo.IsGenerated { + return true + } + } + return false +} + +// bindvarFormatter is a dual mode formatter. Its behavior +// can be changed dynamically changed to generate bind vars +// for the 'before' row or 'after' row by setting its mode +// to 'bvBefore' or 'bvAfter'. For example, inserts will always +// use bvAfter, whereas deletes will always use bvBefore. +// For updates, values being set will use bvAfter, whereas +// the where clause will use bvBefore. +type bindvarFormatter struct { + mode bindvarMode +} + +type bindvarMode int + +const ( + bvBefore = bindvarMode(iota) + bvAfter +) + +func (bvf *bindvarFormatter) formatter(buf *sqlparser.TrackedBuffer, node sqlparser.SQLNode) { + if node, ok := node.(*sqlparser.ColName); ok { + switch bvf.mode { + case bvBefore: + buf.WriteArg(":", "b_"+node.Name.String()) + return + case bvAfter: + buf.WriteArg(":", "a_"+node.Name.String()) + return + } + } + node.Format(buf) +} diff --git a/go/cmd/boost/boost/vplayer.go b/go/cmd/boost/boost/vplayer.go new file mode 100644 index 00000000000..d44f9973b2a --- /dev/null +++ b/go/cmd/boost/boost/vplayer.go @@ -0,0 +1,84 @@ +package boost + +import ( + "context" + "fmt" + "strings" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/dbconnpool" + "vitess.io/vitess/go/vt/log" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +// vplayer replays binlog events by pulling them from a vstreamer. 
+type vplayer struct {
+ replicatorPlan *ReplicatorPlan
+ tablePlans map[string]*TablePlan
+
+ currentVgtid *binlogdatapb.VGtid
+ // fields []*querypb.Field
+ dbPool *dbconnpool.ConnectionPool
+}
+
+func (vp *vplayer) applyVEvent(ctx context.Context, event *binlogdatapb.VEvent) error {
+ switch event.Type {
+ case binlogdatapb.VEventType_GTID:
+ vp.currentVgtid = event.Vgtid
+ case binlogdatapb.VEventType_FIELD:
+ b := []string{}
+ for _, f := range event.FieldEvent.Fields {
+ b = append(b, fmt.Sprintf("%q:%q", f.Name, f.Type))
+ }
+ log.Infof("field event: { \"table\" : %q, \"fields\": { %s } }", event.FieldEvent.TableName, strings.Join(b, ","))
+ tplan, err := vp.replicatorPlan.buildExecutionPlan(event.FieldEvent)
+ if err != nil {
+ return err
+ }
+
+ vp.tablePlans[event.FieldEvent.TableName] = tplan
+ log.Infof("new table plan for %q", event.FieldEvent.TableName)
+ case binlogdatapb.VEventType_INSERT, binlogdatapb.VEventType_DELETE, binlogdatapb.VEventType_UPDATE,
+ binlogdatapb.VEventType_REPLACE, binlogdatapb.VEventType_SAVEPOINT:
+ return fmt.Errorf("event type %v not implemented", event.Type)
+ case binlogdatapb.VEventType_ROW:
+ if err := vp.applyRowEvent(ctx, event.RowEvent); err != nil {
+ return err
+ }
+
+ // //Row event is logged AFTER RowChanges are applied so as to calculate the total elapsed time for the Row event
+ // stats.Send(fmt.Sprintf("%v", event.RowEvent))
+ }
+ return nil
+}
+
+// applyRowEvent applies the row changes of a MySQL row event.
+// Example SQL generated:
+// - insert into items(id,user_id,`desc`,`status`) values (3924,5,null,0)
+// - update items set user_id=999, `desc`=null, `status`=0 where id=3924
+// - delete from items where id=3924
+func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.RowEvent) error {
+ tplan := vp.tablePlans[rowEvent.TableName]
+ if tplan == nil {
+ return fmt.Errorf("unexpected event on table %s", rowEvent.TableName)
+ }
+ for _, change := range rowEvent.RowChanges {
+ _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) {
+ // stats := NewVrLogStats("ROWCHANGE")
+ // start := time.Now()
+ fmt.Println(sql)
+ return nil, nil
+ // qr, err := vp.dbClient.ExecuteWithRetry(ctx, sql)
+ // vp.vr.stats.QueryCount.Add(vp.phase, 1)
+ // vp.vr.stats.QueryTimings.Record(vp.phase, start)
+ // stats.Send(sql)
+ // return qr, err
+ })
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
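+
+// Note: the applyChange callback above only prints the generated SQL. A
+// minimal sketch of real execution through the pool declared in vplayer
+// (assuming the go/vt/dbconnpool API and that dbPool was opened elsewhere)
+// might look like:
+//
+//	conn, err := vp.dbPool.Get(ctx)
+//	if err != nil {
+//		return nil, err
+//	}
+//	defer conn.Recycle()
+//	return conn.ExecuteFetch(sql, 10000, true)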