Skip to content

Commit

Permalink
add recycling util, remove detection client/conn
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk committed Nov 19, 2019
1 parent 93cc16b commit 6c467d4
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 148 deletions.
238 changes: 92 additions & 146 deletions tikv/deadlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/ngaut/unistore/util/lockwaiter"
deadlockPb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/util/deadlock"
)

// Follower will send detection rpc to Leader
Expand All @@ -21,220 +20,165 @@ const (
Leader
)

type detectConn struct {
stream deadlockPb.Deadlock_DetectClient
ctx context.Context
cancel context.CancelFunc
}

type detectClient struct {
sync.RWMutex
conn *detectConn
}

// DeadlockDetector is a util used for distributed deadlock detection
type DeadlockDetector struct {
mu sync.RWMutex
detector *deadlock.Detector
client *detectClient
detector *Detector
pdClient pd.Client
storeMeta *metapb.Store
sendCh chan *deadlockPb.DeadlockRequest
waitMgr *lockwaiter.Manager
streamCli deadlockPb.Deadlock_DetectClient

// these fields should be protected by mutex
role int32
leaderStoreMeta *metapb.Store
}

func NewDetectConn(addr string) (*detectConn, error) {
cc, err := grpc.Dial(addr, grpc.WithInsecure())
// refreshFirstRegionLeader will send request to pd to find out the
// current leader node for the first region
func (dt *DeadlockDetector) refreshFirstRegionLeader() error {
// find first region from pd, get the first region leader
ctx := context.Background()
_, leaderPeer, err := dt.pdClient.GetRegion(ctx, []byte{})
if err != nil {
return nil, err
log.Errorf("get first region failed, err: %v", err)
return err
}
ctx, cancel := context.WithCancel(context.Background())
stream, err := deadlockPb.NewDeadlockClient(cc).Detect(ctx)
leaderStoreMeta, err := dt.pdClient.GetStore(ctx, leaderPeer.GetStoreId())
if err != nil {
cancel()
return nil, err
log.Errorf("get store=%d failed, err=%v", leaderPeer.GetStoreId(), err)
return err
}
dtConn := &detectConn{
stream: stream,
ctx: ctx,
cancel: cancel,
dt.setLeaderStoreMeta(leaderStoreMeta)
log.Errorf("refreshFirstRegionLeader leader_peer=%v addr=%s", leaderPeer, leaderStoreMeta.GetAddress())
if leaderStoreMeta.GetId() == dt.storeMeta.GetId() {
log.Infof("refreshFirstRegionLeader found local node is leader")
dt.ChangeRole(Leader)
}
return dtConn, nil
}

func (c *detectConn) stop() {
c.cancel()
}

func (c *detectConn) send(req *deadlockPb.DeadlockRequest) error {
return c.stream.Send(req)
}

func (c *detectConn) recv() (*deadlockPb.DeadlockResponse, error) {
return c.stream.Recv()
}

func NewDetectClient() *detectClient {
return &detectClient{}
return nil
}

func (c *detectClient) getConn(addr string) (*detectConn, error) {
c.RLock()
if c.conn != nil {
return c.conn, nil
}
c.RUnlock()
newConn, err := NewDetectConn(addr)
// rebuildStreamClient builds connection to the first region leader,
// it's not thread safe and should be called only by `DeadlockDetector.Start` or `DeadlockDetector.SendReqLoop`
func (dt *DeadlockDetector) rebuildStreamClient() error {
err := dt.refreshFirstRegionLeader()
if err != nil {
return nil, err
}
log.Infof("detectClient connected to addr=%v", addr)
c.Lock()
defer c.Unlock()
if c.conn != nil {
newConn.stop()
return c.conn, nil
return err
}
c.conn = newConn
return newConn, nil
}

func (c *detectClient) send(addr string, req *deadlockPb.DeadlockRequest) error {
conn, err := c.getConn(addr)
addr := dt.getLeaderStoreMeta().GetAddress()
cc, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return err
}
err = conn.send(req)
if err == nil {
return nil
}
conn.stop()
c.Lock()
if conn == c.conn {
log.Error("detectClient failed to send, invalidate current conn, err=%v", err)
c.conn = nil
}
c.Unlock()
return err
}

func (c *detectClient) recv(addr string) (*deadlockPb.DeadlockResponse, error) {
conn, err := c.getConn(addr)
ctx, cancel := context.WithCancel(context.Background())
stream, err := deadlockPb.NewDeadlockClient(cc).Detect(ctx)
if err != nil {
return nil, err
}
res, err := conn.recv()
if err == nil {
return res, nil
}
conn.stop()
c.Lock()
if conn == c.conn {
log.Error("detectClient failed to recv, invalidate current conn, err=%v", err)
c.conn = nil
cancel()
return err
}
c.Unlock()
return nil, err
dt.streamCli = stream
go dt.recvLoop(dt.streamCli)
return nil
}

// NewDeadlockDetector will create a new detector util, entryTTL is used for
// recycling the lock wait edge in detector wait wap. chSize is the pending
// detection sending task size(used on non leader node)
func NewDeadlockDetector(waiterMgr *lockwaiter.Manager) *DeadlockDetector {
chSize := 1000
entryTTL := time.Duration(3 * time.Second)
newDetector := &DeadlockDetector{
detector: deadlock.NewDetector(),
detector: NewDetector(entryTTL),
role: Follower,
client: NewDetectClient(),
sendCh: make(chan *deadlockPb.DeadlockRequest, chSize),
waitMgr: waiterMgr,
}
return newDetector
}

// setLeaderStoreMeta sets the leaderStoreMeta
func (dt *DeadlockDetector) setLeaderStoreMeta(newMeta *metapb.Store) {
dt.mu.Lock()
defer dt.mu.Unlock()
dt.leaderStoreMeta = newMeta
}

// getLeaderStoreMeta returns the leader's store meta info
func (dt *DeadlockDetector) getLeaderStoreMeta() *metapb.Store {
dt.mu.RLock()
defer dt.mu.RUnlock()
return dt.leaderStoreMeta
}

func (dt *DeadlockDetector) SendReqLoop() {
// Start starts the detection `send`, `recv` and `entry recycle` loop
func (dt *DeadlockDetector) Start() error {
err := dt.rebuildStreamClient()
if err != nil {
return err
}
go dt.sendReqLoop()
go dt.entriesRecycleLoop()
return nil
}

// sendReqLoop will send detection request to leader, stream connection will be rebuilt and
// a new recv goroutine using the same stream client will be created
func (dt *DeadlockDetector) sendReqLoop() {
var (
leaderAddr string
err error
refreshErr error
rebuildErr error
req *deadlockPb.DeadlockRequest
)
for {
req := <-dt.sendCh
leaderAddr = dt.leaderStoreMeta.GetAddress()
err = dt.client.send(leaderAddr, req)
req = <-dt.sendCh
if dt.streamCli != nil {
err = dt.streamCli.Send(req)
}
if err != nil {
log.Warnf("send req to addr=%v failed, err=%v", leaderAddr, err)
refreshErr = dt.refreshFirstRegionLeader()
if refreshErr != nil {
log.Errorf("refresh leader addr failed, err=%v", refreshErr)
log.Warnf("send req to addr=%v failed, err=%v, try to refresh leader meta and rebuild stream client",
dt.getLeaderStoreMeta().GetAddress(), err)
rebuildErr = dt.rebuildStreamClient()
if rebuildErr != nil {
log.Errorf("rebuild connection to first region failed, err=%v", rebuildErr)
time.Sleep(3 * time.Second)
}
time.Sleep(3 * time.Second)
continue
}
}
}

func (dt *DeadlockDetector) Start() {
go dt.SendReqLoop()
go dt.RecvLoop()
}

func (dt *DeadlockDetector) RecvLoop() {
// recvLoop tries to recv response(current only deadlock error) from leader, break loop if errors happen
func (dt *DeadlockDetector) recvLoop(streamCli deadlockPb.Deadlock_DetectClient) {
var (
leaderAddr string
err error
refreshErr error
resp *deadlockPb.DeadlockResponse
err error
resp *deadlockPb.DeadlockResponse
)
for {
leaderAddr = dt.leaderStoreMeta.GetAddress()
resp, err = dt.client.recv(leaderAddr)
resp, err = streamCli.Recv()
if err != nil {
log.Warnf("recv from addr=%v failed, err=%v", leaderAddr, err)
refreshErr = dt.refreshFirstRegionLeader()
if refreshErr != nil {
log.Errorf("refresh leader addr failed, err=%v", refreshErr)
}
time.Sleep(3 * time.Second)
continue
log.Warnf("recv from failed, err=%v, stop receive", err)
break
}
// here only detection request will get response from leader
dt.waitMgr.WakeUpForDeadlock(resp)
}
}

func (dt *DeadlockDetector) refreshFirstRegionLeader() error {
// find first region from pd, get the first region leader
ctx := context.Background()
_, leaderPeer, err := dt.pdClient.GetRegion(ctx, []byte{})
if err != nil {
log.Errorf("get first region failed, err: %v", err)
return err
}
leaderStoreMeta, err := dt.pdClient.GetStore(ctx, leaderPeer.GetStoreId())
if err != nil {
log.Errorf("get store=%d failed, err=%v", leaderPeer.GetStoreId(), err)
return err
}
dt.setLeaderStoreMeta(leaderStoreMeta)
log.Errorf("refreshFirstRegionLeader leader_peer=%v addr=%s", leaderPeer, leaderStoreMeta.GetAddress())
if leaderStoreMeta.GetId() == dt.storeMeta.GetId() {
log.Infof("refreshFirstRegionLeader found local node is leader")
dt.ChangeRole(Leader)
// entriesRecycleLoop recycles expired entry in detector
func (dt *DeadlockDetector) entriesRecycleLoop() {
recycleSize := uint64(100000)
commonInterVal := 5 * time.Second
urgentInterVal := 1 * time.Second
for {
dt.detector.ExpireEntries()
// TODO add a observable metric here
if dt.detector.GetTotalSize() >= recycleSize {
time.Sleep(urgentInterVal)
continue
}
time.Sleep(commonInterVal)
}
return nil
}

func (dt *DeadlockDetector) sendReqToLeader(req *deadlockPb.DeadlockRequest) {
Expand Down Expand Up @@ -269,6 +213,7 @@ func (dt *DeadlockDetector) CleanUp(startTs uint64) {
}
}

// CleanUpWaitFor cleans up the specific wait edge in detector's wait map
func (dt *DeadlockDetector) CleanUpWaitFor(txnTs, waitForTxn, keyHash uint64) {
if dt.isLeader() {
dt.detector.CleanUpWaitFor(txnTs, waitForTxn, keyHash)
Expand All @@ -286,14 +231,15 @@ func (dt *DeadlockDetector) Detect(txnTs uint64, waitForTxnTs uint64, keyHash ui
return nil
}

func convertErrToResp(errDeadlock *deadlock.ErrDeadlock, txnTs, waitForTxnTs, keyHash uint64) *deadlockPb.DeadlockResponse {
// convertErrToResp converts `ErrDeadlock` to `DeadlockResponse` proto type
func convertErrToResp(errDeadlock *ErrDeadlock, txnTs, waitForTxnTs, keyHash uint64) *deadlockPb.DeadlockResponse {
entry := deadlockPb.WaitForEntry{}
entry.Txn = txnTs
entry.WaitForTxn = waitForTxnTs
entry.KeyHash = keyHash
resp := &deadlockPb.DeadlockResponse{}
resp.Entry = entry
resp.DeadlockKeyHash = errDeadlock.KeyHash
resp.DeadlockKeyHash = errDeadlock.DeadlockKeyHash
return resp
}

Expand All @@ -303,7 +249,7 @@ func (dt *DeadlockDetector) DetectRemote(txnTs uint64, waitForTxnTs uint64, keyH
if dt.isLeader() {
err := dt.Detect(txnTs, waitForTxnTs, keyHash)
if err != nil {
resp := convertErrToResp(err.(*deadlock.ErrDeadlock), txnTs, waitForTxnTs, keyHash)
resp := convertErrToResp(err.(*ErrDeadlock), txnTs, waitForTxnTs, keyHash)
dt.waitMgr.WakeUpForDeadlock(resp)
}
} else {
Expand Down
Loading

0 comments on commit 6c467d4

Please sign in to comment.