Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in MQTT protocol when sending messages #1094

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions protocol/mqtt_paho/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

type Protocol struct {
client *paho.Client
config *paho.ClientConfig
yanmxa marked this conversation as resolved.
Show resolved Hide resolved
connOption *paho.Connect
publishOption *paho.Publish
subscribeOption *paho.Subscribe
Expand Down Expand Up @@ -89,7 +88,7 @@ func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...
var err error
defer m.Finish(err)

msg := p.publishOption
msg := p.publishMsg()
if cecontext.TopicFrom(ctx) != "" {
msg.Topic = cecontext.TopicFrom(ctx)
cecontext.WithTopic(ctx, "")
Expand All @@ -107,6 +106,16 @@ func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...
return err
}

// publishMsg generate a new paho.Publish message from the p.publishOption
func (p *Protocol) publishMsg() *paho.Publish {
return &paho.Publish{
QoS: p.publishOption.QoS,
Retain: p.publishOption.Retain,
Topic: p.publishOption.Topic,
Properties: p.publishOption.Properties,
}
}
Comment on lines +109 to +117
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understanding correctly, the race condition is that the publishOption can be changed before the message is published?

Since there is only one place to set this field in the struct (

func WithPublish(publishOpt *paho.Publish) Option {
return func(p *Protocol) error {
if publishOpt == nil {
return fmt.Errorf("the paho.Publish option must not be nil")
}
p.publishOption = publishOpt
return nil
}
}
), maybe it would make sense to make the copy there instead of every time a message is published?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Left a comment in the issue to better understand the issue/root cause. What is causing the race? Why is publishOption changing during concurrent calls? Asking all these questions because I didn't write the implementation and not familiar with MQ. Furthermore, if we change as per the above, I wonder what the implication is for existing users because we're changing the semantics here (copy instead of pointer).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Cali0707! The code block you referred to is only invoked once when initializing the client. The issues arose when sending messages in many goroutines. Here is the detail

Copy link
Contributor Author

@yanmxa yanmxa Sep 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @embano1.

I append the root cause here.

The publishOption is a pointer reference by the MQTT message, which is used to hold the CloudEvent payload. The message(publishOption) is shared across goroutines and will be changed by them.

The publishOption is a pointer referenced by the MQTT message, which holds the CloudEvent payload. So the message (publishOption) is shared across multiple goroutines and can be modified by them.

By making the suggested changes, the user should not perceive anything.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and will be changed by them

Can you please point me to the code/line where this happens?
Also, ideally you also write a test please to surface the current issue (panics) and how this PR fixes it, perhaps there's more issues due to the current implementation which we can detect this way 🙏

Copy link
Contributor Author

@yanmxa yanmxa Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@embano1 No worries!

Here’s the code line where the panic occurs.

I’ve also written an integration test that covers the concurrent panic issue, which you can use to reproduce it.


func (p *Protocol) OpenInbound(ctx context.Context) error {
if p.subscribeOption == nil {
return fmt.Errorf("the paho.Subscribe option must not be nil")
Expand Down
98 changes: 98 additions & 0 deletions test/integration/mqtt_paho/concurrent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
Copyright 2024 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package mqtt_paho

import (
"context"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

cloudevents "github.com/cloudevents/sdk-go/v2"
cecontext "github.com/cloudevents/sdk-go/v2/context"
)

func TestConcurrentSendingEvent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

topicName := "test-ce-client-" + uuid.New().String()

readyCh := make(chan bool)
defer close(readyCh)

senderNum := 10 // 10 gorutine to sending the events
eventNum := 1000 // each gorutine sender publishs 1,000 events

var g errgroup.Group

// start a receiver
c, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName), cloudevents.WithUUIDs())
require.NoError(t, err)
g.Go(func() error {
// verify all of events can be recieved
count := senderNum * eventNum
var mu sync.Mutex
return c.StartReceiver(ctx, func(event cloudevents.Event) {
mu.Lock()
defer mu.Unlock()
count--
if count == 0 {
readyCh <- true
}
})
})
// wait for 5 seconds to ensure the receiver starts safely
time.Sleep(5 * time.Second)

// start a sender client to pulish events concurrently
client, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName), cloudevents.WithUUIDs())
require.NoError(t, err)

evt := cloudevents.NewEvent()
evt.SetType("com.cloudevents.sample.sent")
evt.SetSource("concurrent-sender")
err = evt.SetData(cloudevents.ApplicationJSON, map[string]interface{}{"message": "Hello, World!"})
require.NoError(t, err)

for i := 0; i < senderNum; i++ {
g.Go(func() error {
for j := 0; j < eventNum; j++ {
result := client.Send(
cecontext.WithTopic(ctx, topicName),
evt,
)
if result != nil {
return result
}
}
return nil
})
}

// wait until all the events are received
handleEvent(ctx, readyCh, cancel, t)

require.NoError(t, g.Wait())
}

func handleEvent(ctx context.Context, readyCh <-chan bool, cancel context.CancelFunc, t *testing.T) {
for {
select {
case <-ctx.Done():
require.Fail(t, "Test failed: timeout reached before events were received")
return
case <-readyCh:
cancel()
t.Logf("Test passed: events successfully received")
return
}
}
}