Skip to content

Commit

Permalink
Added extra TestConcurrentSubscribeMultipleTopics test (#196)
Browse files Browse the repository at this point in the history
* added extra TestConcurrentSubscribeMultipleTopics test + some old tests tweaks

* fix race
  • Loading branch information
roblaszczak authored Aug 18, 2020
1 parent 70e8798 commit 280f116
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 19 deletions.
9 changes: 5 additions & 4 deletions pubsub/gochannel/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ func (g *GoChannel) Publish(topic string, messages ...*message.Message) error {
return errors.New("Pub/Sub closed")
}

messagesToPublish := make(message.Messages, len(messages))
for i, msg := range messages {
messages[i] = msg.Copy()
messagesToPublish[i] = msg.Copy()
}

g.subscribersLock.RLock()
Expand All @@ -101,12 +102,12 @@ func (g *GoChannel) Publish(topic string, messages ...*message.Message) error {
if _, ok := g.persistedMessages[topic]; !ok {
g.persistedMessages[topic] = make([]*message.Message, 0)
}
g.persistedMessages[topic] = append(g.persistedMessages[topic], messages...)
g.persistedMessages[topic] = append(g.persistedMessages[topic], messagesToPublish...)
g.persistedMessagesLock.Unlock()
}

for i := range messages {
msg := messages[i]
for i := range messagesToPublish {
msg := messagesToPublish[i]

ackedBySubscribers, err := g.sendMessage(topic, msg)
if err != nil {
Expand Down
102 changes: 87 additions & 15 deletions pubsub/tests/test_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestPubSub(
}{
{Func: TestPublishSubscribe},
{Func: TestConcurrentSubscribe},
{Func: TestConcurrentSubscribeMultipleTopics},
{Func: TestResendOnError},
{Func: TestNoAck},
{Func: TestContinueAfterSubscribeClose},
Expand Down Expand Up @@ -121,7 +122,7 @@ type Features struct {
RequireSingleInstance bool

// NewSubscriberReceivesOldMessages should be set to true if messages are persisted even
// if they are already consumed (for example, like in Kafka).
// if they are already consumed (for example, like in Kafka).
NewSubscriberReceivesOldMessages bool
}

Expand Down Expand Up @@ -269,16 +270,7 @@ func TestConcurrentSubscribe(
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}

var messagesToPublish []*message.Message

for i := 0; i < messagesCount; i++ {
id := watermill.NewUUID()

msg := message.NewMessage(id, nil)
messagesToPublish = append(messagesToPublish, msg)
}
err := publishWithRetry(pub, topicName, messagesToPublish...)
require.NoError(t, err, "cannot publish message")
publishedMessages := AddSimpleMessagesParallel(t, messagesCount, pub, topicName, 50)

var sub message.Subscriber
if tCtx.Features.RequireSingleInstance {
Expand All @@ -292,10 +284,80 @@ func TestConcurrentSubscribe(
messages, err := sub.Subscribe(context.Background(), topicName)
require.NoError(t, err)

receivedMessages, all := bulkRead(tCtx, messages, len(messagesToPublish), defaultTimeout*3)
receivedMessages, all := bulkRead(tCtx, messages, len(publishedMessages), defaultTimeout*3)
assert.True(t, all)

AssertAllMessagesReceived(t, messagesToPublish, receivedMessages)
AssertAllMessagesReceived(t, publishedMessages, receivedMessages)
}

func TestConcurrentSubscribeMultipleTopics(
t *testing.T,
tCtx TestContext,
pubSubConstructor PubSubConstructor,
) {
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

messagesCount := 100
topicsCount := 50

if testing.Short() {
messagesCount = 50
topicsCount = 10
}

var messagesToPublish []*message.Message
for i := 0; i < messagesCount; i++ {
id := watermill.NewUUID()

msg := message.NewMessage(id, nil)
messagesToPublish = append(messagesToPublish, msg)
}

subsWg := sync.WaitGroup{}
subsWg.Add(topicsCount)

receivedMessagesCh := make(chan message.Messages, topicsCount)

for i := 0; i < topicsCount; i++ {
topicName := testTopicName(tCtx.TestID) + fmt.Sprintf("_%d", i)

go func() {
defer subsWg.Done()

if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
err := subscribeInitializer.SubscribeInitialize(topicName)
if err != nil {
t.Fatal(err)
}
}

err := publishWithRetry(pub, topicName, messagesToPublish...)
if err != nil {
t.Fatal(err)
}

messages, err := sub.Subscribe(context.Background(), topicName)
if err != nil {
t.Fatal(err)
}
topicMessages, _ := bulkRead(tCtx, messages, len(messagesToPublish), defaultTimeout)

receivedMessagesCh <- topicMessages
}()
}

subsWg.Wait()
close(receivedMessagesCh)

topicsReceivedMessages := 0

for msgs := range receivedMessagesCh {
AssertAllMessagesReceived(t, messagesToPublish, msgs)
topicsReceivedMessages++
}

assert.Equal(t, topicsCount, topicsReceivedMessages)
}

func TestPublishSubscribeInOrder(
Expand Down Expand Up @@ -521,16 +583,26 @@ func TestContinueAfterSubscribeClose(
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}

publishedMessages := PublishSimpleMessages(t, totalMessagesCount, pub, topicName)
publishedMessages := AddSimpleMessagesParallel(t, totalMessagesCount, pub, topicName, 50)

receivedMessages := map[string]*message.Message{}
for i := 0; i < readAttempts; i++ {

pub, sub := createPubSub(t)

messages, err := sub.Subscribe(context.Background(), topicName)
require.NoError(t, err)

receivedMessagesBatch, _ := bulkRead(tCtx, messages, batchSize, defaultTimeout)
messagesToRead := batchSize
messagesLeft := totalMessagesCount - len(receivedMessages)

if messagesToRead > messagesLeft {
messagesToRead = messagesLeft
}

receivedMessagesBatch, _ := bulkRead(tCtx, messages, messagesToRead, defaultTimeout)
closePubSub(t, pub, sub)

for _, msg := range receivedMessagesBatch {
receivedMessages[msg.UUID] = msg
}
Expand Down

0 comments on commit 280f116

Please sign in to comment.