Skip to content

Commit

Permalink
Limit the size of a response
Browse files Browse the repository at this point in the history
From the begining, gmqtt.go defines a maximum size for an MQTT response.
However if the limit was met, it resulted in an error and no message was
sent. Now, the Query itself limits the size of a frame and messages are
sent to the client, but only those whose total size <= to the total size
of an MQTT response.

Implications on the current API: for a subscribe request with a Last=X
parameter, the client will receive the last messages whose total size is <= to
the max size of a response, with a maximum of X messages. This behavior is an
improvement over just getting an error.
  • Loading branch information
Florimond committed Nov 19, 2023
1 parent 30adcd2 commit 8690299
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 30 deletions.
2 changes: 1 addition & 1 deletion internal/network/mqtt/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const smallBufferSize = 64
const maxInt = int(^uint(0) >> 1)

// buffers are reusable fixed-side buffers for faster encoding.
var buffers = newBufferPool(maxMessageSize)
var buffers = newBufferPool(MaxMessageSize)

// bufferPool represents a thread safe buffer pool
type bufferPool struct {
Expand Down
38 changes: 19 additions & 19 deletions internal/network/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (

const (
maxHeaderSize = 6 // max MQTT header size
maxMessageSize = 65536 // max MQTT message size is impossible to increase as per protocol (uint16 len)
MaxMessageSize = 65536 // max MQTT message size is impossible to increase as per protocol (uint16 len)
)

// ErrMessageTooLarge occurs when a message encoded/decoded is larger than max MQTT frame.
var ErrMessageTooLarge = errors.New("mqtt: message size exceeds 64K")
var ErrMessageBadPacket = errors.New("mqtt: bad packet")

//Message is the interface all our packets will be implementing
// Message is the interface all our packets will be implementing
type Message interface {
fmt.Stringer

Expand Down Expand Up @@ -105,70 +105,70 @@ type Publish struct {
Payload []byte
}

//Puback is sent for QOS level one to verify the receipt of a publish
//Qoth the spec: "A PUBACK message is sent by a server in response to a PUBLISH message from a publishing client, and by a subscriber in response to a PUBLISH message from the server."
// Puback is sent for QOS level one to verify the receipt of a publish
// Qoth the spec: "A PUBACK message is sent by a server in response to a PUBLISH message from a publishing client, and by a subscriber in response to a PUBLISH message from the server."
type Puback struct {
MessageID uint16
}

//Pubrec is for verifying the receipt of a publish
//Qoth the spec:"It is the second message of the QoS level 2 protocol flow. A PUBREC message is sent by the server in response to a PUBLISH message from a publishing client, or by a subscriber in response to a PUBLISH message from the server."
// Pubrec is for verifying the receipt of a publish
// Qoth the spec:"It is the second message of the QoS level 2 protocol flow. A PUBREC message is sent by the server in response to a PUBLISH message from a publishing client, or by a subscriber in response to a PUBLISH message from the server."
type Pubrec struct {
MessageID uint16
}

//Pubrel is a response to pubrec from either the client or server.
// Pubrel is a response to pubrec from either the client or server.
type Pubrel struct {
MessageID uint16
//QOS1
Header Header
}

//Pubcomp is for saying is in response to a pubrel sent by the publisher
//the final member of the QOS2 flow. both sides have said "hey, we did it!"
// Pubcomp is for saying is in response to a pubrel sent by the publisher
// the final member of the QOS2 flow. both sides have said "hey, we did it!"
type Pubcomp struct {
MessageID uint16
}

//Subscribe tells the server which topics the client would like to subscribe to
// Subscribe tells the server which topics the client would like to subscribe to
type Subscribe struct {
Header
MessageID uint16
Subscriptions []TopicQOSTuple
}

//Suback is to say "hey, you got it buddy. I will send you messages that fit this pattern"
// Suback is to say "hey, you got it buddy. I will send you messages that fit this pattern"
type Suback struct {
MessageID uint16
Qos []uint8
}

//Unsubscribe is the message to send if you don't want to subscribe to a topic anymore
// Unsubscribe is the message to send if you don't want to subscribe to a topic anymore
type Unsubscribe struct {
Header
MessageID uint16
Topics []TopicQOSTuple
}

//Unsuback is to unsubscribe as suback is to subscribe
// Unsuback is to unsubscribe as suback is to subscribe
type Unsuback struct {
MessageID uint16
}

//Pingreq is a keepalive
// Pingreq is a keepalive
type Pingreq struct {
}

//Pingresp is for saying "hey, the server is alive"
// Pingresp is for saying "hey, the server is alive"
type Pingresp struct {
}

//Disconnect is to signal you want to cease communications with the server
// Disconnect is to signal you want to cease communications with the server
type Disconnect struct {
}

//TopicQOSTuple is a struct for pairing the Qos and topic together
//for the QOS' pairs in unsubscribe and subscribe
// TopicQOSTuple is a struct for pairing the Qos and topic together
// for the QOS' pairs in unsubscribe and subscribe
type TopicQOSTuple struct {
Qos uint8
Topic []byte
Expand Down Expand Up @@ -326,7 +326,7 @@ func (p *Publish) EncodeTo(w io.Writer) (int, error) {
length += 2
}

if length > maxMessageSize {
if length > MaxMessageSize {
return 0, ErrMessageTooLarge
}

Expand Down
21 changes: 16 additions & 5 deletions internal/provider/storage/ssd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/emitter-io/emitter/internal/async"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/network/mqtt"
"github.com/emitter-io/emitter/internal/provider/logging"
"github.com/emitter-io/emitter/internal/service"
"github.com/kelindar/binary"
Expand Down Expand Up @@ -185,17 +186,27 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {
// we'll iterate forward but have reverse time ('until' -> 'from')
prefix := message.NewPrefix(q.Ssid, q.Until)

matchesSize := 0
// Seek the prefix and check the key so we can quickly exit the iteration.
for it.Seek(prefix); it.Valid() &&
message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) &&
len(matches) < q.Limit; it.Next() {
if message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) {
if msg, err := loadMessage(it.Item()); err == nil {
matches = append(matches, msg)
}
if !message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) {
continue
}

var msg message.Message
var err error
if msg, err = loadMessage(it.Item()); err != nil {
continue
}
}

if matchesSize += len(msg.Payload) + len(msg.ID) + len(msg.Channel); matchesSize > mqtt.MaxMessageSize {
break
}

matches = append(matches, msg)
}
return nil
}); err != nil {
logging.LogError("ssd", "query lookup", err)
Expand Down
16 changes: 11 additions & 5 deletions internal/provider/storage/ssd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) {
})
}

func TestSSD_MaxResponseSizeReached(t *testing.T) {
runSSDTest(func(store *SSD) {
testMaxResponseSizeReached(t, store)
})
}

func TestSSD_QueryRetained(t *testing.T) {
runSSDTest(func(store *SSD) {
testRetained(t, store)
Expand Down Expand Up @@ -191,11 +197,11 @@ func BenchmarkStore(b *testing.B) {
})
}

//batch=1 batch/s=179990 msg/s=179990
//batch=10 batch/s=51094 msg/s=510942
//batch=100 batch/s=6606 msg/s=660574
//batch=1000 batch/s=552 msg/s=551637
//batch=10000 batch/s=50 msg/s=501079
// batch=1 batch/s=179990 msg/s=179990
// batch=10 batch/s=51094 msg/s=510942
// batch=100 batch/s=6606 msg/s=660574
// batch=1000 batch/s=552 msg/s=551637
// batch=10000 batch/s=50 msg/s=501079
func BenchmarkStoreParallel(b *testing.B) {
runSSDTest(func(store *SSD) {
benchmarkStoreParallel(b, store, 1, runtime.NumCPU())
Expand Down
21 changes: 21 additions & 0 deletions internal/provider/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/network/mqtt"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -151,3 +152,23 @@ func Test_configUint32(t *testing.T) {
v := configUint32(cfg.Config, "retain", 0)
assert.Equal(t, uint32(99999999), v)
}

func testMaxResponseSizeReached(t *testing.T, store Storage) {
for i := int64(0); i < 10; i++ {
payload := make([]byte, mqtt.MaxMessageSize/5)
payload[0] = byte(i)
msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), payload)
msg.ID.SetTime(msg.ID.Time() + (i * 10000))
assert.NoError(t, store.Store(msg))
}

zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 10)
assert.NoError(t, err)

assert.Len(t, f, 4)
assert.Equal(t, 6, int(f[0].Payload[0]))
assert.Equal(t, 7, int(f[1].Payload[0]))
assert.Equal(t, 8, int(f[2].Payload[0]))
assert.Equal(t, 9, int(f[3].Payload[0]))
}
1 change: 1 addition & 0 deletions internal/service/pubsub/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pubsub

import (
"bytes"

"github.com/emitter-io/emitter/internal/errors"
"github.com/emitter-io/emitter/internal/event"
"github.com/emitter-io/emitter/internal/message"
Expand Down

0 comments on commit 8690299

Please sign in to comment.