diff --git a/executor/admin.go b/executor/admin.go index bc62de1084fda..53fd02640ef39 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -515,7 +515,7 @@ func (e *CleanupIndexExec) deleteDanglingIdx(txn kv.Transaction, values map[stri return errors.Trace(err) } for _, idxVals := range e.idxValues[handle] { - if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn, idxVals, handle); err != nil { + if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn, idxVals, handle, nil); err != nil { return errors.Trace(err) } e.removeCnt++ diff --git a/executor/admin_test.go b/executor/admin_test.go index eb56c62684904..9afb949f667c1 100644 --- a/executor/admin_test.go +++ b/executor/admin_test.go @@ -96,7 +96,7 @@ func (s *testSuite2) TestAdminRecoverIndex(c *C) { sc := s.ctx.GetSessionVars().StmtCtx txn, err := s.store.Begin() c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums(1), 1) + err = indexOpr.Delete(sc, txn, types.MakeDatums(1), 1, nil) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) @@ -119,7 +119,7 @@ func (s *testSuite2) TestAdminRecoverIndex(c *C) { txn, err = s.store.Begin() c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums(10), 10) + err = indexOpr.Delete(sc, txn, types.MakeDatums(10), 10, nil) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) @@ -133,15 +133,15 @@ func (s *testSuite2) TestAdminRecoverIndex(c *C) { txn, err = s.store.Begin() c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums(1), 1) + err = indexOpr.Delete(sc, txn, types.MakeDatums(1), 1, nil) c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums(2), 2) + err = indexOpr.Delete(sc, txn, types.MakeDatums(2), 2, nil) c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums(3), 3) + err = indexOpr.Delete(sc, txn, types.MakeDatums(3), 3, nil) c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums(10), 10) + err = indexOpr.Delete(sc, txn, types.MakeDatums(10), 10, nil) c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums(20), 20) + err = indexOpr.Delete(sc, txn, types.MakeDatums(20), 20, nil) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) @@ -193,13 +193,13 @@ func (s *testSuite2) TestAdminRecoverIndex1(c *C) { txn, err := s.store.Begin() c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums("1"), 1) + err = indexOpr.Delete(sc, txn, types.MakeDatums("1"), 1, nil) c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums("2"), 2) + err = indexOpr.Delete(sc, txn, types.MakeDatums("2"), 2, nil) c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums("3"), 3) + err = indexOpr.Delete(sc, txn, types.MakeDatums("3"), 3, nil) c.Assert(err, IsNil) - err = indexOpr.Delete(sc, txn, types.MakeDatums("10"), 4) + err = indexOpr.Delete(sc, txn, types.MakeDatums("10"), 4, nil) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) diff --git a/executor/executor_test.go b/executor/executor_test.go index e69c09bfb199c..9822ccda9dd56 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2946,9 +2946,9 @@ func (s *testSuite) TestCheckIndex(c *C) { // table data (handle, data): (1, 10), (2, 20), (4, 40) txn, err = s.store.Begin() c.Assert(err, IsNil) - err = idx.Delete(sc, txn, types.MakeDatums(int64(30)), 3) + err = idx.Delete(sc, txn, types.MakeDatums(int64(30)), 3, nil) c.Assert(err, IsNil) - err = idx.Delete(sc, txn, types.MakeDatums(int64(20)), 2) + err = idx.Delete(sc, txn, types.MakeDatums(int64(20)), 2, nil) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) diff --git a/go.sum b/go.sum index efbb09d813ad1..d64efc2303748 100644 --- a/go.sum +++ b/go.sum @@ -150,7 +150,6 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -191,13 +190,11 @@ golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e h1:bRhVy7zSSasaqNksaRZiA5EEI+Ei4I1nO5Jh72wfHlg= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb h1:1w588/yEchbPNpa9sEvOcMZYbWHedwJjg4VOAdDHWHk= golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -205,19 +202,15 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52 h1:JG/0uqcGdTNgq7FdU+61l5Pdmb8putNZlXb65bJBROs= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0 h1:iRpjPej1fPzmfoBhMFkp3HdqzF+ytPmAwiQhJGV0zGw= golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= -google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI= google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 h1:9oFlwfEGIvmxXTcY53ygNyxIQtWciRHjrnUvZJCYXYU= google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg= google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= -google.golang.org/grpc v1.16.0 h1:dz5IJGuC2BB7qXR5AyHNwAUBhZscK2xVez7mznh72sY= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= @@ -231,7 +224,6 @@ gopkg.in/stretchr/testify.v1 v1.2.2 h1:yhQC6Uy5CqibAIlk1wlusa/MJ3iAN49/BsR/dCCKz gopkg.in/stretchr/testify.v1 v1.2.2/go.mod h1:QI5V/q6UbPmuhtm10CaFZxED9NreB8PnFYN9JcR6TxU= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/kv/kv.go b/kv/kv.go index 202cc8a1e4ffe..a0c94625bf029 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -148,6 +148,8 @@ type Transaction interface { GetMemBuffer() MemBuffer // SetVars sets variables to the transaction. SetVars(vars *Variables) + // SetAssertion sets an assertion for an operation on the key. + SetAssertion(key Key, assertion AssertionType) // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage. BatchGet(keys []Key) (map[string][]byte, error) } diff --git a/kv/mock.go b/kv/mock.go index 278c24636dc14..de35601edf718 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -114,6 +114,8 @@ func (t *mockTxn) SetVars(vars *Variables) { } +func (t *mockTxn) SetAssertion(key Key, assertion AssertionType) {} + // NewMockTxn new a mockTxn. func NewMockTxn() Transaction { return &mockTxn{ diff --git a/kv/union_store.go b/kv/union_store.go index b73f1c28de0ae..6fc2d97281b36 100644 --- a/kv/union_store.go +++ b/kv/union_store.go @@ -36,6 +36,16 @@ type UnionStore interface { GetMemBuffer() MemBuffer } +// AssertionType is the type of a assertion. +type AssertionType int + +// The AssertionType constants. +const ( + None AssertionType = iota + Exist + NotExist +) + // Option is used for customizing kv store's behaviors during a transaction. type Option int diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go index 584d581fd50b6..37033ad85ec43 100644 --- a/store/mockstore/mocktikv/errors.go +++ b/store/mockstore/mocktikv/errors.go @@ -58,5 +58,5 @@ func (e ErrAbort) Error() string { type ErrAlreadyCommitted uint64 func (e ErrAlreadyCommitted) Error() string { - return fmt.Sprint("txn already committed") + return "txn already committed" } diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 920852cb16dc8..b844973e2f1b2 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -588,6 +588,13 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu if err != nil { return errors.Trace(err) } + + // Check assertions. + if (ok && mutation.Assertion == kvrpcpb.Assertion_NotExist) || + (!ok && mutation.Assertion == kvrpcpb.Assertion_Exist) { + log.Error("ASSERTION FAIL!!!", mutation) + } + batch.Put(writeKey, writeValue) return nil } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index e3b28c58dd793..3bcbe933d591a 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -67,7 +67,7 @@ type twoPhaseCommitter struct { txn *tikvTxn startTS uint64 keys [][]byte - mutations map[string]*pb.Mutation + mutations map[string]*mutationEx lockTTL uint64 commitTS uint64 mu struct { @@ -86,6 +86,11 @@ type twoPhaseCommitter struct { detail *execdetails.CommitDetails } +type mutationEx struct { + pb.Mutation + asserted bool +} + // newTwoPhaseCommitter creates a twoPhaseCommitter. func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, error) { var ( @@ -95,23 +100,27 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro delCnt int lockCnt int ) - mutations := make(map[string]*pb.Mutation) + mutations := make(map[string]*mutationEx) err := txn.us.WalkBuffer(func(k kv.Key, v []byte) error { if len(v) > 0 { op := pb.Op_Put if c := txn.us.LookupConditionPair(k); c != nil && c.ShouldNotExist() { op = pb.Op_Insert } - mutations[string(k)] = &pb.Mutation{ - Op: op, - Key: k, - Value: v, + mutations[string(k)] = &mutationEx{ + Mutation: pb.Mutation{ + Op: op, + Key: k, + Value: v, + }, } putCnt++ } else { - mutations[string(k)] = &pb.Mutation{ - Op: pb.Op_Del, - Key: k, + mutations[string(k)] = &mutationEx{ + Mutation: pb.Mutation{ + Op: pb.Op_Del, + Key: k, + }, } delCnt++ } @@ -128,9 +137,11 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro } for _, lockKey := range txn.lockKeys { if _, ok := mutations[string(lockKey)]; !ok { - mutations[string(lockKey)] = &pb.Mutation{ - Op: pb.Op_Lock, - Key: lockKey, + mutations[string(lockKey)] = &mutationEx{ + Mutation: pb.Mutation{ + Op: pb.Op_Lock, + Key: lockKey, + }, } lockCnt++ keys = append(keys, lockKey) @@ -140,6 +151,28 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro if len(keys) == 0 { return nil, nil } + + for _, pair := range txn.assertions { + mutation, ok := mutations[string(pair.key)] + if !ok { + log.Error("ASSERTION FAIL!!! assertion exists but no mutation?", pair) + continue + } + // Only apply the first assertion! + if mutation.asserted { + continue + } + switch pair.assertion { + case kv.Exist: + mutation.Assertion = pb.Assertion_Exist + case kv.NotExist: + mutation.Assertion = pb.Assertion_NotExist + default: + mutation.Assertion = pb.Assertion_None + } + mutation.asserted = true + } + entrylimit := atomic.LoadUint64(&kv.TxnEntryCountLimit) if len(keys) > int(entrylimit) || size > kv.TxnTotalSizeLimit { return nil, kv.ErrTxnTooLarge @@ -350,7 +383,8 @@ func (c *twoPhaseCommitter) keySize(key []byte) int { func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error { mutations := make([]*pb.Mutation, len(batch.keys)) for i, k := range batch.keys { - mutations[i] = c.mutations[string(k)] + tmp := c.mutations[string(k)] + mutations[i] = &tmp.Mutation } req := &tikvrpc.Request{ diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 79d24de09471b..68125966c6b1c 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -45,6 +45,9 @@ type tikvTxn struct { dirty bool setCnt int64 vars *kv.Variables + + // For data consistency check. + assertions []assertionPair } func newTiKVTxn(store *tikvStore) (*tikvTxn, error) { @@ -71,6 +74,16 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64) (*tikvTxn, error) { }, nil } +type assertionPair struct { + key kv.Key + assertion kv.AssertionType +} + +// SetAssertion sets a assertion for the key operation. +func (txn *tikvTxn) SetAssertion(key kv.Key, assertion kv.AssertionType) { + txn.assertions = append(txn.assertions, assertionPair{key, assertion}) +} + func (txn *tikvTxn) SetVars(vars *kv.Variables) { txn.vars = vars txn.snapshot.vars = vars diff --git a/table/index.go b/table/index.go index 5553614822718..2e33b8f74323d 100644 --- a/table/index.go +++ b/table/index.go @@ -40,7 +40,7 @@ type Index interface { // Create supports insert into statement. Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedValues []types.Datum, h int64, opts ...*CreateIdxOpt) (int64, error) // Delete supports delete from statement. - Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues []types.Datum, h int64) error + Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues []types.Datum, h int64, ss kv.Transaction) error // Drop supports drop table, drop index statements. Drop(rm kv.RetrieverMutator) error // Exist supports check index exists or not. diff --git a/table/tables/index.go b/table/tables/index.go index e6f67006aac7a..53207be61b537 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -230,12 +230,21 @@ func (c *index) Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedVa } // Delete removes the entry for handle h and indexdValues from KV index. -func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues []types.Datum, h int64) error { +func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues []types.Datum, h int64, ss kv.Transaction) error { key, _, err := c.GenIndexKey(sc, indexedValues, h, nil) if err != nil { return errors.Trace(err) } err = m.Delete(key) + if ss != nil { + switch c.idxInfo.State { + case model.StatePublic: + // If the index is in public state, delete this index means it must exists. + ss.SetAssertion(key, kv.Exist) + default: + ss.SetAssertion(key, kv.None) + } + } return errors.Trace(err) } diff --git a/table/tables/index_test.go b/table/tables/index_test.go index 993f077bfe537..c6646b1ec75c0 100644 --- a/table/tables/index_test.go +++ b/table/tables/index_test.go @@ -99,7 +99,7 @@ func (s *testIndexSuite) TestIndex(c *C) { c.Assert(err, IsNil) c.Assert(exist, IsTrue) - err = index.Delete(sc, txn, values, 1) + err = index.Delete(sc, txn, values, 1, nil) c.Assert(err, IsNil) it, err = index.SeekFirst(txn) diff --git a/table/tables/tables.go b/table/tables/tables.go index 0d0163acb4725..eb1e03eeb32ca 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -353,6 +353,10 @@ func (t *tableCommon) UpdateRecord(ctx sessionctx.Context, h int64, oldData, new } func (t *tableCommon) rebuildIndices(ctx sessionctx.Context, rm kv.RetrieverMutator, h int64, touched []bool, oldData []types.Datum, newData []types.Datum) error { + txn, err := ctx.Txn(true) + if err != nil { + return err + } for _, idx := range t.DeletableIndices() { for _, ic := range idx.Meta().Columns { if !touched[ic.Offset] { @@ -362,7 +366,7 @@ func (t *tableCommon) rebuildIndices(ctx sessionctx.Context, rm kv.RetrieverMuta if err != nil { return errors.Trace(err) } - if err = t.removeRowIndex(ctx.GetSessionVars().StmtCtx, rm, h, oldVs, idx); err != nil { + if err = t.removeRowIndex(ctx.GetSessionVars().StmtCtx, rm, h, oldVs, idx, txn); err != nil { return errors.Trace(err) } break @@ -503,6 +507,8 @@ func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. if err = txn.Set(key, value); err != nil { return 0, errors.Trace(err) } + txn.SetAssertion(key, kv.None) + if !sessVars.LightningMode { if err = rm.(*kv.BufferStore).SaveTo(txn); err != nil { return 0, errors.Trace(err) @@ -758,7 +764,10 @@ func (t *tableCommon) removeRowData(ctx sessionctx.Context, h int64) error { if err != nil { return errors.Trace(err) } - err = txn.Delete([]byte(t.RecordKey(h))) + + key := t.RecordKey(h) + txn.SetAssertion(key, kv.Exist) + err = txn.Delete([]byte(key)) if err != nil { return errors.Trace(err) } @@ -771,14 +780,13 @@ func (t *tableCommon) removeRowIndices(ctx sessionctx.Context, h int64, rec []ty if err != nil { return errors.Trace(err) } - for _, v := range t.DeletableIndices() { vals, err := v.FetchValues(rec, nil) if err != nil { log.Infof("remove row index %v failed %v, txn %d, handle %d, data %v", v.Meta(), err, txn.StartTS(), h, rec) return errors.Trace(err) } - if err = v.Delete(ctx.GetSessionVars().StmtCtx, txn, vals, h); err != nil { + if err = v.Delete(ctx.GetSessionVars().StmtCtx, txn, vals, h, txn); err != nil { if v.Meta().State != model.StatePublic && kv.ErrNotExist.Equal(err) { // If the index is not in public state, we may have not created the index, // or already deleted the index, so skip ErrNotExist error. @@ -791,9 +799,9 @@ func (t *tableCommon) removeRowIndices(ctx sessionctx.Context, h int64, rec []ty return nil } -// removeRowIndex implements table.Table RemoveRowIndex interface. -func (t *tableCommon) removeRowIndex(sc *stmtctx.StatementContext, rm kv.RetrieverMutator, h int64, vals []types.Datum, idx table.Index) error { - if err := idx.Delete(sc, rm, vals, h); err != nil { +// removeRowIndex implements table.Table RemoveRowIndex interface.能 +func (t *tableCommon) removeRowIndex(sc *stmtctx.StatementContext, rm kv.RetrieverMutator, h int64, vals []types.Datum, idx table.Index, txn kv.Transaction) error { + if err := idx.Delete(sc, rm, vals, h, txn); err != nil { return errors.Trace(err) } return nil diff --git a/util/admin/admin_test.go b/util/admin/admin_test.go index 113977d91ae1a..13eb09f8ded11 100644 --- a/util/admin/admin_test.go +++ b/util/admin/admin_test.go @@ -511,7 +511,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta // set data to: // index data (handle, data): (1, 10), (2, 20), (3, 30) // table data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40) - err = idx.Delete(sc, txn, types.MakeDatums(int64(40)), 4) + err = idx.Delete(sc, txn, types.MakeDatums(int64(40)), 4, nil) c.Assert(err, IsNil) key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4)) setColValue(c, txn, key, types.NewDatum(int64(40)))