diff --git a/internal/network/mqtt/buffer.go b/internal/network/mqtt/buffer.go index 4b9f5608..591a0749 100644 --- a/internal/network/mqtt/buffer.go +++ b/internal/network/mqtt/buffer.go @@ -23,7 +23,7 @@ const smallBufferSize = 64 const maxInt = int(^uint(0) >> 1) // buffers are reusable fixed-side buffers for faster encoding. -var buffers = newBufferPool(maxMessageSize) +var buffers = newBufferPool(MaxMessageSize) // bufferPool represents a thread safe buffer pool type bufferPool struct { diff --git a/internal/network/mqtt/mqtt.go b/internal/network/mqtt/mqtt.go index 15cc3f67..fde82aff 100644 --- a/internal/network/mqtt/mqtt.go +++ b/internal/network/mqtt/mqtt.go @@ -22,14 +22,14 @@ import ( const ( maxHeaderSize = 6 // max MQTT header size - maxMessageSize = 65536 // max MQTT message size is impossible to increase as per protocol (uint16 len) + MaxMessageSize = 65536 // max MQTT message size is impossible to increase as per protocol (uint16 len) ) // ErrMessageTooLarge occurs when a message encoded/decoded is larger than max MQTT frame. var ErrMessageTooLarge = errors.New("mqtt: message size exceeds 64K") var ErrMessageBadPacket = errors.New("mqtt: bad packet") -//Message is the interface all our packets will be implementing +// Message is the interface all our packets will be implementing type Message interface { fmt.Stringer @@ -105,70 +105,70 @@ type Publish struct { Payload []byte } -//Puback is sent for QOS level one to verify the receipt of a publish -//Qoth the spec: "A PUBACK message is sent by a server in response to a PUBLISH message from a publishing client, and by a subscriber in response to a PUBLISH message from the server." +// Puback is sent for QOS level one to verify the receipt of a publish +// Qoth the spec: "A PUBACK message is sent by a server in response to a PUBLISH message from a publishing client, and by a subscriber in response to a PUBLISH message from the server." type Puback struct { MessageID uint16 } -//Pubrec is for verifying the receipt of a publish -//Qoth the spec:"It is the second message of the QoS level 2 protocol flow. A PUBREC message is sent by the server in response to a PUBLISH message from a publishing client, or by a subscriber in response to a PUBLISH message from the server." +// Pubrec is for verifying the receipt of a publish +// Qoth the spec:"It is the second message of the QoS level 2 protocol flow. A PUBREC message is sent by the server in response to a PUBLISH message from a publishing client, or by a subscriber in response to a PUBLISH message from the server." type Pubrec struct { MessageID uint16 } -//Pubrel is a response to pubrec from either the client or server. +// Pubrel is a response to pubrec from either the client or server. type Pubrel struct { MessageID uint16 //QOS1 Header Header } -//Pubcomp is for saying is in response to a pubrel sent by the publisher -//the final member of the QOS2 flow. both sides have said "hey, we did it!" +// Pubcomp is for saying is in response to a pubrel sent by the publisher +// the final member of the QOS2 flow. both sides have said "hey, we did it!" type Pubcomp struct { MessageID uint16 } -//Subscribe tells the server which topics the client would like to subscribe to +// Subscribe tells the server which topics the client would like to subscribe to type Subscribe struct { Header MessageID uint16 Subscriptions []TopicQOSTuple } -//Suback is to say "hey, you got it buddy. I will send you messages that fit this pattern" +// Suback is to say "hey, you got it buddy. I will send you messages that fit this pattern" type Suback struct { MessageID uint16 Qos []uint8 } -//Unsubscribe is the message to send if you don't want to subscribe to a topic anymore +// Unsubscribe is the message to send if you don't want to subscribe to a topic anymore type Unsubscribe struct { Header MessageID uint16 Topics []TopicQOSTuple } -//Unsuback is to unsubscribe as suback is to subscribe +// Unsuback is to unsubscribe as suback is to subscribe type Unsuback struct { MessageID uint16 } -//Pingreq is a keepalive +// Pingreq is a keepalive type Pingreq struct { } -//Pingresp is for saying "hey, the server is alive" +// Pingresp is for saying "hey, the server is alive" type Pingresp struct { } -//Disconnect is to signal you want to cease communications with the server +// Disconnect is to signal you want to cease communications with the server type Disconnect struct { } -//TopicQOSTuple is a struct for pairing the Qos and topic together -//for the QOS' pairs in unsubscribe and subscribe +// TopicQOSTuple is a struct for pairing the Qos and topic together +// for the QOS' pairs in unsubscribe and subscribe type TopicQOSTuple struct { Qos uint8 Topic []byte @@ -326,7 +326,7 @@ func (p *Publish) EncodeTo(w io.Writer) (int, error) { length += 2 } - if length > maxMessageSize { + if length > MaxMessageSize { return 0, ErrMessageTooLarge } diff --git a/internal/provider/storage/ssd.go b/internal/provider/storage/ssd.go index f0b4cfd2..ca356d5c 100644 --- a/internal/provider/storage/ssd.go +++ b/internal/provider/storage/ssd.go @@ -22,6 +22,7 @@ import ( "github.com/dgraph-io/badger/v3" "github.com/emitter-io/emitter/internal/async" "github.com/emitter-io/emitter/internal/message" + "github.com/emitter-io/emitter/internal/network/mqtt" "github.com/emitter-io/emitter/internal/provider/logging" "github.com/emitter-io/emitter/internal/service" "github.com/kelindar/binary" @@ -185,17 +186,27 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) { // we'll iterate forward but have reverse time ('until' -> 'from') prefix := message.NewPrefix(q.Ssid, q.Until) + matchesSize := 0 // Seek the prefix and check the key so we can quickly exit the iteration. for it.Seek(prefix); it.Valid() && message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) && len(matches) < q.Limit; it.Next() { - if message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) { - if msg, err := loadMessage(it.Item()); err == nil { - matches = append(matches, msg) - } + if !message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) { + continue + } + + var msg message.Message + var err error + if msg, err = loadMessage(it.Item()); err != nil { + continue } - } + if matchesSize += len(msg.Payload) + len(msg.ID) + len(msg.Channel); matchesSize > mqtt.MaxMessageSize { + break + } + + matches = append(matches, msg) + } return nil }); err != nil { logging.LogError("ssd", "query lookup", err) diff --git a/internal/provider/storage/ssd_test.go b/internal/provider/storage/ssd_test.go index a0924848..968551e8 100644 --- a/internal/provider/storage/ssd_test.go +++ b/internal/provider/storage/ssd_test.go @@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) { }) } +func TestSSD_MaxResponseSizeReached(t *testing.T) { + runSSDTest(func(store *SSD) { + testMaxResponseSizeReached(t, store) + }) +} + func TestSSD_QueryRetained(t *testing.T) { runSSDTest(func(store *SSD) { testRetained(t, store) @@ -191,11 +197,11 @@ func BenchmarkStore(b *testing.B) { }) } -//batch=1 batch/s=179990 msg/s=179990 -//batch=10 batch/s=51094 msg/s=510942 -//batch=100 batch/s=6606 msg/s=660574 -//batch=1000 batch/s=552 msg/s=551637 -//batch=10000 batch/s=50 msg/s=501079 +// batch=1 batch/s=179990 msg/s=179990 +// batch=10 batch/s=51094 msg/s=510942 +// batch=100 batch/s=6606 msg/s=660574 +// batch=1000 batch/s=552 msg/s=551637 +// batch=10000 batch/s=50 msg/s=501079 func BenchmarkStoreParallel(b *testing.B) { runSSDTest(func(store *SSD) { benchmarkStoreParallel(b, store, 1, runtime.NumCPU()) diff --git a/internal/provider/storage/storage_test.go b/internal/provider/storage/storage_test.go index 5e8f3d66..9d9684bc 100644 --- a/internal/provider/storage/storage_test.go +++ b/internal/provider/storage/storage_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/emitter-io/emitter/internal/message" + "github.com/emitter-io/emitter/internal/network/mqtt" "github.com/stretchr/testify/assert" ) @@ -151,3 +152,23 @@ func Test_configUint32(t *testing.T) { v := configUint32(cfg.Config, "retain", 0) assert.Equal(t, uint32(99999999), v) } + +func testMaxResponseSizeReached(t *testing.T, store Storage) { + for i := int64(0); i < 10; i++ { + payload := make([]byte, mqtt.MaxMessageSize/5) + payload[0] = byte(i) + msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), payload) + msg.ID.SetTime(msg.ID.Time() + (i * 10000)) + assert.NoError(t, store.Store(msg)) + } + + zero := time.Unix(0, 0) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 10) + assert.NoError(t, err) + + assert.Len(t, f, 4) + assert.Equal(t, 6, int(f[0].Payload[0])) + assert.Equal(t, 7, int(f[1].Payload[0])) + assert.Equal(t, 8, int(f[2].Payload[0])) + assert.Equal(t, 9, int(f[3].Payload[0])) +} diff --git a/internal/service/pubsub/subscribe.go b/internal/service/pubsub/subscribe.go index c9d0834f..8896e7f5 100644 --- a/internal/service/pubsub/subscribe.go +++ b/internal/service/pubsub/subscribe.go @@ -16,6 +16,7 @@ package pubsub import ( "bytes" + "github.com/emitter-io/emitter/internal/errors" "github.com/emitter-io/emitter/internal/event" "github.com/emitter-io/emitter/internal/message"