Skip to content

Commit

Permalink
tweaks for more convenient redis benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Nov 6, 2023
1 parent 83312da commit 9597830
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 16 deletions.
56 changes: 40 additions & 16 deletions broker_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"math/rand"
"net"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -74,17 +75,41 @@ func NewTestRedisBroker(tb testing.TB, n *Node, prefix string, useStreams bool)

func NewTestRedisBrokerCluster(tb testing.TB, n *Node, prefix string, useStreams bool) *RedisBroker {
tb.Helper()

numClusterNodes := 3
numClusterNodesStr := os.Getenv("CENTRIFUGE_REDIS_CLUSTER_BENCHMARKS_NUM_NODES")
if numClusterNodesStr != "" {
num, err := strconv.Atoi(numClusterNodesStr)
require.NoError(tb, err)
numClusterNodes = num
}

var clusterAddresses []string

for i := 0; i < numClusterNodes; i++ {
clusterAddresses = append(clusterAddresses, net.JoinHostPort("127.0.0.1", strconv.Itoa(7000+i)))
}

redisConf := RedisShardConfig{
ClusterAddresses: []string{"localhost:7000", "localhost:7001", "localhost:7002"},
ClusterAddresses: clusterAddresses,
IOTimeout: 10 * time.Second,
}
s, err := NewRedisShard(n, redisConf)
require.NoError(tb, err)
e, err := NewRedisBroker(n, RedisBrokerConfig{
brokerConfig := RedisBrokerConfig{
Prefix: prefix,
UseLists: !useStreams,
Shards: []*RedisShard{s},
})
}

numClusterShardsStr := os.Getenv("CENTRIFUGE_REDIS_CLUSTER_BENCHMARKS_NUM_SHARDS")
if numClusterShardsStr != "" {
num, err := strconv.Atoi(numClusterShardsStr)
require.NoError(tb, err)
brokerConfig.numClusterShards = num
}

e, err := NewRedisBroker(n, brokerConfig)
if err != nil {
tb.Fatal(err)
}
Expand Down Expand Up @@ -139,7 +164,6 @@ func TestRedisBrokerSentinel(t *testing.T) {
},
})
require.NoError(t, err)

}

type redisTest struct {
Expand All @@ -149,10 +173,10 @@ type redisTest struct {
}

var redisTests = []redisTest{
{"lists", false, false},
{"streams", true, false},
{"lists_cluster", false, true},
{"streams_cluster", true, true},
{"list", false, false},
{"strm", true, false},
{"list_cluster", false, true},
{"strm_cluster", true, true},
}

var benchRedisTests = func() (tests []redisTest) {
Expand All @@ -163,9 +187,9 @@ var benchRedisTests = func() (tests []redisTest) {
for _, useStream := range []bool{false, true} {
var name string
if useStream {
name = "streams"
name = "strm"
} else {
name = "lists"
name = "list"
}
if useCluster {
name += "_cluster"
Expand Down Expand Up @@ -1236,7 +1260,7 @@ func BenchmarkRedisPublish_1Ch(b *testing.B) {
}
}

const benchmarkNumDifferentChannels = 1000
const benchmarkNumDifferentChannels = 1024

func BenchmarkRedisPublish_ManyCh(b *testing.B) {
for _, tt := range benchRedisTests {
Expand Down Expand Up @@ -1326,10 +1350,10 @@ func BenchmarkRedisSubscribe(b *testing.B) {
UseCluster bool
}
tests := []test{
{"non_cluster", false},
{"no_cluster", false},
}
if os.Getenv("CENTRIFUGE_REDIS_CLUSTER_BENCHMARKS") != "" {
tests = append(tests, test{"with_cluster", true})
tests = append(tests, test{"in_cluster", true})
}

for _, tt := range tests {
Expand All @@ -1344,7 +1368,7 @@ func BenchmarkRedisSubscribe(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ii := atomic.AddInt32(&i, 1)
err := broker.Subscribe("subscribe" + strconv.Itoa(int(ii)))
err := broker.Subscribe("subscribe" + strconv.Itoa(int(ii)%benchmarkNumDifferentChannels))
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -1624,8 +1648,8 @@ func BenchmarkPubSubThroughput(b *testing.B) {

node1.SetBroker(b1)

numChannels := 1024
pubCh := make(chan struct{}, 1024)
numChannels := benchmarkNumDifferentChannels
pubCh := make(chan struct{}, numChannels)
brokerEventHandler := &testBrokerEventHandler{
HandleControlFunc: func(bytes []byte) error {
return nil
Expand Down
24 changes: 24 additions & 0 deletions presence_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,30 @@ func BenchmarkRedisAddPresence_1Ch(b *testing.B) {
}
}

func BenchmarkRedisAddPresence_ManyCh(b *testing.B) {
for _, tt := range benchRedisTests {
b.Run(tt.Name, func(b *testing.B) {
node := benchNode(b)
pm := newTestRedisPresenceManager(b, node, tt.UseCluster)
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisPresenceManager(pm)
b.SetParallelism(getBenchParallelism())
j := int32(0)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
jj := atomic.AddInt32(&j, 1)
channel := "channel" + strconv.Itoa(int(jj)%benchmarkNumDifferentChannels)
err := pm.AddPresence(channel, "uid", &ClientInfo{})
if err != nil {
b.Fatal(err)
}
}
})
})
}
}

func BenchmarkRedisPresence_1Ch(b *testing.B) {
for _, tt := range benchRedisTests {
b.Run(tt.Name, func(b *testing.B) {
Expand Down

0 comments on commit 9597830

Please sign in to comment.