diff --git a/internal/broker/service.go b/internal/broker/service.go
index 0367f52b..8f1ad6c7 100644
--- a/internal/broker/service.go
+++ b/internal/broker/service.go
@@ -43,6 +43,7 @@ import (
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/security/license"
"github.com/emitter-io/emitter/internal/service/cluster"
+ "github.com/emitter-io/emitter/internal/service/history"
"github.com/emitter-io/emitter/internal/service/keyban"
"github.com/emitter-io/emitter/internal/service/keygen"
"github.com/emitter-io/emitter/internal/service/link"
@@ -189,6 +190,7 @@ func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error)
s.pubsub.Handle("keyban", keyban.New(s, s.keygen, s.cluster).OnRequest)
s.pubsub.Handle("link", link.New(s, s.pubsub).OnRequest)
s.pubsub.Handle("me", me.New().OnRequest)
+ s.pubsub.Handle("history", history.New(s, s.storage).OnRequest)
// Addresses and things
logging.LogTarget("service", "configured node name", nodeName)
diff --git a/internal/provider/storage/memory_test.go b/internal/provider/storage/memory_test.go
index 5f045087..062e1add 100644
--- a/internal/provider/storage/memory_test.go
+++ b/internal/provider/storage/memory_test.go
@@ -122,7 +122,7 @@ func TestInMemory_Query(t *testing.T) {
})
}
- out, err := s.Query(tc.query, zero, zero, tc.limit)
+ out, err := s.Query(tc.query, zero, zero, nil, tc.limit)
assert.NoError(t, err)
count := 0
@@ -152,7 +152,7 @@ func TestInMemory_lookup(t *testing.T) {
}
for _, tc := range tests {
- matches := s.lookup(newLookupQuery(tc.query, zero, zero, tc.limit))
+ matches := s.lookup(newLookupQuery(tc.query, zero, zero, nil, tc.limit))
assert.Equal(t, tc.count, len(matches))
}
}
@@ -172,13 +172,13 @@ func TestInMemory_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
- query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1),
+ query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
- query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10),
+ query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10),
expectOk: true,
expectCount: 2,
},
diff --git a/internal/provider/storage/ssd.go b/internal/provider/storage/ssd.go
index ca356d5c..31cc81df 100644
--- a/internal/provider/storage/ssd.go
+++ b/internal/provider/storage/ssd.go
@@ -125,10 +125,10 @@ func encodeFrame(msgs message.Frame) []*badger.Entry {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
-func (s *SSD) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
+func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) {
// Construct a query and lookup locally first
- query := newLookupQuery(ssid, from, until, limit)
+ query := newLookupQuery(ssid, from, until, startFromID, limit)
match := s.lookup(query)
// Issue the message survey to the cluster
@@ -184,11 +184,21 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {
// Since we're starting backwards, seek to the 'until' position first and then
// we'll iterate forward but have reverse time ('until' -> 'from')
- prefix := message.NewPrefix(q.Ssid, q.Until)
+ var prefix message.ID
+ if len(q.StartFromID) == 0 {
+ prefix = message.NewPrefix(q.Ssid, q.Until)
+ it.Seek(prefix)
+ } else {
+ it.Seek(q.StartFromID)
+ if !it.Valid() {
+ return nil
+ }
+ it.Next()
+ }
matchesSize := 0
// Seek the prefix and check the key so we can quickly exit the iteration.
- for it.Seek(prefix); it.Valid() &&
+ for ; it.Valid() &&
message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) &&
len(matches) < q.Limit; it.Next() {
if !message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) {
diff --git a/internal/provider/storage/ssd_test.go b/internal/provider/storage/ssd_test.go
index 968551e8..8a42dc7f 100644
--- a/internal/provider/storage/ssd_test.go
+++ b/internal/provider/storage/ssd_test.go
@@ -71,7 +71,7 @@ func TestSSD_Query(t *testing.T) {
assert.NoError(t, err)
zero := time.Unix(0, 0)
- f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, 5)
+ f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, nil, 5)
assert.NoError(t, err)
assert.Len(t, f, 1)
})
@@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) {
})
}
+func TestSSD_QueryStartFromID(t *testing.T) {
+ runSSDTest(func(store *SSD) {
+ testStartFromID(t, store)
+ })
+}
+
func TestSSD_MaxResponseSizeReached(t *testing.T) {
runSSDTest(func(store *SSD) {
testMaxResponseSizeReached(t, store)
@@ -127,7 +133,7 @@ func TestSSD_QuerySurveyed(t *testing.T) {
})
}
- out, err := s.Query(tc.query, zero, zero, tc.limit)
+ out, err := s.Query(tc.query, zero, zero, nil, tc.limit)
assert.NoError(t, err)
count := 0
for range out {
@@ -152,13 +158,13 @@ func TestSSD_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
- query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1),
+ query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
- query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10),
+ query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10),
expectOk: true,
expectCount: 2,
},
@@ -322,7 +328,7 @@ func benchmarkQuery(b *testing.B, store *SSD, last int, m *stats.Metric) {
return
default:
- store.Query(ssid, t0, t1, last)
+ store.Query(ssid, t0, t1, nil, last)
m.Update(int32(last))
}
}
diff --git a/internal/provider/storage/storage.go b/internal/provider/storage/storage.go
index 257b8c18..e3689195 100644
--- a/internal/provider/storage/storage.go
+++ b/internal/provider/storage/storage.go
@@ -47,7 +47,7 @@ type Storage interface {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
- Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error)
+ Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error)
}
// ------------------------------------------------------------------------------------
@@ -65,20 +65,22 @@ func window(from, until time.Time) (int64, int64) {
// The lookup query to send out to the cluster.
type lookupQuery struct {
- Ssid message.Ssid // The ssid to match.
- From int64 // The beginning of the time window.
- Until int64 // The end of the time window.
- Limit int // The maximum number of elements to return.
+ Ssid message.Ssid // The ssid to match.
+ From int64 // The beginning of the time window.
+ Until int64 // The end of the time window.
+ StartFromID message.ID // The ID to start from when retrieving message, used for pagination.
+ Limit int // The maximum number of elements to return.
}
// newLookupQuery creates a new lookup query
-func newLookupQuery(ssid message.Ssid, from, until time.Time, limit int) lookupQuery {
+func newLookupQuery(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) lookupQuery {
t0, t1 := window(from, until)
return lookupQuery{
- Ssid: ssid,
- From: t0,
- Until: t1,
- Limit: limit,
+ Ssid: ssid,
+ From: t0,
+ Until: t1,
+ StartFromID: startFromID,
+ Limit: limit,
}
}
@@ -128,7 +130,7 @@ func (s *Noop) Store(m *message.Message) error {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
-func (s *Noop) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
+func (s *Noop) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) {
return nil, nil
}
diff --git a/internal/provider/storage/storage_test.go b/internal/provider/storage/storage_test.go
index 9d9684bc..730979d5 100644
--- a/internal/provider/storage/storage_test.go
+++ b/internal/provider/storage/storage_test.go
@@ -48,7 +48,7 @@ func TestNoop_Store(t *testing.T) {
func TestNoop_Query(t *testing.T) {
s := new(Noop)
zero := time.Unix(0, 0)
- r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, 10)
+ r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, nil, 10)
assert.NoError(t, err)
for range r {
t.Errorf("Should be empty")
@@ -81,7 +81,7 @@ func testOrder(t *testing.T, store Storage) {
// Issue a query
zero := time.Unix(0, 0)
- f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 5)
+ f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 5)
assert.NoError(t, err)
assert.Len(t, f, 5)
@@ -104,7 +104,7 @@ func testRetained(t *testing.T, store Storage) {
// Issue a query
zero := time.Unix(0, 0)
- f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 1)
+ f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 1)
assert.NoError(t, err)
assert.Len(t, f, 1)
@@ -127,7 +127,7 @@ func testRange(t *testing.T, store Storage) {
}
// Issue a query
- f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), 5)
+ f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), nil, 5)
assert.NoError(t, err)
assert.Len(t, f, 5)
@@ -153,6 +153,37 @@ func Test_configUint32(t *testing.T) {
assert.Equal(t, uint32(99999999), v)
}
+// Test the StartFromID option for pagination purposes.
+func testStartFromID(t *testing.T, store Storage) {
+ var fourth message.ID
+ for i := int64(0); i < 10; i++ {
+ payload := make([]byte, 1)
+ payload[0] = byte(i)
+ msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), payload)
+ msg.TTL = message.RetainedTTL
+ msg.ID.SetTime(msg.ID.Time() + (i * 10000))
+ assert.NoError(t, store.Store(msg))
+ if i == 4 {
+ fourth = msg.ID
+ }
+ }
+
+ // Issue a query, starting at the fourth message ID and going back 2.
+ zero := time.Unix(0, 0)
+ f, err := store.Query([]uint32{0, 1, 2}, zero, zero, fourth, 2)
+ assert.NoError(t, err)
+
+ assert.Len(t, f, 2)
+ assert.Equal(t, 2, int(f[0].Payload[0]))
+ assert.Equal(t, 3, int(f[1].Payload[0]))
+
+ // Issue a query, starting at the first message ID and going back 2.
+ f, err = store.Query([]uint32{0, 1, 2}, zero, zero, f[0].ID, 2)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, int(f[0].Payload[0]))
+ assert.Equal(t, 1, int(f[1].Payload[0]))
+}
+
func testMaxResponseSizeReached(t *testing.T, store Storage) {
for i := int64(0); i < 10; i++ {
payload := make([]byte, mqtt.MaxMessageSize/5)
@@ -163,7 +194,7 @@ func testMaxResponseSizeReached(t *testing.T, store Storage) {
}
zero := time.Unix(0, 0)
- f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 10)
+ f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 10)
assert.NoError(t, err)
assert.Len(t, f, 4)
diff --git a/internal/service/history/history.go b/internal/service/history/history.go
new file mode 100644
index 00000000..db90ce94
--- /dev/null
+++ b/internal/service/history/history.go
@@ -0,0 +1,95 @@
+/**********************************************************************************
+* Copyright (c) 2009-2020 Misakai Ltd.
+* This program is free software: you can redistribute it and/or modify it under the
+* terms of the GNU Affero General Public License as published by the Free Software
+* Foundation, either version 3 of the License, or(at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful, but WITHOUT ANY
+* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License along
+* with this program. If not, see.
+************************************************************************************/
+
+package history
+
+import (
+ "encoding/json"
+
+ "github.com/emitter-io/emitter/internal/errors"
+ "github.com/emitter-io/emitter/internal/message"
+ "github.com/emitter-io/emitter/internal/provider/logging"
+ "github.com/emitter-io/emitter/internal/security"
+ "github.com/emitter-io/emitter/internal/service"
+)
+
+// Request represents a historical messages request.
+type Request struct {
+ Key string `json:"key"` // The channel key for this request.
+ Channel string `json:"channel"` // The target channel for this request.
+ StartFromID message.ID `json:"startFromID,omitempty"`
+}
+
+type Message struct {
+ ID message.ID `json:"id"`
+ Topic string `json:"topic"` // The channel of the message
+ Payload string `json:"payload"` // The payload of the message
+}
+type Response struct {
+ Request uint16 `json:"req,omitempty"` // The corresponding request ID.
+ Messages []Message `json:"messages"` // The history of messages.
+}
+
+// ForRequest sets the request ID in the response for matching
+func (r *Response) ForRequest(id uint16) {
+ r.Request = id
+}
+
+// OnRequest handles a request of historical messages.
+func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, bool) {
+ var request Request
+ if err := json.Unmarshal(payload, &request); err != nil {
+ return errors.ErrBadRequest, false
+ }
+
+ channel := security.ParseChannel([]byte(request.Channel))
+ if channel.ChannelType == security.ChannelInvalid {
+ return errors.ErrBadRequest, false
+ }
+
+ // Check the authorization and permissions
+ _, key, allowed := s.auth.Authorize(channel, security.AllowLoad)
+ if !allowed {
+ return errors.ErrUnauthorized, false
+ }
+
+ // Use limit = 1 if not specified, otherwise use the limit option. The limit now
+ // defaults to one as per MQTT spec we always need to send retained messages.
+ limit := int64(1)
+ if v, ok := channel.Last(); ok {
+ limit = v
+ }
+
+ ssid := message.NewSsid(key.Contract(), channel.Query)
+ t0, t1 := channel.Window() // Get the window
+
+ msgs, err := s.store.Query(ssid, t0, t1, request.StartFromID, int(limit))
+ if err != nil {
+ logging.LogError("conn", "query last messages", err)
+ return errors.ErrServerError, false
+ }
+
+ resp := &Response{
+ Messages: make([]Message, 0, len(msgs)),
+ }
+ for _, m := range msgs {
+ msg := m
+ resp.Messages = append(resp.Messages, Message{
+ ID: msg.ID,
+ Topic: string(msg.Channel), // The channel for this message.
+ Payload: string(msg.Payload), // The payload for this message.
+ })
+ }
+ return resp, true
+}
diff --git a/internal/service/history/history_test.go b/internal/service/history/history_test.go
new file mode 100644
index 00000000..03e7e569
--- /dev/null
+++ b/internal/service/history/history_test.go
@@ -0,0 +1,84 @@
+/**********************************************************************************
+* Copyright (c) 2009-2020 Misakai Ltd.
+* This program is free software: you can redistribute it and/or modify it under the
+* terms of the GNU Affero General Public License as published by the Free Software
+* Foundation, either version 3 of the License, or(at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful, but WITHOUT ANY
+* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License along
+* with this program. If not, see.
+************************************************************************************/
+
+package history
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/emitter-io/emitter/internal/message"
+ "github.com/emitter-io/emitter/internal/provider/storage"
+ "github.com/emitter-io/emitter/internal/security"
+ "github.com/emitter-io/emitter/internal/service/fake"
+ "github.com/stretchr/testify/assert"
+)
+
+// TestHistory tests the history service.
+func TestHistory(t *testing.T) {
+ ssid := message.Ssid{1, 3238259379, 500706888, 1027807523}
+ store := storage.NewInMemory(nil)
+ store.Configure(nil)
+ auth := &fake.Authorizer{
+ Success: true,
+ Contract: uint32(1),
+ ExtraPerm: security.AllowLoad,
+ }
+ // Create new service
+ service := New(auth, store)
+ connection := &fake.Conn{}
+
+ // The most basic request, on an empty store.
+ request := &Request{
+ Key: "key",
+ Channel: "key/a/b/c/",
+ }
+
+ // Store 2 messages
+ firstSSID := message.NewID(ssid)
+ store.Store(&message.Message{
+ ID: firstSSID,
+ Channel: []byte("a/b/c/"),
+ Payload: []byte("hello"),
+ TTL: 30,
+ })
+ store.Store(&message.Message{
+ ID: message.NewID(ssid),
+ Channel: []byte("a/b/c/"),
+ Payload: []byte("hello"),
+ TTL: 30,
+ })
+ reqBytes, _ := json.Marshal(request)
+
+ // Issue the same request
+ response, ok := service.OnRequest(connection, reqBytes)
+ // The request should have succeeded and returned a response.
+ assert.Equal(t, true, ok)
+ // The response should have returned the last message as per MQTT spec.
+ assert.Equal(t, 1, len(response.(*Response).Messages))
+
+ store.Store(&message.Message{
+ ID: message.NewID(ssid),
+ Channel: []byte("a/b/c/"),
+ Payload: []byte("hello"),
+ TTL: 30,
+ })
+ request.Channel = "key/a/b/c/?last=2"
+ reqBytes, _ = json.Marshal(request)
+ response, ok = service.OnRequest(connection, reqBytes)
+ // The request should have succeeded and returned a response.
+ assert.Equal(t, true, ok)
+ // The response should have returned the last 2 messages.
+ assert.Equal(t, 2, len(response.(*Response).Messages))
+}
diff --git a/internal/service/history/service.go b/internal/service/history/service.go
new file mode 100644
index 00000000..12f74e4a
--- /dev/null
+++ b/internal/service/history/service.go
@@ -0,0 +1,42 @@
+/**********************************************************************************
+* Copyright (c) 2009-2020 Misakai Ltd.
+* This program is free software: you can redistribute it and/or modify it under the
+* terms of the GNU Affero General Public License as published by the Free Software
+* Foundation, either version 3 of the License, or(at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful, but WITHOUT ANY
+* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License along
+* with this program. If not, see.
+************************************************************************************/
+
+package history
+
+import (
+ "github.com/emitter-io/emitter/internal/provider/storage"
+ "github.com/emitter-io/emitter/internal/security/hash"
+ "github.com/emitter-io/emitter/internal/service"
+)
+
+// Service represents a history service.
+type Service struct {
+ auth service.Authorizer // The authorizer to use.
+ store storage.Storage // The storage provider to use.
+ handlers map[uint32]service.Handler // The emitter request handlers.
+}
+
+// New creates a new publisher service.
+func New(auth service.Authorizer, store storage.Storage) *Service {
+ return &Service{
+ auth: auth,
+ store: store,
+ handlers: make(map[uint32]service.Handler),
+ }
+}
+
+// Handle adds a handler for an "emitter/..." request
+func (s *Service) Handle(request string, handler service.Handler) {
+ s.handlers[hash.OfString(request)] = handler
+}
diff --git a/internal/service/pubsub/lastwill_test.go b/internal/service/pubsub/lastwill_test.go
index 4a4da76e..34795a0b 100644
--- a/internal/service/pubsub/lastwill_test.go
+++ b/internal/service/pubsub/lastwill_test.go
@@ -130,7 +130,7 @@ func TestPubSub_LastWill(t *testing.T) {
// Query the storage
{
- msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), 100)
+ msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), nil, 100)
assert.NoError(t, err)
assert.Equal(t, tc.expectStored, len(msgs))
}
diff --git a/internal/service/pubsub/publish_test.go b/internal/service/pubsub/publish_test.go
index 5709b71f..746589bb 100644
--- a/internal/service/pubsub/publish_test.go
+++ b/internal/service/pubsub/publish_test.go
@@ -122,7 +122,7 @@ func TestPubSub_Publish(t *testing.T) {
// Query the storage
{
- msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), 100)
+ msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), nil, 100)
assert.NoError(t, err)
assert.Equal(t, tc.expectStored, len(msgs))
}
diff --git a/internal/service/pubsub/subscribe.go b/internal/service/pubsub/subscribe.go
index 8896e7f5..53870982 100644
--- a/internal/service/pubsub/subscribe.go
+++ b/internal/service/pubsub/subscribe.go
@@ -85,7 +85,7 @@ func (s *Service) OnSubscribe(c service.Conn, mqttTopic []byte) *errors.Error {
// Check if the key has a load permission (also applies for retained)
if key.HasPermission(security.AllowLoad) {
t0, t1 := channel.Window() // Get the window
- msgs, err := s.store.Query(ssid, t0, t1, int(limit))
+ msgs, err := s.store.Query(ssid, t0, t1, nil, int(limit))
if err != nil {
logging.LogError("conn", "query last messages", err)
return errors.ErrServerError
diff --git a/internal/service/pubsub/subscribe_test.go b/internal/service/pubsub/subscribe_test.go
index 552fb101..06d394ad 100644
--- a/internal/service/pubsub/subscribe_test.go
+++ b/internal/service/pubsub/subscribe_test.go
@@ -177,7 +177,7 @@ func (s *buggyStore) Store(m *message.Message) error {
return errors.New("not working")
}
-func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
+func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) {
return nil, errors.New("not working")
}