From bd8384ca5f66838f51a878668ef2a3785a806565 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Mon, 1 Jul 2024 17:16:30 +0800 Subject: [PATCH] Assign a separate lookup table to each group (#482) --- CHANGES.md | 1 + pkg/cmdsetup/liaison.go | 5 +-- pkg/node/interface.go | 5 --- pkg/node/maglev.go | 69 ++++++++++++++++++++++++++++++++++------- pkg/node/maglev_test.go | 12 +++---- 5 files changed, 64 insertions(+), 28 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 82ca6e08d..408a8b298 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,7 @@ Release Notes. - Check unregistered nodes in background. - Improve sorting performance of stream. - Add the measure query trace. +- Assign a separate lookup table to each group in the maglev selector. ### Bugs diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go index a7112e942..67cd942a3 100644 --- a/pkg/cmdsetup/liaison.go +++ b/pkg/cmdsetup/liaison.go @@ -46,10 +46,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { } pipeline := pub.New(metaSvc) localPipeline := queue.Local() - nodeSel, err := node.NewMaglevSelector() - if err != nil { - l.Fatal().Err(err).Msg("failed to initiate required node selector") - } + nodeSel := node.NewMaglevSelector() nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel) grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, nodeRegistry) profSvc := observability.NewProfService() diff --git a/pkg/node/interface.go b/pkg/node/interface.go index 7745c9ad6..26ec5cc2b 100644 --- a/pkg/node/interface.go +++ b/pkg/node/interface.go @@ -19,7 +19,6 @@ package node import ( - "strconv" "sync" "github.com/pkg/errors" @@ -97,7 +96,3 @@ func (p *pickFirstSelector) Pick(_, _ string, _ uint32) (string, error) { } return p.nodeIDs[0], nil } - -func formatSearchKey(group, name string, shardID uint32) string { - return group + "/" + name + "#" + strconv.FormatUint(uint64(shardID), 10) -} diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go index deb140f26..fea2c5b20 100644 --- a/pkg/node/maglev.go +++ b/pkg/node/maglev.go @@ -18,36 +18,81 @@ package node import ( + "sort" + "strconv" + "sync" + "github.com/kkdai/maglev" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" ) +const lookupTableSize = 65537 + var _ Selector = (*maglevSelector)(nil) type maglevSelector struct { - maglev *maglev.Maglev + routers sync.Map + nodes []string + mutex sync.RWMutex } func (m *maglevSelector) AddNode(node *databasev1.Node) { - _ = m.maglev.Add(node.GetMetadata().GetName()) + m.mutex.Lock() + defer m.mutex.Unlock() + for i := range m.nodes { + if m.nodes[i] == node.GetMetadata().GetName() { + return + } + } + m.nodes = append(m.nodes, node.GetMetadata().GetName()) + sort.StringSlice(m.nodes).Sort() + m.routers.Range(func(_, value any) bool { + _ = value.(*maglev.Maglev).Set(m.nodes) + return true + }) } func (m *maglevSelector) RemoveNode(node *databasev1.Node) { - _ = m.maglev.Remove(node.GetMetadata().GetName()) + m.mutex.Lock() + defer m.mutex.Unlock() + for i := range m.nodes { + if m.nodes[i] == node.GetMetadata().GetName() { + m.nodes = append(m.nodes[:i], m.nodes[i+1:]...) + break + } + } + m.routers.Range(func(_, value any) bool { + _ = value.(*maglev.Maglev).Set(m.nodes) + return true + }) } func (m *maglevSelector) Pick(group, name string, shardID uint32) (string, error) { - return m.maglev.Get(formatSearchKey(group, name, shardID)) -} + router, ok := m.routers.Load(group) + if ok { + return router.(*maglev.Maglev).Get(formatSearchKey(name, shardID)) + } + m.mutex.Lock() + defer m.mutex.Unlock() + router, ok = m.routers.Load(group) + if ok { + return router.(*maglev.Maglev).Get(formatSearchKey(name, shardID)) + } -// NewMaglevSelector creates a new backend selector based on Maglev hashing algorithm. -func NewMaglevSelector() (Selector, error) { - alg, err := maglev.NewMaglev(nil, 65537) + mTab, err := maglev.NewMaglev(m.nodes, lookupTableSize) if err != nil { - return nil, err + return "", err } - return &maglevSelector{ - maglev: alg, - }, nil + m.routers.Store(group, mTab) + return mTab.Get(formatSearchKey(name, shardID)) +} + +// NewMaglevSelector creates a new backend selector based on Maglev hashing algorithm. +func NewMaglevSelector() Selector { + return &maglevSelector{} +} + +func formatSearchKey(name string, shardID uint32) string { + return name + "-" + strconv.FormatUint(uint64(shardID), 10) } diff --git a/pkg/node/maglev_test.go b/pkg/node/maglev_test.go index d581dd03c..74505a3a8 100644 --- a/pkg/node/maglev_test.go +++ b/pkg/node/maglev_test.go @@ -34,8 +34,7 @@ const ( ) func TestMaglevSelector(t *testing.T) { - sel, err := NewMaglevSelector() - assert.NoError(t, err) + sel := NewMaglevSelector() sel.AddNode(&databasev1.Node{ Metadata: &commonv1.Metadata{ Name: "data-node-1", @@ -55,8 +54,7 @@ func TestMaglevSelector(t *testing.T) { } func TestMaglevSelector_EvenDistribution(t *testing.T) { - sel, err := NewMaglevSelector() - assert.NoError(t, err) + sel := NewMaglevSelector() dataNodeNum := 10 for i := 0; i < dataNodeNum; i++ { sel.AddNode(&databasev1.Node{ @@ -83,8 +81,8 @@ func TestMaglevSelector_EvenDistribution(t *testing.T) { } func TestMaglevSelector_DiffNode(t *testing.T) { - fullSel, _ := NewMaglevSelector() - brokenSel, _ := NewMaglevSelector() + fullSel := NewMaglevSelector() + brokenSel := NewMaglevSelector() dataNodeNum := 10 for i := 0; i < dataNodeNum; i++ { fullSel.AddNode(&databasev1.Node{ @@ -114,7 +112,7 @@ func TestMaglevSelector_DiffNode(t *testing.T) { } func BenchmarkMaglevSelector_Pick(b *testing.B) { - sel, _ := NewMaglevSelector() + sel := NewMaglevSelector() dataNodeNum := 10 for i := 0; i < dataNodeNum; i++ { sel.AddNode(&databasev1.Node{