controllers: move regions to the least populated up-to-date regionserver
Instead of relying on HBase Master to assign regions during rolling restart,
use our insider knowledge to decide on the target based on the following
criteria:
- Move only to up-to-date regionservers to minimize reassignment
- Move to the regionserver with the least number of regions (sketched below)
timoha committed Jan 25, 2021
1 parent 4efc3c7 commit 902358f
Showing 3 changed files with 127 additions and 71 deletions.
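
For context, here is a minimal, self-contained sketch of the heuristic this commit implements (it mirrors the rsCount/regionServerTargets types added in the diff, but the server names and region counts are hypothetical): keep the up-to-date regionservers in a min-heap keyed by region count, send each region to the least populated candidate, bump that candidate's count, and push it back onto the heap.

```go
package main

import (
	"container/heap"
	"fmt"
)

// rsCount mirrors the per-regionserver counter the commit introduces.
type rsCount struct {
	serverName  string
	regionCount int
}

// byRegionCount is a min-heap of regionservers ordered by region count,
// analogous to the regionServerTargets type in the diff.
type byRegionCount []*rsCount

func (h byRegionCount) Len() int           { return len(h) }
func (h byRegionCount) Less(i, j int) bool { return h[i].regionCount < h[j].regionCount }
func (h byRegionCount) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *byRegionCount) Push(x interface{}) { *h = append(*h, x.(*rsCount)) }

func (h *byRegionCount) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	old[n-1] = nil // drop the reference so it can be garbage collected
	*h = old[:n-1]
	return item
}

func main() {
	// Hypothetical up-to-date regionservers and their current region counts.
	targets := byRegionCount{
		{serverName: "rs-0.hbase", regionCount: 12},
		{serverName: "rs-2.hbase", regionCount: 7},
		{serverName: "rs-3.hbase", regionCount: 9},
	}
	heap.Init(&targets)

	// Each region being drained goes to the currently least populated target;
	// the target's count is bumped and it is pushed back onto the heap.
	for i := 0; i < 5; i++ {
		rc := heap.Pop(&targets).(*rsCount)
		fmt.Printf("region %d -> %s (had %d regions)\n", i, rc.serverName, rc.regionCount)
		rc.regionCount++
		heap.Push(&targets, rc)
	}
}
```
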
178 changes: 119 additions & 59 deletions controllers/hbase_controller.go
@@ -17,11 +17,12 @@ limitations under the License.
package controllers

import (
"bytes"
"container/heap"
"context"
"crypto/sha256"
"fmt"
"hash"
"io"
"sort"
"strings"
"time"
@@ -77,10 +78,9 @@ const (
// HBaseReconciler reconciles a HBase object
type HBaseReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
GhAdmin gohbase.AdminClient
GhClient gohbase.Client
Log logr.Logger
Scheme *runtime.Scheme
GhAdmin gohbase.AdminClient
}

// +kubebuilder:rbac:groups=hbase.elenskiy.co,namespace="hbase",resources=hbases,verbs=get;list;watch;create;update;patch;delete
@@ -265,65 +265,103 @@ func (r *HBaseReconciler) deleteUnusedConfigMaps(ctx context.Context, hb *hbasev
return nil
}

// getRegionsPerRegionServer can be fairly expensive as it scans entire hbase:meta
func (r *HBaseReconciler) getRegionsPerRegionServer(ctx context.Context) (map[string][][]byte, error) {
// scan meta to get a regioninfo and server name
scan, err := hrpc.NewScan(ctx,
[]byte("hbase:meta"),
hrpc.Families(map[string][]string{"info": []string{"sn", "state"}}))
// get regions via cluster status because this way we can get
// regionservers that don't have any regions
cs, err := r.GhAdmin.ClusterStatus()
if err != nil {
return nil, err
return nil, fmt.Errorf("getting cluster status: %w", err)
}

// if some fields are nil, just let it panic as it's not expected
// and we won't be able to recover from that anyway
result := map[string][][]byte{}
scanner := r.GhClient.Scan(scan)
for {
res, err := scanner.Next()
if err == io.EOF {
break
}
if err != nil {
return nil, err
for _, s := range cs.GetLiveServers() {
sn := fmt.Sprintf("%s,%d,%d", s.Server.GetHostName(),
s.Server.GetPort(), s.Server.GetStartCode())
result[sn] = [][]byte{} // add even if there are no regions
for _, r := range s.GetServerLoad().GetRegionLoads() {
rn := r.GetRegionSpecifier().GetValue()
if bytes.HasPrefix(rn, []byte("hbase:meta")) {
continue
}
// keep only the 32-character encoded region name (the part between the
// two trailing dots of the full region name)
rn = rn[len(rn)-33 : len(rn)-1]
result[sn] = append(result[sn], rn)
}
}
return result, nil
}

if l := len(res.Cells); l != 2 {
return nil, fmt.Errorf("got %v cells", l)
}
type rsCount struct {
serverName string
regionCount int
}

if state := string(res.Cells[1].Value); state != "OPEN" {
// skip the region if it's not OPEN as master won't be able to move it
// TODO: consider letting regionservers finish dealing with the region
// in case it's being split/merged/opened
continue
}
type regionServerTargets []*rsCount

// get region name from row and server name from value ("sn" column)
// TODO: parse actual regioninfo value
cell := res.Cells[0]
result[string(cell.Value)] = append(
result[string(cell.Value)],
cell.Row[len(cell.Row)-33:len(cell.Row)-1])
func (rst regionServerTargets) Len() int { return len(rst) }
func (rst regionServerTargets) Less(i, j int) bool {
// we want to pop the regionserver with the lowest number of regions
return rst[i].regionCount < rst[j].regionCount
}

}
return result, nil
func (rst regionServerTargets) Swap(i, j int) {
rst[i], rst[j] = rst[j], rst[i]
}

func (rst *regionServerTargets) Push(x interface{}) {
item := x.(*rsCount)
*rst = append(*rst, item)
}

func (rst *regionServerTargets) Pop() interface{} {
old := *rst
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
*rst = old[0 : n-1]
return item
}

// TODO: make parallel
// TODO: specify destination server
func (r *HBaseReconciler) moveRegions(ctx context.Context, regions [][]byte) error {
func (r *HBaseReconciler) moveRegions(ctx context.Context, regions [][]byte, targets regionServerTargets) error {
// important to understand that this heuristic for deciding which regionserver to move
// to does not account for the most recent state of the cluster. For example, if a
// regionserver were restarted while regions are being moved, the region counts would not be updated.
var err error
for _, region := range regions {
mr, err := hrpc.NewMoveRegion(ctx, region)
var mr *hrpc.MoveRegion
if targets.Len() > 0 {
// get the regionserver with least regions
rc := heap.Pop(&targets).(*rsCount)
r.Log.Info("moving region", "region", string(region),
"target", rc.serverName,
"current_count", rc.regionCount)
mr, err = hrpc.NewMoveRegion(ctx, region, hrpc.WithDestinationRegionServer(rc.serverName))

// update the count and add it back to priority heap
rc.regionCount++
heap.Push(&targets, rc)
} else {
// no targets
r.Log.Info("moving region without target", "region", string(region))
mr, err = hrpc.NewMoveRegion(ctx, region)
}
if err != nil {
return err
return fmt.Errorf("creating request to move region %q: %w", region, err)
}
if err := r.GhAdmin.MoveRegion(mr); err != nil {
return fmt.Errorf("failed to move region %q: %w", region, err)
if strings.Contains(err.Error(), "DoNotRetryIOException") {
// means the region is not open
continue
}
return fmt.Errorf("moving region %q: %w", region, err)
}
}
return nil
}

func (r *HBaseReconciler) pickRegionServerToDelete(ctx context.Context, td []*corev1.Pod) (*corev1.Pod, error) {
func (r *HBaseReconciler) pickRegionServerToDelete(ctx context.Context, td, utd []*corev1.Pod) (*corev1.Pod, error) {
if len(td) == 0 {
// make sure the balancer is on
sb, err := hrpc.NewSetBalancer(ctx, true)
@@ -347,26 +385,45 @@ func (r *HBaseReconciler) pickRegionServerToDelete(ctx context.Context, td []*co
return nil, err
}

// pick the first regionserver
p := td[0]

// drain the regionserver
rrs, err := r.getRegionsPerRegionServer(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get regions per regionservers: %w", err)
}

// pick the first regionserver
p := td[0]

// get regions to move and region counts per up-to-date regionserver
// TODO: this is O(n^2) in the case where all other regionservers are up-to-date
var toMove [][]byte
var source string
var targets regionServerTargets
for rs, regions := range rrs {
if strings.HasPrefix(rs, p.Name) {
// match, drain it
r.Log.Info("moving regions from RegionServer",
"regionserver", rs, "pod", p.Name, "count", len(regions))
if err := r.moveRegions(ctx, regions); err != nil {
return nil, err
if strings.HasPrefix(rs, p.Name+".") {
// move regions
toMove = regions
source = rs
}
// check if this is one of the up-to-date regionservers
for _, up := range utd {
if strings.HasPrefix(rs, up.Name+".") {
targets = append(targets, &rsCount{
serverName: rs,
regionCount: len(regions),
})
break
}
break
}
}
heap.Init(&targets)

r.Log.Info("moving regions from RegionServer",
"regionserver", source, "pod", p.Name, "count", len(toMove),
"target_count", targets.Len())
if err := r.moveRegions(ctx, toMove, targets); err != nil {
return nil, err
}

return p, nil
}

@@ -378,7 +435,7 @@ func (r *HBaseReconciler) isRegionsInTransition() (bool, error) {
return len(cs.GetRegionsInTransition()) > 0, nil
}

func (r *HBaseReconciler) pickMasterToDelete(ctx context.Context, td []*corev1.Pod) (*corev1.Pod, error) {
func (r *HBaseReconciler) pickMasterToDelete(ctx context.Context, td, utd []*corev1.Pod) (*corev1.Pod, error) {
if len(td) == 0 {
return nil, nil
}
@@ -392,7 +449,7 @@ func (r *HBaseReconciler) pickMasterToDelete(ctx context.Context, td []*corev1.P
r.Log.Info("got backup masters", "masters", cs.GetBackupMasters())
for _, p := range td {
for _, bm := range cs.GetBackupMasters() {
if strings.HasPrefix(bm.GetHostName(), p.Name) {
if strings.HasPrefix(bm.GetHostName(), p.Name+".") {
// match, delete it
return p, nil
}
@@ -402,7 +459,7 @@ func (r *HBaseReconciler) pickMasterToDelete(ctx context.Context, td []*corev1.P

// otherwise find the pod of the active master
for _, p := range td {
if strings.HasPrefix(cs.GetMaster().GetHostName(), p.Name) {
if strings.HasPrefix(cs.GetMaster().GetHostName(), p.Name+".") {
// match, delete it
return p, nil
}
@@ -422,7 +479,7 @@ func sprintPodList(l []*corev1.Pod) string {
}

func (r *HBaseReconciler) ensureStatefulSetPods(ctx context.Context, sts *appsv1.StatefulSet,
pickToDelete func(ctx context.Context, td []*corev1.Pod) (*corev1.Pod, error)) (bool, error) {
pickToDelete func(ctx context.Context, td, utd []*corev1.Pod) (*corev1.Pod, error)) (bool, error) {
podList := &corev1.PodList{}
listOpts := []client.ListOption{
client.InNamespace(sts.Namespace),
@@ -441,7 +498,7 @@ func (r *HBaseReconciler) ensureStatefulSetPods(ctx context.Context, sts *appsv1

// make sure that all pods are up by checking that all containers are ready.
// the loop exits if any pod is not ready.
var toDelete []*corev1.Pod
var toDelete, upToDate []*corev1.Pod
for _, p := range podList.Items {
p := p
if p.DeletionTimestamp != nil {
@@ -479,13 +536,16 @@ func (r *HBaseReconciler) ensureStatefulSetPods(ctx context.Context, sts *appsv1
if !isRecent {
// pod is healthy, but not of recent version, add to delete list
toDelete = append(toDelete, &p)
} else {
// pod is up-to-date, keep track of it
upToDate = append(upToDate, &p)
}
}

r.Log.Info("pick pod to delete", "statefulset", sts.Name, "pods", sprintPodList(toDelete))

// delete one pod at a time
p, err := pickToDelete(ctx, toDelete)
p, err := pickToDelete(ctx, toDelete, upToDate)
if err != nil {
r.Log.Error(err, "failed to pick pod to delete")
return false, fmt.Errorf("failed to pick pod to delete: %w", err)
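
An easy-to-miss detail in the controller changes above: pod names are now matched against regionserver and master host names with strings.HasPrefix(rs, p.Name+".") rather than a bare p.Name prefix. A minimal illustration (the pod and server names below are hypothetical) shows why the trailing dot matters:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// A StatefulSet pod and a regionserver ServerName ("host,port,startcode")
	// that belongs to a *different* pod.
	podName := "regionserver-1"
	serverName := "regionserver-10.hbase.default.svc.cluster.local,16020,1611540000000"

	fmt.Println(strings.HasPrefix(serverName, podName))     // true  -- false positive
	fmt.Println(strings.HasPrefix(serverName, podName+".")) // false -- correctly rejected
}
```
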
11 changes: 4 additions & 7 deletions controllers/suite_test.go
@@ -52,7 +52,6 @@ import (
var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment
var ghClient *mock.MockClient
var ghAdmin *mock.MockAdminClient

func TestAPIs(t *testing.T) {
@@ -125,7 +124,6 @@ var _ = BeforeSuite(func(done Done) {
logf.SetLogger(zap.LoggerTo(GinkgoWriter, true))
gomockCtrl := gomock.NewController(GinkgoT())
defer gomockCtrl.Finish()
ghClient = mock.NewMockClient(gomockCtrl)
ghAdmin = mock.NewMockAdminClient(gomockCtrl)
ghAdmin.EXPECT().ClusterStatus().AnyTimes().Return(&pb.ClusterStatus{}, nil)
ghAdmin.EXPECT().SetBalancer(gomock.Any()).AnyTimes()
@@ -152,11 +150,10 @@ var _ = BeforeSuite(func(done Done) {
Expect(err).ToNot(HaveOccurred())

err = (&HBaseReconciler{
Client: k8sManager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("HBase"),
Scheme: k8sManager.GetScheme(),
GhClient: ghClient,
GhAdmin: ghAdmin,
Client: k8sManager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("HBase"),
Scheme: k8sManager.GetScheme(),
GhAdmin: ghAdmin,
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

9 changes: 4 additions & 5 deletions main.go
@@ -70,11 +70,10 @@ func main() {
}

if err = (&controllers.HBaseReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("HBase"),
Scheme: mgr.GetScheme(),
GhClient: gohbase.NewClient(zkQuorum),
GhAdmin: gohbase.NewAdminClient(zkQuorum),
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("HBase"),
Scheme: mgr.GetScheme(),
GhAdmin: gohbase.NewAdminClient(zkQuorum),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HBase")
os.Exit(1)
