Skip to content

Commit

Permalink
Assign a separate lookup table to each group (#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily committed Jul 1, 2024
1 parent d64eeb6 commit bd8384c
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Release Notes.
- Check unregistered nodes in background.
- Improve sorting performance of stream.
- Add the measure query trace.
- Assign a separate lookup table to each group in the maglev selector.

### Bugs

Expand Down
5 changes: 1 addition & 4 deletions pkg/cmdsetup/liaison.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
}
pipeline := pub.New(metaSvc)
localPipeline := queue.Local()
nodeSel, err := node.NewMaglevSelector()
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate required node selector")
}
nodeSel := node.NewMaglevSelector()
nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, nodeRegistry)
profSvc := observability.NewProfService()
Expand Down
5 changes: 0 additions & 5 deletions pkg/node/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package node

import (
"strconv"
"sync"

"github.com/pkg/errors"
Expand Down Expand Up @@ -97,7 +96,3 @@ func (p *pickFirstSelector) Pick(_, _ string, _ uint32) (string, error) {
}
return p.nodeIDs[0], nil
}

func formatSearchKey(group, name string, shardID uint32) string {
return group + "/" + name + "#" + strconv.FormatUint(uint64(shardID), 10)
}
69 changes: 57 additions & 12 deletions pkg/node/maglev.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,81 @@
package node

import (
"sort"
"strconv"
"sync"

"github.com/kkdai/maglev"

databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
)

const lookupTableSize = 65537

var _ Selector = (*maglevSelector)(nil)

type maglevSelector struct {
maglev *maglev.Maglev
routers sync.Map
nodes []string
mutex sync.RWMutex
}

func (m *maglevSelector) AddNode(node *databasev1.Node) {
_ = m.maglev.Add(node.GetMetadata().GetName())
m.mutex.Lock()
defer m.mutex.Unlock()
for i := range m.nodes {
if m.nodes[i] == node.GetMetadata().GetName() {
return
}
}
m.nodes = append(m.nodes, node.GetMetadata().GetName())
sort.StringSlice(m.nodes).Sort()
m.routers.Range(func(_, value any) bool {
_ = value.(*maglev.Maglev).Set(m.nodes)
return true
})
}

func (m *maglevSelector) RemoveNode(node *databasev1.Node) {
_ = m.maglev.Remove(node.GetMetadata().GetName())
m.mutex.Lock()
defer m.mutex.Unlock()
for i := range m.nodes {
if m.nodes[i] == node.GetMetadata().GetName() {
m.nodes = append(m.nodes[:i], m.nodes[i+1:]...)
break
}
}
m.routers.Range(func(_, value any) bool {
_ = value.(*maglev.Maglev).Set(m.nodes)
return true
})
}

func (m *maglevSelector) Pick(group, name string, shardID uint32) (string, error) {
return m.maglev.Get(formatSearchKey(group, name, shardID))
}
router, ok := m.routers.Load(group)
if ok {
return router.(*maglev.Maglev).Get(formatSearchKey(name, shardID))
}
m.mutex.Lock()
defer m.mutex.Unlock()
router, ok = m.routers.Load(group)
if ok {
return router.(*maglev.Maglev).Get(formatSearchKey(name, shardID))
}

// NewMaglevSelector creates a new backend selector based on Maglev hashing algorithm.
func NewMaglevSelector() (Selector, error) {
alg, err := maglev.NewMaglev(nil, 65537)
mTab, err := maglev.NewMaglev(m.nodes, lookupTableSize)
if err != nil {
return nil, err
return "", err
}
return &maglevSelector{
maglev: alg,
}, nil
m.routers.Store(group, mTab)
return mTab.Get(formatSearchKey(name, shardID))
}

// NewMaglevSelector creates a new backend selector based on Maglev hashing algorithm.
func NewMaglevSelector() Selector {
return &maglevSelector{}
}

func formatSearchKey(name string, shardID uint32) string {
return name + "-" + strconv.FormatUint(uint64(shardID), 10)
}
12 changes: 5 additions & 7 deletions pkg/node/maglev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ const (
)

func TestMaglevSelector(t *testing.T) {
sel, err := NewMaglevSelector()
assert.NoError(t, err)
sel := NewMaglevSelector()
sel.AddNode(&databasev1.Node{
Metadata: &commonv1.Metadata{
Name: "data-node-1",
Expand All @@ -55,8 +54,7 @@ func TestMaglevSelector(t *testing.T) {
}

func TestMaglevSelector_EvenDistribution(t *testing.T) {
sel, err := NewMaglevSelector()
assert.NoError(t, err)
sel := NewMaglevSelector()
dataNodeNum := 10
for i := 0; i < dataNodeNum; i++ {
sel.AddNode(&databasev1.Node{
Expand All @@ -83,8 +81,8 @@ func TestMaglevSelector_EvenDistribution(t *testing.T) {
}

func TestMaglevSelector_DiffNode(t *testing.T) {
fullSel, _ := NewMaglevSelector()
brokenSel, _ := NewMaglevSelector()
fullSel := NewMaglevSelector()
brokenSel := NewMaglevSelector()
dataNodeNum := 10
for i := 0; i < dataNodeNum; i++ {
fullSel.AddNode(&databasev1.Node{
Expand Down Expand Up @@ -114,7 +112,7 @@ func TestMaglevSelector_DiffNode(t *testing.T) {
}

func BenchmarkMaglevSelector_Pick(b *testing.B) {
sel, _ := NewMaglevSelector()
sel := NewMaglevSelector()
dataNodeNum := 10
for i := 0; i < dataNodeNum; i++ {
sel.AddNode(&databasev1.Node{
Expand Down

0 comments on commit bd8384c

Please sign in to comment.