Skip to content

Commit

Permalink
Configurable parallelism for benchmarks (#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Nov 5, 2023
1 parent 8ebee8a commit 83312da
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 13 deletions.
19 changes: 17 additions & 2 deletions broker_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package centrifuge

import (
"context"
"os"
"strconv"
"testing"
"time"
Expand All @@ -10,6 +11,20 @@ import (
"github.com/stretchr/testify/require"
)

const defaultParallelism = 128

func getBenchParallelism() int {
parallelism := os.Getenv("PARALLELISM")
if parallelism == "" {
return defaultParallelism
}
p, err := strconv.Atoi(parallelism)
if err != nil {
panic(err)
}
return p
}

func testMemoryBroker() *MemoryBroker {
n, err := New(Config{
LogLevel: LogLevelDebug,
Expand Down Expand Up @@ -294,7 +309,7 @@ func BenchmarkMemoryPublish_1Ch(b *testing.B) {
defer func() { _ = e.node.Shutdown(context.Background()) }()

rawData := protocol.Raw(`{"bench": true}`)
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand All @@ -312,7 +327,7 @@ func BenchmarkMemoryPublish_History_1Ch(b *testing.B) {

rawData := protocol.Raw(`{"bench": true}`)
chOpts := PublishOptions{HistorySize: 100, HistoryTTL: 60 * time.Second}
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand Down
16 changes: 8 additions & 8 deletions broker_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ func BenchmarkRedisPublish_1Ch(b *testing.B) {
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisBroker(broker)
rawData := []byte(`{"bench": true}`)
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand All @@ -1246,7 +1246,7 @@ func BenchmarkRedisPublish_ManyCh(b *testing.B) {
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisBroker(broker)
rawData := []byte(`{"bench": true}`)
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
j := int32(0)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand All @@ -1272,7 +1272,7 @@ func BenchmarkRedisPublish_History_1Ch(b *testing.B) {
defer stopRedisBroker(broker)
rawData := []byte(`{"bench": true}`)
chOpts := PublishOptions{HistorySize: 100, HistoryTTL: 100 * time.Second}
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand All @@ -1299,7 +1299,7 @@ func BenchmarkRedisPub_History_ManyCh(b *testing.B) {
defer stopRedisBroker(broker)
rawData := []byte(`{"bench": true}`)
chOpts := PublishOptions{HistorySize: 100, HistoryTTL: 100 * time.Second}
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
j := int32(0)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand Down Expand Up @@ -1339,7 +1339,7 @@ func BenchmarkRedisSubscribe(b *testing.B) {
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisBroker(broker)
i := int32(0)
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand All @@ -1366,7 +1366,7 @@ func BenchmarkRedisHistory_1Ch(b *testing.B) {
_, err := broker.Publish("channel", rawData, PublishOptions{HistorySize: 4, HistoryTTL: 300 * time.Second})
require.NoError(b, err)
}
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand Down Expand Up @@ -1394,7 +1394,7 @@ func BenchmarkRedisRecover_1Ch(b *testing.B) {
_, err := broker.Publish("channel", rawData, PublishOptions{HistorySize: numMessages, HistoryTTL: 300 * time.Second})
require.NoError(b, err)
}
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand Down Expand Up @@ -1655,7 +1655,7 @@ func BenchmarkPubSubThroughput(b *testing.B) {
defer stopRedisBroker(b2)

b.ReportAllocs()
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
b.ResetTimer()
var i int64
b.RunParallel(func(pb *testing.PB) {
Expand Down
2 changes: 2 additions & 0 deletions presence_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func BenchmarkMemoryAddPresence_OneChannel_Parallel(b *testing.B) {
defer func() { _ = e.node.Shutdown(context.Background()) }()

b.ResetTimer()
b.SetParallelism(getBenchParallelism())
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
err := e.AddPresence("channel", "uid", &ClientInfo{})
Expand Down Expand Up @@ -131,6 +132,7 @@ func BenchmarkMemoryPresence_OneChannel_Parallel(b *testing.B) {
defer func() { _ = e.node.Shutdown(context.Background()) }()

_ = e.AddPresence("channel", "uid", &ClientInfo{})
b.SetParallelism(getBenchParallelism())
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand Down
6 changes: 3 additions & 3 deletions presence_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func BenchmarkRedisAddPresence_1Ch(b *testing.B) {
pm := newTestRedisPresenceManager(b, node, tt.UseCluster)
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisPresenceManager(pm)
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand All @@ -132,7 +132,7 @@ func BenchmarkRedisPresence_1Ch(b *testing.B) {
pm := newTestRedisPresenceManager(b, node, tt.UseCluster)
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisPresenceManager(pm)
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
_ = pm.AddPresence("channel", "uid", &ClientInfo{})
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand All @@ -154,7 +154,7 @@ func BenchmarkRedisPresence_ManyCh(b *testing.B) {
pm := newTestRedisPresenceManager(b, node, tt.UseCluster)
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisPresenceManager(pm)
b.SetParallelism(128)
b.SetParallelism(getBenchParallelism())
_ = pm.AddPresence("channel", "uid", &ClientInfo{})
j := int32(0)
b.ResetTimer()
Expand Down

0 comments on commit 83312da

Please sign in to comment.