Skip to content

Commit

Permalink
Merge pull request #10 from vinted/origin/vinted/11-0-4-backport-11-b…
Browse files Browse the repository at this point in the history
…oost-stream

add initial boost stream service
  • Loading branch information
DeathBorn authored Mar 27, 2024
2 parents bb85bb2 + 25ba5a4 commit d8007a9
Show file tree
Hide file tree
Showing 7 changed files with 2,071 additions and 0 deletions.
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ endif
go install $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) -ldflags "$(shell tools/build_version_flags.sh)" ./go/...
(cd go/cmd/vttablet && go run github.com/GeertJohan/go.rice/rice append --exec=../../../bin/vttablet)

# build boost only
boost:
ifndef NOBANNER
echo $$(date): Building source tree
endif
bash ./build.env
# build all the binaries by default with CGO disabled
CGO_ENABLED=0 go install $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) -ldflags "$(shell tools/build_version_flags.sh)" ./go/cmd/boost/...

# build the vitess binaries statically
build:
ifndef NOBANNER
Expand All @@ -76,6 +85,7 @@ endif
# build vtorc with CGO, because it depends on sqlite
CGO_ENABLED=1 go install $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) -ldflags "$(shell tools/build_version_flags.sh)" ./go/cmd/vtorc/...


# cross-build can be used to cross-compile Vitess client binaries
# Outside of select client binaries (namely vtctlclient & vtexplain), cross-compiled Vitess Binaries are not recommended for production deployments
# Usage: GOOS=darwin GOARCH=amd64 make cross-build
Expand Down
124 changes: 124 additions & 0 deletions go/cmd/boost/boost.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"flag"

"vitess.io/vitess/go/cmd/boost/boost"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
)

var (
targetPort,
targetGrpcPort int
targetHost,
targetGtid,
targetFilter,
targetTabletType string

boostFlags = []string{
"host",
"port",
"grpcPort",
"gtid",
"filter",
"tabletType",
}
)

func usage() {
fmt.Printf("usage of boost:\n")
for _, name := range boostFlags {
f := flag.Lookup(name)
if f == nil {
panic("unknown flag " + name)
}
flagUsage(f)
}
}

// Cloned from the source to print out the usage for a given flag
func flagUsage(f *flag.Flag) {
s := fmt.Sprintf(" -%s", f.Name) // Two spaces before -; see next two comments.
name, usage := flag.UnquoteUsage(f)
if len(name) > 0 {
s += " " + name
}
// Boolean flags of one ASCII letter are so common we
// treat them specially, putting their usage on the same line.
if len(s) <= 4 { // space, space, '-', 'x'.
s += "\t"
} else {
// Four spaces before the tab triggers good alignment
// for both 4- and 8-space tab stops.
s += "\n \t"
}
s += usage
if name == "string" {
// put quotes on the value
s += fmt.Sprintf(" (default %q)", f.DefValue)
} else {
s += fmt.Sprintf(" (default %v)", f.DefValue)
}
fmt.Printf(s + "\n")
}

func init() {
flag.StringVar(&targetHost, "host", "127.0.0.1", "(defaults to 127.0.0.1)")
flag.IntVar(&targetPort, "port", 15306, "(defaults to 15306)")
flag.IntVar(&targetGrpcPort, "grpcPort", 15991, "(defaults to 15991)")
flag.StringVar(&targetGtid, "gtid", "{}", "(defaults to {})")
flag.StringVar(&targetFilter, "filter", "{}", "(defaults to{})")
flag.StringVar(&targetTabletType, "tabletType", "master", "(defaults to{})")
logger := logutil.NewConsoleLogger()
flag.CommandLine.SetOutput(logutil.NewLoggerWriter(logger))
flag.Usage = usage
}

func main() {
flag.Lookup("logtostderr").Value.Set("true")
flag.Parse()
defer logutil.Flush()

boost, err := boost.NewBoost(
targetPort,
targetGrpcPort,
targetHost,
targetGtid,
targetFilter,
targetTabletType,
)
if err != nil {
os.Exit(1)
}

err = boost.Init()
if err != nil {
boost.Close()
os.Exit(1)
}

// Catch SIGTERM and SIGINT so we get a chance to clean up.
ctx, cancel := context.WithCancel(context.Background())
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigChan
log.Infof("Cancelling due to signal: %v", sig)
cancel()
}()
defer boost.Close()

err = boost.Play(ctx)
if err != nil {
os.Exit(1)
}

log.Info("Stopped")
}
185 changes: 185 additions & 0 deletions go/cmd/boost/boost/boost.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package boost

import (
"context"
"fmt"
"io"

"strings"
"time"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
)

type BoostConfig struct {
Host string
Port int
GrpcPort int
User string
Password string
Gtid *binlogdatapb.VGtid
Filter *binlogdatapb.Filter
TabletType topodatapb.TabletType
Source string
Flags vtgatepb.VStreamFlags
}

type Boost struct {
config *BoostConfig
player *vplayer
}

func NewBoost(port, grpcPort int, host, gtid, filter, tabletType string) (*Boost, error) {
targetVgtid := &binlogdatapb.VGtid{}
targetFilter := &binlogdatapb.Filter{}
if err := json2.Unmarshal([]byte(gtid), &targetVgtid); err != nil {
log.Error(err)
return nil, err
}
if err := json2.Unmarshal([]byte(filter), &targetFilter); err != nil {
log.Error(err)
return nil, err
}

cfg := &BoostConfig{
Host: host,
Port: port,
GrpcPort: grpcPort,
Gtid: targetVgtid,
Filter: targetFilter,
Flags: vtgatepb.VStreamFlags{
//MinimizeSkew: false,
HeartbeatInterval: 60, //seconds
},
}
switch tabletType {
case "replica":
cfg.TabletType = topodatapb.TabletType_REPLICA
case "rdonly":
cfg.TabletType = topodatapb.TabletType_RDONLY
default:
cfg.TabletType = topodatapb.TabletType_MASTER
}

boost := &Boost{
config: cfg,
}
return boost, nil
}

func (b *Boost) Init() error {
// Create and open the connection pool for dba access.
dbPool := dbconnpool.NewConnectionPool("dbPool", 10, time.Minute, 0)
dbPool.Open(
dbconfigs.New(
&mysql.ConnParams{
Host: b.config.Host,
Port: b.config.Port,
// Uname : "main",
// Pass: ,
},
),
)

// for event aplicator
colInfo, err := buildColInfoMap(context.Background(), dbPool)
if err != nil {
log.Error("---- buildColInfoMap ", err)
return err
}

replicatorPlan, err := buildReplicatorPlan(
b.config.Filter,
colInfo,
nil,
binlogplayer.NewStats(),
)
if err != nil {
log.Error("---- buildReplicatorPlan", err)
return err
}

b.player = &vplayer{
currentVgtid: b.config.Gtid,
dbPool: dbPool,
replicatorPlan: replicatorPlan,
tablePlans: make(map[string]*TablePlan),
}
return nil
}

func (b *Boost) Close() {
// close db pool
b.player.dbPool.Close()
}

func (b *Boost) Play(ctx context.Context) error {
log.Info("Playing...")
conn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%v", b.config.Host, b.config.GrpcPort))
if err != nil {
log.Errorf("vtgateconn.Dial error: %s", err)
return err
}
defer conn.Close()

reader, err := conn.VStream(ctx, b.config.TabletType, b.player.currentVgtid, b.config.Filter, &b.config.Flags)
if err != nil {
log.Errorf("conn.VStream error: %s", err)
return err
}
connect := false
numErrors := 0
for {
if connect { // if vtgate returns a transient error try reconnecting from the last seen vgtid
log.Info("Reconnecting... %s %s %s %s", b.config.TabletType, b.player.currentVgtid, b.config.Filter, &b.config.Flags)
reader, err = conn.VStream(ctx, b.config.TabletType, b.player.currentVgtid, b.config.Filter, &b.config.Flags)
if err != nil {
log.Errorf("Reconnecting error: %s", err)
time.Sleep(100 * time.Millisecond)
continue
}
connect = false
}
e, err := reader.Recv()
switch err {
case nil:
for _, ev := range e {
if err := b.player.applyVEvent(ctx, ev); err != nil {
if err != io.EOF {
log.Errorf("Error applying event: %s", err.Error())
}
}
}
case io.EOF:
log.Info("stream ended")
return nil
default:
log.Errorf("%s:: remote error: %v\n", time.Now(), err)
numErrors++
if numErrors > 10 { // if vtgate is continuously unavailable error the test
return fmt.Errorf("too many remote errors")
}
if strings.Contains(strings.ToLower(err.Error()), "unavailable") {
// this should not happen, but maybe the buffering logic might return a transient
// error during resharding. So adding this logic to reduce future flakiness
time.Sleep(100 * time.Millisecond)
connect = true
} else {
// failure, stop test
return fmt.Errorf("failure, stop test")
}
}
}
}
Loading

0 comments on commit d8007a9

Please sign in to comment.