From 9ec673d9ce7733aaa73196a5c9cb0db773322515 Mon Sep 17 00:00:00 2001 From: Florimond Husquinet Date: Fri, 13 Sep 2024 01:28:20 +0200 Subject: [PATCH] more test and bug discovery --- internal/service/history/history_test.go | 114 +++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/internal/service/history/history_test.go b/internal/service/history/history_test.go index 03e7e569..5d17858f 100644 --- a/internal/service/history/history_test.go +++ b/internal/service/history/history_test.go @@ -15,10 +15,12 @@ package history import ( + "crypto/rand" "encoding/json" "testing" "github.com/emitter-io/emitter/internal/message" + "github.com/emitter-io/emitter/internal/network/mqtt" "github.com/emitter-io/emitter/internal/provider/storage" "github.com/emitter-io/emitter/internal/security" "github.com/emitter-io/emitter/internal/service/fake" @@ -82,3 +84,115 @@ func TestHistory(t *testing.T) { // The response should have returned the last 2 messages. assert.Equal(t, 2, len(response.(*Response).Messages)) } + +func TestLargeMessage(t *testing.T) { + ssid := message.Ssid{1, 3238259379, 500706888, 1027807523} + store := storage.NewInMemory(nil) + store.Configure(nil) + auth := &fake.Authorizer{ + Success: true, + Contract: uint32(1), + ExtraPerm: security.AllowLoad, + } + // Create new service + service := New(auth, store) + connection := &fake.Conn{} + + // The most basic request, on an empty store. + request := &Request{ + Key: "key", + Channel: "key/a/b/c/", + } + + // Store 1 long message + // Keep in mind the message will be composed of the ID and the channel size on top of the payload. + // So mqttMaxMessageSize is really smaller than the actual message size. + randomBytes := make([]byte, mqtt.MaxMessageSize) + rand.Read(randomBytes) + firstSSID := message.NewID(ssid) + store.Store(&message.Message{ + ID: firstSSID, + Channel: []byte("a/b/c/"), + Payload: randomBytes, + TTL: 30, + }) + + reqBytes, _ := json.Marshal(request) + + // Issue the same request + response, ok := service.OnRequest(connection, reqBytes) + // The request should have succeeded and returned a response. + assert.Equal(t, true, ok) + // The response should have returned the last message as per MQTT spec. + assert.Equal(t, 0, len(response.(*Response).Messages)) +} + +// ONLY PASSES BECAUSE OF THE BUG, THERE IS ONLY ONE SERVER SO NO GATHER +// match.Limit(limit) only limits based on the number of messages not the size of the frame +/*func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) { + + // Construct a query and lookup locally first + query := newLookupQuery(ssid, from, until, startFromID, limit) + match := s.lookup(query) + + // Issue the message survey to the cluster + if req, err := binary.Marshal(query); err == nil && s.survey != nil { + if awaiter, err := s.survey.Query("ssdstore", req); err == nil { + + // Wait for all presence updates to come back (or a deadline) + for _, resp := range awaiter.Gather(2000 * time.Millisecond) { + if frame, err := message.DecodeFrame(resp); err == nil { + match = append(match, frame...) + } + } + } + } + + match.Limit(limit) + return match, nil +}*/ +func TestSumOfTwoExceedMaxSize(t *testing.T) { + ssid := message.Ssid{1, 3238259379, 500706888, 1027807523} + store := storage.NewInMemory(nil) + store.Configure(nil) + auth := &fake.Authorizer{ + Success: true, + Contract: uint32(1), + ExtraPerm: security.AllowLoad, + } + // Create new service + service := New(auth, store) + connection := &fake.Conn{} + + // The most basic request, on an empty store. + request := &Request{ + Key: "key", + Channel: "key/a/b/c/", + } + + // Store 2 messages + randomBytes := make([]byte, int(mqtt.MaxMessageSize/2)) + rand.Read(randomBytes) + firstSSID := message.NewID(ssid) + store.Store(&message.Message{ + ID: firstSSID, + Channel: []byte("a/b/c/"), + Payload: randomBytes, + TTL: 30, + }) + store.Store(&message.Message{ + ID: message.NewID(ssid), + Channel: []byte("a/b/c/"), + Payload: randomBytes, + TTL: 30, + }) + reqBytes, _ := json.Marshal(request) + + request.Channel = "key/a/b/c/?last=2" + reqBytes, _ = json.Marshal(request) + response, ok := service.OnRequest(connection, reqBytes) + // The request should have succeeded and returned a response. + assert.Equal(t, true, ok) + // The response should have returned the last 2 messages. + assert.Equal(t, 1, len(response.(*Response).Messages)) +}