support distributed dead lock detection #263

Merged · 8 commits · Nov 20, 2019
217 changes: 217 additions & 0 deletions tikv/deadlock.go
@@ -0,0 +1,217 @@
package tikv

import (
"context"
"sync/atomic"
"time"

"github.com/ngaut/log"
"github.com/ngaut/unistore/pd"
"github.com/ngaut/unistore/util/lockwaiter"
deadlockPb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/metapb"
"google.golang.org/grpc"
)

// Roles of a deadlock detector node: a Follower sends detection RPCs to the Leader
const (
Follower = iota
Leader
)

// DeadlockDetector is a utility used for distributed deadlock detection
type DeadlockDetector struct {
detector *Detector
pdClient pd.Client
storeMeta *metapb.Store
sendCh chan *deadlockPb.DeadlockRequest
waitMgr *lockwaiter.Manager
streamCli deadlockPb.Deadlock_DetectClient

// these fields are accessed by multiple goroutines
role int32
}

// refreshFirstRegionLeader sends a request to PD to find out the address of
// the current leader store for the first region
func (dt *DeadlockDetector) refreshFirstRegionLeader() (string, error) {
// find first region from pd, get the first region leader
ctx := context.Background()
_, leaderPeer, err := dt.pdClient.GetRegion(ctx, []byte{})
if err != nil {
log.Errorf("get first region failed, err: %v", err)
return "", err
}
leaderStoreMeta, err := dt.pdClient.GetStore(ctx, leaderPeer.GetStoreId())
if err != nil {
log.Errorf("get store=%d failed, err=%v", leaderPeer.GetStoreId(), err)
return "", err
}
log.Warnf("refreshFirstRegionLeader leader_peer=%v addr=%s", leaderPeer, leaderStoreMeta.GetAddress())
return leaderStoreMeta.GetAddress(), nil
}

// rebuildStreamClient builds the connection to the first region leader;
// it is not thread safe and should only be called from `DeadlockDetector.sendReqLoop`
func (dt *DeadlockDetector) rebuildStreamClient() (string, error) {
leaderAddr, err := dt.refreshFirstRegionLeader()
if err != nil {
return "", err
}
cc, err := grpc.Dial(leaderAddr, grpc.WithInsecure())
if err != nil {
return "", err
}
ctx, cancel := context.WithCancel(context.Background())
stream, err := deadlockPb.NewDeadlockClient(cc).Detect(ctx)
if err != nil {
cancel()
return "", err
}
dt.streamCli = stream
go dt.recvLoop(dt.streamCli)
return leaderAddr, nil
}

// NewDeadlockDetector creates a new detector utility. entryTTL is used for
// recycling lock-wait edges in the detector's wait map; chSize is the buffer
// size for pending detection requests (used on non-leader nodes)
func NewDeadlockDetector(waiterMgr *lockwaiter.Manager) *DeadlockDetector {
chSize := 1000
entryTTL := 3 * time.Second
urgentSize := uint64(100000)
expireInterval := 3600 * time.Second
newDetector := &DeadlockDetector{
detector: NewDetector(entryTTL, urgentSize, expireInterval),
sendCh: make(chan *deadlockPb.DeadlockRequest, chSize),
waitMgr: waiterMgr,
}
return newDetector
}

// Start starts the detection request sending loop; a recv loop is started each time the stream client is (re)built
func (dt *DeadlockDetector) Start() {
go dt.sendReqLoop()
}

// sendReqLoop sends detection requests to the leader; when sending fails, the stream
// connection is rebuilt and a new recv goroutine using that stream client is created
func (dt *DeadlockDetector) sendReqLoop() {
var (
err error
rebuildErr error
req *deadlockPb.DeadlockRequest
leaderAddr string
)
for {
if dt.streamCli == nil {
leaderAddr, rebuildErr = dt.rebuildStreamClient()
if rebuildErr != nil {
log.Errorf("rebuild connection to first region failed, err=%v", rebuildErr)
time.Sleep(3 * time.Second)
continue
}
log.Infof("rebuild stream connection to leader peer success, leaderAddr=%v", leaderAddr)
}
req = <-dt.sendCh
err = dt.streamCli.Send(req)
if err != nil {
log.Warnf("send req to addr=%v failed, err=%v, invalid current stream and try to rebuild connection",
leaderAddr, err)
dt.streamCli = nil
}
}
}

// recvLoop receives responses (currently only deadlock errors) from the leader and breaks out of the loop if an error happens
func (dt *DeadlockDetector) recvLoop(streamCli deadlockPb.Deadlock_DetectClient) {
var (
err error
resp *deadlockPb.DeadlockResponse
)
for {
resp, err = streamCli.Recv()
if err != nil {
log.Warnf("recv from failed, err=%v, stop receive", err)
break
}
// only detection requests get responses from the leader
dt.waitMgr.WakeUpForDeadlock(resp)
}
}

func (dt *DeadlockDetector) sendReqToLeader(req *deadlockPb.DeadlockRequest) {
dt.sendCh <- req
}

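// handleRemoteTask builds a deadlock detection request of the given type and
// enqueues it to be forwarded to the first region leader.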
func (dt *DeadlockDetector) handleRemoteTask(requestType deadlockPb.DeadlockRequestType,
txnTs uint64, waitForTxnTs uint64, keyHash uint64) {
detectReq := &deadlockPb.DeadlockRequest{}
detectReq.Tp = requestType
detectReq.Entry.Txn = txnTs
detectReq.Entry.WaitForTxn = waitForTxnTs
detectReq.Entry.KeyHash = keyHash
dt.sendReqToLeader(detectReq)
}

func (dt *DeadlockDetector) isLeader() bool {
return atomic.LoadInt32(&dt.role) == Leader
}

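// ChangeRole changes this node's detection role (Leader or Follower).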
func (dt *DeadlockDetector) ChangeRole(newRole int32) {
atomic.StoreInt32(&dt.role, newRole)
}

// user interfaces
// CleanUp processes the cleanup task on the local detector, or forwards it to the leader on a follower node
func (dt *DeadlockDetector) CleanUp(startTs uint64) {
if dt.isLeader() {
dt.detector.CleanUp(startTs)
} else {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUp, startTs, 0, 0)
}
}

// CleanUpWaitFor cleans up the specific wait edge in detector's wait map
func (dt *DeadlockDetector) CleanUpWaitFor(txnTs, waitForTxn, keyHash uint64) {
if dt.isLeader() {
dt.detector.CleanUpWaitFor(txnTs, waitForTxn, keyHash)
} else {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUpWaitFor, txnTs, waitForTxn, keyHash)
}
}

// Detect processes the detection request on the local detector
func (dt *DeadlockDetector) Detect(txnTs uint64, waitForTxnTs uint64, keyHash uint64) error {
coocood (Collaborator) commented on Nov 20, 2019:
Removing the local detector optimization would make the code much simpler.

cfzjywxk (Contributor, Author):
This is used mainly for the Server.Detect RPC service handler.

coocood (Collaborator):
We can separate it into DetectorClient and DetectorServer; DetectorClient doesn't need to contain the Detector type.

cfzjywxk (Contributor, Author) commented on Nov 20, 2019:
DeadlockDetector split into DetectorClient and DetectorServer, where DetectorServer wraps the detector type and does the real local detection (used by the RPC handler), and DetectorClient wraps the connection-related things and is responsible for send and recv. Do I understand right? Seems like the same number of code lines 😅

coocood (Collaborator):
yes

cfzjywxk (Contributor, Author):
Roger, refactored.

cfzjywxk (Contributor, Author):
@coocood only the detection client interface is kept, others removed.

err := dt.detector.Detect(txnTs, waitForTxnTs, keyHash)
if err != nil {
return err
}
return nil
}

// convertErrToResp converts `ErrDeadlock` to `DeadlockResponse` proto type
func convertErrToResp(errDeadlock *ErrDeadlock, txnTs, waitForTxnTs, keyHash uint64) *deadlockPb.DeadlockResponse {
entry := deadlockPb.WaitForEntry{}
entry.Txn = txnTs
entry.WaitForTxn = waitForTxnTs
entry.KeyHash = keyHash
resp := &deadlockPb.DeadlockResponse{}
resp.Entry = entry
resp.DeadlockKeyHash = errDeadlock.DeadlockKeyHash
return resp
}

// DetectRemote posts the detection request to the local deadlock detector or to the remote
// first region leader; the caller should use `waiter.ch` to receive the possible deadlock response
func (dt *DeadlockDetector) DetectRemote(txnTs uint64, waitForTxnTs uint64, keyHash uint64) {
if dt.isLeader() {
err := dt.Detect(txnTs, waitForTxnTs, keyHash)
if err != nil {
resp := convertErrToResp(err.(*ErrDeadlock), txnTs, waitForTxnTs, keyHash)
dt.waitMgr.WakeUpForDeadlock(resp)
}
} else {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_Detect, txnTs, waitForTxnTs, keyHash)
}
}
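The review thread on Detect above was addressed by splitting DeadlockDetector in later commits of this PR. A rough sketch of the shape that split might take, based only on the discussion and reusing the imports from tikv/deadlock.go; the exact field sets are assumptions, not the merged code:

// DetectorServer wraps the local Detector and serves Detect RPC requests on
// the first-region leader; it holds no connection state.
type DetectorServer struct {
	Detector *Detector
	role     int32
}

// DetectorClient owns the stream to the first-region leader and forwards
// detection requests from follower nodes; it holds no Detector.
type DetectorClient struct {
	pdClient  pd.Client
	sendCh    chan *deadlockPb.DeadlockRequest
	waitMgr   *lockwaiter.Manager
	streamCli deadlockPb.Deadlock_DetectClient
}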
179 changes: 179 additions & 0 deletions tikv/detector.go
@@ -0,0 +1,179 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv

import (
"container/list"
"sync"
"time"

"github.com/ngaut/log"
)

// Detector detects deadlock.
type Detector struct {
waitForMap map[uint64]*txnList
lock sync.Mutex
entryTTL time.Duration
totalSize uint64
lastActiveExpire time.Time
urgentSize uint64
expireInterval time.Duration
}

type txnList struct {
txns *list.List
}

type txnKeyHashPair struct {
txn uint64
keyHash uint64
registerTime time.Time
}

func (p *txnKeyHashPair) isExpired(ttl time.Duration, nowTime time.Time) bool {
return p.registerTime.Add(ttl).Before(nowTime)
}

// NewDetector creates a new Detector.
func NewDetector(ttl time.Duration, urgentSize uint64, expireInterval time.Duration) *Detector {
return &Detector{
waitForMap: map[uint64]*txnList{},
entryTTL: ttl,
lastActiveExpire: time.Now(),
urgentSize: urgentSize,
expireInterval: expireInterval,
}
}

// Detect detects deadlock for the sourceTxn on a locked key.
func (d *Detector) Detect(sourceTxn, waitForTxn, keyHash uint64) *ErrDeadlock {
d.lock.Lock()
nowTime := time.Now()
d.activeExpire(nowTime)
err := d.doDetect(nowTime, sourceTxn, waitForTxn)
if err == nil {
d.register(sourceTxn, waitForTxn, keyHash)
}
d.lock.Unlock()
return err
}

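// doDetect does a depth-first search over the wait-for graph starting from
// waitForTxn; reaching sourceTxn again means the new edge would close a cycle,
// i.e. a deadlock. Expired edges found along the way are removed.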
func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *ErrDeadlock {
val := d.waitForMap[waitForTxn]
if val == nil {
return nil
}
var nextVal *list.Element
for cur := val.txns.Front(); cur != nil; cur = nextVal {
nextVal = cur.Next()
keyHashPair := cur.Value.(*txnKeyHashPair)
// check if this edge is expired
if keyHashPair.isExpired(d.entryTTL, nowTime) {
val.txns.Remove(cur)
d.totalSize--
continue
}
if keyHashPair.txn == sourceTxn {
return &ErrDeadlock{DeadlockKeyHash: keyHashPair.keyHash}
}
if err := d.doDetect(nowTime, sourceTxn, keyHashPair.txn); err != nil {
return err
}
}
if val.txns.Len() == 0 {
delete(d.waitForMap, waitForTxn)
}
return nil
}

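// register records the wait-for edge sourceTxn -> (waitForTxn, keyHash),
// skipping the insert when the same pair is already present.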
func (d *Detector) register(sourceTxn, waitForTxn, keyHash uint64) {
val := d.waitForMap[sourceTxn]
pair := txnKeyHashPair{txn: waitForTxn, keyHash: keyHash, registerTime: time.Now()}
if val == nil {
newList := &txnList{txns: list.New()}
newList.txns.PushBack(&pair)
d.waitForMap[sourceTxn] = newList
d.totalSize++
return
}
for cur := val.txns.Front(); cur != nil; cur = cur.Next() {
valuePair := cur.Value.(*txnKeyHashPair)
if valuePair.txn == waitForTxn && valuePair.keyHash == keyHash {
return
}
}
val.txns.PushBack(&pair)
d.totalSize++
}

// CleanUp removes the wait for entry for the transaction.
func (d *Detector) CleanUp(txn uint64) {
d.lock.Lock()
if l, ok := d.waitForMap[txn]; ok {
d.totalSize -= uint64(l.txns.Len())
}
delete(d.waitForMap, txn)
d.lock.Unlock()
}

// CleanUpWaitFor removes a key in the wait for entry for the transaction.
func (d *Detector) CleanUpWaitFor(txn, waitForTxn, keyHash uint64) {
d.lock.Lock()
l := d.waitForMap[txn]
if l != nil {
var nextVal *list.Element
for cur := l.txns.Front(); cur != nil; cur = nextVal {
nextVal = cur.Next()
valuePair := cur.Value.(*txnKeyHashPair)
if valuePair.txn == waitForTxn && valuePair.keyHash == keyHash {
l.txns.Remove(cur)
d.totalSize--
break
}
}
if l.txns.Len() == 0 {
delete(d.waitForMap, txn)
}
}
d.lock.Unlock()
}

// activeExpire removes expired entries; it must be called with d.lock held
func (d *Detector) activeExpire(nowTime time.Time) {
if nowTime.Sub(d.lastActiveExpire) > d.expireInterval &&
d.totalSize >= d.urgentSize {
log.Infof("detector will do activeExpire, current size=%v", d.totalSize)
for txn, l := range d.waitForMap {
var nextVal *list.Element
for cur := l.txns.Front(); cur != nil; cur = nextVal {
nextVal = cur.Next()
valuePair := cur.Value.(*txnKeyHashPair)
if valuePair.isExpired(d.entryTTL, nowTime) {
l.txns.Remove(cur)
d.totalSize--
}
}
if l.txns.Len() == 0 {
delete(d.waitForMap, txn)
}
}
d.lastActiveExpire = nowTime
log.Infof("detector activeExpire finished, current size=%v", d.totalSize)
}
}
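For context, a minimal sketch of how the Detector above might be exercised. This is a hypothetical driver, not part of the PR; it would need to live in the tikv package to reach the unexported type, and the transaction IDs and key hashes are made-up values:

package tikv

import (
	"fmt"
	"time"
)

// exampleDetect is a hypothetical driver showing the wait-for graph in action.
func exampleDetect() {
	d := NewDetector(3*time.Second, 100000, 3600*time.Second)

	// txn 1 waits for txn 2, and txn 2 waits for txn 3: no cycle yet, both calls return nil.
	_ = d.Detect(1, 2, 0x11)
	_ = d.Detect(2, 3, 0x22)

	// txn 3 waiting for txn 1 would close the cycle 1 -> 2 -> 3 -> 1,
	// so Detect reports a deadlock and does not register the new edge.
	if err := d.Detect(3, 1, 0x33); err != nil {
		fmt.Printf("deadlock detected, conflicting key hash=%x\n", err.DeadlockKeyHash)
	}

	// Once waiters are woken up or rolled back, their edges are cleaned up.
	d.CleanUp(1)
	d.CleanUpWaitFor(2, 3, 0x22)
}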