Skip to content

Commit

Permalink
Implement more HTTP APIs
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 15, 2023
1 parent 86831ce commit 272d561
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 25 deletions.
104 changes: 94 additions & 10 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,58 @@ package http
import (
"fmt"
"net/url"
"time"
)

// The following constants are the paths of PD HTTP APIs.
const (
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
Regions = "/pd/api/v1/regions"
regionByID = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
regionsByKey = "/pd/api/v1/regions/key"
regionsByStoreID = "/pd/api/v1/regions/store"
Stores = "/pd/api/v1/stores"
// Metadata
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
accelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
ScheduleConfig = "/pd/api/v1/config/schedule"
ReplicateConfig = "/pd/api/v1/config/replicate"
// Rule
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
// Scheduler
Schedulers = "/pd/api/v1/schedulers"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-"
// Admin
ResetTS = "/pd/api/v1/admin/reset-ts"
BaseAllocID = "/pd/api/v1/admin/base-alloc-id"
SnapshotRecoveringMark = "/pd/api/v1/admin/cluster/markers/snapshot-recovering"
// Debug
PProfProfile = "/pd/api/v1/debug/pprof/profile"
PProfHeap = "/pd/api/v1/debug/pprof/heap"
PProfMutex = "/pd/api/v1/debug/pprof/mutex"
PProfAllocs = "/pd/api/v1/debug/pprof/allocs"
PProfBlock = "/pd/api/v1/debug/pprof/block"
PProfGoroutine = "/pd/api/v1/debug/pprof/goroutine"
// Others
MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
)

// RegionByID returns the path of PD HTTP API to get region by ID.
func RegionByID(regionID uint64) string {
return fmt.Sprintf("%s/%d", regionByID, regionID)
return fmt.Sprintf("%s/%d", RegionByIDPrefix, regionID)

Check warning on line 71 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L71

Added line #L71 was not covered by tests
}

// RegionByKey returns the path of PD HTTP API to get region by key.
Expand All @@ -50,5 +84,55 @@ func RegionsByKey(startKey, endKey []byte, limit int) string {

// RegionsByStoreID returns the path of PD HTTP API to get regions by store ID.
func RegionsByStoreID(storeID uint64) string {
return fmt.Sprintf("%s/%d", regionsByStoreID, storeID)
return fmt.Sprintf("%s/%d", RegionsByStoreIDPrefix, storeID)

Check warning on line 87 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L87

Added line #L87 was not covered by tests
}

// RegionStatsByStartEndKey returns the path of PD HTTP API to get region stats by start key and end key.
func RegionStatsByStartEndKey(startKey, endKey string) string {
return fmt.Sprintf("%s?start_key=%s&end_key=%s", StatsRegion, startKey, endKey)

Check warning on line 92 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L92

Added line #L92 was not covered by tests
}

// StoreByID returns the store API with store ID parameter.
func StoreByID(id uint64) string {
return fmt.Sprintf("%s/%d", store, id)

Check warning on line 97 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L97

Added line #L97 was not covered by tests
}

// StoreLabelByID returns the store label API with store ID parameter.
func StoreLabelByID(id uint64) string {
return fmt.Sprintf("%s/%d/label", store, id)

Check warning on line 102 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L102

Added line #L102 was not covered by tests
}

// ConfigWithTTLSeconds returns the config API with the TTL seconds parameter.
func ConfigWithTTLSeconds(ttlSeconds float64) string {
return fmt.Sprintf("%s?ttlSecond=%.0f", Config, ttlSeconds)

Check warning on line 107 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L107

Added line #L107 was not covered by tests
}

// PlacementRulesByGroup returns the path of PD HTTP API to get placement rules by group.
func PlacementRulesByGroup(group string) string {
return fmt.Sprintf("%s/%s", placementRulesByGroup, group)

Check warning on line 112 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L112

Added line #L112 was not covered by tests
}

// PlacementRuleByGroupAndID returns the path of PD HTTP API to get placement rule by group and ID.
func PlacementRuleByGroupAndID(group, id string) string {
return fmt.Sprintf("%s/%s/%s", PlacementRule, group, id)

Check warning on line 117 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L117

Added line #L117 was not covered by tests
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)

Check warning on line 122 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L122

Added line #L122 was not covered by tests
}

// ScatterRangeSchedulerWithName returns the scatter range scheduler API with name parameter.
func ScatterRangeSchedulerWithName(name string) string {
return fmt.Sprintf("%s%s", scatterRangeScheduler, name)

Check warning on line 127 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L127

Added line #L127 was not covered by tests
}

// PProfProfileAPIWithInterval returns the pprof profile API with interval parameter.
func PProfProfileAPIWithInterval(interval time.Duration) string {
return fmt.Sprintf("%s?seconds=%d", PProfProfile, interval/time.Second)

Check warning on line 132 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L132

Added line #L132 was not covered by tests
}

// PProfGoroutineWithDebugLevel returns the pprof goroutine API with debug level parameter.
func PProfGoroutineWithDebugLevel(level int) string {
return fmt.Sprintf("%s?debug=%d", PProfGoroutine, level)

Check warning on line 137 in client/http/api.go

View check run for this annotation

Codecov / codecov/patch

client/http/api.go#L137

Added line #L137 was not covered by tests
}
73 changes: 58 additions & 15 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ type Client interface {
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error)
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetRegionStatusByKey(context.Context, []byte, []byte) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
DeletePlacementRule(context.Context, string, string) error
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
AccelerateSchedule(context.Context, []byte, []byte) error
Close()
}

Expand Down Expand Up @@ -154,7 +159,7 @@ func (c *client) execDuration(name string, duration time.Duration) {
// it consistent with the current implementation of some clients (e.g. TiDB).
func (c *client) requestWithRetry(
ctx context.Context,
name, uri string,
name, uri, method string,
res interface{},
) error {
var (
Expand All @@ -163,7 +168,7 @@ func (c *client) requestWithRetry(
)
for idx := 0; idx < len(c.pdAddrs); idx++ {
addr = c.pdAddrs[idx]
err = c.request(ctx, name, addr, uri, res)
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, res)
if err == nil {
break
}
Expand All @@ -175,16 +180,15 @@ func (c *client) requestWithRetry(

func (c *client) request(
ctx context.Context,
name, addr, uri string,
name, url, method string,
res interface{},
) error {
reqURL := fmt.Sprintf("%s%s", addr, uri)
logFields := []zap.Field{
zap.String("name", name),
zap.String("url", reqURL),
zap.String("url", url),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
req, err := http.NewRequestWithContext(ctx, method, url, nil)
if err != nil {
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)
Expand Down Expand Up @@ -229,7 +233,7 @@ func (c *client) request(
// GetRegionByID gets the region info by ID.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) {
var region RegionInfo
err := c.requestWithRetry(ctx, "GetRegionByID", RegionByID(regionID), &region)
err := c.requestWithRetry(ctx, "GetRegionByID", RegionByID(regionID), http.MethodGet, &region)

Check warning on line 236 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L236

Added line #L236 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -239,7 +243,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInf
// GetRegionByKey gets the region info by key.
func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, error) {
var region RegionInfo
err := c.requestWithRetry(ctx, "GetRegionByKey", RegionByKey(key), &region)
err := c.requestWithRetry(ctx, "GetRegionByKey", RegionByKey(key), http.MethodGet, &region)

Check warning on line 246 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L246

Added line #L246 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -249,7 +253,7 @@ func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, e
// GetRegions gets the regions info.
func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx, "GetRegions", Regions, &regions)
err := c.requestWithRetry(ctx, "GetRegions", Regions, http.MethodGet, &regions)

Check warning on line 256 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L256

Added line #L256 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -259,7 +263,7 @@ func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) {
// GetRegionsByKey gets the regions info by key range. If the limit is -1, it will return all regions within the range.
func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), &regions)
err := c.requestWithRetry(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), http.MethodGet, &regions)

Check warning on line 266 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L266

Added line #L266 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -269,7 +273,7 @@ func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, l
// GetRegionsByStoreID gets the regions info by store ID.
func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), &regions)
err := c.requestWithRetry(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), http.MethodGet, &regions)

Check warning on line 276 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L276

Added line #L276 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -279,7 +283,7 @@ func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*Regi
// GetHotReadRegions gets the hot read region statistics info.
func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotReadRegions StoreHotPeersInfos
err := c.requestWithRetry(ctx, "GetHotReadRegions", HotRead, &hotReadRegions)
err := c.requestWithRetry(ctx, "GetHotReadRegions", HotRead, http.MethodGet, &hotReadRegions)

Check warning on line 286 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L286

Added line #L286 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -289,23 +293,57 @@ func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, er
// GetHotWriteRegions gets the hot write region statistics info.
func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotWriteRegions StoreHotPeersInfos
err := c.requestWithRetry(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions)
err := c.requestWithRetry(ctx, "GetHotWriteRegions", HotWrite, http.MethodGet, &hotWriteRegions)

Check warning on line 296 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L296

Added line #L296 was not covered by tests
if err != nil {
return nil, err
}
return &hotWriteRegions, nil
}

// GetRegionStatusByKey gets the region status by key range.
func (c *client) GetRegionStatusByKey(ctx context.Context, startKey, endKey []byte) (*RegionStats, error) {
var regionStats RegionStats
err := c.requestWithRetry(
ctx, "GetRegionStatusByKey",
RegionStatsByStartEndKey(string(startKey), string(endKey)), http.MethodGet,
&regionStats,
)
if err != nil {
return nil, err

Check warning on line 312 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L305-L312

Added lines #L305 - L312 were not covered by tests
}
return &regionStats, nil

Check warning on line 314 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L314

Added line #L314 was not covered by tests
}

// GetStores gets the stores info.
func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) {
var stores StoresInfo
err := c.requestWithRetry(ctx, "GetStores", Stores, &stores)
err := c.requestWithRetry(ctx, "GetStores", Stores, http.MethodGet, &stores)

Check warning on line 320 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L320

Added line #L320 was not covered by tests
if err != nil {
return nil, err
}
return &stores, nil
}

// GetPlacementRulesByGroup gets the placement rules by group.
func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([]*Rule, error) {
var rules []*Rule
err := c.requestWithRetry(ctx, "GetPlacementRulesByGroup", PlacementRulesByGroup(group), http.MethodGet, &rules)
if err != nil {
return nil, err

Check warning on line 332 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L329-L332

Added lines #L329 - L332 were not covered by tests
}
return rules, nil

Check warning on line 334 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L334

Added line #L334 was not covered by tests
}

// SetPlacementRule sets the placement rule.
func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
return c.requestWithRetry(ctx, "SetPlacementRule", PlacementRule, http.MethodPost, nil)

Check warning on line 339 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L339

Added line #L339 was not covered by tests
}

// DeletePlacementRule deletes the placement rule.
func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error {
return c.requestWithRetry(ctx, "DeletePlacementRule", PlacementRuleByGroupAndID(group, id), http.MethodDelete, nil)

Check warning on line 344 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L344

Added line #L344 was not covered by tests
}

// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs.
func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
uri := MinResolvedTSPrefix
Expand All @@ -326,7 +364,7 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
IsRealTime bool `json:"is_real_time,omitempty"`
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
}{}
err := c.requestWithRetry(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp)
err := c.requestWithRetry(ctx, "GetMinResolvedTSByStoresIDs", uri, http.MethodGet, &resp)
if err != nil {
return 0, nil, err
}
Expand All @@ -335,3 +373,8 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
return c.requestWithRetry(ctx, "AccelerateSchedule", accelerateSchedule, http.MethodPost, nil)

Check warning on line 379 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L379

Added line #L379 was not covered by tests
}
64 changes: 64 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,67 @@ type StoreStatus struct {
LastHeartbeatTS time.Time `json:"last_heartbeat_ts"`
Uptime string `json:"uptime"`
}

// RegionStats stores the statistics of regions.
type RegionStats struct {
Count int `json:"count"`
EmptyCount int `json:"empty_count"`
StorageSize int64 `json:"storage_size"`
StorageKeys int64 `json:"storage_keys"`
StoreLeaderCount map[uint64]int `json:"store_leader_count"`
StorePeerCount map[uint64]int `json:"store_peer_count"`
}

// PeerRoleType is the expected peer type of the placement rule.
type PeerRoleType string

const (
// Voter can either match a leader peer or follower peer
Voter PeerRoleType = "voter"
// Leader matches a leader.
Leader PeerRoleType = "leader"
// Follower matches a follower.
Follower PeerRoleType = "follower"
// Learner matches a learner.
Learner PeerRoleType = "learner"
)

// LabelConstraint is used to filter store when trying to place peer of a region.
type LabelConstraint struct {
Key string `json:"key,omitempty"`
Op LabelConstraintOp `json:"op,omitempty"`
Values []string `json:"values,omitempty"`
}

// LabelConstraintOp defines how a LabelConstraint matches a store. It can be one of
// 'in', 'notIn', 'exists', or 'notExists'.
type LabelConstraintOp string

const (
// In restricts the store label value should in the value list.
// If label does not exist, `in` is always false.
In LabelConstraintOp = "in"
// NotIn restricts the store label value should not in the value list.
// If label does not exist, `notIn` is always true.
NotIn LabelConstraintOp = "notIn"
// Exists restricts the store should have the label.
Exists LabelConstraintOp = "exists"
// NotExists restricts the store should not have the label.
NotExists LabelConstraintOp = "notExists"
)

// Rule is the placement rule that can be checked against a region. When
// applying rules (apply means schedule regions to match selected rules), the
// apply order is defined by the tuple [GroupIndex, GroupID, Index, ID].
type Rule struct {
GroupID string `json:"group_id"`
ID string `json:"id"`
Index int `json:"index,omitempty"`
Override bool `json:"override,omitempty"`
StartKeyHex string `json:"start_key"`
EndKeyHex string `json:"end_key"`
Role PeerRoleType `json:"role"`
Count int `json:"count"`
Constraints LabelConstraint `json:"label_constraints,omitempty"`
LocationLabels []string `json:"location_labels,omitempty"`
}

0 comments on commit 272d561

Please sign in to comment.