Skip to content

Commit

Permalink
*: reduce tikvrpc request size (pingcap#11055)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and ngaut committed Jul 15, 2019
1 parent d381d84 commit d420a1f
Show file tree
Hide file tree
Showing 20 changed files with 438 additions and 450 deletions.
9 changes: 3 additions & 6 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,9 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
if !ok {
return
}
req := &tikvrpc.Request{
Type: tikvrpc.CmdDebugGetRegionProperties,
DebugGetRegionProperties: &debugpb.GetRegionPropertiesRequest{
RegionId: loc.Region.GetID(),
},
}
req := tikvrpc.NewRequest(tikvrpc.CmdDebugGetRegionProperties, &debugpb.GetRegionPropertiesRequest{
RegionId: loc.Region.GetID(),
})
var resp *tikvrpc.Response
var rpcCtx *tikv.RPCContext
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region)
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (c *regionProperityClient) SendRequest(ctx context.Context, addr string, re
defer c.mu.Unlock()
c.mu.count++
// Mock failure once.
if req.DebugGetRegionProperties.RegionId == c.mu.regionID {
if req.DebugGetRegionProperties().RegionId == c.mu.regionID {
c.mu.regionID = 0
return &tikvrpc.Response{}, nil
}
Expand Down
9 changes: 3 additions & 6 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,9 @@ func (t *tikvHandlerTool) getMvccByStartTs(startTS uint64, startKey, endKey []by
return nil, errors.Trace(err)
}

tikvReq := &tikvrpc.Request{
Type: tikvrpc.CmdMvccGetByStartTs,
MvccGetByStartTs: &kvrpcpb.MvccGetByStartTsRequest{
StartTs: startTS,
},
}
tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByStartTs, &kvrpcpb.MvccGetByStartTsRequest{
StartTs: startTS,
})
tikvReq.Context.Priority = kvrpcpb.CommandPri_Low
kvResp, err := t.Store.SendReq(bo, tikvReq, curRegion.Region, time.Hour)
if err != nil {
Expand Down
7 changes: 1 addition & 6 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,7 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe
return nil, errors.Trace(err)
}

tikvReq := &tikvrpc.Request{
Type: tikvrpc.CmdMvccGetByKey,
MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{
Key: encodedKey,
},
}
tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{Key: encodedKey})
kvResp, err := h.Store.SendReq(tikv.NewBackoffer(context.Background(), 500), tikvReq, keyLocation.Region, time.Minute)
if err != nil {
logutil.BgLogger().Info("get MVCC by encoded key failed",
Expand Down
54 changes: 27 additions & 27 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,36 +671,36 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
resp.Type = req.Type
switch req.Type {
case tikvrpc.CmdGet:
r := req.Get
r := req.Get()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.Get = &kvrpcpb.GetResponse{RegionError: err}
return resp, nil
}
resp.Get = handler.handleKvGet(r)
case tikvrpc.CmdScan:
r := req.Scan
r := req.Scan()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.Scan = &kvrpcpb.ScanResponse{RegionError: err}
return resp, nil
}
resp.Scan = handler.handleKvScan(r)

case tikvrpc.CmdPrewrite:
r := req.Prewrite
r := req.Prewrite()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.Prewrite = &kvrpcpb.PrewriteResponse{RegionError: err}
return resp, nil
}
resp.Prewrite = handler.handleKvPrewrite(r)
case tikvrpc.CmdPessimisticLock:
r := req.PessimisticLock
r := req.PessimisticLock()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.PessimisticLock = &kvrpcpb.PessimisticLockResponse{RegionError: err}
return resp, nil
}
resp.PessimisticLock = handler.handleKvPessimisticLock(r)
case tikvrpc.CmdPessimisticRollback:
r := req.PessimisticRollback
r := req.PessimisticRollback()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.PessimisticRollback = &kvrpcpb.PessimisticRollbackResponse{RegionError: err}
return resp, nil
Expand All @@ -724,7 +724,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
}
})

r := req.Commit
r := req.Commit()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.Commit = &kvrpcpb.CommitResponse{RegionError: err}
return resp, nil
Expand All @@ -736,104 +736,104 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
}
})
case tikvrpc.CmdCleanup:
r := req.Cleanup
r := req.Cleanup()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.Cleanup = &kvrpcpb.CleanupResponse{RegionError: err}
return resp, nil
}
resp.Cleanup = handler.handleKvCleanup(r)
case tikvrpc.CmdBatchGet:
r := req.BatchGet
r := req.BatchGet()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.BatchGet = &kvrpcpb.BatchGetResponse{RegionError: err}
return resp, nil
}
resp.BatchGet = handler.handleKvBatchGet(r)
case tikvrpc.CmdBatchRollback:
r := req.BatchRollback
r := req.BatchRollback()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.BatchRollback = &kvrpcpb.BatchRollbackResponse{RegionError: err}
return resp, nil
}
resp.BatchRollback = handler.handleKvBatchRollback(r)
case tikvrpc.CmdScanLock:
r := req.ScanLock
r := req.ScanLock()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.ScanLock = &kvrpcpb.ScanLockResponse{RegionError: err}
return resp, nil
}
resp.ScanLock = handler.handleKvScanLock(r)
case tikvrpc.CmdResolveLock:
r := req.ResolveLock
r := req.ResolveLock()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.ResolveLock = &kvrpcpb.ResolveLockResponse{RegionError: err}
return resp, nil
}
resp.ResolveLock = handler.handleKvResolveLock(r)
case tikvrpc.CmdGC:
r := req.GC
r := req.GC()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.GC = &kvrpcpb.GCResponse{RegionError: err}
return resp, nil
}
resp.GC = &kvrpcpb.GCResponse{}
case tikvrpc.CmdDeleteRange:
r := req.DeleteRange
r := req.DeleteRange()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.DeleteRange = &kvrpcpb.DeleteRangeResponse{RegionError: err}
return resp, nil
}
resp.DeleteRange = handler.handleKvDeleteRange(r)
case tikvrpc.CmdRawGet:
r := req.RawGet
r := req.RawGet()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.RawGet = &kvrpcpb.RawGetResponse{RegionError: err}
return resp, nil
}
resp.RawGet = handler.handleKvRawGet(r)
case tikvrpc.CmdRawBatchGet:
r := req.RawBatchGet
r := req.RawBatchGet()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.RawBatchGet = &kvrpcpb.RawBatchGetResponse{RegionError: err}
return resp, nil
}
resp.RawBatchGet = handler.handleKvRawBatchGet(r)
case tikvrpc.CmdRawPut:
r := req.RawPut
r := req.RawPut()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.RawPut = &kvrpcpb.RawPutResponse{RegionError: err}
return resp, nil
}
resp.RawPut = handler.handleKvRawPut(r)
case tikvrpc.CmdRawBatchPut:
r := req.RawBatchPut
r := req.RawBatchPut()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.RawBatchPut = &kvrpcpb.RawBatchPutResponse{RegionError: err}
return resp, nil
}
resp.RawBatchPut = handler.handleKvRawBatchPut(r)
case tikvrpc.CmdRawDelete:
r := req.RawDelete
r := req.RawDelete()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.RawDelete = &kvrpcpb.RawDeleteResponse{RegionError: err}
return resp, nil
}
resp.RawDelete = handler.handleKvRawDelete(r)
case tikvrpc.CmdRawBatchDelete:
r := req.RawBatchDelete
r := req.RawBatchDelete()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.RawBatchDelete = &kvrpcpb.RawBatchDeleteResponse{RegionError: err}
}
resp.RawBatchDelete = handler.handleKvRawBatchDelete(r)
case tikvrpc.CmdRawDeleteRange:
r := req.RawDeleteRange
r := req.RawDeleteRange()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.RawDeleteRange = &kvrpcpb.RawDeleteRangeResponse{RegionError: err}
return resp, nil
}
resp.RawDeleteRange = handler.handleKvRawDeleteRange(r)
case tikvrpc.CmdRawScan:
r := req.RawScan
r := req.RawScan()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.RawScan = &kvrpcpb.RawScanResponse{RegionError: err}
return resp, nil
Expand All @@ -842,7 +842,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
case tikvrpc.CmdUnsafeDestroyRange:
panic("unimplemented")
case tikvrpc.CmdCop:
r := req.Cop
r := req.Cop()
if err := handler.checkRequestContext(reqCtx); err != nil {
resp.Cop = &coprocessor.Response{RegionError: err}
return resp, nil
Expand All @@ -862,7 +862,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
}
resp.Cop = res
case tikvrpc.CmdCopStream:
r := req.Cop
r := req.Cop()
if err := handler.checkRequestContext(reqCtx); err != nil {
resp.CopStream = &tikvrpc.CopStreamResponse{
Tikv_CoprocessorStreamClient: &mockCopStreamErrClient{Error: err},
Expand Down Expand Up @@ -895,29 +895,29 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
streamResp.Response = first
resp.CopStream = streamResp
case tikvrpc.CmdMvccGetByKey:
r := req.MvccGetByKey
r := req.MvccGetByKey()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.MvccGetByKey = &kvrpcpb.MvccGetByKeyResponse{RegionError: err}
return resp, nil
}
resp.MvccGetByKey = handler.handleMvccGetByKey(r)
case tikvrpc.CmdMvccGetByStartTs:
r := req.MvccGetByStartTs
r := req.MvccGetByStartTs()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.MvccGetByStartTS = &kvrpcpb.MvccGetByStartTsResponse{RegionError: err}
return resp, nil
}
resp.MvccGetByStartTS = handler.handleMvccGetByStartTS(r)
case tikvrpc.CmdSplitRegion:
r := req.SplitRegion
r := req.SplitRegion()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.SplitRegion = &kvrpcpb.SplitRegionResponse{RegionError: err}
return resp, nil
}
resp.SplitRegion = handler.handleSplitRegion(r)
// DebugGetRegionProperties is for fast analyze in mock tikv.
case tikvrpc.CmdDebugGetRegionProperties:
r := req.DebugGetRegionProperties
r := req.DebugGetRegionProperties()
region, _ := c.Cluster.GetRegion(r.RegionId)
scanResp := handler.handleKvScan(&kvrpcpb.ScanRequest{StartKey: MvccKey(region.StartKey).Raw(), EndKey: MvccKey(region.EndKey).Raw(), Version: math.MaxUint64, Limit: math.MaxUint32})
resp.DebugGetRegionProperties = &debugpb.GetRegionPropertiesResponse{
Expand Down
Loading

0 comments on commit d420a1f

Please sign in to comment.