diff --git a/internal/broker/service.go b/internal/broker/service.go index 53d7c46b..d713d17a 100644 --- a/internal/broker/service.go +++ b/internal/broker/service.go @@ -43,6 +43,7 @@ import ( "github.com/emitter-io/emitter/internal/security" "github.com/emitter-io/emitter/internal/security/license" "github.com/emitter-io/emitter/internal/service/cluster" + "github.com/emitter-io/emitter/internal/service/history" "github.com/emitter-io/emitter/internal/service/keyban" "github.com/emitter-io/emitter/internal/service/keygen" "github.com/emitter-io/emitter/internal/service/link" @@ -185,6 +186,7 @@ func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error) s.pubsub.Handle("keyban", keyban.New(s, s.keygen, s.cluster).OnRequest) s.pubsub.Handle("link", link.New(s, s.pubsub).OnRequest) s.pubsub.Handle("me", me.New().OnRequest) + s.pubsub.Handle("history", history.New(s, s.storage).OnRequest) // Addresses and things logging.LogTarget("service", "configured node name", nodeName) diff --git a/internal/provider/storage/memory_test.go b/internal/provider/storage/memory_test.go index 5f045087..062e1add 100644 --- a/internal/provider/storage/memory_test.go +++ b/internal/provider/storage/memory_test.go @@ -122,7 +122,7 @@ func TestInMemory_Query(t *testing.T) { }) } - out, err := s.Query(tc.query, zero, zero, tc.limit) + out, err := s.Query(tc.query, zero, zero, nil, tc.limit) assert.NoError(t, err) count := 0 @@ -152,7 +152,7 @@ func TestInMemory_lookup(t *testing.T) { } for _, tc := range tests { - matches := s.lookup(newLookupQuery(tc.query, zero, zero, tc.limit)) + matches := s.lookup(newLookupQuery(tc.query, zero, zero, nil, tc.limit)) assert.Equal(t, tc.count, len(matches)) } } @@ -172,13 +172,13 @@ func TestInMemory_OnSurvey(t *testing.T) { {name: "ssdstore"}, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1), expectOk: true, expectCount: 1, }, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10), expectOk: true, expectCount: 2, }, diff --git a/internal/provider/storage/ssd.go b/internal/provider/storage/ssd.go index ca356d5c..31cc81df 100644 --- a/internal/provider/storage/ssd.go +++ b/internal/provider/storage/ssd.go @@ -125,10 +125,10 @@ func encodeFrame(msgs message.Frame) []*badger.Entry { // Query performs a query and attempts to fetch last n messages where // n is specified by limit argument. From and until times can also be specified // for time-series retrieval. -func (s *SSD) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) { +func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) { // Construct a query and lookup locally first - query := newLookupQuery(ssid, from, until, limit) + query := newLookupQuery(ssid, from, until, startFromID, limit) match := s.lookup(query) // Issue the message survey to the cluster @@ -184,11 +184,21 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) { // Since we're starting backwards, seek to the 'until' position first and then // we'll iterate forward but have reverse time ('until' -> 'from') - prefix := message.NewPrefix(q.Ssid, q.Until) + var prefix message.ID + if len(q.StartFromID) == 0 { + prefix = message.NewPrefix(q.Ssid, q.Until) + it.Seek(prefix) + } else { + it.Seek(q.StartFromID) + if !it.Valid() { + return nil + } + it.Next() + } matchesSize := 0 // Seek the prefix and check the key so we can quickly exit the iteration. - for it.Seek(prefix); it.Valid() && + for ; it.Valid() && message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) && len(matches) < q.Limit; it.Next() { if !message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) { diff --git a/internal/provider/storage/ssd_test.go b/internal/provider/storage/ssd_test.go index 968551e8..8a42dc7f 100644 --- a/internal/provider/storage/ssd_test.go +++ b/internal/provider/storage/ssd_test.go @@ -71,7 +71,7 @@ func TestSSD_Query(t *testing.T) { assert.NoError(t, err) zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, 5) + f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, nil, 5) assert.NoError(t, err) assert.Len(t, f, 1) }) @@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) { }) } +func TestSSD_QueryStartFromID(t *testing.T) { + runSSDTest(func(store *SSD) { + testStartFromID(t, store) + }) +} + func TestSSD_MaxResponseSizeReached(t *testing.T) { runSSDTest(func(store *SSD) { testMaxResponseSizeReached(t, store) @@ -127,7 +133,7 @@ func TestSSD_QuerySurveyed(t *testing.T) { }) } - out, err := s.Query(tc.query, zero, zero, tc.limit) + out, err := s.Query(tc.query, zero, zero, nil, tc.limit) assert.NoError(t, err) count := 0 for range out { @@ -152,13 +158,13 @@ func TestSSD_OnSurvey(t *testing.T) { {name: "ssdstore"}, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1), expectOk: true, expectCount: 1, }, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10), expectOk: true, expectCount: 2, }, @@ -322,7 +328,7 @@ func benchmarkQuery(b *testing.B, store *SSD, last int, m *stats.Metric) { return default: - store.Query(ssid, t0, t1, last) + store.Query(ssid, t0, t1, nil, last) m.Update(int32(last)) } } diff --git a/internal/provider/storage/storage.go b/internal/provider/storage/storage.go index 257b8c18..e3689195 100644 --- a/internal/provider/storage/storage.go +++ b/internal/provider/storage/storage.go @@ -47,7 +47,7 @@ type Storage interface { // Query performs a query and attempts to fetch last n messages where // n is specified by limit argument. From and until times can also be specified // for time-series retrieval. - Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) + Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) } // ------------------------------------------------------------------------------------ @@ -65,20 +65,22 @@ func window(from, until time.Time) (int64, int64) { // The lookup query to send out to the cluster. type lookupQuery struct { - Ssid message.Ssid // The ssid to match. - From int64 // The beginning of the time window. - Until int64 // The end of the time window. - Limit int // The maximum number of elements to return. + Ssid message.Ssid // The ssid to match. + From int64 // The beginning of the time window. + Until int64 // The end of the time window. + StartFromID message.ID // The ID to start from when retrieving message, used for pagination. + Limit int // The maximum number of elements to return. } // newLookupQuery creates a new lookup query -func newLookupQuery(ssid message.Ssid, from, until time.Time, limit int) lookupQuery { +func newLookupQuery(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) lookupQuery { t0, t1 := window(from, until) return lookupQuery{ - Ssid: ssid, - From: t0, - Until: t1, - Limit: limit, + Ssid: ssid, + From: t0, + Until: t1, + StartFromID: startFromID, + Limit: limit, } } @@ -128,7 +130,7 @@ func (s *Noop) Store(m *message.Message) error { // Query performs a query and attempts to fetch last n messages where // n is specified by limit argument. From and until times can also be specified // for time-series retrieval. -func (s *Noop) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) { +func (s *Noop) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) { return nil, nil } diff --git a/internal/provider/storage/storage_test.go b/internal/provider/storage/storage_test.go index 9d9684bc..730979d5 100644 --- a/internal/provider/storage/storage_test.go +++ b/internal/provider/storage/storage_test.go @@ -48,7 +48,7 @@ func TestNoop_Store(t *testing.T) { func TestNoop_Query(t *testing.T) { s := new(Noop) zero := time.Unix(0, 0) - r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, 10) + r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, nil, 10) assert.NoError(t, err) for range r { t.Errorf("Should be empty") @@ -81,7 +81,7 @@ func testOrder(t *testing.T, store Storage) { // Issue a query zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 5) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 5) assert.NoError(t, err) assert.Len(t, f, 5) @@ -104,7 +104,7 @@ func testRetained(t *testing.T, store Storage) { // Issue a query zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 1) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 1) assert.NoError(t, err) assert.Len(t, f, 1) @@ -127,7 +127,7 @@ func testRange(t *testing.T, store Storage) { } // Issue a query - f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), 5) + f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), nil, 5) assert.NoError(t, err) assert.Len(t, f, 5) @@ -153,6 +153,37 @@ func Test_configUint32(t *testing.T) { assert.Equal(t, uint32(99999999), v) } +// Test the StartFromID option for pagination purposes. +func testStartFromID(t *testing.T, store Storage) { + var fourth message.ID + for i := int64(0); i < 10; i++ { + payload := make([]byte, 1) + payload[0] = byte(i) + msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), payload) + msg.TTL = message.RetainedTTL + msg.ID.SetTime(msg.ID.Time() + (i * 10000)) + assert.NoError(t, store.Store(msg)) + if i == 4 { + fourth = msg.ID + } + } + + // Issue a query, starting at the fourth message ID and going back 2. + zero := time.Unix(0, 0) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, fourth, 2) + assert.NoError(t, err) + + assert.Len(t, f, 2) + assert.Equal(t, 2, int(f[0].Payload[0])) + assert.Equal(t, 3, int(f[1].Payload[0])) + + // Issue a query, starting at the first message ID and going back 2. + f, err = store.Query([]uint32{0, 1, 2}, zero, zero, f[0].ID, 2) + assert.NoError(t, err) + assert.Equal(t, 0, int(f[0].Payload[0])) + assert.Equal(t, 1, int(f[1].Payload[0])) +} + func testMaxResponseSizeReached(t *testing.T, store Storage) { for i := int64(0); i < 10; i++ { payload := make([]byte, mqtt.MaxMessageSize/5) @@ -163,7 +194,7 @@ func testMaxResponseSizeReached(t *testing.T, store Storage) { } zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 10) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 10) assert.NoError(t, err) assert.Len(t, f, 4) diff --git a/internal/service/history/history.go b/internal/service/history/history.go new file mode 100644 index 00000000..db90ce94 --- /dev/null +++ b/internal/service/history/history.go @@ -0,0 +1,95 @@ +/********************************************************************************** +* Copyright (c) 2009-2020 Misakai Ltd. +* This program is free software: you can redistribute it and/or modify it under the +* terms of the GNU Affero General Public License as published by the Free Software +* Foundation, either version 3 of the License, or(at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License along +* with this program. If not, see. +************************************************************************************/ + +package history + +import ( + "encoding/json" + + "github.com/emitter-io/emitter/internal/errors" + "github.com/emitter-io/emitter/internal/message" + "github.com/emitter-io/emitter/internal/provider/logging" + "github.com/emitter-io/emitter/internal/security" + "github.com/emitter-io/emitter/internal/service" +) + +// Request represents a historical messages request. +type Request struct { + Key string `json:"key"` // The channel key for this request. + Channel string `json:"channel"` // The target channel for this request. + StartFromID message.ID `json:"startFromID,omitempty"` +} + +type Message struct { + ID message.ID `json:"id"` + Topic string `json:"topic"` // The channel of the message + Payload string `json:"payload"` // The payload of the message +} +type Response struct { + Request uint16 `json:"req,omitempty"` // The corresponding request ID. + Messages []Message `json:"messages"` // The history of messages. +} + +// ForRequest sets the request ID in the response for matching +func (r *Response) ForRequest(id uint16) { + r.Request = id +} + +// OnRequest handles a request of historical messages. +func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, bool) { + var request Request + if err := json.Unmarshal(payload, &request); err != nil { + return errors.ErrBadRequest, false + } + + channel := security.ParseChannel([]byte(request.Channel)) + if channel.ChannelType == security.ChannelInvalid { + return errors.ErrBadRequest, false + } + + // Check the authorization and permissions + _, key, allowed := s.auth.Authorize(channel, security.AllowLoad) + if !allowed { + return errors.ErrUnauthorized, false + } + + // Use limit = 1 if not specified, otherwise use the limit option. The limit now + // defaults to one as per MQTT spec we always need to send retained messages. + limit := int64(1) + if v, ok := channel.Last(); ok { + limit = v + } + + ssid := message.NewSsid(key.Contract(), channel.Query) + t0, t1 := channel.Window() // Get the window + + msgs, err := s.store.Query(ssid, t0, t1, request.StartFromID, int(limit)) + if err != nil { + logging.LogError("conn", "query last messages", err) + return errors.ErrServerError, false + } + + resp := &Response{ + Messages: make([]Message, 0, len(msgs)), + } + for _, m := range msgs { + msg := m + resp.Messages = append(resp.Messages, Message{ + ID: msg.ID, + Topic: string(msg.Channel), // The channel for this message. + Payload: string(msg.Payload), // The payload for this message. + }) + } + return resp, true +} diff --git a/internal/service/history/history_test.go b/internal/service/history/history_test.go new file mode 100644 index 00000000..03e7e569 --- /dev/null +++ b/internal/service/history/history_test.go @@ -0,0 +1,84 @@ +/********************************************************************************** +* Copyright (c) 2009-2020 Misakai Ltd. +* This program is free software: you can redistribute it and/or modify it under the +* terms of the GNU Affero General Public License as published by the Free Software +* Foundation, either version 3 of the License, or(at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License along +* with this program. If not, see. +************************************************************************************/ + +package history + +import ( + "encoding/json" + "testing" + + "github.com/emitter-io/emitter/internal/message" + "github.com/emitter-io/emitter/internal/provider/storage" + "github.com/emitter-io/emitter/internal/security" + "github.com/emitter-io/emitter/internal/service/fake" + "github.com/stretchr/testify/assert" +) + +// TestHistory tests the history service. +func TestHistory(t *testing.T) { + ssid := message.Ssid{1, 3238259379, 500706888, 1027807523} + store := storage.NewInMemory(nil) + store.Configure(nil) + auth := &fake.Authorizer{ + Success: true, + Contract: uint32(1), + ExtraPerm: security.AllowLoad, + } + // Create new service + service := New(auth, store) + connection := &fake.Conn{} + + // The most basic request, on an empty store. + request := &Request{ + Key: "key", + Channel: "key/a/b/c/", + } + + // Store 2 messages + firstSSID := message.NewID(ssid) + store.Store(&message.Message{ + ID: firstSSID, + Channel: []byte("a/b/c/"), + Payload: []byte("hello"), + TTL: 30, + }) + store.Store(&message.Message{ + ID: message.NewID(ssid), + Channel: []byte("a/b/c/"), + Payload: []byte("hello"), + TTL: 30, + }) + reqBytes, _ := json.Marshal(request) + + // Issue the same request + response, ok := service.OnRequest(connection, reqBytes) + // The request should have succeeded and returned a response. + assert.Equal(t, true, ok) + // The response should have returned the last message as per MQTT spec. + assert.Equal(t, 1, len(response.(*Response).Messages)) + + store.Store(&message.Message{ + ID: message.NewID(ssid), + Channel: []byte("a/b/c/"), + Payload: []byte("hello"), + TTL: 30, + }) + request.Channel = "key/a/b/c/?last=2" + reqBytes, _ = json.Marshal(request) + response, ok = service.OnRequest(connection, reqBytes) + // The request should have succeeded and returned a response. + assert.Equal(t, true, ok) + // The response should have returned the last 2 messages. + assert.Equal(t, 2, len(response.(*Response).Messages)) +} diff --git a/internal/service/history/service.go b/internal/service/history/service.go new file mode 100644 index 00000000..12f74e4a --- /dev/null +++ b/internal/service/history/service.go @@ -0,0 +1,42 @@ +/********************************************************************************** +* Copyright (c) 2009-2020 Misakai Ltd. +* This program is free software: you can redistribute it and/or modify it under the +* terms of the GNU Affero General Public License as published by the Free Software +* Foundation, either version 3 of the License, or(at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License along +* with this program. If not, see. +************************************************************************************/ + +package history + +import ( + "github.com/emitter-io/emitter/internal/provider/storage" + "github.com/emitter-io/emitter/internal/security/hash" + "github.com/emitter-io/emitter/internal/service" +) + +// Service represents a history service. +type Service struct { + auth service.Authorizer // The authorizer to use. + store storage.Storage // The storage provider to use. + handlers map[uint32]service.Handler // The emitter request handlers. +} + +// New creates a new publisher service. +func New(auth service.Authorizer, store storage.Storage) *Service { + return &Service{ + auth: auth, + store: store, + handlers: make(map[uint32]service.Handler), + } +} + +// Handle adds a handler for an "emitter/..." request +func (s *Service) Handle(request string, handler service.Handler) { + s.handlers[hash.OfString(request)] = handler +} diff --git a/internal/service/pubsub/lastwill_test.go b/internal/service/pubsub/lastwill_test.go index 4a4da76e..34795a0b 100644 --- a/internal/service/pubsub/lastwill_test.go +++ b/internal/service/pubsub/lastwill_test.go @@ -130,7 +130,7 @@ func TestPubSub_LastWill(t *testing.T) { // Query the storage { - msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), 100) + msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), nil, 100) assert.NoError(t, err) assert.Equal(t, tc.expectStored, len(msgs)) } diff --git a/internal/service/pubsub/publish_test.go b/internal/service/pubsub/publish_test.go index 5709b71f..746589bb 100644 --- a/internal/service/pubsub/publish_test.go +++ b/internal/service/pubsub/publish_test.go @@ -122,7 +122,7 @@ func TestPubSub_Publish(t *testing.T) { // Query the storage { - msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), 100) + msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), nil, 100) assert.NoError(t, err) assert.Equal(t, tc.expectStored, len(msgs)) } diff --git a/internal/service/pubsub/subscribe.go b/internal/service/pubsub/subscribe.go index 8896e7f5..53870982 100644 --- a/internal/service/pubsub/subscribe.go +++ b/internal/service/pubsub/subscribe.go @@ -85,7 +85,7 @@ func (s *Service) OnSubscribe(c service.Conn, mqttTopic []byte) *errors.Error { // Check if the key has a load permission (also applies for retained) if key.HasPermission(security.AllowLoad) { t0, t1 := channel.Window() // Get the window - msgs, err := s.store.Query(ssid, t0, t1, int(limit)) + msgs, err := s.store.Query(ssid, t0, t1, nil, int(limit)) if err != nil { logging.LogError("conn", "query last messages", err) return errors.ErrServerError diff --git a/internal/service/pubsub/subscribe_test.go b/internal/service/pubsub/subscribe_test.go index 552fb101..06d394ad 100644 --- a/internal/service/pubsub/subscribe_test.go +++ b/internal/service/pubsub/subscribe_test.go @@ -177,7 +177,7 @@ func (s *buggyStore) Store(m *message.Message) error { return errors.New("not working") } -func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) { +func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) { return nil, errors.New("not working") }