Skip to content

Commit

Permalink
Implement the stream update logic
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Jan 7, 2025
1 parent 973234d commit 8e258ea
Show file tree
Hide file tree
Showing 24 changed files with 5,966 additions and 74 deletions.
25 changes: 4 additions & 21 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,23 +570,6 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
return minTS.Physical, minTS.Logical, nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *router.Region {
if res.Region == nil {
return nil
}

r := &router.Region{
Meta: res.Region,
Leader: res.Leader,
PendingPeers: res.PendingPeers,
Buckets: res.Buckets,
}
for _, s := range res.DownPeers {
r.DownPeers = append(r.DownPeers, s.Peer)
}
return r
}

// GetRegionFromMember implements the RPCClient interface.
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*router.Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
Expand Down Expand Up @@ -623,7 +606,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs)
return nil, errors.WithStack(errors.New(errorMsg))
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// GetRegion implements the RPCClient interface.
Expand Down Expand Up @@ -663,7 +646,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
if err = c.respForErr(metrics.CmdFailedDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// GetPrevRegion implements the RPCClient interface.
Expand Down Expand Up @@ -703,7 +686,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
if err = c.respForErr(metrics.CmdFailedDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// GetRegionByID implements the RPCClient interface.
Expand Down Expand Up @@ -744,7 +727,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
if err = c.respForErr(metrics.CmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// ScanRegions implements the RPCClient interface.
Expand Down
Loading

0 comments on commit 8e258ea

Please sign in to comment.