From a11ef40b17e2892e8605a7be16400423ee4e1db9 Mon Sep 17 00:00:00 2001 From: Florimond Husquinet Date: Tue, 21 Nov 2023 23:14:35 +0100 Subject: [PATCH] Adds the new history API The history API allows retrieving any number of messages stored in a channel. As the number of messages that can be returned in a single response is limited by the size of a response (as per #413), a system of pagination is required. This system is implemented with the introduction of the StartFromID query parameter. A client that has already received a response with a certain number of messages can send a new request to the API with the StartFromID set to the ID of the oldest message it received (first in the array). The storage query will then begin to retrieve messages starting at the ID preceding the ID provided in the query, to generate the next page of messages. --- internal/broker/service.go | 2 + internal/provider/storage/memory_test.go | 8 +- internal/provider/storage/ssd.go | 18 ++++- internal/provider/storage/ssd_test.go | 16 ++-- internal/provider/storage/storage.go | 24 +++--- internal/provider/storage/storage_test.go | 41 ++++++++-- internal/service/history/history.go | 95 +++++++++++++++++++++++ internal/service/history/history_test.go | 84 ++++++++++++++++++++ internal/service/history/service.go | 42 ++++++++++ internal/service/pubsub/lastwill_test.go | 2 +- internal/service/pubsub/publish_test.go | 2 +- internal/service/pubsub/subscribe.go | 2 +- internal/service/pubsub/subscribe_test.go | 2 +- 13 files changed, 305 insertions(+), 33 deletions(-) create mode 100644 internal/service/history/history.go create mode 100644 internal/service/history/history_test.go create mode 100644 internal/service/history/service.go diff --git a/internal/broker/service.go b/internal/broker/service.go index 53d7c46b..d713d17a 100644 --- a/internal/broker/service.go +++ b/internal/broker/service.go @@ -43,6 +43,7 @@ import ( "github.com/emitter-io/emitter/internal/security" 
"github.com/emitter-io/emitter/internal/security/license" "github.com/emitter-io/emitter/internal/service/cluster" + "github.com/emitter-io/emitter/internal/service/history" "github.com/emitter-io/emitter/internal/service/keyban" "github.com/emitter-io/emitter/internal/service/keygen" "github.com/emitter-io/emitter/internal/service/link" @@ -185,6 +186,7 @@ func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error) s.pubsub.Handle("keyban", keyban.New(s, s.keygen, s.cluster).OnRequest) s.pubsub.Handle("link", link.New(s, s.pubsub).OnRequest) s.pubsub.Handle("me", me.New().OnRequest) + s.pubsub.Handle("history", history.New(s, s.storage).OnRequest) // Addresses and things logging.LogTarget("service", "configured node name", nodeName) diff --git a/internal/provider/storage/memory_test.go b/internal/provider/storage/memory_test.go index 5f045087..062e1add 100644 --- a/internal/provider/storage/memory_test.go +++ b/internal/provider/storage/memory_test.go @@ -122,7 +122,7 @@ func TestInMemory_Query(t *testing.T) { }) } - out, err := s.Query(tc.query, zero, zero, tc.limit) + out, err := s.Query(tc.query, zero, zero, nil, tc.limit) assert.NoError(t, err) count := 0 @@ -152,7 +152,7 @@ func TestInMemory_lookup(t *testing.T) { } for _, tc := range tests { - matches := s.lookup(newLookupQuery(tc.query, zero, zero, tc.limit)) + matches := s.lookup(newLookupQuery(tc.query, zero, zero, nil, tc.limit)) assert.Equal(t, tc.count, len(matches)) } } @@ -172,13 +172,13 @@ func TestInMemory_OnSurvey(t *testing.T) { {name: "ssdstore"}, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1), expectOk: true, expectCount: 1, }, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10), expectOk: true, expectCount: 2, }, diff --git a/internal/provider/storage/ssd.go 
b/internal/provider/storage/ssd.go index ca356d5c..31cc81df 100644 --- a/internal/provider/storage/ssd.go +++ b/internal/provider/storage/ssd.go @@ -125,10 +125,10 @@ func encodeFrame(msgs message.Frame) []*badger.Entry { // Query performs a query and attempts to fetch last n messages where // n is specified by limit argument. From and until times can also be specified // for time-series retrieval. -func (s *SSD) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) { +func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) { // Construct a query and lookup locally first - query := newLookupQuery(ssid, from, until, limit) + query := newLookupQuery(ssid, from, until, startFromID, limit) match := s.lookup(query) // Issue the message survey to the cluster @@ -184,11 +184,21 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) { // Since we're starting backwards, seek to the 'until' position first and then // we'll iterate forward but have reverse time ('until' -> 'from') - prefix := message.NewPrefix(q.Ssid, q.Until) + var prefix message.ID + if len(q.StartFromID) == 0 { + prefix = message.NewPrefix(q.Ssid, q.Until) + it.Seek(prefix) + } else { + it.Seek(q.StartFromID) + if !it.Valid() { + return nil + } + it.Next() + } matchesSize := 0 // Seek the prefix and check the key so we can quickly exit the iteration. 
- for it.Seek(prefix); it.Valid() && + for ; it.Valid() && message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) && len(matches) < q.Limit; it.Next() { if !message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) { diff --git a/internal/provider/storage/ssd_test.go b/internal/provider/storage/ssd_test.go index 968551e8..8a42dc7f 100644 --- a/internal/provider/storage/ssd_test.go +++ b/internal/provider/storage/ssd_test.go @@ -71,7 +71,7 @@ func TestSSD_Query(t *testing.T) { assert.NoError(t, err) zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, 5) + f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, nil, 5) assert.NoError(t, err) assert.Len(t, f, 1) }) @@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) { }) } +func TestSSD_QueryStartFromID(t *testing.T) { + runSSDTest(func(store *SSD) { + testStartFromID(t, store) + }) +} + func TestSSD_MaxResponseSizeReached(t *testing.T) { runSSDTest(func(store *SSD) { testMaxResponseSizeReached(t, store) @@ -127,7 +133,7 @@ func TestSSD_QuerySurveyed(t *testing.T) { }) } - out, err := s.Query(tc.query, zero, zero, tc.limit) + out, err := s.Query(tc.query, zero, zero, nil, tc.limit) assert.NoError(t, err) count := 0 for range out { @@ -152,13 +158,13 @@ func TestSSD_OnSurvey(t *testing.T) { {name: "ssdstore"}, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1), expectOk: true, expectCount: 1, }, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10), expectOk: true, expectCount: 2, }, @@ -322,7 +328,7 @@ func benchmarkQuery(b *testing.B, store *SSD, last int, m *stats.Metric) { return default: - store.Query(ssid, t0, t1, last) + store.Query(ssid, t0, t1, nil, last) m.Update(int32(last)) } } diff --git a/internal/provider/storage/storage.go b/internal/provider/storage/storage.go index 257b8c18..e3689195 
100644 --- a/internal/provider/storage/storage.go +++ b/internal/provider/storage/storage.go @@ -47,7 +47,7 @@ type Storage interface { // Query performs a query and attempts to fetch last n messages where // n is specified by limit argument. From and until times can also be specified // for time-series retrieval. - Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) + Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) } // ------------------------------------------------------------------------------------ @@ -65,20 +65,22 @@ func window(from, until time.Time) (int64, int64) { // The lookup query to send out to the cluster. type lookupQuery struct { - Ssid message.Ssid // The ssid to match. - From int64 // The beginning of the time window. - Until int64 // The end of the time window. - Limit int // The maximum number of elements to return. + Ssid message.Ssid // The ssid to match. + From int64 // The beginning of the time window. + Until int64 // The end of the time window. + StartFromID message.ID // The ID to start from when retrieving message, used for pagination. + Limit int // The maximum number of elements to return. } // newLookupQuery creates a new lookup query -func newLookupQuery(ssid message.Ssid, from, until time.Time, limit int) lookupQuery { +func newLookupQuery(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) lookupQuery { t0, t1 := window(from, until) return lookupQuery{ - Ssid: ssid, - From: t0, - Until: t1, - Limit: limit, + Ssid: ssid, + From: t0, + Until: t1, + StartFromID: startFromID, + Limit: limit, } } @@ -128,7 +130,7 @@ func (s *Noop) Store(m *message.Message) error { // Query performs a query and attempts to fetch last n messages where // n is specified by limit argument. From and until times can also be specified // for time-series retrieval. 
-func (s *Noop) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) { +func (s *Noop) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) { return nil, nil } diff --git a/internal/provider/storage/storage_test.go b/internal/provider/storage/storage_test.go index 9d9684bc..730979d5 100644 --- a/internal/provider/storage/storage_test.go +++ b/internal/provider/storage/storage_test.go @@ -48,7 +48,7 @@ func TestNoop_Store(t *testing.T) { func TestNoop_Query(t *testing.T) { s := new(Noop) zero := time.Unix(0, 0) - r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, 10) + r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, nil, 10) assert.NoError(t, err) for range r { t.Errorf("Should be empty") @@ -81,7 +81,7 @@ func testOrder(t *testing.T, store Storage) { // Issue a query zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 5) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 5) assert.NoError(t, err) assert.Len(t, f, 5) @@ -104,7 +104,7 @@ func testRetained(t *testing.T, store Storage) { // Issue a query zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 1) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 1) assert.NoError(t, err) assert.Len(t, f, 1) @@ -127,7 +127,7 @@ func testRange(t *testing.T, store Storage) { } // Issue a query - f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), 5) + f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), nil, 5) assert.NoError(t, err) assert.Len(t, f, 5) @@ -153,6 +153,37 @@ func Test_configUint32(t *testing.T) { assert.Equal(t, uint32(99999999), v) } +// Test the StartFromID option for pagination purposes. 
+func testStartFromID(t *testing.T, store Storage) { + var fourth message.ID + for i := int64(0); i < 10; i++ { + payload := make([]byte, 1) + payload[0] = byte(i) + msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), payload) + msg.TTL = message.RetainedTTL + msg.ID.SetTime(msg.ID.Time() + (i * 10000)) + assert.NoError(t, store.Store(msg)) + if i == 4 { + fourth = msg.ID + } + } + + // Issue a query for the 2 messages preceding the one stored at i == 4; the message with the starting ID itself is excluded. + zero := time.Unix(0, 0) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, fourth, 2) + assert.NoError(t, err) + + assert.Len(t, f, 2) + assert.Equal(t, 2, int(f[0].Payload[0])) + assert.Equal(t, 3, int(f[1].Payload[0])) + + // Issue a query for the next page, starting from the ID of the oldest message of the previous page (first in the array). + f, err = store.Query([]uint32{0, 1, 2}, zero, zero, f[0].ID, 2) + assert.NoError(t, err) + assert.Equal(t, 0, int(f[0].Payload[0])) + assert.Equal(t, 1, int(f[1].Payload[0])) +} + func testMaxResponseSizeReached(t *testing.T, store Storage) { for i := int64(0); i < 10; i++ { payload := make([]byte, mqtt.MaxMessageSize/5) @@ -163,7 +194,7 @@ func testMaxResponseSizeReached(t *testing.T, store Storage) { } zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 10) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 10) assert.NoError(t, err) assert.Len(t, f, 4) diff --git a/internal/service/history/history.go b/internal/service/history/history.go new file mode 100644 index 00000000..db90ce94 --- /dev/null +++ b/internal/service/history/history.go @@ -0,0 +1,95 @@ +/********************************************************************************** +* Copyright (c) 2009-2020 Misakai Ltd. +* This program is free software: you can redistribute it and/or modify it under the +* terms of the GNU Affero General Public License as published by the Free Software +* Foundation, either version 3 of the License, or(at your option) any later version. 
+* +* This program is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License along +* with this program. If not, see. +************************************************************************************/ + +package history + +import ( + "encoding/json" + + "github.com/emitter-io/emitter/internal/errors" + "github.com/emitter-io/emitter/internal/message" + "github.com/emitter-io/emitter/internal/provider/logging" + "github.com/emitter-io/emitter/internal/security" + "github.com/emitter-io/emitter/internal/service" +) + +// Request represents a historical messages request. +type Request struct { + Key string `json:"key"` // The channel key for this request. + Channel string `json:"channel"` // The target channel for this request. + StartFromID message.ID `json:"startFromID,omitempty"` +} + +type Message struct { + ID message.ID `json:"id"` + Topic string `json:"topic"` // The channel of the message + Payload string `json:"payload"` // The payload of the message +} +type Response struct { + Request uint16 `json:"req,omitempty"` // The corresponding request ID. + Messages []Message `json:"messages"` // The history of messages. +} + +// ForRequest sets the request ID in the response for matching +func (r *Response) ForRequest(id uint16) { + r.Request = id +} + +// OnRequest handles a request of historical messages. 
+func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, bool) { + var request Request + if err := json.Unmarshal(payload, &request); err != nil { + return errors.ErrBadRequest, false + } + + channel := security.ParseChannel([]byte(request.Channel)) + if channel.ChannelType == security.ChannelInvalid { + return errors.ErrBadRequest, false + } + + // Check the authorization and permissions + _, key, allowed := s.auth.Authorize(channel, security.AllowLoad) + if !allowed { + return errors.ErrUnauthorized, false + } + + // Use limit = 1 if not specified, otherwise use the limit option. The limit now + // defaults to one as per MQTT spec we always need to send retained messages. + limit := int64(1) + if v, ok := channel.Last(); ok { + limit = v + } + + ssid := message.NewSsid(key.Contract(), channel.Query) + t0, t1 := channel.Window() // Get the window + + msgs, err := s.store.Query(ssid, t0, t1, request.StartFromID, int(limit)) + if err != nil { + logging.LogError("conn", "query last messages", err) + return errors.ErrServerError, false + } + + resp := &Response{ + Messages: make([]Message, 0, len(msgs)), + } + for _, m := range msgs { + msg := m + resp.Messages = append(resp.Messages, Message{ + ID: msg.ID, + Topic: string(msg.Channel), // The channel for this message. + Payload: string(msg.Payload), // The payload for this message. + }) + } + return resp, true +} diff --git a/internal/service/history/history_test.go b/internal/service/history/history_test.go new file mode 100644 index 00000000..03e7e569 --- /dev/null +++ b/internal/service/history/history_test.go @@ -0,0 +1,84 @@ +/********************************************************************************** +* Copyright (c) 2009-2020 Misakai Ltd. 
+* This program is free software: you can redistribute it and/or modify it under the +* terms of the GNU Affero General Public License as published by the Free Software +* Foundation, either version 3 of the License, or(at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License along +* with this program. If not, see. +************************************************************************************/ + +package history + +import ( + "encoding/json" + "testing" + + "github.com/emitter-io/emitter/internal/message" + "github.com/emitter-io/emitter/internal/provider/storage" + "github.com/emitter-io/emitter/internal/security" + "github.com/emitter-io/emitter/internal/service/fake" + "github.com/stretchr/testify/assert" +) + +// TestHistory tests the history service. +func TestHistory(t *testing.T) { + ssid := message.Ssid{1, 3238259379, 500706888, 1027807523} + store := storage.NewInMemory(nil) + store.Configure(nil) + auth := &fake.Authorizer{ + Success: true, + Contract: uint32(1), + ExtraPerm: security.AllowLoad, + } + // Create new service + service := New(auth, store) + connection := &fake.Conn{} + + // The most basic request, on an empty store. 
+ request := &Request{ + Key: "key", + Channel: "key/a/b/c/", + } + + // Store 2 messages + firstSSID := message.NewID(ssid) + store.Store(&message.Message{ + ID: firstSSID, + Channel: []byte("a/b/c/"), + Payload: []byte("hello"), + TTL: 30, + }) + store.Store(&message.Message{ + ID: message.NewID(ssid), + Channel: []byte("a/b/c/"), + Payload: []byte("hello"), + TTL: 30, + }) + reqBytes, _ := json.Marshal(request) + + // Issue the same request + response, ok := service.OnRequest(connection, reqBytes) + // The request should have succeeded and returned a response. + assert.Equal(t, true, ok) + // The response should have returned the last message as per MQTT spec. + assert.Equal(t, 1, len(response.(*Response).Messages)) + + store.Store(&message.Message{ + ID: message.NewID(ssid), + Channel: []byte("a/b/c/"), + Payload: []byte("hello"), + TTL: 30, + }) + request.Channel = "key/a/b/c/?last=2" + reqBytes, _ = json.Marshal(request) + response, ok = service.OnRequest(connection, reqBytes) + // The request should have succeeded and returned a response. + assert.Equal(t, true, ok) + // The response should have returned the last 2 messages. + assert.Equal(t, 2, len(response.(*Response).Messages)) +} diff --git a/internal/service/history/service.go b/internal/service/history/service.go new file mode 100644 index 00000000..12f74e4a --- /dev/null +++ b/internal/service/history/service.go @@ -0,0 +1,42 @@ +/********************************************************************************** +* Copyright (c) 2009-2020 Misakai Ltd. +* This program is free software: you can redistribute it and/or modify it under the +* terms of the GNU Affero General Public License as published by the Free Software +* Foundation, either version 3 of the License, or(at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. 
See the GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License along +* with this program. If not, see. +************************************************************************************/ + +package history + +import ( + "github.com/emitter-io/emitter/internal/provider/storage" + "github.com/emitter-io/emitter/internal/security/hash" + "github.com/emitter-io/emitter/internal/service" +) + +// Service represents a history service. +type Service struct { + auth service.Authorizer // The authorizer to use. + store storage.Storage // The storage provider to use. + handlers map[uint32]service.Handler // The emitter request handlers. +} + +// New creates a new history service. +func New(auth service.Authorizer, store storage.Storage) *Service { + return &Service{ + auth: auth, + store: store, + handlers: make(map[uint32]service.Handler), + } +} + +// Handle adds a handler for an "emitter/..." request +func (s *Service) Handle(request string, handler service.Handler) { + s.handlers[hash.OfString(request)] = handler +} diff --git a/internal/service/pubsub/lastwill_test.go b/internal/service/pubsub/lastwill_test.go index 4a4da76e..34795a0b 100644 --- a/internal/service/pubsub/lastwill_test.go +++ b/internal/service/pubsub/lastwill_test.go @@ -130,7 +130,7 @@ func TestPubSub_LastWill(t *testing.T) { // Query the storage { - msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), 100) + msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), nil, 100) assert.NoError(t, err) assert.Equal(t, tc.expectStored, len(msgs)) } diff --git a/internal/service/pubsub/publish_test.go b/internal/service/pubsub/publish_test.go index 5709b71f..746589bb 100644 --- a/internal/service/pubsub/publish_test.go +++ b/internal/service/pubsub/publish_test.go @@ -122,7 +122,7 @@ func TestPubSub_Publish(t *testing.T) { // Query the storage { - msgs, 
err := store.Query(ssid, time.Unix(0, 0), time.Now(), nil, 100) assert.NoError(t, err) assert.Equal(t, tc.expectStored, len(msgs)) } diff --git a/internal/service/pubsub/subscribe.go b/internal/service/pubsub/subscribe.go index 8896e7f5..53870982 100644 --- a/internal/service/pubsub/subscribe.go +++ b/internal/service/pubsub/subscribe.go @@ -85,7 +85,7 @@ func (s *Service) OnSubscribe(c service.Conn, mqttTopic []byte) *errors.Error { // Check if the key has a load permission (also applies for retained) if key.HasPermission(security.AllowLoad) { t0, t1 := channel.Window() // Get the window - msgs, err := s.store.Query(ssid, t0, t1, int(limit)) + msgs, err := s.store.Query(ssid, t0, t1, nil, int(limit)) if err != nil { logging.LogError("conn", "query last messages", err) return errors.ErrServerError diff --git a/internal/service/pubsub/subscribe_test.go b/internal/service/pubsub/subscribe_test.go index 552fb101..06d394ad 100644 --- a/internal/service/pubsub/subscribe_test.go +++ b/internal/service/pubsub/subscribe_test.go @@ -177,7 +177,7 @@ func (s *buggyStore) Store(m *message.Message) error { return errors.New("not working") } -func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) { +func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) { return nil, errors.New("not working") }