Skip to content

Commit

Permalink
*: add a flag to retry commit when data loading fails on prepare stage (
Browse files Browse the repository at this point in the history
#72)

Signed-off-by: mahjonp <[email protected]>
  • Loading branch information
mahjonp authored Feb 26, 2021
1 parent 54d8a24 commit 08b8347
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 56 deletions.
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
GOOS := $(if $(GOOS),$(GOOS),linux)
GOARCH := $(if $(GOARCH),$(GOARCH),amd64)
GO=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) GO111MODULE=on go
GO=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOARCH=$(GOARCH) GO111MODULE=on go

PACKAGE_LIST := go list ./...| grep -vE "cmd"
PACKAGES := $$($(PACKAGE_LIST))
FILES_TO_FMT := $(shell find . -path -prune -o -name '*.go' -print)

LDFLAGS += -X "github.com/pingcap/go-tpc/pkg/util.ReleaseVersion=$(shell git describe --tags --dirty --always)"
LDFLAGS += -X "github.com/pingcap/go-tpc/pkg/util.BuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
LDFLAGS += -X "github.com/pingcap/go-tpc/pkg/util.BuildHash=$(shell git rev-parse HEAD)"

GOBUILD=$(GO) build -ldflags '$(LDFLAGS)'

# Image URL to use all building/pushing image targets
IMG ?= go-tpc:latest

Expand All @@ -22,7 +27,7 @@ test:
go test ./... -cover $(PACKAGES)

build: mod
go build -o ./bin/go-tpc cmd/go-tpc/*
$(GOBUILD) -o ./bin/go-tpc cmd/go-tpc/*

vet:
go vet ./...
Expand Down
6 changes: 3 additions & 3 deletions ch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ func (w Workloader) Prepare(ctx context.Context, threadID int) error {
return err
}
sqlLoader := map[dbgen.Table]dbgen.Loader{
dbgen.TSupp: tpch.NewSuppLoader(ctx, s.Conn),
dbgen.TNation: tpch.NewNationLoader(ctx, s.Conn),
dbgen.TRegion: tpch.NewRegionLoader(ctx, s.Conn),
dbgen.TSupp: tpch.NewSuppLoader(ctx, w.db),
dbgen.TNation: tpch.NewNationLoader(ctx, w.db),
dbgen.TRegion: tpch.NewRegionLoader(ctx, w.db),
}
dbgen.InitDbGen(1)
if err := dbgen.DbGen(sqlLoader, []dbgen.Table{dbgen.TNation, dbgen.TRegion, dbgen.TSupp}); err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/go-tpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func main() {

cobra.EnablePrefixMatching = true

registerVersionInfo(rootCmd)
registerTpcc(rootCmd)
registerTpch(rootCmd)
registerCHBenchmark(rootCmd)
Expand Down
3 changes: 3 additions & 0 deletions cmd/go-tpc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func executeWorkload(ctx context.Context, w workload.Workloader, threads int, ac
go func(index int) {
defer wg.Done()
if err := execute(ctx, w, action, threads, index); err != nil {
if action == "prepare" {
panic(fmt.Sprintf("a fatal occurred when preparing data: %v", err))
}
fmt.Printf("execute %s failed, err %v\n", action, err)
return
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
_ "net/http/pprof"
"os"
"runtime"
"time"

"github.com/pingcap/go-tpc/pkg/measurement"
"github.com/pingcap/go-tpc/pkg/workload"
Expand Down Expand Up @@ -90,6 +91,8 @@ func registerTpcc(root *cobra.Command) {
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.OutputDir, "output-dir", "", "Output directory for generating file if specified")
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.SpecifiedTables, "tables", "", "Specified tables for "+
"generating file, separated by ','. Valid only if output is set. If this flag is not set, generate all tables by default")
cmdPrepare.PersistentFlags().IntVar(&tpccConfig.PrepareReCommitCount, "retry-count", 50, "Retry count when errors occur")
cmdPrepare.PersistentFlags().DurationVar(&tpccConfig.PrepareReCommitDuration, "retry-duration", 10*time.Second, "The duration for each retry")

var cmdRun = &cobra.Command{
Use: "run",
Expand Down
25 changes: 25 additions & 0 deletions cmd/go-tpc/versioninfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package main

import (
"fmt"

"github.com/spf13/cobra"

"github.com/pingcap/go-tpc/pkg/util"
)

func printVersion() {
fmt.Println("Git Commit Hash:", util.BuildHash)
fmt.Println("UTC Build Time:", util.BuildTS)
fmt.Println("Release version:", util.ReleaseVersion)
}

func registerVersionInfo(root *cobra.Command) {
cmd := &cobra.Command{
Use: "version",
Run: func(cmd *cobra.Command, args []string) {
printVersion()
},
}
root.AddCommand(cmd)
}
35 changes: 29 additions & 6 deletions pkg/load/batch_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"context"
"database/sql"
"encoding/csv"
"fmt"
"os"
"strings"
"time"
)

const (
Expand All @@ -20,17 +23,23 @@ type BatchLoader interface {
// SQLBatchLoader helps us insert in batch
type SQLBatchLoader struct {
insertHint string
conn *sql.Conn
db *sql.DB
buf bytes.Buffer
count int

// loader retry
retryCount int
retryDuration time.Duration
}

// NewSQLBatchLoader creates a batch loader for database connection
func NewSQLBatchLoader(conn *sql.Conn, hint string) *SQLBatchLoader {
func NewSQLBatchLoader(db *sql.DB, hint string, retryCount int, retryDuration time.Duration) *SQLBatchLoader {
return &SQLBatchLoader{
count: 0,
insertHint: hint,
conn: conn,
count: 0,
insertHint: hint,
db: db,
retryCount: retryCount,
retryDuration: retryDuration,
}
}

Expand Down Expand Up @@ -59,7 +68,21 @@ func (b *SQLBatchLoader) Flush(ctx context.Context) error {
return nil
}

_, err := b.conn.ExecContext(ctx, b.buf.String())
var err error
for i := 0; i < 1+b.retryCount; i++ {
_, err = b.db.ExecContext(ctx, b.buf.String())
if err == nil || (strings.Contains(err.Error(), "Error 1062: Duplicate entry") && i == 0) {
break
}
if i < b.retryCount {
fmt.Printf("exec statement error: %v, may try again later...\n", err)
time.Sleep(b.retryDuration)
}
}
if err != nil {
return fmt.Errorf("exec statement error: %v", err)
}

b.count = 0
b.buf.Reset()

Expand Down
8 changes: 8 additions & 0 deletions pkg/util/versioninfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package util

// Version information
var (
ReleaseVersion string
BuildTS string
BuildHash string
)
18 changes: 9 additions & 9 deletions tpcc/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (w *Workloader) loadItem(ctx context.Context) error {
s := getTPCCState(ctx)
hint := "INSERT INTO item (i_id, i_im_id, i_name, i_price, i_data) VALUES "

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < maxItems; i++ {
s.Buf.Reset()
Expand All @@ -50,7 +50,7 @@ func (w *Workloader) loadWarehouse(ctx context.Context, warehouse int) error {
s := getTPCCState(ctx)
hint := "INSERT INTO warehouse (w_id, w_name, w_street_1, w_street_2, w_city, w_state, w_zip, w_tax, w_ytd) VALUES "

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

wName := randChars(s.R, s.Buf, 6, 10)
wStree1 := randChars(s.R, s.Buf, 10, 20)
Expand Down Expand Up @@ -80,7 +80,7 @@ func (w *Workloader) loadStock(ctx context.Context, warehouse int) error {
s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06,
s_dist_07, s_dist_08, s_dist_09, s_dist_10, s_ytd, s_order_cnt, s_remote_cnt, s_data) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < stockPerWarehouse; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -122,7 +122,7 @@ func (w *Workloader) loadDistrict(ctx context.Context, warehouse int) error {
hint := `INSERT INTO district (d_id, d_w_id, d_name, d_street_1, d_street_2,
d_city, d_state, d_zip, d_tax, d_ytd, d_next_o_id) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < districtPerWarehouse; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -158,7 +158,7 @@ func (w *Workloader) loadCustomer(ctx context.Context, warehouse int, district i
c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_since, c_credit, c_credit_lim,
c_discount, c_balance, c_ytd_payment, c_payment_cnt, c_delivery_cnt, c_data) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < customerPerDistrict; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -212,7 +212,7 @@ func (w *Workloader) loadHistory(ctx context.Context, warehouse int, district in
s := getTPCCState(ctx)

hint := `INSERT INTO history (h_c_id, h_c_d_id, h_c_w_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES `
l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

// 1 customer has 1 row
for i := 0; i < customerPerDistrict; i++ {
Expand Down Expand Up @@ -245,7 +245,7 @@ func (w *Workloader) loadOrder(ctx context.Context, warehouse int, district int)
hint := `INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d,
o_carrier_id, o_ol_cnt, o_all_local) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

cids := rand.Perm(orderPerDistrict)
s.R.Shuffle(len(cids), func(i, j int) {
Expand Down Expand Up @@ -285,7 +285,7 @@ func (w *Workloader) loadNewOrder(ctx context.Context, warehouse int, district i

hint := `INSERT INTO new_order (no_o_id, no_d_id, no_w_id) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < newOrderPerDistrict; i++ {
s.Buf.Reset()
Expand All @@ -312,7 +312,7 @@ func (w *Workloader) loadOrderLine(ctx context.Context, warehouse int, district
hint := `INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number,
ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < orderPerDistrict; i++ {
for j := 0; j < olCnts[i]; j++ {
Expand Down
2 changes: 1 addition & 1 deletion tpcc/new_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (w *Workloader) runNewOrder(ctx context.Context, thread int) error {

// Process 1
if err := s.newOrderStmts[newOrderSelectCustomer].QueryRowContext(ctx, d.wID, d.dID, d.cID).Scan(&d.cDiscount, &d.cLast, &d.cCredit, &d.wTax); err != nil {
return fmt.Errorf("exec %s failed %v", newOrderSelectCustomer, err)
return fmt.Errorf("exec %s(wID=%d,dID=%d,cID=%d) failed %v", newOrderSelectCustomer, d.wID, d.dID, d.cID, err)
}

// Process 2
Expand Down
4 changes: 4 additions & 0 deletions tpcc/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type Config struct {
OutputType string
OutputDir string
SpecifiedTables string

// connection, retry count when commiting statement fails, default 0
PrepareReCommitCount int
PrepareReCommitDuration time.Duration
}

// Workloader is TPCC workload
Expand Down
49 changes: 25 additions & 24 deletions tpch/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"

"github.com/pingcap/go-tpc/pkg/load"
"github.com/pingcap/go-tpc/tpch/dbgen"
)
Expand Down Expand Up @@ -173,43 +174,43 @@ func (r *regionLoader) Load(item interface{}) error {
return r.InsertValue(v)
}

func NewOrderLoader(ctx context.Context, conn *sql.Conn) *orderLoader {
return &orderLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO orders (O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT) VALUES `),
func NewOrderLoader(ctx context.Context, db *sql.DB) *orderLoader {
return &orderLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO orders (O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewLineItemLoader(ctx context.Context, conn *sql.Conn) *lineItemloader {
return &lineItemloader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO lineitem (L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT) VALUES `),
func NewLineItemLoader(ctx context.Context, db *sql.DB) *lineItemloader {
return &lineItemloader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO lineitem (L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewCustLoader(ctx context.Context, conn *sql.Conn) *custLoader {
return &custLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO customer (C_CUSTKEY, C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT) VALUES `),
func NewCustLoader(ctx context.Context, db *sql.DB) *custLoader {
return &custLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO customer (C_CUSTKEY, C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewPartLoader(ctx context.Context, conn *sql.Conn) *partLoader {
return &partLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO part (P_PARTKEY, P_NAME, P_MFGR, P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT) VALUES `),
func NewPartLoader(ctx context.Context, db *sql.DB) *partLoader {
return &partLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO part (P_PARTKEY, P_NAME, P_MFGR, P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewPartSuppLoader(ctx context.Context, conn *sql.Conn) *partSuppLoader {
return &partSuppLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO partsupp (PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT) VALUES `),
func NewPartSuppLoader(ctx context.Context, db *sql.DB) *partSuppLoader {
return &partSuppLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO partsupp (PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewSuppLoader(ctx context.Context, conn *sql.Conn) *suppLoader {
return &suppLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO supplier (S_SUPPKEY, S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT) VALUES `),
func NewSuppLoader(ctx context.Context, db *sql.DB) *suppLoader {
return &suppLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO supplier (S_SUPPKEY, S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewNationLoader(ctx context.Context, conn *sql.Conn) *nationLoader {
return &nationLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO nation (N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT) VALUES `),
func NewNationLoader(ctx context.Context, db *sql.DB) *nationLoader {
return &nationLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO nation (N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewRegionLoader(ctx context.Context, conn *sql.Conn) *regionLoader {
return &regionLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO region (R_REGIONKEY, R_NAME, R_COMMENT) VALUES `),
func NewRegionLoader(ctx context.Context, db *sql.DB) *regionLoader {
return &regionLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO region (R_REGIONKEY, R_NAME, R_COMMENT) VALUES `, 0, 0),
ctx}}
}
18 changes: 8 additions & 10 deletions tpch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,18 @@ func (w Workloader) Prepare(ctx context.Context, threadID int) error {
if threadID != 0 {
return nil
}
s := w.getState(ctx)

if err := w.createTables(ctx); err != nil {
return err
}
sqlLoader := map[dbgen.Table]dbgen.Loader{
dbgen.TOrder: NewOrderLoader(ctx, s.Conn),
dbgen.TLine: NewLineItemLoader(ctx, s.Conn),
dbgen.TPart: NewPartLoader(ctx, s.Conn),
dbgen.TPsupp: NewPartSuppLoader(ctx, s.Conn),
dbgen.TSupp: NewSuppLoader(ctx, s.Conn),
dbgen.TCust: NewCustLoader(ctx, s.Conn),
dbgen.TNation: NewNationLoader(ctx, s.Conn),
dbgen.TRegion: NewRegionLoader(ctx, s.Conn),
dbgen.TOrder: NewOrderLoader(ctx, w.db),
dbgen.TLine: NewLineItemLoader(ctx, w.db),
dbgen.TPart: NewPartLoader(ctx, w.db),
dbgen.TPsupp: NewPartSuppLoader(ctx, w.db),
dbgen.TSupp: NewSuppLoader(ctx, w.db),
dbgen.TCust: NewCustLoader(ctx, w.db),
dbgen.TNation: NewNationLoader(ctx, w.db),
dbgen.TRegion: NewRegionLoader(ctx, w.db),
}
dbgen.InitDbGen(int64(w.cfg.ScaleFactor))
if err := dbgen.DbGen(sqlLoader, []dbgen.Table{dbgen.TNation, dbgen.TRegion, dbgen.TCust, dbgen.TSupp, dbgen.TPartPsupp, dbgen.TOrderLine}); err != nil {
Expand Down

0 comments on commit 08b8347

Please sign in to comment.