Skip to content

Commit

Permalink
*: add assertion in kvproto to check data consistency (pingcap#9180)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Mar 8, 2019
1 parent 85c4553 commit 3910a72
Show file tree
Hide file tree
Showing 16 changed files with 124 additions and 47 deletions.
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (e *CleanupIndexExec) deleteDanglingIdx(txn kv.Transaction, values map[stri
return errors.Trace(err)
}
for _, idxVals := range e.idxValues[handle] {
if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn, idxVals, handle); err != nil {
if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn, idxVals, handle, nil); err != nil {
return errors.Trace(err)
}
e.removeCnt++
Expand Down
22 changes: 11 additions & 11 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *testSuite2) TestAdminRecoverIndex(c *C) {
sc := s.ctx.GetSessionVars().StmtCtx
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(1), 1)
err = indexOpr.Delete(sc, txn, types.MakeDatums(1), 1, nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -119,7 +119,7 @@ func (s *testSuite2) TestAdminRecoverIndex(c *C) {

txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(10), 10)
err = indexOpr.Delete(sc, txn, types.MakeDatums(10), 10, nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -133,15 +133,15 @@ func (s *testSuite2) TestAdminRecoverIndex(c *C) {

txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(1), 1)
err = indexOpr.Delete(sc, txn, types.MakeDatums(1), 1, nil)
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(2), 2)
err = indexOpr.Delete(sc, txn, types.MakeDatums(2), 2, nil)
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(3), 3)
err = indexOpr.Delete(sc, txn, types.MakeDatums(3), 3, nil)
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(10), 10)
err = indexOpr.Delete(sc, txn, types.MakeDatums(10), 10, nil)
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(20), 20)
err = indexOpr.Delete(sc, txn, types.MakeDatums(20), 20, nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -193,13 +193,13 @@ func (s *testSuite2) TestAdminRecoverIndex1(c *C) {

txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums("1"), 1)
err = indexOpr.Delete(sc, txn, types.MakeDatums("1"), 1, nil)
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums("2"), 2)
err = indexOpr.Delete(sc, txn, types.MakeDatums("2"), 2, nil)
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums("3"), 3)
err = indexOpr.Delete(sc, txn, types.MakeDatums("3"), 3, nil)
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums("10"), 4)
err = indexOpr.Delete(sc, txn, types.MakeDatums("10"), 4, nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2946,9 +2946,9 @@ func (s *testSuite) TestCheckIndex(c *C) {
// table data (handle, data): (1, 10), (2, 20), (4, 40)
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = idx.Delete(sc, txn, types.MakeDatums(int64(30)), 3)
err = idx.Delete(sc, txn, types.MakeDatums(int64(30)), 3, nil)
c.Assert(err, IsNil)
err = idx.Delete(sc, txn, types.MakeDatums(int64(20)), 2)
err = idx.Delete(sc, txn, types.MakeDatums(int64(20)), 2, nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down Expand Up @@ -191,33 +190,27 @@ golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e h1:bRhVy7zSSasaqNksaRZiA5EEI+Ei4I1nO5Jh72wfHlg=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb h1:1w588/yEchbPNpa9sEvOcMZYbWHedwJjg4VOAdDHWHk=
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52 h1:JG/0uqcGdTNgq7FdU+61l5Pdmb8putNZlXb65bJBROs=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0 h1:iRpjPej1fPzmfoBhMFkp3HdqzF+ytPmAwiQhJGV0zGw=
golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 h1:9oFlwfEGIvmxXTcY53ygNyxIQtWciRHjrnUvZJCYXYU=
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.16.0 h1:dz5IJGuC2BB7qXR5AyHNwAUBhZscK2xVez7mznh72sY=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
Expand All @@ -231,7 +224,6 @@ gopkg.in/stretchr/testify.v1 v1.2.2 h1:yhQC6Uy5CqibAIlk1wlusa/MJ3iAN49/BsR/dCCKz
gopkg.in/stretchr/testify.v1 v1.2.2/go.mod h1:QI5V/q6UbPmuhtm10CaFZxED9NreB8PnFYN9JcR6TxU=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ type Transaction interface {
GetMemBuffer() MemBuffer
// SetVars sets variables to the transaction.
SetVars(vars *Variables)
// SetAssertion sets an assertion for an operation on the key.
SetAssertion(key Key, assertion AssertionType)
// BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage.
BatchGet(keys []Key) (map[string][]byte, error)
}
Expand Down
2 changes: 2 additions & 0 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func (t *mockTxn) SetVars(vars *Variables) {

}

func (t *mockTxn) SetAssertion(key Key, assertion AssertionType) {}

// NewMockTxn new a mockTxn.
func NewMockTxn() Transaction {
return &mockTxn{
Expand Down
10 changes: 10 additions & 0 deletions kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ type UnionStore interface {
GetMemBuffer() MemBuffer
}

// AssertionType is the type of a assertion.
type AssertionType int

// The AssertionType constants.
const (
None AssertionType = iota
Exist
NotExist
)

// Option is used for customizing kv store's behaviors during a transaction.
type Option int

Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@ func (e ErrAbort) Error() string {
type ErrAlreadyCommitted uint64

func (e ErrAlreadyCommitted) Error() string {
return fmt.Sprint("txn already committed")
return "txn already committed"
}
7 changes: 7 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,13 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
if err != nil {
return errors.Trace(err)
}

// Check assertions.
if (ok && mutation.Assertion == kvrpcpb.Assertion_NotExist) ||
(!ok && mutation.Assertion == kvrpcpb.Assertion_Exist) {
log.Error("ASSERTION FAIL!!!", mutation)
}

batch.Put(writeKey, writeValue)
return nil
}
Expand Down
60 changes: 47 additions & 13 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type twoPhaseCommitter struct {
txn *tikvTxn
startTS uint64
keys [][]byte
mutations map[string]*pb.Mutation
mutations map[string]*mutationEx
lockTTL uint64
commitTS uint64
mu struct {
Expand All @@ -86,6 +86,11 @@ type twoPhaseCommitter struct {
detail *execdetails.CommitDetails
}

type mutationEx struct {
pb.Mutation
asserted bool
}

// newTwoPhaseCommitter creates a twoPhaseCommitter.
func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, error) {
var (
Expand All @@ -95,23 +100,27 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
delCnt int
lockCnt int
)
mutations := make(map[string]*pb.Mutation)
mutations := make(map[string]*mutationEx)
err := txn.us.WalkBuffer(func(k kv.Key, v []byte) error {
if len(v) > 0 {
op := pb.Op_Put
if c := txn.us.LookupConditionPair(k); c != nil && c.ShouldNotExist() {
op = pb.Op_Insert
}
mutations[string(k)] = &pb.Mutation{
Op: op,
Key: k,
Value: v,
mutations[string(k)] = &mutationEx{
Mutation: pb.Mutation{
Op: op,
Key: k,
Value: v,
},
}
putCnt++
} else {
mutations[string(k)] = &pb.Mutation{
Op: pb.Op_Del,
Key: k,
mutations[string(k)] = &mutationEx{
Mutation: pb.Mutation{
Op: pb.Op_Del,
Key: k,
},
}
delCnt++
}
Expand All @@ -128,9 +137,11 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
}
for _, lockKey := range txn.lockKeys {
if _, ok := mutations[string(lockKey)]; !ok {
mutations[string(lockKey)] = &pb.Mutation{
Op: pb.Op_Lock,
Key: lockKey,
mutations[string(lockKey)] = &mutationEx{
Mutation: pb.Mutation{
Op: pb.Op_Lock,
Key: lockKey,
},
}
lockCnt++
keys = append(keys, lockKey)
Expand All @@ -140,6 +151,28 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
if len(keys) == 0 {
return nil, nil
}

for _, pair := range txn.assertions {
mutation, ok := mutations[string(pair.key)]
if !ok {
log.Error("ASSERTION FAIL!!! assertion exists but no mutation?", pair)
continue
}
// Only apply the first assertion!
if mutation.asserted {
continue
}
switch pair.assertion {
case kv.Exist:
mutation.Assertion = pb.Assertion_Exist
case kv.NotExist:
mutation.Assertion = pb.Assertion_NotExist
default:
mutation.Assertion = pb.Assertion_None
}
mutation.asserted = true
}

entrylimit := atomic.LoadUint64(&kv.TxnEntryCountLimit)
if len(keys) > int(entrylimit) || size > kv.TxnTotalSizeLimit {
return nil, kv.ErrTxnTooLarge
Expand Down Expand Up @@ -350,7 +383,8 @@ func (c *twoPhaseCommitter) keySize(key []byte) int {
func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
mutations[i] = c.mutations[string(k)]
tmp := c.mutations[string(k)]
mutations[i] = &tmp.Mutation
}

req := &tikvrpc.Request{
Expand Down
13 changes: 13 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type tikvTxn struct {
dirty bool
setCnt int64
vars *kv.Variables

// For data consistency check.
assertions []assertionPair
}

func newTiKVTxn(store *tikvStore) (*tikvTxn, error) {
Expand All @@ -71,6 +74,16 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64) (*tikvTxn, error) {
}, nil
}

type assertionPair struct {
key kv.Key
assertion kv.AssertionType
}

// SetAssertion sets a assertion for the key operation.
func (txn *tikvTxn) SetAssertion(key kv.Key, assertion kv.AssertionType) {
txn.assertions = append(txn.assertions, assertionPair{key, assertion})
}

func (txn *tikvTxn) SetVars(vars *kv.Variables) {
txn.vars = vars
txn.snapshot.vars = vars
Expand Down
2 changes: 1 addition & 1 deletion table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Index interface {
// Create supports insert into statement.
Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedValues []types.Datum, h int64, opts ...*CreateIdxOpt) (int64, error)
// Delete supports delete from statement.
Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues []types.Datum, h int64) error
Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues []types.Datum, h int64, ss kv.Transaction) error
// Drop supports drop table, drop index statements.
Drop(rm kv.RetrieverMutator) error
// Exist supports check index exists or not.
Expand Down
11 changes: 10 additions & 1 deletion table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,21 @@ func (c *index) Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedVa
}

// Delete removes the entry for handle h and indexdValues from KV index.
func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues []types.Datum, h int64) error {
func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues []types.Datum, h int64, ss kv.Transaction) error {
key, _, err := c.GenIndexKey(sc, indexedValues, h, nil)
if err != nil {
return errors.Trace(err)
}
err = m.Delete(key)
if ss != nil {
switch c.idxInfo.State {
case model.StatePublic:
// If the index is in public state, delete this index means it must exists.
ss.SetAssertion(key, kv.Exist)
default:
ss.SetAssertion(key, kv.None)
}
}
return errors.Trace(err)
}

Expand Down
2 changes: 1 addition & 1 deletion table/tables/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *testIndexSuite) TestIndex(c *C) {
c.Assert(err, IsNil)
c.Assert(exist, IsTrue)

err = index.Delete(sc, txn, values, 1)
err = index.Delete(sc, txn, values, 1, nil)
c.Assert(err, IsNil)

it, err = index.SeekFirst(txn)
Expand Down
Loading

0 comments on commit 3910a72

Please sign in to comment.