Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the size of a response #413

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/network/mqtt/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const smallBufferSize = 64
const maxInt = int(^uint(0) >> 1)

// buffers are reusable fixed-side buffers for faster encoding.
var buffers = newBufferPool(maxMessageSize)
var buffers = newBufferPool(MaxMessageSize)

// bufferPool represents a thread safe buffer pool
type bufferPool struct {
Expand Down
38 changes: 19 additions & 19 deletions internal/network/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (

const (
maxHeaderSize = 6 // max MQTT header size
maxMessageSize = 65536 // max MQTT message size is impossible to increase as per protocol (uint16 len)
MaxMessageSize = 65536 // max MQTT message size is impossible to increase as per protocol (uint16 len)
)

// ErrMessageTooLarge occurs when a message encoded/decoded is larger than max MQTT frame.
var ErrMessageTooLarge = errors.New("mqtt: message size exceeds 64K")
var ErrMessageBadPacket = errors.New("mqtt: bad packet")

//Message is the interface all our packets will be implementing
// Message is the interface all our packets will be implementing
type Message interface {
fmt.Stringer

Expand Down Expand Up @@ -105,70 +105,70 @@ type Publish struct {
Payload []byte
}

//Puback is sent for QOS level one to verify the receipt of a publish
//Qoth the spec: "A PUBACK message is sent by a server in response to a PUBLISH message from a publishing client, and by a subscriber in response to a PUBLISH message from the server."
// Puback is sent for QOS level one to verify the receipt of a publish
// Qoth the spec: "A PUBACK message is sent by a server in response to a PUBLISH message from a publishing client, and by a subscriber in response to a PUBLISH message from the server."
type Puback struct {
MessageID uint16
}

//Pubrec is for verifying the receipt of a publish
//Qoth the spec:"It is the second message of the QoS level 2 protocol flow. A PUBREC message is sent by the server in response to a PUBLISH message from a publishing client, or by a subscriber in response to a PUBLISH message from the server."
// Pubrec is for verifying the receipt of a publish
// Qoth the spec:"It is the second message of the QoS level 2 protocol flow. A PUBREC message is sent by the server in response to a PUBLISH message from a publishing client, or by a subscriber in response to a PUBLISH message from the server."
type Pubrec struct {
MessageID uint16
}

//Pubrel is a response to pubrec from either the client or server.
// Pubrel is a response to pubrec from either the client or server.
type Pubrel struct {
MessageID uint16
//QOS1
Header Header
}

//Pubcomp is for saying is in response to a pubrel sent by the publisher
//the final member of the QOS2 flow. both sides have said "hey, we did it!"
// Pubcomp is for saying is in response to a pubrel sent by the publisher
// the final member of the QOS2 flow. both sides have said "hey, we did it!"
type Pubcomp struct {
MessageID uint16
}

//Subscribe tells the server which topics the client would like to subscribe to
// Subscribe tells the server which topics the client would like to subscribe to
type Subscribe struct {
Header
MessageID uint16
Subscriptions []TopicQOSTuple
}

//Suback is to say "hey, you got it buddy. I will send you messages that fit this pattern"
// Suback is to say "hey, you got it buddy. I will send you messages that fit this pattern"
type Suback struct {
MessageID uint16
Qos []uint8
}

//Unsubscribe is the message to send if you don't want to subscribe to a topic anymore
// Unsubscribe is the message to send if you don't want to subscribe to a topic anymore
type Unsubscribe struct {
Header
MessageID uint16
Topics []TopicQOSTuple
}

//Unsuback is to unsubscribe as suback is to subscribe
// Unsuback is to unsubscribe as suback is to subscribe
type Unsuback struct {
MessageID uint16
}

//Pingreq is a keepalive
// Pingreq is a keepalive
type Pingreq struct {
}

//Pingresp is for saying "hey, the server is alive"
// Pingresp is for saying "hey, the server is alive"
type Pingresp struct {
}

//Disconnect is to signal you want to cease communications with the server
// Disconnect is to signal you want to cease communications with the server
type Disconnect struct {
}

//TopicQOSTuple is a struct for pairing the Qos and topic together
//for the QOS' pairs in unsubscribe and subscribe
// TopicQOSTuple is a struct for pairing the Qos and topic together
// for the QOS' pairs in unsubscribe and subscribe
type TopicQOSTuple struct {
Qos uint8
Topic []byte
Expand Down Expand Up @@ -326,7 +326,7 @@ func (p *Publish) EncodeTo(w io.Writer) (int, error) {
length += 2
}

if length > maxMessageSize {
if length > MaxMessageSize {
return 0, ErrMessageTooLarge
}

Expand Down
21 changes: 16 additions & 5 deletions internal/provider/storage/ssd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/emitter-io/emitter/internal/async"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/network/mqtt"
"github.com/emitter-io/emitter/internal/provider/logging"
"github.com/emitter-io/emitter/internal/service"
"github.com/kelindar/binary"
Expand Down Expand Up @@ -185,17 +186,27 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {
// we'll iterate forward but have reverse time ('until' -> 'from')
prefix := message.NewPrefix(q.Ssid, q.Until)

matchesSize := 0
// Seek the prefix and check the key so we can quickly exit the iteration.
for it.Seek(prefix); it.Valid() &&
message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) &&
len(matches) < q.Limit; it.Next() {
if message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) {
if msg, err := loadMessage(it.Item()); err == nil {
matches = append(matches, msg)
}
if !message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) {
continue
}

var msg message.Message
var err error
if msg, err = loadMessage(it.Item()); err != nil {
continue
}
}

if matchesSize += len(msg.Payload) + len(msg.ID) + len(msg.Channel); matchesSize > mqtt.MaxMessageSize {
break
}

matches = append(matches, msg)
}
return nil
}); err != nil {
logging.LogError("ssd", "query lookup", err)
Expand Down
16 changes: 11 additions & 5 deletions internal/provider/storage/ssd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) {
})
}

func TestSSD_MaxResponseSizeReached(t *testing.T) {
runSSDTest(func(store *SSD) {
testMaxResponseSizeReached(t, store)
})
}

func TestSSD_QueryRetained(t *testing.T) {
runSSDTest(func(store *SSD) {
testRetained(t, store)
Expand Down Expand Up @@ -191,11 +197,11 @@ func BenchmarkStore(b *testing.B) {
})
}

//batch=1 batch/s=179990 msg/s=179990
//batch=10 batch/s=51094 msg/s=510942
//batch=100 batch/s=6606 msg/s=660574
//batch=1000 batch/s=552 msg/s=551637
//batch=10000 batch/s=50 msg/s=501079
// batch=1 batch/s=179990 msg/s=179990
// batch=10 batch/s=51094 msg/s=510942
// batch=100 batch/s=6606 msg/s=660574
// batch=1000 batch/s=552 msg/s=551637
// batch=10000 batch/s=50 msg/s=501079
func BenchmarkStoreParallel(b *testing.B) {
runSSDTest(func(store *SSD) {
benchmarkStoreParallel(b, store, 1, runtime.NumCPU())
Expand Down
21 changes: 21 additions & 0 deletions internal/provider/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/network/mqtt"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -151,3 +152,23 @@ func Test_configUint32(t *testing.T) {
v := configUint32(cfg.Config, "retain", 0)
assert.Equal(t, uint32(99999999), v)
}

func testMaxResponseSizeReached(t *testing.T, store Storage) {
for i := int64(0); i < 10; i++ {
payload := make([]byte, mqtt.MaxMessageSize/5)
payload[0] = byte(i)
msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), payload)
msg.ID.SetTime(msg.ID.Time() + (i * 10000))
assert.NoError(t, store.Store(msg))
}

zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 10)
assert.NoError(t, err)

assert.Len(t, f, 4)
assert.Equal(t, 6, int(f[0].Payload[0]))
assert.Equal(t, 7, int(f[1].Payload[0]))
assert.Equal(t, 8, int(f[2].Payload[0]))
assert.Equal(t, 9, int(f[3].Payload[0]))
}
1 change: 1 addition & 0 deletions internal/service/pubsub/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pubsub

import (
"bytes"

"github.com/emitter-io/emitter/internal/errors"
"github.com/emitter-io/emitter/internal/event"
"github.com/emitter-io/emitter/internal/message"
Expand Down
Loading