Skip to content

Commit

Permalink
TiDB failover (#86)
Browse files Browse the repository at this point in the history
* tidb failover
  • Loading branch information
xiaojingchen authored and tennix committed Sep 11, 2018
1 parent 9336599 commit 84a26b1
Show file tree
Hide file tree
Showing 15 changed files with 751 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ spec:
- -cluster-scoped={{ .Values.clusterScoped }}
- -auto-failover={{ .Values.controllerManager.autoFailover | default false }}
- -pd-failover-period={{ .Values.controllerManager.pdFailoverPeriod | default "5m" }}
- -tidb-failover-period={{ .Values.controllerManager.tidbFailoverPeriod | default "5m" }}
- -v={{ .Values.controllerManager.logLevel }}
env:
- name: NAMESPACE
Expand Down
2 changes: 2 additions & 0 deletions charts/tidb-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ controllerManager:
autoFailover: false
# pd failover period default(5m)
pdFailoverPeriod: 5m
# tidb failover period default(5m)
tidbFailoverPeriod: 5m
22 changes: 12 additions & 10 deletions cmd/controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ import (
)

var (
printVersion bool
workers int
pdFailoverPeriod time.Duration
autoFailover bool
leaseDuration = 15 * time.Second
renewDuration = 5 * time.Second
retryPeriod = 3 * time.Second
resyncDuration = 30 * time.Second
waitDuration = 5 * time.Second
printVersion bool
workers int
autoFailover bool
pdFailoverPeriod time.Duration
tidbFailoverPeriod time.Duration
leaseDuration = 15 * time.Second
renewDuration = 5 * time.Second
retryPeriod = 3 * time.Second
resyncDuration = 30 * time.Second
waitDuration = 5 * time.Second
)

func init() {
Expand All @@ -57,6 +58,7 @@ func init() {
flag.StringVar(&controller.DefaultStorageClassName, "default-storage-class-name", "standard", "Default storage class name")
flag.BoolVar(&autoFailover, "auto-failover", false, "Auto failover")
flag.DurationVar(&pdFailoverPeriod, "pd-failover-period", time.Duration(5*time.Minute), "PD failover period default(5m)")
flag.DurationVar(&tidbFailoverPeriod, "tidb-failover-period", time.Duration(5*time.Minute), "TiDB failover period")

flag.Parse()
}
Expand Down Expand Up @@ -116,7 +118,7 @@ func main() {
},
}

tcController := tidbcluster.NewController(kubeCli, cli, informerFactory, kubeInformerFactory, autoFailover, pdFailoverPeriod)
tcController := tidbcluster.NewController(kubeCli, cli, informerFactory, kubeInformerFactory, autoFailover, pdFailoverPeriod, tidbFailoverPeriod)
stop := make(chan struct{})
defer close(stop)
go informerFactory.Start(stop)
Expand Down
18 changes: 14 additions & 4 deletions pkg/apis/pingcap.com/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,24 @@ type PDFailureMember struct {

// TiDBStatus is TiDB status
type TiDBStatus struct {
Phase MemberPhase `json:"phase,omitempty"`
StatefulSet *apps.StatefulSetStatus `json:"statefulSet,omitempty"`
Members map[string]TiDBMember `json:"members,omitempty"`
Phase MemberPhase `json:"phase,omitempty"`
StatefulSet *apps.StatefulSetStatus `json:"statefulSet,omitempty"`
Members map[string]TiDBMember `json:"members,omitempty"`
FailureMembers map[string]TiDBFailureMember `json:"failureMembers,omitempty"`
}

// TiDBMember is TiDB member.
// It is the value type of the TiDBStatus.Members map.
type TiDBMember struct {
// IP is serialized as "ip".
// NOTE(review): presumably the address of the member's pod — confirm.
IP string `json:"ip"`
// Name is serialized as "name".
// NOTE(review): presumably the member's pod/host name — confirm against the code that fills it.
Name string `json:"name"`
// Health is serialized as "health"; true is presumably "healthy" — confirm.
Health bool `json:"health"`
// Last time the health transitioned from one to another.
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
}

// TiDBFailureMember is the tidb failure member information.
// It is the value type of the TiDBStatus.FailureMembers map.
type TiDBFailureMember struct {
// PodName is serialized as "podName" and omitted when empty.
// NOTE(review): presumably the name of the failed TiDB pod — confirm.
PodName string `json:"podName,omitempty"`
// Replicas is serialized as "replicas" and omitted when zero.
// NOTE(review): presumably the replica count recorded when the failure
// was detected — confirm against the failover logic.
Replicas int32 `json:"replicas,omitempty"`
}

// TiKVStatus is TiKV status
Expand Down
24 changes: 24 additions & 0 deletions pkg/apis/pingcap.com/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/controller/controller_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func TiDBMemberName(clusterName string) string {
return fmt.Sprintf("%s-tidb", clusterName)
}

// TiDBPeerMemberName builds the tidb peer service name for a cluster,
// in the form "<clusterName>-tidb-peer".
func TiDBPeerMemberName(clusterName string) string {
	const peerSuffix = "tidb-peer"
	return fmt.Sprintf("%s-%s", clusterName, peerSuffix)
}

// PriTiDBMemberName returns privileged tidb member name
func PriTiDBMemberName(clusterName string) string {
return fmt.Sprintf("%s-privileged-tidb", clusterName)
Expand Down
26 changes: 13 additions & 13 deletions pkg/controller/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (

// PDControlInterface is an interface that knows how to manage and get tidb cluster's PD client
type PDControlInterface interface {
// GetPDClient provide PDClient of the tidb cluster.
// GetPDClient provides PDClient of the tidb cluster.
GetPDClient(tc *v1alpha1.TidbCluster) PDClient
}

Expand All @@ -47,12 +47,12 @@ type defaultPDControl struct {
pdClients map[string]PDClient
}

// NewDefaultPDControl return a defaultPDControl instance
// NewDefaultPDControl returns a defaultPDControl instance
func NewDefaultPDControl() PDControlInterface {
	// Start with an empty, non-nil client cache.
	clients := map[string]PDClient{}
	control := &defaultPDControl{pdClients: clients}
	return control
}

// GetPDClient provide a PDClient of real pd cluster,if the PDClient not existing, it will create new one.
// GetPDClient provides a PDClient of the real pd cluster; if the PDClient does not exist, it creates a new one.
func (pdc *defaultPDControl) GetPDClient(tc *v1alpha1.TidbCluster) PDClient {
pdc.mutex.Lock()
defer pdc.mutex.Unlock()
Expand All @@ -65,23 +65,23 @@ func (pdc *defaultPDControl) GetPDClient(tc *v1alpha1.TidbCluster) PDClient {
return pdc.pdClients[key]
}

// pdClientKey return the pd client key
// pdClientKey returns the pd client key
func pdClientKey(namespace, clusterName string) string {
	// The key is "<clusterName>.<namespace>": cluster name first, then namespace.
	key := fmt.Sprintf("%[2]s.%[1]s", namespace, clusterName)
	return key
}

// pdClientUrl build the url of pd client
// pdClientURL builds the url of pd client
func pdClientURL(namespace, clusterName string) string {
	const (
		scheme = "http"
		port   = 2379
	)
	// The host is the "<clusterName>-pd" name qualified by the namespace.
	host := fmt.Sprintf("%s-pd.%s", clusterName, namespace)
	return fmt.Sprintf("%s://%s:%d", scheme, host, port)
}

// PDClient provider pd server's api
// PDClient provides pd server's api
type PDClient interface {
// GetHealth return the PD's health info
// GetHealth returns the PD's health info
GetHealth() (*HealthInfo, error)
// GetConfig returns PD's config
GetConfig() (*server.Config, error)
// GetCluster return used when syncing pod labels.
// GetCluster returns the cluster info; it is used when syncing pod labels.
GetCluster() (*metapb.Cluster, error)
// GetMembers returns all PD members from cluster
GetMembers() (*MembersInfo, error)
Expand All @@ -96,9 +96,9 @@ type PDClient interface {
SetStoreLabels(storeID uint64, labels map[string]string) (bool, error)
// DeleteStore deletes a TiKV store from cluster
DeleteStore(storeID uint64) error
// DeleteMember delete a PD member from cluster
// DeleteMember deletes a PD member from cluster
DeleteMember(name string) error
// DeleteMemberByID delete a PD member from cluster
// DeleteMemberByID deletes a PD member from cluster
DeleteMemberByID(memberID uint64) error
// BeginEvictLeader initiates leader eviction for a storeID.
// This is used when upgrading a pod.
Expand All @@ -123,7 +123,7 @@ type pdClient struct {
httpClient *http.Client
}

// NewPDClient return a new PDClient
// NewPDClient returns a new PDClient
func NewPDClient(url string, timeout time.Duration) PDClient {
return &pdClient{
url: url,
Expand Down Expand Up @@ -307,7 +307,7 @@ func (pc *pdClient) DeleteStore(storeID uint64) error {
}
defer DeferClose(res.Body, &err)

// Remove an offline store should return http.StatusOK
// Removing an offline store should return http.StatusOK
if res.StatusCode == http.StatusOK {
return nil
}
Expand All @@ -317,7 +317,7 @@ func (pc *pdClient) DeleteStore(storeID uint64) error {
}

// FIXME: We should not rely on error text
// Remove a tombstone store should return "store has been removed"
// Removing a tombstone store should return "store has been removed"
bodyStr := string(body)
if strings.Contains(bodyStr, "store has been removed") {
return nil
Expand Down
94 changes: 94 additions & 0 deletions pkg/controller/tidb_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"fmt"
"io/ioutil"
"net/http"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
)

// TiDBControlInterface is the interface that knows how to manage tidb peers.
type TiDBControlInterface interface {
// GetHealth returns tidb's health info for the given cluster as a map
// from member name to a boolean health flag.
// In the default implementation in this file the keys are the pod host
// names "<cluster>-tidb-<ordinal>" and the value is true when the
// member's status endpoint answered without error.
GetHealth(tc *v1alpha1.TidbCluster) map[string]bool
}

// defaultTiDBControl is default implementation of TiDBControlInterface.
// It determines member health by issuing HTTP requests to each member.
type defaultTiDBControl struct {
// httpClient performs the GET requests issued by getBodyOK.
httpClient *http.Client
}

// NewDefaultTiDBControl builds the default TiDBControlInterface implementation,
// backed by an HTTP client whose Timeout is the package-level timeout value.
func NewDefaultTiDBControl() TiDBControlInterface {
	return &defaultTiDBControl{
		httpClient: &http.Client{Timeout: timeout},
	}
}

// GetHealth probes the status endpoint of every TiDB pod requested by the
// cluster spec and reports, per pod host name, whether the probe succeeded.
//
// Pods are addressed as "<cluster>-tidb-<ordinal>" for ordinals
// 0..Spec.TiDB.Replicas-1, through the tidb peer service on port 10080.
// A pod is marked healthy only when getBodyOK returns no error for its
// /status URL; any error marks it unhealthy. The returned map is never nil.
func (tdc *defaultTiDBControl) GetHealth(tc *v1alpha1.TidbCluster) map[string]bool {
	tcName := tc.GetName()
	ns := tc.GetNamespace()

	// Take both names from the shared helpers instead of re-spelling the
	// "-tidb-peer" suffix here, so the probe URL cannot drift from the
	// service name used by the rest of the package.
	setName := TiDBMemberName(tcName)
	peerSvcName := TiDBPeerMemberName(tcName)

	result := map[string]bool{}
	for i := 0; i < int(tc.Spec.TiDB.Replicas); i++ {
		hostName := fmt.Sprintf("%s-%d", setName, i)
		url := fmt.Sprintf("http://%s.%s.%s:10080/status", hostName, peerSvcName, ns)
		// The body is irrelevant; only reachability and status code matter.
		_, err := tdc.getBodyOK(url)
		result[hostName] = err == nil
	}
	return result
}

// getBodyOK issues a GET request to apiURL and returns the response body.
//
// It returns a non-nil error when the request itself fails, when the server
// answers with a status code of 400 or above, or when the body cannot be
// read; in all of those cases the returned body is nil. Once a response has
// been obtained, its body is closed on every return path.
func (tdc *defaultTiDBControl) getBodyOK(apiURL string) (body []byte, err error) {
	res, err := tdc.httpClient.Get(apiURL)
	if err != nil {
		return nil, err
	}
	// Register the close before any further early return so the body is not
	// leaked on error statuses. err is a named result so that the pointer
	// handed to DeferClose refers to the value actually returned.
	// NOTE(review): DeferClose is presumably meant to surface a Close
	// failure through that pointer — confirm against its definition.
	defer DeferClose(res.Body, &err)

	if res.StatusCode >= 400 {
		return nil, fmt.Errorf("Error response %v", res.StatusCode)
	}

	body, err = ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}
	return body, nil
}

// FakeTiDBControl is a test double for TiDBControlInterface whose health
// answer is set by the caller.
type FakeTiDBControl struct {
	// healthInfo is the map stored by SetHealth; nil until SetHealth is called.
	healthInfo map[string]bool
}

// NewFakeTiDBControl creates a FakeTiDBControl with no health info set.
func NewFakeTiDBControl() *FakeTiDBControl {
	return new(FakeTiDBControl)
}

// SetHealth stores healthInfo in the fake. The map is kept by reference,
// not copied.
func (ftd *FakeTiDBControl) SetHealth(healthInfo map[string]bool) {
	(*ftd).healthInfo = healthInfo
}

// GetHealth returns the health info held by the fake, ignoring tc.
// The stored map itself is returned, not a copy.
func (ftd *FakeTiDBControl) GetHealth(tc *v1alpha1.TidbCluster) map[string]bool {
return ftd.healthInfo
}
4 changes: 3 additions & 1 deletion pkg/controller/tidbcluster/tidb_cluster_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func newFakeTidbClusterControl() (ControlInterface, *controller.FakeStatefulSetC
nodeInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Nodes()

pdControl := controller.NewFakePDControl()
tidbControl := controller.NewFakeTiDBControl()
setControl := controller.NewFakeStatefulSetControl(setInformer, tcInformer)
svcControl := controller.NewFakeServiceControl(svcInformer, tcInformer)
pvControl := controller.NewRealPVControl(kubeCli, pvcInformer.Lister(), recorder)
Expand All @@ -95,10 +96,11 @@ func newFakeTidbClusterControl() (ControlInterface, *controller.FakeStatefulSetC
tikvFailover := mm.NewFakeTiKVFailover()
tikvUpgrader := mm.NewFakeTiKVUpgrader()
tidbUpgrader := mm.NewFakeTiDBUpgrader()
tidbFailover := mm.NewFakeTiDBFailover()

pdMemberManager := mm.NewPDMemberManager(pdControl, setControl, svcControl, setInformer.Lister(), svcInformer.Lister(), podInformer.Lister(), podControl, pvcInformer.Lister(), pdScaler, pdUpgrader, autoFailover, pdFailover)
tikvMemberManager := mm.NewTiKVMemberManager(pdControl, setControl, svcControl, setInformer.Lister(), svcInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), autoFailover, tikvFailover, tikvScaler, tikvUpgrader)
tidbMemberManager := mm.NewTiDBMemberManager(setControl, setInformer.Lister(), tidbUpgrader)
tidbMemberManager := mm.NewTiDBMemberManager(setControl, svcControl, tidbControl, setInformer.Lister(), svcInformer.Lister(), tidbUpgrader, autoFailover, tidbFailover)
reclaimPolicyManager := meta.NewReclaimPolicyManager(pvcInformer.Lister(), pvInformer.Lister(), pvControl)
metaManager := meta.NewMetaManager(pvcInformer.Lister(), pvcControl, pvInformer.Lister(), pvControl, podInformer.Lister(), podControl)
control := NewDefaultTidbClusterControl(tcControl, pdMemberManager, tikvMemberManager, tidbMemberManager, reclaimPolicyManager, metaManager, recorder)
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/tidbcluster/tidb_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func NewController(
informerFactory informers.SharedInformerFactory,
kubeInformerFactory kubeinformers.SharedInformerFactory,
autoFailover bool,
pdFailoverPeriod time.Duration) *Controller {
pdFailoverPeriod time.Duration,
tidbFailoverPeriod time.Duration,
) *Controller {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
Expand All @@ -88,6 +90,7 @@ func NewController(

tcControl := controller.NewRealTidbClusterControl(cli, tcInformer.Lister(), recorder)
pdControl := controller.NewDefaultPDControl()
tidbControl := controller.NewDefaultTiDBControl()
setControl := controller.NewRealStatefuSetControl(kubeCli, setInformer.Lister(), recorder)
svcControl := controller.NewRealServiceControl(kubeCli, svcInformer.Lister(), recorder)
pvControl := controller.NewRealPVControl(kubeCli, pvcInformer.Lister(), recorder)
Expand All @@ -100,6 +103,7 @@ func NewController(
tikvFailover := mm.NewTiKVFailover(pdControl)
tikvUpgrader := mm.NewTiKVUpgrader()
tidbUpgrader := mm.NewTiDBUpgrader()
tidbFailover := mm.NewTiDBFailover(tidbFailoverPeriod)

tcc := &Controller{
kubeClient: kubeCli,
Expand Down Expand Up @@ -135,8 +139,13 @@ func NewController(
),
mm.NewTiDBMemberManager(
setControl,
svcControl,
tidbControl,
setInformer.Lister(),
svcInformer.Lister(),
tidbUpgrader,
autoFailover,
tidbFailover,
),
meta.NewReclaimPolicyManager(
pvcInformer.Lister(),
Expand Down
Loading

0 comments on commit 84a26b1

Please sign in to comment.