From 705518ce6cf4a90c89947d73a89226313166f0f5 Mon Sep 17 00:00:00 2001 From: Florimond Husquinet Date: Tue, 14 Nov 2023 02:14:02 +0100 Subject: [PATCH] storage test --- internal/provider/storage/ssd.go | 1 - internal/provider/storage/ssd_test.go | 6 ++ internal/provider/storage/storage_test.go | 20 +++++++ internal/service/history/history.go | 73 ++++++----------------- internal/service/history/history_test.go | 68 +++++++++++++-------- 5 files changed, 85 insertions(+), 83 deletions(-) diff --git a/internal/provider/storage/ssd.go b/internal/provider/storage/ssd.go index 0b17266e..aac045dd 100644 --- a/internal/provider/storage/ssd.go +++ b/internal/provider/storage/ssd.go @@ -125,7 +125,6 @@ func encodeFrame(msgs message.Frame) []*badger.Entry { // n is specified by limit argument. From and until times can also be specified // for time-series retrieval. func (s *SSD) Query(ssid message.Ssid, from, untilTime time.Time, untilID message.ID, limiter Limiter) (message.Frame, error) { - // Construct a query and lookup locally first query := newLookupQuery(ssid, from, untilTime, untilID, limiter) match := s.lookup(query) diff --git a/internal/provider/storage/ssd_test.go b/internal/provider/storage/ssd_test.go index a5298437..bb96e21c 100644 --- a/internal/provider/storage/ssd_test.go +++ b/internal/provider/storage/ssd_test.go @@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) { }) } +func TestSSD_QueryUntilID(t *testing.T) { + runSSDTest(func(store *SSD) { + testUntilID(t, store) + }) +} + func TestSSD_QueryRetained(t *testing.T) { runSSDTest(func(store *SSD) { testRetained(t, store) diff --git a/internal/provider/storage/storage_test.go b/internal/provider/storage/storage_test.go index ba36f3d9..b51c1140 100644 --- a/internal/provider/storage/storage_test.go +++ b/internal/provider/storage/storage_test.go @@ -110,6 +110,26 @@ func testRetained(t *testing.T, store Storage) { assert.Equal(t, "9", string(f[0].Payload)) } +func testUntilID(t *testing.T, store Storage) { + var fourth message.ID + for i := int64(0); i < 10; i++ { + msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), []byte(fmt.Sprintf("%d", i))) + msg.TTL = message.RetainedTTL + msg.ID.SetTime(msg.ID.Time() + (i * 10000)) + assert.NoError(t, store.Store(msg)) + if i == 4 { + fourth = msg.ID + } + } + + // Issue a query + zero := time.Unix(0, 0) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, fourth, NewMessageNumberLimiter(100)) + assert.NoError(t, err) + + assert.Len(t, f, 4) +} + func testRange(t *testing.T, store Storage) { var t0, t1 int64 for i := int64(0); i < 100; i++ { diff --git a/internal/service/history/history.go b/internal/service/history/history.go index 97466b1e..1db61b27 100644 --- a/internal/service/history/history.go +++ b/internal/service/history/history.go @@ -41,7 +41,6 @@ type Message struct { type Response struct { Request uint16 `json:"req,omitempty"` // The corresponding request ID. Messages []Message `json:"messages"` // The history of messages. - //Messages message.Frame `json:"messages"` } // ForRequest sets the request ID in the response for matching @@ -49,19 +48,6 @@ func (r *Response) ForRequest(id uint16) { r.Request = id } -type limiter struct { - maxCount int64 - currentCount int64 - totalSize int64 -} - -func (l *limiter) CanAddMessage(m message.Message) bool { - if l.currentCount >= l.maxCount { - return false - } - return true -} - // OnRequest handles a request of historical messages. func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, bool) { var request Request @@ -80,13 +66,12 @@ func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, b return errors.ErrUnauthorized, false } - limit := int64(3) - // if v, ok := channel.Last(); ok { - // limit = v - // } - // messageLimiter := &limiter{ - // maxCount: limit, - // } + // Use limit = 1 if not specified, otherwise use the limit option. The limit now + // defaults to one as per MQTT spec we always need to send retained messages. + limit := int64(1) + if v, ok := channel.Last(); ok { + limit = v + } ssid := message.NewSsid(key.Contract(), channel.Query) t0, t1 := channel.Window() // Get the window @@ -98,40 +83,16 @@ func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, b return errors.ErrServerError, false } - // This request is answered either by resending all messages on their - // original channel, potentially triggering mutliple handlers on the client - // side, or by responding with all messages in one big response here. - // Can be both, but the latter is the default behavior. - withResponse, okResponse := channel.GetOption("response") - withResend, okResend := channel.GetOption("resend") - doResend := okResend && withResend == 1 - doRespond := (okResponse && withResponse == 1) || !doResend - - // Resend every messages again like they were originally. - /* - if doResend { - // Range over the messages in the channel and forward them - for _, m := range msgs { - msg := m // Copy message - c.Send(&msg) - } - }*/ - - // Send all messages in the payload of the response to this request. - if doRespond { - resp := &Response{ - Messages: make([]Message, 0, len(msgs)), - } - for _, m := range msgs { - msg := m - resp.Messages = append(resp.Messages, Message{ - ID: msg.ID, - Topic: string(msg.Channel), // The channel for this message. - Payload: string(msg.Payload), // The payload for this message. - }) - } - return resp, true + resp := &Response{ + Messages: make([]Message, 0, len(msgs)), } - - return nil, true + for _, m := range msgs { + msg := m + resp.Messages = append(resp.Messages, Message{ + ID: msg.ID, + Topic: string(msg.Channel), // The channel for this message. + Payload: string(msg.Payload), // The payload for this message. + }) + } + return resp, true } diff --git a/internal/service/history/history_test.go b/internal/service/history/history_test.go index 8d4814d5..5c3a3401 100644 --- a/internal/service/history/history_test.go +++ b/internal/service/history/history_test.go @@ -25,8 +25,10 @@ import ( "github.com/stretchr/testify/assert" ) +// TestHistory tests the history service and its default Limiter implementation +// which limits the number of messages that can be retrieved. This Limiter's +// purpose is to reproduce historical behavior of Emitter. func TestHistory(t *testing.T) { - //assert.True(t, true) ssid := message.Ssid{1, 3238259379, 500706888, 1027807523} store := storage.NewInMemory(nil) store.Configure(nil) @@ -35,38 +37,52 @@ func TestHistory(t *testing.T) { Contract: uint32(1), ExtraPerm: security.AllowLoad, } + // Create new service + service := New(auth, store) + connection := &fake.Conn{} + // The most basic request, on an empty store. request := &Request{ Key: "key", Channel: "key/a/b/c/", } - // Prepare the request - b, _ := json.Marshal(request) - if request == nil { - b = []byte("invalid") - } else { - auth.Target = request.Channel - } - - // Create new service - s := New(auth, store) - c := &fake.Conn{} - - // Store a message - for i := 0; i < 1; i++ { - store.Store(&message.Message{ - ID: message.NewID(ssid), - Channel: []byte("a/b/c/"), - Payload: []byte("hello"), - TTL: 30, - }) - } - - // Issue a request - response, ok := s.OnRequest(c, b) - println(response) + // Store 2 messages + firstSSID := message.NewID(ssid) + store.Store(&message.Message{ + ID: firstSSID, + Channel: []byte("a/b/c/"), + Payload: []byte("hello"), + TTL: 30, + }) + store.Store(&message.Message{ + ID: message.NewID(ssid), + Channel: []byte("a/b/c/"), + Payload: []byte("hello"), + TTL: 30, + }) + reqBytes, _ := json.Marshal(request) + + // Issue the same request + response, ok := service.OnRequest(connection, reqBytes) + // The request should have succeeded and returned a response. + assert.Equal(t, true, ok) + // The response should have returned the last message as per MQTT spec. + assert.Equal(t, 1, len(response.(*Response).Messages)) + + store.Store(&message.Message{ + ID: message.NewID(ssid), + Channel: []byte("a/b/c/"), + Payload: []byte("hello"), + TTL: 30, + }) + request.Channel = "key/a/b/c/?last=2" + reqBytes, _ = json.Marshal(request) + response, ok = service.OnRequest(connection, reqBytes) + // The request should have succeeded and returned a response. assert.Equal(t, true, ok) + // The response should have returned the last 2 messages. + assert.Equal(t, 2, len(response.(*Response).Messages)) } // ------------------------------------------------------------------------------------