cluster: cleanup tests
vmihailenco committed May 21, 2018
1 parent 18b2e30 commit 5c742ff
Showing 4 changed files with 222 additions and 90 deletions.
81 changes: 64 additions & 17 deletions cluster.go
@@ -8,6 +8,7 @@ import (
     "math"
     "math/rand"
     "net"
+    "strings"
     "sync"
     "sync/atomic"
     "time"
@@ -35,6 +36,7 @@ type ClusterOptions struct {
     // Enables read-only commands on slave nodes.
     ReadOnly bool
     // Allows routing read-only commands to the closest master or slave node.
+    // It automatically enables ReadOnly.
     RouteByLatency bool
     // Allows routing read-only commands to the random master or slave node.
     RouteRandomly bool
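
The clarified comment means setting RouteByLatency no longer requires also setting ReadOnly. A minimal, illustrative configuration (the addresses are placeholders, not from this commit):

package main

import "github.com/go-redis/redis"

func main() {
    // Illustrative only; the addresses are placeholders.
    client := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs:          []string{":7000", ":7001", ":7002"},
        RouteByLatency: true, // implies ReadOnly, per the comment above
    })
    defer client.Close()
}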
@@ -150,6 +152,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
     return &node
 }
 
+func (n *clusterNode) String() string {
+    return n.Client.String()
+}
+
 func (n *clusterNode) Close() error {
     return n.Client.Close()
 }
@@ -379,15 +385,17 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
 
 type clusterState struct {
     nodes *clusterNodes
-    masters []*clusterNode
-    slaves []*clusterNode
+    Masters []*clusterNode
+    Slaves []*clusterNode
 
     slots [][]*clusterNode
 
     generation uint32
 }
 
-func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*clusterState, error) {
+func newClusterState(
+    nodes *clusterNodes, slots []ClusterSlot, origin string,
+) (*clusterState, error) {
     c := clusterState{
         nodes: nodes,
         generation: nodes.NextGeneration(),
@@ -413,9 +421,9 @@ func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*
         nodes = append(nodes, node)
 
         if i == 0 {
-            c.masters = appendNode(c.masters, node)
+            c.Masters = appendUniqueNode(c.Masters, node)
         } else {
-            c.slaves = appendNode(c.slaves, node)
+            c.Slaves = appendUniqueNode(c.Slaves, node)
         }
     }
 
@@ -497,6 +505,28 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
     return nil
 }
 
+func (c *clusterState) IsConsistent() bool {
+    if len(c.Masters) > len(c.Slaves) {
+        return false
+    }
+
+    for _, master := range c.Masters {
+        s := master.Client.Info("replication").Val()
+        if !strings.Contains(s, "role:master") {
+            return false
+        }
+    }
+
+    for _, slave := range c.Slaves {
+        s := slave.Client.Info("replication").Val()
+        if !strings.Contains(s, "role:slave") {
+            return false
+        }
+    }
+
+    return true
+}
+
 //------------------------------------------------------------------------------
 
 type clusterStateHolder struct {
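
For reference, IsConsistent leans on the fact that redis reports each node's role in the INFO replication section as a role:master or role:slave line; the check is a plain substring match rather than a field-by-field parse, and the leading length comparison assumes a healthy cluster has at least one slave per master. A self-contained sketch of the substring check (the payload below is made up, not captured from a server):

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Hypothetical INFO replication payload; a real reply carries more fields.
    info := "# Replication\r\nrole:master\r\nconnected_slaves:1\r\n"

    // The same per-node check IsConsistent performs.
    fmt.Println(strings.Contains(info, "role:master")) // true
    fmt.Println(strings.Contains(info, "role:slave"))  // false
}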
@@ -516,7 +546,18 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
     }
 }
 
-func (c *clusterStateHolder) Load() (*clusterState, error) {
+func (c *clusterStateHolder) Reload() (*clusterState, error) {
+    state, err := c.reload()
+    if err != nil {
+        return nil, err
+    }
+    if !state.IsConsistent() {
+        c.LazyReload()
+    }
+    return state, nil
+}
+
+func (c *clusterStateHolder) reload() (*clusterState, error) {
     state, err := c.load()
     if err != nil {
         c.lastErrMu.Lock()
@@ -535,9 +576,15 @@ func (c *clusterStateHolder) LazyReload() {
     go func() {
         defer atomic.StoreUint32(&c.reloading, 0)
 
-        _, err := c.Load()
-        if err == nil {
-            time.Sleep(time.Second)
+        for {
+            state, err := c.reload()
+            if err != nil {
+                return
+            }
+            time.Sleep(100 * time.Millisecond)
+            if state.IsConsistent() {
+                return
+            }
         }
     }()
 }
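
The deferred atomic.StoreUint32 pairs with a guard at the top of LazyReload that sits outside this hunk, presumably a compare-and-swap on the same flag, so at most one background reload runs at a time; the new loop then retries every 100 ms until IsConsistent reports a stable view, replacing the old single reload followed by a one-second sleep. A self-contained sketch of that single-flight pattern (the lazyLoader type and the guard's exact placement are assumptions, not the library's code):

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// lazyLoader illustrates the single-flight pattern used above: at most one
// background reload goroutine runs at a time, guarded by an atomic flag.
type lazyLoader struct {
    reloading uint32 // 0 = idle, 1 = a reload goroutine is running
}

func (l *lazyLoader) LazyReload() {
    // Assumed shape of the guard that the visible hunk does not show.
    if !atomic.CompareAndSwapUint32(&l.reloading, 0, 1) {
        return // another goroutine is already reloading
    }
    go func() {
        defer atomic.StoreUint32(&l.reloading, 0)
        fmt.Println("reloading cluster state...")
        time.Sleep(10 * time.Millisecond) // stand-in for the real work
    }()
}

func main() {
    var l lazyLoader
    l.LazyReload() // starts the background reload
    l.LazyReload() // no-op: the flag is already set
    time.Sleep(50 * time.Millisecond)
}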
@@ -596,7 +643,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
 
     c.cmdable.setProcessor(c.Process)
 
-    _, _ = c.state.Load()
+    _, _ = c.state.Reload()
     if opt.IdleCheckFrequency > 0 {
         go c.reaper(opt.IdleCheckFrequency)
     }
@@ -890,7 +937,7 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
 
     var wg sync.WaitGroup
     errCh := make(chan error, 1)
-    for _, master := range state.masters {
+    for _, master := range state.Masters {
         wg.Add(1)
         go func(node *clusterNode) {
             defer wg.Done()
@@ -923,7 +970,7 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
 
     var wg sync.WaitGroup
     errCh := make(chan error, 1)
-    for _, slave := range state.slaves {
+    for _, slave := range state.Slaves {
         wg.Add(1)
         go func(node *clusterNode) {
             defer wg.Done()
@@ -967,11 +1014,11 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
         }
     }
 
-    for _, node := range state.masters {
+    for _, node := range state.Masters {
         wg.Add(1)
         go worker(node)
     }
-    for _, node := range state.slaves {
+    for _, node := range state.Slaves {
         wg.Add(1)
         go worker(node)
     }
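
These iteration helpers are the main consumers of the newly exported Masters and Slaves fields. Typical test-suite usage looks something like this (illustrative only, assuming an already configured *redis.ClusterClient named client):

// Flush every master before a test run; FlushDB returns a *StatusCmd.
err := client.ForEachMaster(func(master *redis.Client) error {
    return master.FlushDB().Err()
})
if err != nil {
    panic(err)
}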
@@ -994,7 +1041,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
         return &acc
     }
 
-    for _, node := range state.masters {
+    for _, node := range state.Masters {
         s := node.Client.connPool.Stats()
         acc.Hits += s.Hits
         acc.Misses += s.Misses
@@ -1005,7 +1052,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
         acc.StaleConns += s.StaleConns
     }
 
-    for _, node := range state.slaves {
+    for _, node := range state.Slaves {
         s := node.Client.connPool.Stats()
         acc.Hits += s.Hits
         acc.Misses += s.Misses
@@ -1438,7 +1485,7 @@ func isLoopbackAddr(addr string) bool {
     return ip.IsLoopback()
 }
 
-func appendNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
+func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
     for _, n := range nodes {
         if n == node {
             return nodes
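This final hunk is truncated before appendUniqueNode's closing lines. From the visible loop, the rename makes the helper's intent explicit: append only when the node is not already in the slice. A standalone sketch of that shape (the trailing append is an assumption, since it falls outside the visible diff):

package main

import "fmt"

// Dedup-append in the same shape as appendUniqueNode: return the slice
// unchanged when the element is already present, otherwise append it.
// The trailing append is an assumption; it sits outside the visible hunk.
func appendUnique(items []string, item string) []string {
    for _, it := range items {
        if it == item {
            return items
        }
    }
    return append(items, item)
}

func main() {
    s := appendUnique([]string{"a", "b"}, "b")
    fmt.Println(s) // [a b]; duplicate not added
}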
(Diffs for the remaining 3 changed files did not load and are not shown.)
